seaweedfs/weed/s3api/s3api_object_handlers.go
chrislu bbdbb0515e cleanup: remove CI-DEBUG logging from S3 versioning troubleshooting
REMOVED DEBUG LOGS:
- PutObjectHandler: CI-DEBUG logs for object creation tracing
- putVersionedObject: CI-DEBUG logs for version metadata updates
- updateLatestVersionInDirectory: CI-DEBUG logs for retry logic
- getLatestObjectVersion: CI-DEBUG logs for version lookups and race conditions
- HeadObjectHandler: CI-DEBUG logs for latest version requests
- ListObjectVersionsHandler: CI-DEBUG logs for request params and responses

REMOVED S3-TESTS MODIFICATIONS:
- Dynamic sed patches to s3-tests Python code in GitHub Actions
- CI-S3TEST-DEBUG logging for test_bucket_list_return_data_versioning
- CI-S3TEST-DEBUG logging for test_versioning_concurrent_multi_object_delete
- CI-S3TEST-DEBUG logging for _create_objects function

RETAINED FUNCTIONALITY:
✓ Path normalization fix (removeDuplicateSlashes consistency)
✓ Retry logic with exponential backoff for filer consistency (sketched below)
✓ Race condition handling for .versions directory metadata
✓ All existing glog.V(1), glog.V(2), glog.V(3) logging levels

The debug logs served their purpose in identifying and fixing the root cause
of S3 versioning CI failures (path normalization inconsistency). Now that
the issues are resolved, the verbose debug output is no longer needed.
2025-09-15 09:02:56 -07:00
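
The retained retry logic lives in getLatestObjectVersion and updateLatestVersionInDirectory, which are not shown in this file. As a rough sketch only, assuming illustrative names, attempt counts, and delays (not the actual SeaweedFS code), retry with exponential backoff for filer consistency has this shape:

import (
	"fmt"
	"time"
)

// retryWithBackoff is an illustrative helper, not the actual implementation:
// it retries fn up to maxAttempts times, doubling the wait between attempts
// to give the filer time to reach a consistent view of the metadata.
func retryWithBackoff(maxAttempts int, initialDelay time.Duration, fn func() error) error {
	delay := initialDelay
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = fn(); err == nil {
			return nil // success, no retry needed
		}
		if attempt < maxAttempts {
			time.Sleep(delay) // wait before the next attempt
			delay *= 2        // exponential backoff
		}
	}
	return fmt.Errorf("after %d attempts: %w", maxAttempts, err)
}

A caller would wrap the metadata lookup, for example retryWithBackoff(5, 50*time.Millisecond, func() error { _, err := lookupLatestVersion(); return err }), where lookupLatestVersion is a hypothetical stand-in for the real filer query.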


package s3api
import (
"bytes"
"encoding/base64"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"github.com/seaweedfs/seaweedfs/weed/glog"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
// corsHeaders defines the CORS headers that need to be preserved
// Package-level variable to avoid repeated allocations
var corsHeaders = []string{
"Access-Control-Allow-Origin",
"Access-Control-Allow-Methods",
"Access-Control-Allow-Headers",
"Access-Control-Expose-Headers",
"Access-Control-Max-Age",
"Access-Control-Allow-Credentials",
}
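// mimeDetect sniffs up to 512 bytes from dataReader to set the request's
// Content-Type header, then returns a reader that replays the sniffed bytes
// ahead of the remaining stream.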
func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser {
mimeBuffer := make([]byte, 512)
size, _ := dataReader.Read(mimeBuffer)
if size > 0 {
r.Header.Set("Content-Type", http.DetectContentType(mimeBuffer[:size]))
return io.NopCloser(io.MultiReader(bytes.NewReader(mimeBuffer[:size]), dataReader))
}
return io.NopCloser(dataReader)
}
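// urlEscapeObject collapses duplicate slashes, escapes each path segment,
// and ensures the result has a leading slash.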
func urlEscapeObject(object string) string {
t := urlPathEscape(removeDuplicateSlashes(object))
if strings.HasPrefix(t, "/") {
return t
}
return "/" + t
}
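// entryUrlEncode URL-encodes the directory, entry name, and prefix when
// encodingTypeUrl is true; otherwise it returns the inputs unchanged.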
func entryUrlEncode(dir string, entry string, encodingTypeUrl bool) (dirName string, entryName string, prefix string) {
if !encodingTypeUrl {
return dir, entry, entry
}
return urlPathEscape(dir), url.QueryEscape(entry), urlPathEscape(entry)
}
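// urlPathEscape escapes each path segment individually so that "/" separators
// are preserved, and additionally encodes "+" as "%2B" so it cannot be
// misread as an encoded space.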
func urlPathEscape(object string) string {
var escapedParts []string
for _, part := range strings.Split(object, "/") {
escapedParts = append(escapedParts, strings.ReplaceAll(url.PathEscape(part), "+", "%2B"))
}
return strings.Join(escapedParts, "/")
}
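// removeDuplicateSlashes collapses runs of consecutive slashes into a single
// slash, e.g. "a//b///c" becomes "a/b/c".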
func removeDuplicateSlashes(object string) string {
result := strings.Builder{}
result.Grow(len(object))
isLastSlash := false
for _, r := range object {
switch r {
case '/':
if !isLastSlash {
result.WriteRune(r)
}
isLastSlash = true
default:
result.WriteRune(r)
isLastSlash = false
}
}
return result.String()
}
// checkDirectoryObject checks if the object is a directory object (ends with "/") and if it exists
// Returns: (entry, isDirectoryObject, error)
// - entry: the directory entry if found and is a directory
// - isDirectoryObject: true if the request was for a directory object (ends with "/")
// - error: any error encountered while checking
func (s3a *S3ApiServer) checkDirectoryObject(bucket, object string) (*filer_pb.Entry, bool, error) {
if !strings.HasSuffix(object, "/") {
return nil, false, nil // Not a directory object
}
bucketDir := s3a.option.BucketsPath + "/" + bucket
cleanObject := strings.TrimSuffix(strings.TrimPrefix(object, "/"), "/")
if cleanObject == "" {
return nil, true, nil // Root level directory object, but we don't handle it
}
// Check if directory exists
dirEntry, err := s3a.getEntry(bucketDir, cleanObject)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
return nil, true, nil // Directory object requested but doesn't exist
}
return nil, true, err // Other errors should be propagated
}
if !dirEntry.IsDirectory {
return nil, true, nil // Exists but not a directory
}
return dirEntry, true, nil
}
// serveDirectoryContent serves the content of a directory object directly
func (s3a *S3ApiServer) serveDirectoryContent(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry) {
// Set content type - use stored MIME type or default
contentType := entry.Attributes.Mime
if contentType == "" {
contentType = "application/octet-stream"
}
w.Header().Set("Content-Type", contentType)
// Set content length - use FileSize for accuracy, especially for large files
contentLength := int64(entry.Attributes.FileSize)
w.Header().Set("Content-Length", strconv.FormatInt(contentLength, 10))
// Set last modified
w.Header().Set("Last-Modified", time.Unix(entry.Attributes.Mtime, 0).UTC().Format(http.TimeFormat))
// Set ETag
w.Header().Set("ETag", "\""+filer.ETag(entry)+"\"")
// For HEAD requests, don't write body
if r.Method == http.MethodHead {
w.WriteHeader(http.StatusOK)
return
}
// Write content
w.WriteHeader(http.StatusOK)
if len(entry.Content) > 0 {
if _, err := w.Write(entry.Content); err != nil {
glog.Errorf("serveDirectoryContent: failed to write response: %v", err)
}
}
}
// handleDirectoryObjectRequest is a helper function that handles directory object requests
// for both GET and HEAD operations, eliminating code duplication
func (s3a *S3ApiServer) handleDirectoryObjectRequest(w http.ResponseWriter, r *http.Request, bucket, object, handlerName string) bool {
// Check if this is a directory object and handle it directly
if dirEntry, isDirectoryObject, err := s3a.checkDirectoryObject(bucket, object); err != nil {
glog.Errorf("%s: error checking directory object %s/%s: %v", handlerName, bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return true // Request was handled (with error)
} else if dirEntry != nil {
glog.V(2).Infof("%s: directory object %s/%s found, serving content", handlerName, bucket, object)
s3a.serveDirectoryContent(w, r, dirEntry)
return true // Request was handled successfully
} else if isDirectoryObject {
// Directory object but doesn't exist
glog.V(2).Infof("%s: directory object %s/%s not found", handlerName, bucket, object)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return true // Request was handled (with not found)
}
return false // Not a directory object, continue with normal processing
}
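// newListEntry builds a ListEntry for list-objects responses: it derives the
// key from dir and name when key is empty, optionally URL-encodes it, and,
// when fetchOwner is set, resolves the owner from S3 metadata via the IAM
// account manager (falling back to anonymous).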
func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bucketPrefix string, fetchOwner bool, isDirectory bool, encodingTypeUrl bool, iam AccountManager) (listEntry ListEntry) {
storageClass := "STANDARD"
if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
storageClass = string(v)
}
keyFormat := "%s/%s"
if isDirectory {
keyFormat += "/"
}
if key == "" {
key = fmt.Sprintf(keyFormat, dir, name)[len(bucketPrefix):]
}
if encodingTypeUrl {
key = urlPathEscape(key)
}
listEntry = ListEntry{
Key: key,
LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
ETag: "\"" + filer.ETag(entry) + "\"",
Size: int64(filer.FileSize(entry)),
StorageClass: StorageClass(storageClass),
}
if fetchOwner {
// Extract owner from S3 metadata (Extended attributes) instead of file system attributes
var ownerID, displayName string
if entry.Extended != nil {
if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
ownerID = string(ownerBytes)
}
}
// Fallback to anonymous if no S3 owner found
if ownerID == "" {
ownerID = s3_constants.AccountAnonymousId
displayName = "anonymous"
} else {
// Get the proper display name from IAM system
displayName = iam.GetAccountNameById(ownerID)
// Fallback to ownerID if no display name found
if displayName == "" {
displayName = ownerID
}
}
listEntry.Owner = &CanonicalUser{
ID: ownerID,
DisplayName: displayName,
}
}
return listEntry
}
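// toFilerUrl builds the filer HTTP URL for an object, collapsing duplicate
// slashes and escaping the object path.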
func (s3a *S3ApiServer) toFilerUrl(bucket, object string) string {
object = urlPathEscape(removeDuplicateSlashes(object))
destUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, object)
return destUrl
}
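// GetObjectHandler handles S3 GET Object requests, including directory
// objects, conditional reads, versioned buckets, and SSE-C/SSE-KMS decryption.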
func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("GetObjectHandler %s %s", bucket, object)
// Handle directory objects with shared logic
if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "GetObjectHandler") {
return // Directory object request was handled
}
// Check conditional headers for read operations
result := s3a.checkConditionalHeadersForReads(r, bucket, object)
if result.ErrorCode != s3err.ErrNone {
glog.V(3).Infof("GetObjectHandler: Conditional header check failed for %s/%s with error %v", bucket, object, result.ErrorCode)
// For 304 Not Modified responses, include the ETag header
if result.ErrorCode == s3err.ErrNotModified && result.ETag != "" {
w.Header().Set("ETag", result.ETag)
}
s3err.WriteErrorResponse(w, r, result.ErrorCode)
return
}
// Check for specific version ID in query parameters
versionId := r.URL.Query().Get("versionId")
// Check if versioning is configured for the bucket (Enabled or Suspended)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId)
var destUrl string
if versioningConfigured {
// Handle versioned GET - all versions are stored in .versions directory
var targetVersionId string
var entry *filer_pb.Entry
if versionId != "" {
// Request for specific version
glog.V(2).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object)
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
if err != nil {
glog.Errorf("Failed to get specific version %s: %v", versionId, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
targetVersionId = versionId
} else {
// Request for latest version
glog.V(1).Infof("GetObject: requesting latest version for %s%s", bucket, object)
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
if entry.Extended != nil {
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
targetVersionId = string(versionIdBytes)
}
}
// If no version ID found in entry, this is a pre-versioning object
if targetVersionId == "" {
targetVersionId = "null"
}
}
// Check if this is a delete marker
if entry.Extended != nil {
if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
}
// Determine the actual file path based on whether this is a versioned or pre-versioning object
if targetVersionId == "null" {
// Pre-versioning object - stored as regular file
destUrl = s3a.toFilerUrl(bucket, object)
glog.V(2).Infof("GetObject: pre-versioning object URL: %s", destUrl)
} else {
// Versioned object - stored in .versions directory
versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
glog.V(2).Infof("GetObject: version %s URL: %s", targetVersionId, destUrl)
}
// Set version ID in response header
w.Header().Set("x-amz-version-id", targetVersionId)
// Add object lock metadata to response headers if present
s3a.addObjectLockHeadersToResponse(w, entry)
} else {
// Handle regular GET (non-versioned)
destUrl = s3a.toFilerUrl(bucket, object)
}
// Check if this is a range request to an SSE object and modify the approach
originalRangeHeader := r.Header.Get("Range")
var sseObject = false
// Pre-check if this object is SSE encrypted to avoid filer range conflicts
if originalRangeHeader != "" {
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
primarySSEType := s3a.detectPrimarySSEType(objectEntry)
if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
sseObject = true
// Temporarily remove Range header to get full encrypted data from filer
r.Header.Del("Range")
}
}
}
s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
// Restore the original Range header for SSE processing
if sseObject && originalRangeHeader != "" {
r.Header.Set("Range", originalRangeHeader)
}
// Add SSE metadata headers based on object metadata before SSE processing
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
s3a.addSSEHeadersToResponse(proxyResponse, objectEntry)
}
// Handle SSE decryption (both SSE-C and SSE-KMS) if needed
return s3a.handleSSEResponse(r, proxyResponse, w)
})
}
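// HeadObjectHandler handles S3 HEAD Object requests; it mirrors the version
// resolution in GetObjectHandler but returns headers only.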
func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object)
// Handle directory objects with shared logic
if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "HeadObjectHandler") {
return // Directory object request was handled
}
// Check conditional headers for read operations
result := s3a.checkConditionalHeadersForReads(r, bucket, object)
if result.ErrorCode != s3err.ErrNone {
glog.V(3).Infof("HeadObjectHandler: Conditional header check failed for %s/%s with error %v", bucket, object, result.ErrorCode)
// For 304 Not Modified responses, include the ETag header
if result.ErrorCode == s3err.ErrNotModified && result.ETag != "" {
w.Header().Set("ETag", result.ETag)
}
s3err.WriteErrorResponse(w, r, result.ErrorCode)
return
}
// Check for specific version ID in query parameters
versionId := r.URL.Query().Get("versionId")
// Check if versioning is configured for the bucket (Enabled or Suspended)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
var destUrl string
if versioningConfigured {
// Handle versioned HEAD - all versions are stored in .versions directory
var targetVersionId string
var entry *filer_pb.Entry
if versionId != "" {
// Request for specific version
glog.V(2).Infof("HeadObject: requesting specific version %s for %s%s", versionId, bucket, object)
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
if err != nil {
glog.Errorf("Failed to get specific version %s: %v", versionId, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
targetVersionId = versionId
} else {
// Request for latest version
glog.V(2).Infof("HeadObject: requesting latest version for %s%s", bucket, object)
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
glog.Errorf("Failed to get latest version: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
if entry.Extended != nil {
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
targetVersionId = string(versionIdBytes)
}
}
// If no version ID found in entry, this is a pre-versioning object
if targetVersionId == "" {
targetVersionId = "null"
}
}
// Check if this is a delete marker
if entry.Extended != nil {
if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
}
// Determine the actual file path based on whether this is a versioned or pre-versioning object
if targetVersionId == "null" {
// Pre-versioning object - stored as regular file
destUrl = s3a.toFilerUrl(bucket, object)
glog.V(2).Infof("HeadObject: pre-versioning object URL: %s", destUrl)
} else {
// Versioned object - stored in .versions directory
versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
glog.V(2).Infof("HeadObject: version %s URL: %s", targetVersionId, destUrl)
}
// Set version ID in response header
w.Header().Set("x-amz-version-id", targetVersionId)
// Add object lock metadata to response headers if present
s3a.addObjectLockHeadersToResponse(w, entry)
} else {
// Handle regular HEAD (non-versioned)
destUrl = s3a.toFilerUrl(bucket, object)
}
s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
// Handle SSE validation (both SSE-C and SSE-KMS) for HEAD requests
return s3a.handleSSEResponse(r, proxyResponse, w)
})
}
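// proxyToFiler forwards the request to the filer at destUrl, translates filer
// status codes into S3 errors, and delegates response writing to responseFn.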
func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, isWrite bool, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64)) {
glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl)
start := time.Now()
proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body)
if err != nil {
glog.Errorf("NewRequest %s: %v", destUrl, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
proxyReq.Header.Set("Accept-Encoding", "identity")
for k, v := range r.URL.Query() {
if _, ok := s3_constants.PassThroughHeaders[strings.ToLower(k)]; ok {
proxyReq.Header[k] = v
}
if k == "partNumber" {
proxyReq.Header[s3_constants.SeaweedFSPartNumber] = v
}
}
for header, values := range r.Header {
proxyReq.Header[header] = values
}
if proxyReq.ContentLength == 0 && r.ContentLength != 0 {
proxyReq.ContentLength = r.ContentLength
}
// ensure that the Authorization header overrides any Authorization
// header that might already be present in proxyReq
s3a.maybeAddFilerJwtAuthorization(proxyReq, isWrite)
resp, postErr := s3a.client.Do(proxyReq)
if postErr != nil {
glog.Errorf("post to filer: %v", postErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
defer util_http.CloseResponse(resp)
if resp.StatusCode == http.StatusPreconditionFailed {
s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed)
return
}
if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
return
}
if r.Method == http.MethodDelete {
if resp.StatusCode == http.StatusNotFound {
// this is normal: S3 DELETE of a non-existent key succeeds
responseStatusCode, _ := responseFn(resp, w)
s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
return
}
}
if resp.StatusCode == http.StatusNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
TimeToFirstByte(r.Method, start, r)
if resp.Header.Get(s3_constants.SeaweedFSIsDirectoryKey) == "true" {
responseStatusCode, _ := responseFn(resp, w)
s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
return
}
if resp.StatusCode == http.StatusInternalServerError {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// when HEADing a directory, it should be reported as no such key
// https://github.com/seaweedfs/seaweedfs/issues/3457
if resp.ContentLength == -1 && resp.StatusCode != http.StatusNotModified {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
if resp.StatusCode == http.StatusBadRequest {
respBody, _ := io.ReadAll(resp.Body)
switch string(respBody) {
case "InvalidPart":
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
default:
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
}
resp.Body.Close()
return
}
setUserMetadataKeyToLowercase(resp)
responseStatusCode, bytesTransferred := responseFn(resp, w)
BucketTrafficSent(bytesTransferred, r)
s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
}
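// setUserMetadataKeyToLowercase rewrites user metadata (x-amz-meta-*)
// response header keys to lowercase, matching S3 behavior.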
func setUserMetadataKeyToLowercase(resp *http.Response) {
for key, value := range resp.Header {
if strings.HasPrefix(key, s3_constants.AmzUserMetaPrefix) {
resp.Header[strings.ToLower(key)] = value
delete(resp.Header, key)
}
}
}
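// captureCORSHeaders snapshots the CORS headers currently set on the response
// writer (typically by middleware) so they survive proxy header copying.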
func captureCORSHeaders(w http.ResponseWriter, headersToCapture []string) map[string]string {
captured := make(map[string]string)
for _, corsHeader := range headersToCapture {
if value := w.Header().Get(corsHeader); value != "" {
captured[corsHeader] = value
}
}
return captured
}
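// restoreCORSHeaders re-applies previously captured CORS headers to the
// response writer.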
func restoreCORSHeaders(w http.ResponseWriter, capturedCORSHeaders map[string]string) {
for corsHeader, value := range capturedCORSHeaders {
w.Header().Set(corsHeader, value)
}
}
// writeFinalResponse handles the common response writing logic shared between
// passThroughResponse and handleSSECResponse
func writeFinalResponse(w http.ResponseWriter, proxyResponse *http.Response, bodyReader io.Reader, capturedCORSHeaders map[string]string) (statusCode int, bytesTransferred int64) {
// Restore CORS headers that were set by middleware
restoreCORSHeaders(w, capturedCORSHeaders)
if proxyResponse.Header.Get("Content-Range") != "" && proxyResponse.StatusCode == http.StatusOK {
statusCode = http.StatusPartialContent
} else {
statusCode = proxyResponse.StatusCode
}
w.WriteHeader(statusCode)
// Stream response data
buf := mem.Allocate(128 * 1024)
defer mem.Free(buf)
bytesTransferred, err := io.CopyBuffer(w, bodyReader, buf)
if err != nil {
glog.V(1).Infof("response read %d bytes: %v", bytesTransferred, err)
}
return statusCode, bytesTransferred
}
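// passThroughResponse copies the proxy response headers and streams the body
// through unchanged, preserving middleware-set CORS headers.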
func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
// Capture existing CORS headers that may have been set by middleware
capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
// Copy headers from proxy response
for k, v := range proxyResponse.Header {
w.Header()[k] = v
}
return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders)
}
// handleSSECResponse handles SSE-C decryption and response processing
func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
// Check if the object has SSE-C metadata
sseAlgorithm := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm)
sseKeyMD5 := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5)
isObjectEncrypted := sseAlgorithm != "" && sseKeyMD5 != ""
// Parse SSE-C headers from request once (avoid duplication)
customerKey, err := ParseSSECHeaders(r)
if err != nil {
errCode := MapSSECErrorToS3Error(err)
s3err.WriteErrorResponse(w, r, errCode)
return http.StatusBadRequest, 0
}
if isObjectEncrypted {
// This object was encrypted with SSE-C, validate customer key
if customerKey == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
return http.StatusBadRequest, 0
}
// SSE-C MD5 is base64 and case-sensitive
if customerKey.KeyMD5 != sseKeyMD5 {
// For GET/HEAD requests, AWS S3 returns 403 Forbidden for a key mismatch.
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return http.StatusForbidden, 0
}
// SSE-C encrypted objects support HTTP Range requests
// The IV is stored in metadata and CTR mode allows seeking to any offset
// Range requests will be handled by the filer layer with proper offset-based decryption
// Check if this is a chunked or small content SSE-C object
bucket, object := s3_constants.GetBucketAndObject(r)
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
if entry, err := s3a.getEntry("", objectPath); err == nil {
// Check for SSE-C chunks
sseCChunks := 0
for _, chunk := range entry.GetChunks() {
if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
sseCChunks++
}
}
if sseCChunks >= 1 {
// Handle chunked SSE-C objects - each chunk needs independent decryption
multipartReader, decErr := s3a.createMultipartSSECDecryptedReader(r, proxyResponse)
if decErr != nil {
glog.Errorf("Failed to create multipart SSE-C decrypted reader: %v", decErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
// Capture existing CORS headers
capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
// Copy headers from proxy response
for k, v := range proxyResponse.Header {
w.Header()[k] = v
}
// Set proper headers for range requests
rangeHeader := r.Header.Get("Range")
if rangeHeader != "" {
// Parse range header (e.g., "bytes=0-99")
if len(rangeHeader) > 6 && rangeHeader[:6] == "bytes=" {
rangeSpec := rangeHeader[6:]
parts := strings.Split(rangeSpec, "-")
if len(parts) == 2 {
startOffset, endOffset := int64(0), int64(-1)
if parts[0] != "" {
startOffset, _ = strconv.ParseInt(parts[0], 10, 64)
}
if parts[1] != "" {
endOffset, _ = strconv.ParseInt(parts[1], 10, 64)
}
if endOffset >= startOffset {
// Specific range - set proper Content-Length and Content-Range headers
rangeLength := endOffset - startOffset + 1
totalSize := proxyResponse.Header.Get("Content-Length")
w.Header().Set("Content-Length", strconv.FormatInt(rangeLength, 10))
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%s", startOffset, endOffset, totalSize))
// writeFinalResponse will set status to 206 if Content-Range is present
}
}
}
}
return writeFinalResponse(w, proxyResponse, multipartReader, capturedCORSHeaders)
} else if len(entry.GetChunks()) == 0 && len(entry.Content) > 0 {
// Small content SSE-C object stored directly in entry.Content
// Fall through to traditional single-object SSE-C handling below
}
}
// Single-part SSE-C object: Get IV from proxy response headers (stored during upload)
ivBase64 := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEIVHeader)
if ivBase64 == "" {
glog.Errorf("SSE-C encrypted single-part object missing IV in metadata")
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
iv, err := base64.StdEncoding.DecodeString(ivBase64)
if err != nil {
glog.Errorf("Failed to decode IV from metadata: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
// Create decrypted reader with IV from metadata
decryptedReader, decErr := CreateSSECDecryptedReader(proxyResponse.Body, customerKey, iv)
if decErr != nil {
glog.Errorf("Failed to create SSE-C decrypted reader: %v", decErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
// Capture existing CORS headers that may have been set by middleware
capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
// Copy headers from proxy response (excluding body-related headers that might change)
for k, v := range proxyResponse.Header {
if k != "Content-Length" && k != "Content-Encoding" {
w.Header()[k] = v
}
}
// Set correct Content-Length for SSE-C (only for full object requests)
// With IV stored in metadata, the encrypted length equals the original length
if proxyResponse.Header.Get("Content-Range") == "" {
// Full object request: encrypted length equals original length (IV not in stream)
if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" {
// Content-Length is already correct since IV is stored in metadata, not in data stream
w.Header().Set("Content-Length", contentLengthStr)
}
}
// For range requests, let the actual bytes transferred determine the response length
// Add SSE-C response headers
w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, sseAlgorithm)
w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, sseKeyMD5)
return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders)
} else {
// Object is not encrypted, but check if customer provided SSE-C headers unnecessarily
if customerKey != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyNotNeeded)
return http.StatusBadRequest, 0
}
// Normal pass-through response
return passThroughResponse(proxyResponse, w)
}
}
// handleSSEResponse handles both SSE-C and SSE-KMS decryption/validation and response processing
func (s3a *S3ApiServer) handleSSEResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
// Check what the client is expecting based on request headers
clientExpectsSSEC := IsSSECRequest(r)
// Check what the stored object has in headers (may be conflicting after copy)
kmsMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader)
sseAlgorithm := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm)
// Get actual object state by examining chunks (most reliable for cross-encryption)
bucket, object := s3_constants.GetBucketAndObject(r)
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
actualObjectType := "Unknown"
if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
actualObjectType = s3a.detectPrimarySSEType(objectEntry)
}
// Route based on ACTUAL object type (from chunks) rather than conflicting headers
if actualObjectType == s3_constants.SSETypeC && clientExpectsSSEC {
// Object is SSE-C and client expects SSE-C → SSE-C handler
return s3a.handleSSECResponse(r, proxyResponse, w)
} else if actualObjectType == s3_constants.SSETypeKMS && !clientExpectsSSEC {
// Object is SSE-KMS and client doesn't expect SSE-C → SSE-KMS handler
return s3a.handleSSEKMSResponse(r, proxyResponse, w, kmsMetadataHeader)
} else if actualObjectType == "None" && !clientExpectsSSEC {
// Object is unencrypted and client doesn't expect SSE-C → pass through
return passThroughResponse(proxyResponse, w)
} else if actualObjectType == s3_constants.SSETypeC && !clientExpectsSSEC {
// Object is SSE-C but client doesn't provide SSE-C headers → Error
s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
return http.StatusBadRequest, 0
} else if actualObjectType == s3_constants.SSETypeKMS && clientExpectsSSEC {
// Object is SSE-KMS but client provides SSE-C headers → Error
s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
return http.StatusBadRequest, 0
} else if actualObjectType == "None" && clientExpectsSSEC {
// Object is unencrypted but client provides SSE-C headers → Error
s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
return http.StatusBadRequest, 0
}
// Fallback for edge cases - use original logic with header-based detection
if clientExpectsSSEC && sseAlgorithm != "" {
return s3a.handleSSECResponse(r, proxyResponse, w)
} else if !clientExpectsSSEC && kmsMetadataHeader != "" {
return s3a.handleSSEKMSResponse(r, proxyResponse, w, kmsMetadataHeader)
} else {
return passThroughResponse(proxyResponse, w)
}
}
// handleSSEKMSResponse handles SSE-KMS decryption and response processing
func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, kmsMetadataHeader string) (statusCode int, bytesTransferred int64) {
// Deserialize SSE-KMS metadata
kmsMetadataBytes, err := base64.StdEncoding.DecodeString(kmsMetadataHeader)
if err != nil {
glog.Errorf("Failed to decode SSE-KMS metadata: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
if err != nil {
glog.Errorf("Failed to deserialize SSE-KMS metadata: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
// For HEAD requests, we don't need to decrypt the body, just add response headers
if r.Method == http.MethodHead {
// Capture existing CORS headers that may have been set by middleware
capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
// Copy headers from proxy response
for k, v := range proxyResponse.Header {
w.Header()[k] = v
}
// Add SSE-KMS response headers
AddSSEKMSResponseHeaders(w, sseKMSKey)
return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders)
}
// For GET requests, check if this is a multipart SSE-KMS object
// We need to check the object structure to determine if it's multipart encrypted
isMultipartSSEKMS := false
if sseKMSKey != nil {
// Get the object entry to check chunk structure
bucket, object := s3_constants.GetBucketAndObject(r)
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
if entry, err := s3a.getEntry("", objectPath); err == nil {
// Check for multipart SSE-KMS
sseKMSChunks := 0
for _, chunk := range entry.GetChunks() {
if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 {
sseKMSChunks++
}
}
isMultipartSSEKMS = sseKMSChunks > 1
glog.Infof("SSE-KMS object detection: chunks=%d, sseKMSChunks=%d, isMultipartSSEKMS=%t",
len(entry.GetChunks()), sseKMSChunks, isMultipartSSEKMS)
}
}
var decryptedReader io.Reader
if isMultipartSSEKMS {
// Handle multipart SSE-KMS objects - each chunk needs independent decryption
multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse)
if decErr != nil {
glog.Errorf("Failed to create multipart SSE-KMS decrypted reader: %v", decErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
decryptedReader = multipartReader
glog.V(3).Infof("Using multipart SSE-KMS decryption for object")
} else {
// Handle single-part SSE-KMS objects
singlePartReader, decErr := CreateSSEKMSDecryptedReader(proxyResponse.Body, sseKMSKey)
if decErr != nil {
glog.Errorf("Failed to create SSE-KMS decrypted reader: %v", decErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
decryptedReader = singlePartReader
glog.V(3).Infof("Using single-part SSE-KMS decryption for object")
}
// Capture existing CORS headers that may have been set by middleware
capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
// Copy headers from proxy response (excluding body-related headers that might change)
for k, v := range proxyResponse.Header {
if k != "Content-Length" && k != "Content-Encoding" {
w.Header()[k] = v
}
}
// Set correct Content-Length for SSE-KMS
if proxyResponse.Header.Get("Content-Range") == "" {
// For full object requests, encrypted length equals original length
if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" {
w.Header().Set("Content-Length", contentLengthStr)
}
}
// Add SSE-KMS response headers
AddSSEKMSResponseHeaders(w, sseKMSKey)
return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders)
}
// addObjectLockHeadersToResponse extracts object lock metadata from entry Extended attributes
// and adds the appropriate S3 headers to the response
func (s3a *S3ApiServer) addObjectLockHeadersToResponse(w http.ResponseWriter, entry *filer_pb.Entry) {
if entry == nil || entry.Extended == nil {
return
}
// Check if this entry has any object lock metadata (indicating it's from an object lock enabled bucket)
hasObjectLockMode := false
hasRetentionDate := false
// Add object lock mode header if present
if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists && len(modeBytes) > 0 {
w.Header().Set(s3_constants.AmzObjectLockMode, string(modeBytes))
hasObjectLockMode = true
}
// Add retention until date header if present
if dateBytes, exists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; exists && len(dateBytes) > 0 {
dateStr := string(dateBytes)
// Convert Unix timestamp to ISO8601 format for S3 compatibility
if timestamp, err := strconv.ParseInt(dateStr, 10, 64); err == nil {
retainUntilDate := time.Unix(timestamp, 0).UTC()
w.Header().Set(s3_constants.AmzObjectLockRetainUntilDate, retainUntilDate.Format(time.RFC3339))
hasRetentionDate = true
} else {
glog.Errorf("addObjectLockHeadersToResponse: failed to parse retention until date from stored metadata (dateStr: %s): %v", dateStr, err)
}
}
// Add legal hold header - AWS S3 behavior: always include legal hold for object lock enabled buckets
if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists && len(legalHoldBytes) > 0 {
// Return stored S3 standard "ON"/"OFF" values directly
w.Header().Set(s3_constants.AmzObjectLockLegalHold, string(legalHoldBytes))
} else if hasObjectLockMode || hasRetentionDate {
// If this entry has object lock metadata (indicating object lock enabled bucket)
// but no legal hold specifically set, default to "OFF" as per AWS S3 behavior
w.Header().Set(s3_constants.AmzObjectLockLegalHold, s3_constants.LegalHoldOff)
}
}
// addSSEHeadersToResponse converts stored SSE metadata from entry.Extended to HTTP response headers
// Uses intelligent prioritization: only set headers for the PRIMARY encryption type to avoid conflicts
func (s3a *S3ApiServer) addSSEHeadersToResponse(proxyResponse *http.Response, entry *filer_pb.Entry) {
if entry == nil || entry.Extended == nil {
return
}
// Determine the primary encryption type by examining chunks (most reliable)
primarySSEType := s3a.detectPrimarySSEType(entry)
// Only set headers for the PRIMARY encryption type
switch primarySSEType {
case s3_constants.SSETypeC:
// Add only SSE-C headers
if algorithmBytes, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; exists && len(algorithmBytes) > 0 {
proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, string(algorithmBytes))
}
if keyMD5Bytes, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; exists && len(keyMD5Bytes) > 0 {
proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, string(keyMD5Bytes))
}
if ivBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEIV]; exists && len(ivBytes) > 0 {
ivBase64 := base64.StdEncoding.EncodeToString(ivBytes)
proxyResponse.Header.Set(s3_constants.SeaweedFSSSEIVHeader, ivBase64)
}
case s3_constants.SSETypeKMS:
// Add only SSE-KMS headers
if sseAlgorithm, exists := entry.Extended[s3_constants.AmzServerSideEncryption]; exists && len(sseAlgorithm) > 0 {
proxyResponse.Header.Set(s3_constants.AmzServerSideEncryption, string(sseAlgorithm))
}
if kmsKeyID, exists := entry.Extended[s3_constants.AmzServerSideEncryptionAwsKmsKeyId]; exists && len(kmsKeyID) > 0 {
proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, string(kmsKeyID))
}
default:
// Unencrypted or unknown - don't set any SSE headers
}
glog.V(3).Infof("addSSEHeadersToResponse: processed %d extended metadata entries", len(entry.Extended))
}
// detectPrimarySSEType determines the primary SSE type by examining chunk metadata
func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string {
if len(entry.GetChunks()) == 0 {
// No chunks - check object-level metadata only (single objects or smallContent)
hasSSEC := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] != nil
hasSSEKMS := entry.Extended[s3_constants.AmzServerSideEncryption] != nil
if hasSSEC && !hasSSEKMS {
return s3_constants.SSETypeC
} else if hasSSEKMS && !hasSSEC {
return s3_constants.SSETypeKMS
} else if hasSSEC && hasSSEKMS {
// Both present - this should only happen during cross-encryption copies
// default to SSE-C regardless of whether the data is inline smallContent
return s3_constants.SSETypeC
}
return "None"
}
// Count chunk types to determine primary (multipart objects)
ssecChunks := 0
ssekmsChunks := 0
for _, chunk := range entry.GetChunks() {
switch chunk.GetSseType() {
case filer_pb.SSEType_SSE_C:
ssecChunks++
case filer_pb.SSEType_SSE_KMS:
ssekmsChunks++
}
}
// Primary type is the one with more chunks
if ssecChunks > ssekmsChunks {
return s3_constants.SSETypeC
} else if ssekmsChunks > ssecChunks {
return s3_constants.SSETypeKMS
} else if ssecChunks > 0 {
// Equal number, prefer SSE-C (shouldn't happen in practice)
return s3_constants.SSETypeC
}
return "None"
}
// createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects
func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) {
// Get the object path from the request
bucket, object := s3_constants.GetBucketAndObject(r)
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
// Get the object entry from filer to access chunk information
entry, err := s3a.getEntry("", objectPath)
if err != nil {
return nil, fmt.Errorf("failed to get object entry for multipart SSE-KMS decryption: %v", err)
}
// Sort chunks by offset to ensure correct order
chunks := entry.GetChunks()
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].GetOffset() < chunks[j].GetOffset()
})
// Create readers for each chunk, decrypting them independently
var readers []io.Reader
for i, chunk := range chunks {
glog.Infof("Processing chunk %d/%d: fileId=%s, offset=%d, size=%d, sse_type=%d",
i+1, len(entry.GetChunks()), chunk.GetFileIdString(), chunk.GetOffset(), chunk.GetSize(), chunk.GetSseType())
// Get this chunk's encrypted data
chunkReader, err := s3a.createEncryptedChunkReader(chunk)
if err != nil {
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
}
// Get SSE-KMS metadata for this chunk
var chunkSSEKMSKey *SSEKMSKey
// Check if this chunk has per-chunk SSE-KMS metadata (new architecture)
if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 {
// Use the per-chunk SSE-KMS metadata
kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata())
if err != nil {
glog.Errorf("Failed to deserialize per-chunk SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err)
} else {
// ChunkOffset is already set from the stored metadata (PartOffset)
chunkSSEKMSKey = kmsKey
glog.Infof("Using per-chunk SSE-KMS metadata for chunk %s: keyID=%s, IV=%x, partOffset=%d",
chunk.GetFileIdString(), kmsKey.KeyID, kmsKey.IV[:8], kmsKey.ChunkOffset)
}
}
// Fallback to object-level metadata (legacy support)
if chunkSSEKMSKey == nil {
objectMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader)
if objectMetadataHeader != "" {
kmsMetadataBytes, decodeErr := base64.StdEncoding.DecodeString(objectMetadataHeader)
if decodeErr == nil {
kmsKey, _ := DeserializeSSEKMSMetadata(kmsMetadataBytes)
if kmsKey != nil {
// For object-level metadata (legacy), use absolute file offset as fallback
kmsKey.ChunkOffset = chunk.GetOffset()
chunkSSEKMSKey = kmsKey
}
glog.Infof("Using fallback object-level SSE-KMS metadata for chunk %s with offset %d", chunk.GetFileIdString(), chunk.GetOffset())
}
}
}
if chunkSSEKMSKey == nil {
return nil, fmt.Errorf("no SSE-KMS metadata found for chunk %s in multipart object", chunk.GetFileIdString())
}
// Create decrypted reader for this chunk
decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, chunkSSEKMSKey)
if decErr != nil {
chunkReader.Close() // Close the chunk reader if decryption fails
return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
}
// Use the streaming decrypted reader directly instead of reading into memory
readers = append(readers, decryptedChunkReader)
glog.V(4).Infof("Added streaming decrypted reader for chunk %s in multipart SSE-KMS object", chunk.GetFileIdString())
}
// Combine all decrypted chunk readers into a single stream with proper resource management
multiReader := NewMultipartSSEReader(readers)
glog.V(3).Infof("Created multipart SSE-KMS decrypted reader with %d chunks", len(readers))
return multiReader, nil
}
// createEncryptedChunkReader creates a reader for a single encrypted chunk
func (s3a *S3ApiServer) createEncryptedChunkReader(chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
// Get chunk URL
srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
if err != nil {
return nil, fmt.Errorf("lookup volume URL for chunk %s: %v", chunk.GetFileIdString(), err)
}
// Create HTTP request for chunk data
req, err := http.NewRequest(http.MethodGet, srcUrl, nil)
if err != nil {
return nil, fmt.Errorf("create HTTP request for chunk: %v", err)
}
// Execute request
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("execute HTTP request for chunk: %v", err)
}
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
return nil, fmt.Errorf("HTTP request for chunk failed: %d", resp.StatusCode)
}
return resp.Body, nil
}
// MultipartSSEReader wraps multiple readers and ensures all underlying readers are properly closed
type MultipartSSEReader struct {
multiReader io.Reader
readers []io.Reader
}
// SSERangeReader applies range logic to an underlying reader
type SSERangeReader struct {
reader io.Reader
offset int64 // bytes to skip from the beginning
remaining int64 // bytes remaining to read (-1 for unlimited)
skipped int64 // bytes already skipped
}
// NewMultipartSSEReader creates a new multipart reader that can properly close all underlying readers
func NewMultipartSSEReader(readers []io.Reader) *MultipartSSEReader {
return &MultipartSSEReader{
multiReader: io.MultiReader(readers...),
readers: readers,
}
}
// Read implements the io.Reader interface
func (m *MultipartSSEReader) Read(p []byte) (n int, err error) {
return m.multiReader.Read(p)
}
// Close implements the io.Closer interface and closes all underlying readers that support closing
func (m *MultipartSSEReader) Close() error {
var lastErr error
for i, reader := range m.readers {
if closer, ok := reader.(io.Closer); ok {
if err := closer.Close(); err != nil {
glog.V(2).Infof("Error closing reader %d: %v", i, err)
lastErr = err // Keep track of the last error, but continue closing others
}
}
}
return lastErr
}
// Read implements the io.Reader interface for SSERangeReader
func (r *SSERangeReader) Read(p []byte) (n int, err error) {
// If we need to skip bytes and haven't skipped enough yet
if r.skipped < r.offset {
skipNeeded := r.offset - r.skipped
skipBuf := make([]byte, min(int64(len(p)), skipNeeded))
skipRead, skipErr := r.reader.Read(skipBuf)
r.skipped += int64(skipRead)
if skipErr != nil {
return 0, skipErr
}
// If we still need to skip more, recurse
if r.skipped < r.offset {
return r.Read(p)
}
}
// If we have a remaining limit and it's reached
if r.remaining == 0 {
return 0, io.EOF
}
// Calculate how much to read
readSize := len(p)
if r.remaining > 0 && int64(readSize) > r.remaining {
readSize = int(r.remaining)
}
// Read the data
n, err = r.reader.Read(p[:readSize])
if r.remaining > 0 {
r.remaining -= int64(n)
}
return n, err
}
// createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects
// Each chunk has its own IV and encryption key from the original multipart parts
func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) {
// Parse SSE-C headers from the request for decryption key
customerKey, err := ParseSSECHeaders(r)
if err != nil {
return nil, fmt.Errorf("invalid SSE-C headers for multipart decryption: %v", err)
}
// Get the object path from the request
bucket, object := s3_constants.GetBucketAndObject(r)
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
// Get the object entry from filer to access chunk information
entry, err := s3a.getEntry("", objectPath)
if err != nil {
return nil, fmt.Errorf("failed to get object entry for multipart SSE-C decryption: %v", err)
}
// Sort chunks by offset to ensure correct order
chunks := entry.GetChunks()
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].GetOffset() < chunks[j].GetOffset()
})
// Check for Range header to optimize chunk processing
var startOffset, endOffset int64 = 0, -1
rangeHeader := r.Header.Get("Range")
if rangeHeader != "" {
// Parse range header (e.g., "bytes=0-99")
if len(rangeHeader) > 6 && rangeHeader[:6] == "bytes=" {
rangeSpec := rangeHeader[6:]
parts := strings.Split(rangeSpec, "-")
if len(parts) == 2 {
if parts[0] != "" {
startOffset, _ = strconv.ParseInt(parts[0], 10, 64)
}
if parts[1] != "" {
endOffset, _ = strconv.ParseInt(parts[1], 10, 64)
}
}
}
}
// Filter chunks to only those needed for the range request
var neededChunks []*filer_pb.FileChunk
for _, chunk := range chunks {
chunkStart := chunk.GetOffset()
chunkEnd := chunkStart + int64(chunk.GetSize()) - 1
// Check if this chunk overlaps with the requested range
if endOffset == -1 {
// No end specified, take all chunks from startOffset
if chunkEnd >= startOffset {
neededChunks = append(neededChunks, chunk)
}
} else {
// Specific range: check for overlap
if chunkStart <= endOffset && chunkEnd >= startOffset {
neededChunks = append(neededChunks, chunk)
}
}
}
// Create readers for only the needed chunks
var readers []io.Reader
for _, chunk := range neededChunks {
// Get this chunk's encrypted data
chunkReader, err := s3a.createEncryptedChunkReader(chunk)
if err != nil {
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
}
if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
// For SSE-C chunks, extract the IV from the stored per-chunk metadata (unified approach)
if len(chunk.GetSseMetadata()) > 0 {
// Deserialize the SSE-C metadata stored in the unified metadata field
ssecMetadata, decErr := DeserializeSSECMetadata(chunk.GetSseMetadata())
if decErr != nil {
return nil, fmt.Errorf("failed to deserialize SSE-C metadata for chunk %s: %v", chunk.GetFileIdString(), decErr)
}
// Decode the IV from the metadata
iv, ivErr := base64.StdEncoding.DecodeString(ssecMetadata.IV)
if ivErr != nil {
return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), ivErr)
}
// Calculate the correct IV for this chunk using within-part offset
var chunkIV []byte
if ssecMetadata.PartOffset > 0 {
chunkIV = calculateIVWithOffset(iv, ssecMetadata.PartOffset)
} else {
chunkIV = iv
}
decryptedReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV)
if decErr != nil {
return nil, fmt.Errorf("failed to create SSE-C decrypted reader for chunk %s: %v", chunk.GetFileIdString(), decErr)
}
readers = append(readers, decryptedReader)
glog.Infof("Created SSE-C decrypted reader for chunk %s using stored metadata", chunk.GetFileIdString())
} else {
return nil, fmt.Errorf("SSE-C chunk %s missing required metadata", chunk.GetFileIdString())
}
} else {
// Non-SSE-C chunk, use as-is
readers = append(readers, chunkReader)
}
}
multiReader := NewMultipartSSEReader(readers)
// Apply range logic if a range was requested
if rangeHeader != "" && startOffset >= 0 {
if endOffset == -1 {
// Open-ended range (e.g., "bytes=100-")
return &SSERangeReader{
reader: multiReader,
offset: startOffset,
remaining: -1, // Read until EOF
}, nil
} else {
// Specific range (e.g., "bytes=0-99")
rangeLength := endOffset - startOffset + 1
return &SSERangeReader{
reader: multiReader,
offset: startOffset,
remaining: rangeLength,
}, nil
}
}
return multiReader, nil
}