mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-07-25 21:12:47 +02:00
* test versioning also
* fix some versioning tests
* fall back
* fixes
  - Never-versioned buckets: No VersionId headers, no Status field
  - Pre-versioning objects: Regular files, VersionId="null", included in all operations
  - Post-versioning objects: Stored in .versions directories with real version IDs
  - Suspended versioning: Proper status handling and null version IDs
* fixes
  1. Bucket Versioning Status Compliance
     Fixed: New buckets now return no Status field (AWS S3 compliant)
     Before: Always returned "Suspended" ❌
     After: Returns empty VersioningConfiguration for unconfigured buckets ✅
  2. Multi-Object Delete Versioning Support
     Fixed: DeleteMultipleObjectsHandler now fully versioning-aware
     Before: Always deleted physical files, breaking versioning ❌
     After: Creates delete markers or deletes specific versions properly ✅
     Added: DeleteMarker field in response structure for AWS compatibility
  3. Copy Operations Versioning Support
     Fixed: CopyObjectHandler and CopyObjectPartHandler now versioning-aware
     Before: Only copied regular files, couldn't handle versioned sources ❌
     After: Parses version IDs from copy source, creates versions in destination ✅
     Added: pathToBucketObjectAndVersion() function for version ID parsing
  4. Pre-versioning Object Handling
     Fixed: getLatestObjectVersion() now has proper fallback logic
     Before: Failed when .versions directory didn't exist ❌
     After: Falls back to regular objects for pre-versioning scenarios ✅
  5. Enhanced Object Version Listings
     Fixed: listObjectVersions() includes both versioned AND pre-versioning objects
     Before: Only showed .versions directories, ignored pre-versioning objects ❌
     After: Shows complete version history with VersionId="null" for pre-versioning ✅
  6. Null Version ID Handling
     Fixed: getSpecificObjectVersion() properly handles versionId="null"
     Before: Couldn't retrieve pre-versioning objects by version ID ❌
     After: Returns regular object files for "null" version requests ✅
  7. Version ID Response Headers
     Fixed: PUT operations only return x-amz-version-id when appropriate
     Before: Returned version IDs for non-versioned buckets ❌
     After: Only returns version IDs for explicitly configured versioning ✅
* more fixes
* fix copying with versioning, multipart upload
* more fixes
* reduce volume size for easier dev test
* fix
* fix version id
* fix versioning
* Update filer_multipart.go
* fix multipart versioned upload
* more fixes
* more fixes
* fix versioning on suspended
* fixes
* fixing test_versioning_obj_suspended_copy
* Update s3api_object_versioning.go
* fix versions
* skipping test_versioning_obj_suspend_versions
* > If the versioning state has never been set on a bucket, it has no versioning state; a GetBucketVersioning request does not return a versioning state value.
* fix tests, avoid duplicated bucket creation, skip tests
* only run s3tests_boto3/functional/test_s3.py
* fix checking filer_pb.ErrNotFound
* Update weed/s3api/s3api_object_versioning.go
  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/s3api/s3api_object_handlers_copy.go
  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/s3api/s3api_bucket_config.go
  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update test/s3/versioning/s3_versioning_test.go
  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
