diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 7fb3e0141..a3e16a996 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -66,6 +66,7 @@ func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro grpcConnection, err := util.GrpcDial(wfs.option.FilerGrpcAddress) if err != nil { + wfs.grpcClientsLock.Unlock() return fmt.Errorf("fail to dial %s: %v", wfs.option.FilerGrpcAddress, err) } diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index 5e6c23709..6720fe3a3 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -9,6 +9,13 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/util" + "sync" + "google.golang.org/grpc" + ) + +var ( + grpcClients = make(map[string]*grpc.ClientConn) + grpcClientsLock sync.Mutex ) func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.VolumeServerClient) error) error { @@ -18,11 +25,23 @@ func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.Volume return err } + grpcClientsLock.Lock() + + existingConnection, found := grpcClients[grpcAddress] + if found { + grpcClientsLock.Unlock() + client := volume_server_pb.NewVolumeServerClient(existingConnection) + return fn(client) + } + grpcConnection, err := util.GrpcDial(grpcAddress) if err != nil { + grpcClientsLock.Unlock() return fmt.Errorf("fail to dial %s: %v", grpcAddress, err) } - defer grpcConnection.Close() + + grpcClients[grpcAddress] = grpcConnection + grpcClientsLock.Unlock() client := volume_server_pb.NewVolumeServerClient(grpcConnection) @@ -41,11 +60,23 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err func withMasterServerClient(masterServer string, fn func(masterClient master_pb.SeaweedClient) error) error { + grpcClientsLock.Lock() + + existingConnection, found := grpcClients[masterServer] + if found { + grpcClientsLock.Unlock() + client := master_pb.NewSeaweedClient(existingConnection) + return fn(client) + } + grpcConnection, err := util.GrpcDial(masterServer) if err != nil { + grpcClientsLock.Unlock() return fmt.Errorf("fail to dial %s: %v", masterServer, err) } - defer grpcConnection.Close() + + grpcClients[masterServer] = grpcConnection + grpcClientsLock.Unlock() client := master_pb.NewSeaweedClient(grpcConnection)