From 8a96445f402a240956adf081de3a675e2e41c767 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 23 May 2019 22:51:18 -0700 Subject: [PATCH] register ec shards to each data node --- weed/storage/erasure_coding/ec_volume_info.go | 10 +++++ weed/topology/data_node_ec.go | 41 ++++++++++++++++++- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go index 9cebb7d40..645686a92 100644 --- a/weed/storage/erasure_coding/ec_volume_info.go +++ b/weed/storage/erasure_coding/ec_volume_info.go @@ -40,6 +40,16 @@ func (ecInfo *EcVolumeInfo) ShardIds() (ret []ShardId) { return } +func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) (*EcVolumeInfo) { + ret := &EcVolumeInfo{ + VolumeId: ecInfo.VolumeId, + Collection: ecInfo.Collection, + shardIds: ecInfo.shardIds &^ other.shardIds, + } + + return ret +} + func (ecInfo *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret []*master_pb.VolumeEcShardInformationMessage) { for _, shard := range ecInfo.ShardIds() { ret = append(ret, &master_pb.VolumeEcShardInformationMessage{ diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go index 5206a3b51..ef395dbdb 100644 --- a/weed/topology/data_node_ec.go +++ b/weed/topology/data_node_ec.go @@ -2,6 +2,7 @@ package topology import ( "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) { @@ -14,7 +15,43 @@ func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) { } func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) { - dn.ecShardsLock.Lock() - dn.ecShardsLock.Unlock() + // prepare the new ec shard map + actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) + for _, ecShards := range actualShards { + actualEcShardMap[ecShards.VolumeId]= ecShards + } + + // found out the newShards and deletedShards + dn.ecShardsLock.RLock() + for vid, ecShards := range dn.ecShards{ + if actualEcShards, ok := actualEcShardMap[vid]; !ok { + // dn registered ec shards not found in the new set of ec shards + deletedShards = append(deletedShards, ecShards) + } else { + // found, but maybe the actual shard could be missing + a := actualEcShards.Minus(ecShards) + if len(a.ShardIds())>0 { + newShards = append(newShards, a) + } + d := ecShards.Minus(actualEcShards) + if len(d.ShardIds())>0 { + deletedShards = append(deletedShards, d) + } + } + } + for _, ecShards := range actualShards { + if _, found := dn.ecShards[ecShards.VolumeId]; !found { + newShards = append(newShards, ecShards) + } + } + dn.ecShardsLock.RUnlock() + + if len(newShards)>0 || len(deletedShards)>0{ + // if changed, set to the new ec shard map + dn.ecShardsLock.Lock() + dn.ecShards = actualEcShardMap + dn.ecShardsLock.Unlock() + } + return }