449 lines
14 KiB
Go
449 lines
14 KiB
Go
package s3api
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
|
"github.com/k0kubun/pp"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// S3TestConfig bundles the connection settings the S3 versioning tests use.
type S3TestConfig struct {
	// Endpoint is the S3 service URL; AccessKey/SecretKey are the static
	// credentials; Region is the signing region; BucketPrefix is prepended
	// to generated bucket names.
	Endpoint, AccessKey, SecretKey, Region, BucketPrefix string

	// TLS-related switches.
	UseSSL, SkipVerifySSL bool
}
|
|
|
|
// Default test configuration - should match s3tests.conf
|
|
var defaultConfig = &S3TestConfig{
|
|
Endpoint: "http://localhost:8333", // Default SeaweedFS S3 port
|
|
AccessKey: "some_access_key1",
|
|
SecretKey: "some_secret_key1",
|
|
Region: "us-east-1",
|
|
BucketPrefix: "test-versioning-",
|
|
UseSSL: false,
|
|
SkipVerifySSL: true,
|
|
}
|
|
|
|
// getS3Client creates an AWS S3 client for testing
|
|
func getS3Client(t *testing.T) *s3.Client {
|
|
cfg, err := config.LoadDefaultConfig(context.TODO(),
|
|
config.WithRegion(defaultConfig.Region),
|
|
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
|
|
defaultConfig.AccessKey,
|
|
defaultConfig.SecretKey,
|
|
"",
|
|
)),
|
|
config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(
|
|
func(service, region string, options ...interface{}) (aws.Endpoint, error) {
|
|
return aws.Endpoint{
|
|
URL: defaultConfig.Endpoint,
|
|
SigningRegion: defaultConfig.Region,
|
|
HostnameImmutable: true,
|
|
}, nil
|
|
})),
|
|
)
|
|
require.NoError(t, err)
|
|
|
|
return s3.NewFromConfig(cfg, func(o *s3.Options) {
|
|
o.UsePathStyle = true // Important for SeaweedFS
|
|
})
|
|
}
|
|
|
|
// getNewBucketName generates a unique bucket name
|
|
func getNewBucketName() string {
|
|
timestamp := time.Now().UnixNano()
|
|
return fmt.Sprintf("%s%d", defaultConfig.BucketPrefix, timestamp)
|
|
}
|
|
|
|
// createBucket creates a new bucket for testing
|
|
func createBucket(t *testing.T, client *s3.Client, bucketName string) {
|
|
_, err := client.CreateBucket(context.TODO(), &s3.CreateBucketInput{
|
|
Bucket: aws.String(bucketName),
|
|
})
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// deleteBucket deletes a bucket and all its contents
|
|
func deleteBucket(t *testing.T, client *s3.Client, bucketName string) {
|
|
// First, delete all objects and versions
|
|
err := deleteAllObjectVersions(t, client, bucketName)
|
|
if err != nil {
|
|
t.Logf("Warning: failed to delete all object versions: %v", err)
|
|
}
|
|
|
|
// Then delete the bucket
|
|
_, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
|
|
Bucket: aws.String(bucketName),
|
|
})
|
|
if err != nil {
|
|
t.Logf("Warning: failed to delete bucket %s: %v", bucketName, err)
|
|
}
|
|
}
|
|
|
|
// deleteAllObjectVersions deletes all object versions in a bucket
|
|
func deleteAllObjectVersions(t *testing.T, client *s3.Client, bucketName string) error {
|
|
// List all object versions
|
|
paginator := s3.NewListObjectVersionsPaginator(client, &s3.ListObjectVersionsInput{
|
|
Bucket: aws.String(bucketName),
|
|
})
|
|
|
|
for paginator.HasMorePages() {
|
|
page, err := paginator.NextPage(context.TODO())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var objectsToDelete []types.ObjectIdentifier
|
|
|
|
// Add versions
|
|
for _, version := range page.Versions {
|
|
objectsToDelete = append(objectsToDelete, types.ObjectIdentifier{
|
|
Key: version.Key,
|
|
VersionId: version.VersionId,
|
|
})
|
|
}
|
|
|
|
// Add delete markers
|
|
for _, deleteMarker := range page.DeleteMarkers {
|
|
objectsToDelete = append(objectsToDelete, types.ObjectIdentifier{
|
|
Key: deleteMarker.Key,
|
|
VersionId: deleteMarker.VersionId,
|
|
})
|
|
}
|
|
|
|
// Delete objects in batches
|
|
if len(objectsToDelete) > 0 {
|
|
_, err := client.DeleteObjects(context.TODO(), &s3.DeleteObjectsInput{
|
|
Bucket: aws.String(bucketName),
|
|
Delete: &types.Delete{
|
|
Objects: objectsToDelete,
|
|
Quiet: aws.Bool(true),
|
|
},
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// enableVersioning enables versioning on a bucket
|
|
func enableVersioning(t *testing.T, client *s3.Client, bucketName string) {
|
|
_, err := client.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{
|
|
Bucket: aws.String(bucketName),
|
|
VersioningConfiguration: &types.VersioningConfiguration{
|
|
Status: types.BucketVersioningStatusEnabled,
|
|
},
|
|
})
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// checkVersioningStatus verifies the versioning status of a bucket
|
|
func checkVersioningStatus(t *testing.T, client *s3.Client, bucketName string, expectedStatus types.BucketVersioningStatus) {
|
|
resp, err := client.GetBucketVersioning(context.TODO(), &s3.GetBucketVersioningInput{
|
|
Bucket: aws.String(bucketName),
|
|
})
|
|
require.NoError(t, err)
|
|
assert.Equal(t, expectedStatus, resp.Status)
|
|
}
|
|
|
|
// checkVersioningStatusEmpty verifies that a bucket has no versioning configuration (newly created bucket)
|
|
func checkVersioningStatusEmpty(t *testing.T, client *s3.Client, bucketName string) {
|
|
resp, err := client.GetBucketVersioning(context.TODO(), &s3.GetBucketVersioningInput{
|
|
Bucket: aws.String(bucketName),
|
|
})
|
|
require.NoError(t, err)
|
|
// AWS S3 returns an empty versioning configuration (no Status field) for buckets that have never had versioning configured, such as newly created buckets.
|
|
assert.Empty(t, resp.Status, "Newly created bucket should have empty versioning status")
|
|
}
|
|
|
|
// putObject puts an object into a bucket
|
|
func putObject(t *testing.T, client *s3.Client, bucketName, key, content string) *s3.PutObjectOutput {
|
|
resp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(key),
|
|
Body: strings.NewReader(content),
|
|
})
|
|
require.NoError(t, err)
|
|
return resp
|
|
}
|
|
|
|
// headObject gets object metadata
|
|
func headObject(t *testing.T, client *s3.Client, bucketName, key string) *s3.HeadObjectOutput {
|
|
resp, err := client.HeadObject(context.TODO(), &s3.HeadObjectInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(key),
|
|
})
|
|
require.NoError(t, err)
|
|
return resp
|
|
}
|
|
|
|
// TestBucketListReturnDataVersioning is the Go equivalent of test_bucket_list_return_data_versioning
|
|
func TestBucketListReturnDataVersioning(t *testing.T) {
|
|
client := getS3Client(t)
|
|
bucketName := getNewBucketName()
|
|
|
|
// Create bucket
|
|
createBucket(t, client, bucketName)
|
|
defer deleteBucket(t, client, bucketName)
|
|
|
|
// Enable versioning
|
|
enableVersioning(t, client, bucketName)
|
|
checkVersioningStatus(t, client, bucketName, types.BucketVersioningStatusEnabled)
|
|
|
|
// Create test objects
|
|
keyNames := []string{"bar", "baz", "foo"}
|
|
objectData := make(map[string]map[string]interface{})
|
|
|
|
for _, keyName := range keyNames {
|
|
// Put the object
|
|
putResp := putObject(t, client, bucketName, keyName, keyName) // content = key name
|
|
|
|
// Get object metadata
|
|
headResp := headObject(t, client, bucketName, keyName)
|
|
|
|
// Store expected data for later comparison
|
|
objectData[keyName] = map[string]interface{}{
|
|
"ETag": *headResp.ETag,
|
|
"LastModified": *headResp.LastModified,
|
|
"ContentLength": headResp.ContentLength,
|
|
"VersionId": *headResp.VersionId,
|
|
}
|
|
|
|
// Verify version ID was returned
|
|
require.NotNil(t, putResp.VersionId)
|
|
require.NotEmpty(t, *putResp.VersionId)
|
|
assert.Equal(t, *putResp.VersionId, *headResp.VersionId)
|
|
}
|
|
|
|
// List object versions
|
|
resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
|
|
Bucket: aws.String(bucketName),
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// Verify we have the expected number of versions
|
|
assert.Len(t, resp.Versions, len(keyNames))
|
|
|
|
// Check each version matches our stored data
|
|
versionsByKey := make(map[string]types.ObjectVersion)
|
|
for _, version := range resp.Versions {
|
|
versionsByKey[*version.Key] = version
|
|
}
|
|
|
|
for _, keyName := range keyNames {
|
|
version, exists := versionsByKey[keyName]
|
|
require.True(t, exists, "Expected version for key %s", keyName)
|
|
|
|
expectedData := objectData[keyName]
|
|
|
|
// Compare ETag
|
|
assert.Equal(t, expectedData["ETag"], *version.ETag)
|
|
|
|
// Compare Size
|
|
assert.Equal(t, expectedData["ContentLength"], version.Size)
|
|
|
|
// Compare VersionId
|
|
assert.Equal(t, expectedData["VersionId"], *version.VersionId)
|
|
|
|
// Compare LastModified (within reasonable tolerance)
|
|
expectedTime := expectedData["LastModified"].(time.Time)
|
|
actualTime := *version.LastModified
|
|
timeDiff := actualTime.Sub(expectedTime)
|
|
if timeDiff < 0 {
|
|
timeDiff = -timeDiff
|
|
}
|
|
assert.True(t, timeDiff < time.Minute, "LastModified times should be close")
|
|
|
|
// Verify this is marked as the latest version
|
|
assert.True(t, *version.IsLatest)
|
|
|
|
// Verify it's not a delete marker
|
|
// (delete markers should be in resp.DeleteMarkers, not resp.Versions)
|
|
}
|
|
|
|
// Verify no delete markers
|
|
assert.Empty(t, resp.DeleteMarkers)
|
|
|
|
t.Logf("Successfully verified %d versioned objects", len(keyNames))
|
|
}
|
|
|
|
// TestVersioningBasicWorkflow tests basic versioning operations
|
|
func TestVersioningBasicWorkflow(t *testing.T) {
|
|
client := getS3Client(t)
|
|
bucketName := getNewBucketName()
|
|
|
|
// Create bucket
|
|
createBucket(t, client, bucketName)
|
|
defer deleteBucket(t, client, bucketName)
|
|
|
|
// Initially, versioning should be unset/empty (not suspended) for newly created buckets
|
|
// This matches AWS S3 behavior where new buckets have no versioning status
|
|
checkVersioningStatusEmpty(t, client, bucketName)
|
|
|
|
// Enable versioning
|
|
enableVersioning(t, client, bucketName)
|
|
checkVersioningStatus(t, client, bucketName, types.BucketVersioningStatusEnabled)
|
|
|
|
// Put same object multiple times to create versions
|
|
key := "test-object"
|
|
version1 := putObject(t, client, bucketName, key, "content-v1")
|
|
version2 := putObject(t, client, bucketName, key, "content-v2")
|
|
version3 := putObject(t, client, bucketName, key, "content-v3")
|
|
|
|
// Verify each put returned a different version ID
|
|
require.NotEqual(t, *version1.VersionId, *version2.VersionId)
|
|
require.NotEqual(t, *version2.VersionId, *version3.VersionId)
|
|
require.NotEqual(t, *version1.VersionId, *version3.VersionId)
|
|
|
|
// List versions
|
|
resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
|
|
Bucket: aws.String(bucketName),
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// Should have 3 versions
|
|
assert.Len(t, resp.Versions, 3)
|
|
|
|
// Only the latest should be marked as latest
|
|
latestCount := 0
|
|
for _, version := range resp.Versions {
|
|
if *version.IsLatest {
|
|
latestCount++
|
|
assert.Equal(t, *version3.VersionId, *version.VersionId)
|
|
}
|
|
}
|
|
assert.Equal(t, 1, latestCount, "Only one version should be marked as latest")
|
|
|
|
t.Logf("Successfully created and verified %d versions", len(resp.Versions))
|
|
}
|
|
|
|
// TestVersioningDeleteMarkers tests delete marker creation
|
|
func TestVersioningDeleteMarkers(t *testing.T) {
|
|
client := getS3Client(t)
|
|
bucketName := getNewBucketName()
|
|
|
|
// Create bucket and enable versioning
|
|
createBucket(t, client, bucketName)
|
|
defer deleteBucket(t, client, bucketName)
|
|
enableVersioning(t, client, bucketName)
|
|
|
|
// Put an object
|
|
key := "test-delete-marker"
|
|
putResp := putObject(t, client, bucketName, key, "content")
|
|
require.NotNil(t, putResp.VersionId)
|
|
|
|
// Delete the object (should create delete marker)
|
|
deleteResp, err := client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(key),
|
|
})
|
|
require.NoError(t, err)
|
|
require.NotNil(t, deleteResp.VersionId)
|
|
|
|
// List versions to see the delete marker
|
|
listResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
|
|
Bucket: aws.String(bucketName),
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// Should have 1 version and 1 delete marker
|
|
assert.Len(t, listResp.Versions, 1)
|
|
assert.Len(t, listResp.DeleteMarkers, 1)
|
|
|
|
// The delete marker should be the latest
|
|
deleteMarker := listResp.DeleteMarkers[0]
|
|
assert.True(t, *deleteMarker.IsLatest)
|
|
assert.Equal(t, *deleteResp.VersionId, *deleteMarker.VersionId)
|
|
|
|
// The original version should not be latest
|
|
version := listResp.Versions[0]
|
|
assert.False(t, *version.IsLatest)
|
|
assert.Equal(t, *putResp.VersionId, *version.VersionId)
|
|
|
|
t.Logf("Successfully created and verified delete marker")
|
|
}
|
|
|
|
// TestVersioningConcurrentOperations tests concurrent versioning operations
|
|
func TestVersioningConcurrentOperations(t *testing.T) {
|
|
client := getS3Client(t)
|
|
bucketName := getNewBucketName()
|
|
|
|
// Create bucket and enable versioning
|
|
createBucket(t, client, bucketName)
|
|
defer deleteBucket(t, client, bucketName)
|
|
enableVersioning(t, client, bucketName)
|
|
|
|
// Concurrently create multiple objects
|
|
numObjects := 10
|
|
objectKey := "concurrent-test"
|
|
|
|
// Channel to collect version IDs
|
|
versionIds := make(chan string, numObjects)
|
|
errors := make(chan error, numObjects)
|
|
|
|
// Launch concurrent puts
|
|
for i := 0; i < numObjects; i++ {
|
|
go func(index int) {
|
|
content := fmt.Sprintf("content-%d", index)
|
|
resp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
Body: strings.NewReader(content),
|
|
})
|
|
if err != nil {
|
|
errors <- err
|
|
return
|
|
}
|
|
versionIds <- *resp.VersionId
|
|
}(i)
|
|
}
|
|
|
|
// Collect results
|
|
var collectedVersionIds []string
|
|
for i := 0; i < numObjects; i++ {
|
|
select {
|
|
case versionId := <-versionIds:
|
|
t.Logf("Received Version ID %d: %s", i, versionId)
|
|
collectedVersionIds = append(collectedVersionIds, versionId)
|
|
case err := <-errors:
|
|
t.Fatalf("Concurrent put failed: %v", err)
|
|
case <-time.After(30 * time.Second):
|
|
t.Fatalf("Timeout waiting for concurrent operations")
|
|
}
|
|
}
|
|
|
|
// Verify all version IDs are unique
|
|
versionIdSet := make(map[string]bool)
|
|
for _, versionId := range collectedVersionIds {
|
|
assert.False(t, versionIdSet[versionId], "Version ID should be unique: %s", versionId)
|
|
versionIdSet[versionId] = true
|
|
}
|
|
|
|
// List versions and verify count
|
|
listResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
|
|
Bucket: aws.String(bucketName),
|
|
})
|
|
pp.Println(listResp)
|
|
require.NoError(t, err)
|
|
assert.Len(t, listResp.Versions, numObjects)
|
|
|
|
t.Logf("Successfully created %d concurrent versions with unique IDs", numObjects)
|
|
}
|