mirror of
https://github.com/chrislusf/seaweedfs
synced 2024-06-16 07:32:12 +02:00
fix completed multiupload lost data (#5460)
If there are putObjectPart requests with the same uploadId during completeMultiPart, it can result in data loss. putObjectPart requests might be due to timeout retries. Co-authored-by: Yang Wang <yangwang@weride.ai>
This commit is contained in:
parent
d5d8b8e2ae
commit
b19c9847c6
|
@ -4,9 +4,6 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
|
||||||
"golang.org/x/exp/slices"
|
|
||||||
"math"
|
"math"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
|
@ -14,6 +11,10 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||||
|
"golang.org/x/exp/slices"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/service/s3"
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
|
|
||||||
|
@ -90,40 +91,55 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
|
||||||
return nil, s3err.ErrNoSuchUpload
|
return nil, s3err.ErrNoSuchUpload
|
||||||
}
|
}
|
||||||
|
|
||||||
// check whether completedParts is more than received parts
|
partEntries := make(map[int][]*filer_pb.Entry, len(entries))
|
||||||
{
|
for _, entry := range entries {
|
||||||
partNumbers := make(map[int]struct{}, len(entries))
|
glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name)
|
||||||
for _, entry := range entries {
|
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
|
||||||
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
|
var partNumberString string
|
||||||
partNumberString := entry.Name[:len(entry.Name)-len(".part")]
|
index := strings.Index(entry.Name, "_")
|
||||||
partNumber, err := strconv.Atoi(partNumberString)
|
if index != -1 {
|
||||||
if err == nil {
|
partNumberString = entry.Name[:index]
|
||||||
partNumbers[partNumber] = struct{}{}
|
} else {
|
||||||
}
|
partNumberString = entry.Name[:len(entry.Name)-len(".part")]
|
||||||
}
|
}
|
||||||
}
|
partNumber, err := strconv.Atoi(partNumberString)
|
||||||
for _, part := range completedParts {
|
if err != nil {
|
||||||
if _, found := partNumbers[part.PartNumber]; !found {
|
glog.Errorf("completeMultipartUpload failed to pasre partNumber %s:%s", partNumberString, err)
|
||||||
return nil, s3err.ErrInvalidPart
|
continue
|
||||||
}
|
}
|
||||||
|
//there maybe multi same part, because of client retry
|
||||||
|
partEntries[partNumber] = append(partEntries[partNumber], entry)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mime := pentry.Attributes.Mime
|
mime := pentry.Attributes.Mime
|
||||||
|
|
||||||
var finalParts []*filer_pb.FileChunk
|
var finalParts []*filer_pb.FileChunk
|
||||||
var offset int64
|
var offset int64
|
||||||
|
var deleteEntries []*filer_pb.Entry
|
||||||
|
for _, part := range completedParts {
|
||||||
|
entries := partEntries[part.PartNumber]
|
||||||
|
// check whether completedParts is more than received parts
|
||||||
|
if len(entries) == 0 {
|
||||||
|
glog.Errorf("part %d has no entry", part.PartNumber)
|
||||||
|
return nil, s3err.ErrInvalidPart
|
||||||
|
}
|
||||||
|
|
||||||
for _, entry := range entries {
|
found := false
|
||||||
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
|
for _, entry := range entries {
|
||||||
partETag, found := findByPartNumber(entry.Name, completedParts)
|
if found {
|
||||||
if !found {
|
deleteEntries = append(deleteEntries, entry)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
partETag := strings.Trim(part.ETag, `"`)
|
||||||
entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
|
entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
|
||||||
|
glog.Warningf("complete etag %s, partEtag %s", partETag, entryETag)
|
||||||
if partETag != "" && len(partETag) == 32 && entryETag != "" && entryETag != partETag {
|
if partETag != "" && len(partETag) == 32 && entryETag != "" && entryETag != partETag {
|
||||||
glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
|
err = fmt.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
|
||||||
return nil, s3err.ErrInvalidPart
|
deleteEntries = append(deleteEntries, entry)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
for _, chunk := range entry.GetChunks() {
|
for _, chunk := range entry.GetChunks() {
|
||||||
p := &filer_pb.FileChunk{
|
p := &filer_pb.FileChunk{
|
||||||
|
@ -137,7 +153,14 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
|
||||||
finalParts = append(finalParts, p)
|
finalParts = append(finalParts, p)
|
||||||
offset += int64(chunk.Size)
|
offset += int64(chunk.Size)
|
||||||
}
|
}
|
||||||
|
found = true
|
||||||
|
err = nil
|
||||||
}
|
}
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("%s", err)
|
||||||
|
return nil, s3err.ErrInvalidPart
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
entryName := filepath.Base(*input.Key)
|
entryName := filepath.Base(*input.Key)
|
||||||
|
@ -186,6 +209,13 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, deleteEntry := range deleteEntries {
|
||||||
|
//delete unused part data
|
||||||
|
glog.Infof("completeMultipartUpload cleanup %s upload %s unused %s", *input.Bucket, *input.UploadId, deleteEntry.Name)
|
||||||
|
if err = s3a.rm(uploadDirectory, deleteEntry.Name, true, true); err != nil {
|
||||||
|
glog.Warningf("completeMultipartUpload cleanup %s upload %s unused %s : %v", *input.Bucket, *input.UploadId, deleteEntry.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
if err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, false, true); err != nil {
|
if err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, false, true); err != nil {
|
||||||
glog.V(1).Infof("completeMultipartUpload cleanup %s upload %s: %v", *input.Bucket, *input.UploadId, err)
|
glog.V(1).Infof("completeMultipartUpload cleanup %s upload %s: %v", *input.Bucket, *input.UploadId, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,16 +2,17 @@ package s3api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
|
||||||
"modernc.org/strutil"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"modernc.org/strutil"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -170,8 +171,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
|
||||||
|
|
||||||
rangeHeader := r.Header.Get("x-amz-copy-source-range")
|
rangeHeader := r.Header.Get("x-amz-copy-source-range")
|
||||||
|
|
||||||
dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
|
dstUrl := s3a.genPartUploadUrl(dstBucket, uploadID, partID)
|
||||||
s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID)
|
|
||||||
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
|
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
|
||||||
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlEscapeObject(srcObject))
|
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlEscapeObject(srcObject))
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||||
|
@ -247,8 +248,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
|
||||||
|
|
||||||
glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)
|
glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)
|
||||||
|
|
||||||
uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
|
uploadUrl := s3a.genPartUploadUrl(bucket, uploadID, partID)
|
||||||
s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID)
|
|
||||||
|
|
||||||
if partID == 1 && r.Header.Get("Content-Type") == "" {
|
if partID == 1 && r.Header.Get("Content-Type") == "" {
|
||||||
dataReader = mimeDetect(r, dataReader)
|
dataReader = mimeDetect(r, dataReader)
|
||||||
|
@ -271,6 +271,11 @@ func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
|
||||||
return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, s3_constants.MultipartUploadsFolder)
|
return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, s3_constants.MultipartUploadsFolder)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s3a *S3ApiServer) genPartUploadUrl(bucket, uploadID string, partID int) string {
|
||||||
|
return fmt.Sprintf("http://%s%s/%s/%04d_%s.part",
|
||||||
|
s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString())
|
||||||
|
}
|
||||||
|
|
||||||
// Generate uploadID hash string from object
|
// Generate uploadID hash string from object
|
||||||
func (s3a *S3ApiServer) generateUploadID(object string) string {
|
func (s3a *S3ApiServer) generateUploadID(object string) string {
|
||||||
if strings.HasPrefix(object, "/") {
|
if strings.HasPrefix(object, "/") {
|
||||||
|
|
Loading…
Reference in a new issue