diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 6c293fe95..f24cea619 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -6,7 +6,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/topology" "google.golang.org/grpc/peer" ) @@ -50,21 +49,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } } - var volumeInfos []storage.VolumeInfo - for _, v := range heartbeat.Volumes { - if vi, err := storage.NewVolumeInfo(v); err == nil { - volumeInfos = append(volumeInfos, vi) - } else { - glog.V(0).Infof("Fail to convert joined volume information: %v", err) - } - } - deletedVolumes := dn.UpdateVolumes(volumeInfos) - for _, v := range volumeInfos { - t.RegisterVolumeLayout(v, dn) - } - for _, v := range deletedVolumes { - t.UnRegisterVolumeLayout(v, dn) - } + t.SyncDataNodeRegistration(heartbeat.Volumes, dn) } else { return err diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 27d22ee7e..cee156dc1 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" ) type Topology struct { @@ -145,3 +146,21 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { t.LinkChildNode(dc) return dc } + +func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) { + var volumeInfos []storage.VolumeInfo + for _, v := range volumes { + if vi, err := storage.NewVolumeInfo(v); err == nil { + volumeInfos = append(volumeInfos, vi) + } else { + glog.V(0).Infof("Fail to convert joined volume information: %v", err) + } + } + deletedVolumes := dn.UpdateVolumes(volumeInfos) + for _, v := range volumeInfos { + t.RegisterVolumeLayout(v, dn) + } + for _, v := range deletedVolumes { + t.UnRegisterVolumeLayout(v, dn) + } +} diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go index 06f506c37..36aa07157 100644 --- a/weed/topology/topology_test.go +++ b/weed/topology/topology_test.go @@ -4,7 +4,6 @@ import ( "testing" "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/storage" ) func TestRemoveDataCenter(t *testing.T) { @@ -44,20 +43,8 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { } volumeMessages = append(volumeMessages, volumeMessage) } - var volumeInfos []storage.VolumeInfo - for _, v := range volumeMessages { - if vi, err := storage.NewVolumeInfo(v); err == nil { - volumeInfos = append(volumeInfos, vi) - } - } - deletedVolumes := dn.UpdateVolumes(volumeInfos) - for _, v := range volumeInfos { - topo.RegisterVolumeLayout(v, dn) - } - for _, v := range deletedVolumes { - topo.UnRegisterVolumeLayout(v, dn) - } + topo.SyncDataNodeRegistration(volumeMessages, dn) assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount) assert(t, "volumeCount", topo.volumeCount, volumeCount) @@ -81,20 +68,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { } volumeMessages = append(volumeMessages, volumeMessage) } - var volumeInfos []storage.VolumeInfo - for _, v := range volumeMessages { - if vi, err := storage.NewVolumeInfo(v); err == nil { - volumeInfos = append(volumeInfos, vi) - } - } - - deletedVolumes := dn.UpdateVolumes(volumeInfos) - for _, v := range volumeInfos { - topo.RegisterVolumeLayout(v, dn) - } - for _, v := range deletedVolumes { - topo.UnRegisterVolumeLayout(v, dn) - } + topo.SyncDataNodeRegistration(volumeMessages, dn) assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount) assert(t, "volumeCount", topo.volumeCount, volumeCount)