diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 2952a8071..e12449819 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -43,55 +43,54 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ for { heartbeat, err := stream.Recv() - if err == nil { - if dn == nil { - t.Sequence.SetMax(heartbeat.MaxFileKey) - if heartbeat.Ip == "" { - if pr, ok := peer.FromContext(stream.Context()); ok { - if pr.Addr != net.Addr(nil) { - heartbeat.Ip = pr.Addr.String()[0:strings.LastIndex(pr.Addr.String(), ":")] - glog.V(0).Infof("remote IP address is detected as %v", heartbeat.Ip) - } + if err != nil { + return err + } + + if dn == nil { + t.Sequence.SetMax(heartbeat.MaxFileKey) + if heartbeat.Ip == "" { + if pr, ok := peer.FromContext(stream.Context()); ok { + if pr.Addr != net.Addr(nil) { + heartbeat.Ip = pr.Addr.String()[0:strings.LastIndex(pr.Addr.String(), ":")] + glog.V(0).Infof("remote IP address is detected as %v", heartbeat.Ip) } } - dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) - dc := t.GetOrCreateDataCenter(dcName) - rack := dc.GetOrCreateRack(rackName) - dn = rack.GetOrCreateDataNode(heartbeat.Ip, - int(heartbeat.Port), heartbeat.PublicUrl, - int(heartbeat.MaxVolumeCount)) - glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) - if err := stream.Send(&master_pb.HeartbeatResponse{ - VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, - SecretKey: string(ms.guard.SecretKey), - }); err != nil { - return err - } } + dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) + dc := t.GetOrCreateDataCenter(dcName) + rack := dc.GetOrCreateRack(rackName) + dn = rack.GetOrCreateDataNode(heartbeat.Ip, + int(heartbeat.Port), heartbeat.PublicUrl, + int(heartbeat.MaxVolumeCount)) + glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) + if err := stream.Send(&master_pb.HeartbeatResponse{ + VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, + SecretKey: string(ms.guard.SecretKey), + }); err != nil { + return err + } + } - newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn) + newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn) - message := &master_pb.VolumeLocation{ - Url: dn.Url(), - PublicUrl: dn.PublicUrl, - } - for _, v := range newVolumes { - message.NewVids = append(message.NewVids, uint32(v.Id)) - } - for _, v := range deletedVolumes { - message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) - } + message := &master_pb.VolumeLocation{ + Url: dn.Url(), + PublicUrl: dn.PublicUrl, + } + for _, v := range newVolumes { + message.NewVids = append(message.NewVids, uint32(v.Id)) + } + for _, v := range deletedVolumes { + message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) + } - if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 { - ms.clientChansLock.RLock() - for _, ch := range ms.clientChans { - ch <- message - } - ms.clientChansLock.RUnlock() + if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 { + ms.clientChansLock.RLock() + for _, ch := range ms.clientChans { + ch <- message } - - } else { - return err + ms.clientChansLock.RUnlock() } // tell the volume servers about the leader