From 81fdf3651b1f60642fc15bd2b55ed0bd31afac15 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 20 Jan 2023 01:48:12 -0800 Subject: [PATCH] grpc connection to filer add sw-client-id header --- .../load_test_meta_tail.go | 3 +- weed/command/filer_cat.go | 2 +- weed/command/filer_copy.go | 12 ++++---- weed/command/filer_meta_backup.go | 2 +- weed/command/filer_remote_gateway.go | 2 +- weed/command/filer_remote_sync.go | 2 +- weed/command/filer_sync.go | 4 +-- weed/command/iam.go | 2 +- weed/command/s3.go | 2 +- weed/command/webdav.go | 2 +- weed/filer/filer_conf.go | 2 +- weed/filer/meta_aggregator.go | 4 +-- weed/filer/remote_mapping.go | 2 +- weed/filer/remote_storage.go | 2 +- weed/iamapi/iamapi_server.go | 8 ++--- weed/mount/wfs_filer_client.go | 2 +- weed/mq/broker/broker_segment_serde.go | 4 +-- weed/mq/broker/broker_server.go | 2 +- weed/operation/grpc_client.go | 4 +-- weed/pb/filer_pb_tail.go | 2 +- weed/pb/grpc_client_server.go | 30 +++++++++++-------- weed/remote_storage/track_sync_offset.go | 4 +-- weed/replication/replicator.go | 2 +- .../replication/sink/filersink/fetch_write.go | 2 +- weed/replication/sink/filersink/filer_sink.go | 2 ++ weed/replication/source/filer_source.go | 4 ++- weed/s3api/auth_credentials.go | 2 +- weed/s3api/s3api_circuit_breaker.go | 2 +- weed/s3api/s3api_handlers.go | 2 +- weed/server/filer_grpc_server_admin.go | 2 +- weed/server/master_grpc_server_admin.go | 2 +- weed/server/volume_grpc_admin.go | 2 +- weed/server/webdav_server.go | 2 +- weed/shell/command_cluster_check.go | 6 ++-- weed/shell/command_cluster_ps.go | 4 +-- weed/shell/commands.go | 2 +- 36 files changed, 74 insertions(+), 61 deletions(-) diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go index 06e47c113..762af2088 100644 --- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go +++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "strconv" @@ -52,7 +53,7 @@ func main() { } func startGenerateMetadata() { - pb.WithFilerClient(false, pb.ServerAddress(*tailFiler), grpc.WithTransportCredentials(insecure.NewCredentials()), func(client filer_pb.SeaweedFilerClient) error { + pb.WithFilerClient(false, util.RandomInt32(), pb.ServerAddress(*tailFiler), grpc.WithTransportCredentials(insecure.NewCredentials()), func(client filer_pb.SeaweedFilerClient) error { for i := 0; i < *n; i++ { name := fmt.Sprintf("file%d", i) diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go index c310b2b43..2ef3bfc33 100644 --- a/weed/command/filer_cat.go +++ b/weed/command/filer_cat.go @@ -96,7 +96,7 @@ func runFilerCat(cmd *Command, args []string) bool { writer = f } - pb.WithFilerClient(false, filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pb.WithFilerClient(false, util.RandomInt32(), filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 0c4626317..4cef053fc 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -159,6 +159,7 @@ func runCopy(cmd *Command, args []string) bool { worker := FileCopyWorker{ options: ©, filerAddress: filerAddress, + signature: util.RandomInt32(), } if err := worker.copyFiles(fileCopyTaskChan); err != nil { fmt.Fprintf(os.Stderr, "copy file error: %v\n", err) @@ -172,7 +173,7 @@ func runCopy(cmd *Command, args []string) bool { } func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) { - err = pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, 0, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) @@ -225,6 +226,7 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi type FileCopyWorker struct { options *CopyOptions filerAddress pb.ServerAddress + signature int32 } func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error { @@ -302,7 +304,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi return } - err = pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, worker.signature, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: task.destinationUrlPath, @@ -368,7 +370,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0, time.Now().UnixNano())) } - if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.signature, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -479,7 +481,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, return fmt.Errorf("create manifest: %v", manifestErr) } - if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.signature, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -569,7 +571,7 @@ var _ = filer_pb.FilerClient(&FileCopyWorker{}) func (worker *FileCopyWorker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { filerGrpcAddress := worker.filerAddress.ToGrpcAddress() - err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + err = pb.WithGrpcClient(streamingMode, worker.signature, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerGrpcAddress, false, worker.options.grpcDialOption) diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index e0f23ee27..f2cba9382 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -225,7 +225,7 @@ var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{}) func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(streamingMode, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(streamingMode, metaBackup.clientId, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go index 6446d28d0..f7b1f3146 100644 --- a/weed/command/filer_remote_gateway.go +++ b/weed/command/filer_remote_gateway.go @@ -35,7 +35,7 @@ type RemoteGatewayOptions struct { var _ = filer_pb.FilerClient(&RemoteGatewayOptions{}) func (option *RemoteGatewayOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(streamingMode, option.clientId, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) } diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index d22fd57f8..261e024a6 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -26,7 +26,7 @@ type RemoteSyncOptions struct { var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) func (option *RemoteSyncOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(streamingMode, option.clientId, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) } diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 992b9dd4e..efef6250e 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -304,7 +304,7 @@ func getSignaturePrefixByPath(path string) string { func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { - readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + readErr = pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) @@ -330,7 +330,7 @@ func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature } func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error { - return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) diff --git a/weed/command/iam.go b/weed/command/iam.go index 43234aa70..95964994f 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -50,7 +50,7 @@ func (iamopt *IamOptions) startIamServer() bool { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for { - err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/s3.go b/weed/command/s3.go index 369340151..39d1c6fce 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -163,7 +163,7 @@ func (s3opt *S3Options) startS3Server() bool { var metricsIntervalSec int for { - err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 987fc388e..67e6ce69c 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -87,7 +87,7 @@ func (wo *WebDavOption) startWebDav() bool { var cipher bool // connect to filer for { - err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go index ccb1acb3c..b12d54fc8 100644 --- a/weed/filer/filer_conf.go +++ b/weed/filer/filer_conf.go @@ -32,7 +32,7 @@ type FilerConf struct { func ReadFilerConf(filerGrpcAddress pb.ServerAddress, grpcDialOption grpc.DialOption, masterClient *wdclient.MasterClient) (*FilerConf, error) { var buf bytes.Buffer - if err := pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, 0, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if masterClient != nil { return ReadEntry(masterClient, client, DirectoryEtcSeaweedFS, FilerConfName, &buf) } else { diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 8cd7d5bf9..50cd75994 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -192,7 +192,7 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, } glog.V(0).Infof("subscribing remote %s meta change: %v, clientId:%d", peer, time.Unix(0, lastTsNs), ma.filer.UniqueFilerId) - err = pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithFilerClient(true, 0, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() atomic.AddInt32(&ma.filer.UniqueFilerEpoch, 1) @@ -228,7 +228,7 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, } func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) { - err = pb.WithFilerClient(false, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithFilerClient(false, 0, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err diff --git a/weed/filer/remote_mapping.go b/weed/filer/remote_mapping.go index 9fb73ef59..8a7bea3d6 100644 --- a/weed/filer/remote_mapping.go +++ b/weed/filer/remote_mapping.go @@ -11,7 +11,7 @@ import ( func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error) { var oldContent []byte - if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if readErr = pb.WithFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) return readErr }); readErr != nil { diff --git a/weed/filer/remote_storage.go b/weed/filer/remote_storage.go index d8acf2572..88e7e9614 100644 --- a/weed/filer/remote_storage.go +++ b/weed/filer/remote_storage.go @@ -133,7 +133,7 @@ func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.Remo func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error) { var oldContent []byte - if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if readErr = pb.WithFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX) return readErr }); readErr != nil { diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go index 8bc4c1bf3..223bcb296 100644 --- a/weed/iamapi/iamapi_server.go +++ b/weed/iamapi/iamapi_server.go @@ -77,7 +77,7 @@ func (iama *IamApiServer) registerRouter(router *mux.Router) { func (iam IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) { var buf bytes.Buffer - err = pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { return err } @@ -99,7 +99,7 @@ func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfigurat if err := filer.ProtoToText(&buf, s3cfg); err != nil { return fmt.Errorf("ProtoToText: %s", err) } - return pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { err = util.Retry("saveIamIdentity", func() error { return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes()) }) @@ -112,7 +112,7 @@ func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfigurat func (iam IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) { var buf bytes.Buffer - err = pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirectory, filer.IamPoliciesFile, &buf); err != nil { return err } @@ -136,7 +136,7 @@ func (iam IamS3ApiConfigure) PutPolicies(policies *Policies) (err error) { if b, err = json.Marshal(policies); err != nil { return err } - return pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if err := filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamPoliciesFile, b); err != nil { return err } diff --git a/weed/mount/wfs_filer_client.go b/weed/mount/wfs_filer_client.go index e991d8b39..5dd09363f 100644 --- a/weed/mount/wfs_filer_client.go +++ b/weed/mount/wfs_filer_client.go @@ -22,7 +22,7 @@ func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFile for x := 0; x < n; x++ { filerGrpcAddress := wfs.option.FilerAddresses[i].ToGrpcAddress() - err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + err = pb.WithGrpcClient(streamingMode, wfs.signature, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerGrpcAddress, false, wfs.option.GrpcDialOption) diff --git a/weed/mq/broker/broker_segment_serde.go b/weed/mq/broker/broker_segment_serde.go index a8b1d05bc..e36867da0 100644 --- a/weed/mq/broker/broker_segment_serde.go +++ b/weed/mq/broker/broker_segment_serde.go @@ -51,7 +51,7 @@ func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info return } - err = pb.WithFilerClient(false, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithFilerClient(false, 0, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { // read filer conf first data, err := filer.ReadInsideFiler(client, dir, name) if err != nil { @@ -76,7 +76,7 @@ func (broker *MessageQueueBroker) saveSegmentToFiler(segment *mq.Segment, info * var buf bytes.Buffer filer.ProtoToText(&buf, info) - err = pb.WithFilerClient(false, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithFilerClient(false, 0, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { // read filer conf first err := filer.SaveInsideFiler(client, dir, name, buf.Bytes()) if err != nil { diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 4c86d813f..7ec7fb431 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -83,7 +83,7 @@ func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress { func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(streamingMode, broker.GetFiler(), broker.grpcDialOption, fn) + return pb.WithFilerClient(streamingMode, 0, broker.GetFiler(), broker.grpcDialOption, fn) } diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index c1f2bba82..ecd8117ee 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -10,7 +10,7 @@ import ( func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { - return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) }, volumeServer.ToGrpcAddress(), false, grpcDialOption) @@ -19,7 +19,7 @@ func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, g func WithMasterServerClient(streamingMode bool, masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { - return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, masterServer.ToGrpcAddress(), false, grpcDialOption) diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go index b8a74f1d5..32fa4d497 100644 --- a/weed/pb/filer_pb_tail.go +++ b/weed/pb/filer_pb_tail.go @@ -25,7 +25,7 @@ func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, untilTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error { - err := WithFilerClient(true, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(clientName, clientId, clientEpoch, pathPrefix, additionalPathPrefixes, nil, &lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType)) + err := WithFilerClient(true, clientId, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(clientName, clientId, clientEpoch, pathPrefix, additionalPathPrefixes, nil, &lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType)) if err != nil { return fmt.Errorf("subscribing filer meta change: %v", err) } diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index f3cca7fba..5f685912e 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -3,6 +3,7 @@ package pb import ( "context" "fmt" + "google.golang.org/grpc/metadata" "math/rand" "net/http" "strconv" @@ -118,7 +119,7 @@ func getOrCreateConnection(address string, waitForReady bool, opts ...grpc.DialO } // WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection. -func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, waitForReady bool, opts ...grpc.DialOption) error { +func WithGrpcClient(streamingMode bool, signature int32, fn func(*grpc.ClientConn) error, address string, waitForReady bool, opts ...grpc.DialOption) error { if !streamingMode { vgc, err := getOrCreateConnection(address, waitForReady, opts...) @@ -141,7 +142,12 @@ func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address } return executionErr } else { - grpcConnection, err := GrpcDial(context.Background(), address, waitForReady, opts...) + ctx := context.Background() + if signature != 0 { + md := metadata.New(map[string]string{"sw-client-id": fmt.Sprintf("%d", signature)}) + ctx = metadata.NewOutgoingContext(ctx, md) + } + grpcConnection, err := GrpcDial(ctx, address, waitForReady, opts...) if err != nil { return fmt.Errorf("fail to dial %s: %v", address, err) } @@ -204,7 +210,7 @@ func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) { } func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, waitForReady bool, fn func(client master_pb.SeaweedClient) error) error { - return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, master.ToGrpcAddress(), waitForReady, grpcDialOption) @@ -212,7 +218,7 @@ func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption g } func WithVolumeServerClient(streamingMode bool, volumeServer ServerAddress, grpcDialOption grpc.DialOption, fn func(client volume_server_pb.VolumeServerClient) error) error { - return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) }, volumeServer.ToGrpcAddress(), false, grpcDialOption) @@ -220,7 +226,7 @@ func WithVolumeServerClient(streamingMode bool, volumeServer ServerAddress, grpc } func WithBrokerClient(streamingMode bool, broker ServerAddress, grpcDialOption grpc.DialOption, fn func(client mq_pb.SeaweedMessagingClient) error) error { - return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { client := mq_pb.NewSeaweedMessagingClient(grpcConnection) return fn(client) }, broker.ToGrpcAddress(), false, grpcDialOption) @@ -230,7 +236,7 @@ func WithBrokerClient(streamingMode bool, broker ServerAddress, grpcDialOption g func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[string]ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) { for _, masterGrpcAddress := range masterGrpcAddresses { - err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + err = WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, masterGrpcAddress.ToGrpcAddress(), false, grpcDialOption) @@ -244,22 +250,22 @@ func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[stri func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client mq_pb.SeaweedMessagingClient) error) error { - return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { client := mq_pb.NewSeaweedMessagingClient(grpcConnection) return fn(client) }, brokerGrpcAddress, false, grpcDialOption) } -func WithFilerClient(streamingMode bool, filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { +func WithFilerClient(streamingMode bool, signature int32, filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { - return WithGrpcFilerClient(streamingMode, filer, grpcDialOption, fn) + return WithGrpcFilerClient(streamingMode, signature, filer, grpcDialOption, fn) } -func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { +func WithGrpcFilerClient(streamingMode bool, signature int32, filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { - return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, signature, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerGrpcAddress.ToGrpcAddress(), false, grpcDialOption) @@ -269,7 +275,7 @@ func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grp func WithOneOfGrpcFilerClients(streamingMode bool, filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) { for _, filerAddress := range filerAddresses { - err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + err = WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerAddress.ToGrpcAddress(), false, grpcDialOption) diff --git a/weed/remote_storage/track_sync_offset.go b/weed/remote_storage/track_sync_offset.go index cdf6c99bf..38cb7bd24 100644 --- a/weed/remote_storage/track_sync_offset.go +++ b/weed/remote_storage/track_sync_offset.go @@ -17,7 +17,7 @@ func GetSyncOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, dir s dirHash := uint32(util.HashStringToLong(dir)) - readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + readErr = pb.WithFilerClient(false, 0, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(SyncKeyPrefix + "____") util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash) @@ -46,7 +46,7 @@ func SetSyncOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, dir s dirHash := uint32(util.HashStringToLong(dir)) - return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(false, 0, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(SyncKeyPrefix + "____") util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash) diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index cd2b416f9..57aa63e5f 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -92,7 +92,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p } func ReadFilerSignature(grpcDialOption grpc.DialOption, filer pb.ServerAddress) (filerSignature int32, readErr error) { - if readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if readErr = pb.WithFilerClient(false, 0, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}); err != nil { return fmt.Errorf("GetFilerConfiguration %s: %v", filer, err) } else { diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index d39589047..63e1226b6 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -135,7 +135,7 @@ var _ = filer_pb.FilerClient(&FilerSink{}) func (fs *FilerSink) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, fs.signature, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.grpcAddress, false, fs.grpcDialOption) diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 35b6ffa73..ce2de41b9 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -33,6 +33,7 @@ type FilerSink struct { writeChunkByFiler bool isIncremental bool executor *util.LimitedConcurrentExecutor + signature int32 } func init() { @@ -54,6 +55,7 @@ func (fs *FilerSink) IsIncremental() bool { func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error { fs.isIncremental = configuration.GetBool(prefix + "is_incremental") fs.dataCenter = configuration.GetString(prefix + "dataCenter") + fs.signature = util.RandomInt32() return fs.DoInitialize( "", configuration.GetString(prefix+"grpcAddress"), diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 2da883ba6..167907a5a 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -28,10 +28,12 @@ type FilerSource struct { address string proxyByFiler bool dataCenter string + signature int32 } func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error { fs.dataCenter = configuration.GetString(prefix + "dataCenter") + fs.signature = util.RandomInt32() return fs.DoInitialize( "", configuration.GetString(prefix+"grpcAddress"), @@ -128,7 +130,7 @@ var _ = filer_pb.FilerClient(&FilerSource{}) func (fs *FilerSource) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, fs.signature, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.grpcAddress, false, fs.grpcDialOption) diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 82ac3688c..876acd7cf 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -92,7 +92,7 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) (err error) { var content []byte - err = pb.WithFilerClient(false, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithFilerClient(false, 0, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { content, err = filer.ReadInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile) return err }) diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go index 870c65d2b..0cc0b08d2 100644 --- a/weed/s3api/s3api_circuit_breaker.go +++ b/weed/s3api/s3api_circuit_breaker.go @@ -28,7 +28,7 @@ func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { limitations: make(map[string]int64), } - err := pb.WithFilerClient(false, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithFilerClient(false, 0, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { content, err := filer.ReadInsideFiler(client, s3_constants.CircuitBreakerConfigDir, s3_constants.CircuitBreakerConfigFile) if err != nil { return fmt.Errorf("read S3 circuit breaker config: %v", err) diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index b85ff485d..81d7017dc 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -16,7 +16,7 @@ var _ = filer_pb.FilerClient(&S3ApiServer{}) func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, s3a.option.Filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption) diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go index 32cb2830d..58215a927 100644 --- a/weed/server/filer_grpc_server_admin.go +++ b/weed/server/filer_grpc_server_admin.go @@ -49,7 +49,7 @@ func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (res StartTimeNs: time.Now().UnixNano(), } if req.TargetType == cluster.FilerType { - pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{}) if pingResp != nil { resp.RemoteTimeNs = pingResp.StartTimeNs diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go index 77d11df8a..7a28e0ed6 100644 --- a/weed/server/master_grpc_server_admin.go +++ b/weed/server/master_grpc_server_admin.go @@ -161,7 +161,7 @@ func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (r StartTimeNs: time.Now().UnixNano(), } if req.TargetType == cluster.FilerType { - pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{}) if pingResp != nil { resp.RemoteTimeNs = pingResp.StartTimeNs diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 24a9650e7..7ba9f72c9 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -317,7 +317,7 @@ func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequ StartTimeNs: time.Now().UnixNano(), } if req.TargetType == cluster.FilerType { - pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{}) if pingResp != nil { resp.RemoteTimeNs = pingResp.StartTimeNs diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 2991a39f1..189378dcd 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -133,7 +133,7 @@ var _ = filer_pb.FilerClient(&WebDavFileSystem{}) func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, fs.signature, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.option.Filer.ToGrpcAddress(), false, fs.option.GrpcDialOption) diff --git a/weed/shell/command_cluster_check.go b/weed/shell/command_cluster_check.go index 2cabf91b8..3fb72940f 100644 --- a/weed/shell/command_cluster_check.go +++ b/weed/shell/command_cluster_check.go @@ -161,7 +161,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i for _, filer := range filers { for _, master := range masters { fmt.Fprintf(writer, "checking filer %s to master %s ... ", string(filer), string(master)) - err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithFilerClient(false, 0, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{ Target: string(master), TargetType: cluster.MasterType, @@ -181,7 +181,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i for _, filer := range filers { for _, volumeServer := range volumeServers { fmt.Fprintf(writer, "checking filer %s to volume server %s ... ", string(filer), string(volumeServer)) - err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithFilerClient(false, 0, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{ Target: string(volumeServer), TargetType: cluster.VolumeServerType, @@ -224,7 +224,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i for _, sourceFiler := range filers { for _, targetFiler := range filers { fmt.Fprintf(writer, "checking filer %s to %s ... ", string(sourceFiler), string(targetFiler)) - err := pb.WithFilerClient(false, sourceFiler, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithFilerClient(false, 0, sourceFiler, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{ Target: string(targetFiler), TargetType: cluster.FilerType, diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go index fc6725fad..b2c0a2237 100644 --- a/weed/shell/command_cluster_ps.go +++ b/weed/shell/command_cluster_ps.go @@ -105,7 +105,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W if node.Rack != "" { fmt.Fprintf(writer, " Rack: %v\n", node.Rack) } - pb.WithFilerClient(false, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pb.WithFilerClient(false, 0, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err == nil { if resp.FilerGroup != "" { @@ -120,7 +120,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W }) } for _, node := range filerNodes { - pb.WithFilerClient(false, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pb.WithFilerClient(false, 0, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { fmt.Fprintf(writer, "* filer %s metadata sync time\n", node.Address) selfSignature := filerSignatures[node] for peer, peerSignature := range filerSignatures { diff --git a/weed/shell/commands.go b/weed/shell/commands.go index af6888458..2c2032963 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -111,7 +111,7 @@ var _ = filer_pb.FilerClient(&CommandEnv{}) func (ce *CommandEnv) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithGrpcFilerClient(streamingMode, ce.option.FilerAddress, ce.option.GrpcDialOption, fn) + return pb.WithGrpcFilerClient(streamingMode, 0, ce.option.FilerAddress, ce.option.GrpcDialOption, fn) }