1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-07-24 20:42:47 +02:00
seaweedfs/weed/s3api/s3api_governance_permissions_test.go
2025-07-13 16:21:36 -07:00

599 lines
21 KiB
Go

package s3api
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
)
// TestCheckGovernanceBypassPermissionResourceGeneration tests that the function
// correctly generates resource paths for the permission check
func TestCheckGovernanceBypassPermissionResourceGeneration(t *testing.T) {
tests := []struct {
name string
bucket string
object string
expectedPath string
description string
}{
{
name: "simple_object",
bucket: "test-bucket",
object: "test-object.txt",
expectedPath: "test-bucket/test-object.txt",
description: "Simple bucket and object should be joined with slash",
},
{
name: "object_with_leading_slash",
bucket: "test-bucket",
object: "/test-object.txt",
expectedPath: "test-bucket/test-object.txt",
description: "Leading slash should be trimmed from object name",
},
{
name: "nested_object",
bucket: "test-bucket",
object: "/folder/subfolder/test-object.txt",
expectedPath: "test-bucket/folder/subfolder/test-object.txt",
description: "Nested object path should be handled correctly",
},
{
name: "empty_object",
bucket: "test-bucket",
object: "",
expectedPath: "test-bucket/",
description: "Empty object should result in bucket with trailing slash",
},
{
name: "root_object",
bucket: "test-bucket",
object: "/",
expectedPath: "test-bucket/",
description: "Root object should result in bucket with trailing slash",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Test the resource generation logic used in checkGovernanceBypassPermission
resource := strings.TrimPrefix(tt.object, "/")
actualPath := tt.bucket + "/" + resource
if actualPath != tt.expectedPath {
t.Errorf("Resource path generation failed. Expected: %s, Got: %s. %s",
tt.expectedPath, actualPath, tt.description)
}
})
}
}
// TestCheckGovernanceBypassPermissionActionGeneration tests that the function
// correctly generates action strings for IAM checking
func TestCheckGovernanceBypassPermissionActionGeneration(t *testing.T) {
tests := []struct {
name string
bucket string
object string
expectedBypassAction string
expectedAdminAction string
description string
}{
{
name: "bypass_action_generation",
bucket: "test-bucket",
object: "test-object.txt",
expectedBypassAction: "BypassGovernanceRetention:test-bucket/test-object.txt",
expectedAdminAction: "Admin:test-bucket/test-object.txt",
description: "Actions should be properly formatted with resource path",
},
{
name: "leading_slash_handling",
bucket: "test-bucket",
object: "/test-object.txt",
expectedBypassAction: "BypassGovernanceRetention:test-bucket/test-object.txt",
expectedAdminAction: "Admin:test-bucket/test-object.txt",
description: "Leading slash should be trimmed in action generation",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Test the action generation logic used in checkGovernanceBypassPermission
resource := strings.TrimPrefix(tt.object, "/")
resourcePath := tt.bucket + "/" + resource
bypassAction := s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION + ":" + resourcePath
adminAction := s3_constants.ACTION_ADMIN + ":" + resourcePath
if bypassAction != tt.expectedBypassAction {
t.Errorf("Bypass action generation failed. Expected: %s, Got: %s. %s",
tt.expectedBypassAction, bypassAction, tt.description)
}
if adminAction != tt.expectedAdminAction {
t.Errorf("Admin action generation failed. Expected: %s, Got: %s. %s",
tt.expectedAdminAction, adminAction, tt.description)
}
})
}
}
// TestCheckGovernanceBypassPermissionErrorHandling tests error handling scenarios
func TestCheckGovernanceBypassPermissionErrorHandling(t *testing.T) {
// Note: This test demonstrates the expected behavior for different error scenarios
// without requiring full IAM setup
tests := []struct {
name string
bucket string
object string
description string
}{
{
name: "empty_bucket",
bucket: "",
object: "test-object.txt",
description: "Empty bucket should be handled gracefully",
},
{
name: "special_characters",
bucket: "test-bucket",
object: "test object with spaces.txt",
description: "Objects with special characters should be handled",
},
{
name: "unicode_characters",
bucket: "test-bucket",
object: "测试文件.txt",
description: "Objects with unicode characters should be handled",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Test that the function doesn't panic with various inputs
// This would normally call checkGovernanceBypassPermission
// but since we don't have a full S3ApiServer setup, we just test
// that the resource generation logic works without panicking
resource := strings.TrimPrefix(tt.object, "/")
resourcePath := tt.bucket + "/" + resource
// Verify the resource path is generated
if resourcePath == "" {
t.Errorf("Resource path should not be empty for test case: %s", tt.description)
}
t.Logf("Generated resource path for %s: %s", tt.description, resourcePath)
})
}
}
// TestCheckGovernanceBypassPermissionIntegrationBehavior documents the expected behavior
// when integrated with a full IAM system
func TestCheckGovernanceBypassPermissionIntegrationBehavior(t *testing.T) {
t.Skip("Documentation test - describes expected behavior with full IAM integration")
// This test documents the expected behavior when checkGovernanceBypassPermission
// is called with a full IAM system:
//
// 1. Function calls s3a.iam.authRequest() with the bypass action
// 2. If authRequest returns errCode != s3err.ErrNone, function returns false
// 3. If authRequest succeeds, function checks identity.canDo() with the bypass action
// 4. If canDo() returns true, function returns true
// 5. If bypass permission fails, function checks admin action with identity.canDo()
// 6. If admin action succeeds, function returns true and logs admin access
// 7. If all checks fail, function returns false
//
// The function correctly uses:
// - s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION for bypass permission
// - s3_constants.ACTION_ADMIN for admin permission
// - Proper resource path generation with bucket/object format
// - Trimming of leading slashes from object names
}
// TestGovernanceBypassPermission was removed because it tested the old
// insecure behavior of trusting the AmzIsAdmin header. The new implementation
// uses proper IAM authentication instead of relying on client-provided headers.
// Test specifically for users with IAM bypass permission
func TestGovernanceBypassWithIAMPermission(t *testing.T) {
// This test demonstrates the expected behavior for non-admin users with bypass permission
// In a real implementation, this would integrate with the full IAM system
t.Skip("Integration test requires full IAM setup - demonstrates expected behavior")
// The expected behavior would be:
// 1. Non-admin user makes request with bypass header
// 2. checkGovernanceBypassPermission calls s3a.iam.authRequest
// 3. authRequest validates user identity and checks permissions
// 4. If user has s3:BypassGovernanceRetention permission, return true
// 5. Otherwise return false
// For now, the function correctly returns false for non-admin users
// when the IAM system doesn't have the user configured with bypass permission
}
func TestGovernancePermissionIntegration(t *testing.T) {
// Note: This test demonstrates the expected integration behavior
// In a real implementation, this would require setting up a proper IAM mock
// with identities that have the bypass governance permission
t.Skip("Integration test requires full IAM setup - demonstrates expected behavior")
// This test would verify:
// 1. User with BypassGovernanceRetention permission can bypass governance
// 2. User without permission cannot bypass governance
// 3. Admin users can always bypass governance
// 4. Anonymous users cannot bypass governance
}
func TestGovernanceBypassHeader(t *testing.T) {
tests := []struct {
name string
headerValue string
expectedResult bool
description string
}{
{
name: "bypass_header_true",
headerValue: "true",
expectedResult: true,
description: "Header with 'true' value should enable bypass",
},
{
name: "bypass_header_false",
headerValue: "false",
expectedResult: false,
description: "Header with 'false' value should not enable bypass",
},
{
name: "bypass_header_empty",
headerValue: "",
expectedResult: false,
description: "Empty header should not enable bypass",
},
{
name: "bypass_header_invalid",
headerValue: "invalid",
expectedResult: false,
description: "Invalid header value should not enable bypass",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req := httptest.NewRequest("DELETE", "/bucket/object", nil)
if tt.headerValue != "" {
req.Header.Set("x-amz-bypass-governance-retention", tt.headerValue)
}
result := req.Header.Get("x-amz-bypass-governance-retention") == "true"
if result != tt.expectedResult {
t.Errorf("bypass header check = %v, want %v. %s", result, tt.expectedResult, tt.description)
}
})
}
}
func TestGovernanceRetentionModeChecking(t *testing.T) {
tests := []struct {
name string
retentionMode string
bypassGovernance bool
hasPermission bool
expectedError bool
expectedErrorType string
description string
}{
{
name: "compliance_mode_cannot_bypass",
retentionMode: s3_constants.RetentionModeCompliance,
bypassGovernance: true,
hasPermission: true,
expectedError: true,
expectedErrorType: "compliance mode",
description: "Compliance mode should not be bypassable even with permission",
},
{
name: "governance_mode_without_bypass",
retentionMode: s3_constants.RetentionModeGovernance,
bypassGovernance: false,
hasPermission: false,
expectedError: true,
expectedErrorType: "governance mode",
description: "Governance mode should be blocked without bypass",
},
{
name: "governance_mode_with_bypass_no_permission",
retentionMode: s3_constants.RetentionModeGovernance,
bypassGovernance: true,
hasPermission: false,
expectedError: true,
expectedErrorType: "permission",
description: "Governance mode bypass should fail without permission",
},
{
name: "governance_mode_with_bypass_and_permission",
retentionMode: s3_constants.RetentionModeGovernance,
bypassGovernance: true,
hasPermission: true,
expectedError: false,
expectedErrorType: "",
description: "Governance mode bypass should succeed with permission",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Test validates the logic without actually needing the full implementation
// This demonstrates the expected behavior patterns
var hasError bool
var errorType string
if tt.retentionMode == s3_constants.RetentionModeCompliance {
hasError = true
errorType = "compliance mode"
} else if tt.retentionMode == s3_constants.RetentionModeGovernance {
if !tt.bypassGovernance {
hasError = true
errorType = "governance mode"
} else if !tt.hasPermission {
hasError = true
errorType = "permission"
}
}
if hasError != tt.expectedError {
t.Errorf("expected error: %v, got error: %v. %s", tt.expectedError, hasError, tt.description)
}
if tt.expectedError && !strings.Contains(errorType, tt.expectedErrorType) {
t.Errorf("expected error type containing '%s', got '%s'. %s", tt.expectedErrorType, errorType, tt.description)
}
})
}
}
func TestGovernancePermissionActionGeneration(t *testing.T) {
tests := []struct {
name string
bucket string
object string
expectedAction string
description string
}{
{
name: "bucket_and_object_action",
bucket: "test-bucket",
object: "/test-object", // Object has "/" prefix from GetBucketAndObject
expectedAction: "BypassGovernanceRetention:test-bucket/test-object",
description: "Action should be generated correctly for bucket and object",
},
{
name: "bucket_only_action",
bucket: "test-bucket",
object: "",
expectedAction: "BypassGovernanceRetention:test-bucket",
description: "Action should be generated correctly for bucket only",
},
{
name: "nested_object_action",
bucket: "test-bucket",
object: "/folder/subfolder/object", // Object has "/" prefix from GetBucketAndObject
expectedAction: "BypassGovernanceRetention:test-bucket/folder/subfolder/object",
description: "Action should be generated correctly for nested objects",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
action := s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION + ":" + tt.bucket + tt.object
if action != tt.expectedAction {
t.Errorf("generated action: %s, expected: %s. %s", action, tt.expectedAction, tt.description)
}
})
}
}
// TestGovernancePermissionEndToEnd tests the complete object lock permission flow
func TestGovernancePermissionEndToEnd(t *testing.T) {
t.Skip("End-to-end testing requires full S3 API server setup - demonstrates expected behavior")
// This test demonstrates the end-to-end flow that would be tested in a full integration test
// The checkObjectLockPermissions method is called by:
// 1. DeleteObjectHandler - when versioning is enabled and object lock is configured
// 2. DeleteMultipleObjectsHandler - for each object in versioned buckets
// 3. PutObjectHandler - via checkObjectLockPermissionsForPut for versioned buckets
// 4. PutObjectRetentionHandler - when setting retention on objects
//
// Each handler:
// - Extracts bypassGovernance from "x-amz-bypass-governance-retention" header
// - Calls checkObjectLockPermissions with the appropriate parameters
// - Handles the returned errors appropriately (ErrAccessDenied, etc.)
//
// The method integrates with the IAM system through checkGovernanceBypassPermission
// which validates the s3:BypassGovernanceRetention permission
}
// TestGovernancePermissionHTTPFlow tests the HTTP header processing and method calls
func TestGovernancePermissionHTTPFlow(t *testing.T) {
tests := []struct {
name string
headerValue string
expectedBypassGovernance bool
}{
{
name: "bypass_header_true",
headerValue: "true",
expectedBypassGovernance: true,
},
{
name: "bypass_header_false",
headerValue: "false",
expectedBypassGovernance: false,
},
{
name: "bypass_header_missing",
headerValue: "",
expectedBypassGovernance: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create a mock HTTP request
req, _ := http.NewRequest("DELETE", "/bucket/test-object", nil)
if tt.headerValue != "" {
req.Header.Set("x-amz-bypass-governance-retention", tt.headerValue)
}
// Test the header processing logic used in handlers
bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true"
if bypassGovernance != tt.expectedBypassGovernance {
t.Errorf("Expected bypassGovernance to be %v, got %v", tt.expectedBypassGovernance, bypassGovernance)
}
})
}
}
// TestGovernancePermissionMethodCalls tests that the governance permission methods are called correctly
func TestGovernancePermissionMethodCalls(t *testing.T) {
// Test that demonstrates the method call pattern used in handlers
// This is the pattern used in DeleteObjectHandler:
t.Run("delete_object_handler_pattern", func(t *testing.T) {
req, _ := http.NewRequest("DELETE", "/bucket/test-object", nil)
req.Header.Set("x-amz-bypass-governance-retention", "true")
// Extract parameters as done in the handler
bucket, object := s3_constants.GetBucketAndObject(req)
versionId := req.URL.Query().Get("versionId")
bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true"
// Verify the parameters are extracted correctly
// Note: The actual bucket and object extraction depends on the URL structure
t.Logf("Extracted bucket: %s, object: %s", bucket, object)
if versionId != "" {
t.Errorf("Expected versionId to be empty, got %v", versionId)
}
if !bypassGovernance {
t.Errorf("Expected bypassGovernance to be true")
}
})
// This is the pattern used in PutObjectHandler:
t.Run("put_object_handler_pattern", func(t *testing.T) {
req, _ := http.NewRequest("PUT", "/bucket/test-object", nil)
req.Header.Set("x-amz-bypass-governance-retention", "true")
// Extract parameters as done in the handler
bucket, object := s3_constants.GetBucketAndObject(req)
bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true"
versioningEnabled := true // Would be determined by isVersioningEnabled(bucket)
// Verify the parameters are extracted correctly
// Note: The actual bucket and object extraction depends on the URL structure
t.Logf("Extracted bucket: %s, object: %s", bucket, object)
if !bypassGovernance {
t.Errorf("Expected bypassGovernance to be true")
}
if !versioningEnabled {
t.Errorf("Expected versioningEnabled to be true")
}
})
}
// TestGovernanceBypassNotPermittedError tests that ErrGovernanceBypassNotPermitted
// is returned when bypass is requested but the user lacks permission
func TestGovernanceBypassNotPermittedError(t *testing.T) {
// Test the error constant itself
if ErrGovernanceBypassNotPermitted == nil {
t.Error("ErrGovernanceBypassNotPermitted should be defined")
}
// Verify the error message
expectedMessage := "user does not have permission to bypass governance retention"
if ErrGovernanceBypassNotPermitted.Error() != expectedMessage {
t.Errorf("expected error message '%s', got '%s'",
expectedMessage, ErrGovernanceBypassNotPermitted.Error())
}
// Test the scenario where this error should be returned
// This documents the expected behavior when:
// 1. Object is under governance retention
// 2. bypassGovernance is true
// 3. checkGovernanceBypassPermission returns false
testCases := []struct {
name string
retentionMode string
bypassGovernance bool
hasPermission bool
expectedError error
description string
}{
{
name: "governance_bypass_without_permission",
retentionMode: s3_constants.RetentionModeGovernance,
bypassGovernance: true,
hasPermission: false,
expectedError: ErrGovernanceBypassNotPermitted,
description: "Should return ErrGovernanceBypassNotPermitted when bypass is requested but user lacks permission",
},
{
name: "governance_bypass_with_permission",
retentionMode: s3_constants.RetentionModeGovernance,
bypassGovernance: true,
hasPermission: true,
expectedError: nil,
description: "Should succeed when bypass is requested and user has permission",
},
{
name: "governance_no_bypass",
retentionMode: s3_constants.RetentionModeGovernance,
bypassGovernance: false,
hasPermission: false,
expectedError: ErrGovernanceModeActive,
description: "Should return ErrGovernanceModeActive when bypass is not requested",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// This test documents the expected behavior pattern
// The actual checkObjectLockPermissions method implements this logic:
// if retention.Mode == s3_constants.RetentionModeGovernance {
// if !bypassGovernance {
// return ErrGovernanceModeActive
// }
// if !s3a.checkGovernanceBypassPermission(request, bucket, object) {
// return ErrGovernanceBypassNotPermitted
// }
// }
var simulatedError error
if tc.retentionMode == s3_constants.RetentionModeGovernance {
if !tc.bypassGovernance {
simulatedError = ErrGovernanceModeActive
} else if !tc.hasPermission {
simulatedError = ErrGovernanceBypassNotPermitted
}
}
if simulatedError != tc.expectedError {
t.Errorf("expected error %v, got %v. %s", tc.expectedError, simulatedError, tc.description)
}
// Verify ErrGovernanceBypassNotPermitted is returned in the right case
if tc.name == "governance_bypass_without_permission" && simulatedError != ErrGovernanceBypassNotPermitted {
t.Errorf("Test case should return ErrGovernanceBypassNotPermitted but got %v", simulatedError)
}
})
}
}