mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-07-26 05:22:46 +02:00
fix https://github.com/seaweedfs/seaweedfs/issues/6971 fix https://github.com/seaweedfs/seaweedfs/issues/7028
600 lines
19 KiB
Go
600 lines
19 KiB
Go
package s3api
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go/service/s3"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/cors"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
|
)
|
|
|
|
// BucketConfig represents cached bucket configuration
|
|
type BucketConfig struct {
|
|
Name string
|
|
Versioning string // "Enabled", "Suspended", or ""
|
|
Ownership string
|
|
ACL []byte
|
|
Owner string
|
|
IsPublicRead bool // Cached flag to avoid JSON parsing on every request
|
|
CORS *cors.CORSConfiguration
|
|
ObjectLockConfig *ObjectLockConfiguration // Cached parsed Object Lock configuration
|
|
LastModified time.Time
|
|
Entry *filer_pb.Entry
|
|
}
|
|
|
|
// BucketConfigCache provides caching for bucket configurations
|
|
// Cache entries are automatically updated/invalidated through metadata subscription events,
|
|
// so TTL serves as a safety fallback rather than the primary consistency mechanism
|
|
type BucketConfigCache struct {
|
|
cache map[string]*BucketConfig
|
|
mutex sync.RWMutex
|
|
ttl time.Duration // Safety fallback TTL; real-time consistency maintained via events
|
|
}
|
|
|
|
// NewBucketConfigCache creates a new bucket configuration cache
|
|
// TTL can be set to a longer duration since cache consistency is maintained
|
|
// through real-time metadata subscription events rather than TTL expiration
|
|
func NewBucketConfigCache(ttl time.Duration) *BucketConfigCache {
|
|
return &BucketConfigCache{
|
|
cache: make(map[string]*BucketConfig),
|
|
ttl: ttl,
|
|
}
|
|
}
|
|
|
|
// Get retrieves bucket configuration from cache
|
|
func (bcc *BucketConfigCache) Get(bucket string) (*BucketConfig, bool) {
|
|
bcc.mutex.RLock()
|
|
defer bcc.mutex.RUnlock()
|
|
|
|
config, exists := bcc.cache[bucket]
|
|
if !exists {
|
|
return nil, false
|
|
}
|
|
|
|
// Check if cache entry is expired (safety fallback; entries are normally updated via events)
|
|
if time.Since(config.LastModified) > bcc.ttl {
|
|
return nil, false
|
|
}
|
|
|
|
return config, true
|
|
}
|
|
|
|
// Set stores bucket configuration in cache
|
|
func (bcc *BucketConfigCache) Set(bucket string, config *BucketConfig) {
|
|
bcc.mutex.Lock()
|
|
defer bcc.mutex.Unlock()
|
|
|
|
config.LastModified = time.Now()
|
|
bcc.cache[bucket] = config
|
|
}
|
|
|
|
// Remove removes bucket configuration from cache
|
|
func (bcc *BucketConfigCache) Remove(bucket string) {
|
|
bcc.mutex.Lock()
|
|
defer bcc.mutex.Unlock()
|
|
|
|
delete(bcc.cache, bucket)
|
|
}
|
|
|
|
// Clear clears all cached configurations
|
|
func (bcc *BucketConfigCache) Clear() {
|
|
bcc.mutex.Lock()
|
|
defer bcc.mutex.Unlock()
|
|
|
|
bcc.cache = make(map[string]*BucketConfig)
|
|
}
|
|
|
|
// getBucketConfig retrieves bucket configuration with caching
|
|
func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.ErrorCode) {
|
|
// Try cache first
|
|
if config, found := s3a.bucketConfigCache.Get(bucket); found {
|
|
return config, s3err.ErrNone
|
|
}
|
|
|
|
// Try to get from filer
|
|
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
|
|
if err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
// Bucket doesn't exist
|
|
return nil, s3err.ErrNoSuchBucket
|
|
}
|
|
glog.Errorf("getBucketConfig: failed to get bucket entry for %s: %v", bucket, err)
|
|
return nil, s3err.ErrInternalError
|
|
}
|
|
|
|
config := &BucketConfig{
|
|
Name: bucket,
|
|
Entry: entry,
|
|
IsPublicRead: false, // Explicitly default to false for private buckets
|
|
}
|
|
|
|
// Extract configuration from extended attributes
|
|
if entry.Extended != nil {
|
|
if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists {
|
|
config.Versioning = string(versioning)
|
|
}
|
|
if ownership, exists := entry.Extended[s3_constants.ExtOwnershipKey]; exists {
|
|
config.Ownership = string(ownership)
|
|
}
|
|
if acl, exists := entry.Extended[s3_constants.ExtAmzAclKey]; exists {
|
|
config.ACL = acl
|
|
// Parse ACL once and cache public-read status
|
|
config.IsPublicRead = parseAndCachePublicReadStatus(acl)
|
|
} else {
|
|
// No ACL means private bucket
|
|
config.IsPublicRead = false
|
|
}
|
|
if owner, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
|
|
config.Owner = string(owner)
|
|
}
|
|
// Parse Object Lock configuration if present
|
|
if objectLockConfig, found := LoadObjectLockConfigurationFromExtended(entry); found {
|
|
config.ObjectLockConfig = objectLockConfig
|
|
glog.V(2).Infof("getBucketConfig: cached Object Lock configuration for bucket %s", bucket)
|
|
}
|
|
}
|
|
|
|
// Load CORS configuration from bucket directory content
|
|
if corsConfig, err := s3a.loadCORSFromBucketContent(bucket); err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
// Missing metadata is not an error; fall back cleanly
|
|
glog.V(2).Infof("CORS metadata not found for bucket %s, falling back to default behavior", bucket)
|
|
} else {
|
|
// Log parsing or validation errors
|
|
glog.Errorf("Failed to load CORS configuration for bucket %s: %v", bucket, err)
|
|
}
|
|
} else {
|
|
config.CORS = corsConfig
|
|
}
|
|
|
|
// Cache the result
|
|
s3a.bucketConfigCache.Set(bucket, config)
|
|
|
|
return config, s3err.ErrNone
|
|
}
|
|
|
|
// updateBucketConfig updates bucket configuration and invalidates cache
|
|
func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketConfig) error) s3err.ErrorCode {
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
return errCode
|
|
}
|
|
|
|
// Apply update function
|
|
if err := updateFn(config); err != nil {
|
|
glog.Errorf("updateBucketConfig: update function failed for bucket %s: %v", bucket, err)
|
|
return s3err.ErrInternalError
|
|
}
|
|
|
|
// Prepare extended attributes
|
|
if config.Entry.Extended == nil {
|
|
config.Entry.Extended = make(map[string][]byte)
|
|
}
|
|
|
|
// Update extended attributes
|
|
if config.Versioning != "" {
|
|
config.Entry.Extended[s3_constants.ExtVersioningKey] = []byte(config.Versioning)
|
|
}
|
|
if config.Ownership != "" {
|
|
config.Entry.Extended[s3_constants.ExtOwnershipKey] = []byte(config.Ownership)
|
|
}
|
|
if config.ACL != nil {
|
|
config.Entry.Extended[s3_constants.ExtAmzAclKey] = config.ACL
|
|
}
|
|
if config.Owner != "" {
|
|
config.Entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(config.Owner)
|
|
}
|
|
// Update Object Lock configuration
|
|
if config.ObjectLockConfig != nil {
|
|
if err := StoreObjectLockConfigurationInExtended(config.Entry, config.ObjectLockConfig); err != nil {
|
|
glog.Errorf("updateBucketConfig: failed to store Object Lock configuration for bucket %s: %v", bucket, err)
|
|
return s3err.ErrInternalError
|
|
}
|
|
}
|
|
|
|
// Save to filer
|
|
err := s3a.updateEntry(s3a.option.BucketsPath, config.Entry)
|
|
if err != nil {
|
|
glog.Errorf("updateBucketConfig: failed to update bucket entry for %s: %v", bucket, err)
|
|
return s3err.ErrInternalError
|
|
}
|
|
|
|
// Update cache
|
|
s3a.bucketConfigCache.Set(bucket, config)
|
|
|
|
return s3err.ErrNone
|
|
}
|
|
|
|
// isVersioningEnabled checks if versioning is enabled for a bucket (with caching)
|
|
func (s3a *S3ApiServer) isVersioningEnabled(bucket string) (bool, error) {
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
if errCode == s3err.ErrNoSuchBucket {
|
|
return false, filer_pb.ErrNotFound
|
|
}
|
|
return false, fmt.Errorf("failed to get bucket config: %v", errCode)
|
|
}
|
|
|
|
// Versioning is enabled if explicitly set to "Enabled" OR if object lock is enabled
|
|
// (since object lock requires versioning to be enabled)
|
|
return config.Versioning == s3_constants.VersioningEnabled || config.ObjectLockConfig != nil, nil
|
|
}
|
|
|
|
// isVersioningConfigured checks if versioning has been configured (either Enabled or Suspended)
|
|
func (s3a *S3ApiServer) isVersioningConfigured(bucket string) (bool, error) {
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
if errCode == s3err.ErrNoSuchBucket {
|
|
return false, filer_pb.ErrNotFound
|
|
}
|
|
return false, fmt.Errorf("failed to get bucket config: %v", errCode)
|
|
}
|
|
|
|
// Versioning is configured if explicitly set to either "Enabled" or "Suspended"
|
|
// OR if object lock is enabled (which forces versioning)
|
|
return config.Versioning != "" || config.ObjectLockConfig != nil, nil
|
|
}
|
|
|
|
// getVersioningState returns the detailed versioning state for a bucket
|
|
func (s3a *S3ApiServer) getVersioningState(bucket string) (string, error) {
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
if errCode == s3err.ErrNoSuchBucket {
|
|
return "", nil
|
|
}
|
|
return "", fmt.Errorf("failed to get bucket config: %v", errCode)
|
|
}
|
|
|
|
// If object lock is enabled, versioning must be enabled regardless of explicit setting
|
|
if config.ObjectLockConfig != nil {
|
|
return s3_constants.VersioningEnabled, nil
|
|
}
|
|
|
|
// Return the explicit versioning status (empty string means never configured)
|
|
return config.Versioning, nil
|
|
}
|
|
|
|
// getBucketVersioningStatus returns the versioning status for a bucket
|
|
func (s3a *S3ApiServer) getBucketVersioningStatus(bucket string) (string, s3err.ErrorCode) {
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
return "", errCode
|
|
}
|
|
|
|
// Return exactly what's stored - empty string means versioning was never configured
|
|
// This matches AWS S3 behavior where new buckets have no Status field in GetBucketVersioning response
|
|
return config.Versioning, s3err.ErrNone
|
|
}
|
|
|
|
// setBucketVersioningStatus sets the versioning status for a bucket
|
|
func (s3a *S3ApiServer) setBucketVersioningStatus(bucket, status string) s3err.ErrorCode {
|
|
return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
|
|
config.Versioning = status
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// getBucketOwnership returns the ownership setting for a bucket
|
|
func (s3a *S3ApiServer) getBucketOwnership(bucket string) (string, s3err.ErrorCode) {
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
return "", errCode
|
|
}
|
|
|
|
return config.Ownership, s3err.ErrNone
|
|
}
|
|
|
|
// setBucketOwnership sets the ownership setting for a bucket
|
|
func (s3a *S3ApiServer) setBucketOwnership(bucket, ownership string) s3err.ErrorCode {
|
|
return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
|
|
config.Ownership = ownership
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// loadCORSFromBucketContent loads CORS configuration from bucket directory content
|
|
func (s3a *S3ApiServer) loadCORSFromBucketContent(bucket string) (*cors.CORSConfiguration, error) {
|
|
_, corsConfig, err := s3a.getBucketMetadata(bucket)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Note: corsConfig can be nil if no CORS configuration is set, which is valid
|
|
return corsConfig, nil
|
|
}
|
|
|
|
// getCORSConfiguration retrieves CORS configuration with caching
|
|
func (s3a *S3ApiServer) getCORSConfiguration(bucket string) (*cors.CORSConfiguration, s3err.ErrorCode) {
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
return nil, errCode
|
|
}
|
|
|
|
return config.CORS, s3err.ErrNone
|
|
}
|
|
|
|
// updateCORSConfiguration updates the CORS configuration for a bucket
|
|
func (s3a *S3ApiServer) updateCORSConfiguration(bucket string, corsConfig *cors.CORSConfiguration) s3err.ErrorCode {
|
|
// Get existing metadata
|
|
existingTags, _, err := s3a.getBucketMetadata(bucket)
|
|
if err != nil {
|
|
glog.Errorf("updateCORSConfiguration: failed to get bucket metadata for bucket %s: %v", bucket, err)
|
|
return s3err.ErrInternalError
|
|
}
|
|
|
|
// Update CORS configuration
|
|
updatedCorsConfig := corsConfig
|
|
|
|
// Store updated metadata
|
|
if err := s3a.setBucketMetadata(bucket, existingTags, updatedCorsConfig); err != nil {
|
|
glog.Errorf("updateCORSConfiguration: failed to persist CORS config to bucket content for bucket %s: %v", bucket, err)
|
|
return s3err.ErrInternalError
|
|
}
|
|
|
|
// Cache will be updated automatically via metadata subscription
|
|
return s3err.ErrNone
|
|
}
|
|
|
|
// removeCORSConfiguration removes the CORS configuration for a bucket
|
|
func (s3a *S3ApiServer) removeCORSConfiguration(bucket string) s3err.ErrorCode {
|
|
// Get existing metadata
|
|
existingTags, _, err := s3a.getBucketMetadata(bucket)
|
|
if err != nil {
|
|
glog.Errorf("removeCORSConfiguration: failed to get bucket metadata for bucket %s: %v", bucket, err)
|
|
return s3err.ErrInternalError
|
|
}
|
|
|
|
// Remove CORS configuration
|
|
var nilCorsConfig *cors.CORSConfiguration = nil
|
|
|
|
// Store updated metadata
|
|
if err := s3a.setBucketMetadata(bucket, existingTags, nilCorsConfig); err != nil {
|
|
glog.Errorf("removeCORSConfiguration: failed to remove CORS config from bucket content for bucket %s: %v", bucket, err)
|
|
return s3err.ErrInternalError
|
|
}
|
|
|
|
// Cache will be updated automatically via metadata subscription
|
|
return s3err.ErrNone
|
|
}
|
|
|
|
// Conversion functions between CORS types and protobuf types
|
|
|
|
// corsRuleToProto converts a CORS rule to protobuf format
|
|
func corsRuleToProto(rule cors.CORSRule) *s3_pb.CORSRule {
|
|
return &s3_pb.CORSRule{
|
|
AllowedHeaders: rule.AllowedHeaders,
|
|
AllowedMethods: rule.AllowedMethods,
|
|
AllowedOrigins: rule.AllowedOrigins,
|
|
ExposeHeaders: rule.ExposeHeaders,
|
|
MaxAgeSeconds: int32(getMaxAgeSecondsValue(rule.MaxAgeSeconds)),
|
|
Id: rule.ID,
|
|
}
|
|
}
|
|
|
|
// corsRuleFromProto converts a protobuf CORS rule to standard format
|
|
func corsRuleFromProto(protoRule *s3_pb.CORSRule) cors.CORSRule {
|
|
var maxAge *int
|
|
// Always create the pointer if MaxAgeSeconds is >= 0
|
|
// This prevents nil pointer dereferences in tests and matches AWS behavior
|
|
if protoRule.MaxAgeSeconds >= 0 {
|
|
age := int(protoRule.MaxAgeSeconds)
|
|
maxAge = &age
|
|
}
|
|
// Only leave maxAge as nil if MaxAgeSeconds was explicitly set to a negative value
|
|
|
|
return cors.CORSRule{
|
|
AllowedHeaders: protoRule.AllowedHeaders,
|
|
AllowedMethods: protoRule.AllowedMethods,
|
|
AllowedOrigins: protoRule.AllowedOrigins,
|
|
ExposeHeaders: protoRule.ExposeHeaders,
|
|
MaxAgeSeconds: maxAge,
|
|
ID: protoRule.Id,
|
|
}
|
|
}
|
|
|
|
// corsConfigToProto converts CORS configuration to protobuf format
|
|
func corsConfigToProto(config *cors.CORSConfiguration) *s3_pb.CORSConfiguration {
|
|
if config == nil {
|
|
return nil
|
|
}
|
|
|
|
protoRules := make([]*s3_pb.CORSRule, len(config.CORSRules))
|
|
for i, rule := range config.CORSRules {
|
|
protoRules[i] = corsRuleToProto(rule)
|
|
}
|
|
|
|
return &s3_pb.CORSConfiguration{
|
|
CorsRules: protoRules,
|
|
}
|
|
}
|
|
|
|
// corsConfigFromProto converts protobuf CORS configuration to standard format
|
|
func corsConfigFromProto(protoConfig *s3_pb.CORSConfiguration) *cors.CORSConfiguration {
|
|
if protoConfig == nil {
|
|
return nil
|
|
}
|
|
|
|
rules := make([]cors.CORSRule, len(protoConfig.CorsRules))
|
|
for i, protoRule := range protoConfig.CorsRules {
|
|
rules[i] = corsRuleFromProto(protoRule)
|
|
}
|
|
|
|
return &cors.CORSConfiguration{
|
|
CORSRules: rules,
|
|
}
|
|
}
|
|
|
|
// getMaxAgeSecondsValue safely extracts max age seconds value
|
|
func getMaxAgeSecondsValue(maxAge *int) int {
|
|
if maxAge == nil {
|
|
return 0
|
|
}
|
|
return *maxAge
|
|
}
|
|
|
|
// parseAndCachePublicReadStatus parses the ACL and caches the public-read status
|
|
func parseAndCachePublicReadStatus(acl []byte) bool {
|
|
var grants []*s3.Grant
|
|
if err := json.Unmarshal(acl, &grants); err != nil {
|
|
return false
|
|
}
|
|
|
|
// Check if any grant gives read permission to "AllUsers" group
|
|
for _, grant := range grants {
|
|
if grant.Grantee != nil && grant.Grantee.URI != nil && grant.Permission != nil {
|
|
// Check for AllUsers group with Read permission
|
|
if *grant.Grantee.URI == s3_constants.GranteeGroupAllUsers &&
|
|
(*grant.Permission == s3_constants.PermissionRead || *grant.Permission == s3_constants.PermissionFullControl) {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// getBucketMetadata retrieves bucket metadata from bucket directory content using protobuf
|
|
func (s3a *S3ApiServer) getBucketMetadata(bucket string) (map[string]string, *cors.CORSConfiguration, error) {
|
|
// Validate bucket name to prevent path traversal attacks
|
|
if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
|
|
strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
|
|
return nil, nil, fmt.Errorf("invalid bucket name: %s", bucket)
|
|
}
|
|
|
|
// Clean the bucket name further to prevent any potential path traversal
|
|
bucket = filepath.Clean(bucket)
|
|
if bucket == "." || bucket == ".." {
|
|
return nil, nil, fmt.Errorf("invalid bucket name: %s", bucket)
|
|
}
|
|
|
|
// Get bucket directory entry to access its content
|
|
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
|
|
}
|
|
if entry == nil {
|
|
return nil, nil, fmt.Errorf("bucket directory not found %s", bucket)
|
|
}
|
|
|
|
// If no content, return empty metadata
|
|
if len(entry.Content) == 0 {
|
|
return make(map[string]string), nil, nil
|
|
}
|
|
|
|
// Unmarshal metadata from protobuf
|
|
var protoMetadata s3_pb.BucketMetadata
|
|
if err := proto.Unmarshal(entry.Content, &protoMetadata); err != nil {
|
|
glog.Errorf("getBucketMetadata: failed to unmarshal protobuf metadata for bucket %s: %v", bucket, err)
|
|
return make(map[string]string), nil, nil // Return empty metadata on error, don't fail
|
|
}
|
|
|
|
// Convert protobuf CORS to standard CORS
|
|
corsConfig := corsConfigFromProto(protoMetadata.Cors)
|
|
|
|
return protoMetadata.Tags, corsConfig, nil
|
|
}
|
|
|
|
// setBucketMetadata stores bucket metadata in bucket directory content using protobuf
|
|
func (s3a *S3ApiServer) setBucketMetadata(bucket string, tags map[string]string, corsConfig *cors.CORSConfiguration) error {
|
|
// Validate bucket name to prevent path traversal attacks
|
|
if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
|
|
strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
|
|
return fmt.Errorf("invalid bucket name: %s", bucket)
|
|
}
|
|
|
|
// Clean the bucket name further to prevent any potential path traversal
|
|
bucket = filepath.Clean(bucket)
|
|
if bucket == "." || bucket == ".." {
|
|
return fmt.Errorf("invalid bucket name: %s", bucket)
|
|
}
|
|
|
|
// Create protobuf metadata
|
|
protoMetadata := &s3_pb.BucketMetadata{
|
|
Tags: tags,
|
|
Cors: corsConfigToProto(corsConfig),
|
|
}
|
|
|
|
// Marshal metadata to protobuf
|
|
metadataBytes, err := proto.Marshal(protoMetadata)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal bucket metadata to protobuf: %w", err)
|
|
}
|
|
|
|
// Update the bucket entry with new content
|
|
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
// Get current bucket entry
|
|
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
|
|
if err != nil {
|
|
return fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
|
|
}
|
|
if entry == nil {
|
|
return fmt.Errorf("bucket directory not found %s", bucket)
|
|
}
|
|
|
|
// Update content with metadata
|
|
entry.Content = metadataBytes
|
|
|
|
request := &filer_pb.UpdateEntryRequest{
|
|
Directory: s3a.option.BucketsPath,
|
|
Entry: entry,
|
|
}
|
|
|
|
_, err = client.UpdateEntry(context.Background(), request)
|
|
return err
|
|
})
|
|
return err
|
|
}
|
|
|
|
// getBucketTags retrieves bucket tags from bucket directory content
|
|
func (s3a *S3ApiServer) getBucketTags(bucket string) (map[string]string, error) {
|
|
tags, _, err := s3a.getBucketMetadata(bucket)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(tags) == 0 {
|
|
return nil, fmt.Errorf("no tags configuration found")
|
|
}
|
|
|
|
return tags, nil
|
|
}
|
|
|
|
// setBucketTags stores bucket tags in bucket directory content
|
|
func (s3a *S3ApiServer) setBucketTags(bucket string, tags map[string]string) error {
|
|
// Get existing metadata
|
|
_, existingCorsConfig, err := s3a.getBucketMetadata(bucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Store updated metadata with new tags
|
|
err = s3a.setBucketMetadata(bucket, tags, existingCorsConfig)
|
|
return err
|
|
}
|
|
|
|
// deleteBucketTags removes bucket tags from bucket directory content
|
|
func (s3a *S3ApiServer) deleteBucketTags(bucket string) error {
|
|
// Get existing metadata
|
|
_, existingCorsConfig, err := s3a.getBucketMetadata(bucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Store updated metadata with empty tags
|
|
emptyTags := make(map[string]string)
|
|
err = s3a.setBucketMetadata(bucket, emptyTags, existingCorsConfig)
|
|
return err
|
|
}
|