package s3api

import (
	"errors"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
)

// bypassGovernanceHeader is the request header the tests in this file inspect
// to decide whether a governance-retention bypass was requested.
const bypassGovernanceHeader = "x-amz-bypass-governance-retention"

// TestCheckGovernanceBypassPermissionResourceGeneration checks the resource
// path format "<bucket>/<object without leading slash>".
//
// The test mirrors the path-building logic inline (trim one leading "/" from
// the object, then join with the bucket); it does not call
// checkGovernanceBypassPermission itself.
func TestCheckGovernanceBypassPermissionResourceGeneration(t *testing.T) {
	tests := []struct {
		name         string
		bucket       string
		object       string
		expectedPath string
		description  string
	}{
		{
			name:         "simple_object",
			bucket:       "test-bucket",
			object:       "test-object.txt",
			expectedPath: "test-bucket/test-object.txt",
			description:  "Simple bucket and object should be joined with slash",
		},
		{
			name:         "object_with_leading_slash",
			bucket:       "test-bucket",
			object:       "/test-object.txt",
			expectedPath: "test-bucket/test-object.txt",
			description:  "Leading slash should be trimmed from object name",
		},
		{
			name:         "nested_object",
			bucket:       "test-bucket",
			object:       "/folder/subfolder/test-object.txt",
			expectedPath: "test-bucket/folder/subfolder/test-object.txt",
			description:  "Nested object path should be handled correctly",
		},
		{
			name:         "empty_object",
			bucket:       "test-bucket",
			object:       "",
			expectedPath: "test-bucket/",
			description:  "Empty object should result in bucket with trailing slash",
		},
		{
			name:         "root_object",
			bucket:       "test-bucket",
			object:       "/",
			expectedPath: "test-bucket/",
			description:  "Root object should result in bucket with trailing slash",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Mirror of the resource generation logic used in checkGovernanceBypassPermission.
			resource := strings.TrimPrefix(tt.object, "/")
			actualPath := tt.bucket + "/" + resource

			if actualPath != tt.expectedPath {
				t.Errorf("Resource path generation failed. Expected: %s, Got: %s. %s", tt.expectedPath, actualPath, tt.description)
			}
		})
	}
}

// TestCheckGovernanceBypassPermissionActionGeneration checks the action string
// format "<action constant>:<bucket>/<object without leading slash>" for both
// the bypass-governance action and the admin action.
//
// The action strings are assembled inline from the s3_constants values; the
// production function is not invoked.
func TestCheckGovernanceBypassPermissionActionGeneration(t *testing.T) {
	tests := []struct {
		name                 string
		bucket               string
		object               string
		expectedBypassAction string
		expectedAdminAction  string
		description          string
	}{
		{
			name:                 "bypass_action_generation",
			bucket:               "test-bucket",
			object:               "test-object.txt",
			expectedBypassAction: "BypassGovernanceRetention:test-bucket/test-object.txt",
			expectedAdminAction:  "Admin:test-bucket/test-object.txt",
			description:          "Actions should be properly formatted with resource path",
		},
		{
			name:                 "leading_slash_handling",
			bucket:               "test-bucket",
			object:               "/test-object.txt",
			expectedBypassAction: "BypassGovernanceRetention:test-bucket/test-object.txt",
			expectedAdminAction:  "Admin:test-bucket/test-object.txt",
			description:          "Leading slash should be trimmed in action generation",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Mirror of the action generation logic used in checkGovernanceBypassPermission.
			resource := strings.TrimPrefix(tt.object, "/")
			resourcePath := tt.bucket + "/" + resource

			bypassAction := s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION + ":" + resourcePath
			adminAction := s3_constants.ACTION_ADMIN + ":" + resourcePath

			if bypassAction != tt.expectedBypassAction {
				t.Errorf("Bypass action generation failed. Expected: %s, Got: %s. %s", tt.expectedBypassAction, bypassAction, tt.description)
			}
			if adminAction != tt.expectedAdminAction {
				t.Errorf("Admin action generation failed. Expected: %s, Got: %s. %s", tt.expectedAdminAction, adminAction, tt.description)
			}
		})
	}
}

