mirror of
https://github.com/chrislusf/seaweedfs
synced 2024-09-18 23:10:35 +02:00
incrementally update master ec shards state
This commit is contained in:
parent
db94a41f9e
commit
af67d99ca4
|
@ -103,6 +103,13 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
|
||||||
|
// TODO send out the delta
|
||||||
|
|
||||||
|
// update master internal volume layouts
|
||||||
|
t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
|
||||||
|
}
|
||||||
|
|
||||||
if len(heartbeat.EcShards) > 0 {
|
if len(heartbeat.EcShards) > 0 {
|
||||||
glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
|
glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
|
||||||
newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
|
newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
|
||||||
|
|
|
@ -91,3 +91,7 @@ func (b ShardBits) ShardIdCount() (count int) {
|
||||||
func (b ShardBits) Minus(other ShardBits) (ShardBits) {
|
func (b ShardBits) Minus(other ShardBits) (ShardBits) {
|
||||||
return b &^ other
|
return b &^ other
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b ShardBits) Plus(other ShardBits) (ShardBits) {
|
||||||
|
return b | other
|
||||||
|
}
|
||||||
|
|
|
@ -55,3 +55,40 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
|
||||||
|
|
||||||
|
for _, newShard := range newShards {
|
||||||
|
dn.AddOrUpdateEcShard(newShard)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, deletedShard := range deletedShards {
|
||||||
|
dn.DeleteEcShard(deletedShard)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
|
||||||
|
dn.ecShardsLock.Lock()
|
||||||
|
defer dn.ecShardsLock.Unlock()
|
||||||
|
|
||||||
|
if existing, ok := dn.ecShards[s.VolumeId]; !ok {
|
||||||
|
dn.ecShards[s.VolumeId] = s
|
||||||
|
} else {
|
||||||
|
existing.ShardBits = existing.ShardBits.Plus(s.ShardBits)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
|
||||||
|
dn.ecShardsLock.Lock()
|
||||||
|
defer dn.ecShardsLock.Unlock()
|
||||||
|
|
||||||
|
if existing, ok := dn.ecShards[s.VolumeId]; ok {
|
||||||
|
existing.ShardBits = existing.ShardBits.Minus(s.ShardBits)
|
||||||
|
if existing.ShardBits.ShardIdCount() == 0 {
|
||||||
|
delete(dn.ecShards, s.VolumeId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -33,6 +33,35 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) {
|
||||||
|
// convert into in memory struct storage.VolumeInfo
|
||||||
|
var newShards, deletedShards []*erasure_coding.EcVolumeInfo
|
||||||
|
for _, shardInfo := range newEcShards {
|
||||||
|
newShards = append(newShards,
|
||||||
|
erasure_coding.NewEcVolumeInfo(
|
||||||
|
shardInfo.Collection,
|
||||||
|
needle.VolumeId(shardInfo.Id),
|
||||||
|
erasure_coding.ShardBits(shardInfo.EcIndexBits)))
|
||||||
|
}
|
||||||
|
for _, shardInfo := range deletedEcShards {
|
||||||
|
deletedShards = append(deletedShards,
|
||||||
|
erasure_coding.NewEcVolumeInfo(
|
||||||
|
shardInfo.Collection,
|
||||||
|
needle.VolumeId(shardInfo.Id),
|
||||||
|
erasure_coding.ShardBits(shardInfo.EcIndexBits)))
|
||||||
|
}
|
||||||
|
|
||||||
|
dn.DeltaUpdateEcShards(newShards, deletedShards)
|
||||||
|
|
||||||
|
for _, v := range newShards {
|
||||||
|
t.RegisterEcShards(v, dn)
|
||||||
|
}
|
||||||
|
for _, v := range deletedShards {
|
||||||
|
t.UnRegisterEcShards(v, dn)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func NewEcShardLocations(collection string) *EcShardLocations {
|
func NewEcShardLocations(collection string) *EcShardLocations {
|
||||||
return &EcShardLocations{
|
return &EcShardLocations{
|
||||||
Collection: collection,
|
Collection: collection,
|
||||||
|
|
Loading…
Reference in a new issue