diff --git a/weed/storage/backend/s3_backend/s3_upload.go b/weed/storage/backend/s3_backend/s3_upload.go index d22691644..2910b8339 100644 --- a/weed/storage/backend/s3_backend/s3_upload.go +++ b/weed/storage/backend/s3_backend/s3_upload.go @@ -2,12 +2,11 @@ package s3_backend import ( "fmt" - "os" - "sync/atomic" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "os" + "sync" "github.com/chrislusf/seaweedfs/weed/glog" ) @@ -40,10 +39,10 @@ func uploadToS3(sess s3iface.S3API, filename string, destBucket string, destKey }) fileReader := &s3UploadProgressedReader{ - fp: f, - size: fileSize, - read: -fileSize, - fn: fn, + fp: f, + size: fileSize, + signMap: map[int64]struct{}{}, + fn: fn, } // Upload the file to S3. @@ -65,11 +64,14 @@ func uploadToS3(sess s3iface.S3API, filename string, destBucket string, destKey } // adapted from https://github.com/aws/aws-sdk-go/pull/1868 +// https://github.com/aws/aws-sdk-go/blob/main/example/service/s3/putObjectWithProcess/putObjWithProcess.go type s3UploadProgressedReader struct { - fp *os.File - size int64 - read int64 - fn func(progressed int64, percentage float32) error + fp *os.File + size int64 + read int64 + signMap map[int64]struct{} + mux sync.Mutex + fn func(progressed int64, percentage float32) error } func (r *s3UploadProgressedReader) Read(p []byte) (int, error) { @@ -82,8 +84,14 @@ func (r *s3UploadProgressedReader) ReadAt(p []byte, off int64) (int, error) { return n, err } - // Got the length have read( or means has uploaded), and you can construct your message - atomic.AddInt64(&r.read, int64(n)) + r.mux.Lock() + // Ignore the first signature call + if _, ok := r.signMap[off]; ok { + r.read += int64(n) + } else { + r.signMap[off] = struct{}{} + } + r.mux.Unlock() if r.fn != nil { read := r.read