// TestCheckGovernanceBypassPermissionErrorHandling exercises the resource path
// logic with unusual inputs (empty bucket, spaces, non-ASCII object names).
//
// Note: this demonstrates the expected behavior for these inputs without
// requiring a full IAM setup; the path is built inline rather than by calling
// checkGovernanceBypassPermission.
func TestCheckGovernanceBypassPermissionErrorHandling(t *testing.T) {
	tests := []struct {
		name         string
		bucket       string
		object       string
		expectedPath string
		description  string
	}{
		{
			name:         "empty_bucket",
			bucket:       "",
			object:       "test-object.txt",
			expectedPath: "/test-object.txt",
			description:  "Empty bucket should be handled gracefully",
		},
		{
			name:         "special_characters",
			bucket:       "test-bucket",
			object:       "test object with spaces.txt",
			expectedPath: "test-bucket/test object with spaces.txt",
			description:  "Objects with special characters should be handled",
		},
		{
			name:         "unicode_characters",
			bucket:       "test-bucket",
			object:       "测试文件.txt",
			expectedPath: "test-bucket/测试文件.txt",
			description:  "Objects with unicode characters should be handled",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// This would normally call checkGovernanceBypassPermission, but since
			// there is no full S3ApiServer setup here, only the resource
			// generation logic is exercised.
			resource := strings.TrimPrefix(tt.object, "/")
			resourcePath := tt.bucket + "/" + resource

			// The joined path always contains "/", so an emptiness check could
			// never fail; compare against the exact expected value instead.
			if resourcePath != tt.expectedPath {
				t.Errorf("Resource path mismatch. Expected: %q, Got: %q. %s", tt.expectedPath, resourcePath, tt.description)
			}

			t.Logf("Generated resource path for %s: %s", tt.description, resourcePath)
		})
	}
}

// TestCheckGovernanceBypassPermissionIntegrationBehavior documents the expected
// behavior when integrated with a full IAM system. It is always skipped.
func TestCheckGovernanceBypassPermissionIntegrationBehavior(t *testing.T) {
	t.Skip("Documentation test - describes expected behavior with full IAM integration")

	// This test documents the expected behavior when checkGovernanceBypassPermission
	// is called with a full IAM system:
	//
	// 1. Function calls s3a.iam.authRequest() with the bypass action
	// 2. If authRequest returns errCode != s3err.ErrNone, function returns false
	// 3. If authRequest succeeds, function checks identity.canDo() with the bypass action
	// 4. If canDo() returns true, function returns true
	// 5. If bypass permission fails, function checks admin action with identity.canDo()
	// 6. If admin action succeeds, function returns true and logs admin access
	// 7. If all checks fail, function returns false
	//
	// The function correctly uses:
	// - s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION for bypass permission
	// - s3_constants.ACTION_ADMIN for admin permission
	// - Proper resource path generation with bucket/object format
	// - Trimming of leading slashes from object names
}

// TestGovernanceBypassPermission was removed because it tested the old
// insecure behavior of trusting the AmzIsAdmin header. The new implementation
// uses proper IAM authentication instead of relying on client-provided headers.

// TestGovernanceBypassWithIAMPermission is a skipped placeholder describing the
// expected behavior for non-admin users that hold the IAM bypass permission.
func TestGovernanceBypassWithIAMPermission(t *testing.T) {
	// This test demonstrates the expected behavior for non-admin users with bypass permission.
	// In a real implementation, this would integrate with the full IAM system.
	t.Skip("Integration test requires full IAM setup - demonstrates expected behavior")

	// The expected behavior would be:
	// 1. Non-admin user makes request with bypass header
	// 2. checkGovernanceBypassPermission calls s3a.iam.authRequest
	// 3. authRequest validates user identity and checks permissions
	// 4. If user has s3:BypassGovernanceRetention permission, return true
	// 5. Otherwise return false

	// For now, the function correctly returns false for non-admin users
	// when the IAM system doesn't have the user configured with bypass permission
}

// TestGovernancePermissionIntegration is a skipped placeholder listing the
// scenarios a full IAM-backed integration test would cover.
func TestGovernancePermissionIntegration(t *testing.T) {
	// Note: This test demonstrates the expected integration behavior.
	// In a real implementation, this would require setting up a proper IAM mock
	// with identities that have the bypass governance permission.
	t.Skip("Integration test requires full IAM setup - demonstrates expected behavior")

	// This test would verify:
	// 1. User with BypassGovernanceRetention permission can bypass governance
	// 2. User without permission cannot bypass governance
	// 3. Admin users can always bypass governance
	// 4. Anonymous users cannot bypass governance
}

