From af67d99ca4aae35b3732654dda52aaa348a75fd9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 26 May 2019 00:21:17 -0700 Subject: [PATCH] incrementally update master ec shards state --- weed/server/master_grpc_server.go | 7 ++++ weed/storage/erasure_coding/ec_volume_info.go | 4 ++ weed/topology/data_node_ec.go | 37 +++++++++++++++++++ weed/topology/topology_ec.go | 29 +++++++++++++++ 4 files changed, 77 insertions(+) diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index bc6463251..69bd56df0 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -103,6 +103,13 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } } + if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 { + // TODO send out the delta + + // update master internal volume layouts + t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn) + } + if len(heartbeat.EcShards) > 0 { glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards) newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn) diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go index ef8cc4ed4..48f4713d9 100644 --- a/weed/storage/erasure_coding/ec_volume_info.go +++ b/weed/storage/erasure_coding/ec_volume_info.go @@ -91,3 +91,7 @@ func (b ShardBits) ShardIdCount() (count int) { func (b ShardBits) Minus(other ShardBits) (ShardBits) { return b &^ other } + +func (b ShardBits) Plus(other ShardBits) (ShardBits) { + return b | other +} diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go index 63c8f2127..95635331b 100644 --- a/weed/topology/data_node_ec.go +++ b/weed/topology/data_node_ec.go @@ -55,3 +55,40 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) return } + +func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) { + + for _, newShard := range newShards { + dn.AddOrUpdateEcShard(newShard) + } + + for _, deletedShard := range deletedShards { + dn.DeleteEcShard(deletedShard) + } + +} + +func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) { + dn.ecShardsLock.Lock() + defer dn.ecShardsLock.Unlock() + + if existing, ok := dn.ecShards[s.VolumeId]; !ok { + dn.ecShards[s.VolumeId] = s + } else { + existing.ShardBits = existing.ShardBits.Plus(s.ShardBits) + } + +} + +func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { + dn.ecShardsLock.Lock() + defer dn.ecShardsLock.Unlock() + + if existing, ok := dn.ecShards[s.VolumeId]; ok { + existing.ShardBits = existing.ShardBits.Minus(s.ShardBits) + if existing.ShardBits.ShardIdCount() == 0 { + delete(dn.ecShards, s.VolumeId) + } + } + +} diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go index 050a0b901..61de86753 100644 --- a/weed/topology/topology_ec.go +++ b/weed/topology/topology_ec.go @@ -33,6 +33,35 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf return } +func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) { + // convert into in memory struct storage.VolumeInfo + var newShards, deletedShards []*erasure_coding.EcVolumeInfo + for _, shardInfo := range newEcShards { + newShards = append(newShards, + erasure_coding.NewEcVolumeInfo( + shardInfo.Collection, + needle.VolumeId(shardInfo.Id), + erasure_coding.ShardBits(shardInfo.EcIndexBits))) + } + for _, shardInfo := range deletedEcShards { + deletedShards = append(deletedShards, + erasure_coding.NewEcVolumeInfo( + shardInfo.Collection, + needle.VolumeId(shardInfo.Id), + erasure_coding.ShardBits(shardInfo.EcIndexBits))) + } + + dn.DeltaUpdateEcShards(newShards, deletedShards) + + for _, v := range newShards { + t.RegisterEcShards(v, dn) + } + for _, v := range deletedShards { + t.UnRegisterEcShards(v, dn) + } + return +} + func NewEcShardLocations(collection string) *EcShardLocations { return &EcShardLocations{ Collection: collection,