mirror of
https://github.com/chrislusf/seaweedfs
synced 2024-06-26 12:29:37 +02:00
[s3] add s3 pass test_multipart_upload (#5474)
add s3 pass test_multipart_upload
This commit is contained in:
parent
d42a04cceb
commit
35cba720a5
1
.github/workflows/s3tests.yml
vendored
1
.github/workflows/s3tests.yml
vendored
|
@ -166,6 +166,7 @@ jobs:
|
||||||
s3tests_boto3/functional/test_s3.py::test_multipart_copy_without_range \
|
s3tests_boto3/functional/test_s3.py::test_multipart_copy_without_range \
|
||||||
s3tests_boto3/functional/test_s3.py::test_multipart_upload_multiple_sizes \
|
s3tests_boto3/functional/test_s3.py::test_multipart_upload_multiple_sizes \
|
||||||
s3tests_boto3/functional/test_s3.py::test_multipart_copy_multiple_sizes \
|
s3tests_boto3/functional/test_s3.py::test_multipart_copy_multiple_sizes \
|
||||||
|
s3tests_boto3/functional/test_s3.py::test_multipart_upload \
|
||||||
s3tests_boto3/functional/test_s3.py::test_multipart_upload_contents \
|
s3tests_boto3/functional/test_s3.py::test_multipart_upload_contents \
|
||||||
s3tests_boto3/functional/test_s3.py::test_multipart_upload_overwrite_existing_object \
|
s3tests_boto3/functional/test_s3.py::test_multipart_upload_overwrite_existing_object \
|
||||||
s3tests_boto3/functional/test_s3.py::test_abort_multipart_upload \
|
s3tests_boto3/functional/test_s3.py::test_abort_multipart_upload \
|
||||||
|
|
2
Makefile
2
Makefile
|
@ -15,7 +15,7 @@ full_install:
|
||||||
cd weed; go install -tags "elastic gocdk sqlite ydb tikv rclone"
|
cd weed; go install -tags "elastic gocdk sqlite ydb tikv rclone"
|
||||||
|
|
||||||
server: install
|
server: install
|
||||||
weed -v 4 server -s3 -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1 -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=false -s3.config=./docker/compose/s3.json -metricsPort=9324
|
weed -v 0 server -s3 -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1 -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=true -s3.config=./docker/compose/s3.json -metricsPort=9324
|
||||||
|
|
||||||
benchmark: install warp_install
|
benchmark: install warp_install
|
||||||
pkill weed || true
|
pkill weed || true
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/stats"
|
"github.com/seaweedfs/seaweedfs/weed/stats"
|
||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
"math"
|
"math"
|
||||||
|
@ -85,19 +86,36 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
|
||||||
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
|
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
|
||||||
|
|
||||||
entries, _, err := s3a.list(uploadDirectory, "", "", false, maxPartsList)
|
entries, _, err := s3a.list(uploadDirectory, "", "", false, maxPartsList)
|
||||||
if err != nil || len(entries) == 0 {
|
if err != nil {
|
||||||
glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries))
|
glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries))
|
||||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
|
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
|
||||||
return nil, s3err.ErrNoSuchUpload
|
return nil, s3err.ErrNoSuchUpload
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(entries) == 0 {
|
||||||
|
entryName, dirName := s3a.getEntryNameAndDir(input)
|
||||||
|
if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil {
|
||||||
|
if uploadId, ok := entry.Extended[s3_constants.X_SeaweedFS_Header_Upload_Id]; ok && *input.UploadId == string(uploadId) {
|
||||||
|
return &CompleteMultipartUploadResult{
|
||||||
|
CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{
|
||||||
|
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
|
||||||
|
Bucket: input.Bucket,
|
||||||
|
ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""),
|
||||||
|
Key: objectKey(input.Key),
|
||||||
|
},
|
||||||
|
}, s3err.ErrNone
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
|
||||||
|
return nil, s3err.ErrNoSuchUpload
|
||||||
|
}
|
||||||
|
|
||||||
pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId)
|
pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
|
glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
|
||||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
|
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
|
||||||
return nil, s3err.ErrNoSuchUpload
|
return nil, s3err.ErrNoSuchUpload
|
||||||
}
|
}
|
||||||
|
|
||||||
deleteEntries := []*filer_pb.Entry{}
|
deleteEntries := []*filer_pb.Entry{}
|
||||||
partEntries := make(map[int][]*filer_pb.Entry, len(entries))
|
partEntries := make(map[int][]*filer_pb.Entry, len(entries))
|
||||||
for _, entry := range entries {
|
for _, entry := range entries {
|
||||||
|
@ -183,25 +201,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
entryName := filepath.Base(*input.Key)
|
entryName, dirName := s3a.getEntryNameAndDir(input)
|
||||||
dirName := filepath.ToSlash(filepath.Dir(*input.Key))
|
|
||||||
if dirName == "." {
|
|
||||||
dirName = ""
|
|
||||||
}
|
|
||||||
if strings.HasPrefix(dirName, "/") {
|
|
||||||
dirName = dirName[1:]
|
|
||||||
}
|
|
||||||
dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName)
|
|
||||||
|
|
||||||
// remove suffix '/'
|
|
||||||
if strings.HasSuffix(dirName, "/") {
|
|
||||||
dirName = dirName[:len(dirName)-1]
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) {
|
err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) {
|
||||||
if entry.Extended == nil {
|
if entry.Extended == nil {
|
||||||
entry.Extended = make(map[string][]byte)
|
entry.Extended = make(map[string][]byte)
|
||||||
}
|
}
|
||||||
|
entry.Extended[s3_constants.X_SeaweedFS_Header_Upload_Id] = []byte(*input.UploadId)
|
||||||
for k, v := range pentry.Extended {
|
for k, v := range pentry.Extended {
|
||||||
if k != "key" {
|
if k != "key" {
|
||||||
entry.Extended[k] = v
|
entry.Extended[k] = v
|
||||||
|
@ -243,6 +248,24 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s3a *S3ApiServer) getEntryNameAndDir(input *s3.CompleteMultipartUploadInput) (string, string) {
|
||||||
|
entryName := filepath.Base(*input.Key)
|
||||||
|
dirName := filepath.ToSlash(filepath.Dir(*input.Key))
|
||||||
|
if dirName == "." {
|
||||||
|
dirName = ""
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(dirName, "/") {
|
||||||
|
dirName = dirName[1:]
|
||||||
|
}
|
||||||
|
dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName)
|
||||||
|
|
||||||
|
// remove suffix '/'
|
||||||
|
if strings.HasSuffix(dirName, "/") {
|
||||||
|
dirName = dirName[:len(dirName)-1]
|
||||||
|
}
|
||||||
|
return entryName, dirName
|
||||||
|
}
|
||||||
|
|
||||||
func parsePartNumber(fileName string) (int, error) {
|
func parsePartNumber(fileName string) (int, error) {
|
||||||
var partNumberString string
|
var partNumberString string
|
||||||
index := strings.Index(fileName, "_")
|
index := strings.Index(fileName, "_")
|
||||||
|
|
|
@ -39,6 +39,7 @@ const (
|
||||||
AmzTagCount = "x-amz-tagging-count"
|
AmzTagCount = "x-amz-tagging-count"
|
||||||
|
|
||||||
X_SeaweedFS_Header_Directory_Key = "x-seaweedfs-is-directory-key"
|
X_SeaweedFS_Header_Directory_Key = "x-seaweedfs-is-directory-key"
|
||||||
|
X_SeaweedFS_Header_Upload_Id = "X-Seaweedfs-Upload-Id"
|
||||||
|
|
||||||
// S3 ACL headers
|
// S3 ACL headers
|
||||||
AmzCannedAcl = "X-Amz-Acl"
|
AmzCannedAcl = "X-Amz-Acl"
|
||||||
|
|
Loading…
Reference in a new issue