mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-09 21:02:46 +02:00
seaweedfs/weed/s3api/s3api_streaming_copy.go
Chris Lu b7b73016dd
S3 API: Add SSE-KMS (#7144)
* implement sse-c

* fix Content-Range

* adding tests

* Update s3_sse_c_test.go

* copy sse-c objects

* adding tests

* refactor

* multi reader

* remove extra write header call

* refactor

* SSE-C encrypted objects do not support HTTP Range requests

* robust

* fix server starts

* Update Makefile

* Update Makefile

* ci: remove SSE-C integration tests and workflows; delete test/s3/encryption/

* s3: SSE-C MD5 must be base64 (case-sensitive); fix validation, comparisons, metadata storage; update tests

* minor

* base64

* Update SSE-C_IMPLEMENTATION.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/s3api/s3api_object_handlers.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update SSE-C_IMPLEMENTATION.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* address comments

* fix test

* fix compilation

* Bucket Default Encryption

To complete the SSE-KMS implementation for production use:
- Add AWS KMS Provider - Implement weed/kms/aws/aws_kms.go using AWS SDK
- Integrate with S3 Handlers - Update PUT/GET object handlers to use SSE-KMS
- Add Multipart Upload Support - Extend SSE-KMS to multipart uploads
- Configuration Integration - Add KMS configuration to filer.toml
- Documentation - Update SeaweedFS wiki with SSE-KMS usage examples

* store bucket sse config in proto

* add more tests

* Update SSE-C_IMPLEMENTATION.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Fix rebase errors and restore structured BucketMetadata API

Merge Conflict Fixes:
- Fixed merge conflicts in header.go (SSE-C and SSE-KMS headers)
- Fixed merge conflicts in s3api_errors.go (SSE-C and SSE-KMS error codes)
- Fixed merge conflicts in s3_sse_c.go (copy strategy constants)
- Fixed merge conflicts in s3api_object_handlers_copy.go (copy strategy usage)

API Restoration:
- Restored BucketMetadata struct with Tags, CORS, and Encryption fields
- Restored structured API functions: GetBucketMetadata, SetBucketMetadata, UpdateBucketMetadata
- Restored helper functions: UpdateBucketTags, UpdateBucketCORS, UpdateBucketEncryption
- Restored clear functions: ClearBucketTags, ClearBucketCORS, ClearBucketEncryption

Handler Updates:
- Updated GetBucketTaggingHandler to use GetBucketMetadata() directly
- Updated PutBucketTaggingHandler to use UpdateBucketTags()
- Updated DeleteBucketTaggingHandler to use ClearBucketTags()
- Updated CORS handlers to use UpdateBucketCORS() and ClearBucketCORS()
- Updated loadCORSFromBucketContent to use GetBucketMetadata()

Internal Function Updates:
- Updated getBucketMetadata() to return *BucketMetadata struct
- Updated setBucketMetadata() to accept *BucketMetadata struct
- Updated getBucketEncryptionMetadata() to use GetBucketMetadata()
- Updated setBucketEncryptionMetadata() to use SetBucketMetadata()

Benefits:
- Resolved all rebase conflicts while preserving both SSE-C and SSE-KMS functionality
- Maintained consistent structured API throughout the codebase
- Eliminated intermediate wrapper functions for cleaner code
- Proper error handling with better granularity
- All tests passing and build successful

The bucket metadata system now uses a unified, type-safe, structured API
that supports tags, CORS, and encryption configuration consistently.

* Fix updateEncryptionConfiguration for first-time bucket encryption setup

- Change getBucketEncryptionMetadata to getBucketMetadata to avoid failures when no encryption config exists
- Change setBucketEncryptionMetadata to setBucketMetadataWithEncryption for consistency
- This fixes the critical issue where bucket encryption configuration failed for buckets without existing encryption

Fixes: https://github.com/seaweedfs/seaweedfs/pull/7144#discussion_r2285669572

* Fix rebase conflicts and maintain structured BucketMetadata API

Resolved Conflicts:
- Fixed merge conflicts in s3api_bucket_config.go between structured API (HEAD) and old intermediate functions
- Kept modern structured API approach: UpdateBucketCORS, ClearBucketCORS, UpdateBucketEncryption
- Removed old intermediate functions: setBucketTags, deleteBucketTags, setBucketMetadataWithEncryption

API Consistency Maintained:
- updateCORSConfiguration: Uses UpdateBucketCORS() directly
- removeCORSConfiguration: Uses ClearBucketCORS() directly
- updateEncryptionConfiguration: Uses UpdateBucketEncryption() directly
- All structured API functions preserved: GetBucketMetadata, SetBucketMetadata, UpdateBucketMetadata

Benefits:
- Maintains clean separation between API layers
- Preserves atomic metadata updates with proper error handling
- Eliminates function indirection for better performance
- Consistent API usage pattern throughout codebase
- All tests passing and build successful

The bucket metadata system continues to use the unified, type-safe, structured API
that properly handles tags, CORS, and encryption configuration without any
intermediate wrapper functions.

* Fix complex rebase conflicts and maintain clean structured BucketMetadata API

Resolved Complex Conflicts:
- Fixed merge conflicts between modern structured API (HEAD) and mixed approach
- Removed duplicate function declarations that caused compilation errors
- Consistently chose structured API approach over intermediate functions

Fixed Functions:
- BucketMetadata struct: Maintained clean field alignment
- loadCORSFromBucketContent: Uses GetBucketMetadata() directly
- updateCORSConfiguration: Uses UpdateBucketCORS() directly
- removeCORSConfiguration: Uses ClearBucketCORS() directly
- getBucketMetadata: Returns *BucketMetadata struct consistently
- setBucketMetadata: Accepts *BucketMetadata struct consistently

Removed Duplicates:
- Eliminated duplicate GetBucketMetadata implementations
- Eliminated duplicate SetBucketMetadata implementations
- Eliminated duplicate UpdateBucketMetadata implementations
- Eliminated duplicate helper functions (UpdateBucketTags, etc.)

API Consistency Achieved:
- Single, unified BucketMetadata struct for all operations
- Atomic updates through UpdateBucketMetadata with function callbacks
- Type-safe operations with proper error handling
- No intermediate wrapper functions cluttering the API

Benefits:
- Clean, maintainable codebase with no function duplication
- Consistent structured API usage throughout all bucket operations
- Proper error handling and type safety
- Build successful and all tests passing

The bucket metadata system now has a completely clean, structured API
without any conflicts, duplicates, or inconsistencies.

* Update remaining functions to use new structured BucketMetadata APIs directly

Updated functions to follow the pattern established in bucket config:
- getEncryptionConfiguration() -> Uses GetBucketMetadata() directly
- removeEncryptionConfiguration() -> Uses ClearBucketEncryption() directly

Benefits:
- Consistent API usage pattern across all bucket metadata operations
- Simpler, more readable code that leverages the structured API
- Eliminates calls to intermediate legacy functions
- Better error handling and logging consistency
- All tests pass with improved functionality

This completes the transition to using the new structured BucketMetadata API
throughout the entire bucket configuration and encryption subsystem.

* Fix GitHub PR #7144 code review comments

Address all code review comments from Gemini Code Assist bot:

1. **High Priority - SSE-KMS Key Validation**: Fixed ValidateSSEKMSKey to allow empty KMS key ID
   - Empty key ID now indicates use of default KMS key (consistent with AWS behavior)
   - Updated ParseSSEKMSHeaders to call validation after parsing
   - Enhanced isValidKMSKeyID to reject keys with spaces and invalid characters

2. **Medium Priority - KMS Registry Error Handling**: Improved error collection in CloseAll
   - Now collects all provider close errors instead of only returning the last one
   - Uses proper error formatting with %w verb for error wrapping
   - Returns single error for one failure, combined message for multiple failures

3. **Medium Priority - Local KMS Aliases Consistency**: Fixed alias handling in CreateKey
   - Now updates the aliases slice in-place to maintain consistency
   - Ensures both p.keys map and key.Aliases slice use the same prefixed format

All changes maintain backward compatibility and improve error handling robustness.
Tests updated and passing for all scenarios including edge cases.

* Use errors.Join for KMS registry error handling

Replace manual string building with the more idiomatic errors.Join function:

- Removed manual error message concatenation with strings.Builder
- Simplified error handling logic by using errors.Join(allErrors...)
- Removed unnecessary strings import
- Added errors import for errors.Join

This approach is cleaner, more idiomatic, and automatically handles:
- Returning nil for empty error slice
- Returning an error that wraps the single error for a one-element slice
- Properly formatting multiple errors with newlines

The errors.Join function was introduced in Go 1.20 and is the
recommended way to combine multiple errors.
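
For illustration only, here is a minimal sketch of the errors.Join pattern described above. The closer interface, the failing test double, and closeAll are hypothetical names; this is not the code in registry.go, only an assumption about its general shape.

```go
package main

import (
	"errors"
	"fmt"
)

// closer is a hypothetical stand-in for a KMS provider that can be closed.
type closer interface {
	Close() error
}

// failing is a test double whose Close always returns the configured error.
type failing struct{ err error }

func (f failing) Close() error { return f.err }

var (
	errA = errors.New("provider A failed")
	errB = errors.New("provider B failed")
)

// closeAll closes every provider and reports all failures together.
func closeAll(providers []closer) error {
	var errs []error
	for _, p := range providers {
		if err := p.Close(); err != nil {
			errs = append(errs, fmt.Errorf("close provider: %w", err))
		}
	}
	// errors.Join returns nil when errs is empty.
	return errors.Join(errs...)
}

func main() {
	err := closeAll([]closer{failing{errA}, failing{errB}})
	fmt.Println(err)
	// errors.Is sees through both the join and the %w wrapping.
	fmt.Println(errors.Is(err, errA), errors.Is(err, errB))
}
```

Because the joined error keeps every wrapped error reachable, callers can still test for a specific provider failure with errors.Is or errors.As.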

* Update registry.go

* Fix GitHub PR #7144 latest review comments

Address all new code review comments from Gemini Code Assist bot:

1. **High Priority - SSE-KMS Detection Logic**: Tightened IsSSEKMSEncrypted function
   - Now relies only on the canonical x-amz-server-side-encryption header
   - Removed redundant check for x-amz-encrypted-data-key metadata
   - Prevents misinterpretation of objects with inconsistent metadata state
   - Updated test case to reflect correct behavior (encrypted data key only = false)

2. **Medium Priority - UUID Validation**: Enhanced KMS key ID validation
   - Replaced simplistic length/hyphen count check with proper regex validation
   - Added regexp import for robust UUID format checking
   - Regex pattern: ^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$
   - Prevents invalid formats like '------------------------------------' from passing

3. **Medium Priority - Alias Mutation Fix**: Avoided input slice modification
   - Changed CreateKey to not mutate the input aliases slice in-place
   - Uses local variable for modified alias to prevent side effects
   - Maintains backward compatibility while being safer for callers

All changes improve code robustness and follow AWS S3 standards more closely.
Tests updated and passing for all scenarios including edge cases.
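
As a rough sketch of the UUID check in item 2, the pattern quoted above can be compiled once and reused. The helper name isUUIDKeyID is hypothetical, and the real validation in the S3 API package may accept other key formats as well.

```go
package main

import (
	"fmt"
	"regexp"
)

// uuidPattern is the pattern quoted in the commit message, compiled once at startup.
var uuidPattern = regexp.MustCompile(`^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$`)

// isUUIDKeyID is a hypothetical helper that reports whether keyID is a well-formed UUID.
func isUUIDKeyID(keyID string) bool {
	return uuidPattern.MatchString(keyID)
}

func main() {
	for _, id := range []string{
		"12345678-1234-1234-1234-123456789abc",
		"------------------------------------",
		"not-a-uuid",
	} {
		fmt.Printf("%q valid=%v\n", id, isUUIDKeyID(id))
	}
}
```

A length-and-hyphen-count check would accept the all-hyphen string, whereas the anchored pattern requires hexadecimal digits in every group.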

* Fix failing SSE tests

Address two failing test cases:

1. **TestSSEHeaderConflicts**: Fixed SSE-C and SSE-KMS mutual exclusion
   - Modified IsSSECRequest to return false if SSE-KMS headers are present
   - Modified IsSSEKMSRequest to return false if SSE-C headers are present
   - This prevents both detection functions from returning true simultaneously
   - Aligns with AWS S3 behavior where SSE-C and SSE-KMS are mutually exclusive

2. **TestBucketEncryptionEdgeCases**: Fixed XML namespace validation
   - Added namespace validation in encryptionConfigFromXMLBytes function
   - Now rejects XML with invalid namespaces (only allows empty or AWS standard namespace)
   - Validates XMLName.Space to ensure proper XML structure
   - Prevents acceptance of malformed XML with incorrect namespaces

Both fixes improve compliance with AWS S3 standards and prevent invalid
configurations from being accepted. All SSE and bucket encryption tests
now pass successfully.
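
The mutual exclusion in item 1 can be sketched as follows. This is a simplified illustration, not the actual IsSSECRequest and IsSSEKMSRequest code: the function names and the choice of which headers to inspect are assumptions, although the two header names themselves are the standard S3 ones.

```go
package main

import (
	"fmt"
	"net/http"
)

const (
	headerSSE           = "X-Amz-Server-Side-Encryption"
	headerSSECAlgorithm = "X-Amz-Server-Side-Encryption-Customer-Algorithm"
)

// newHeaders builds an http.Header from key/value pairs (demo helper).
func newHeaders(pairs ...string) http.Header {
	h := http.Header{}
	for i := 0; i+1 < len(pairs); i += 2 {
		h.Set(pairs[i], pairs[i+1])
	}
	return h
}

// hasSSECHeaders and hasSSEKMSHeaders only look at the raw headers.
func hasSSECHeaders(h http.Header) bool   { return h.Get(headerSSECAlgorithm) != "" }
func hasSSEKMSHeaders(h http.Header) bool { return h.Get(headerSSE) == "aws:kms" }

// isSSECRequest is false whenever SSE-KMS headers are also present.
func isSSECRequest(h http.Header) bool { return hasSSECHeaders(h) && !hasSSEKMSHeaders(h) }

// isSSEKMSRequest is false whenever SSE-C headers are also present.
func isSSEKMSRequest(h http.Header) bool { return hasSSEKMSHeaders(h) && !hasSSECHeaders(h) }

func main() {
	both := newHeaders(headerSSE, "aws:kms", headerSSECAlgorithm, "AES256")
	fmt.Println(isSSECRequest(both), isSSEKMSRequest(both))
}
```

With this shape, a request carrying both header sets is detected as neither mode, so a separate validation step can reject it instead of silently picking one.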

* Fix GitHub PR #7144 latest review comments

Address two new code review comments from Gemini Code Assist bot:

1. **High Priority - Race Condition in UpdateBucketMetadata**: Fixed thread safety issue
   - Added per-bucket locking mechanism to prevent race conditions
   - Introduced bucketMetadataLocks map with RWMutex for each bucket
   - Added getBucketMetadataLock helper with double-checked locking pattern
   - UpdateBucketMetadata now uses bucket-specific locks to serialize metadata updates
   - Prevents last-writer-wins scenarios when concurrent requests update different metadata parts

2. **Medium Priority - KMS Key ARN Validation**: Improved robustness of ARN validation
   - Enhanced isValidKMSKeyID function to strictly validate ARN structure
   - Changed from 'len(parts) >= 6' to 'len(parts) != 6' for exact part count
   - Added proper resource validation for key/ and alias/ prefixes
   - Prevents malformed ARNs with incorrect structure from being accepted
   - Now validates: arn:aws:kms:region:account:key/keyid or arn:aws:kms:region:account:alias/aliasname

Both fixes improve system reliability and prevent edge cases that could cause
data corruption or security issues. All existing tests continue to pass.
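
A minimal sketch of the stricter ARN check in item 2, assuming the two forms listed above are the only ones accepted. The helper name isValidKMSKeyARN is hypothetical, and the requirement that region, account, and resource ID be non-empty is an assumption rather than something the commit message states.

```go
package main

import (
	"fmt"
	"strings"
)

// isValidKMSKeyARN is a hypothetical helper that accepts only
// arn:aws:kms:region:account:key/keyid and arn:aws:kms:region:account:alias/aliasname.
func isValidKMSKeyARN(arn string) bool {
	parts := strings.Split(arn, ":")
	// Exactly six colon-separated parts; extra or missing parts are rejected.
	if len(parts) != 6 {
		return false
	}
	if parts[0] != "arn" || parts[1] != "aws" || parts[2] != "kms" {
		return false
	}
	// Assumption: region and account must not be empty.
	if parts[3] == "" || parts[4] == "" {
		return false
	}
	resource := parts[5]
	for _, prefix := range []string{"key/", "alias/"} {
		if strings.HasPrefix(resource, prefix) && len(resource) > len(prefix) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isValidKMSKeyARN("arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789abc"))
	fmt.Println(isValidKMSKeyARN("arn:aws:kms:us-east-1:123456789012:key/abc:extra"))
}
```

Checking for exactly six parts is what rejects the second input, which a "six or more" check would have let through.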

* format

* address comments

* Configuration Adapter

* Regex Optimization

* Caching Integration

* add negative cache for non-existent buckets

* remove bucketMetadataLocks

* address comments

* address comments

* copying objects with sse-kms

* copying strategy

* store IV in entry metadata

* implement compression reader

* extract json map as sse kms context

* bucket key

* comments

* rotate sse chunks

* KMS Data Keys use AES-GCM + nonce
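
One common way to protect a data key with AES-GCM and a random nonce is sketched below, using only the Go standard library. The function names and the nonce-prefixed layout are assumptions made for illustration; the commit message does not say how the KMS code stores the nonce.

```go
package main

import (
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"fmt"
)

// newGCM builds an AES-GCM AEAD from a 16, 24, or 32 byte master key.
func newGCM(masterKey []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(masterKey)
	if err != nil {
		return nil, fmt.Errorf("new AES cipher: %w", err)
	}
	return cipher.NewGCM(block)
}

// sealDataKey encrypts dataKey and returns nonce || ciphertext (assumed layout).
func sealDataKey(masterKey, dataKey []byte) ([]byte, error) {
	gcm, err := newGCM(masterKey)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, fmt.Errorf("generate nonce: %w", err)
	}
	// Seal appends the ciphertext and tag to the nonce slice.
	return gcm.Seal(nonce, nonce, dataKey, nil), nil
}

// openDataKey splits off the nonce, then decrypts and authenticates the rest.
func openDataKey(masterKey, sealed []byte) ([]byte, error) {
	gcm, err := newGCM(masterKey)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, fmt.Errorf("sealed data key too short")
	}
	nonce, ciphertext := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ciphertext, nil)
}

func main() {
	masterKey := make([]byte, 32)
	dataKey := make([]byte, 32)
	if _, err := rand.Read(masterKey); err != nil {
		fmt.Println("generate master key:", err)
		return
	}
	if _, err := rand.Read(dataKey); err != nil {
		fmt.Println("generate data key:", err)
		return
	}
	sealed, err := sealDataKey(masterKey, dataKey)
	if err != nil {
		fmt.Println("seal:", err)
		return
	}
	opened, err := openDataKey(masterKey, sealed)
	if err != nil {
		fmt.Println("open:", err)
		return
	}
	fmt.Println("round trip ok:", bytes.Equal(opened, dataKey))
}
```

Because GCM is authenticated, any modification of the sealed bytes makes openDataKey return an error instead of a corrupted key.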

* add comments

* Update weed/s3api/s3_sse_kms.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update s3api_object_handlers_put.go

* get IV from response header

* set sse headers

* Update s3api_object_handlers.go

* deterministic JSON marshaling

* store iv in entry metadata

* address comments

* not used

* store iv in destination metadata

Ensures that SSE-C copy operations with re-encryption (decrypt, then re-encrypt) store the destination encryption metadata.

* add todo

* address comments

* SSE-S3 Deserialization

* add BucketKMSCache to BucketConfig

* fix test compilation

* already not empty

* use constants

* fix: critical metadata (encrypted data keys, encryption context, etc.) was never stored during PUT/copy operations

* address comments

* fix tests

* Fix SSE-KMS Copy Re-encryption

* Cache now persists across requests

* fix test

* iv in metadata only

* SSE-KMS copy operations should follow the same pattern as SSE-C

* fix size overhead calculation

* Filer-Side SSE Metadata Processing

* SSE Integration Tests

* fix tests

* clean up

* Update s3_sse_multipart_test.go

* add s3 sse tests

* unused

* add logs

* Update Makefile

* Update Makefile

* s3 health check

* The tests were failing because they tried to run both SSE-C and SSE-KMS tests

* Update weed/s3api/s3_sse_c.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update Makefile

* add back

* Update Makefile

* address comments

* fix tests

* Update s3-sse-tests.yml

* Update s3-sse-tests.yml

* fix sse-kms for PUT operation

* IV

* Update auth_credentials.go

* fix multipart with kms

* constants

* multipart sse kms

Modified handleSSEKMSResponse to detect multipart SSE-KMS objects
Added createMultipartSSEKMSDecryptedReader to handle each chunk independently
Each chunk now gets its own decrypted reader before combining into the final stream

* validate key id

* add SSEType

* permissive kms key format

* Update s3_sse_kms_test.go

* format

* assert equal

* uploading SSE-KMS metadata per chunk

* persist sse type and metadata

* avoid re-chunk multipart uploads

* decryption process to use stored PartOffset values

* constants

* sse-c multipart upload

* Unified Multipart SSE Copy

* purge

* fix fatalf

* avoid io.MultiReader which does not close underlying readers
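
io.MultiReader returns a plain io.Reader, so the readers passed to it are never closed through it. A minimal sketch of one alternative is shown below; the type sequentialReadCloser and the trackedReader test double are hypothetical names, and this is not necessarily how the commit solved the problem.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// sequentialReadCloser reads its readers in order and closes all of them on Close.
type sequentialReadCloser struct {
	readers []io.ReadCloser
	index   int
}

func newSequential(readers ...io.ReadCloser) *sequentialReadCloser {
	return &sequentialReadCloser{readers: readers}
}

func (s *sequentialReadCloser) Read(p []byte) (int, error) {
	for s.index < len(s.readers) {
		n, err := s.readers[s.index].Read(p)
		if err == io.EOF {
			// Current reader is exhausted; continue with the next one.
			s.index++
			if n > 0 {
				return n, nil
			}
			continue
		}
		return n, err
	}
	return 0, io.EOF
}

// Close closes every underlying reader and returns the first error seen.
func (s *sequentialReadCloser) Close() error {
	var firstErr error
	for _, r := range s.readers {
		if err := r.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// trackedReader is a test double that records whether Close was called.
type trackedReader struct {
	io.Reader
	closed bool
}

func (t *trackedReader) Close() error { t.closed = true; return nil }

func newTracked(s string) *trackedReader {
	return &trackedReader{Reader: strings.NewReader(s)}
}

func main() {
	a, b := newTracked("hello, "), newTracked("world")
	rc := newSequential(a, b)
	data, err := io.ReadAll(rc)
	if err != nil {
		fmt.Println("read:", err)
		return
	}
	if err := rc.Close(); err != nil {
		fmt.Println("close:", err)
		return
	}
	fmt.Println(string(data), a.closed, b.closed)
}
```

When the underlying readers are HTTP response bodies, closing each one is what returns the connection to the pool.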

* unified cross-encryption

* fix Single-object SSE-C

* adjust constants

* range read sse files

* remove debug logs

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-08-21 08:28:07 -07:00

561 lines
17 KiB
Go

package s3api

import (
	"context"
	"crypto/md5"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"net/http"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// StreamingCopySpec defines the specification for streaming copy operations
type StreamingCopySpec struct {
	SourceReader    io.Reader
	TargetSize      int64
	EncryptionSpec  *EncryptionSpec
	CompressionSpec *CompressionSpec
	HashCalculation bool
	BufferSize      int
}

// EncryptionSpec defines encryption parameters for streaming
type EncryptionSpec struct {
	NeedsDecryption bool
	NeedsEncryption bool
	SourceKey       interface{} // *SSECustomerKey, *SSEKMSKey, or *SSES3Key
	DestinationKey  interface{} // *SSECustomerKey, *SSEKMSKey, or *SSES3Key
	SourceType      EncryptionType
	DestinationType EncryptionType
	SourceMetadata  map[string][]byte // Source metadata for IV extraction
	DestinationIV   []byte            // Generated IV for destination
}

// CompressionSpec defines compression parameters for streaming
type CompressionSpec struct {
	IsCompressed       bool
	CompressionType    string
	NeedsDecompression bool
	NeedsCompression   bool
}

// StreamingCopyManager handles streaming copy operations
type StreamingCopyManager struct {
	s3a        *S3ApiServer
	bufferSize int
}

// NewStreamingCopyManager creates a new streaming copy manager
func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager {
	return &StreamingCopyManager{
		s3a:        s3a,
		bufferSize: 64 * 1024, // 64KB default buffer
	}
}

// ExecuteStreamingCopy performs a streaming copy operation
func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) {
	// Create streaming copy specification
	spec, err := scm.createStreamingSpec(entry, r, state)
	if err != nil {
		return nil, fmt.Errorf("create streaming spec: %w", err)
	}

	// Create source reader from entry
	sourceReader, err := scm.createSourceReader(entry)
	if err != nil {
		return nil, fmt.Errorf("create source reader: %w", err)
	}
	defer sourceReader.Close()
	spec.SourceReader = sourceReader

	// Create processing pipeline
	processedReader, err := scm.createProcessingPipeline(spec)
	if err != nil {
		return nil, fmt.Errorf("create processing pipeline: %w", err)
	}

	// Stream to destination
	return scm.streamToDestination(ctx, processedReader, spec, dstPath)
}

// createStreamingSpec creates a streaming specification based on copy parameters
func (scm *StreamingCopyManager) createStreamingSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*StreamingCopySpec, error) {
	spec := &StreamingCopySpec{
		BufferSize:      scm.bufferSize,
		HashCalculation: true,
	}

	// Calculate target size
	sizeCalc := NewCopySizeCalculator(entry, r)
	spec.TargetSize = sizeCalc.CalculateTargetSize()

	// Create encryption specification
	encSpec, err := scm.createEncryptionSpec(entry, r, state)
	if err != nil {
		return nil, err
	}
	spec.EncryptionSpec = encSpec

	// Create compression specification
	spec.CompressionSpec = scm.createCompressionSpec(entry, r)

	return spec, nil
}

// createEncryptionSpec creates encryption specification for streaming
func (scm *StreamingCopyManager) createEncryptionSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*EncryptionSpec, error) {
	spec := &EncryptionSpec{
		NeedsDecryption: state.IsSourceEncrypted(),
		NeedsEncryption: state.IsTargetEncrypted(),
		SourceMetadata:  entry.Extended, // Pass source metadata for IV extraction
	}

	// Set source encryption details
	if state.SrcSSEC {
		spec.SourceType = EncryptionTypeSSEC
		sourceKey, err := ParseSSECCopySourceHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-C copy source headers: %w", err)
		}
		spec.SourceKey = sourceKey
	} else if state.SrcSSEKMS {
		spec.SourceType = EncryptionTypeSSEKMS
		// Extract SSE-KMS key from metadata
		if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
			sseKey, err := DeserializeSSEKMSMetadata(keyData)
			if err != nil {
				return nil, fmt.Errorf("deserialize SSE-KMS metadata: %w", err)
			}
			spec.SourceKey = sseKey
		}
	} else if state.SrcSSES3 {
		spec.SourceType = EncryptionTypeSSES3
		// Extract SSE-S3 key from metadata
		if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]; exists {
			// TODO: This should use a proper SSE-S3 key manager from S3ApiServer
			// For now, create a temporary key manager to handle deserialization
			tempKeyManager := NewSSES3KeyManager()
			sseKey, err := DeserializeSSES3Metadata(keyData, tempKeyManager)
			if err != nil {
				return nil, fmt.Errorf("deserialize SSE-S3 metadata: %w", err)
			}
			spec.SourceKey = sseKey
		}
	}

	// Set destination encryption details
	if state.DstSSEC {
		spec.DestinationType = EncryptionTypeSSEC
		destKey, err := ParseSSECHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-C headers: %w", err)
		}
		spec.DestinationKey = destKey
	} else if state.DstSSEKMS {
		spec.DestinationType = EncryptionTypeSSEKMS
		// Parse KMS parameters
		keyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-KMS copy headers: %w", err)
		}

		// Create SSE-KMS key for destination
		sseKey := &SSEKMSKey{
			KeyID:             keyID,
			EncryptionContext: encryptionContext,
			BucketKeyEnabled:  bucketKeyEnabled,
		}
		spec.DestinationKey = sseKey
	} else if state.DstSSES3 {
		spec.DestinationType = EncryptionTypeSSES3
		// Generate or retrieve SSE-S3 key
		keyManager := GetSSES3KeyManager()
		sseKey, err := keyManager.GetOrCreateKey("")
		if err != nil {
			return nil, fmt.Errorf("get SSE-S3 key: %w", err)
		}
		spec.DestinationKey = sseKey
	}

	return spec, nil
}

// createCompressionSpec creates compression specification for streaming
func (scm *StreamingCopyManager) createCompressionSpec(entry *filer_pb.Entry, r *http.Request) *CompressionSpec {
	return &CompressionSpec{
		IsCompressed: isCompressedEntry(entry),
		// For now, we don't change compression during copy
		NeedsDecompression: false,
		NeedsCompression:   false,
	}
}

// createSourceReader creates a reader for the source entry
func (scm *StreamingCopyManager) createSourceReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
	// Create a multi-chunk reader that streams from all chunks
	return scm.s3a.createMultiChunkReader(entry)
}

// createProcessingPipeline creates a processing pipeline for the copy operation
func (scm *StreamingCopyManager) createProcessingPipeline(spec *StreamingCopySpec) (io.Reader, error) {
	reader := spec.SourceReader

	// Add decryption if needed
	if spec.EncryptionSpec.NeedsDecryption {
		decryptedReader, err := scm.createDecryptionReader(reader, spec.EncryptionSpec)
		if err != nil {
			return nil, fmt.Errorf("create decryption reader: %w", err)
		}
		reader = decryptedReader
	}

	// Add decompression if needed
	if spec.CompressionSpec.NeedsDecompression {
		decompressedReader, err := scm.createDecompressionReader(reader, spec.CompressionSpec)
		if err != nil {
			return nil, fmt.Errorf("create decompression reader: %w", err)
		}
		reader = decompressedReader
	}

	// Add compression if needed
	if spec.CompressionSpec.NeedsCompression {
		compressedReader, err := scm.createCompressionReader(reader, spec.CompressionSpec)
		if err != nil {
			return nil, fmt.Errorf("create compression reader: %w", err)
		}
		reader = compressedReader
	}

	// Add encryption if needed
	if spec.EncryptionSpec.NeedsEncryption {
		encryptedReader, err := scm.createEncryptionReader(reader, spec.EncryptionSpec)
		if err != nil {
			return nil, fmt.Errorf("create encryption reader: %w", err)
		}
		reader = encryptedReader
	}

	// Add hash calculation if needed
	if spec.HashCalculation {
		reader = scm.createHashReader(reader)
	}

	return reader, nil
}

// createDecryptionReader creates a decryption reader based on encryption type
func (scm *StreamingCopyManager) createDecryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
	switch encSpec.SourceType {
	case EncryptionTypeSSEC:
		if sourceKey, ok := encSpec.SourceKey.(*SSECustomerKey); ok {
			// Get IV from metadata
			iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
			if err != nil {
				return nil, fmt.Errorf("get IV from metadata: %w", err)
			}
			return CreateSSECDecryptedReader(reader, sourceKey, iv)
		}
		return nil, fmt.Errorf("invalid SSE-C source key type")

	case EncryptionTypeSSEKMS:
		if sseKey, ok := encSpec.SourceKey.(*SSEKMSKey); ok {
			return CreateSSEKMSDecryptedReader(reader, sseKey)
		}
		return nil, fmt.Errorf("invalid SSE-KMS source key type")

	case EncryptionTypeSSES3:
		if sseKey, ok := encSpec.SourceKey.(*SSES3Key); ok {
			// Get IV from metadata
			iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
			if err != nil {
				return nil, fmt.Errorf("get IV from metadata: %w", err)
			}
			return CreateSSES3DecryptedReader(reader, sseKey, iv)
		}
		return nil, fmt.Errorf("invalid SSE-S3 source key type")

	default:
		return reader, nil
	}
}

// createEncryptionReader creates an encryption reader based on encryption type
func (scm *StreamingCopyManager) createEncryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
	switch encSpec.DestinationType {
	case EncryptionTypeSSEC:
		if destKey, ok := encSpec.DestinationKey.(*SSECustomerKey); ok {
			encryptedReader, iv, err := CreateSSECEncryptedReader(reader, destKey)
			if err != nil {
				return nil, err
			}
			// Store IV in destination metadata (this would need to be handled by caller)
			encSpec.DestinationIV = iv
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-C destination key type")

	case EncryptionTypeSSEKMS:
		if sseKey, ok := encSpec.DestinationKey.(*SSEKMSKey); ok {
			encryptedReader, updatedKey, err := CreateSSEKMSEncryptedReaderWithBucketKey(reader, sseKey.KeyID, sseKey.EncryptionContext, sseKey.BucketKeyEnabled)
			if err != nil {
				return nil, err
			}
			// Store IV from the updated key
			encSpec.DestinationIV = updatedKey.IV
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-KMS destination key type")

	case EncryptionTypeSSES3:
		if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok {
			encryptedReader, iv, err := CreateSSES3EncryptedReader(reader, sseKey)
			if err != nil {
				return nil, err
			}
			// Store IV for metadata
			encSpec.DestinationIV = iv
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-S3 destination key type")

	default:
		return reader, nil
	}
}

// createDecompressionReader creates a decompression reader
func (scm *StreamingCopyManager) createDecompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
	if !compSpec.NeedsDecompression {
		return reader, nil
	}

	switch compSpec.CompressionType {
	case "gzip":
		// Use SeaweedFS's streaming gzip decompression
		pr, pw := io.Pipe()
		go func() {
			// The deferred Close does not overwrite an error set by CloseWithError
			defer pw.Close()
			_, err := util.GunzipStream(pw, reader)
			if err != nil {
				// Wrap with %w so callers can inspect the underlying error
				pw.CloseWithError(fmt.Errorf("gzip decompression failed: %w", err))
			}
		}()
		return pr, nil
	default:
		// Unknown compression type, return as-is
		return reader, nil
	}
}

// createCompressionReader creates a compression reader
func (scm *StreamingCopyManager) createCompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
	if !compSpec.NeedsCompression {
		return reader, nil
	}

	switch compSpec.CompressionType {
	case "gzip":
		// Use SeaweedFS's streaming gzip compression
		pr, pw := io.Pipe()
		go func() {
			// The deferred Close does not overwrite an error set by CloseWithError
			defer pw.Close()
			_, err := util.GzipStream(pw, reader)
			if err != nil {
				// Wrap with %w so callers can inspect the underlying error
				pw.CloseWithError(fmt.Errorf("gzip compression failed: %w", err))
			}
		}()
		return pr, nil
	default:
		// Unknown compression type, return as-is
		return reader, nil
	}
}

// HashReader wraps an io.Reader to calculate MD5 and SHA256 hashes
type HashReader struct {
	reader     io.Reader
	md5Hash    hash.Hash
	sha256Hash hash.Hash
}

// NewHashReader creates a new hash calculating reader
func NewHashReader(reader io.Reader) *HashReader {
	return &HashReader{
		reader:     reader,
		md5Hash:    md5.New(),
		sha256Hash: sha256.New(),
	}
}

// Read implements io.Reader and calculates hashes as data flows through
func (hr *HashReader) Read(p []byte) (n int, err error) {
	n, err = hr.reader.Read(p)
	if n > 0 {
		// Update both hashes with the data read
		hr.md5Hash.Write(p[:n])
		hr.sha256Hash.Write(p[:n])
	}
	return n, err
}

// MD5Sum returns the current MD5 hash
func (hr *HashReader) MD5Sum() []byte {
	return hr.md5Hash.Sum(nil)
}

// SHA256Sum returns the current SHA256 hash
func (hr *HashReader) SHA256Sum() []byte {
	return hr.sha256Hash.Sum(nil)
}

// MD5Hex returns the MD5 hash as a hex string
func (hr *HashReader) MD5Hex() string {
	return hex.EncodeToString(hr.MD5Sum())
}

// SHA256Hex returns the SHA256 hash as a hex string
func (hr *HashReader) SHA256Hex() string {
	return hex.EncodeToString(hr.SHA256Sum())
}

// createHashReader creates a hash calculation reader
func (scm *StreamingCopyManager) createHashReader(reader io.Reader) io.Reader {
	return NewHashReader(reader)
}

// streamToDestination streams the processed data to the destination
func (scm *StreamingCopyManager) streamToDestination(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
	// For now, we'll use the existing chunk-based approach
	// In a full implementation, this would stream directly to the destination
	// without creating intermediate chunks

	// This is a placeholder that converts back to chunk-based approach
	// A full streaming implementation would write directly to the destination
	return scm.streamToChunks(ctx, reader, spec, dstPath)
}

// streamToChunks converts streaming data back to chunks (temporary implementation)
func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
	// This is a simplified implementation that reads the stream and creates chunks
	// A full implementation would be more sophisticated
	var chunks []*filer_pb.FileChunk
	buffer := make([]byte, spec.BufferSize)
	offset := int64(0)

	for {
		// Stop early if the request has been cancelled
		if err := ctx.Err(); err != nil {
			return nil, fmt.Errorf("streaming copy cancelled: %w", err)
		}

		// Fill the buffer completely so that short reads from the pipeline
		// do not produce undersized chunks
		n, err := io.ReadFull(reader, buffer)
		if n > 0 {
			// Create chunk for this data
			chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath)
			if chunkErr != nil {
				return nil, fmt.Errorf("create chunk from data: %w", chunkErr)
			}
			chunks = append(chunks, chunk)
			offset += int64(n)
		}

		// io.ReadFull reports a partially filled final buffer as io.ErrUnexpectedEOF
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("read stream: %w", err)
		}
	}

	return chunks, nil
}

// createChunkFromData creates a chunk from streaming data
func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string) (*filer_pb.FileChunk, error) {
	// Assign new volume
	assignResult, err := scm.s3a.assignNewVolume(dstPath)
	if err != nil {
		return nil, fmt.Errorf("assign volume: %w", err)
	}

	// Create chunk
	chunk := &filer_pb.FileChunk{
		Offset: offset,
		Size:   uint64(len(data)),
	}

	// Set file ID
	if err := scm.s3a.setChunkFileId(chunk, assignResult); err != nil {
		return nil, err
	}

	// Upload data
	if err := scm.s3a.uploadChunkData(data, assignResult); err != nil {
		return nil, fmt.Errorf("upload chunk data: %w", err)
	}

	return chunk, nil
}

// createMultiChunkReader creates a reader that streams from multiple chunks
func (s3a *S3ApiServer) createMultiChunkReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
	// Create a multi-reader that combines all chunks, and remember every
	// closable chunk reader so that it can be released later
	var readers []io.Reader
	var closers []io.Closer
	for _, chunk := range entry.GetChunks() {
		chunkReader, err := s3a.createChunkReader(chunk)
		if err != nil {
			// Release the chunk readers that were already opened
			for _, c := range closers {
				c.Close()
			}
			return nil, fmt.Errorf("create chunk reader: %w", err)
		}
		readers = append(readers, chunkReader)
		if c, ok := chunkReader.(io.Closer); ok {
			closers = append(closers, c)
		}
	}

	multiReader := io.MultiReader(readers...)
	return &multiReadCloser{reader: multiReader, closers: closers}, nil
}

// createChunkReader creates a reader for a single chunk
func (s3a *S3ApiServer) createChunkReader(chunk *filer_pb.FileChunk) (io.Reader, error) {
	// Get chunk URL
	srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
	if err != nil {
		return nil, fmt.Errorf("lookup volume URL: %w", err)
	}

	// Create HTTP request for chunk data
	req, err := http.NewRequest("GET", srcUrl, nil)
	if err != nil {
		return nil, fmt.Errorf("create HTTP request: %w", err)
	}

	// Execute request
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("execute HTTP request: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("HTTP request failed: %d", resp.StatusCode)
	}

	// The returned body is closed by multiReadCloser.Close
	return resp.Body, nil
}

// multiReadCloser wraps a multi-reader and closes the underlying chunk readers
type multiReadCloser struct {
	reader  io.Reader
	closers []io.Closer
}

func (mrc *multiReadCloser) Read(p []byte) (int, error) {
	return mrc.reader.Read(p)
}

// Close closes every underlying chunk reader and returns the first error
func (mrc *multiReadCloser) Close() error {
	var firstErr error
	for _, c := range mrc.closers {
		if err := c.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
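
The hashing step of the pipeline can be tried in isolation. The following standalone sketch mirrors the HashReader idea with a hypothetical hashingReader type and a digests helper; it assumes nothing about the rest of the s3api package and only shows that the digests become available once the stream has been fully consumed.

```go
package main

import (
	"crypto/md5"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"strings"
)

// hashingReader updates MD5 and SHA256 digests as data is read through it.
type hashingReader struct {
	src        io.Reader
	md5Hash    hash.Hash
	sha256Hash hash.Hash
}

func newHashingReader(src io.Reader) *hashingReader {
	return &hashingReader{src: src, md5Hash: md5.New(), sha256Hash: sha256.New()}
}

func (h *hashingReader) Read(p []byte) (int, error) {
	n, err := h.src.Read(p)
	if n > 0 {
		// hash.Hash.Write never returns an error.
		h.md5Hash.Write(p[:n])
		h.sha256Hash.Write(p[:n])
	}
	return n, err
}

// digests drains the reader and returns both digests as hex strings.
func digests(data string) (md5Hex, sha256Hex string, err error) {
	hr := newHashingReader(strings.NewReader(data))
	if _, err := io.Copy(io.Discard, hr); err != nil {
		return "", "", err
	}
	return hex.EncodeToString(hr.md5Hash.Sum(nil)), hex.EncodeToString(hr.sha256Hash.Sum(nil)), nil
}

func main() {
	md5Hex, sha256Hex, err := digests("hello world")
	if err != nil {
		fmt.Println("digest:", err)
		return
	}
	fmt.Println("md5:   ", md5Hex)
	fmt.Println("sha256:", sha256Hex)
}
```

A caller that needs the digests has to keep a reference to the concrete reader, since a value typed as plain io.Reader does not expose them.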