1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-10-14 22:10:23 +02:00
seaweedfs/weed/s3api/filer_multipart.go
Chris Lu c5a9c27449
Migrate from deprecated azure-storage-blob-go to modern Azure SDK (#7310)
* Migrate from deprecated azure-storage-blob-go to modern Azure SDK

Migrates Azure Blob Storage integration from the deprecated
github.com/Azure/azure-storage-blob-go to the modern
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob SDK.

## Changes

### Removed Files
- weed/remote_storage/azure/azure_highlevel.go
  - Custom upload helper no longer needed with new SDK

### Updated Files
- weed/remote_storage/azure/azure_storage_client.go
  - Migrated from ServiceURL/ContainerURL/BlobURL to Client-based API
  - Updated client creation using NewClientWithSharedKeyCredential
  - Replaced ListBlobsFlatSegment with NewListBlobsFlatPager
  - Updated Download to DownloadStream with proper HTTPRange
  - Replaced custom uploadReaderAtToBlockBlob with UploadStream
  - Updated GetProperties, SetMetadata, Delete to use new client methods
  - Fixed metadata conversion to return map[string]*string

- weed/replication/sink/azuresink/azure_sink.go
  - Migrated from ContainerURL to Client-based API
  - Updated client initialization
  - Replaced AppendBlobURL with AppendBlobClient
  - Updated error handling to use azcore.ResponseError
  - Added streaming.NopCloser for AppendBlock

### New Test Files
- weed/remote_storage/azure/azure_storage_client_test.go
  - Comprehensive unit tests for all client operations
  - Tests for Traverse, ReadFile, WriteFile, UpdateMetadata, Delete
  - Tests for metadata conversion function
  - Benchmark tests
  - Integration tests (skippable without credentials)

- weed/replication/sink/azuresink/azure_sink_test.go
  - Unit tests for Azure sink operations
  - Tests for CreateEntry, UpdateEntry, DeleteEntry
  - Tests for cleanKey function
  - Tests for configuration-based initialization
  - Integration tests (skippable without credentials)
  - Benchmark tests

### Dependency Updates
- go.mod: Removed github.com/Azure/azure-storage-blob-go v0.15.0
- go.mod: Made github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2 direct dependency
- All deprecated dependencies automatically cleaned up

## API Migration Summary

Old SDK → New SDK mappings:
- ServiceURL → Client (service-level operations)
- ContainerURL → ContainerClient
- BlobURL → BlobClient
- BlockBlobURL → BlockBlobClient
- AppendBlobURL → AppendBlobClient
- ListBlobsFlatSegment() → NewListBlobsFlatPager()
- Download() → DownloadStream()
- Upload() → UploadStream()
- Marker-based pagination → Pager-based pagination
- azblob.ResponseError → azcore.ResponseError

## Testing

All tests pass:
-  Unit tests for metadata conversion
-  Unit tests for helper functions (cleanKey)
-  Interface implementation tests
-  Build successful
-  No compilation errors
-  Integration tests available (require Azure credentials)

## Benefits

-  Uses actively maintained SDK
-  Better performance with modern API design
-  Improved error handling
-  Removes ~200 lines of custom upload code
-  Reduces dependency count
-  Better async/streaming support
-  Future-proof against SDK deprecation

## Backward Compatibility

The changes are transparent to users:
- Same configuration parameters (account name, account key)
- Same functionality and behavior
- No changes to SeaweedFS API or user-facing features
- Existing Azure storage configurations continue to work

## Breaking Changes

None - this is an internal implementation change only.

* Address Gemini Code Assist review comments

Fixed three issues identified by Gemini Code Assist:

1. HIGH: ReadFile now uses blob.CountToEnd when size is 0
   - Old SDK: size=0 meant "read to end"
   - New SDK: size=0 means "read 0 bytes"
   - Fix: Use blob.CountToEnd (-1) to read entire blob from offset

2. MEDIUM: Use to.Ptr() instead of slice trick for DeleteSnapshots
   - Replaced &[]Type{value}[0] with to.Ptr(value)
   - Cleaner, more idiomatic Azure SDK pattern
   - Applied to both azure_storage_client.go and azure_sink.go

3. Added missing imports:
   - github.com/Azure/azure-sdk-for-go/sdk/azcore/to

These changes improve code clarity and correctness while following
Azure SDK best practices.

* Address second round of Gemini Code Assist review comments

Fixed all issues identified in the second review:

1. MEDIUM: Added constants for hardcoded values
   - Defined defaultBlockSize (4 MB) and defaultConcurrency (16)
   - Applied to WriteFile UploadStream options
   - Improves maintainability and readability

2. MEDIUM: Made DeleteFile idempotent
   - Now returns nil (no error) if blob doesn't exist
   - Uses bloberror.HasCode(err, bloberror.BlobNotFound)
   - Consistent with idempotent operation expectations

3. Fixed TestToMetadata test failures
   - Test was using lowercase 'x-amz-meta-' but constant is 'X-Amz-Meta-'
   - Updated test to use s3_constants.AmzUserMetaPrefix
   - All tests now pass

Changes:
- Added import: github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror
- Added constants: defaultBlockSize, defaultConcurrency
- Updated WriteFile to use constants
- Updated DeleteFile to be idempotent
- Fixed test to use correct S3 metadata prefix constant

All tests pass. Build succeeds. Code follows Azure SDK best practices.

* Address third round of Gemini Code Assist review comments

Fixed all issues identified in the third review:

1. MEDIUM: Use bloberror.HasCode for ContainerAlreadyExists
   - Replaced fragile string check with bloberror.HasCode()
   - More robust and aligned with Azure SDK best practices
   - Applied to CreateBucket test

2. MEDIUM: Use bloberror.HasCode for BlobNotFound in test
   - Replaced generic error check with specific BlobNotFound check
   - Makes test more precise and verifies correct error returned
   - Applied to VerifyDeleted test

3. MEDIUM: Made DeleteEntry idempotent in azure_sink.go
   - Now returns nil (no error) if blob doesn't exist
   - Uses bloberror.HasCode(err, bloberror.BlobNotFound)
   - Consistent with DeleteFile implementation
   - Makes replication sink more robust to retries

Changes:
- Added import to azure_storage_client_test.go: bloberror
- Added import to azure_sink.go: bloberror
- Updated CreateBucket test to use bloberror.HasCode
- Updated VerifyDeleted test to use bloberror.HasCode
- Updated DeleteEntry to be idempotent

All tests pass. Build succeeds. Code uses Azure SDK best practices.

* Address fourth round of Gemini Code Assist review comments

Fixed two critical issues identified in the fourth review:

1. HIGH: Handle BlobAlreadyExists in append blob creation
   - Problem: If append blob already exists, Create() fails causing replication failure
   - Fix: Added bloberror.HasCode(err, bloberror.BlobAlreadyExists) check
   - Behavior: Existing append blobs are now acceptable, appends can proceed
   - Impact: Makes replication sink more robust, prevents unnecessary failures
   - Location: azure_sink.go CreateEntry function

2. MEDIUM: Configure custom retry policy for download resiliency
   - Problem: Old SDK had MaxRetryRequests: 20, new SDK defaults to 3 retries
   - Fix: Configured policy.RetryOptions with MaxRetries: 10
   - Settings: TryTimeout=1min, RetryDelay=2s, MaxRetryDelay=1min
   - Impact: Maintains similar resiliency in unreliable network conditions
   - Location: azure_storage_client.go client initialization

Changes:
- Added import: github.com/Azure/azure-sdk-for-go/sdk/azcore/policy
- Updated NewClientWithSharedKeyCredential to include ClientOptions with retry policy
- Updated CreateEntry error handling to allow BlobAlreadyExists

Technical details:
- Retry policy uses exponential backoff (default SDK behavior)
- MaxRetries=10 provides good balance (was 20 in old SDK, default is 3)
- TryTimeout prevents individual requests from hanging indefinitely
- BlobAlreadyExists handling allows idempotent append operations

All tests pass. Build succeeds. Code is more resilient and robust.

* Update weed/replication/sink/azuresink/azure_sink.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Revert "Update weed/replication/sink/azuresink/azure_sink.go"

This reverts commit 605e41cadf.

* Address fifth round of Gemini Code Assist review comment

Added retry policy to azure_sink.go for consistency and resiliency:

1. MEDIUM: Configure retry policy in azure_sink.go client
   - Problem: azure_sink.go was using default retry policy (3 retries) while
     azure_storage_client.go had custom policy (10 retries)
   - Fix: Added same retry policy configuration for consistency
   - Settings: MaxRetries=10, TryTimeout=1min, RetryDelay=2s, MaxRetryDelay=1min
   - Impact: Replication sink now has same resiliency as storage client
   - Rationale: Replication sink needs to be robust against transient network errors

Changes:
- Added import: github.com/Azure/azure-sdk-for-go/sdk/azcore/policy
- Updated NewClientWithSharedKeyCredential call in initialize() function
- Both azure_storage_client.go and azure_sink.go now have identical retry policies

Benefits:
- Consistency: Both Azure clients now use same retry configuration
- Resiliency: Replication operations more robust to network issues
- Best practices: Follows Azure SDK recommended patterns for production use

All tests pass. Build succeeds. Code is consistent and production-ready.

* fmt

* Address sixth round of Gemini Code Assist review comment

Fixed HIGH priority metadata key validation for Azure compliance:

1. HIGH: Handle metadata keys starting with digits
   - Problem: Azure Blob Storage requires metadata keys to be valid C# identifiers
   - Constraint: C# identifiers cannot start with a digit (0-9)
   - Issue: S3 metadata like 'x-amz-meta-123key' would fail with InvalidInput error
   - Fix: Prefix keys starting with digits with underscore '_'
   - Example: '123key' becomes '_123key', '456-test' becomes '_456_test'

2. Code improvement: Use strings.ReplaceAll for better readability
   - Changed from: strings.Replace(str, "-", "_", -1)
   - Changed to: strings.ReplaceAll(str, "-", "_")
   - Both are functionally equivalent, ReplaceAll is more readable

Changes:
- Updated toMetadata() function in azure_storage_client.go
- Added digit prefix check: if key[0] >= '0' && key[0] <= '9'
- Added comprehensive test case 'keys starting with digits'
- Tests cover: '123key' -> '_123key', '456-test' -> '_456_test', '789' -> '_789'

Technical details:
- Azure SDK validates metadata keys as C# identifiers
- C# identifier rules: must start with letter or underscore
- Digits allowed in identifiers but not as first character
- This prevents SetMetadata() and UploadStream() failures

All tests pass including new test case. Build succeeds.
Code is now fully compliant with Azure metadata requirements.

* Address seventh round of Gemini Code Assist review comment

Normalize metadata keys to lowercase for S3 compatibility:

1. MEDIUM: Convert metadata keys to lowercase
   - Rationale: S3 specification stores user-defined metadata keys in lowercase
   - Consistency: Azure Blob Storage metadata is case-insensitive
   - Best practice: Normalizing to lowercase ensures consistent behavior
   - Example: 'x-amz-meta-My-Key' -> 'my_key' (not 'My_Key')

Changes:
- Updated toMetadata() to apply strings.ToLower() to keys
- Added comment explaining S3 lowercase normalization
- Order of operations: strip prefix -> lowercase -> replace dashes -> check digits

Test coverage:
- Added new test case 'uppercase and mixed case keys'
- Tests: 'My-Key' -> 'my_key', 'UPPERCASE' -> 'uppercase', 'MiXeD-CaSe' -> 'mixed_case'
- All 6 test cases pass

Benefits:
- S3 compatibility: Matches S3 metadata key behavior
- Azure consistency: Case-insensitive keys work predictably
- Cross-platform: Same metadata keys work identically on both S3 and Azure
- Prevents issues: No surprises from case-sensitive key handling

Implementation:
```go
key := strings.ReplaceAll(strings.ToLower(k[len(s3_constants.AmzUserMetaPrefix):]), "-", "_")
```

All tests pass. Build succeeds. Metadata handling is now fully S3-compatible.

* Address eighth round of Gemini Code Assist review comments

Use %w instead of %v for error wrapping across both files:

1. MEDIUM: Error wrapping in azure_storage_client.go
   - Problem: Using %v in fmt.Errorf loses error type information
   - Modern Go practice: Use %w to preserve error chains
   - Benefit: Enables errors.Is() and errors.As() for callers
   - Example: Can check for bloberror.BlobNotFound after wrapping

2. MEDIUM: Error wrapping in azure_sink.go
   - Applied same improvement for consistency
   - All error wrapping now preserves underlying errors
   - Improved debugging and error handling capabilities

Changes applied to all fmt.Errorf calls:
- azure_storage_client.go: 10 instances changed from %v to %w
  - Invalid credential error
  - Client creation error
  - Traverse errors
  - Download errors (2)
  - Upload error
  - Delete error
  - Create/Delete bucket errors (2)

- azure_sink.go: 3 instances changed from %v to %w
  - Credential creation error
  - Client creation error
  - Delete entry error
  - Create append blob error

Benefits:
- Error inspection: Callers can use errors.Is(err, target)
- Error unwrapping: Callers can use errors.As(err, &target)
- Type preservation: Original error types maintained through wraps
- Better debugging: Full error chain available for inspection
- Modern Go: Follows Go 1.13+ error wrapping best practices

Example usage after this change:
```go
err := client.ReadFile(...)
if errors.Is(err, bloberror.BlobNotFound) {
    // Can detect specific Azure errors even after wrapping
}
```

All tests pass. Build succeeds. Error handling is now modern and robust.

* Address ninth round of Gemini Code Assist review comment

Improve metadata key sanitization with comprehensive character validation:

1. MEDIUM: Complete Azure C# identifier validation
   - Problem: Previous implementation only handled dashes, not all invalid chars
   - Issue: Keys like 'my.key', 'key+plus', 'key@symbol' would cause InvalidMetadata
   - Azure requirement: Metadata keys must be valid C# identifiers
   - Valid characters: letters (a-z, A-Z), digits (0-9), underscore (_) only

2. Implemented robust regex-based sanitization
   - Added package-level regex: `[^a-zA-Z0-9_]`
   - Matches ANY character that's not alphanumeric or underscore
   - Replaces all invalid characters with underscore
   - Compiled once at package init for performance

Implementation details:
- Regex declared at package level: var invalidMetadataChars = regexp.MustCompile(`[^a-zA-Z0-9_]`)
- Avoids recompiling regex on every toMetadata() call
- Efficient single-pass replacement of all invalid characters
- Processing order: lowercase -> regex replace -> digit check

Examples of character transformations:
- Dots: 'my.key' -> 'my_key'
- Plus: 'key+plus' -> 'key_plus'
- At symbol: 'key@symbol' -> 'key_symbol'
- Mixed: 'key-with.' -> 'key_with_'
- Slash: 'key/slash' -> 'key_slash'
- Combined: '123-key.value+test' -> '_123_key_value_test'

Test coverage:
- Added comprehensive test case 'keys with invalid characters'
- Tests: dot, plus, at-symbol, dash+dot, slash
- All 7 test cases pass (was 6, now 7)

Benefits:
- Complete Azure compliance: Handles ALL invalid characters
- Robust: Works with any S3 metadata key format
- Performant: Regex compiled once, reused efficiently
- Maintainable: Single source of truth for valid characters
- Prevents errors: No more InvalidMetadata errors during upload

All tests pass. Build succeeds. Metadata sanitization is now bulletproof.

* Address tenth round review - HIGH: Fix metadata key collision issue

Prevent metadata loss by using hex encoding for invalid characters:

1. HIGH PRIORITY: Metadata key collision prevention
   - Critical Issue: Different S3 keys mapping to same Azure key causes data loss
   - Example collisions (BEFORE):
     * 'my-key' -> 'my_key'
     * 'my.key' -> 'my_key'   COLLISION! Second overwrites first
     * 'my_key' -> 'my_key'   All three map to same key!

   - Fixed with hex encoding (AFTER):
     * 'my-key' -> 'my_2d_key' (dash = 0x2d)
     * 'my.key' -> 'my_2e_key' (dot = 0x2e)
     * 'my_key' -> 'my_key'    (underscore is valid)
      All three are now unique!

2. Implemented collision-proof hex encoding
   - Pattern: Invalid chars -> _XX_ where XX is hex code
   - Dash (0x2d): 'content-type' -> 'content_2d_type'
   - Dot (0x2e): 'my.key' -> 'my_2e_key'
   - Plus (0x2b): 'key+plus' -> 'key_2b_plus'
   - At (0x40): 'key@symbol' -> 'key_40_symbol'
   - Slash (0x2f): 'key/slash' -> 'key_2f_slash'

3. Created sanitizeMetadataKey() function
   - Encapsulates hex encoding logic
   - Uses ReplaceAllStringFunc for efficient transformation
   - Maintains digit prefix check for Azure C# identifier rules
   - Clear documentation with examples

Implementation details:
```go
func sanitizeMetadataKey(key string) string {
    // Replace each invalid character with _XX_ where XX is the hex code
    result := invalidMetadataChars.ReplaceAllStringFunc(key, func(s string) string {
        return fmt.Sprintf("_%02x_", s[0])
    })

    // Azure metadata keys cannot start with a digit
    if len(result) > 0 && result[0] >= '0' && result[0] <= '9' {
        result = "_" + result
    }

    return result
}
```

Why hex encoding solves the collision problem:
- Each invalid character gets unique hex representation
- Two-digit hex ensures no confusion (always _XX_ format)
- Preserves all information from original key
- Reversible (though not needed for this use case)
- Azure-compliant (hex codes don't introduce new invalid chars)

Test coverage:
- Updated all test expectations to match hex encoding
- Added 'collision prevention' test case demonstrating uniqueness:
  * Tests my-key, my.key, my_key all produce different results
  * Proves metadata from different S3 keys won't collide
- Total test cases: 8 (was 7, added collision prevention)

Examples from tests:
- 'content-type' -> 'content_2d_type' (0x2d = dash)
- '456-test' -> '_456_2d_test' (digit prefix + dash)
- 'My-Key' -> 'my_2d_key' (lowercase + hex encode dash)
- 'key-with.' -> 'key_2d_with_2e_' (multiple chars: dash, dot, trailing dot)

Benefits:
-  Zero collision risk: Every unique S3 key -> unique Azure key
-  Data integrity: No metadata loss from overwrites
-  Complete info preservation: Original key distinguishable
-  Azure compliant: Hex-encoded keys are valid C# identifiers
-  Maintainable: Clean function with clear purpose
-  Testable: Collision prevention explicitly tested

All tests pass. Build succeeds. Metadata integrity is now guaranteed.

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-10-08 23:12:03 -07:00

779 lines
30 KiB
Go

package s3api
import (
"cmp"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/xml"
"fmt"
"math"
"path/filepath"
"slices"
"sort"
"strconv"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/google/uuid"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"net/http"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
const (
multipartExt = ".part"
multiPartMinSize = 5 * 1024 * 1024
)
type InitiateMultipartUploadResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult"`
s3.CreateMultipartUploadOutput
}
func (s3a *S3ApiServer) createMultipartUpload(r *http.Request, input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code s3err.ErrorCode) {
glog.V(2).Infof("createMultipartUpload input %v", input)
uploadIdString := s3a.generateUploadID(*input.Key)
uploadIdString = uploadIdString + "_" + strings.ReplaceAll(uuid.New().String(), "-", "")
// Prepare error handling outside callback scope
var encryptionError error
if err := s3a.mkdir(s3a.genUploadsFolder(*input.Bucket), uploadIdString, func(entry *filer_pb.Entry) {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended["key"] = []byte(*input.Key)
// Set object owner for multipart upload
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
for k, v := range input.Metadata {
entry.Extended[k] = []byte(*v)
}
if input.ContentType != nil {
entry.Attributes.Mime = *input.ContentType
}
// Prepare and apply encryption configuration within directory creation
// This ensures encryption resources are only allocated if directory creation succeeds
encryptionConfig, prepErr := s3a.prepareMultipartEncryptionConfig(r, uploadIdString)
if prepErr != nil {
encryptionError = prepErr
return // Exit callback, letting mkdir handle the error
}
s3a.applyMultipartEncryptionConfig(entry, encryptionConfig)
// Extract and store object lock metadata from request headers
// This ensures object lock settings from create_multipart_upload are preserved
if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil {
glog.Errorf("createMultipartUpload: failed to extract object lock metadata: %v", err)
// Don't fail the upload - this matches AWS behavior for invalid metadata
}
}); err != nil {
_, errorCode := handleMultipartInternalError("create multipart upload directory", err)
return nil, errorCode
}
// Check for encryption configuration errors that occurred within the callback
if encryptionError != nil {
_, errorCode := handleMultipartInternalError("prepare encryption configuration", encryptionError)
return nil, errorCode
}
output = &InitiateMultipartUploadResult{
CreateMultipartUploadOutput: s3.CreateMultipartUploadOutput{
Bucket: input.Bucket,
Key: objectKey(input.Key),
UploadId: aws.String(uploadIdString),
},
}
return
}
type CompleteMultipartUploadResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult"`
Location *string `xml:"Location,omitempty"`
Bucket *string `xml:"Bucket,omitempty"`
Key *string `xml:"Key,omitempty"`
ETag *string `xml:"ETag,omitempty"`
// VersionId is NOT included in XML body - it should only be in x-amz-version-id HTTP header
// Store the VersionId internally for setting HTTP header, but don't marshal to XML
VersionId *string `xml:"-"`
}
func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
glog.V(2).Infof("completeMultipartUpload input %v", input)
if len(parts.Parts) == 0 {
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
}
completedPartNumbers := []int{}
completedPartMap := make(map[int][]string)
maxPartNo := 1
for _, part := range parts.Parts {
if _, ok := completedPartMap[part.PartNumber]; !ok {
completedPartNumbers = append(completedPartNumbers, part.PartNumber)
}
completedPartMap[part.PartNumber] = append(completedPartMap[part.PartNumber], part.ETag)
maxPartNo = maxInt(maxPartNo, part.PartNumber)
}
sort.Ints(completedPartNumbers)
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
entries, _, err := s3a.list(uploadDirectory, "", "", false, 0)
if err != nil {
glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries))
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
}
if len(entries) == 0 {
entryName, dirName := s3a.getEntryNameAndDir(input)
if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil {
if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) {
return &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""),
Key: objectKey(input.Key),
}, s3err.ErrNone
}
}
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
}
pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId)
if err != nil {
glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
}
deleteEntries := []*filer_pb.Entry{}
partEntries := make(map[int][]*filer_pb.Entry, len(entries))
entityTooSmall := false
for _, entry := range entries {
foundEntry := false
glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name)
if entry.IsDirectory || !strings.HasSuffix(entry.Name, multipartExt) {
continue
}
partNumber, err := parsePartNumber(entry.Name)
if err != nil {
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNumber).Inc()
glog.Errorf("completeMultipartUpload failed to pasre partNumber %s:%s", entry.Name, err)
continue
}
completedPartsByNumber, ok := completedPartMap[partNumber]
if !ok {
continue
}
for _, partETag := range completedPartsByNumber {
partETag = strings.Trim(partETag, `"`)
entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
if partETag != "" && len(partETag) == 32 && entryETag != "" {
if entryETag != partETag {
glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagMismatch).Inc()
continue
}
} else {
glog.Warningf("invalid complete etag %s, partEtag %s", partETag, entryETag)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagInvalid).Inc()
}
if len(entry.Chunks) == 0 && partNumber != maxPartNo {
glog.Warningf("completeMultipartUpload %s empty chunks", entry.Name)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEmpty).Inc()
continue
}
//there maybe multi same part, because of client retry
partEntries[partNumber] = append(partEntries[partNumber], entry)
foundEntry = true
}
if foundEntry {
if len(completedPartNumbers) > 1 && partNumber != completedPartNumbers[len(completedPartNumbers)-1] &&
entry.Attributes.FileSize < multiPartMinSize {
glog.Warningf("completeMultipartUpload %s part file size less 5mb", entry.Name)
entityTooSmall = true
}
} else {
deleteEntries = append(deleteEntries, entry)
}
}
if entityTooSmall {
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompleteEntityTooSmall).Inc()
return nil, s3err.ErrEntityTooSmall
}
mime := pentry.Attributes.Mime
var finalParts []*filer_pb.FileChunk
var offset int64
for _, partNumber := range completedPartNumbers {
partEntriesByNumber, ok := partEntries[partNumber]
if !ok {
glog.Errorf("part %d has no entry", partNumber)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNotFound).Inc()
return nil, s3err.ErrInvalidPart
}
found := false
if len(partEntriesByNumber) > 1 {
slices.SortFunc(partEntriesByNumber, func(a, b *filer_pb.Entry) int {
return cmp.Compare(b.Chunks[0].ModifiedTsNs, a.Chunks[0].ModifiedTsNs)
})
}
for _, entry := range partEntriesByNumber {
if found {
deleteEntries = append(deleteEntries, entry)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEntryMismatch).Inc()
continue
}
// Track within-part offset for SSE-KMS IV calculation
var withinPartOffset int64 = 0
for _, chunk := range entry.GetChunks() {
// Update SSE metadata with correct within-part offset (unified approach for KMS and SSE-C)
sseKmsMetadata := chunk.SseMetadata
if chunk.SseType == filer_pb.SSEType_SSE_KMS && len(chunk.SseMetadata) > 0 {
// Deserialize, update offset, and re-serialize SSE-KMS metadata
if kmsKey, err := DeserializeSSEKMSMetadata(chunk.SseMetadata); err == nil {
kmsKey.ChunkOffset = withinPartOffset
if updatedMetadata, serErr := SerializeSSEKMSMetadata(kmsKey); serErr == nil {
sseKmsMetadata = updatedMetadata
glog.V(4).Infof("Updated SSE-KMS metadata for chunk in part %d: withinPartOffset=%d", partNumber, withinPartOffset)
}
}
} else if chunk.SseType == filer_pb.SSEType_SSE_C {
// For SSE-C chunks, create per-chunk metadata using the part's IV
if ivData, exists := entry.Extended[s3_constants.SeaweedFSSSEIV]; exists {
// Get keyMD5 from entry metadata if available
var keyMD5 string
if keyMD5Data, keyExists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; keyExists {
keyMD5 = string(keyMD5Data)
}
// Create SSE-C metadata with the part's IV and this chunk's within-part offset
if ssecMetadata, serErr := SerializeSSECMetadata(ivData, keyMD5, withinPartOffset); serErr == nil {
sseKmsMetadata = ssecMetadata // Reuse the same field for unified handling
glog.V(4).Infof("Created SSE-C metadata for chunk in part %d: withinPartOffset=%d", partNumber, withinPartOffset)
} else {
glog.Errorf("Failed to serialize SSE-C metadata for chunk in part %d: %v", partNumber, serErr)
}
} else {
glog.Errorf("SSE-C chunk in part %d missing IV in entry metadata", partNumber)
}
}
p := &filer_pb.FileChunk{
FileId: chunk.GetFileIdString(),
Offset: offset,
Size: chunk.Size,
ModifiedTsNs: chunk.ModifiedTsNs,
CipherKey: chunk.CipherKey,
ETag: chunk.ETag,
IsCompressed: chunk.IsCompressed,
// Preserve SSE metadata with updated within-part offset
SseType: chunk.SseType,
SseMetadata: sseKmsMetadata,
}
finalParts = append(finalParts, p)
offset += int64(chunk.Size)
withinPartOffset += int64(chunk.Size)
}
found = true
}
}
entryName, dirName := s3a.getEntryNameAndDir(input)
// Check if versioning is configured for this bucket BEFORE creating any files
versioningState, vErr := s3a.getVersioningState(*input.Bucket)
if vErr == nil && versioningState == s3_constants.VersioningEnabled {
// For versioned buckets, create a version and return the version ID
versionId := generateVersionId()
versionFileName := s3a.getVersionFileName(versionId)
versionDir := dirName + "/" + entryName + ".versions"
// Move the completed object to the versions directory
err = s3a.mkFile(versionDir, versionFileName, finalParts, func(versionEntry *filer_pb.Entry) {
if versionEntry.Extended == nil {
versionEntry.Extended = make(map[string][]byte)
}
versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
// Set object owner for versioned multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
versionEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
for k, v := range pentry.Extended {
if k != "key" {
versionEntry.Extended[k] = v
}
}
// Preserve SSE-KMS metadata from the first part (if any)
// SSE-KMS metadata is stored in individual parts, not the upload directory
if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 {
firstPartEntry := partEntries[completedPartNumbers[0]][0]
if firstPartEntry.Extended != nil {
// Copy SSE-KMS metadata from the first part
if kmsMetadata, exists := firstPartEntry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
versionEntry.Extended[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata
glog.V(3).Infof("completeMultipartUpload: preserved SSE-KMS metadata from first part (versioned)")
}
}
}
if pentry.Attributes.Mime != "" {
versionEntry.Attributes.Mime = pentry.Attributes.Mime
} else if mime != "" {
versionEntry.Attributes.Mime = mime
}
versionEntry.Attributes.FileSize = uint64(offset)
})
if err != nil {
glog.Errorf("completeMultipartUpload: failed to create version %s: %v", versionId, err)
return nil, s3err.ErrInternalError
}
// Update the .versions directory metadata to indicate this is the latest version
err = s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName)
if err != nil {
glog.Errorf("completeMultipartUpload: failed to update latest version in directory: %v", err)
return nil, s3err.ErrInternalError
}
// For versioned buckets, don't create a main object file - all content is stored in .versions directory
// The latest version information is tracked in the .versions directory metadata
output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key),
VersionId: aws.String(versionId),
}
} else if vErr == nil && versioningState == s3_constants.VersioningSuspended {
// For suspended versioning, add "null" version ID metadata and return "null" version ID
err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
// Set object owner for suspended versioning multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
for k, v := range pentry.Extended {
if k != "key" {
entry.Extended[k] = v
}
}
// Preserve SSE-KMS metadata from the first part (if any)
// SSE-KMS metadata is stored in individual parts, not the upload directory
if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 {
firstPartEntry := partEntries[completedPartNumbers[0]][0]
if firstPartEntry.Extended != nil {
// Copy SSE-KMS metadata from the first part
if kmsMetadata, exists := firstPartEntry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
entry.Extended[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata
glog.V(3).Infof("completeMultipartUpload: preserved SSE-KMS metadata from first part (suspended versioning)")
}
}
}
if pentry.Attributes.Mime != "" {
entry.Attributes.Mime = pentry.Attributes.Mime
} else if mime != "" {
entry.Attributes.Mime = mime
}
entry.Attributes.FileSize = uint64(offset)
})
if err != nil {
glog.Errorf("completeMultipartUpload: failed to create suspended versioning object: %v", err)
return nil, s3err.ErrInternalError
}
// Note: Suspended versioning should NOT return VersionId field according to AWS S3 spec
output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key),
// VersionId field intentionally omitted for suspended versioning
}
} else {
// For non-versioned buckets, create main object file
err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
// Set object owner for non-versioned multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
for k, v := range pentry.Extended {
if k != "key" {
entry.Extended[k] = v
}
}
// Preserve SSE-KMS metadata from the first part (if any)
// SSE-KMS metadata is stored in individual parts, not the upload directory
if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 {
firstPartEntry := partEntries[completedPartNumbers[0]][0]
if firstPartEntry.Extended != nil {
// Copy SSE-KMS metadata from the first part
if kmsMetadata, exists := firstPartEntry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
entry.Extended[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata
glog.V(3).Infof("completeMultipartUpload: preserved SSE-KMS metadata from first part")
}
}
}
if pentry.Attributes.Mime != "" {
entry.Attributes.Mime = pentry.Attributes.Mime
} else if mime != "" {
entry.Attributes.Mime = mime
}
entry.Attributes.FileSize = uint64(offset)
})
if err != nil {
glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err)
return nil, s3err.ErrInternalError
}
// For non-versioned buckets, return response without VersionId
output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key),
}
}
for _, deleteEntry := range deleteEntries {
//delete unused part data
glog.Infof("completeMultipartUpload cleanup %s upload %s unused %s", *input.Bucket, *input.UploadId, deleteEntry.Name)
if err = s3a.rm(uploadDirectory, deleteEntry.Name, true, true); err != nil {
glog.Warningf("completeMultipartUpload cleanup %s upload %s unused %s : %v", *input.Bucket, *input.UploadId, deleteEntry.Name, err)
}
}
if err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, false, true); err != nil {
glog.V(1).Infof("completeMultipartUpload cleanup %s upload %s: %v", *input.Bucket, *input.UploadId, err)
}
return
}
func (s3a *S3ApiServer) getEntryNameAndDir(input *s3.CompleteMultipartUploadInput) (string, string) {
entryName := filepath.Base(*input.Key)
dirName := filepath.ToSlash(filepath.Dir(*input.Key))
if dirName == "." {
dirName = ""
}
if strings.HasPrefix(dirName, "/") {
dirName = dirName[1:]
}
dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName)
// remove suffix '/'
if strings.HasSuffix(dirName, "/") {
dirName = dirName[:len(dirName)-1]
}
return entryName, dirName
}
func parsePartNumber(fileName string) (int, error) {
var partNumberString string
index := strings.Index(fileName, "_")
if index != -1 {
partNumberString = fileName[:index]
} else {
partNumberString = fileName[:len(fileName)-len(multipartExt)]
}
return strconv.Atoi(partNumberString)
}
func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code s3err.ErrorCode) {
glog.V(2).Infof("abortMultipartUpload input %v", input)
exists, err := s3a.exists(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true)
if err != nil {
glog.V(1).Infof("bucket %s abort upload %s: %v", *input.Bucket, *input.UploadId, err)
return nil, s3err.ErrNoSuchUpload
}
if exists {
err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true, true)
}
if err != nil {
glog.V(1).Infof("bucket %s remove upload %s: %v", *input.Bucket, *input.UploadId, err)
return nil, s3err.ErrInternalError
}
return &s3.AbortMultipartUploadOutput{}, s3err.ErrNone
}
type ListMultipartUploadsResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListMultipartUploadsResult"`
// copied from s3.ListMultipartUploadsOutput, the Uploads is not converting to <Upload></Upload>
Bucket *string `type:"string"`
Delimiter *string `type:"string"`
EncodingType *string `type:"string" enum:"EncodingType"`
IsTruncated *bool `type:"boolean"`
KeyMarker *string `type:"string"`
MaxUploads *int64 `type:"integer"`
NextKeyMarker *string `type:"string"`
NextUploadIdMarker *string `type:"string"`
Prefix *string `type:"string"`
UploadIdMarker *string `type:"string"`
Upload []*s3.MultipartUpload `locationName:"Upload" type:"list" flattened:"true"`
}
func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput) (output *ListMultipartUploadsResult, code s3err.ErrorCode) {
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html
glog.V(2).Infof("listMultipartUploads input %v", input)
output = &ListMultipartUploadsResult{
Bucket: input.Bucket,
Delimiter: input.Delimiter,
EncodingType: input.EncodingType,
KeyMarker: input.KeyMarker,
MaxUploads: input.MaxUploads,
Prefix: input.Prefix,
IsTruncated: aws.Bool(false),
}
entries, _, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), "", *input.UploadIdMarker, false, math.MaxInt32)
if err != nil {
glog.Errorf("listMultipartUploads %s error: %v", *input.Bucket, err)
return
}
uploadsCount := int64(0)
for _, entry := range entries {
if entry.Extended != nil {
key := string(entry.Extended["key"])
if *input.KeyMarker != "" && *input.KeyMarker != key {
continue
}
if *input.Prefix != "" && !strings.HasPrefix(key, *input.Prefix) {
continue
}
output.Upload = append(output.Upload, &s3.MultipartUpload{
Key: objectKey(aws.String(key)),
UploadId: aws.String(entry.Name),
})
uploadsCount += 1
}
if uploadsCount >= *input.MaxUploads {
output.IsTruncated = aws.Bool(true)
output.NextUploadIdMarker = aws.String(entry.Name)
break
}
}
return
}
type ListPartsResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListPartsResult"`
// copied from s3.ListPartsOutput, the Parts is not converting to <Part></Part>
Bucket *string `type:"string"`
IsTruncated *bool `type:"boolean"`
Key *string `min:"1" type:"string"`
MaxParts *int64 `type:"integer"`
NextPartNumberMarker *int64 `type:"integer"`
PartNumberMarker *int64 `type:"integer"`
Part []*s3.Part `locationName:"Part" type:"list" flattened:"true"`
StorageClass *string `type:"string" enum:"StorageClass"`
UploadId *string `type:"string"`
}
func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListPartsResult, code s3err.ErrorCode) {
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html
glog.V(2).Infof("listObjectParts input %v", input)
output = &ListPartsResult{
Bucket: input.Bucket,
Key: objectKey(input.Key),
UploadId: input.UploadId,
MaxParts: input.MaxParts, // the maximum number of parts to return.
PartNumberMarker: input.PartNumberMarker, // the part number starts after this, exclusive
StorageClass: aws.String("STANDARD"),
}
entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d%s", *input.PartNumberMarker, multipartExt), false, uint32(*input.MaxParts))
if err != nil {
glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err)
return nil, s3err.ErrNoSuchUpload
}
// Note: The upload directory is sort of a marker of the existence of an multipart upload request.
// So can not just delete empty upload folders.
output.IsTruncated = aws.Bool(!isLast)
for _, entry := range entries {
if strings.HasSuffix(entry.Name, multipartExt) && !entry.IsDirectory {
partNumber, err := parsePartNumber(entry.Name)
if err != nil {
glog.Errorf("listObjectParts %s %s parse %s: %v", *input.Bucket, *input.UploadId, entry.Name, err)
continue
}
output.Part = append(output.Part, &s3.Part{
PartNumber: aws.Int64(int64(partNumber)),
LastModified: aws.Time(time.Unix(entry.Attributes.Mtime, 0).UTC()),
Size: aws.Int64(int64(filer.FileSize(entry))),
ETag: aws.String("\"" + filer.ETag(entry) + "\""),
})
if !isLast {
output.NextPartNumberMarker = aws.Int64(int64(partNumber))
}
}
}
return
}
// maxInt returns the maximum of two int values
func maxInt(a, b int) int {
if a > b {
return a
}
return b
}
// MultipartEncryptionConfig holds pre-prepared encryption configuration to avoid error handling in callbacks
type MultipartEncryptionConfig struct {
// SSE-KMS configuration
IsSSEKMS bool
KMSKeyID string
BucketKeyEnabled bool
EncryptionContext string
KMSBaseIVEncoded string
// SSE-S3 configuration
IsSSES3 bool
S3BaseIVEncoded string
S3KeyDataEncoded string
}
// prepareMultipartEncryptionConfig prepares encryption configuration with proper error handling
// This eliminates the need for criticalError variable in callback functions
func (s3a *S3ApiServer) prepareMultipartEncryptionConfig(r *http.Request, uploadIdString string) (*MultipartEncryptionConfig, error) {
config := &MultipartEncryptionConfig{}
// Prepare SSE-KMS configuration
if IsSSEKMSRequest(r) {
config.IsSSEKMS = true
config.KMSKeyID = r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId)
config.BucketKeyEnabled = strings.ToLower(r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled)) == "true"
config.EncryptionContext = r.Header.Get(s3_constants.AmzServerSideEncryptionContext)
// Generate and encode base IV with proper error handling
baseIV := make([]byte, s3_constants.AESBlockSize)
n, err := rand.Read(baseIV)
if err != nil || n != len(baseIV) {
return nil, fmt.Errorf("failed to generate secure IV for SSE-KMS multipart upload: %v (read %d/%d bytes)", err, n, len(baseIV))
}
config.KMSBaseIVEncoded = base64.StdEncoding.EncodeToString(baseIV)
glog.V(4).Infof("Generated base IV %x for SSE-KMS multipart upload %s", baseIV[:8], uploadIdString)
}
// Prepare SSE-S3 configuration
if IsSSES3RequestInternal(r) {
config.IsSSES3 = true
// Generate and encode base IV with proper error handling
baseIV := make([]byte, s3_constants.AESBlockSize)
n, err := rand.Read(baseIV)
if err != nil || n != len(baseIV) {
return nil, fmt.Errorf("failed to generate secure IV for SSE-S3 multipart upload: %v (read %d/%d bytes)", err, n, len(baseIV))
}
config.S3BaseIVEncoded = base64.StdEncoding.EncodeToString(baseIV)
glog.V(4).Infof("Generated base IV %x for SSE-S3 multipart upload %s", baseIV[:8], uploadIdString)
// Generate and serialize SSE-S3 key with proper error handling
keyManager := GetSSES3KeyManager()
sseS3Key, err := keyManager.GetOrCreateKey("")
if err != nil {
return nil, fmt.Errorf("failed to generate SSE-S3 key for multipart upload: %v", err)
}
keyData, serErr := SerializeSSES3Metadata(sseS3Key)
if serErr != nil {
return nil, fmt.Errorf("failed to serialize SSE-S3 metadata for multipart upload: %v", serErr)
}
config.S3KeyDataEncoded = base64.StdEncoding.EncodeToString(keyData)
// Store key in manager for later retrieval
keyManager.StoreKey(sseS3Key)
glog.V(4).Infof("Stored SSE-S3 key %s for multipart upload %s", sseS3Key.KeyID, uploadIdString)
}
return config, nil
}
// applyMultipartEncryptionConfig applies pre-prepared encryption configuration to filer entry
// This function is guaranteed not to fail since all error-prone operations were done during preparation
func (s3a *S3ApiServer) applyMultipartEncryptionConfig(entry *filer_pb.Entry, config *MultipartEncryptionConfig) {
// Apply SSE-KMS configuration
if config.IsSSEKMS {
entry.Extended[s3_constants.SeaweedFSSSEKMSKeyID] = []byte(config.KMSKeyID)
if config.BucketKeyEnabled {
entry.Extended[s3_constants.SeaweedFSSSEKMSBucketKeyEnabled] = []byte("true")
}
if config.EncryptionContext != "" {
entry.Extended[s3_constants.SeaweedFSSSEKMSEncryptionContext] = []byte(config.EncryptionContext)
}
entry.Extended[s3_constants.SeaweedFSSSEKMSBaseIV] = []byte(config.KMSBaseIVEncoded)
glog.V(3).Infof("applyMultipartEncryptionConfig: applied SSE-KMS settings with keyID %s", config.KMSKeyID)
}
// Apply SSE-S3 configuration
if config.IsSSES3 {
entry.Extended[s3_constants.SeaweedFSSSES3Encryption] = []byte(s3_constants.SSEAlgorithmAES256)
entry.Extended[s3_constants.SeaweedFSSSES3BaseIV] = []byte(config.S3BaseIVEncoded)
entry.Extended[s3_constants.SeaweedFSSSES3KeyData] = []byte(config.S3KeyDataEncoded)
glog.V(3).Infof("applyMultipartEncryptionConfig: applied SSE-S3 settings")
}
}