// TestGovernanceBypassHeader checks that only the exact header value "true"
// is interpreted as a bypass request; "false", an absent header, and any other
// value are treated as no bypass.
func TestGovernanceBypassHeader(t *testing.T) {
	tests := []struct {
		name           string
		headerValue    string
		expectedResult bool
		description    string
	}{
		{
			name:           "bypass_header_true",
			headerValue:    "true",
			expectedResult: true,
			description:    "Header with 'true' value should enable bypass",
		},
		{
			name:           "bypass_header_false",
			headerValue:    "false",
			expectedResult: false,
			description:    "Header with 'false' value should not enable bypass",
		},
		{
			name:           "bypass_header_empty",
			headerValue:    "",
			expectedResult: false,
			description:    "Empty header should not enable bypass",
		},
		{
			name:           "bypass_header_invalid",
			headerValue:    "invalid",
			expectedResult: false,
			description:    "Invalid header value should not enable bypass",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			req := httptest.NewRequest(http.MethodDelete, "/bucket/object", nil)
			// An empty table value means "header not sent at all".
			if tt.headerValue != "" {
				req.Header.Set(bypassGovernanceHeader, tt.headerValue)
			}

			result := req.Header.Get(bypassGovernanceHeader) == "true"

			if result != tt.expectedResult {
				t.Errorf("bypass header check = %v, want %v. %s", result, tt.expectedResult, tt.description)
			}
		})
	}
}

// TestGovernanceRetentionModeChecking walks a decision table over retention
// mode, bypass flag and permission, and checks which combinations are expected
// to produce an error.
//
// The decision logic is simulated inside the test; no production method is
// called.
func TestGovernanceRetentionModeChecking(t *testing.T) {
	tests := []struct {
		name              string
		retentionMode     string
		bypassGovernance  bool
		hasPermission     bool
		expectedError     bool
		expectedErrorType string
		description       string
	}{
		{
			name:              "compliance_mode_cannot_bypass",
			retentionMode:     s3_constants.RetentionModeCompliance,
			bypassGovernance:  true,
			hasPermission:     true,
			expectedError:     true,
			expectedErrorType: "compliance mode",
			description:       "Compliance mode should not be bypassable even with permission",
		},
		{
			name:              "governance_mode_without_bypass",
			retentionMode:     s3_constants.RetentionModeGovernance,
			bypassGovernance:  false,
			hasPermission:     false,
			expectedError:     true,
			expectedErrorType: "governance mode",
			description:       "Governance mode should be blocked without bypass",
		},
		{
			name:              "governance_mode_with_bypass_no_permission",
			retentionMode:     s3_constants.RetentionModeGovernance,
			bypassGovernance:  true,
			hasPermission:     false,
			expectedError:     true,
			expectedErrorType: "permission",
			description:       "Governance mode bypass should fail without permission",
		},
		{
			name:              "governance_mode_with_bypass_and_permission",
			retentionMode:     s3_constants.RetentionModeGovernance,
			bypassGovernance:  true,
			hasPermission:     true,
			expectedError:     false,
			expectedErrorType: "",
			description:       "Governance mode bypass should succeed with permission",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Test validates the logic without actually needing the full implementation.
			// This demonstrates the expected behavior patterns.
			var hasError bool
			var errorType string

			if tt.retentionMode == s3_constants.RetentionModeCompliance {
				// Compliance mode blocks regardless of bypass flag or permission.
				hasError = true
				errorType = "compliance mode"
			} else if tt.retentionMode == s3_constants.RetentionModeGovernance {
				if !tt.bypassGovernance {
					hasError = true
					errorType = "governance mode"
				} else if !tt.hasPermission {
					hasError = true
					errorType = "permission"
				}
			}

			if hasError != tt.expectedError {
				t.Errorf("expected error: %v, got error: %v. %s", tt.expectedError, hasError, tt.description)
			}

			if tt.expectedError && !strings.Contains(errorType, tt.expectedErrorType) {
				t.Errorf("expected error type containing '%s', got '%s'. %s", tt.expectedErrorType, errorType, tt.description)
			}
		})
	}
}

