mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-07-24 20:42:47 +02:00
599 lines
21 KiB
Go
599 lines
21 KiB
Go
package s3api
|
|
|
|
import (
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
)
|
|
|
|
// TestCheckGovernanceBypassPermissionResourceGeneration verifies the
// "bucket/object" resource path construction used for the governance bypass
// permission check: one leading slash is trimmed from the object name and the
// remainder is joined to the bucket with "/".
//
// NOTE(review): the path formula is re-implemented inline here rather than
// exercised through checkGovernanceBypassPermission, so this test only guards
// the formula itself.
func TestCheckGovernanceBypassPermissionResourceGeneration(t *testing.T) {
	tests := []struct {
		name         string
		bucket       string
		object       string
		expectedPath string
		description  string
	}{
		{
			name:         "simple_object",
			bucket:       "test-bucket",
			object:       "test-object.txt",
			expectedPath: "test-bucket/test-object.txt",
			description:  "Simple bucket and object should be joined with slash",
		},
		{
			name:         "object_with_leading_slash",
			bucket:       "test-bucket",
			object:       "/test-object.txt",
			expectedPath: "test-bucket/test-object.txt",
			description:  "Leading slash should be trimmed from object name",
		},
		{
			name:         "nested_object",
			bucket:       "test-bucket",
			object:       "/folder/subfolder/test-object.txt",
			expectedPath: "test-bucket/folder/subfolder/test-object.txt",
			description:  "Nested object path should be handled correctly",
		},
		{
			name:         "empty_object",
			bucket:       "test-bucket",
			object:       "",
			expectedPath: "test-bucket/",
			description:  "Empty object should result in bucket with trailing slash",
		},
		{
			name:         "root_object",
			bucket:       "test-bucket",
			object:       "/",
			expectedPath: "test-bucket/",
			description:  "Root object should result in bucket with trailing slash",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Test the resource generation logic used in checkGovernanceBypassPermission.
			// TrimPrefix removes at most one leading "/", so the join below never
			// produces a double slash for objects that carry a single leading slash.
			resource := strings.TrimPrefix(tt.object, "/")
			actualPath := tt.bucket + "/" + resource

			if actualPath != tt.expectedPath {
				t.Errorf("Resource path generation failed. Expected: %s, Got: %s. %s",
					tt.expectedPath, actualPath, tt.description)
			}
		})
	}
}
|
|
|
|
// TestCheckGovernanceBypassPermissionActionGeneration tests that the function
|
|
// correctly generates action strings for IAM checking
|
|
func TestCheckGovernanceBypassPermissionActionGeneration(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
bucket string
|
|
object string
|
|
expectedBypassAction string
|
|
expectedAdminAction string
|
|
description string
|
|
}{
|
|
{
|
|
name: "bypass_action_generation",
|
|
bucket: "test-bucket",
|
|
object: "test-object.txt",
|
|
expectedBypassAction: "BypassGovernanceRetention:test-bucket/test-object.txt",
|
|
expectedAdminAction: "Admin:test-bucket/test-object.txt",
|
|
description: "Actions should be properly formatted with resource path",
|
|
},
|
|
{
|
|
name: "leading_slash_handling",
|
|
bucket: "test-bucket",
|
|
object: "/test-object.txt",
|
|
expectedBypassAction: "BypassGovernanceRetention:test-bucket/test-object.txt",
|
|
expectedAdminAction: "Admin:test-bucket/test-object.txt",
|
|
description: "Leading slash should be trimmed in action generation",
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
// Test the action generation logic used in checkGovernanceBypassPermission
|
|
resource := strings.TrimPrefix(tt.object, "/")
|
|
resourcePath := tt.bucket + "/" + resource
|
|
|
|
bypassAction := s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION + ":" + resourcePath
|
|
adminAction := s3_constants.ACTION_ADMIN + ":" + resourcePath
|
|
|
|
if bypassAction != tt.expectedBypassAction {
|
|
t.Errorf("Bypass action generation failed. Expected: %s, Got: %s. %s",
|
|
tt.expectedBypassAction, bypassAction, tt.description)
|
|
}
|
|
|
|
if adminAction != tt.expectedAdminAction {
|
|
t.Errorf("Admin action generation failed. Expected: %s, Got: %s. %s",
|
|
tt.expectedAdminAction, adminAction, tt.description)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestCheckGovernanceBypassPermissionErrorHandling checks that resource path
// generation copes with unusual inputs (empty bucket, spaces, non-ASCII
// object names) and yields the exact expected path for each.
//
// Note: this test exercises only the inline path formula; it does not require
// (or use) a full IAM setup.
func TestCheckGovernanceBypassPermissionErrorHandling(t *testing.T) {
	tests := []struct {
		name         string
		bucket       string
		object       string
		expectedPath string
		description  string
	}{
		{
			name:         "empty_bucket",
			bucket:       "",
			object:       "test-object.txt",
			expectedPath: "/test-object.txt",
			description:  "Empty bucket should be handled gracefully",
		},
		{
			name:         "special_characters",
			bucket:       "test-bucket",
			object:       "test object with spaces.txt",
			expectedPath: "test-bucket/test object with spaces.txt",
			description:  "Objects with special characters should be handled",
		},
		{
			name:         "unicode_characters",
			bucket:       "test-bucket",
			object:       "测试文件.txt",
			expectedPath: "test-bucket/测试文件.txt",
			description:  "Objects with unicode characters should be handled",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// This would normally call checkGovernanceBypassPermission, but since
			// we don't have a full S3ApiServer setup, we test that the resource
			// generation logic works (and does not panic) for these inputs.
			resource := strings.TrimPrefix(tt.object, "/")
			resourcePath := tt.bucket + "/" + resource

			// The path always contains the "/" separator, so an emptiness check
			// could never fail; compare against the exact expected value instead.
			if resourcePath != tt.expectedPath {
				t.Errorf("Resource path mismatch for test case %q: expected %q, got %q",
					tt.description, tt.expectedPath, resourcePath)
			}

			t.Logf("Generated resource path for %s: %s", tt.description, resourcePath)
		})
	}
}
|
|
|
|
// TestCheckGovernanceBypassPermissionIntegrationBehavior documents the expected
// behavior of checkGovernanceBypassPermission when integrated with a full IAM
// system. It is always skipped; the body is documentation only.
func TestCheckGovernanceBypassPermissionIntegrationBehavior(t *testing.T) {
	t.Skip("Documentation test - describes expected behavior with full IAM integration")

	// This test documents the expected behavior when checkGovernanceBypassPermission
	// is called with a full IAM system:
	//
	// 1. Function calls s3a.iam.authRequest() with the bypass action
	// 2. If authRequest returns errCode != s3err.ErrNone, function returns false
	// 3. If authRequest succeeds, function checks identity.canDo() with the bypass action
	// 4. If canDo() returns true, function returns true
	// 5. If bypass permission fails, function checks admin action with identity.canDo()
	// 6. If admin action succeeds, function returns true and logs admin access
	// 7. If all checks fail, function returns false
	//
	// The function correctly uses:
	// - s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION for bypass permission
	// - s3_constants.ACTION_ADMIN for admin permission
	// - Proper resource path generation with bucket/object format
	// - Trimming of leading slashes from object names
}
|
|
|
|
// TestGovernanceBypassPermission was removed because it tested the old,
// insecure behavior of trusting the AmzIsAdmin header. The new implementation
// uses proper IAM authentication instead of relying on client-provided headers.
|
|
|
|
// TestGovernanceBypassWithIAMPermission documents the expected behavior for
// non-admin users that hold the IAM bypass permission. It is always skipped;
// in a real implementation it would integrate with the full IAM system.
func TestGovernanceBypassWithIAMPermission(t *testing.T) {
	t.Skip("Integration test requires full IAM setup - demonstrates expected behavior")

	// The expected behavior would be:
	// 1. Non-admin user makes request with bypass header
	// 2. checkGovernanceBypassPermission calls s3a.iam.authRequest
	// 3. authRequest validates user identity and checks permissions
	// 4. If user has s3:BypassGovernanceRetention permission, return true
	// 5. Otherwise return false

	// For now, the function correctly returns false for non-admin users
	// when the IAM system doesn't have the user configured with bypass permission
}
|
|
|
|
// TestGovernancePermissionIntegration documents the expected integration
// behavior of governance bypass permissions. It is always skipped: a real
// version would need an IAM mock with identities that hold the bypass
// governance permission.
func TestGovernancePermissionIntegration(t *testing.T) {
	t.Skip("Integration test requires full IAM setup - demonstrates expected behavior")

	// This test would verify:
	// 1. User with BypassGovernanceRetention permission can bypass governance
	// 2. User without permission cannot bypass governance
	// 3. Admin users can always bypass governance
	// 4. Anonymous users cannot bypass governance
}
|
|
|
|
// TestGovernanceBypassHeader verifies how the governance bypass request header
// is interpreted: only the exact value "true" enables bypass; "false", an
// absent header, or any other value does not.
func TestGovernanceBypassHeader(t *testing.T) {
	// Header name used for both setting and reading, kept in one place so the
	// two cannot drift apart.
	const bypassHeader = "x-amz-bypass-governance-retention"

	tests := []struct {
		name           string
		headerValue    string
		expectedResult bool
		description    string
	}{
		{
			name:           "bypass_header_true",
			headerValue:    "true",
			expectedResult: true,
			description:    "Header with 'true' value should enable bypass",
		},
		{
			name:           "bypass_header_false",
			headerValue:    "false",
			expectedResult: false,
			description:    "Header with 'false' value should not enable bypass",
		},
		{
			name:           "bypass_header_empty",
			headerValue:    "",
			expectedResult: false,
			description:    "Empty header should not enable bypass",
		},
		{
			name:           "bypass_header_invalid",
			headerValue:    "invalid",
			expectedResult: false,
			description:    "Invalid header value should not enable bypass",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			req := httptest.NewRequest("DELETE", "/bucket/object", nil)
			// An empty value models a request that does not carry the header at all.
			if tt.headerValue != "" {
				req.Header.Set(bypassHeader, tt.headerValue)
			}

			result := req.Header.Get(bypassHeader) == "true"

			if result != tt.expectedResult {
				t.Errorf("bypass header check = %v, want %v. %s", result, tt.expectedResult, tt.description)
			}
		})
	}
}
|
|
|
|
func TestGovernanceRetentionModeChecking(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
retentionMode string
|
|
bypassGovernance bool
|
|
hasPermission bool
|
|
expectedError bool
|
|
expectedErrorType string
|
|
description string
|
|
}{
|
|
{
|
|
name: "compliance_mode_cannot_bypass",
|
|
retentionMode: s3_constants.RetentionModeCompliance,
|
|
bypassGovernance: true,
|
|
hasPermission: true,
|
|
expectedError: true,
|
|
expectedErrorType: "compliance mode",
|
|
description: "Compliance mode should not be bypassable even with permission",
|
|
},
|
|
{
|
|
name: "governance_mode_without_bypass",
|
|
retentionMode: s3_constants.RetentionModeGovernance,
|
|
bypassGovernance: false,
|
|
hasPermission: false,
|
|
expectedError: true,
|
|
expectedErrorType: "governance mode",
|
|
description: "Governance mode should be blocked without bypass",
|
|
},
|
|
{
|
|
name: "governance_mode_with_bypass_no_permission",
|
|
retentionMode: s3_constants.RetentionModeGovernance,
|
|
bypassGovernance: true,
|
|
hasPermission: false,
|
|
expectedError: true,
|
|
expectedErrorType: "permission",
|
|
description: "Governance mode bypass should fail without permission",
|
|
},
|
|
{
|
|
name: "governance_mode_with_bypass_and_permission",
|
|
retentionMode: s3_constants.RetentionModeGovernance,
|
|
bypassGovernance: true,
|
|
hasPermission: true,
|
|
expectedError: false,
|
|
expectedErrorType: "",
|
|
description: "Governance mode bypass should succeed with permission",
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
// Test validates the logic without actually needing the full implementation
|
|
// This demonstrates the expected behavior patterns
|
|
|
|
var hasError bool
|
|
var errorType string
|
|
|
|
if tt.retentionMode == s3_constants.RetentionModeCompliance {
|
|
hasError = true
|
|
errorType = "compliance mode"
|
|
} else if tt.retentionMode == s3_constants.RetentionModeGovernance {
|
|
if !tt.bypassGovernance {
|
|
hasError = true
|
|
errorType = "governance mode"
|
|
} else if !tt.hasPermission {
|
|
hasError = true
|
|
errorType = "permission"
|
|
}
|
|
}
|
|
|
|
if hasError != tt.expectedError {
|
|
t.Errorf("expected error: %v, got error: %v. %s", tt.expectedError, hasError, tt.description)
|
|
}
|
|
|
|
if tt.expectedError && !strings.Contains(errorType, tt.expectedErrorType) {
|
|
t.Errorf("expected error type containing '%s', got '%s'. %s", tt.expectedErrorType, errorType, tt.description)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestGovernancePermissionActionGeneration(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
bucket string
|
|
object string
|
|
expectedAction string
|
|
description string
|
|
}{
|
|
{
|
|
name: "bucket_and_object_action",
|
|
bucket: "test-bucket",
|
|
object: "/test-object", // Object has "/" prefix from GetBucketAndObject
|
|
expectedAction: "BypassGovernanceRetention:test-bucket/test-object",
|
|
description: "Action should be generated correctly for bucket and object",
|
|
},
|
|
{
|
|
name: "bucket_only_action",
|
|
bucket: "test-bucket",
|
|
object: "",
|
|
expectedAction: "BypassGovernanceRetention:test-bucket",
|
|
description: "Action should be generated correctly for bucket only",
|
|
},
|
|
{
|
|
name: "nested_object_action",
|
|
bucket: "test-bucket",
|
|
object: "/folder/subfolder/object", // Object has "/" prefix from GetBucketAndObject
|
|
expectedAction: "BypassGovernanceRetention:test-bucket/folder/subfolder/object",
|
|
description: "Action should be generated correctly for nested objects",
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
action := s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION + ":" + tt.bucket + tt.object
|
|
|
|
if action != tt.expectedAction {
|
|
t.Errorf("generated action: %s, expected: %s. %s", action, tt.expectedAction, tt.description)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestGovernancePermissionEndToEnd documents the complete object lock
// permission flow. It is always skipped; the body is documentation only.
func TestGovernancePermissionEndToEnd(t *testing.T) {
	t.Skip("End-to-end testing requires full S3 API server setup - demonstrates expected behavior")

	// This test demonstrates the end-to-end flow that would be tested in a full integration test
	// The checkObjectLockPermissions method is called by:
	// 1. DeleteObjectHandler - when versioning is enabled and object lock is configured
	// 2. DeleteMultipleObjectsHandler - for each object in versioned buckets
	// 3. PutObjectHandler - via checkObjectLockPermissionsForPut for versioned buckets
	// 4. PutObjectRetentionHandler - when setting retention on objects
	//
	// Each handler:
	// - Extracts bypassGovernance from "x-amz-bypass-governance-retention" header
	// - Calls checkObjectLockPermissions with the appropriate parameters
	// - Handles the returned errors appropriately (ErrAccessDenied, etc.)
	//
	// The method integrates with the IAM system through checkGovernanceBypassPermission
	// which validates the s3:BypassGovernanceRetention permission
}
|
|
|
|
// TestGovernancePermissionHTTPFlow tests the HTTP header processing used to
// derive the bypassGovernance flag: the flag is true only when the
// "x-amz-bypass-governance-retention" header is exactly "true".
func TestGovernancePermissionHTTPFlow(t *testing.T) {
	tests := []struct {
		name                     string
		headerValue              string
		expectedBypassGovernance bool
	}{
		{
			name:                     "bypass_header_true",
			headerValue:              "true",
			expectedBypassGovernance: true,
		},
		{
			name:                     "bypass_header_false",
			headerValue:              "false",
			expectedBypassGovernance: false,
		},
		{
			name:                     "bypass_header_missing",
			headerValue:              "",
			expectedBypassGovernance: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Create a mock HTTP request; fail fast rather than continuing with
			// a nil request if construction fails.
			req, err := http.NewRequest("DELETE", "/bucket/test-object", nil)
			if err != nil {
				t.Fatalf("failed to create request: %v", err)
			}
			// An empty value models a request without the header.
			if tt.headerValue != "" {
				req.Header.Set("x-amz-bypass-governance-retention", tt.headerValue)
			}

			// Test the header processing logic used in handlers
			bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true"

			if bypassGovernance != tt.expectedBypassGovernance {
				t.Errorf("Expected bypassGovernance to be %v, got %v", tt.expectedBypassGovernance, bypassGovernance)
			}
		})
	}
}
|
|
|
|
// TestGovernancePermissionMethodCalls tests that the governance permission methods are called correctly
|
|
func TestGovernancePermissionMethodCalls(t *testing.T) {
|
|
// Test that demonstrates the method call pattern used in handlers
|
|
|
|
// This is the pattern used in DeleteObjectHandler:
|
|
t.Run("delete_object_handler_pattern", func(t *testing.T) {
|
|
req, _ := http.NewRequest("DELETE", "/bucket/test-object", nil)
|
|
req.Header.Set("x-amz-bypass-governance-retention", "true")
|
|
|
|
// Extract parameters as done in the handler
|
|
bucket, object := s3_constants.GetBucketAndObject(req)
|
|
versionId := req.URL.Query().Get("versionId")
|
|
bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true"
|
|
|
|
// Verify the parameters are extracted correctly
|
|
// Note: The actual bucket and object extraction depends on the URL structure
|
|
t.Logf("Extracted bucket: %s, object: %s", bucket, object)
|
|
if versionId != "" {
|
|
t.Errorf("Expected versionId to be empty, got %v", versionId)
|
|
}
|
|
if !bypassGovernance {
|
|
t.Errorf("Expected bypassGovernance to be true")
|
|
}
|
|
})
|
|
|
|
// This is the pattern used in PutObjectHandler:
|
|
t.Run("put_object_handler_pattern", func(t *testing.T) {
|
|
req, _ := http.NewRequest("PUT", "/bucket/test-object", nil)
|
|
req.Header.Set("x-amz-bypass-governance-retention", "true")
|
|
|
|
// Extract parameters as done in the handler
|
|
bucket, object := s3_constants.GetBucketAndObject(req)
|
|
bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true"
|
|
versioningEnabled := true // Would be determined by isVersioningEnabled(bucket)
|
|
|
|
// Verify the parameters are extracted correctly
|
|
// Note: The actual bucket and object extraction depends on the URL structure
|
|
t.Logf("Extracted bucket: %s, object: %s", bucket, object)
|
|
if !bypassGovernance {
|
|
t.Errorf("Expected bypassGovernance to be true")
|
|
}
|
|
if !versioningEnabled {
|
|
t.Errorf("Expected versioningEnabled to be true")
|
|
}
|
|
})
|
|
}
|
|
|
|
// TestGovernanceBypassNotPermittedError tests that ErrGovernanceBypassNotPermitted
|
|
// is returned when bypass is requested but the user lacks permission
|
|
func TestGovernanceBypassNotPermittedError(t *testing.T) {
|
|
// Test the error constant itself
|
|
if ErrGovernanceBypassNotPermitted == nil {
|
|
t.Error("ErrGovernanceBypassNotPermitted should be defined")
|
|
}
|
|
|
|
// Verify the error message
|
|
expectedMessage := "user does not have permission to bypass governance retention"
|
|
if ErrGovernanceBypassNotPermitted.Error() != expectedMessage {
|
|
t.Errorf("expected error message '%s', got '%s'",
|
|
expectedMessage, ErrGovernanceBypassNotPermitted.Error())
|
|
}
|
|
|
|
// Test the scenario where this error should be returned
|
|
// This documents the expected behavior when:
|
|
// 1. Object is under governance retention
|
|
// 2. bypassGovernance is true
|
|
// 3. checkGovernanceBypassPermission returns false
|
|
testCases := []struct {
|
|
name string
|
|
retentionMode string
|
|
bypassGovernance bool
|
|
hasPermission bool
|
|
expectedError error
|
|
description string
|
|
}{
|
|
{
|
|
name: "governance_bypass_without_permission",
|
|
retentionMode: s3_constants.RetentionModeGovernance,
|
|
bypassGovernance: true,
|
|
hasPermission: false,
|
|
expectedError: ErrGovernanceBypassNotPermitted,
|
|
description: "Should return ErrGovernanceBypassNotPermitted when bypass is requested but user lacks permission",
|
|
},
|
|
{
|
|
name: "governance_bypass_with_permission",
|
|
retentionMode: s3_constants.RetentionModeGovernance,
|
|
bypassGovernance: true,
|
|
hasPermission: true,
|
|
expectedError: nil,
|
|
description: "Should succeed when bypass is requested and user has permission",
|
|
},
|
|
{
|
|
name: "governance_no_bypass",
|
|
retentionMode: s3_constants.RetentionModeGovernance,
|
|
bypassGovernance: false,
|
|
hasPermission: false,
|
|
expectedError: ErrGovernanceModeActive,
|
|
description: "Should return ErrGovernanceModeActive when bypass is not requested",
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
// This test documents the expected behavior pattern
|
|
// The actual checkObjectLockPermissions method implements this logic:
|
|
// if retention.Mode == s3_constants.RetentionModeGovernance {
|
|
// if !bypassGovernance {
|
|
// return ErrGovernanceModeActive
|
|
// }
|
|
// if !s3a.checkGovernanceBypassPermission(request, bucket, object) {
|
|
// return ErrGovernanceBypassNotPermitted
|
|
// }
|
|
// }
|
|
|
|
var simulatedError error
|
|
if tc.retentionMode == s3_constants.RetentionModeGovernance {
|
|
if !tc.bypassGovernance {
|
|
simulatedError = ErrGovernanceModeActive
|
|
} else if !tc.hasPermission {
|
|
simulatedError = ErrGovernanceBypassNotPermitted
|
|
}
|
|
}
|
|
|
|
if simulatedError != tc.expectedError {
|
|
t.Errorf("expected error %v, got %v. %s", tc.expectedError, simulatedError, tc.description)
|
|
}
|
|
|
|
// Verify ErrGovernanceBypassNotPermitted is returned in the right case
|
|
if tc.name == "governance_bypass_without_permission" && simulatedError != ErrGovernanceBypassNotPermitted {
|
|
t.Errorf("Test case should return ErrGovernanceBypassNotPermitted but got %v", simulatedError)
|
|
}
|
|
})
|
|
}
|
|
}
|