1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-07-24 20:42:47 +02:00
seaweedfs/weed/s3api/s3api_object_retention.go
Chris Lu 0e4d803896
refactor (#6999)
* fix GetObjectLockConfigurationHandler

* cache and use bucket object lock config

* subscribe to bucket configuration changes

* increase bucket config cache TTL

* refactor

* Update weed/s3api/s3api_server.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* avoid duplidated work

* rename variable

* Update s3api_object_handlers_put.go

* fix routing

* admin ui and api handler are consistent now

* use fields instead of xml

* fix test

* address comments

* Update weed/s3api/s3api_object_handlers_put.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update test/s3/retention/s3_retention_test.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/s3api/object_lock_utils.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* change error style

* errorf

* read entry once

* add s3 tests for object lock and retention

* use marker

* install s3 tests

* Update s3tests.yml

* Update s3tests.yml

* Update s3tests.conf

* Update s3tests.conf

* address test errors

* address test errors

With these fixes, the s3-tests should now:
 Return InvalidBucketState (409 Conflict) for object lock operations on invalid buckets
 Return MalformedXML for invalid retention configurations
 Include VersionId in response headers when available
 Return proper HTTP status codes (403 Forbidden for retention mode changes)
 Handle all object lock validation errors consistently

* fixes

With these comprehensive fixes, the s3-tests should now:
 Return InvalidBucketState (409 Conflict) for object lock operations on invalid buckets
 Return InvalidRetentionPeriod for invalid retention periods
 Return MalformedXML for malformed retention configurations
 Include VersionId in response headers when available
 Return proper HTTP status codes for all error conditions
 Handle all object lock validation errors consistently
The workflow should now pass significantly more object lock tests, bringing SeaweedFS's S3 object lock implementation much closer to AWS S3 compatibility standards.

* fixes

With these final fixes, the s3-tests should now:
 Return MalformedXML for ObjectLockEnabled: 'Disabled'
 Return MalformedXML when both Days and Years are specified in retention configuration
 Return InvalidBucketState (409 Conflict) when trying to suspend versioning on buckets with object lock enabled
 Handle all object lock validation errors consistently with proper error codes

* constants and fixes

 Return InvalidRetentionPeriod for invalid retention values (0 days, negative years)
 Return ObjectLockConfigurationNotFoundError when object lock configuration doesn't exist
 Handle all object lock validation errors consistently with proper error codes

* fixes

 Return MalformedXML when both Days and Years are specified in the same retention configuration
 Return 400 (Bad Request) with InvalidRequest when object lock operations are attempted on buckets without object lock enabled
 Handle all object lock validation errors consistently with proper error codes

* fixes

 Return 409 (Conflict) with InvalidBucketState for bucket-level object lock configuration operations on buckets without object lock enabled
 Allow increasing retention periods and overriding retention with same/later dates
 Only block decreasing retention periods without proper bypass permissions
 Handle all object lock validation errors consistently with proper error codes

* fixes

 Include VersionId in multipart upload completion responses when versioning is enabled
 Block retention mode changes (GOVERNANCE ↔ COMPLIANCE) without bypass permissions
 Handle all object lock validation errors consistently with proper error codes
 Pass the remaining object lock tests

* fix tests

* fixes

* pass tests

* fix tests

* fixes

* add error mapping

* Update s3tests.conf

* fix test_object_lock_put_obj_lock_invalid_days

* fixes

* fix many issues

* fix test_object_lock_delete_multipart_object_with_legal_hold_on

* fix tests

* refactor

* fix test_object_lock_delete_object_with_retention_and_marker

* fix tests

* fix tests

* fix tests

* fix test itself

* fix tests

* fix test

* Update weed/s3api/s3api_object_retention.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* reduce logs

* address comments

* refactor

* rename

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-07-19 00:49:56 -07:00

675 lines
26 KiB
Go

package s3api
import (
"encoding/xml"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
// ====================================================================
// ERROR DEFINITIONS
// ====================================================================
// Sentinel errors for proper error handling instead of string matching
var (
ErrNoRetentionConfiguration = errors.New("no retention configuration found")
ErrNoLegalHoldConfiguration = errors.New("no legal hold configuration found")
ErrBucketNotFound = errors.New("bucket not found")
ErrObjectNotFound = errors.New("object not found")
ErrVersionNotFound = errors.New("version not found")
ErrLatestVersionNotFound = errors.New("latest version not found")
ErrComplianceModeActive = errors.New("object is under COMPLIANCE mode retention and cannot be deleted or modified")
ErrGovernanceModeActive = errors.New("object is under GOVERNANCE mode retention and cannot be deleted or modified without bypass")
)
// Error definitions for Object Lock
var (
ErrObjectUnderLegalHold = errors.New("object is under legal hold and cannot be deleted or modified")
ErrGovernanceBypassNotPermitted = errors.New("user does not have permission to bypass governance retention")
ErrInvalidRetentionPeriod = errors.New("invalid retention period specified")
ErrBothDaysAndYearsSpecified = errors.New("both days and years cannot be specified in the same retention configuration")
ErrMalformedXML = errors.New("malformed XML in request body")
// Validation error constants with specific messages for tests
ErrRetentionMissingMode = errors.New("retention configuration must specify Mode")
ErrRetentionMissingRetainUntilDate = errors.New("retention configuration must specify RetainUntilDate")
ErrInvalidRetentionModeValue = errors.New("invalid retention mode")
)
const (
// Maximum retention period limits according to AWS S3 specifications
MaxRetentionDays = 36500 // Maximum number of days for object retention (100 years)
MaxRetentionYears = 100 // Maximum number of years for object retention
)
// ====================================================================
// DATA STRUCTURES
// ====================================================================
// ObjectRetention represents S3 Object Retention configuration
type ObjectRetention struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Retention"`
Mode string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Mode,omitempty"`
RetainUntilDate *time.Time `xml:"http://s3.amazonaws.com/doc/2006-03-01/ RetainUntilDate,omitempty"`
}
// ObjectLegalHold represents S3 Object Legal Hold configuration
type ObjectLegalHold struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ LegalHold"`
Status string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Status,omitempty"`
}
// ObjectLockConfiguration represents S3 Object Lock Configuration
type ObjectLockConfiguration struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ObjectLockConfiguration"`
ObjectLockEnabled string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ObjectLockEnabled,omitempty"`
Rule *ObjectLockRule `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Rule,omitempty"`
}
// ObjectLockRule represents an Object Lock Rule
type ObjectLockRule struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Rule"`
DefaultRetention *DefaultRetention `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DefaultRetention,omitempty"`
}
// DefaultRetention represents default retention settings
// Implements custom XML unmarshal to track if Days/Years were present in XML
type DefaultRetention struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DefaultRetention"`
Mode string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Mode,omitempty"`
Days int `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Days,omitempty"`
Years int `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Years,omitempty"`
DaysSet bool `xml:"-"`
YearsSet bool `xml:"-"`
}
// ====================================================================
// XML PARSING
// ====================================================================
// UnmarshalXML implements custom XML unmarshaling for DefaultRetention
// to track whether Days/Years fields were explicitly present in the XML
func (dr *DefaultRetention) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
type Alias DefaultRetention
aux := &struct {
*Alias
Days *int `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Days,omitempty"`
Years *int `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Years,omitempty"`
}{Alias: (*Alias)(dr)}
if err := d.DecodeElement(aux, &start); err != nil {
glog.V(2).Infof("DefaultRetention.UnmarshalXML: decode error: %v", err)
return err
}
if aux.Days != nil {
dr.Days = *aux.Days
dr.DaysSet = true
glog.V(4).Infof("DefaultRetention.UnmarshalXML: Days present, value=%d", dr.Days)
} else {
glog.V(4).Infof("DefaultRetention.UnmarshalXML: Days not present")
}
if aux.Years != nil {
dr.Years = *aux.Years
dr.YearsSet = true
glog.V(4).Infof("DefaultRetention.UnmarshalXML: Years present, value=%d", dr.Years)
} else {
glog.V(4).Infof("DefaultRetention.UnmarshalXML: Years not present")
}
return nil
}
// parseXML is a generic helper function to parse XML from an HTTP request body.
// It uses xml.Decoder for streaming XML parsing, which is more memory-efficient
// and avoids loading the entire request body into memory.
//
// The function assumes:
// - The request body is not nil (returns error if it is)
// - The request body will be closed after parsing (deferred close)
// - The XML content matches the structure of the provided result type T
//
// This approach is optimized for small XML payloads typical in S3 API requests
// (retention configurations, legal hold settings, etc.) where the overhead of
// streaming parsing is acceptable for the memory efficiency benefits.
func parseXML[T any](request *http.Request, result *T) error {
if request.Body == nil {
return fmt.Errorf("error parsing XML: empty request body")
}
defer request.Body.Close()
decoder := xml.NewDecoder(request.Body)
if err := decoder.Decode(result); err != nil {
return fmt.Errorf("error parsing XML: %w", err)
}
return nil
}
// parseObjectRetention parses XML retention configuration from request body
func parseObjectRetention(request *http.Request) (*ObjectRetention, error) {
var retention ObjectRetention
if err := parseXML(request, &retention); err != nil {
return nil, err
}
return &retention, nil
}
// parseObjectLegalHold parses XML legal hold configuration from request body
func parseObjectLegalHold(request *http.Request) (*ObjectLegalHold, error) {
var legalHold ObjectLegalHold
if err := parseXML(request, &legalHold); err != nil {
return nil, err
}
return &legalHold, nil
}
// parseObjectLockConfiguration parses XML object lock configuration from request body
func parseObjectLockConfiguration(request *http.Request) (*ObjectLockConfiguration, error) {
var config ObjectLockConfiguration
if err := parseXML(request, &config); err != nil {
return nil, err
}
return &config, nil
}
// ====================================================================
// OBJECT ENTRY OPERATIONS
// ====================================================================
// getObjectEntry retrieves the appropriate object entry based on versioning and versionId
func (s3a *S3ApiServer) getObjectEntry(bucket, object, versionId string) (*filer_pb.Entry, error) {
var entry *filer_pb.Entry
var err error
if versionId != "" {
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
} else {
// Check if versioning is enabled
versioningEnabled, vErr := s3a.isVersioningEnabled(bucket)
if vErr != nil {
return nil, fmt.Errorf("error checking versioning: %w", vErr)
}
if versioningEnabled {
entry, err = s3a.getLatestObjectVersion(bucket, object)
} else {
bucketDir := s3a.option.BucketsPath + "/" + bucket
entry, err = s3a.getEntry(bucketDir, object)
}
}
if err != nil {
return nil, fmt.Errorf("failed to retrieve object %s/%s: %w", bucket, object, ErrObjectNotFound)
}
return entry, nil
}
// ====================================================================
// RETENTION OPERATIONS
// ====================================================================
// getObjectRetention retrieves object retention configuration
func (s3a *S3ApiServer) getObjectRetention(bucket, object, versionId string) (*ObjectRetention, error) {
entry, err := s3a.getObjectEntry(bucket, object, versionId)
if err != nil {
return nil, err
}
if entry.Extended == nil {
return nil, ErrNoRetentionConfiguration
}
retention := &ObjectRetention{}
if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
retention.Mode = string(modeBytes)
}
if dateBytes, exists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; exists {
if timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64); err == nil {
t := time.Unix(timestamp, 0)
retention.RetainUntilDate = &t
} else {
return nil, fmt.Errorf("failed to parse retention timestamp for %s/%s: corrupted timestamp data", bucket, object)
}
}
if retention.Mode == "" || retention.RetainUntilDate == nil {
return nil, ErrNoRetentionConfiguration
}
return retention, nil
}
// setObjectRetention sets object retention configuration
func (s3a *S3ApiServer) setObjectRetention(bucket, object, versionId string, retention *ObjectRetention, bypassGovernance bool) error {
var entry *filer_pb.Entry
var err error
var entryPath string
if versionId != "" {
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
if err != nil {
return fmt.Errorf("failed to get version %s for object %s/%s: %w", versionId, bucket, object, ErrVersionNotFound)
}
entryPath = object + ".versions/" + s3a.getVersionFileName(versionId)
} else {
// Check if versioning is enabled
versioningEnabled, vErr := s3a.isVersioningEnabled(bucket)
if vErr != nil {
return fmt.Errorf("error checking versioning: %w", vErr)
}
if versioningEnabled {
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
return fmt.Errorf("failed to get latest version for object %s/%s: %w", bucket, object, ErrLatestVersionNotFound)
}
// Extract version ID from entry metadata
if entry.Extended != nil {
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
versionId = string(versionIdBytes)
entryPath = object + ".versions/" + s3a.getVersionFileName(versionId)
}
}
} else {
bucketDir := s3a.option.BucketsPath + "/" + bucket
entry, err = s3a.getEntry(bucketDir, object)
if err != nil {
return fmt.Errorf("failed to get object %s/%s: %w", bucket, object, ErrObjectNotFound)
}
entryPath = object
}
}
// Check if object is already under retention
if entry.Extended != nil {
if existingMode, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
// Check if attempting to change retention mode
if retention.Mode != "" && string(existingMode) != retention.Mode {
// Attempting to change retention mode
if string(existingMode) == s3_constants.RetentionModeCompliance {
// Cannot change compliance mode retention without bypass
return ErrComplianceModeActive
}
if string(existingMode) == s3_constants.RetentionModeGovernance && !bypassGovernance {
// Cannot change governance mode retention without bypass
return ErrGovernanceModeActive
}
}
if existingDateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists {
if timestamp, err := strconv.ParseInt(string(existingDateBytes), 10, 64); err == nil {
existingDate := time.Unix(timestamp, 0)
// Check if the new retention date is earlier than the existing one
if retention.RetainUntilDate != nil && retention.RetainUntilDate.Before(existingDate) {
// Attempting to decrease retention period
if string(existingMode) == s3_constants.RetentionModeCompliance {
// Cannot decrease compliance mode retention without bypass
return ErrComplianceModeActive
}
if string(existingMode) == s3_constants.RetentionModeGovernance && !bypassGovernance {
// Cannot decrease governance mode retention without bypass
return ErrGovernanceModeActive
}
}
// If new retention date is later or same, allow the operation
// This covers both increasing retention period and overriding with same/later date
}
}
}
}
// Update retention metadata
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
if retention.Mode != "" {
entry.Extended[s3_constants.ExtObjectLockModeKey] = []byte(retention.Mode)
}
if retention.RetainUntilDate != nil {
entry.Extended[s3_constants.ExtRetentionUntilDateKey] = []byte(strconv.FormatInt(retention.RetainUntilDate.Unix(), 10))
// Also update the existing WORM fields for compatibility
entry.WormEnforcedAtTsNs = time.Now().UnixNano()
}
// Update the entry
// NOTE: Potential race condition exists if concurrent calls to PutObjectRetention
// and PutObjectLegalHold update the same object simultaneously, as they might
// overwrite each other's Extended map changes. This is mitigated by the fact
// that mkFile operations are typically serialized at the filer level, but
// future implementations might consider using atomic update operations or
// entry-level locking for complete safety.
bucketDir := s3a.option.BucketsPath + "/" + bucket
return s3a.mkFile(bucketDir, entryPath, entry.Chunks, func(updatedEntry *filer_pb.Entry) {
updatedEntry.Extended = entry.Extended
updatedEntry.WormEnforcedAtTsNs = entry.WormEnforcedAtTsNs
})
}
// ====================================================================
// LEGAL HOLD OPERATIONS
// ====================================================================
// getObjectLegalHold retrieves object legal hold configuration
func (s3a *S3ApiServer) getObjectLegalHold(bucket, object, versionId string) (*ObjectLegalHold, error) {
entry, err := s3a.getObjectEntry(bucket, object, versionId)
if err != nil {
return nil, err
}
if entry.Extended == nil {
return nil, ErrNoLegalHoldConfiguration
}
legalHold := &ObjectLegalHold{}
if statusBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
legalHold.Status = string(statusBytes)
} else {
return nil, ErrNoLegalHoldConfiguration
}
return legalHold, nil
}
// setObjectLegalHold sets object legal hold configuration
func (s3a *S3ApiServer) setObjectLegalHold(bucket, object, versionId string, legalHold *ObjectLegalHold) error {
var entry *filer_pb.Entry
var err error
var entryPath string
if versionId != "" {
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
if err != nil {
return fmt.Errorf("failed to get version %s for object %s/%s: %w", versionId, bucket, object, ErrVersionNotFound)
}
entryPath = object + ".versions/" + s3a.getVersionFileName(versionId)
} else {
// Check if versioning is enabled
versioningEnabled, vErr := s3a.isVersioningEnabled(bucket)
if vErr != nil {
return fmt.Errorf("error checking versioning: %w", vErr)
}
if versioningEnabled {
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
return fmt.Errorf("failed to get latest version for object %s/%s: %w", bucket, object, ErrLatestVersionNotFound)
}
// Extract version ID from entry metadata
if entry.Extended != nil {
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
versionId = string(versionIdBytes)
entryPath = object + ".versions/" + s3a.getVersionFileName(versionId)
}
}
} else {
bucketDir := s3a.option.BucketsPath + "/" + bucket
entry, err = s3a.getEntry(bucketDir, object)
if err != nil {
return fmt.Errorf("failed to get object %s/%s: %w", bucket, object, ErrObjectNotFound)
}
entryPath = object
}
}
// Update legal hold metadata
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.ExtLegalHoldKey] = []byte(legalHold.Status)
// Update the entry
// NOTE: Potential race condition exists if concurrent calls to PutObjectRetention
// and PutObjectLegalHold update the same object simultaneously, as they might
// overwrite each other's Extended map changes. This is mitigated by the fact
// that mkFile operations are typically serialized at the filer level, but
// future implementations might consider using atomic update operations or
// entry-level locking for complete safety.
bucketDir := s3a.option.BucketsPath + "/" + bucket
return s3a.mkFile(bucketDir, entryPath, entry.Chunks, func(updatedEntry *filer_pb.Entry) {
updatedEntry.Extended = entry.Extended
})
}
// ====================================================================
// PROTECTION ENFORCEMENT
// ====================================================================
// isObjectRetentionActive checks if object has active retention
func (s3a *S3ApiServer) isObjectRetentionActive(bucket, object, versionId string) (bool, error) {
retention, err := s3a.getObjectRetention(bucket, object, versionId)
if err != nil {
// If no retention found, object is not under retention
if errors.Is(err, ErrNoRetentionConfiguration) {
return false, nil
}
return false, err
}
if retention.RetainUntilDate != nil && retention.RetainUntilDate.After(time.Now()) {
return true, nil
}
return false, nil
}
// getRetentionFromEntry extracts retention configuration from filer entry
func (s3a *S3ApiServer) getRetentionFromEntry(entry *filer_pb.Entry) (*ObjectRetention, bool, error) {
if entry.Extended == nil {
return nil, false, nil
}
retention := &ObjectRetention{}
if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
retention.Mode = string(modeBytes)
}
if dateBytes, exists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; exists {
if timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64); err == nil {
t := time.Unix(timestamp, 0)
retention.RetainUntilDate = &t
} else {
return nil, false, fmt.Errorf("failed to parse retention timestamp: corrupted timestamp data")
}
}
if retention.Mode == "" || retention.RetainUntilDate == nil {
return nil, false, nil
}
// Check if retention is currently active
isActive := retention.RetainUntilDate.After(time.Now())
return retention, isActive, nil
}
// getLegalHoldFromEntry extracts legal hold configuration from filer entry
func (s3a *S3ApiServer) getLegalHoldFromEntry(entry *filer_pb.Entry) (*ObjectLegalHold, bool, error) {
if entry.Extended == nil {
return nil, false, nil
}
legalHold := &ObjectLegalHold{}
if statusBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
legalHold.Status = string(statusBytes)
} else {
return nil, false, nil
}
isActive := legalHold.Status == s3_constants.LegalHoldOn
return legalHold, isActive, nil
}
// ====================================================================
// GOVERNANCE BYPASS
// ====================================================================
// checkGovernanceBypassPermission checks if user has permission to bypass governance retention
func (s3a *S3ApiServer) checkGovernanceBypassPermission(request *http.Request, bucket, object string) bool {
// Use the existing IAM auth system to check the specific permission
// Create the governance bypass action with proper bucket/object concatenation
// Note: path.Join would drop bucket if object has leading slash, so use explicit formatting
resource := fmt.Sprintf("%s/%s", bucket, strings.TrimPrefix(object, "/"))
action := Action(fmt.Sprintf("%s:%s", s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION, resource))
// Use the IAM system to authenticate and authorize this specific action
identity, errCode := s3a.iam.authRequest(request, action)
if errCode != s3err.ErrNone {
glog.V(3).Infof("IAM auth failed for governance bypass: %v", errCode)
return false
}
// Verify that the authenticated identity can perform this action
if identity != nil && identity.canDo(action, bucket, object) {
return true
}
// Additional check: allow users with Admin action to bypass governance retention
// Use the proper S3 Admin action constant instead of generic isAdmin() method
adminAction := Action(fmt.Sprintf("%s:%s", s3_constants.ACTION_ADMIN, resource))
if identity != nil && identity.canDo(adminAction, bucket, object) {
glog.V(2).Infof("Admin user %s granted governance bypass permission for %s/%s", identity.Name, bucket, object)
return true
}
return false
}
// evaluateGovernanceBypassRequest evaluates if governance bypass is requested and permitted
func (s3a *S3ApiServer) evaluateGovernanceBypassRequest(r *http.Request, bucket, object string) bool {
// Step 1: Check if governance bypass was requested via header
bypassRequested := r.Header.Get("x-amz-bypass-governance-retention") == "true"
if !bypassRequested {
// No bypass requested - normal retention enforcement applies
return false
}
// Step 2: Validate user has permission to bypass governance retention
hasPermission := s3a.checkGovernanceBypassPermission(r, bucket, object)
if !hasPermission {
glog.V(2).Infof("Governance bypass denied for %s/%s: user lacks s3:BypassGovernanceRetention permission", bucket, object)
return false
}
glog.V(2).Infof("Governance bypass granted for %s/%s: header present and user has permission", bucket, object)
return true
}
// enforceObjectLockProtections enforces object lock protections for operations
func (s3a *S3ApiServer) enforceObjectLockProtections(request *http.Request, bucket, object, versionId string, governanceBypassAllowed bool) error {
// Get the object entry to check both retention and legal hold
// For delete operations without versionId, we need to check the latest version
var entry *filer_pb.Entry
var err error
if versionId != "" {
// Check specific version
entry, err = s3a.getObjectEntry(bucket, object, versionId)
} else {
// Check latest version for delete marker creation
entry, err = s3a.getObjectEntry(bucket, object, "")
}
if err != nil {
// If object doesn't exist, it's not under retention or legal hold - this is expected during delete operations
if errors.Is(err, ErrObjectNotFound) || errors.Is(err, ErrVersionNotFound) || errors.Is(err, ErrLatestVersionNotFound) {
// Object doesn't exist, so it can't be under retention or legal hold - this is normal
glog.V(4).Infof("Object %s/%s (versionId: %s) not found during object lock check (expected during delete operations)", bucket, object, versionId)
return nil
}
glog.Warningf("Error retrieving object %s/%s (versionId: %s) for lock check: %v", bucket, object, versionId, err)
return err
}
// Extract retention information from the entry
retention, retentionActive, err := s3a.getRetentionFromEntry(entry)
if err != nil {
glog.Warningf("Error parsing retention for %s/%s (versionId: %s): %v", bucket, object, versionId, err)
// Continue with legal hold check even if retention parsing fails
}
// Extract legal hold information from the entry
_, legalHoldActive, err := s3a.getLegalHoldFromEntry(entry)
if err != nil {
glog.Warningf("Error parsing legal hold for %s/%s (versionId: %s): %v", bucket, object, versionId, err)
// Continue with retention check even if legal hold parsing fails
}
// If object is under legal hold, it cannot be deleted or modified (including delete marker creation)
if legalHoldActive {
return ErrObjectUnderLegalHold
}
// If object is under retention, check the mode
if retentionActive && retention != nil {
if retention.Mode == s3_constants.RetentionModeCompliance {
return ErrComplianceModeActive
}
if retention.Mode == s3_constants.RetentionModeGovernance {
if !governanceBypassAllowed {
return ErrGovernanceModeActive
}
// Note: governanceBypassAllowed parameter is already validated by evaluateGovernanceBypassRequest()
// which checks both header presence and IAM permissions, so we trust it here
}
}
return nil
}
// ====================================================================
// AVAILABILITY CHECKS
// ====================================================================
// isObjectLockAvailable checks if object lock is available for the bucket
func (s3a *S3ApiServer) isObjectLockAvailable(bucket string) error {
versioningEnabled, err := s3a.isVersioningEnabled(bucket)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
return ErrBucketNotFound
}
return fmt.Errorf("error checking versioning status: %w", err)
}
if !versioningEnabled {
return fmt.Errorf("object lock requires versioning to be enabled")
}
return nil
}
// handleObjectLockAvailabilityCheck handles object lock availability checks for API endpoints
func (s3a *S3ApiServer) handleObjectLockAvailabilityCheck(w http.ResponseWriter, request *http.Request, bucket, handlerName string) bool {
if err := s3a.isObjectLockAvailable(bucket); err != nil {
glog.Errorf("%s: object lock not available for bucket %s: %v", handlerName, bucket, err)
if errors.Is(err, ErrBucketNotFound) {
s3err.WriteErrorResponse(w, request, s3err.ErrNoSuchBucket)
} else {
// Return InvalidRequest for object lock operations on buckets without object lock enabled
// This matches AWS S3 behavior and s3-tests expectations (400 Bad Request)
s3err.WriteErrorResponse(w, request, s3err.ErrInvalidRequest)
}
return false
}
return true
}