From 871efa4fc15822d518f1bcc952ebf4d80fb0defd Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Thu, 30 Apr 2020 17:20:44 -0700
Subject: [PATCH] refactoring

some previous chunk ETag was using MD5, which is wrong.

---
 weed/command/filer_copy.go                    | 24 +++++-------------------
 weed/filer2/filer_notify_append.go            | 11 +----------
 weed/filesys/dirty_page.go                    | 10 +---------
 weed/messaging/broker/broker_append.go        | 13 +------------
 weed/operation/upload_content.go              | 14 ++++++++++++++
 .../filer_server_handlers_write_autochunk.go  | 12 +-----------
 .../filer_server_handlers_write_cipher.go     | 12 +-----------
 weed/server/webdav_server.go                  | 12 +-----------
 8 files changed, 25 insertions(+), 83 deletions(-)

diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index b6ac7854b..322ab20d5 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -14,9 +14,10 @@ import (
 	"sync"
 	"time"
 
-	"github.com/chrislusf/seaweedfs/weed/util/grace"
 	"google.golang.org/grpc"
 
+	"github.com/chrislusf/seaweedfs/weed/util/grace"
+
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -324,15 +325,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
 		}
 
 		fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
-		chunks = append(chunks, &filer_pb.FileChunk{
-			FileId:    assignResult.FileId,
-			Offset:    0,
-			Size:      uint64(uploadResult.Size),
-			Mtime:     time.Now().UnixNano(),
-			ETag:      uploadResult.Md5,
-			CipherKey: uploadResult.CipherKey,
-			IsGzipped: uploadResult.Gzip > 0,
-		})
+		chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
 
 		fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
 	}
@@ -435,15 +428,8 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
 				uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
 				return
 			}
-			chunksChan <- &filer_pb.FileChunk{
-				FileId:    assignResult.FileId,
-				Offset:    i * chunkSize,
-				Size:      uint64(uploadResult.Size),
-				Mtime:     time.Now().UnixNano(),
-				ETag:      uploadResult.ETag,
-				CipherKey: uploadResult.CipherKey,
-				IsGzipped: uploadResult.Gzip > 0,
-			}
+			chunksChan <- uploadResult.ToPbFileChunk(assignResult.FileId, i*chunkSize)
+
 			fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
 		}(i)
 	}
diff --git a/weed/filer2/filer_notify_append.go b/weed/filer2/filer_notify_append.go
index efe7b0213..af291058c 100644
--- a/weed/filer2/filer_notify_append.go
+++ b/weed/filer2/filer_notify_append.go
@@ -38,16 +38,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
 	}
 
 	// append to existing chunks
-	chunk := &filer_pb.FileChunk{
-		FileId:    assignResult.Fid,
-		Offset:    offset,
-		Size:      uint64(uploadResult.Size),
-		Mtime:     time.Now().UnixNano(),
-		ETag:      uploadResult.ETag,
-		CipherKey: uploadResult.CipherKey,
-		IsGzipped: uploadResult.Gzip > 0,
-	}
-	entry.Chunks = append(entry.Chunks, chunk)
+	entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset))
 
 	// update the entry
 	err = f.CreateEntry(context.Background(), entry, false)
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index ce74d64d5..45224b3e7 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -187,15 +187,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
 	}
 	pages.f.wfs.chunkCache.SetChunk(fileId, data)
 
-	return &filer_pb.FileChunk{
-		FileId:    fileId,
-		Offset:    offset,
-		Size:      uint64(size),
-		Mtime:     time.Now().UnixNano(),
-		ETag:      uploadResult.ETag,
-		CipherKey: uploadResult.CipherKey,
-		IsGzipped: uploadResult.Gzip > 0,
-	}, nil
+	return uploadResult.ToPbFileChunk(fileId, offset), nil
 }
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index 9c753ea26..e87e197b0 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -3,7 +3,6 @@ package broker
 import (
 	"context"
 	"fmt"
-	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/operation"
@@ -23,23 +22,13 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messag
 
 	dir, name := util.FullPath(targetFile).DirAndName()
 
-	chunk := &filer_pb.FileChunk{
-		FileId:    assignResult.Fid,
-		Offset:    0, // needs to be fixed during appending
-		Size:      uint64(uploadResult.Size),
-		Mtime:     time.Now().UnixNano(),
-		ETag:      uploadResult.ETag,
-		CipherKey: uploadResult.CipherKey,
-		IsGzipped: uploadResult.Gzip > 0,
-	}
-
 	// append the chunk
 	if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
 		request := &filer_pb.AppendToEntryRequest{
 			Directory: dir,
 			EntryName: name,
-			Chunks:    []*filer_pb.FileChunk{chunk},
+			Chunks:    []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)},
 		}
 
 		_, err := client.AppendToEntry(context.Background(), request)
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 5b0441ff9..1e2c591c5 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -14,8 +14,10 @@ import (
 	"net/textproto"
 	"path/filepath"
 	"strings"
+	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/security"
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
@@ -31,6 +33,18 @@ type UploadResult struct {
 	Md5        string `json:"md5,omitempty"`
 }
 
+func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
+	return &filer_pb.FileChunk{
+		FileId:    fileId,
+		Offset:    offset,
+		Size:      uint64(uploadResult.Size),
+		Mtime:     time.Now().UnixNano(),
+		ETag:      uploadResult.ETag,
+		CipherKey: uploadResult.CipherKey,
+		IsGzipped: uploadResult.Gzip > 0,
+	}
+}
+
 var (
 	client *http.Client
 )
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 4c371a9a5..532693742 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -119,17 +119,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
 		}
 
 		// Save to chunk manifest structure
-		fileChunks = append(fileChunks,
-			&filer_pb.FileChunk{
-				FileId:    fileId,
-				Offset:    chunkOffset,
-				Size:      uint64(uploadResult.Size),
-				Mtime:     time.Now().UnixNano(),
-				ETag:      uploadResult.ETag,
-				CipherKey: uploadResult.CipherKey,
-				IsGzipped: uploadResult.Gzip > 0,
-			},
-		)
+		fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
 
 		glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength)
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 2dcf4b4e3..bea72b2c1 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -46,17 +46,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
 	}
 
 	// Save to chunk manifest structure
-	fileChunks := []*filer_pb.FileChunk{
-		{
-			FileId:    fileId,
-			Offset:    0,
-			Size:      uint64(uploadResult.Size),
-			Mtime:     time.Now().UnixNano(),
-			ETag:      uploadResult.Md5,
-			CipherKey: uploadResult.CipherKey,
-			IsGzipped: uploadResult.Gzip > 0,
-		},
-	}
+	fileChunks := []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, 0)}
 
 	// fmt.Printf("uploaded: %+v\n", uploadResult)
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index a4a1d8b8b..37c4afd5c 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -418,17 +418,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
 		return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
 	}
 
-	chunk := &filer_pb.FileChunk{
-		FileId:    fileId,
-		Offset:    f.off,
-		Size:      uint64(len(buf)),
-		Mtime:     time.Now().UnixNano(),
-		ETag:      uploadResult.ETag,
-		CipherKey: uploadResult.CipherKey,
-		IsGzipped: uploadResult.Gzip > 0,
-	}
-
-	f.entry.Chunks = append(f.entry.Chunks, chunk)
+	f.entry.Chunks = append(f.entry.Chunks, uploadResult.ToPbFileChunk(fileId, f.off))
 
 	err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 		f.entry.Attributes.Mtime = time.Now().Unix()
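
Reviewer note (not part of the patch): every call site touched above now builds its filer_pb.FileChunk through the new UploadResult.ToPbFileChunk helper, so the chunk ETag always comes from uploadResult.ETag. The hand-rolled structs removed in filer_copy.go (uploadFileAsOne) and filer_server_handlers_write_cipher.go were filling ETag from uploadResult.Md5, which is the mistake the commit message refers to. Below is a minimal, self-contained sketch of the new call pattern; the file id, size and ETag values are made up for illustration and would normally come from a volume assignment and operation.Upload.

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/operation"
)

func main() {
	// Normally returned by operation.Upload; placeholder values here.
	uploadResult := &operation.UploadResult{
		Size: 1024,       // bytes stored for this chunk
		ETag: "2f0c4bd1", // server-computed ETag, no longer taken from the Md5 field
	}

	// Build the protobuf chunk for a hypothetical volume file id at offset 0.
	chunk := uploadResult.ToPbFileChunk("3,01637037d6", 0)

	fmt.Println(chunk.FileId, chunk.Size, chunk.ETag, chunk.Mtime)
}

Keeping the construction in weed/operation also means any future change to the chunk metadata (Mtime, cipher key, gzip flag) only has to be made in one place instead of eight.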