diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 34962c83c..bd28a15c8 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -87,7 +87,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } // update master internal volume layouts t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn) - } else if len(heartbeat.Volumes) > 0 { + } + + if len(heartbeat.Volumes) > 0 { // process heartbeat.Volumes newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn) @@ -99,7 +101,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url()) message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) } - } else if len(heartbeat.EcShards) > 0 { + } + + if len(heartbeat.EcShards) > 0 { glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards) } diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go index 2378ff4e9..9cebb7d40 100644 --- a/weed/storage/erasure_coding/ec_volume_info.go +++ b/weed/storage/erasure_coding/ec_volume_info.go @@ -12,6 +12,13 @@ type EcVolumeInfo struct { shardIds uint16 // use bits to indicate the shard id } +func NewEcVolumeInfo(collection string, vid needle.VolumeId) *EcVolumeInfo { + return &EcVolumeInfo{ + Collection: collection, + VolumeId: vid, + } +} + func (ecInfo *EcVolumeInfo) AddShardId(id ShardId) { ecInfo.shardIds |= (1 << id) } diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index ff160e178..dba323518 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -21,7 +21,7 @@ type DataNode struct { Port int PublicUrl string LastSeen int64 // unix time in seconds - ecShards map[needle.VolumeId]erasure_coding.EcVolumeInfo + ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo ecShardsLock sync.RWMutex } @@ -30,7 +30,7 @@ func NewDataNode(id string) *DataNode { s.id = NodeId(id) s.nodeType = "DataNode" s.volumes = make(map[needle.VolumeId]storage.VolumeInfo) - s.ecShards = make(map[needle.VolumeId]erasure_coding.EcVolumeInfo) + s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) s.NodeImpl.value = s return s } diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go index d1715f557..5206a3b51 100644 --- a/weed/topology/data_node_ec.go +++ b/weed/topology/data_node_ec.go @@ -4,7 +4,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" ) -func (dn *DataNode) GetEcShards() (ret []erasure_coding.EcVolumeInfo) { +func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) { dn.RLock() for _, ecVolumeInfo := range dn.ecShards { ret = append(ret, ecVolumeInfo) @@ -12,3 +12,9 @@ func (dn *DataNode) GetEcShards() (ret []erasure_coding.EcVolumeInfo) { dn.RUnlock() return ret } + +func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) { + dn.ecShardsLock.Lock() + dn.ecShardsLock.Unlock() + return +} diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go new file mode 100644 index 000000000..5592b9b64 --- /dev/null +++ b/weed/topology/topology_ec.go @@ -0,0 +1,43 @@ +package topology + +import ( + "sort" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + + +func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) { + // convert into in memory struct storage.VolumeInfo + var shards []*erasure_coding.EcVolumeInfo + sort.Slice(shardInfos, func(i, j int) bool { + return shardInfos[i].Id < shardInfos[j].Id + }) + var prevVolumeId uint32 + var ecVolumeInfo *erasure_coding.EcVolumeInfo + for _, shardInfo := range shardInfos { + if shardInfo.Id != prevVolumeId { + ecVolumeInfo = erasure_coding.NewEcVolumeInfo(shardInfo.Collection, needle.VolumeId(shardInfo.Id)) + shards = append(shards, ecVolumeInfo) + } + ecVolumeInfo.AddShardId(erasure_coding.ShardId(shardInfo.EcIndex)) + } + // find out the delta volumes + newShards, deletedShards = dn.UpdateEcShards(shards) + for _, v := range newShards { + t.RegisterEcShards(v, dn) + } + for _, v := range deletedShards { + t.UnRegisterEcShards(v, dn) + } + return +} + +func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) { +} +func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) { + glog.Infof("removing ec shard info:%+v", ecShardInfos) +}