diff --git a/.github/workflows/s3tests.yml b/.github/workflows/s3tests.yml index dfd3d4acf..f7dba0c39 100644 --- a/.github/workflows/s3tests.yml +++ b/.github/workflows/s3tests.yml @@ -166,6 +166,7 @@ jobs: s3tests_boto3/functional/test_s3.py::test_multipart_copy_without_range \ s3tests_boto3/functional/test_s3.py::test_multipart_upload_multiple_sizes \ s3tests_boto3/functional/test_s3.py::test_multipart_copy_multiple_sizes \ + s3tests_boto3/functional/test_s3.py::test_multipart_upload \ s3tests_boto3/functional/test_s3.py::test_multipart_upload_contents \ s3tests_boto3/functional/test_s3.py::test_multipart_upload_overwrite_existing_object \ s3tests_boto3/functional/test_s3.py::test_abort_multipart_upload \ diff --git a/Makefile b/Makefile index 269b6ec91..869de2f47 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ full_install: cd weed; go install -tags "elastic gocdk sqlite ydb tikv rclone" server: install - weed -v 4 server -s3 -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1 -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=false -s3.config=./docker/compose/s3.json -metricsPort=9324 + weed -v 0 server -s3 -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1 -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=true -s3.config=./docker/compose/s3.json -metricsPort=9324 benchmark: install warp_install pkill weed || true diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index e8491fce7..b2da8193f 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "encoding/xml" "fmt" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/stats" "golang.org/x/exp/slices" "math" @@ -85,19 +86,36 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId entries, _, err := s3a.list(uploadDirectory, "", "", false, maxPartsList) - if err != nil || len(entries) == 0 { + if err != nil { glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries)) stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc() return nil, s3err.ErrNoSuchUpload } + if len(entries) == 0 { + entryName, dirName := s3a.getEntryNameAndDir(input) + if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil { + if uploadId, ok := entry.Extended[s3_constants.X_SeaweedFS_Header_Upload_Id]; ok && *input.UploadId == string(uploadId) { + return &CompleteMultipartUploadResult{ + CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{ + Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Bucket: input.Bucket, + ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""), + Key: objectKey(input.Key), + }, + }, s3err.ErrNone + } + } + stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc() + return nil, s3err.ErrNoSuchUpload + } + pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId) if err != nil { glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err) stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc() return nil, s3err.ErrNoSuchUpload } - deleteEntries := []*filer_pb.Entry{} partEntries := make(map[int][]*filer_pb.Entry, len(entries)) for _, entry := range entries { @@ -183,25 +201,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa } } - entryName := filepath.Base(*input.Key) - dirName := filepath.ToSlash(filepath.Dir(*input.Key)) - if dirName == "." { - dirName = "" - } - if strings.HasPrefix(dirName, "/") { - dirName = dirName[1:] - } - dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName) - - // remove suffix '/' - if strings.HasSuffix(dirName, "/") { - dirName = dirName[:len(dirName)-1] - } - + entryName, dirName := s3a.getEntryNameAndDir(input) err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) { if entry.Extended == nil { entry.Extended = make(map[string][]byte) } + entry.Extended[s3_constants.X_SeaweedFS_Header_Upload_Id] = []byte(*input.UploadId) for k, v := range pentry.Extended { if k != "key" { entry.Extended[k] = v @@ -243,6 +248,24 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa return } +func (s3a *S3ApiServer) getEntryNameAndDir(input *s3.CompleteMultipartUploadInput) (string, string) { + entryName := filepath.Base(*input.Key) + dirName := filepath.ToSlash(filepath.Dir(*input.Key)) + if dirName == "." { + dirName = "" + } + if strings.HasPrefix(dirName, "/") { + dirName = dirName[1:] + } + dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName) + + // remove suffix '/' + if strings.HasSuffix(dirName, "/") { + dirName = dirName[:len(dirName)-1] + } + return entryName, dirName +} + func parsePartNumber(fileName string) (int, error) { var partNumberString string index := strings.Index(fileName, "_") diff --git a/weed/s3api/s3_constants/header.go b/weed/s3api/s3_constants/header.go index 30a878ccb..8e4a2f8c7 100644 --- a/weed/s3api/s3_constants/header.go +++ b/weed/s3api/s3_constants/header.go @@ -39,6 +39,7 @@ const ( AmzTagCount = "x-amz-tagging-count" X_SeaweedFS_Header_Directory_Key = "x-seaweedfs-is-directory-key" + X_SeaweedFS_Header_Upload_Id = "X-Seaweedfs-Upload-Id" // S3 ACL headers AmzCannedAcl = "X-Amz-Acl"