diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go index 53cb2f912..c521ce33e 100644 --- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go +++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go @@ -1,14 +1,12 @@ package main import ( - "context" "flag" "fmt" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "google.golang.org/grpc" - "io" "strconv" "time" ) @@ -74,38 +72,9 @@ func startGenerateMetadata() { func startSubscribeMetadata(eachEntryFunc func(event *filer_pb.SubscribeMetadataResponse) error) { - lastTsNs := int64(0) + tailErr := pb.FollowMetadata(*tailFiler, grpc.WithInsecure(), "tail", + *dir, 0, 0, eachEntryFunc, false) - tailErr := pb.WithFilerClient(*tailFiler, grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "tail", - PathPrefix: *dir, - SinceNs: lastTsNs, - }) - if err != nil { - return fmt.Errorf("listen: %v", err) - } - - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } - if err = eachEntryFunc(resp); err != nil { - glog.V(0).Infof("tail last record:%+v", time.Unix(0, lastTsNs)) - return err - } - lastTsNs = resp.TsNs - } - - }) if tailErr != nil { fmt.Printf("tail %s: %v\n", *tailFiler, tailErr) } diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index fc4dd8298..2828ccb39 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -1,16 +1,13 @@ package command import ( - "context" "fmt" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "google.golang.org/grpc" - "io" "time" ) @@ -110,48 +107,12 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug) - return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "backup_" + dataSink.GetName(), - PathPrefix: sourcePath, - SinceNs: startFrom.UnixNano(), - }) - if err != nil { - return fmt.Errorf("listen: %v", err) - } - - var counter int64 - var lastWriteTime time.Time - for { - resp, listenErr := stream.Recv() - - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } - - if err := processEventFn(resp); err != nil { - return fmt.Errorf("processEventFn: %v", err) - } - - counter++ - if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { - glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) - counter = 0 - lastWriteTime = time.Now() - if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil { - return fmt.Errorf("setOffset: %v", err) - } - } - - } - + processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3 * time.Second, func(counter int64, lastTsNs int64) error { + glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3)) + return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs) }) + return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_" + dataSink.GetName(), + sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false) + } diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index 28bd367e7..108e76566 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -7,7 +7,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/spf13/viper" "google.golang.org/grpc" - "io" "reflect" "time" @@ -190,48 +189,15 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { return nil } - tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "meta_backup", - PathPrefix: *metaBackup.filerDirectory, - SinceNs: startTime.UnixNano(), - }) - if err != nil { - return fmt.Errorf("listen: %v", err) - } - - var counter int64 - var lastWriteTime time.Time - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } - if err = eachEntryFunc(resp); err != nil { - return err - } - - counter++ - if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { - glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) - counter = 0 - lastWriteTime = time.Now() - if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil { - return err2 - } - } - - } - + processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3 * time.Second, func(counter int64, lastTsNs int64) error { + lastTime := time.Unix(0, lastTsNs) + glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3)) + return metaBackup.setOffset(lastTime) }) - return tailErr + + return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup", + *metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false) + } func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) { diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index 76699bb5e..28c0db99b 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -3,16 +3,15 @@ package command import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/golang/protobuf/jsonpb" jsoniter "github.com/json-iterator/go" "github.com/olivere/elastic/v7" - "io" "os" "path/filepath" "strings" "time" - "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" @@ -104,37 +103,18 @@ func runFilerMetaTail(cmd *Command, args []string) bool { } } - tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "tail", - PathPrefix: *tailTarget, - SinceNs: time.Now().Add(-*tailStart).UnixNano(), - }) - if err != nil { - return fmt.Errorf("listen: %v", err) - } - - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { + tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail", + *tailTarget, time.Now().Add(-*tailStart).UnixNano(), 0, + func(resp *filer_pb.SubscribeMetadataResponse) error { + if !shouldPrint(resp) { return nil } - if listenErr != nil { - return listenErr - } - if !shouldPrint(resp) { - continue - } - if err = eachEntryFunc(resp); err != nil { + if err := eachEntryFunc(resp); err != nil { return err } - } + return nil + }, false) - }) if tailErr != nil { fmt.Printf("tail %s: %v\n", *tailFiler, tailErr) } diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 7cfc8a7fe..a20f17201 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -15,7 +15,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/grace" "google.golang.org/grpc" - "io" "strings" "time" ) @@ -166,50 +165,14 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so return persistEventFn(resp) } - return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "syncTo_" + targetFiler, - PathPrefix: sourcePath, - SinceNs: sourceFilerOffsetTsNs, - Signature: targetFilerSignature, - }) - if err != nil { - return fmt.Errorf("listen: %v", err) - } - - var counter int64 - var lastWriteTime time.Time - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } - - if err := processEventFn(resp); err != nil { - return err - } - - counter++ - if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { - glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) - counter = 0 - lastWriteTime = time.Now() - if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil { - return err - } - } - - } - + processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3 * time.Second, func(counter int64, lastTsNs int64) error { + glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3)) + return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs) }) + return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_" + targetFiler, + sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false) + } const ( diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/filer_remote_storage.go index a859ad34b..b1ee96a42 100644 --- a/weed/filer/filer_remote_storage.go +++ b/weed/filer/filer_remote_storage.go @@ -3,10 +3,12 @@ package filer import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/remote_storage" _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" + "google.golang.org/grpc" "math" "strings" @@ -141,4 +143,41 @@ func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *fil } return -} \ No newline at end of file +} + + +func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) { + var oldContent []byte + if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) + return readErr + }); readErr != nil { + return nil, readErr + } + + mappings, readErr = UnmarshalRemoteStorageMappings(oldContent) + if readErr != nil { + return nil, fmt.Errorf("unmarshal mappings: %v", readErr) + } + + return +} + +func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *filer_pb.RemoteConf, readErr error) { + var oldContent []byte + if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX) + return readErr + }); readErr != nil { + return nil, readErr + } + + // unmarshal storage configuration + conf = &filer_pb.RemoteConf{} + if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil { + readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr) + return + } + + return +} diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go index 747ac3cb9..c650b8024 100644 --- a/weed/filesys/meta_cache/meta_cache_subscribe.go +++ b/weed/filesys/meta_cache/meta_cache_subscribe.go @@ -2,12 +2,9 @@ package meta_cache import ( "context" - "fmt" - "io" - "time" - "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -62,38 +59,8 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil } - for { - err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "mount", - PathPrefix: dir, - SinceNs: lastTsNs, - Signature: selfSignature, - }) - if err != nil { - return fmt.Errorf("subscribe: %v", err) - } + return util.Retry("followMetaUpdates", func() error { + return pb.WithFilerClientFollowMetadata(client, "mount", dir, lastTsNs, selfSignature, processEventFn, true) + }) - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } - - if err := processEventFn(resp); err != nil { - glog.Fatalf("process %v: %v", resp, err) - } - lastTsNs = resp.TsNs - } - }) - if err != nil { - glog.Errorf("subscribing filer meta change: %v", err) - } - time.Sleep(time.Second) - } } diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go new file mode 100644 index 000000000..31fb62fb3 --- /dev/null +++ b/weed/pb/filer_pb_tail.go @@ -0,0 +1,94 @@ +package pb + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "google.golang.org/grpc" + "io" + "time" +) + +type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error + +func FollowMetadata(filerAddress string, grpcDialOption grpc.DialOption, + clientName string, pathPrefix string, lastTsNs int64, selfSignature int32, + processEventFn ProcessMetadataFunc, fatalOnError bool) error { + + err := WithFilerClient(filerAddress, grpcDialOption, makeFunc( + clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError)) + if err != nil { + return fmt.Errorf("subscribing filer meta change: %v", err) + } + return err +} + +func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, + clientName string, pathPrefix string, lastTsNs int64, selfSignature int32, + processEventFn ProcessMetadataFunc, fatalOnError bool) error { + + err := filerClient.WithFilerClient(makeFunc( + clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError)) + if err != nil { + return fmt.Errorf("subscribing filer meta change: %v", err) + } + + return nil +} + +func makeFunc(clientName string, pathPrefix string, lastTsNs int64, selfSignature int32, + processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error { + return func(client filer_pb.SeaweedFilerClient) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ + ClientName: clientName, + PathPrefix: pathPrefix, + SinceNs: lastTsNs, + Signature: selfSignature, + }) + if err != nil { + return fmt.Errorf("subscribe: %v", err) + } + + for { + resp, listenErr := stream.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + + if err := processEventFn(resp); err != nil { + if fatalOnError { + glog.Fatalf("process %v: %v", resp, err) + } else { + glog.Errorf("process %v: %v", resp, err) + } + } + lastTsNs = resp.TsNs + } + } +} + +func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc { + var counter int64 + var lastWriteTime time.Time + return func(resp *filer_pb.SubscribeMetadataResponse) error { + if err := processEventFn(resp); err != nil { + return err + } + counter++ + if lastWriteTime.Add(offsetInterval).Before(time.Now()) { + counter = 0 + lastWriteTime = time.Now() + if err := offsetFunc(counter, resp.TsNs); err != nil { + return err + } + } + return nil + } + +} diff --git a/weed/remote_storage/mount_mapping.go b/weed/remote_storage/mount_mapping.go deleted file mode 100644 index 767de5bed..000000000 --- a/weed/remote_storage/mount_mapping.go +++ /dev/null @@ -1,46 +0,0 @@ -package remote_storage - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/golang/protobuf/proto" - "google.golang.org/grpc" -) - -func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) { - var oldContent []byte - if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - oldContent, readErr = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE) - return readErr - }); readErr != nil { - return nil, readErr - } - - mappings, readErr = filer.UnmarshalRemoteStorageMappings(oldContent) - if readErr != nil { - return nil, fmt.Errorf("unmarshal mappings: %v", readErr) - } - - return -} - -func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *filer_pb.RemoteConf, readErr error) { - var oldContent []byte - if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - oldContent, readErr = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, storageName+filer.REMOTE_STORAGE_CONF_SUFFIX) - return readErr - }); readErr != nil { - return nil, readErr - } - - // unmarshal storage configuration - conf = &filer_pb.RemoteConf{} - if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil { - readErr = fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, storageName+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr) - return - } - - return -} diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index ea4b69550..05cce632a 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -1,13 +1,11 @@ package s3api import ( - "context" - "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "io" - "time" + "github.com/chrislusf/seaweedfs/weed/util" ) func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) error { @@ -34,37 +32,8 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la return nil } - for { - err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: clientName, - PathPrefix: prefix, - SinceNs: lastTsNs, - }) - if err != nil { - return fmt.Errorf("subscribe: %v", err) - } + return util.Retry("followIamChanges", func() error { + return pb.WithFilerClientFollowMetadata(s3a, clientName, prefix, lastTsNs, 0, processEventFn, true) + }) - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } - - if err := processEventFn(resp); err != nil { - glog.Fatalf("process %v: %v", resp, err) - } - lastTsNs = resp.TsNs - } - }) - if err != nil { - glog.Errorf("subscribing filer meta change: %v", err) - } - time.Sleep(time.Second) - } } diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go index 35aad9498..73a5119d5 100644 --- a/weed/shell/command_remote_mount.go +++ b/weed/shell/command_remote_mount.go @@ -9,7 +9,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/remote_storage" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/jsonpb" - "github.com/golang/protobuf/proto" "io" ) @@ -79,7 +78,7 @@ func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io func (c *commandRemoteMount) listExistingRemoteStorageMounts(commandEnv *CommandEnv, writer io.Writer) (err error) { // read current mapping - mappings, readErr := remote_storage.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) + mappings, readErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) if readErr != nil { return readErr } @@ -95,7 +94,7 @@ func (c *commandRemoteMount) listExistingRemoteStorageMounts(commandEnv *Command func (c *commandRemoteMount) findRemoteStorageConfiguration(commandEnv *CommandEnv, writer io.Writer, remote *filer_pb.RemoteStorageLocation) (conf *filer_pb.RemoteConf, err error) { - return remote_storage.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote.Name) + return filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote.Name) }