// TestGovernancePermissionActionGeneration checks action strings built as
// "<bypass action constant>:<bucket><object>", where the object is expected to
// already carry its leading "/".
//
// NOTE(review): for an empty object this yields "…:test-bucket" (no trailing
// slash), whereas the trim-and-join form used by the other tests in this file
// yields "test-bucket/". Confirm which form the production code uses.
func TestGovernancePermissionActionGeneration(t *testing.T) {
	tests := []struct {
		name           string
		bucket         string
		object         string
		expectedAction string
		description    string
	}{
		{
			name:           "bucket_and_object_action",
			bucket:         "test-bucket",
			object:         "/test-object", // Object has "/" prefix from GetBucketAndObject
			expectedAction: "BypassGovernanceRetention:test-bucket/test-object",
			description:    "Action should be generated correctly for bucket and object",
		},
		{
			name:           "bucket_only_action",
			bucket:         "test-bucket",
			object:         "",
			expectedAction: "BypassGovernanceRetention:test-bucket",
			description:    "Action should be generated correctly for bucket only",
		},
		{
			name:           "nested_object_action",
			bucket:         "test-bucket",
			object:         "/folder/subfolder/object", // Object has "/" prefix from GetBucketAndObject
			expectedAction: "BypassGovernanceRetention:test-bucket/folder/subfolder/object",
			description:    "Action should be generated correctly for nested objects",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			action := s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION + ":" + tt.bucket + tt.object

			if action != tt.expectedAction {
				t.Errorf("generated action: %s, expected: %s. %s", action, tt.expectedAction, tt.description)
			}
		})
	}
}

// TestGovernancePermissionEndToEnd is a skipped placeholder describing the
// complete object lock permission flow.
func TestGovernancePermissionEndToEnd(t *testing.T) {
	t.Skip("End-to-end testing requires full S3 API server setup - demonstrates expected behavior")

	// This test demonstrates the end-to-end flow that would be tested in a full integration test.
	// The checkObjectLockPermissions method is called by:
	// 1. DeleteObjectHandler - when versioning is enabled and object lock is configured
	// 2. DeleteMultipleObjectsHandler - for each object in versioned buckets
	// 3. PutObjectHandler - via checkObjectLockPermissionsForPut for versioned buckets
	// 4. PutObjectRetentionHandler - when setting retention on objects
	//
	// Each handler:
	// - Extracts bypassGovernance from "x-amz-bypass-governance-retention" header
	// - Calls checkObjectLockPermissions with the appropriate parameters
	// - Handles the returned errors appropriately (ErrAccessDenied, etc.)
	//
	// The method integrates with the IAM system through checkGovernanceBypassPermission
	// which validates the s3:BypassGovernanceRetention permission
}

// TestGovernancePermissionHTTPFlow checks the header-to-boolean conversion
// (header value == "true") on a DELETE request.
func TestGovernancePermissionHTTPFlow(t *testing.T) {
	tests := []struct {
		name                     string
		headerValue              string
		expectedBypassGovernance bool
	}{
		{
			name:                     "bypass_header_true",
			headerValue:              "true",
			expectedBypassGovernance: true,
		},
		{
			name:                     "bypass_header_false",
			headerValue:              "false",
			expectedBypassGovernance: false,
		},
		{
			name:                     "bypass_header_missing",
			headerValue:              "",
			expectedBypassGovernance: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Create a mock HTTP request. httptest.NewRequest has no error
			// return to ignore and matches TestGovernanceBypassHeader.
			req := httptest.NewRequest(http.MethodDelete, "/bucket/test-object", nil)
			if tt.headerValue != "" {
				req.Header.Set(bypassGovernanceHeader, tt.headerValue)
			}

			// Test the header processing logic used in handlers.
			bypassGovernance := req.Header.Get(bypassGovernanceHeader) == "true"

			if bypassGovernance != tt.expectedBypassGovernance {
				t.Errorf("Expected bypassGovernance to be %v, got %v", tt.expectedBypassGovernance, bypassGovernance)
			}
		})
	}
}

// TestGovernancePermissionMethodCalls demonstrates the parameter-extraction
// pattern (bucket/object, versionId, bypass header) described as being used by
// the delete and put handlers.
func TestGovernancePermissionMethodCalls(t *testing.T) {
	// This is the pattern used in DeleteObjectHandler:
	t.Run("delete_object_handler_pattern", func(t *testing.T) {
		req := httptest.NewRequest(http.MethodDelete, "/bucket/test-object", nil)
		req.Header.Set(bypassGovernanceHeader, "true")

		// Extract parameters as done in the handler.
		bucket, object := s3_constants.GetBucketAndObject(req)
		versionId := req.URL.Query().Get("versionId")
		bypassGovernance := req.Header.Get(bypassGovernanceHeader) == "true"

		// Verify the parameters are extracted correctly.
		// Note: The actual bucket and object extraction depends on the URL
		// structure, so they are only logged, not asserted.
		t.Logf("Extracted bucket: %s, object: %s", bucket, object)
		if versionId != "" {
			t.Errorf("Expected versionId to be empty, got %v", versionId)
		}
		if !bypassGovernance {
			t.Error("Expected bypassGovernance to be true")
		}
	})

	// This is the pattern used in PutObjectHandler:
	t.Run("put_object_handler_pattern", func(t *testing.T) {
		req := httptest.NewRequest(http.MethodPut, "/bucket/test-object", nil)
		req.Header.Set(bypassGovernanceHeader, "true")

		// Extract parameters as done in the handler.
		bucket, object := s3_constants.GetBucketAndObject(req)
		bypassGovernance := req.Header.Get(bypassGovernanceHeader) == "true"
		versioningEnabled := true // Would be determined by isVersioningEnabled(bucket)

		// Verify the parameters are extracted correctly.
		// Note: The actual bucket and object extraction depends on the URL
		// structure, so they are only logged, not asserted.
		t.Logf("Extracted bucket: %s, object: %s", bucket, object)
		if !bypassGovernance {
			t.Error("Expected bypassGovernance to be true")
		}
		if !versioningEnabled {
			t.Error("Expected versioningEnabled to be true")
		}
	})
}

