diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go index c521ce33e..b89e8de51 100644 --- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go +++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go @@ -72,8 +72,7 @@ func startGenerateMetadata() { func startSubscribeMetadata(eachEntryFunc func(event *filer_pb.SubscribeMetadataResponse) error) { - tailErr := pb.FollowMetadata(*tailFiler, grpc.WithInsecure(), "tail", - *dir, 0, 0, eachEntryFunc, false) + tailErr := pb.FollowMetadata(*tailFiler, grpc.WithInsecure(), "tail", *dir, nil, 0, 0, eachEntryFunc, false) if tailErr != nil { fmt.Printf("tail %s: %v\n", *tailFiler, tailErr) diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 0c450181b..f0a4bd79b 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -112,7 +112,6 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), - sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false) + return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), sourcePath, nil, startFrom.UnixNano(), 0, processEventFnWithOffset, false) } diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index 6fe323fba..ebc39beb1 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -195,8 +195,7 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { return metaBackup.setOffset(lastTime) }) - return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup", - *metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false) + return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup", *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, processEventFnWithOffset, false) } diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index 28c0db99b..3b30e2b6a 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -103,17 +103,15 @@ func runFilerMetaTail(cmd *Command, args []string) bool { } } - tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail", - *tailTarget, time.Now().Add(-*tailStart).UnixNano(), 0, - func(resp *filer_pb.SubscribeMetadataResponse) error { - if !shouldPrint(resp) { - return nil - } - if err := eachEntryFunc(resp); err != nil { - return err - } + tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail", *tailTarget, nil, time.Now().Add(-*tailStart).UnixNano(), 0, func(resp *filer_pb.SubscribeMetadataResponse) error { + if !shouldPrint(resp) { return nil - }, false) + } + if err := eachEntryFunc(resp); err != nil { + return err + } + return nil + }, false) if tailErr != nil { fmt.Printf("tail %s: %v\n", *tailFiler, tailErr) diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index 8d2719660..fd6ed23b9 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -210,8 +210,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs) }) - return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, - "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) + return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync", mountedDir, nil, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) } func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation { diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 5440811dd..4ebb87f71 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -170,8 +170,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler, - sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false) + return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler, sourcePath, nil, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false) } diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go index 31fb62fb3..63900081c 100644 --- a/weed/pb/filer_pb_tail.go +++ b/weed/pb/filer_pb_tail.go @@ -12,12 +12,12 @@ import ( type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error -func FollowMetadata(filerAddress string, grpcDialOption grpc.DialOption, - clientName string, pathPrefix string, lastTsNs int64, selfSignature int32, +func FollowMetadata(filerAddress string, grpcDialOption grpc.DialOption, clientName string, + pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, fatalOnError bool) error { - err := WithFilerClient(filerAddress, grpcDialOption, makeFunc( - clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError)) + err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(clientName, + pathPrefix, additionalPathPrefixes, lastTsNs, selfSignature, processEventFn, fatalOnError)) if err != nil { return fmt.Errorf("subscribing filer meta change: %v", err) } @@ -28,8 +28,7 @@ func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, clientName string, pathPrefix string, lastTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, fatalOnError bool) error { - err := filerClient.WithFilerClient(makeFunc( - clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError)) + err := filerClient.WithFilerClient(makeFunc(clientName, pathPrefix, nil, lastTsNs, selfSignature, processEventFn, fatalOnError)) if err != nil { return fmt.Errorf("subscribing filer meta change: %v", err) } @@ -37,16 +36,16 @@ func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, return nil } -func makeFunc(clientName string, pathPrefix string, lastTsNs int64, selfSignature int32, - processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error { +func makeFunc(clientName string, pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error { return func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: clientName, - PathPrefix: pathPrefix, - SinceNs: lastTsNs, - Signature: selfSignature, + ClientName: clientName, + PathPrefix: pathPrefix, + PathPrefixes: additionalPathPrefixes, + SinceNs: lastTsNs, + Signature: selfSignature, }) if err != nil { return fmt.Errorf("subscribe: %v", err)