From e219c578490454aced96b046dceb7313dbbb4a47 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 25 Oct 2020 15:46:29 -0700 Subject: [PATCH] passing full path when assign volume locations --- weed/command/filer_copy.go | 4 ++-- weed/filesys/dirty_page.go | 4 +--- weed/filesys/filehandle.go | 2 +- weed/filesys/wfs_write.go | 5 +++-- weed/replication/sink/filersink/fetch_write.go | 12 ++++++------ weed/replication/sink/filersink/filer_sink.go | 4 ++-- weed/server/webdav_server.go | 2 +- 7 files changed, 16 insertions(+), 17 deletions(-) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index ffdf4e968..2295faa8a 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -219,7 +219,7 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi fileCopyTaskChan <- FileCopyTask{ sourceLocation: fileOrDir, - destinationUrlPath: destPath, + destinationUrlPath: destPath+fi.Name(), fileSize: fi.Size(), fileMode: fi.Mode(), uid: uid, @@ -405,7 +405,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, Replication: *worker.options.replication, Collection: *worker.options.collection, TtlSec: worker.options.ttlSec, - Path: task.destinationUrlPath, + Path: task.destinationUrlPath+fileName, } assignResult, assignError = client.AssignVolume(context.Background(), request) diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 2be319a62..dd0c48796 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -110,10 +110,8 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, go func() { defer pages.writeWaitGroup.Done() - dir, _ := pages.f.fullpath().DirAndName() - reader = io.LimitReader(reader, size) - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset) + chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) if err != nil { glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) pages.chunkSaveErrChan <- err diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index e3fef72ac..54bde3494 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -256,7 +256,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks) chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks) - chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.dir.FullPath()), chunks) + chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks) if manifestErr != nil { // not good, but should be ok glog.V(0).Infof("MaybeManifestize: %v", manifestErr) diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go index 53589b02c..83e40e7f5 100644 --- a/weed/filesys/wfs_write.go +++ b/weed/filesys/wfs_write.go @@ -10,9 +10,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" ) -func (wfs *WFS) saveDataAsChunk(dir string) filer.SaveDataAsChunkFunctionType { +func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType { return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) { var fileId, host string @@ -26,7 +27,7 @@ func (wfs *WFS) saveDataAsChunk(dir string) filer.SaveDataAsChunkFunctionType { Collection: wfs.option.Collection, TtlSec: wfs.option.TtlSec, DataCenter: wfs.option.DataCenter, - Path: dir, + Path: string(fullPath), } resp, err := client.AssignVolume(context.Background(), request) diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 974fb3c87..d193ff81c 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -15,7 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/security" ) -func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir string) (replicatedChunks []*filer_pb.FileChunk, err error) { +func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string) (replicatedChunks []*filer_pb.FileChunk, err error) { if len(sourceChunks) == 0 { return } @@ -27,7 +27,7 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir str wg.Add(1) go func(chunk *filer_pb.FileChunk, index int) { defer wg.Done() - replicatedChunk, e := fs.replicateOneChunk(chunk, dir) + replicatedChunk, e := fs.replicateOneChunk(chunk, path) if e != nil { err = e } @@ -39,9 +39,9 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir str return } -func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, dir string) (*filer_pb.FileChunk, error) { +func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path string) (*filer_pb.FileChunk, error) { - fileId, err := fs.fetchAndWrite(sourceChunk, dir) + fileId, err := fs.fetchAndWrite(sourceChunk, path) if err != nil { return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err) } @@ -58,7 +58,7 @@ func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, dir stri }, nil } -func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) (fileId string, err error) { +func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) (fileId string, err error) { filename, header, resp, err := fs.filerSource.ReadPart(sourceChunk.GetFileIdString()) if err != nil { @@ -77,7 +77,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) Collection: fs.collection, TtlSec: fs.ttlSec, DataCenter: fs.dataCenter, - Path: dir, + Path: path, } resp, err := client.AssignVolume(context.Background(), request) diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index f1d8ff840..6f467ea58 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -96,7 +96,7 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [ } } - replicatedChunks, err := fs.replicateChunks(entry.Chunks, dir) + replicatedChunks, err := fs.replicateChunks(entry.Chunks, key) if err != nil { // only warning here since the source chunk may have been deleted already @@ -180,7 +180,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent } // replicate the chunks that are new in the source - replicatedChunks, err := fs.replicateChunks(newChunks, newParentPath) + replicatedChunks, err := fs.replicateChunks(newChunks, key) if err != nil { return true, fmt.Errorf("replicte %s chunks error: %v", key, err) } diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 901f88264..3e9f882e3 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -387,7 +387,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { Count: 1, Replication: "", Collection: f.fs.option.Collection, - Path: dir, + Path: f.name, } resp, err := client.AssignVolume(ctx, request)