// TestGovernanceBypassNotPermittedError checks that the
// ErrGovernanceBypassNotPermitted sentinel is defined with the expected message,
// and documents (via a simulated decision table) when it is expected to be
// returned instead of ErrGovernanceModeActive or nil.
func TestGovernanceBypassNotPermittedError(t *testing.T) {
	// Test the error constant itself. This must stop the test: the message
	// check below calls Error() on the sentinel and would panic on nil.
	if ErrGovernanceBypassNotPermitted == nil {
		t.Fatal("ErrGovernanceBypassNotPermitted should be defined")
	}

	// Verify the error message.
	expectedMessage := "user does not have permission to bypass governance retention"
	if ErrGovernanceBypassNotPermitted.Error() != expectedMessage {
		t.Errorf("expected error message '%s', got '%s'", expectedMessage, ErrGovernanceBypassNotPermitted.Error())
	}

	// Test the scenario where this error should be returned.
	// This documents the expected behavior when:
	// 1. Object is under governance retention
	// 2. bypassGovernance is true
	// 3. checkGovernanceBypassPermission returns false
	testCases := []struct {
		name             string
		retentionMode    string
		bypassGovernance bool
		hasPermission    bool
		expectedError    error
		description      string
	}{
		{
			name:             "governance_bypass_without_permission",
			retentionMode:    s3_constants.RetentionModeGovernance,
			bypassGovernance: true,
			hasPermission:    false,
			expectedError:    ErrGovernanceBypassNotPermitted,
			description:      "Should return ErrGovernanceBypassNotPermitted when bypass is requested but user lacks permission",
		},
		{
			name:             "governance_bypass_with_permission",
			retentionMode:    s3_constants.RetentionModeGovernance,
			bypassGovernance: true,
			hasPermission:    true,
			expectedError:    nil,
			description:      "Should succeed when bypass is requested and user has permission",
		},
		{
			name:             "governance_no_bypass",
			retentionMode:    s3_constants.RetentionModeGovernance,
			bypassGovernance: false,
			hasPermission:    false,
			expectedError:    ErrGovernanceModeActive,
			description:      "Should return ErrGovernanceModeActive when bypass is not requested",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// This test documents the expected behavior pattern.
			// The actual checkObjectLockPermissions method implements this logic:
			// if retention.Mode == s3_constants.RetentionModeGovernance {
			//     if !bypassGovernance {
			//         return ErrGovernanceModeActive
			//     }
			//     if !s3a.checkGovernanceBypassPermission(request, bucket, object) {
			//         return ErrGovernanceBypassNotPermitted
			//     }
			// }

			var simulatedError error
			if tc.retentionMode == s3_constants.RetentionModeGovernance {
				if !tc.bypassGovernance {
					simulatedError = ErrGovernanceModeActive
				} else if !tc.hasPermission {
					simulatedError = ErrGovernanceBypassNotPermitted
				}
			}

			// errors.Is treats (nil, nil) as a match, so the success case with
			// a nil expectedError is handled by the same comparison.
			if !errors.Is(simulatedError, tc.expectedError) {
				t.Errorf("expected error %v, got %v. %s", tc.expectedError, simulatedError, tc.description)
			}

			// Verify ErrGovernanceBypassNotPermitted is returned in the right case.
			if tc.name == "governance_bypass_without_permission" && !errors.Is(simulatedError, ErrGovernanceBypassNotPermitted) {
				t.Errorf("Test case should return ErrGovernanceBypassNotPermitted but got %v", simulatedError)
			}
		})
	}
}