From 77fc8c59140537bb693ccf44c63e68626322b70e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 3 Jul 2018 19:07:55 -0700 Subject: [PATCH] keep alive for gRpc calls --- weed/command/filer.go | 3 +-- weed/command/filer_copy.go | 3 +-- weed/command/master.go | 3 +-- weed/command/server.go | 3 +-- weed/filer2/filer_master.go | 4 ++-- weed/filesys/wfs.go | 4 ++-- weed/server/volume_grpc_client.go | 4 ++-- weed/util/grpc_client_server.go | 28 ++++++++++++++++++++++++++++ 8 files changed, 38 insertions(+), 14 deletions(-) create mode 100644 weed/util/grpc_client_server.go diff --git a/weed/command/filer.go b/weed/command/filer.go index 2d4696828..e5a3e379a 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -10,7 +10,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc" "google.golang.org/grpc/reflection" "strings" ) @@ -129,7 +128,7 @@ func (fo *FilerOptions) start() { if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } - grpcS := grpc.NewServer() + grpcS := util.NewGrpcServer() filer_pb.RegisterSeaweedFilerServer(grpcS, fs) reflection.Register(grpcS) go grpcS.Serve(grpcL) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 3d4e5db9f..9937bc9d6 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -16,7 +16,6 @@ import ( "strconv" "io" "time" - "google.golang.org/grpc" "context" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -340,7 +339,7 @@ func detectMimeType(f *os.File) string { func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error { - grpcConnection, err := grpc.Dial(filerAddress, grpc.WithInsecure()) + grpcConnection, err := util.GrpcDial(filerAddress) if err != nil { return fmt.Errorf("fail to dial %s: %v", filerAddress, err) } diff --git a/weed/command/master.go b/weed/command/master.go index 8abec2a3d..c1b9cf5ae 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -14,7 +14,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" "github.com/soheilhy/cmux" - "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) @@ -103,7 +102,7 @@ func runMaster(cmd *Command, args []string) bool { httpL := m.Match(cmux.Any()) // Create your protocol servers. - grpcS := grpc.NewServer() + grpcS := util.NewGrpcServer() master_pb.RegisterSeaweedServer(grpcS, ms) reflection.Register(grpcS) diff --git a/weed/command/server.go b/weed/command/server.go index 606845199..485dea7ac 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -17,7 +17,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" "github.com/soheilhy/cmux" - "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) @@ -208,7 +207,7 @@ func runServer(cmd *Command, args []string) bool { httpL := m.Match(cmux.Any()) // Create your protocol servers. - grpcS := grpc.NewServer() + grpcS := util.NewGrpcServer() master_pb.RegisterSeaweedServer(grpcS, ms) reflection.Register(grpcS) diff --git a/weed/filer2/filer_master.go b/weed/filer2/filer_master.go index f69f68a85..51b12c237 100644 --- a/weed/filer2/filer_master.go +++ b/weed/filer2/filer_master.go @@ -7,7 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/glog" - "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/util" ) func (fs *Filer) GetMaster() string { @@ -48,7 +48,7 @@ func (fs *Filer) KeepConnectedToMaster() { func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error { - grpcConnection, err := grpc.Dial(master, grpc.WithInsecure()) + grpcConnection, err := util.GrpcDial(master) if err != nil { return fmt.Errorf("fail to dial %s: %v", master, err) } diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index a126bf3ea..d7e133483 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -5,10 +5,10 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/karlseguin/ccache" - "google.golang.org/grpc" "sync" "bazil.org/fuse" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" ) type WFS struct { @@ -43,7 +43,7 @@ func (wfs *WFS) Root() (fs.Node, error) { func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - grpcConnection, err := grpc.Dial(wfs.filerGrpcAddress, grpc.WithInsecure()) + grpcConnection, err := util.GrpcDial(wfs.filerGrpcAddress) if err != nil { return fmt.Errorf("fail to dial %s: %v", wfs.filerGrpcAddress, err) } diff --git a/weed/server/volume_grpc_client.go b/weed/server/volume_grpc_client.go index 7688745e2..b3c755239 100644 --- a/weed/server/volume_grpc_client.go +++ b/weed/server/volume_grpc_client.go @@ -8,7 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "golang.org/x/net/context" - "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/util" ) func (vs *VolumeServer) GetMaster() string { @@ -38,7 +38,7 @@ func (vs *VolumeServer) heartbeat() { func (vs *VolumeServer) doHeartbeat(masterNode string, sleepInterval time.Duration) (newLeader string, err error) { - grpcConection, err := grpc.Dial(masterNode, grpc.WithInsecure()) + grpcConection, err := util.GrpcDial(masterNode) if err != nil { return "", fmt.Errorf("fail to dial: %v", err) } diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go new file mode 100644 index 000000000..8dbb4c0cd --- /dev/null +++ b/weed/util/grpc_client_server.go @@ -0,0 +1,28 @@ +package util + +import ( + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" +) + +func NewGrpcServer() *grpc.Server { + return grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{ + Time: 10 * time.Second, // wait time before ping if no activity + Timeout: 20 * time.Second, // ping timeout + }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: 60 * time.Second, // min time a client should wait before sending a ping + })) +} + +func GrpcDial(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + + opts = append(opts, grpc.WithInsecure()) + opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 30 * time.Second, // client ping server if no activity for this long + Timeout: 20 * time.Second, + })) + + return grpc.Dial(address, opts...) +}