From 6a0bb7106bc7c3524991f8a2a5b78d636613095d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 26 Aug 2021 16:16:26 -0700 Subject: [PATCH] cloud drive: parallelize remote storage downloading --- weed/server/filer_grpc_server_remote.go | 5 +++ weed/shell/command_remote_cache.go | 42 +++++++++++++++++++------ 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go index 6ceaa3013..8064431c5 100644 --- a/weed/server/filer_grpc_server_remote.go +++ b/weed/server/filer_grpc_server_remote.go @@ -12,6 +12,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" "strings" + "sync" "time" ) @@ -80,12 +81,15 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo var chunks []*filer_pb.FileChunk var fetchAndWriteErr error + var wg sync.WaitGroup limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8) for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize { localOffset := offset + wg.Add(1) limitedConcurrentExecutor.Execute(func() { + defer wg.Done() size := chunkSize if localOffset+chunkSize > entry.Remote.RemoteSize { size = entry.Remote.RemoteSize - localOffset @@ -147,6 +151,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo }) } + wg.Wait() if fetchAndWriteErr != nil { return nil, fetchAndWriteErr } diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index 9570291df..647de0d25 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/util" "io" + "sync" ) func init() { @@ -50,6 +51,7 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) dir := remoteMountCommand.String("dir", "", "a directory in filer") + concurrency := remoteMountCommand.Int("concurrent", 8, "concurrent file downloading") fileFiler := newFileFilter(remoteMountCommand) if err = remoteMountCommand.Parse(args); err != nil { @@ -63,7 +65,7 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io } // pull content from remote - if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf); err != nil { + if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf, *concurrency); err != nil { return fmt.Errorf("cache content data: %v", err) } @@ -117,9 +119,13 @@ func mayHaveCachedToLocal(entry *filer_pb.Entry) bool { return false } -func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf) error { +func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf, concurrency int) error { - return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { + var wg sync.WaitGroup + limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency) + var executionErr error + + traverseErr := recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { if !shouldCacheToLocal(entry) { return true // true means recursive traversal should continue } @@ -128,16 +134,32 @@ func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io. return true } - fmt.Fprintf(writer, "Cache %+v ... ", dir.Child(entry.Name)) + wg.Add(1) + limitedConcurrentExecutor.Execute(func() { + defer wg.Done() + fmt.Fprintf(writer, "Cache %+v ...\n", dir.Child(entry.Name)) - remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) + remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) - if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil { - fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err) - return false - } - fmt.Fprintf(writer, "Done\n") + if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil { + fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err) + if executionErr == nil { + executionErr = fmt.Errorf("DownloadToLocal %+v: %v\n", remoteLocation, err) + } + return + } + fmt.Fprintf(writer, "Cache %+v Done\n", dir.Child(entry.Name)) + }) return true }) + wg.Wait() + + if traverseErr != nil { + return traverseErr + } + if executionErr != nil { + return executionErr + } + return nil }