From b19c9847c6241cc76e087a4b930a776ef5f07d11 Mon Sep 17 00:00:00 2001
From: skycope
Date: Wed, 3 Apr 2024 11:18:42 +0800
Subject: [PATCH] fix completed multiupload lost data (#5460)

If putObjectPart requests with the same uploadId arrive while
completeMultiPart is running, the finished object can lose data. Such
duplicate putObjectPart requests are typically caused by client timeout
retries.

Co-authored-by: Yang Wang
---
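Note: the sketch below is a standalone illustration, not SeaweedFS code;
the upload id and part number are made up. Before this patch, every
attempt at the same part wrote to one fixed filer path, so an UploadPart
that timed out and was retried could overwrite the part file while
completeMultipartUpload was assembling the object from it.

    package main

    import (
    	"fmt"

    	"github.com/google/uuid"
    )

    func main() {
    	uploadID, partID := "3d5a1f2c", 7

    	// Old scheme: every retry of part 7 lands on the same path, so a
    	// late retry can replace chunks that completion already read.
    	oldName := fmt.Sprintf("%s/%04d.part", uploadID, partID)

    	// New scheme: each attempt gets its own file; completion picks one
    	// entry and deletes the unused duplicates afterwards.
    	newName := fmt.Sprintf("%s/%04d_%s.part", uploadID, partID, uuid.NewString())

    	fmt.Println(oldName)
    	fmt.Println(newName)
    }

Because each attempt now lands in its own file, a late retry can no
longer clobber the entry that completion has chosen; the losers are
simply deleted after the object is assembled.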
 weed/s3api/filer_multipart.go                 | 76 +++++++++++++------
 weed/s3api/s3api_object_copy_handlers.go      | 12 +--
 weed/s3api/s3api_object_multipart_handlers.go |  9 ++-
 3 files changed, 66 insertions(+), 31 deletions(-)

diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index 765a5679e..ac9bffe21 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -4,9 +4,6 @@ import (
 	"encoding/hex"
 	"encoding/xml"
 	"fmt"
-	"github.com/google/uuid"
-	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
-	"golang.org/x/exp/slices"
 	"math"
 	"path/filepath"
 	"sort"
@@ -14,6 +11,10 @@ import (
 	"strings"
 	"time"
 
+	"github.com/google/uuid"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
+	"golang.org/x/exp/slices"
+
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/service/s3"
 
@@ -90,40 +91,55 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
 		return nil, s3err.ErrNoSuchUpload
 	}
 
-	// check whether completedParts is more than received parts
-	{
-		partNumbers := make(map[int]struct{}, len(entries))
-		for _, entry := range entries {
-			if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
-				partNumberString := entry.Name[:len(entry.Name)-len(".part")]
-				partNumber, err := strconv.Atoi(partNumberString)
-				if err == nil {
-					partNumbers[partNumber] = struct{}{}
-				}
+	partEntries := make(map[int][]*filer_pb.Entry, len(entries))
+	for _, entry := range entries {
+		glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name)
+		if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
+			var partNumberString string
+			index := strings.Index(entry.Name, "_")
+			if index != -1 {
+				partNumberString = entry.Name[:index]
+			} else {
+				partNumberString = entry.Name[:len(entry.Name)-len(".part")]
 			}
-		}
-		for _, part := range completedParts {
-			if _, found := partNumbers[part.PartNumber]; !found {
-				return nil, s3err.ErrInvalidPart
+			partNumber, err := strconv.Atoi(partNumberString)
+			if err != nil {
+				glog.Errorf("completeMultipartUpload failed to parse partNumber %s: %s", partNumberString, err)
+				continue
 			}
+			// there may be multiple entries for the same part, because of client retries
+			partEntries[partNumber] = append(partEntries[partNumber], entry)
 		}
+	}
 
 	mime := pentry.Attributes.Mime
 
 	var finalParts []*filer_pb.FileChunk
 	var offset int64
+	var deleteEntries []*filer_pb.Entry
+	for _, part := range completedParts {
+		entries := partEntries[part.PartNumber]
+		// check whether completedParts is more than received parts
+		if len(entries) == 0 {
+			glog.Errorf("part %d has no entry", part.PartNumber)
+			return nil, s3err.ErrInvalidPart
+		}
 
-	for _, entry := range entries {
-		if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
-			partETag, found := findByPartNumber(entry.Name, completedParts)
-			if !found {
+		found := false
+		for _, entry := range entries {
+			if found {
+				deleteEntries = append(deleteEntries, entry)
 				continue
 			}
+
+			partETag := strings.Trim(part.ETag, `"`)
 			entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
+			glog.Warningf("complete etag %s, partEtag %s", partETag, entryETag)
 			if partETag != "" && len(partETag) == 32 && entryETag != "" && entryETag != partETag {
-				glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
-				return nil, s3err.ErrInvalidPart
+				err = fmt.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
+				deleteEntries = append(deleteEntries, entry)
+				continue
 			}
 			for _, chunk := range entry.GetChunks() {
 				p := &filer_pb.FileChunk{
@@ -137,7 +153,14 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
 				finalParts = append(finalParts, p)
 				offset += int64(chunk.Size)
 			}
+			found = true
+			err = nil
 		}
+		if err != nil {
+			glog.Errorf("%s", err)
+			return nil, s3err.ErrInvalidPart
+		}
+	}
 
 	entryName := filepath.Base(*input.Key)
 
@@ -186,6 +209,13 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
 		},
 	}
 
+	for _, deleteEntry := range deleteEntries {
+		// delete unused part data
+		glog.Infof("completeMultipartUpload cleanup %s upload %s unused %s", *input.Bucket, *input.UploadId, deleteEntry.Name)
+		if err = s3a.rm(uploadDirectory, deleteEntry.Name, true, true); err != nil {
+			glog.Warningf("completeMultipartUpload cleanup %s upload %s unused %s : %v", *input.Bucket, *input.UploadId, deleteEntry.Name, err)
+		}
+	}
 	if err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, false, true); err != nil {
 		glog.V(1).Infof("completeMultipartUpload cleanup %s upload %s: %v", *input.Bucket, *input.UploadId, err)
 	}
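Note: with retries, several part files can now exist for one part
number, so completion must pick exactly one entry per completed part and
queue the rest for deletion. The standalone sketch below approximates
that selection rule; Entry and Part are simplified stand-ins for the
filer_pb.Entry and s3.CompletedPart types, and the real loop above also
accumulates chunks and offsets.

    package main

    import (
    	"fmt"
    	"strings"
    )

    type Entry struct {
    	Name string
    	ETag string // hex-encoded MD5 of the stored part
    }

    type Part struct {
    	PartNumber int
    	ETag       string // as sent by the client, possibly quoted
    }

    // pickEntry returns the first usable entry for a completed part plus
    // the duplicate entries that should be garbage-collected afterwards.
    func pickEntry(part Part, candidates []Entry) (*Entry, []Entry, error) {
    	want := strings.Trim(part.ETag, `"`)
    	var chosen *Entry
    	var leftovers []Entry
    	for i := range candidates {
    		e := candidates[i]
    		// Accept the entry unless both ETags are verifiable and disagree.
    		if chosen == nil && (len(want) != 32 || e.ETag == "" || e.ETag == want) {
    			chosen = &e
    			continue
    		}
    		leftovers = append(leftovers, e) // retried duplicates or mismatches
    	}
    	if chosen == nil {
    		return nil, leftovers, fmt.Errorf("part %d: no entry matches ETag %q", part.PartNumber, want)
    	}
    	return chosen, leftovers, nil
    }

    func main() {
    	etag := "5d41402abc4b2a76b9719d911017c592" // md5("hello")
    	part := Part{PartNumber: 7, ETag: `"` + etag + `"`}
    	candidates := []Entry{
    		{Name: "0007_aaaa.part", ETag: "ffffffffffffffffffffffffffffffff"}, // stale retry
    		{Name: "0007_bbbb.part", ETag: etag},                               // the good one
    	}
    	chosen, leftovers, err := pickEntry(part, candidates)
    	fmt.Println(chosen.Name, len(leftovers), err) // 0007_bbbb.part 1 <nil>
    }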
diff --git a/weed/s3api/s3api_object_copy_handlers.go b/weed/s3api/s3api_object_copy_handlers.go
index 8dc33f213..8d13fe17e 100644
--- a/weed/s3api/s3api_object_copy_handlers.go
+++ b/weed/s3api/s3api_object_copy_handlers.go
@@ -2,16 +2,17 @@ package s3api
 
 import (
 	"fmt"
-	"github.com/seaweedfs/seaweedfs/weed/glog"
-	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
-	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
-	"modernc.org/strutil"
 	"net/http"
 	"net/url"
 	"strconv"
 	"strings"
 	"time"
 
+	"modernc.org/strutil"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
 	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
@@ -170,8 +171,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
 
 	rangeHeader := r.Header.Get("x-amz-copy-source-range")
 
-	dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
-		s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID)
+	dstUrl := s3a.genPartUploadUrl(dstBucket, uploadID, partID)
 	srcUrl := fmt.Sprintf("http://%s%s/%s%s",
 		s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlEscapeObject(srcObject))
 
diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go
index 187022079..6fecdcf2d 100644
--- a/weed/s3api/s3api_object_multipart_handlers.go
+++ b/weed/s3api/s3api_object_multipart_handlers.go
@@ -10,6 +10,7 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/google/uuid"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
@@ -247,8 +248,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
 
 	glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)
 
-	uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
-		s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID)
+	uploadUrl := s3a.genPartUploadUrl(bucket, uploadID, partID)
 	if partID == 1 && r.Header.Get("Content-Type") == "" {
 		dataReader = mimeDetect(r, dataReader)
 	}
@@ -271,6 +271,11 @@ func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
 	return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, s3_constants.MultipartUploadsFolder)
 }
 
+func (s3a *S3ApiServer) genPartUploadUrl(bucket, uploadID string, partID int) string {
+	return fmt.Sprintf("http://%s%s/%s/%04d_%s.part",
+		s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString())
+}
+
 // Generate uploadID hash string from object
 func (s3a *S3ApiServer) generateUploadID(object string) string {
 	if strings.HasPrefix(object, "/") {
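Note: a generated part file name now looks like
0007_1b4e28ba-2fa1-11d2-883f-0016d3cca427.part. Because uuid.NewString()
produces hyphen-separated hex and the part number is all digits, the
first "_" is an unambiguous delimiter. The standalone sketch below
(illustrative only) mirrors how completeMultipartUpload recovers the
part number from both old-style and new-style names:

    package main

    import (
    	"fmt"
    	"strconv"
    	"strings"
    )

    // partNumber handles both the legacy "0007.part" form and the new
    // "0007_<uuid>.part" form, as the parsing loop in the patch does.
    func partNumber(name string) (int, error) {
    	if !strings.HasSuffix(name, ".part") {
    		return 0, fmt.Errorf("%s is not a part file", name)
    	}
    	s := strings.TrimSuffix(name, ".part")
    	if i := strings.Index(s, "_"); i != -1 {
    		s = s[:i] // new style: strip the "_<uuid>" suffix
    	}
    	return strconv.Atoi(s)
    }

    func main() {
    	for _, name := range []string{"0007.part", "0007_1b4e28ba-2fa1-11d2-883f-0016d3cca427.part"} {
    		n, err := partNumber(name)
    		fmt.Println(n, err) // 7 <nil> in both cases
    	}
    }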