package s3api

import (
	"bytes"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"

	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
	"github.com/seaweedfs/seaweedfs/weed/util/mem"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)

// corsHeaders defines the CORS headers that need to be preserved.
// Package-level variable to avoid repeated allocations.
var corsHeaders = []string{
	"Access-Control-Allow-Origin",
	"Access-Control-Allow-Methods",
	"Access-Control-Allow-Headers",
	"Access-Control-Expose-Headers",
	"Access-Control-Max-Age",
	"Access-Control-Allow-Credentials",
}

func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser {
	mimeBuffer := make([]byte, 512)
	size, _ := dataReader.Read(mimeBuffer)
	if size > 0 {
		r.Header.Set("Content-Type", http.DetectContentType(mimeBuffer[:size]))
		return io.NopCloser(io.MultiReader(bytes.NewReader(mimeBuffer[:size]), dataReader))
	}
	return io.NopCloser(dataReader)
}

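// Note: mimeDetect sniffs at most the first 512 bytes (the amount
// http.DetectContentType inspects) and then stitches the sniffed bytes back
// in front of the remaining stream, so callers still read the complete body:
//
//	dataReader := mimeDetect(r, r.Body) // sets Content-Type on r
//	defer dataReader.Close()
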
func urlEscapeObject(object string) string {
	t := urlPathEscape(removeDuplicateSlashes(object))
	if strings.HasPrefix(t, "/") {
		return t
	}
	return "/" + t
}

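// For example, duplicate slashes are collapsed before escaping and a leading
// slash is guaranteed:
//
//	urlEscapeObject("a b//c") // "/a%20b/c"
//	urlEscapeObject("/a b/c") // "/a%20b/c"
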
func entryUrlEncode(dir string, entry string, encodingTypeUrl bool) (dirName string, entryName string, prefix string) {
	if !encodingTypeUrl {
		return dir, entry, entry
	}
	return urlPathEscape(dir), url.QueryEscape(entry), urlPathEscape(entry)
}

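// Note: when encodingTypeUrl is set, the entry name is escaped with
// url.QueryEscape (a space becomes '+'), while the directory and the
// returned prefix use path escaping (a space becomes "%20").
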
func urlPathEscape(object string) string {
	var escapedParts []string
	for _, part := range strings.Split(object, "/") {
		escapedParts = append(escapedParts, strings.ReplaceAll(url.PathEscape(part), "+", "%2B"))
	}
	return strings.Join(escapedParts, "/")
}

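// Each path segment is escaped individually, and '+' is forced to "%2B"
// because url.PathEscape leaves it untouched inside a segment:
//
//	urlPathEscape("a+b/c d") // "a%2Bb/c%20d"
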
func removeDuplicateSlashes(object string) string {
	result := strings.Builder{}
	result.Grow(len(object))

	isLastSlash := false
	for _, r := range object {
		switch r {
		case '/':
			if !isLastSlash {
				result.WriteRune(r)
			}
			isLastSlash = true
		default:
			result.WriteRune(r)
			isLastSlash = false
		}
	}
	return result.String()
}

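// Only runs of slashes are collapsed; single leading and trailing slashes
// are preserved:
//
//	removeDuplicateSlashes("a//b///c/") // "a/b/c/"
//	removeDuplicateSlashes("//a/b")     // "/a/b"
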
// checkDirectoryObject checks if the object is a directory object (ends with "/") and if it exists
// Returns: (entry, isDirectoryObject, error)
//   - entry: the directory entry if found and is a directory
//   - isDirectoryObject: true if the request was for a directory object (ends with "/")
//   - error: any error encountered while checking
func (s3a *S3ApiServer) checkDirectoryObject(bucket, object string) (*filer_pb.Entry, bool, error) {
	if !strings.HasSuffix(object, "/") {
		return nil, false, nil // Not a directory object
	}

	bucketDir := s3a.option.BucketsPath + "/" + bucket
	cleanObject := strings.TrimSuffix(strings.TrimPrefix(object, "/"), "/")

	if cleanObject == "" {
		return nil, true, nil // Root level directory object, but we don't handle it
	}

	// Check if directory exists
	dirEntry, err := s3a.getEntry(bucketDir, cleanObject)
	if err != nil {
		if errors.Is(err, filer_pb.ErrNotFound) {
			return nil, true, nil // Directory object requested but doesn't exist
		}
		return nil, true, err // Other errors should be propagated
	}

	if !dirEntry.IsDirectory {
		return nil, true, nil // Exists but not a directory
	}

	return dirEntry, true, nil
}

// serveDirectoryContent serves the content of a directory object directly
func (s3a *S3ApiServer) serveDirectoryContent(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry) {
	// Set content type - use stored MIME type or default
	contentType := entry.Attributes.Mime
	if contentType == "" {
		contentType = "application/octet-stream"
	}
	w.Header().Set("Content-Type", contentType)

	// Set content length - use FileSize for accuracy, especially for large files
	contentLength := int64(entry.Attributes.FileSize)
	w.Header().Set("Content-Length", strconv.FormatInt(contentLength, 10))

	// Set last modified
	w.Header().Set("Last-Modified", time.Unix(entry.Attributes.Mtime, 0).UTC().Format(http.TimeFormat))

	// Set ETag
	w.Header().Set("ETag", "\""+filer.ETag(entry)+"\"")

	// For HEAD requests, don't write body
	if r.Method == http.MethodHead {
		w.WriteHeader(http.StatusOK)
		return
	}

	// Write content
	w.WriteHeader(http.StatusOK)
	if len(entry.Content) > 0 {
		if _, err := w.Write(entry.Content); err != nil {
			glog.Errorf("serveDirectoryContent: failed to write response: %v", err)
		}
	}
}

// handleDirectoryObjectRequest is a helper function that handles directory object requests
// for both GET and HEAD operations, eliminating code duplication
func (s3a *S3ApiServer) handleDirectoryObjectRequest(w http.ResponseWriter, r *http.Request, bucket, object, handlerName string) bool {
	// Check if this is a directory object and handle it directly
	if dirEntry, isDirectoryObject, err := s3a.checkDirectoryObject(bucket, object); err != nil {
		glog.Errorf("%s: error checking directory object %s/%s: %v", handlerName, bucket, object, err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return true // Request was handled (with error)
	} else if dirEntry != nil {
		glog.V(2).Infof("%s: directory object %s/%s found, serving content", handlerName, bucket, object)
		s3a.serveDirectoryContent(w, r, dirEntry)
		return true // Request was handled successfully
	} else if isDirectoryObject {
		// Directory object but doesn't exist
		glog.V(2).Infof("%s: directory object %s/%s not found", handlerName, bucket, object)
		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
		return true // Request was handled (with not found)
	}

	return false // Not a directory object, continue with normal processing
}

func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bucketPrefix string, fetchOwner bool, isDirectory bool, encodingTypeUrl bool, iam AccountManager) (listEntry ListEntry) {
	storageClass := "STANDARD"
	if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
		storageClass = string(v)
	}
	keyFormat := "%s/%s"
	if isDirectory {
		keyFormat += "/"
	}
	if key == "" {
		key = fmt.Sprintf(keyFormat, dir, name)[len(bucketPrefix):]
	}
	if encodingTypeUrl {
		key = urlPathEscape(key)
	}
	listEntry = ListEntry{
		Key:          key,
		LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
		ETag:         "\"" + filer.ETag(entry) + "\"",
		Size:         int64(filer.FileSize(entry)),
		StorageClass: StorageClass(storageClass),
	}
	if fetchOwner {
		// Extract owner from S3 metadata (Extended attributes) instead of file system attributes
		var ownerID, displayName string
		if entry.Extended != nil {
			if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
				ownerID = string(ownerBytes)
			}
		}

		// Fallback to anonymous if no S3 owner found
		if ownerID == "" {
			ownerID = s3_constants.AccountAnonymousId
			displayName = "anonymous"
		} else {
			// Get the proper display name from IAM system
			displayName = iam.GetAccountNameById(ownerID)
			// Fallback to ownerID if no display name found
			if displayName == "" {
				displayName = ownerID
			}
		}

		listEntry.Owner = &CanonicalUser{
			ID:          ownerID,
			DisplayName: displayName,
		}
	}
	return listEntry
}

func (s3a *S3ApiServer) toFilerUrl(bucket, object string) string {
	object = urlPathEscape(removeDuplicateSlashes(object))
	destUrl := fmt.Sprintf("http://%s%s/%s%s",
		s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, object)
	return destUrl
}

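// Illustrative result, assuming a filer reachable at localhost:8888 and a
// BucketsPath of /buckets (both values are deployment-specific):
//
//	s3a.toFilerUrl("mybucket", "/docs//readme.txt")
//	// "http://localhost:8888/buckets/mybucket/docs/readme.txt"
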
func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
	bucket, object := s3_constants.GetBucketAndObject(r)
	glog.V(3).Infof("GetObjectHandler %s %s", bucket, object)

	// Handle directory objects with shared logic
	if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "GetObjectHandler") {
		return // Directory object request was handled
	}

	// Check conditional headers for read operations
	result := s3a.checkConditionalHeadersForReads(r, bucket, object)
	if result.ErrorCode != s3err.ErrNone {
		glog.V(3).Infof("GetObjectHandler: Conditional header check failed for %s/%s with error %v", bucket, object, result.ErrorCode)

		// For 304 Not Modified responses, include the ETag header
		if result.ErrorCode == s3err.ErrNotModified && result.ETag != "" {
			w.Header().Set("ETag", result.ETag)
		}

		s3err.WriteErrorResponse(w, r, result.ErrorCode)
		return
	}

	// Check for specific version ID in query parameters
	versionId := r.URL.Query().Get("versionId")

	// Check if versioning is configured for the bucket (Enabled or Suspended)
	versioningConfigured, err := s3a.isVersioningConfigured(bucket)
	if err != nil {
		if err == filer_pb.ErrNotFound {
			s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
			return
		}
		glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}

	glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId)

	var destUrl string

	if versioningConfigured {
		// Handle versioned GET - all versions are stored in .versions directory
		var targetVersionId string
		var entry *filer_pb.Entry

		if versionId != "" {
			// Request for specific version
			glog.V(2).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object)
			entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
			if err != nil {
				glog.Errorf("Failed to get specific version %s: %v", versionId, err)
				s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
				return
			}
			targetVersionId = versionId
		} else {
			// Request for latest version
			glog.V(1).Infof("GetObject: requesting latest version for %s%s", bucket, object)
			entry, err = s3a.getLatestObjectVersion(bucket, object)
			if err != nil {
				glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err)
				s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
				return
			}
			if entry.Extended != nil {
				if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
					targetVersionId = string(versionIdBytes)
				}
			}
			// If no version ID found in entry, this is a pre-versioning object
			if targetVersionId == "" {
				targetVersionId = "null"
			}
		}

		// Check if this is a delete marker
		if entry.Extended != nil {
			if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
				s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
				return
			}
		}

		// Determine the actual file path based on whether this is a versioned or pre-versioning object
		if targetVersionId == "null" {
			// Pre-versioning object - stored as regular file
			destUrl = s3a.toFilerUrl(bucket, object)
			glog.V(2).Infof("GetObject: pre-versioning object URL: %s", destUrl)
		} else {
			// Versioned object - stored in .versions directory
			versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
			destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
			glog.V(2).Infof("GetObject: version %s URL: %s", targetVersionId, destUrl)
		}

		// Set version ID in response header
		w.Header().Set("x-amz-version-id", targetVersionId)

		// Add object lock metadata to response headers if present
		s3a.addObjectLockHeadersToResponse(w, entry)
	} else {
		// Handle regular GET (non-versioned)
		destUrl = s3a.toFilerUrl(bucket, object)
	}

	// Check if this is a range request to an SSE object and modify the approach
	originalRangeHeader := r.Header.Get("Range")
	var sseObject = false

	// Pre-check if this object is SSE encrypted to avoid filer range conflicts
	if originalRangeHeader != "" {
		bucket, object := s3_constants.GetBucketAndObject(r)
		objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
		if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
			primarySSEType := s3a.detectPrimarySSEType(objectEntry)
			if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
				sseObject = true
				// Temporarily remove Range header to get full encrypted data from filer
				r.Header.Del("Range")
			}
		}
	}

	s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
		// Restore the original Range header for SSE processing
		if sseObject && originalRangeHeader != "" {
			r.Header.Set("Range", originalRangeHeader)
		}

		// Add SSE metadata headers based on object metadata before SSE processing
		bucket, object := s3_constants.GetBucketAndObject(r)
		objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
		if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
			s3a.addSSEHeadersToResponse(proxyResponse, objectEntry)
		}

		// Handle SSE decryption (both SSE-C and SSE-KMS) if needed
		return s3a.handleSSEResponse(r, proxyResponse, w)
	})
}

func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
	bucket, object := s3_constants.GetBucketAndObject(r)
	glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object)

	// Handle directory objects with shared logic
	if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "HeadObjectHandler") {
		return // Directory object request was handled
	}

	// Check conditional headers for read operations
	result := s3a.checkConditionalHeadersForReads(r, bucket, object)
	if result.ErrorCode != s3err.ErrNone {
		glog.V(3).Infof("HeadObjectHandler: Conditional header check failed for %s/%s with error %v", bucket, object, result.ErrorCode)

		// For 304 Not Modified responses, include the ETag header
		if result.ErrorCode == s3err.ErrNotModified && result.ETag != "" {
			w.Header().Set("ETag", result.ETag)
		}

		s3err.WriteErrorResponse(w, r, result.ErrorCode)
		return
	}

	// Check for specific version ID in query parameters
	versionId := r.URL.Query().Get("versionId")

	// Check if versioning is configured for the bucket (Enabled or Suspended)
	versioningConfigured, err := s3a.isVersioningConfigured(bucket)
	if err != nil {
		if err == filer_pb.ErrNotFound {
			s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
			return
		}
		glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}

	var destUrl string

	if versioningConfigured {
		// Handle versioned HEAD - all versions are stored in .versions directory
		var targetVersionId string
		var entry *filer_pb.Entry

		if versionId != "" {
			// Request for specific version
			glog.V(2).Infof("HeadObject: requesting specific version %s for %s%s", versionId, bucket, object)
			entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
			if err != nil {
				glog.Errorf("Failed to get specific version %s: %v", versionId, err)
				s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
				return
			}
			targetVersionId = versionId
		} else {
			// Request for latest version
			glog.V(2).Infof("HeadObject: requesting latest version for %s%s", bucket, object)
			entry, err = s3a.getLatestObjectVersion(bucket, object)
			if err != nil {
				glog.Errorf("Failed to get latest version: %v", err)
				s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
				return
			}
			if entry.Extended != nil {
				if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
					targetVersionId = string(versionIdBytes)
				}
			}
			// If no version ID found in entry, this is a pre-versioning object
			if targetVersionId == "" {
				targetVersionId = "null"
			}
		}

		// Check if this is a delete marker
		if entry.Extended != nil {
			if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
				s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
				return
			}
		}

		// Determine the actual file path based on whether this is a versioned or pre-versioning object
		if targetVersionId == "null" {
			// Pre-versioning object - stored as regular file
			destUrl = s3a.toFilerUrl(bucket, object)
			glog.V(2).Infof("HeadObject: pre-versioning object URL: %s", destUrl)
		} else {
			// Versioned object - stored in .versions directory
			versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
			destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
			glog.V(2).Infof("HeadObject: version %s URL: %s", targetVersionId, destUrl)
		}

		// Set version ID in response header
		w.Header().Set("x-amz-version-id", targetVersionId)

		// Add object lock metadata to response headers if present
		s3a.addObjectLockHeadersToResponse(w, entry)
	} else {
		// Handle regular HEAD (non-versioned)
		destUrl = s3a.toFilerUrl(bucket, object)
	}

	s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
		// Handle SSE validation (both SSE-C and SSE-KMS) for HEAD requests
		return s3a.handleSSEResponse(r, proxyResponse, w)
	})
}

func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, isWrite bool, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64)) {
	glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl)
	start := time.Now()

	proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body)
	if err != nil {
		glog.Errorf("NewRequest %s: %v", destUrl, err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}

	proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
	proxyReq.Header.Set("Accept-Encoding", "identity")
	for k, v := range r.URL.Query() {
		if _, ok := s3_constants.PassThroughHeaders[strings.ToLower(k)]; ok {
			proxyReq.Header[k] = v
		}
		if k == "partNumber" {
			proxyReq.Header[s3_constants.SeaweedFSPartNumber] = v
		}
	}
	for header, values := range r.Header {
		proxyReq.Header[header] = values
	}
	if proxyReq.ContentLength == 0 && r.ContentLength != 0 {
		proxyReq.ContentLength = r.ContentLength
	}

	// ensure that the Authorization header overrides any previous
	// Authorization header that might already be present in proxyReq
	s3a.maybeAddFilerJwtAuthorization(proxyReq, isWrite)
	resp, postErr := s3a.client.Do(proxyReq)

	if postErr != nil {
		glog.Errorf("post to filer: %v", postErr)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}
	defer util_http.CloseResponse(resp)

	if resp.StatusCode == http.StatusPreconditionFailed {
		s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed)
		return
	}

	if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
		return
	}

	if r.Method == http.MethodDelete {
		if resp.StatusCode == http.StatusNotFound {
			// this is normal
			responseStatusCode, _ := responseFn(resp, w)
			s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
			return
		}
	}
	if resp.StatusCode == http.StatusNotFound {
		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
		return
	}

	TimeToFirstByte(r.Method, start, r)
	if resp.Header.Get(s3_constants.SeaweedFSIsDirectoryKey) == "true" {
		responseStatusCode, _ := responseFn(resp, w)
		s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
		return
	}

	if resp.StatusCode == http.StatusInternalServerError {
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}

	// when HEADing a directory, it should be reported as no such key
	// https://github.com/seaweedfs/seaweedfs/issues/3457
	if resp.ContentLength == -1 && resp.StatusCode != http.StatusNotModified {
		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
		return
	}

	if resp.StatusCode == http.StatusBadRequest {
		respBody, _ := io.ReadAll(resp.Body)
		switch string(respBody) {
		case "InvalidPart":
			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
		default:
			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
		}
		resp.Body.Close()
		return
	}

	setUserMetadataKeyToLowercase(resp)

	responseStatusCode, bytesTransferred := responseFn(resp, w)
	BucketTrafficSent(bytesTransferred, r)

	s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
}

func setUserMetadataKeyToLowercase(resp *http.Response) {
	for key, value := range resp.Header {
		if strings.HasPrefix(key, s3_constants.AmzUserMetaPrefix) {
			resp.Header[strings.ToLower(key)] = value
			delete(resp.Header, key)
		}
	}
}

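// For example, assuming the usual "X-Amz-Meta-" prefix constant, a stored
// header key like "X-Amz-Meta-Foo" is re-emitted as "x-amz-meta-foo",
// matching the lowercase form S3 clients generally expect.
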
func captureCORSHeaders(w http.ResponseWriter, headersToCapture []string) map[string]string {
	captured := make(map[string]string)
	for _, corsHeader := range headersToCapture {
		if value := w.Header().Get(corsHeader); value != "" {
			captured[corsHeader] = value
		}
	}
	return captured
}

func restoreCORSHeaders(w http.ResponseWriter, capturedCORSHeaders map[string]string) {
	for corsHeader, value := range capturedCORSHeaders {
		w.Header().Set(corsHeader, value)
	}
}

// writeFinalResponse handles the common response writing logic shared between
// passThroughResponse and handleSSECResponse
func writeFinalResponse(w http.ResponseWriter, proxyResponse *http.Response, bodyReader io.Reader, capturedCORSHeaders map[string]string) (statusCode int, bytesTransferred int64) {
	// Restore CORS headers that were set by middleware
	restoreCORSHeaders(w, capturedCORSHeaders)

	if proxyResponse.Header.Get("Content-Range") != "" && proxyResponse.StatusCode == 200 {
		statusCode = http.StatusPartialContent
	} else {
		statusCode = proxyResponse.StatusCode
	}
	w.WriteHeader(statusCode)

	// Stream response data
	buf := mem.Allocate(128 * 1024)
	defer mem.Free(buf)
	bytesTransferred, err := io.CopyBuffer(w, bodyReader, buf)
	if err != nil {
		glog.V(1).Infof("response read %d bytes: %v", bytesTransferred, err)
	}
	return statusCode, bytesTransferred
}

func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
	// Capture existing CORS headers that may have been set by middleware
	capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)

	// Copy headers from proxy response
	for k, v := range proxyResponse.Header {
		w.Header()[k] = v
	}

	return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders)
}

// handleSSECResponse handles SSE-C decryption and response processing
func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
	// Check if the object has SSE-C metadata
	sseAlgorithm := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm)
	sseKeyMD5 := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5)
	isObjectEncrypted := sseAlgorithm != "" && sseKeyMD5 != ""

	// Parse SSE-C headers from request once (avoid duplication)
	customerKey, err := ParseSSECHeaders(r)
	if err != nil {
		errCode := MapSSECErrorToS3Error(err)
		s3err.WriteErrorResponse(w, r, errCode)
		return http.StatusBadRequest, 0
	}

	if isObjectEncrypted {
		// This object was encrypted with SSE-C, validate customer key
		if customerKey == nil {
			s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
			return http.StatusBadRequest, 0
		}

		// SSE-C MD5 is base64 and case-sensitive
		if customerKey.KeyMD5 != sseKeyMD5 {
			// For GET/HEAD requests, AWS S3 returns 403 Forbidden for a key mismatch.
			s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
			return http.StatusForbidden, 0
		}

		// SSE-C encrypted objects support HTTP Range requests:
		// the IV is stored in metadata and CTR mode allows seeking to any offset.
		// Range requests will be handled by the filer layer with proper offset-based decryption.

		// Check if this is a chunked or small content SSE-C object
		bucket, object := s3_constants.GetBucketAndObject(r)
		objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
		if entry, err := s3a.getEntry("", objectPath); err == nil {
			// Check for SSE-C chunks
			sseCChunks := 0
			for _, chunk := range entry.GetChunks() {
				if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
					sseCChunks++
				}
			}

			if sseCChunks >= 1 {
				// Handle chunked SSE-C objects - each chunk needs independent decryption
				multipartReader, decErr := s3a.createMultipartSSECDecryptedReader(r, proxyResponse)
				if decErr != nil {
					glog.Errorf("Failed to create multipart SSE-C decrypted reader: %v", decErr)
					s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
					return http.StatusInternalServerError, 0
				}

				// Capture existing CORS headers
				capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)

				// Copy headers from proxy response
				for k, v := range proxyResponse.Header {
					w.Header()[k] = v
				}

				// Set proper headers for range requests
				rangeHeader := r.Header.Get("Range")
				if rangeHeader != "" {
					// Parse range header (e.g., "bytes=0-99")
					if len(rangeHeader) > 6 && rangeHeader[:6] == "bytes=" {
						rangeSpec := rangeHeader[6:]
						parts := strings.Split(rangeSpec, "-")
						if len(parts) == 2 {
							startOffset, endOffset := int64(0), int64(-1)
							if parts[0] != "" {
								startOffset, _ = strconv.ParseInt(parts[0], 10, 64)
							}
							if parts[1] != "" {
								endOffset, _ = strconv.ParseInt(parts[1], 10, 64)
							}

							if endOffset >= startOffset {
								// Specific range - set proper Content-Length and Content-Range headers
								rangeLength := endOffset - startOffset + 1
								totalSize := proxyResponse.Header.Get("Content-Length")

								w.Header().Set("Content-Length", strconv.FormatInt(rangeLength, 10))
								w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%s", startOffset, endOffset, totalSize))
								// writeFinalResponse will set status to 206 if Content-Range is present
							}
						}
					}
				}

				return writeFinalResponse(w, proxyResponse, multipartReader, capturedCORSHeaders)
			} else if len(entry.GetChunks()) == 0 && len(entry.Content) > 0 {
				// Small content SSE-C object stored directly in entry.Content.
				// Fall through to traditional single-object SSE-C handling below.
			}
		}

		// Single-part SSE-C object: Get IV from proxy response headers (stored during upload)
		ivBase64 := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEIVHeader)
		if ivBase64 == "" {
			glog.Errorf("SSE-C encrypted single-part object missing IV in metadata")
			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
			return http.StatusInternalServerError, 0
		}

		iv, err := base64.StdEncoding.DecodeString(ivBase64)
		if err != nil {
			glog.Errorf("Failed to decode IV from metadata: %v", err)
			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
			return http.StatusInternalServerError, 0
		}

		// Create decrypted reader with IV from metadata
		decryptedReader, decErr := CreateSSECDecryptedReader(proxyResponse.Body, customerKey, iv)
		if decErr != nil {
			glog.Errorf("Failed to create SSE-C decrypted reader: %v", decErr)
			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
			return http.StatusInternalServerError, 0
		}

		// Capture existing CORS headers that may have been set by middleware
		capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)

		// Copy headers from proxy response (excluding body-related headers that might change)
		for k, v := range proxyResponse.Header {
			if k != "Content-Length" && k != "Content-Encoding" {
				w.Header()[k] = v
			}
		}

		// Set correct Content-Length for SSE-C (only for full object requests).
		// With the IV stored in metadata, the encrypted length equals the original length.
		if proxyResponse.Header.Get("Content-Range") == "" {
			// Full object request: encrypted length equals original length (IV not in stream)
			if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" {
				// Content-Length is already correct since IV is stored in metadata, not in data stream
				w.Header().Set("Content-Length", contentLengthStr)
			}
		}
		// For range requests, let the actual bytes transferred determine the response length

		// Add SSE-C response headers
		w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, sseAlgorithm)
		w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, sseKeyMD5)

		return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders)
	} else {
		// Object is not encrypted, but check if customer provided SSE-C headers unnecessarily
		if customerKey != nil {
			s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyNotNeeded)
			return http.StatusBadRequest, 0
		}

		// Normal pass-through response
		return passThroughResponse(proxyResponse, w)
	}
}

// handleSSEResponse handles both SSE-C and SSE-KMS decryption/validation and response processing
func (s3a *S3ApiServer) handleSSEResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
	// Check what the client is expecting based on request headers
	clientExpectsSSEC := IsSSECRequest(r)

	// Check what the stored object has in headers (may be conflicting after copy)
	kmsMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader)
	sseAlgorithm := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm)

	// Get actual object state by examining chunks (most reliable for cross-encryption)
	bucket, object := s3_constants.GetBucketAndObject(r)
	objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
	actualObjectType := "Unknown"
	if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
		actualObjectType = s3a.detectPrimarySSEType(objectEntry)
	}

	// Route based on ACTUAL object type (from chunks) rather than conflicting headers
	if actualObjectType == s3_constants.SSETypeC && clientExpectsSSEC {
		// Object is SSE-C and client expects SSE-C → SSE-C handler
		return s3a.handleSSECResponse(r, proxyResponse, w)
	} else if actualObjectType == s3_constants.SSETypeKMS && !clientExpectsSSEC {
		// Object is SSE-KMS and client doesn't expect SSE-C → SSE-KMS handler
		return s3a.handleSSEKMSResponse(r, proxyResponse, w, kmsMetadataHeader)
	} else if actualObjectType == "None" && !clientExpectsSSEC {
		// Object is unencrypted and client doesn't expect SSE-C → pass through
		return passThroughResponse(proxyResponse, w)
	} else if actualObjectType == s3_constants.SSETypeC && !clientExpectsSSEC {
		// Object is SSE-C but client doesn't provide SSE-C headers → Error
		s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
		return http.StatusBadRequest, 0
	} else if actualObjectType == s3_constants.SSETypeKMS && clientExpectsSSEC {
		// Object is SSE-KMS but client provides SSE-C headers → Error
		s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
		return http.StatusBadRequest, 0
	} else if actualObjectType == "None" && clientExpectsSSEC {
		// Object is unencrypted but client provides SSE-C headers → Error
		s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
		return http.StatusBadRequest, 0
	}

	// Fallback for edge cases - use original logic with header-based detection
	if clientExpectsSSEC && sseAlgorithm != "" {
		return s3a.handleSSECResponse(r, proxyResponse, w)
	} else if !clientExpectsSSEC && kmsMetadataHeader != "" {
		return s3a.handleSSEKMSResponse(r, proxyResponse, w, kmsMetadataHeader)
	} else {
		return passThroughResponse(proxyResponse, w)
	}
}

// handleSSEKMSResponse handles SSE-KMS decryption and response processing
func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, kmsMetadataHeader string) (statusCode int, bytesTransferred int64) {
	// Deserialize SSE-KMS metadata
	kmsMetadataBytes, err := base64.StdEncoding.DecodeString(kmsMetadataHeader)
	if err != nil {
		glog.Errorf("Failed to decode SSE-KMS metadata: %v", err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return http.StatusInternalServerError, 0
	}

	sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
	if err != nil {
		glog.Errorf("Failed to deserialize SSE-KMS metadata: %v", err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return http.StatusInternalServerError, 0
	}

	// For HEAD requests, we don't need to decrypt the body, just add response headers
	if r.Method == "HEAD" {
		// Capture existing CORS headers that may have been set by middleware
		capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)

		// Copy headers from proxy response
		for k, v := range proxyResponse.Header {
			w.Header()[k] = v
		}

		// Add SSE-KMS response headers
		AddSSEKMSResponseHeaders(w, sseKMSKey)

		return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders)
	}

	// For GET requests, check if this is a multipart SSE-KMS object.
	// We need to check the object structure to determine if it's multipart encrypted.
	isMultipartSSEKMS := false

	if sseKMSKey != nil {
		// Get the object entry to check chunk structure
		bucket, object := s3_constants.GetBucketAndObject(r)
		objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
		if entry, err := s3a.getEntry("", objectPath); err == nil {
			// Check for multipart SSE-KMS
			sseKMSChunks := 0
			for _, chunk := range entry.GetChunks() {
				if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 {
					sseKMSChunks++
				}
			}
			isMultipartSSEKMS = sseKMSChunks > 1

			glog.Infof("SSE-KMS object detection: chunks=%d, sseKMSChunks=%d, isMultipartSSEKMS=%t",
				len(entry.GetChunks()), sseKMSChunks, isMultipartSSEKMS)
		}
	}

	var decryptedReader io.Reader
	if isMultipartSSEKMS {
		// Handle multipart SSE-KMS objects - each chunk needs independent decryption
		multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse)
		if decErr != nil {
			glog.Errorf("Failed to create multipart SSE-KMS decrypted reader: %v", decErr)
			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
			return http.StatusInternalServerError, 0
		}
		decryptedReader = multipartReader
		glog.V(3).Infof("Using multipart SSE-KMS decryption for object")
	} else {
		// Handle single-part SSE-KMS objects
		singlePartReader, decErr := CreateSSEKMSDecryptedReader(proxyResponse.Body, sseKMSKey)
		if decErr != nil {
			glog.Errorf("Failed to create SSE-KMS decrypted reader: %v", decErr)
			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
			return http.StatusInternalServerError, 0
		}
		decryptedReader = singlePartReader
		glog.V(3).Infof("Using single-part SSE-KMS decryption for object")
	}

	// Capture existing CORS headers that may have been set by middleware
	capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)

	// Copy headers from proxy response (excluding body-related headers that might change)
	for k, v := range proxyResponse.Header {
		if k != "Content-Length" && k != "Content-Encoding" {
			w.Header()[k] = v
		}
	}

	// Set correct Content-Length for SSE-KMS
	if proxyResponse.Header.Get("Content-Range") == "" {
		// For full object requests, encrypted length equals original length
		if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" {
			w.Header().Set("Content-Length", contentLengthStr)
		}
	}

	// Add SSE-KMS response headers
	AddSSEKMSResponseHeaders(w, sseKMSKey)

	return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders)
}

// addObjectLockHeadersToResponse extracts object lock metadata from entry Extended attributes
// and adds the appropriate S3 headers to the response
func (s3a *S3ApiServer) addObjectLockHeadersToResponse(w http.ResponseWriter, entry *filer_pb.Entry) {
	if entry == nil || entry.Extended == nil {
		return
	}

	// Check if this entry has any object lock metadata (indicating it's from an object lock enabled bucket)
	hasObjectLockMode := false
	hasRetentionDate := false

	// Add object lock mode header if present
	if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists && len(modeBytes) > 0 {
		w.Header().Set(s3_constants.AmzObjectLockMode, string(modeBytes))
		hasObjectLockMode = true
	}

	// Add retention until date header if present
	if dateBytes, exists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; exists && len(dateBytes) > 0 {
		dateStr := string(dateBytes)
		// Convert Unix timestamp to ISO8601 format for S3 compatibility
		if timestamp, err := strconv.ParseInt(dateStr, 10, 64); err == nil {
			retainUntilDate := time.Unix(timestamp, 0).UTC()
			w.Header().Set(s3_constants.AmzObjectLockRetainUntilDate, retainUntilDate.Format(time.RFC3339))
			hasRetentionDate = true
		} else {
			glog.Errorf("addObjectLockHeadersToResponse: failed to parse retention until date from stored metadata (dateStr: %s): %v", dateStr, err)
		}
	}

	// Add legal hold header - AWS S3 behavior: always include legal hold for object lock enabled buckets
	if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists && len(legalHoldBytes) > 0 {
		// Return stored S3 standard "ON"/"OFF" values directly
		w.Header().Set(s3_constants.AmzObjectLockLegalHold, string(legalHoldBytes))
	} else if hasObjectLockMode || hasRetentionDate {
		// If this entry has object lock metadata (indicating object lock enabled bucket)
		// but no legal hold specifically set, default to "OFF" as per AWS S3 behavior
		w.Header().Set(s3_constants.AmzObjectLockLegalHold, s3_constants.LegalHoldOff)
	}
}

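// For example, a stored retention timestamp of "1735689600" is emitted in
// the AmzObjectLockRetainUntilDate header as "2025-01-01T00:00:00Z"
// (Unix seconds rendered as RFC 3339 in UTC).
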
// addSSEHeadersToResponse converts stored SSE metadata from entry.Extended to HTTP response headers
// Uses intelligent prioritization: only set headers for the PRIMARY encryption type to avoid conflicts
func (s3a *S3ApiServer) addSSEHeadersToResponse(proxyResponse *http.Response, entry *filer_pb.Entry) {
	if entry == nil || entry.Extended == nil {
		return
	}

	// Determine the primary encryption type by examining chunks (most reliable)
	primarySSEType := s3a.detectPrimarySSEType(entry)

	// Only set headers for the PRIMARY encryption type
	switch primarySSEType {
	case s3_constants.SSETypeC:
		// Add only SSE-C headers
		if algorithmBytes, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; exists && len(algorithmBytes) > 0 {
			proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, string(algorithmBytes))
		}

		if keyMD5Bytes, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; exists && len(keyMD5Bytes) > 0 {
			proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, string(keyMD5Bytes))
		}

		if ivBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEIV]; exists && len(ivBytes) > 0 {
			ivBase64 := base64.StdEncoding.EncodeToString(ivBytes)
			proxyResponse.Header.Set(s3_constants.SeaweedFSSSEIVHeader, ivBase64)
		}

	case s3_constants.SSETypeKMS:
		// Add only SSE-KMS headers
		if sseAlgorithm, exists := entry.Extended[s3_constants.AmzServerSideEncryption]; exists && len(sseAlgorithm) > 0 {
			proxyResponse.Header.Set(s3_constants.AmzServerSideEncryption, string(sseAlgorithm))
		}

		if kmsKeyID, exists := entry.Extended[s3_constants.AmzServerSideEncryptionAwsKmsKeyId]; exists && len(kmsKeyID) > 0 {
			proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, string(kmsKeyID))
		}

	default:
		// Unencrypted or unknown - don't set any SSE headers
	}

	glog.V(3).Infof("addSSEHeadersToResponse: processed %d extended metadata entries", len(entry.Extended))
}

// detectPrimarySSEType determines the primary SSE type by examining chunk metadata
func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string {
	if len(entry.GetChunks()) == 0 {
		// No chunks - check object-level metadata only (single objects or smallContent)
		hasSSEC := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] != nil
		hasSSEKMS := entry.Extended[s3_constants.AmzServerSideEncryption] != nil

		if hasSSEC && !hasSSEKMS {
			return s3_constants.SSETypeC
		} else if hasSSEKMS && !hasSSEC {
			return s3_constants.SSETypeKMS
		} else if hasSSEC && hasSSEKMS {
			// Both present - this should only happen during cross-encryption copies
			// Use content to determine actual encryption state
			if len(entry.Content) > 0 {
				// smallContent - check if it's encrypted (heuristic: random-looking data)
				return s3_constants.SSETypeC // Default to SSE-C for mixed case
			} else {
				// No content, both headers - default to SSE-C
				return s3_constants.SSETypeC
			}
		}
		return "None"
	}

	// Count chunk types to determine primary (multipart objects)
	ssecChunks := 0
	ssekmsChunks := 0

	for _, chunk := range entry.GetChunks() {
		switch chunk.GetSseType() {
		case filer_pb.SSEType_SSE_C:
			ssecChunks++
		case filer_pb.SSEType_SSE_KMS:
			ssekmsChunks++
		}
	}

	// Primary type is the one with more chunks
	if ssecChunks > ssekmsChunks {
		return s3_constants.SSETypeC
	} else if ssekmsChunks > ssecChunks {
		return s3_constants.SSETypeKMS
	} else if ssecChunks > 0 {
		// Equal number, prefer SSE-C (shouldn't happen in practice)
		return s3_constants.SSETypeC
	}

	return "None"
}

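// For example, a multipart object whose chunks are [SSE-C, SSE-C, SSE-KMS]
// yields ssecChunks=2 > ssekmsChunks=1, so the primary type is SSETypeC and
// only SSE-C headers are surfaced for it by addSSEHeadersToResponse.
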
// createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects
func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) {
	// Get the object path from the request
	bucket, object := s3_constants.GetBucketAndObject(r)
	objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)

	// Get the object entry from filer to access chunk information
	entry, err := s3a.getEntry("", objectPath)
	if err != nil {
		return nil, fmt.Errorf("failed to get object entry for multipart SSE-KMS decryption: %v", err)
	}

	// Sort chunks by offset to ensure correct order
	chunks := entry.GetChunks()
	sort.Slice(chunks, func(i, j int) bool {
		return chunks[i].GetOffset() < chunks[j].GetOffset()
	})

	// Create readers for each chunk, decrypting them independently
	var readers []io.Reader

	for i, chunk := range chunks {
		glog.Infof("Processing chunk %d/%d: fileId=%s, offset=%d, size=%d, sse_type=%d",
			i+1, len(entry.GetChunks()), chunk.GetFileIdString(), chunk.GetOffset(), chunk.GetSize(), chunk.GetSseType())

		// Get this chunk's encrypted data
		chunkReader, err := s3a.createEncryptedChunkReader(chunk)
		if err != nil {
			return nil, fmt.Errorf("failed to create chunk reader: %v", err)
		}

		// Get SSE-KMS metadata for this chunk
		var chunkSSEKMSKey *SSEKMSKey

		// Check if this chunk has per-chunk SSE-KMS metadata (new architecture)
		if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 {
			// Use the per-chunk SSE-KMS metadata
			kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata())
			if err != nil {
				glog.Errorf("Failed to deserialize per-chunk SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err)
			} else {
				// ChunkOffset is already set from the stored metadata (PartOffset)
				chunkSSEKMSKey = kmsKey
				glog.Infof("Using per-chunk SSE-KMS metadata for chunk %s: keyID=%s, IV=%x, partOffset=%d",
					chunk.GetFileIdString(), kmsKey.KeyID, kmsKey.IV[:8], kmsKey.ChunkOffset)
			}
		}

		// Fallback to object-level metadata (legacy support)
		if chunkSSEKMSKey == nil {
			objectMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader)
			if objectMetadataHeader != "" {
				kmsMetadataBytes, decodeErr := base64.StdEncoding.DecodeString(objectMetadataHeader)
				if decodeErr == nil {
					kmsKey, _ := DeserializeSSEKMSMetadata(kmsMetadataBytes)
					if kmsKey != nil {
						// For object-level metadata (legacy), use absolute file offset as fallback
						kmsKey.ChunkOffset = chunk.GetOffset()
						chunkSSEKMSKey = kmsKey
					}
					glog.Infof("Using fallback object-level SSE-KMS metadata for chunk %s with offset %d", chunk.GetFileIdString(), chunk.GetOffset())
				}
			}
		}

		if chunkSSEKMSKey == nil {
			return nil, fmt.Errorf("no SSE-KMS metadata found for chunk %s in multipart object", chunk.GetFileIdString())
		}

		// Create decrypted reader for this chunk
		decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, chunkSSEKMSKey)
		if decErr != nil {
			chunkReader.Close() // Close the chunk reader if decryption fails
			return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
		}

		// Use the streaming decrypted reader directly instead of reading into memory
		readers = append(readers, decryptedChunkReader)
		glog.V(4).Infof("Added streaming decrypted reader for chunk %s in multipart SSE-KMS object", chunk.GetFileIdString())
	}

	// Combine all decrypted chunk readers into a single stream with proper resource management
	multiReader := NewMultipartSSEReader(readers)
	glog.V(3).Infof("Created multipart SSE-KMS decrypted reader with %d chunks", len(readers))

	return multiReader, nil
}

// createEncryptedChunkReader creates a reader for a single encrypted chunk
func (s3a *S3ApiServer) createEncryptedChunkReader(chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
	// Get chunk URL
	srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
	if err != nil {
		return nil, fmt.Errorf("lookup volume URL for chunk %s: %v", chunk.GetFileIdString(), err)
	}

	// Create HTTP request for chunk data
	req, err := http.NewRequest("GET", srcUrl, nil)
	if err != nil {
		return nil, fmt.Errorf("create HTTP request for chunk: %v", err)
	}

	// Execute request
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("execute HTTP request for chunk: %v", err)
	}

	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("HTTP request for chunk failed: %d", resp.StatusCode)
	}

	return resp.Body, nil
}

// MultipartSSEReader wraps multiple readers and ensures all underlying readers are properly closed
type MultipartSSEReader struct {
	multiReader io.Reader
	readers     []io.Reader
}

// SSERangeReader applies range logic to an underlying reader
type SSERangeReader struct {
	reader    io.Reader
	offset    int64 // bytes to skip from the beginning
	remaining int64 // bytes remaining to read (-1 for unlimited)
	skipped   int64 // bytes already skipped
}

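// Sketch (names illustrative): serving "bytes=100-149" from an
// already-decrypted stream:
//
//	rr := &SSERangeReader{reader: decrypted, offset: 100, remaining: 50}
//	_, err := io.Copy(w, rr) // copies the 50 bytes starting at offset 100
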
// NewMultipartSSEReader creates a new multipart reader that can properly close all underlying readers
func NewMultipartSSEReader(readers []io.Reader) *MultipartSSEReader {
	return &MultipartSSEReader{
		multiReader: io.MultiReader(readers...),
		readers:     readers,
	}
}

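// Typical use, as in the multipart decryption paths below: wrap the
// per-chunk readers so that a single Close can release every underlying
// chunk stream:
//
//	mr := NewMultipartSSEReader(readers)
//	defer mr.Close()
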
// Read implements the io.Reader interface
func (m *MultipartSSEReader) Read(p []byte) (n int, err error) {
	return m.multiReader.Read(p)
}

// Close implements the io.Closer interface and closes all underlying readers that support closing
func (m *MultipartSSEReader) Close() error {
	var lastErr error
	for i, reader := range m.readers {
		if closer, ok := reader.(io.Closer); ok {
			if err := closer.Close(); err != nil {
				glog.V(2).Infof("Error closing reader %d: %v", i, err)
				lastErr = err // Keep track of the last error, but continue closing others
			}
		}
	}
	return lastErr
}

// Read implements the io.Reader interface for SSERangeReader
func (r *SSERangeReader) Read(p []byte) (n int, err error) {
	// If we need to skip bytes and haven't skipped enough yet
	if r.skipped < r.offset {
		skipNeeded := r.offset - r.skipped
		skipBuf := make([]byte, min(int64(len(p)), skipNeeded))
		skipRead, skipErr := r.reader.Read(skipBuf)
		r.skipped += int64(skipRead)

		if skipErr != nil {
			return 0, skipErr
		}

		// If we still need to skip more, recurse
		if r.skipped < r.offset {
			return r.Read(p)
		}
	}

	// If we have a remaining limit and it's reached
	if r.remaining == 0 {
		return 0, io.EOF
	}

	// Calculate how much to read
	readSize := len(p)
	if r.remaining > 0 && int64(readSize) > r.remaining {
		readSize = int(r.remaining)
	}

	// Read the data
	n, err = r.reader.Read(p[:readSize])
	if r.remaining > 0 {
		r.remaining -= int64(n)
	}

	return n, err
}

// createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects
// Each chunk has its own IV and encryption key from the original multipart parts
func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) {
	// Parse SSE-C headers from the request for decryption key
	customerKey, err := ParseSSECHeaders(r)
	if err != nil {
		return nil, fmt.Errorf("invalid SSE-C headers for multipart decryption: %v", err)
	}

	// Get the object path from the request
	bucket, object := s3_constants.GetBucketAndObject(r)
	objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)

	// Get the object entry from filer to access chunk information
	entry, err := s3a.getEntry("", objectPath)
	if err != nil {
		return nil, fmt.Errorf("failed to get object entry for multipart SSE-C decryption: %v", err)
	}

	// Sort chunks by offset to ensure correct order
	chunks := entry.GetChunks()
	sort.Slice(chunks, func(i, j int) bool {
		return chunks[i].GetOffset() < chunks[j].GetOffset()
	})

	// Check for Range header to optimize chunk processing
	var startOffset, endOffset int64 = 0, -1
	rangeHeader := r.Header.Get("Range")
	if rangeHeader != "" {
		// Parse range header (e.g., "bytes=0-99")
		if len(rangeHeader) > 6 && rangeHeader[:6] == "bytes=" {
			rangeSpec := rangeHeader[6:]
			parts := strings.Split(rangeSpec, "-")
			if len(parts) == 2 {
				if parts[0] != "" {
					startOffset, _ = strconv.ParseInt(parts[0], 10, 64)
				}
				if parts[1] != "" {
					endOffset, _ = strconv.ParseInt(parts[1], 10, 64)
				}
			}
		}
	}

	// Filter chunks to only those needed for the range request
	var neededChunks []*filer_pb.FileChunk
	for _, chunk := range chunks {
		chunkStart := chunk.GetOffset()
		chunkEnd := chunkStart + int64(chunk.GetSize()) - 1

		// Check if this chunk overlaps with the requested range
		if endOffset == -1 {
			// No end specified, take all chunks from startOffset
			if chunkEnd >= startOffset {
				neededChunks = append(neededChunks, chunk)
			}
		} else {
			// Specific range: check for overlap
			if chunkStart <= endOffset && chunkEnd >= startOffset {
				neededChunks = append(neededChunks, chunk)
			}
		}
	}

	// Create readers for only the needed chunks
	var readers []io.Reader

	for _, chunk := range neededChunks {
		// Get this chunk's encrypted data
		chunkReader, err := s3a.createEncryptedChunkReader(chunk)
		if err != nil {
			return nil, fmt.Errorf("failed to create chunk reader: %v", err)
		}

		if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
			// For SSE-C chunks, extract the IV from the stored per-chunk metadata (unified approach)
			if len(chunk.GetSseMetadata()) > 0 {
				// Deserialize the SSE-C metadata stored in the unified metadata field
				ssecMetadata, decErr := DeserializeSSECMetadata(chunk.GetSseMetadata())
				if decErr != nil {
					return nil, fmt.Errorf("failed to deserialize SSE-C metadata for chunk %s: %v", chunk.GetFileIdString(), decErr)
				}

				// Decode the IV from the metadata
				iv, ivErr := base64.StdEncoding.DecodeString(ssecMetadata.IV)
				if ivErr != nil {
					return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), ivErr)
				}

				// Calculate the correct IV for this chunk using within-part offset
				var chunkIV []byte
				if ssecMetadata.PartOffset > 0 {
					chunkIV = calculateIVWithOffset(iv, ssecMetadata.PartOffset)
				} else {
					chunkIV = iv
				}

				decryptedReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV)
				if decErr != nil {
					return nil, fmt.Errorf("failed to create SSE-C decrypted reader for chunk %s: %v", chunk.GetFileIdString(), decErr)
				}
				readers = append(readers, decryptedReader)
				glog.Infof("Created SSE-C decrypted reader for chunk %s using stored metadata", chunk.GetFileIdString())
			} else {
				return nil, fmt.Errorf("SSE-C chunk %s missing required metadata", chunk.GetFileIdString())
			}
		} else {
			// Non-SSE-C chunk, use as-is
			readers = append(readers, chunkReader)
		}
	}

	multiReader := NewMultipartSSEReader(readers)

	// Apply range logic if a range was requested
	if rangeHeader != "" && startOffset >= 0 {
		if endOffset == -1 {
			// Open-ended range (e.g., "bytes=100-")
			return &SSERangeReader{
				reader:    multiReader,
				offset:    startOffset,
				remaining: -1, // Read until EOF
			}, nil
		} else {
			// Specific range (e.g., "bytes=0-99")
			rangeLength := endOffset - startOffset + 1
			return &SSERangeReader{
				reader:    multiReader,
				offset:    startOffset,
				remaining: rangeLength,
			}, nil
		}
	}

	return multiReader, nil
}