seaweedfs/weed/s3api/s3api_bucket_config.go
Chris Lu b7b73016dd
S3 API: Add SSE-KMS (#7144)
* implement sse-c

* fix Content-Range

* adding tests

* Update s3_sse_c_test.go

* copy sse-c objects

* adding tests

* refactor

* multi reader

* remove extra write header call

* refactor

* SSE-C encrypted objects do not support HTTP Range requests

* robust

* fix server starts

* Update Makefile

* Update Makefile

* ci: remove SSE-C integration tests and workflows; delete test/s3/encryption/

* s3: SSE-C MD5 must be base64 (case-sensitive); fix validation, comparisons, metadata storage; update tests

* minor

* base64

* Update SSE-C_IMPLEMENTATION.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/s3api/s3api_object_handlers.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update SSE-C_IMPLEMENTATION.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* address comments

* fix test

* fix compilation

* Bucket Default Encryption

To complete the SSE-KMS implementation for production use:
- Add AWS KMS Provider - Implement weed/kms/aws/aws_kms.go using AWS SDK
- Integrate with S3 Handlers - Update PUT/GET object handlers to use SSE-KMS
- Add Multipart Upload Support - Extend SSE-KMS to multipart uploads
- Configuration Integration - Add KMS configuration to filer.toml
- Documentation - Update SeaweedFS wiki with SSE-KMS usage examples

* store bucket sse config in proto

* add more tests

* Update SSE-C_IMPLEMENTATION.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Fix rebase errors and restore structured BucketMetadata API

Merge Conflict Fixes:
- Fixed merge conflicts in header.go (SSE-C and SSE-KMS headers)
- Fixed merge conflicts in s3api_errors.go (SSE-C and SSE-KMS error codes)
- Fixed merge conflicts in s3_sse_c.go (copy strategy constants)
- Fixed merge conflicts in s3api_object_handlers_copy.go (copy strategy usage)

API Restoration:
- Restored BucketMetadata struct with Tags, CORS, and Encryption fields
- Restored structured API functions: GetBucketMetadata, SetBucketMetadata, UpdateBucketMetadata
- Restored helper functions: UpdateBucketTags, UpdateBucketCORS, UpdateBucketEncryption
- Restored clear functions: ClearBucketTags, ClearBucketCORS, ClearBucketEncryption

Handler Updates:
- Updated GetBucketTaggingHandler to use GetBucketMetadata() directly
- Updated PutBucketTaggingHandler to use UpdateBucketTags()
- Updated DeleteBucketTaggingHandler to use ClearBucketTags()
- Updated CORS handlers to use UpdateBucketCORS() and ClearBucketCORS()
- Updated loadCORSFromBucketContent to use GetBucketMetadata()

Internal Function Updates:
- Updated getBucketMetadata() to return *BucketMetadata struct
- Updated setBucketMetadata() to accept *BucketMetadata struct
- Updated getBucketEncryptionMetadata() to use GetBucketMetadata()
- Updated setBucketEncryptionMetadata() to use SetBucketMetadata()

Benefits:
- Resolved all rebase conflicts while preserving both SSE-C and SSE-KMS functionality
- Maintained consistent structured API throughout the codebase
- Eliminated intermediate wrapper functions for cleaner code
- Proper error handling with better granularity
- All tests passing and build successful

The bucket metadata system now uses a unified, type-safe, structured API
that supports tags, CORS, and encryption configuration consistently.
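
For illustration, the restored structured API lets a caller change one facet of bucket metadata while the others are preserved in a single read-modify-write pass. A minimal sketch against the types defined in s3api_bucket_config.go below (the bucket name, tag values, and error handling are invented):

	err := s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
		// Only tags change here; CORS and Encryption pass through untouched.
		metadata.Tags = map[string]string{"team": "storage"}
		return nil
	})
	if err != nil {
		glog.Errorf("failed to update metadata for %s: %v", bucket, err)
	}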

* Fix updateEncryptionConfiguration for first-time bucket encryption setup

- Change getBucketEncryptionMetadata to getBucketMetadata to avoid failures when no encryption config exists
- Change setBucketEncryptionMetadata to setBucketMetadataWithEncryption for consistency
- This fixes the critical issue where bucket encryption configuration failed for buckets without an existing encryption configuration

Fixes: https://github.com/seaweedfs/seaweedfs/pull/7144#discussion_r2285669572

* Fix rebase conflicts and maintain structured BucketMetadata API

Resolved Conflicts:
- Fixed merge conflicts in s3api_bucket_config.go between structured API (HEAD) and old intermediate functions
- Kept modern structured API approach: UpdateBucketCORS, ClearBucketCORS, UpdateBucketEncryption
- Removed old intermediate functions: setBucketTags, deleteBucketTags, setBucketMetadataWithEncryption

API Consistency Maintained:
- updateCORSConfiguration: Uses UpdateBucketCORS() directly
- removeCORSConfiguration: Uses ClearBucketCORS() directly
- updateEncryptionConfiguration: Uses UpdateBucketEncryption() directly
- All structured API functions preserved: GetBucketMetadata, SetBucketMetadata, UpdateBucketMetadata

Benefits:
- Maintains clean separation between API layers
- Preserves atomic metadata updates with proper error handling
- Eliminates function indirection for better performance
- Consistent API usage pattern throughout codebase
- All tests passing and build successful

The bucket metadata system continues to use the unified, type-safe, structured API
that properly handles tags, CORS, and encryption configuration without any
intermediate wrapper functions.

* Fix complex rebase conflicts and maintain clean structured BucketMetadata API

Resolved Complex Conflicts:
- Fixed merge conflicts between modern structured API (HEAD) and mixed approach
- Removed duplicate function declarations that caused compilation errors
- Consistently chose structured API approach over intermediate functions

Fixed Functions:
- BucketMetadata struct: Maintained clean field alignment
- loadCORSFromBucketContent: Uses GetBucketMetadata() directly
- updateCORSConfiguration: Uses UpdateBucketCORS() directly
- removeCORSConfiguration: Uses ClearBucketCORS() directly
- getBucketMetadata: Returns *BucketMetadata struct consistently
- setBucketMetadata: Accepts *BucketMetadata struct consistently

Removed Duplicates:
- Eliminated duplicate GetBucketMetadata implementations
- Eliminated duplicate SetBucketMetadata implementations
- Eliminated duplicate UpdateBucketMetadata implementations
- Eliminated duplicate helper functions (UpdateBucketTags, etc.)

API Consistency Achieved:
- Single, unified BucketMetadata struct for all operations
- Atomic updates through UpdateBucketMetadata with function callbacks
- Type-safe operations with proper error handling
- No intermediate wrapper functions cluttering the API

Benefits:
- Clean, maintainable codebase with no function duplication
- Consistent structured API usage throughout all bucket operations
- Proper error handling and type safety
- Build successful and all tests passing

The bucket metadata system now has a completely clean, structured API
without any conflicts, duplicates, or inconsistencies.

* Update remaining functions to use new structured BucketMetadata APIs directly

Updated functions to follow the pattern established in bucket config:
- getEncryptionConfiguration() -> Uses GetBucketMetadata() directly
- removeEncryptionConfiguration() -> Uses ClearBucketEncryption() directly

Benefits:
- Consistent API usage pattern across all bucket metadata operations
- Simpler, more readable code that leverages the structured API
- Eliminates calls to intermediate legacy functions
- Better error handling and logging consistency
- All tests pass with improved functionality

This completes the transition to using the new structured BucketMetadata API
throughout the entire bucket configuration and encryption subsystem.

* Fix GitHub PR #7144 code review comments

Address all code review comments from Gemini Code Assist bot:

1. **High Priority - SSE-KMS Key Validation**: Fixed ValidateSSEKMSKey to allow empty KMS key ID
   - Empty key ID now indicates use of default KMS key (consistent with AWS behavior)
   - Updated ParseSSEKMSHeaders to call validation after parsing
   - Enhanced isValidKMSKeyID to reject keys with spaces and invalid characters

2. **Medium Priority - KMS Registry Error Handling**: Improved error collection in CloseAll
   - Now collects all provider close errors instead of only returning the last one
   - Uses proper error formatting with %w verb for error wrapping
   - Returns single error for one failure, combined message for multiple failures

3. **Medium Priority - Local KMS Aliases Consistency**: Fixed alias handling in CreateKey
   - Now updates the aliases slice in-place to maintain consistency
   - Ensures both p.keys map and key.Aliases slice use the same prefixed format

All changes maintain backward compatibility and improve error handling robustness.
Tests updated and passing for all scenarios including edge cases.
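
A minimal sketch of the rule from item 1, assuming an invented helper name (not the exact SeaweedFS function): an empty key ID is accepted and selects the default KMS key, while embedded whitespace is rejected.

	package main

	import (
		"fmt"
		"strings"
	)

	// validateKeyID: empty means "use the default KMS key", mirroring AWS behavior.
	func validateKeyID(keyID string) error {
		if keyID == "" {
			return nil // default KMS key
		}
		if strings.ContainsAny(keyID, " \t\n") {
			return fmt.Errorf("invalid KMS key ID %q: contains whitespace", keyID)
		}
		return nil
	}

	func main() {
		fmt.Println(validateKeyID(""))        // <nil>: default key
		fmt.Println(validateKeyID("bad key")) // error: whitespace
	}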

* Use errors.Join for KMS registry error handling

Replace manual string building with the more idiomatic errors.Join function:

- Removed manual error message concatenation with strings.Builder
- Simplified error handling logic by using errors.Join(allErrors...)
- Removed unnecessary string import
- Added errors import for errors.Join

This approach is cleaner, more idiomatic, and automatically handles:
- Returning nil for empty error slice
- Returning single error for one-element slice
- Properly formatting multiple errors with newlines

The errors.Join function was introduced in Go 1.20 and is the
recommended way to combine multiple errors.
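
A self-contained sketch of the CloseAll pattern described above; the provider type is invented for illustration, but the errors.Join semantics are the standard library's:

	package main

	import (
		"errors"
		"fmt"
	)

	type provider struct {
		name    string
		failing bool
	}

	func (p provider) Close() error {
		if p.failing {
			return fmt.Errorf("close %s: failed", p.name)
		}
		return nil
	}

	func closeAll(ps []provider) error {
		var errs []error
		for _, p := range ps {
			if err := p.Close(); err != nil {
				errs = append(errs, err)
			}
		}
		// errors.Join returns nil for an empty slice, the error itself for one
		// element, and a newline-separated combined error for several.
		return errors.Join(errs...)
	}

	func main() {
		fmt.Println(closeAll([]provider{{"a", true}, {"b", false}, {"c", true}}))
	}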

* Update registry.go

* Fix GitHub PR #7144 latest review comments

Address all new code review comments from Gemini Code Assist bot:

1. **High Priority - SSE-KMS Detection Logic**: Tightened IsSSEKMSEncrypted function
   - Now relies only on the canonical x-amz-server-side-encryption header
   - Removed redundant check for x-amz-encrypted-data-key metadata
   - Prevents misinterpretation of objects with inconsistent metadata state
   - Updated test case to reflect correct behavior (encrypted data key only = false)

2. **Medium Priority - UUID Validation**: Enhanced KMS key ID validation
   - Replaced simplistic length/hyphen count check with proper regex validation
   - Added regexp import for robust UUID format checking
   - Regex pattern: ^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$
   - Prevents invalid formats like '------------------------------------' from passing

3. **Medium Priority - Alias Mutation Fix**: Avoided input slice modification
   - Changed CreateKey to not mutate the input aliases slice in-place
   - Uses local variable for modified alias to prevent side effects
   - Maintains backward compatibility while being safer for callers

All changes improve code robustness and follow AWS S3 standards more closely.
Tests updated and passing for all scenarios including edge cases.
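
The UUID check from item 2 can be demonstrated with the exact pattern quoted above; a runnable sketch:

	package main

	import (
		"fmt"
		"regexp"
	)

	var uuidRe = regexp.MustCompile(`^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$`)

	func main() {
		fmt.Println(uuidRe.MatchString("12345678-1234-1234-1234-123456789012")) // true
		fmt.Println(uuidRe.MatchString("------------------------------------")) // false
	}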

* Fix failing SSE tests

Address two failing test cases:

1. **TestSSEHeaderConflicts**: Fixed SSE-C and SSE-KMS mutual exclusion
   - Modified IsSSECRequest to return false if SSE-KMS headers are present
   - Modified IsSSEKMSRequest to return false if SSE-C headers are present
   - This prevents both detection functions from returning true simultaneously
   - Aligns with AWS S3 behavior where SSE-C and SSE-KMS are mutually exclusive

2. **TestBucketEncryptionEdgeCases**: Fixed XML namespace validation
   - Added namespace validation in encryptionConfigFromXMLBytes function
   - Now rejects XML with invalid namespaces (only allows empty or AWS standard namespace)
   - Validates XMLName.Space to ensure proper XML structure
   - Prevents acceptance of malformed XML with incorrect namespaces

Both fixes improve compliance with AWS S3 standards and prevent invalid
configurations from being accepted. All SSE and bucket encryption tests
now pass successfully.
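
A hedged sketch of the mutual-exclusion rule from fix 1: each detector returns false when the other scheme's headers are present. The header names are the standard AWS ones; the function bodies are illustrative, not the exact SeaweedFS implementations.

	package main

	import (
		"fmt"
		"net/http"
	)

	const (
		hdrSSEKMS   = "X-Amz-Server-Side-Encryption" // "aws:kms" for SSE-KMS
		hdrSSECAlgo = "X-Amz-Server-Side-Encryption-Customer-Algorithm"
	)

	func isSSEC(h http.Header) bool {
		return h.Get(hdrSSECAlgo) != "" && h.Get(hdrSSEKMS) != "aws:kms"
	}

	func isSSEKMS(h http.Header) bool {
		return h.Get(hdrSSEKMS) == "aws:kms" && h.Get(hdrSSECAlgo) == ""
	}

	func main() {
		h := http.Header{}
		h.Set(hdrSSEKMS, "aws:kms")
		h.Set(hdrSSECAlgo, "AES256")
		fmt.Println(isSSEC(h), isSSEKMS(h)) // false false: conflicting headers
	}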

* Fix GitHub PR #7144 latest review comments

Address two new code review comments from Gemini Code Assist bot:

1. **High Priority - Race Condition in UpdateBucketMetadata**: Fixed thread safety issue
   - Added per-bucket locking mechanism to prevent race conditions
   - Introduced bucketMetadataLocks map with RWMutex for each bucket
   - Added getBucketMetadataLock helper with double-checked locking pattern
   - UpdateBucketMetadata now uses bucket-specific locks to serialize metadata updates
   - Prevents last-writer-wins scenarios when concurrent requests update different metadata parts

2. **Medium Priority - KMS Key ARN Validation**: Improved robustness of ARN validation
   - Enhanced isValidKMSKeyID function to strictly validate ARN structure
   - Changed from 'len(parts) >= 6' to 'len(parts) != 6' for exact part count
   - Added proper resource validation for key/ and alias/ prefixes
   - Prevents malformed ARNs with incorrect structure from being accepted
   - Now validates: arn:aws:kms:region:account:key/keyid or arn:aws:kms:region:account:alias/aliasname

Both fixes improve system reliability and prevent edge cases that could cause
data corruption or security issues. All existing tests continue to pass.
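
An illustrative version of the stricter ARN check from fix 2 (exactly six colon-separated parts, resource prefixed with key/ or alias/); the function name and the subset of fields checked are assumptions:

	package main

	import (
		"fmt"
		"strings"
	)

	func isValidKMSKeyARN(arn string) bool {
		parts := strings.SplitN(arn, ":", 6)
		if len(parts) != 6 || parts[0] != "arn" || parts[2] != "kms" {
			return false
		}
		resource := parts[5]
		return strings.HasPrefix(resource, "key/") || strings.HasPrefix(resource, "alias/")
	}

	func main() {
		fmt.Println(isValidKMSKeyARN("arn:aws:kms:us-east-1:123456789012:key/abc")) // true
		fmt.Println(isValidKMSKeyARN("arn:aws:kms:key/abc"))                        // false: wrong part count
	}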

* format

* address comments

* Configuration Adapter

* Regex Optimization

* Caching Integration

* add negative cache for non-existent buckets

* remove bucketMetadataLocks

* address comments

* address comments

* copying objects with sse-kms

* copying strategy

* store IV in entry metadata

* implement compression reader

* extract json map as sse kms context

* bucket key

* comments

* rotate sse chunks

* KMS Data Keys use AES-GCM + nonce
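
A minimal sketch of wrapping a data key with AES-GCM and a random nonce using only the standard library; the key sizes and nonce-prefixed layout are assumptions for illustration, not the exact SeaweedFS wire format.

	package main

	import (
		"crypto/aes"
		"crypto/cipher"
		"crypto/rand"
		"fmt"
		"io"
	)

	func sealDataKey(kek, plaintextDataKey []byte) ([]byte, error) {
		block, err := aes.NewCipher(kek) // kek must be 16, 24, or 32 bytes
		if err != nil {
			return nil, err
		}
		gcm, err := cipher.NewGCM(block)
		if err != nil {
			return nil, err
		}
		nonce := make([]byte, gcm.NonceSize())
		if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
			return nil, err
		}
		// Prepend the nonce so the unwrap side can recover it.
		return gcm.Seal(nonce, nonce, plaintextDataKey, nil), nil
	}

	func main() {
		kek := make([]byte, 32)
		dataKey := make([]byte, 32)
		sealed, err := sealDataKey(kek, dataKey)
		fmt.Println(len(sealed), err) // nonce + ciphertext + GCM tag
	}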

* add comments

* Update weed/s3api/s3_sse_kms.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update s3api_object_handlers_put.go

* get IV from response header

* set sse headers

* Update s3api_object_handlers.go

* deterministic JSON marshaling
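
Why marshaling a map-based encryption context can be hashed consistently: Go's encoding/json emits map keys in sorted order, so the same map always yields the same bytes. A self-contained demonstration (whether SeaweedFS hashes exactly this way is an assumption):

	package main

	import (
		"crypto/sha256"
		"encoding/json"
		"fmt"
	)

	func contextHash(ctx map[string]string) string {
		b, _ := json.Marshal(ctx) // keys are emitted in sorted order
		return fmt.Sprintf("%x", sha256.Sum256(b))
	}

	func main() {
		a := map[string]string{"aws:s3:arn": "arn:aws:s3:::b/k", "tenant": "x"}
		b := map[string]string{"tenant": "x", "aws:s3:arn": "arn:aws:s3:::b/k"}
		fmt.Println(contextHash(a) == contextHash(b)) // true
	}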

* store iv in entry metadata

* address comments

* not used

* store iv in destination metadata

This ensures that SSE-C copy operations with re-encryption (the decrypt/re-encrypt scenario) now properly store the destination encryption metadata.

* add todo

* address comments

* SSE-S3 Deserialization

* add BucketKMSCache to BucketConfig

* fix test compilation

* already not empty

* use constants

* fix: critical metadata (encrypted data keys, encryption context, etc.) was never stored during PUT/copy operations

* address comments

* fix tests

* Fix SSE-KMS Copy Re-encryption

* Cache now persists across requests

* fix test

* iv in metadata only

* SSE-KMS copy operations should follow the same pattern as SSE-C

* fix size overhead calculation

* Filer-Side SSE Metadata Processing

* SSE Integration Tests

* fix tests

* clean up

* Update s3_sse_multipart_test.go

* add s3 sse tests

* unused

* add logs

* Update Makefile

* Update Makefile

* s3 health check

* The tests were failing because they tried to run both SSE-C and SSE-KMS tests

* Update weed/s3api/s3_sse_c.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update Makefile

* add back

* Update Makefile

* address comments

* fix tests

* Update s3-sse-tests.yml

* Update s3-sse-tests.yml

* fix sse-kms for PUT operation

* IV

* Update auth_credentials.go

* fix multipart with kms

* constants

* multipart sse kms

- Modified handleSSEKMSResponse to detect multipart SSE-KMS objects
- Added createMultipartSSEKMSDecryptedReader to handle each chunk independently
- Each chunk now gets its own decrypted reader before combining into the final stream
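
The rough shape of that per-chunk decryption, with stand-in types rather than the SeaweedFS ones: each chunk carries its own SSE metadata, gets its own decrypting reader, and the readers are concatenated. (A later commit replaces io.MultiReader with a closing variant; see the sketch after that item.)

	package main

	import (
		"bytes"
		"fmt"
		"io"
	)

	type chunk struct {
		ciphertext []byte
		// per-chunk SSE metadata (IV, encrypted data key, ...) would live here
	}

	// decryptChunk would use the chunk's own metadata; identity for this sketch.
	func decryptChunk(c chunk) io.Reader { return bytes.NewReader(c.ciphertext) }

	func multipartReader(chunks []chunk) io.Reader {
		readers := make([]io.Reader, 0, len(chunks))
		for _, c := range chunks {
			readers = append(readers, decryptChunk(c))
		}
		return io.MultiReader(readers...)
	}

	func main() {
		out, _ := io.ReadAll(multipartReader([]chunk{{[]byte("part1-")}, {[]byte("part2")}}))
		fmt.Println(string(out))
	}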

* validate key id

* add SSEType

* permissive kms key format

* Update s3_sse_kms_test.go

* format

* assert equal

* uploading SSE-KMS metadata per chunk

* persist sse type and metadata

* avoid re-chunk multipart uploads

* decryption process to use stored PartOffset values

* constants

* sse-c multipart upload

* Unified Multipart SSE Copy

* purge

* fix fatalf

* avoid io.MultiReader which does not close underlying readers
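
io.MultiReader takes plain io.Readers and never closes them, so decrypting readers backed by network streams would leak. A sketch of a multi-reader that closes each part as it is drained and the remainder on Close; illustrative, not the exact SeaweedFS type:

	package main

	import (
		"errors"
		"fmt"
		"io"
		"strings"
	)

	type closingMultiReader struct {
		parts []io.ReadCloser
	}

	func (m *closingMultiReader) Read(p []byte) (int, error) {
		for len(m.parts) > 0 {
			n, err := m.parts[0].Read(p)
			if err == io.EOF {
				cerr := m.parts[0].Close() // close each part as soon as it is drained
				m.parts = m.parts[1:]
				if cerr != nil {
					return n, cerr
				}
				if n > 0 {
					return n, nil
				}
				continue
			}
			return n, err
		}
		return 0, io.EOF
	}

	func (m *closingMultiReader) Close() error {
		var errs []error
		for _, part := range m.parts {
			errs = append(errs, part.Close())
		}
		m.parts = nil
		return errors.Join(errs...) // nil entries are discarded
	}

	func main() {
		r := &closingMultiReader{parts: []io.ReadCloser{
			io.NopCloser(strings.NewReader("part1-")),
			io.NopCloser(strings.NewReader("part2")),
		}}
		data, _ := io.ReadAll(r)
		fmt.Println(string(data), r.Close())
	}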

* unified cross-encryption

* fix Single-object SSE-C

* adjust constants

* range read sse files

* remove debug logs

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-08-21 08:28:07 -07:00


package s3api
import (
"context"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/service/s3"
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/kms"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/cors"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
// BucketConfig represents cached bucket configuration
type BucketConfig struct {
Name string
Versioning string // "Enabled", "Suspended", or ""
Ownership string
ACL []byte
Owner string
IsPublicRead bool // Cached flag to avoid JSON parsing on every request
CORS *cors.CORSConfiguration
ObjectLockConfig *ObjectLockConfiguration // Cached parsed Object Lock configuration
KMSKeyCache *BucketKMSCache // Per-bucket KMS key cache for SSE-KMS operations
LastModified time.Time
Entry *filer_pb.Entry
}
// BucketKMSCache represents per-bucket KMS key caching for SSE-KMS operations
// This provides better isolation and automatic cleanup compared to global caching
type BucketKMSCache struct {
cache map[string]*BucketKMSCacheEntry // Key: contextHash, Value: cached data key
mutex sync.RWMutex
bucket string // Bucket name for logging/debugging
lastTTL time.Duration // TTL used for cache entries (typically 1 hour)
}
// BucketKMSCacheEntry represents a single cached KMS data key
type BucketKMSCacheEntry struct {
DataKey interface{} // Could be *kms.GenerateDataKeyResponse or similar
ExpiresAt time.Time
KeyID string
ContextHash string // Hash of encryption context for cache validation
}
// NewBucketKMSCache creates a new per-bucket KMS key cache
func NewBucketKMSCache(bucketName string, ttl time.Duration) *BucketKMSCache {
return &BucketKMSCache{
cache: make(map[string]*BucketKMSCacheEntry),
bucket: bucketName,
lastTTL: ttl,
}
}
// Get retrieves a cached KMS data key if it exists and hasn't expired
func (bkc *BucketKMSCache) Get(contextHash string) (*BucketKMSCacheEntry, bool) {
if bkc == nil {
return nil, false
}
bkc.mutex.RLock()
defer bkc.mutex.RUnlock()
entry, exists := bkc.cache[contextHash]
if !exists {
return nil, false
}
// Check if entry has expired
if time.Now().After(entry.ExpiresAt) {
return nil, false
}
return entry, true
}
// Set stores a KMS data key in the cache
func (bkc *BucketKMSCache) Set(contextHash, keyID string, dataKey interface{}, ttl time.Duration) {
if bkc == nil {
return
}
bkc.mutex.Lock()
defer bkc.mutex.Unlock()
bkc.cache[contextHash] = &BucketKMSCacheEntry{
DataKey: dataKey,
ExpiresAt: time.Now().Add(ttl),
KeyID: keyID,
ContextHash: contextHash,
}
bkc.lastTTL = ttl
}
// CleanupExpired removes expired entries from the cache
func (bkc *BucketKMSCache) CleanupExpired() int {
if bkc == nil {
return 0
}
bkc.mutex.Lock()
defer bkc.mutex.Unlock()
now := time.Now()
expiredCount := 0
for key, entry := range bkc.cache {
if now.After(entry.ExpiresAt) {
// Clear sensitive data before removing from cache
bkc.clearSensitiveData(entry)
delete(bkc.cache, key)
expiredCount++
}
}
return expiredCount
}
// Size returns the current number of cached entries
func (bkc *BucketKMSCache) Size() int {
if bkc == nil {
return 0
}
bkc.mutex.RLock()
defer bkc.mutex.RUnlock()
return len(bkc.cache)
}
// clearSensitiveData securely clears sensitive data from a cache entry
func (bkc *BucketKMSCache) clearSensitiveData(entry *BucketKMSCacheEntry) {
if dataKeyResp, ok := entry.DataKey.(*kms.GenerateDataKeyResponse); ok {
// Zero out the plaintext data key to prevent it from lingering in memory
if dataKeyResp.Plaintext != nil {
for i := range dataKeyResp.Plaintext {
dataKeyResp.Plaintext[i] = 0
}
dataKeyResp.Plaintext = nil
}
}
}
// Clear clears all cached KMS entries, securely zeroing sensitive data first
func (bkc *BucketKMSCache) Clear() {
if bkc == nil {
return
}
bkc.mutex.Lock()
defer bkc.mutex.Unlock()
// Clear sensitive data from all entries before deletion
for _, entry := range bkc.cache {
bkc.clearSensitiveData(entry)
}
// Clear the cache map
bkc.cache = make(map[string]*BucketKMSCacheEntry)
}
// BucketConfigCache provides caching for bucket configurations
// Cache entries are automatically updated/invalidated through metadata subscription events,
// so TTL serves as a safety fallback rather than the primary consistency mechanism
type BucketConfigCache struct {
cache map[string]*BucketConfig
negativeCache map[string]time.Time // Cache for non-existent buckets
mutex sync.RWMutex
ttl time.Duration // Safety fallback TTL; real-time consistency maintained via events
negativeTTL time.Duration // TTL for negative cache entries
}
// BucketMetadata represents the complete metadata for a bucket
type BucketMetadata struct {
Tags map[string]string `json:"tags,omitempty"`
CORS *cors.CORSConfiguration `json:"cors,omitempty"`
Encryption *s3_pb.EncryptionConfiguration `json:"encryption,omitempty"`
// Future extensions can be added here:
// Versioning *s3_pb.VersioningConfiguration `json:"versioning,omitempty"`
// Lifecycle *s3_pb.LifecycleConfiguration `json:"lifecycle,omitempty"`
// Notification *s3_pb.NotificationConfiguration `json:"notification,omitempty"`
// Replication *s3_pb.ReplicationConfiguration `json:"replication,omitempty"`
// Analytics *s3_pb.AnalyticsConfiguration `json:"analytics,omitempty"`
// Logging *s3_pb.LoggingConfiguration `json:"logging,omitempty"`
// Website *s3_pb.WebsiteConfiguration `json:"website,omitempty"`
// RequestPayer *s3_pb.RequestPayerConfiguration `json:"requestPayer,omitempty"`
// PublicAccess *s3_pb.PublicAccessConfiguration `json:"publicAccess,omitempty"`
}
// NewBucketMetadata creates a new BucketMetadata with default values
func NewBucketMetadata() *BucketMetadata {
return &BucketMetadata{
Tags: make(map[string]string),
}
}
// IsEmpty returns true if the metadata has no configuration set
func (bm *BucketMetadata) IsEmpty() bool {
return len(bm.Tags) == 0 && bm.CORS == nil && bm.Encryption == nil
}
// HasEncryption returns true if bucket has encryption configuration
func (bm *BucketMetadata) HasEncryption() bool {
return bm.Encryption != nil
}
// HasCORS returns true if bucket has CORS configuration
func (bm *BucketMetadata) HasCORS() bool {
return bm.CORS != nil
}
// HasTags returns true if bucket has tags
func (bm *BucketMetadata) HasTags() bool {
return len(bm.Tags) > 0
}
// NewBucketConfigCache creates a new bucket configuration cache
// TTL can be set to a longer duration since cache consistency is maintained
// through real-time metadata subscription events rather than TTL expiration
func NewBucketConfigCache(ttl time.Duration) *BucketConfigCache {
negativeTTL := ttl / 4 // Negative cache TTL is shorter than positive cache
if negativeTTL < 30*time.Second {
negativeTTL = 30 * time.Second // Minimum 30 seconds for negative cache
}
return &BucketConfigCache{
cache: make(map[string]*BucketConfig),
negativeCache: make(map[string]time.Time),
ttl: ttl,
negativeTTL: negativeTTL,
}
}
// Get retrieves bucket configuration from cache
func (bcc *BucketConfigCache) Get(bucket string) (*BucketConfig, bool) {
bcc.mutex.RLock()
defer bcc.mutex.RUnlock()
config, exists := bcc.cache[bucket]
if !exists {
return nil, false
}
// Check if cache entry is expired (safety fallback; entries are normally updated via events)
if time.Since(config.LastModified) > bcc.ttl {
return nil, false
}
return config, true
}
// Set stores bucket configuration in cache
func (bcc *BucketConfigCache) Set(bucket string, config *BucketConfig) {
bcc.mutex.Lock()
defer bcc.mutex.Unlock()
config.LastModified = time.Now()
bcc.cache[bucket] = config
}
// Remove removes bucket configuration from cache
func (bcc *BucketConfigCache) Remove(bucket string) {
bcc.mutex.Lock()
defer bcc.mutex.Unlock()
delete(bcc.cache, bucket)
}
// Clear clears all cached configurations
func (bcc *BucketConfigCache) Clear() {
bcc.mutex.Lock()
defer bcc.mutex.Unlock()
bcc.cache = make(map[string]*BucketConfig)
bcc.negativeCache = make(map[string]time.Time)
}
// IsNegativelyCached checks if a bucket is in the negative cache (doesn't exist)
func (bcc *BucketConfigCache) IsNegativelyCached(bucket string) bool {
bcc.mutex.RLock()
defer bcc.mutex.RUnlock()
if cachedTime, exists := bcc.negativeCache[bucket]; exists {
// Check if the negative cache entry is still valid
if time.Since(cachedTime) < bcc.negativeTTL {
return true
}
// Entry expired, remove it
delete(bcc.negativeCache, bucket)
}
return false
}
// SetNegativeCache marks a bucket as non-existent in the negative cache
func (bcc *BucketConfigCache) SetNegativeCache(bucket string) {
bcc.mutex.Lock()
defer bcc.mutex.Unlock()
bcc.negativeCache[bucket] = time.Now()
}
// RemoveNegativeCache removes a bucket from the negative cache
func (bcc *BucketConfigCache) RemoveNegativeCache(bucket string) {
bcc.mutex.Lock()
defer bcc.mutex.Unlock()
delete(bcc.negativeCache, bucket)
}
// getBucketConfig retrieves bucket configuration with caching
func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.ErrorCode) {
// Check negative cache first
if s3a.bucketConfigCache.IsNegativelyCached(bucket) {
return nil, s3err.ErrNoSuchBucket
}
// Try positive cache
if config, found := s3a.bucketConfigCache.Get(bucket); found {
return config, s3err.ErrNone
}
// Try to get from filer
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
// Bucket doesn't exist - set negative cache
s3a.bucketConfigCache.SetNegativeCache(bucket)
return nil, s3err.ErrNoSuchBucket
}
glog.Errorf("getBucketConfig: failed to get bucket entry for %s: %v", bucket, err)
return nil, s3err.ErrInternalError
}
config := &BucketConfig{
Name: bucket,
Entry: entry,
IsPublicRead: false, // Explicitly default to false for private buckets
}
// Extract configuration from extended attributes
if entry.Extended != nil {
if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists {
config.Versioning = string(versioning)
}
if ownership, exists := entry.Extended[s3_constants.ExtOwnershipKey]; exists {
config.Ownership = string(ownership)
}
if acl, exists := entry.Extended[s3_constants.ExtAmzAclKey]; exists {
config.ACL = acl
// Parse ACL once and cache public-read status
config.IsPublicRead = parseAndCachePublicReadStatus(acl)
} else {
// No ACL means private bucket
config.IsPublicRead = false
}
if owner, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
config.Owner = string(owner)
}
// Parse Object Lock configuration if present
if objectLockConfig, found := LoadObjectLockConfigurationFromExtended(entry); found {
config.ObjectLockConfig = objectLockConfig
glog.V(2).Infof("getBucketConfig: cached Object Lock configuration for bucket %s", bucket)
}
}
// Load CORS configuration from bucket directory content
if corsConfig, err := s3a.loadCORSFromBucketContent(bucket); err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
// Missing metadata is not an error; fall back cleanly
glog.V(2).Infof("CORS metadata not found for bucket %s, falling back to default behavior", bucket)
} else {
// Log parsing or validation errors
glog.Errorf("Failed to load CORS configuration for bucket %s: %v", bucket, err)
}
} else {
config.CORS = corsConfig
}
// Cache the result
s3a.bucketConfigCache.Set(bucket, config)
return config, s3err.ErrNone
}
// updateBucketConfig updates bucket configuration and invalidates cache
func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketConfig) error) s3err.ErrorCode {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
return errCode
}
// Apply update function
if err := updateFn(config); err != nil {
glog.Errorf("updateBucketConfig: update function failed for bucket %s: %v", bucket, err)
return s3err.ErrInternalError
}
// Prepare extended attributes
if config.Entry.Extended == nil {
config.Entry.Extended = make(map[string][]byte)
}
// Update extended attributes
if config.Versioning != "" {
config.Entry.Extended[s3_constants.ExtVersioningKey] = []byte(config.Versioning)
}
if config.Ownership != "" {
config.Entry.Extended[s3_constants.ExtOwnershipKey] = []byte(config.Ownership)
}
if config.ACL != nil {
config.Entry.Extended[s3_constants.ExtAmzAclKey] = config.ACL
}
if config.Owner != "" {
config.Entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(config.Owner)
}
// Update Object Lock configuration
if config.ObjectLockConfig != nil {
if err := StoreObjectLockConfigurationInExtended(config.Entry, config.ObjectLockConfig); err != nil {
glog.Errorf("updateBucketConfig: failed to store Object Lock configuration for bucket %s: %v", bucket, err)
return s3err.ErrInternalError
}
}
// Save to filer
err := s3a.updateEntry(s3a.option.BucketsPath, config.Entry)
if err != nil {
glog.Errorf("updateBucketConfig: failed to update bucket entry for %s: %v", bucket, err)
return s3err.ErrInternalError
}
// Update cache
s3a.bucketConfigCache.Set(bucket, config)
return s3err.ErrNone
}
// isVersioningEnabled checks if versioning is enabled for a bucket (with caching)
func (s3a *S3ApiServer) isVersioningEnabled(bucket string) (bool, error) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
if errCode == s3err.ErrNoSuchBucket {
return false, filer_pb.ErrNotFound
}
return false, fmt.Errorf("failed to get bucket config: %v", errCode)
}
// Versioning is enabled if explicitly set to "Enabled" OR if object lock is enabled
// (since object lock requires versioning to be enabled)
return config.Versioning == s3_constants.VersioningEnabled || config.ObjectLockConfig != nil, nil
}
// isVersioningConfigured checks if versioning has been configured (either Enabled or Suspended)
func (s3a *S3ApiServer) isVersioningConfigured(bucket string) (bool, error) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
if errCode == s3err.ErrNoSuchBucket {
return false, filer_pb.ErrNotFound
}
return false, fmt.Errorf("failed to get bucket config: %v", errCode)
}
// Versioning is configured if explicitly set to either "Enabled" or "Suspended"
// OR if object lock is enabled (which forces versioning)
return config.Versioning != "" || config.ObjectLockConfig != nil, nil
}
// getVersioningState returns the detailed versioning state for a bucket
func (s3a *S3ApiServer) getVersioningState(bucket string) (string, error) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
if errCode == s3err.ErrNoSuchBucket {
return "", nil
}
return "", fmt.Errorf("failed to get bucket config: %v", errCode)
}
// If object lock is enabled, versioning must be enabled regardless of explicit setting
if config.ObjectLockConfig != nil {
return s3_constants.VersioningEnabled, nil
}
// Return the explicit versioning status (empty string means never configured)
return config.Versioning, nil
}
// getBucketVersioningStatus returns the versioning status for a bucket
func (s3a *S3ApiServer) getBucketVersioningStatus(bucket string) (string, s3err.ErrorCode) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
return "", errCode
}
// Return exactly what's stored - empty string means versioning was never configured
// This matches AWS S3 behavior where new buckets have no Status field in GetBucketVersioning response
return config.Versioning, s3err.ErrNone
}
// setBucketVersioningStatus sets the versioning status for a bucket
func (s3a *S3ApiServer) setBucketVersioningStatus(bucket, status string) s3err.ErrorCode {
return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
config.Versioning = status
return nil
})
}
// getBucketOwnership returns the ownership setting for a bucket
func (s3a *S3ApiServer) getBucketOwnership(bucket string) (string, s3err.ErrorCode) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
return "", errCode
}
return config.Ownership, s3err.ErrNone
}
// setBucketOwnership sets the ownership setting for a bucket
func (s3a *S3ApiServer) setBucketOwnership(bucket, ownership string) s3err.ErrorCode {
return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
config.Ownership = ownership
return nil
})
}
// loadCORSFromBucketContent loads CORS configuration from bucket directory content
func (s3a *S3ApiServer) loadCORSFromBucketContent(bucket string) (*cors.CORSConfiguration, error) {
metadata, err := s3a.GetBucketMetadata(bucket)
if err != nil {
return nil, err
}
// Note: corsConfig can be nil if no CORS configuration is set, which is valid
return metadata.CORS, nil
}
// getCORSConfiguration retrieves CORS configuration with caching
func (s3a *S3ApiServer) getCORSConfiguration(bucket string) (*cors.CORSConfiguration, s3err.ErrorCode) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
return nil, errCode
}
return config.CORS, s3err.ErrNone
}
// updateCORSConfiguration updates the CORS configuration for a bucket
func (s3a *S3ApiServer) updateCORSConfiguration(bucket string, corsConfig *cors.CORSConfiguration) s3err.ErrorCode {
// Update using structured API
err := s3a.UpdateBucketCORS(bucket, corsConfig)
if err != nil {
glog.Errorf("updateCORSConfiguration: failed to update CORS config for bucket %s: %v", bucket, err)
return s3err.ErrInternalError
}
// Cache will be updated automatically via metadata subscription
return s3err.ErrNone
}
// removeCORSConfiguration removes the CORS configuration for a bucket
func (s3a *S3ApiServer) removeCORSConfiguration(bucket string) s3err.ErrorCode {
// Update using structured API
err := s3a.ClearBucketCORS(bucket)
if err != nil {
glog.Errorf("removeCORSConfiguration: failed to remove CORS config for bucket %s: %v", bucket, err)
return s3err.ErrInternalError
}
// Cache will be updated automatically via metadata subscription
return s3err.ErrNone
}
// Conversion functions between CORS types and protobuf types
// corsRuleToProto converts a CORS rule to protobuf format
func corsRuleToProto(rule cors.CORSRule) *s3_pb.CORSRule {
return &s3_pb.CORSRule{
AllowedHeaders: rule.AllowedHeaders,
AllowedMethods: rule.AllowedMethods,
AllowedOrigins: rule.AllowedOrigins,
ExposeHeaders: rule.ExposeHeaders,
MaxAgeSeconds: int32(getMaxAgeSecondsValue(rule.MaxAgeSeconds)),
Id: rule.ID,
}
}
// corsRuleFromProto converts a protobuf CORS rule to standard format
func corsRuleFromProto(protoRule *s3_pb.CORSRule) cors.CORSRule {
var maxAge *int
// Always create the pointer if MaxAgeSeconds is >= 0
// This prevents nil pointer dereferences in tests and matches AWS behavior
if protoRule.MaxAgeSeconds >= 0 {
age := int(protoRule.MaxAgeSeconds)
maxAge = &age
}
// Only leave maxAge as nil if MaxAgeSeconds was explicitly set to a negative value
return cors.CORSRule{
AllowedHeaders: protoRule.AllowedHeaders,
AllowedMethods: protoRule.AllowedMethods,
AllowedOrigins: protoRule.AllowedOrigins,
ExposeHeaders: protoRule.ExposeHeaders,
MaxAgeSeconds: maxAge,
ID: protoRule.Id,
}
}
// corsConfigToProto converts CORS configuration to protobuf format
func corsConfigToProto(config *cors.CORSConfiguration) *s3_pb.CORSConfiguration {
if config == nil {
return nil
}
protoRules := make([]*s3_pb.CORSRule, len(config.CORSRules))
for i, rule := range config.CORSRules {
protoRules[i] = corsRuleToProto(rule)
}
return &s3_pb.CORSConfiguration{
CorsRules: protoRules,
}
}
// corsConfigFromProto converts protobuf CORS configuration to standard format
func corsConfigFromProto(protoConfig *s3_pb.CORSConfiguration) *cors.CORSConfiguration {
if protoConfig == nil {
return nil
}
rules := make([]cors.CORSRule, len(protoConfig.CorsRules))
for i, protoRule := range protoConfig.CorsRules {
rules[i] = corsRuleFromProto(protoRule)
}
return &cors.CORSConfiguration{
CORSRules: rules,
}
}
// getMaxAgeSecondsValue safely extracts max age seconds value
func getMaxAgeSecondsValue(maxAge *int) int {
if maxAge == nil {
return 0
}
return *maxAge
}
// parseAndCachePublicReadStatus parses the ACL and caches the public-read status
func parseAndCachePublicReadStatus(acl []byte) bool {
var grants []*s3.Grant
if err := json.Unmarshal(acl, &grants); err != nil {
return false
}
// Check if any grant gives read (or full-control) access to the "AllUsers" group
for _, grant := range grants {
if grant.Grantee != nil && grant.Grantee.URI != nil && grant.Permission != nil {
// Check for AllUsers group with Read or FullControl permission
if *grant.Grantee.URI == s3_constants.GranteeGroupAllUsers &&
(*grant.Permission == s3_constants.PermissionRead || *grant.Permission == s3_constants.PermissionFullControl) {
return true
}
}
}
return false
}
// getBucketMetadata retrieves bucket metadata as a structured object with caching
func (s3a *S3ApiServer) getBucketMetadata(bucket string) (*BucketMetadata, error) {
if s3a.bucketConfigCache != nil {
// Check negative cache first
if s3a.bucketConfigCache.IsNegativelyCached(bucket) {
return nil, fmt.Errorf("bucket directory not found %s", bucket)
}
// Try to get from positive cache
if config, found := s3a.bucketConfigCache.Get(bucket); found {
// Extract metadata from cached config
if metadata, err := s3a.extractMetadataFromConfig(config); err == nil {
return metadata, nil
}
// If extraction fails, fall through to direct load
}
}
// Load directly from filer
return s3a.loadBucketMetadataFromFiler(bucket)
}
// extractMetadataFromConfig extracts BucketMetadata from cached BucketConfig
func (s3a *S3ApiServer) extractMetadataFromConfig(config *BucketConfig) (*BucketMetadata, error) {
if config == nil || config.Entry == nil {
return NewBucketMetadata(), nil
}
// Parse metadata from entry content if available
if len(config.Entry.Content) > 0 {
var protoMetadata s3_pb.BucketMetadata
if err := proto.Unmarshal(config.Entry.Content, &protoMetadata); err != nil {
glog.Errorf("extractMetadataFromConfig: failed to unmarshal protobuf metadata for bucket %s: %v", config.Name, err)
return nil, err
}
// Convert protobuf to structured metadata
metadata := &BucketMetadata{
Tags: protoMetadata.Tags,
CORS: corsConfigFromProto(protoMetadata.Cors),
Encryption: protoMetadata.Encryption,
}
return metadata, nil
}
// Fallback: create metadata from cached CORS config
metadata := NewBucketMetadata()
if config.CORS != nil {
metadata.CORS = config.CORS
}
return metadata, nil
}
// loadBucketMetadataFromFiler loads bucket metadata directly from the filer
func (s3a *S3ApiServer) loadBucketMetadataFromFiler(bucket string) (*BucketMetadata, error) {
// Validate bucket name to prevent path traversal attacks
if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
return nil, fmt.Errorf("invalid bucket name: %s", bucket)
}
// Clean the bucket name further to prevent any potential path traversal
bucket = filepath.Clean(bucket)
if bucket == "." || bucket == ".." {
return nil, fmt.Errorf("invalid bucket name: %s", bucket)
}
// Get bucket directory entry to access its content
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
if err != nil {
// Check if this is a "not found" error
if errors.Is(err, filer_pb.ErrNotFound) {
// Set negative cache for non-existent bucket
if s3a.bucketConfigCache != nil {
s3a.bucketConfigCache.SetNegativeCache(bucket)
}
}
return nil, fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
}
if entry == nil {
// Set negative cache for non-existent bucket
if s3a.bucketConfigCache != nil {
s3a.bucketConfigCache.SetNegativeCache(bucket)
}
return nil, fmt.Errorf("bucket directory not found %s", bucket)
}
// If no content, return empty metadata
if len(entry.Content) == 0 {
return NewBucketMetadata(), nil
}
// Unmarshal metadata from protobuf
var protoMetadata s3_pb.BucketMetadata
if err := proto.Unmarshal(entry.Content, &protoMetadata); err != nil {
glog.Errorf("getBucketMetadata: failed to unmarshal protobuf metadata for bucket %s: %v", bucket, err)
return nil, fmt.Errorf("failed to unmarshal bucket metadata for %s: %w", bucket, err)
}
// Convert protobuf CORS to standard CORS
corsConfig := corsConfigFromProto(protoMetadata.Cors)
// Create and return structured metadata
metadata := &BucketMetadata{
Tags: protoMetadata.Tags,
CORS: corsConfig,
Encryption: protoMetadata.Encryption,
}
return metadata, nil
}
// setBucketMetadata stores bucket metadata from a structured object
func (s3a *S3ApiServer) setBucketMetadata(bucket string, metadata *BucketMetadata) error {
// Validate bucket name to prevent path traversal attacks
if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
return fmt.Errorf("invalid bucket name: %s", bucket)
}
// Clean the bucket name further to prevent any potential path traversal
bucket = filepath.Clean(bucket)
if bucket == "." || bucket == ".." {
return fmt.Errorf("invalid bucket name: %s", bucket)
}
// Default to empty metadata if nil
if metadata == nil {
metadata = NewBucketMetadata()
}
// Create protobuf metadata
protoMetadata := &s3_pb.BucketMetadata{
Tags: metadata.Tags,
Cors: corsConfigToProto(metadata.CORS),
Encryption: metadata.Encryption,
}
// Marshal metadata to protobuf
metadataBytes, err := proto.Marshal(protoMetadata)
if err != nil {
return fmt.Errorf("failed to marshal bucket metadata to protobuf: %w", err)
}
// Update the bucket entry with new content
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Get current bucket entry
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
if err != nil {
return fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
}
if entry == nil {
return fmt.Errorf("bucket directory not found %s", bucket)
}
// Update content with metadata
entry.Content = metadataBytes
request := &filer_pb.UpdateEntryRequest{
Directory: s3a.option.BucketsPath,
Entry: entry,
}
_, err = client.UpdateEntry(context.Background(), request)
return err
})
// Invalidate cache after successful update
if err == nil && s3a.bucketConfigCache != nil {
s3a.bucketConfigCache.Remove(bucket)
s3a.bucketConfigCache.RemoveNegativeCache(bucket) // Remove from negative cache too
}
return err
}
// New structured API functions using BucketMetadata
// GetBucketMetadata retrieves complete bucket metadata as a structured object
func (s3a *S3ApiServer) GetBucketMetadata(bucket string) (*BucketMetadata, error) {
return s3a.getBucketMetadata(bucket)
}
// SetBucketMetadata stores complete bucket metadata from a structured object
func (s3a *S3ApiServer) SetBucketMetadata(bucket string, metadata *BucketMetadata) error {
return s3a.setBucketMetadata(bucket, metadata)
}
// UpdateBucketMetadata updates specific parts of bucket metadata while preserving others
//
// DISTRIBUTED SYSTEM DESIGN NOTE:
// This function implements a read-modify-write pattern with "last write wins" semantics.
// In the rare case of concurrent updates to different parts of bucket metadata
// (e.g., simultaneous tag and CORS updates), the last write may overwrite previous changes.
//
// This is an acceptable trade-off because:
// 1. Bucket metadata updates are infrequent in typical S3 usage
// 2. Traditional locking doesn't work in distributed systems across multiple nodes
// 3. The complexity of distributed consensus (e.g., Raft) for metadata updates would
// be disproportionate to the low frequency of bucket configuration changes
// 4. Most bucket operations (tags, CORS, encryption) are typically configured once
// during setup rather than being frequently modified
//
// If stronger consistency is required, consider implementing optimistic concurrency
// control with version numbers or ETags at the storage layer.
func (s3a *S3ApiServer) UpdateBucketMetadata(bucket string, update func(*BucketMetadata) error) error {
// Get current metadata
metadata, err := s3a.GetBucketMetadata(bucket)
if err != nil {
return fmt.Errorf("failed to get current bucket metadata: %w", err)
}
// Apply update function
if err := update(metadata); err != nil {
return fmt.Errorf("failed to apply metadata update: %w", err)
}
// Store updated metadata (last write wins)
return s3a.SetBucketMetadata(bucket, metadata)
}
// Helper functions for specific metadata operations using structured API
// UpdateBucketTags sets bucket tags using the structured API
func (s3a *S3ApiServer) UpdateBucketTags(bucket string, tags map[string]string) error {
return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
metadata.Tags = tags
return nil
})
}
// UpdateBucketCORS sets bucket CORS configuration using the structured API
func (s3a *S3ApiServer) UpdateBucketCORS(bucket string, corsConfig *cors.CORSConfiguration) error {
return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
metadata.CORS = corsConfig
return nil
})
}
// UpdateBucketEncryption sets bucket encryption configuration using the structured API
func (s3a *S3ApiServer) UpdateBucketEncryption(bucket string, encryptionConfig *s3_pb.EncryptionConfiguration) error {
return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
metadata.Encryption = encryptionConfig
return nil
})
}
// ClearBucketTags removes all bucket tags using the structured API
func (s3a *S3ApiServer) ClearBucketTags(bucket string) error {
return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
metadata.Tags = make(map[string]string)
return nil
})
}
// ClearBucketCORS removes bucket CORS configuration using the structured API
func (s3a *S3ApiServer) ClearBucketCORS(bucket string) error {
return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
metadata.CORS = nil
return nil
})
}
// ClearBucketEncryption removes bucket encryption configuration using the structured API
func (s3a *S3ApiServer) ClearBucketEncryption(bucket string) error {
return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
metadata.Encryption = nil
return nil
})
}