1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-09 21:02:46 +02:00
seaweedfs/weed/topology/topology_ec.go
Chris Lu 9d013ea9b8
Admin UI: include ec shard sizes into volume server info (#7071)
* show ec shards on dashboard, show max in its own column

* master collect shard size info

* master send shard size via VolumeList

* change to more efficient shard sizes slice

* include ec shard sizes into volume server info

* Eliminated Redundant gRPC Calls

* much more efficient

* Efficient Counting: bits.OnesCount32() uses CPU-optimized instructions to count set bits in O(1)

* avoid extra volume list call

* simplify

* preserve existing shard sizes

* avoid hard coded value

* Update weed/storage/erasure_coding/ec_volume_info.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/admin/dash/volume_management.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update ec_volume_info.go

* address comments

* avoid duplicated functions

* Update weed/admin/dash/volume_management.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* simplify

* refactoring

* fix compilation

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-08-02 02:16:49 -07:00

192 lines
5.4 KiB
Go

package topology
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
type EcShardLocations struct {
Collection string
Locations [erasure_coding.TotalShardsCount][]*DataNode
}
func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
// convert into in memory struct storage.VolumeInfo
var shards []*erasure_coding.EcVolumeInfo
for _, shardInfo := range shardInfos {
// Create EcVolumeInfo directly with optimized format
ecVolumeInfo := &erasure_coding.EcVolumeInfo{
VolumeId: needle.VolumeId(shardInfo.Id),
Collection: shardInfo.Collection,
ShardBits: erasure_coding.ShardBits(shardInfo.EcIndexBits),
DiskType: shardInfo.DiskType,
DiskId: shardInfo.DiskId,
ExpireAtSec: shardInfo.ExpireAtSec,
ShardSizes: shardInfo.ShardSizes,
}
shards = append(shards, ecVolumeInfo)
}
// find out the delta volumes
newShards, deletedShards = dn.UpdateEcShards(shards)
for _, v := range newShards {
t.RegisterEcShards(v, dn)
}
for _, v := range deletedShards {
t.UnRegisterEcShards(v, dn)
}
return
}
func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) {
// convert into in memory struct storage.VolumeInfo
var newShards, deletedShards []*erasure_coding.EcVolumeInfo
for _, shardInfo := range newEcShards {
// Create EcVolumeInfo directly with optimized format
ecVolumeInfo := &erasure_coding.EcVolumeInfo{
VolumeId: needle.VolumeId(shardInfo.Id),
Collection: shardInfo.Collection,
ShardBits: erasure_coding.ShardBits(shardInfo.EcIndexBits),
DiskType: shardInfo.DiskType,
DiskId: shardInfo.DiskId,
ExpireAtSec: shardInfo.ExpireAtSec,
ShardSizes: shardInfo.ShardSizes,
}
newShards = append(newShards, ecVolumeInfo)
}
for _, shardInfo := range deletedEcShards {
// Create EcVolumeInfo directly with optimized format
ecVolumeInfo := &erasure_coding.EcVolumeInfo{
VolumeId: needle.VolumeId(shardInfo.Id),
Collection: shardInfo.Collection,
ShardBits: erasure_coding.ShardBits(shardInfo.EcIndexBits),
DiskType: shardInfo.DiskType,
DiskId: shardInfo.DiskId,
ExpireAtSec: shardInfo.ExpireAtSec,
ShardSizes: shardInfo.ShardSizes,
}
deletedShards = append(deletedShards, ecVolumeInfo)
}
dn.DeltaUpdateEcShards(newShards, deletedShards)
for _, v := range newShards {
t.RegisterEcShards(v, dn)
}
for _, v := range deletedShards {
t.UnRegisterEcShards(v, dn)
}
}
func NewEcShardLocations(collection string) *EcShardLocations {
return &EcShardLocations{
Collection: collection,
}
}
func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool) {
dataNodes := loc.Locations[shardId]
for _, n := range dataNodes {
if n.Id() == dn.Id() {
return false
}
}
loc.Locations[shardId] = append(dataNodes, dn)
return true
}
func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) {
dataNodes := loc.Locations[shardId]
foundIndex := -1
for index, n := range dataNodes {
if n.Id() == dn.Id() {
foundIndex = index
}
}
if foundIndex < 0 {
return false
}
loc.Locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...)
return true
}
func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
t.ecShardMapLock.Lock()
defer t.ecShardMapLock.Unlock()
locations, found := t.ecShardMap[ecShardInfos.VolumeId]
if !found {
locations = NewEcShardLocations(ecShardInfos.Collection)
t.ecShardMap[ecShardInfos.VolumeId] = locations
}
for _, shardId := range ecShardInfos.ShardIds() {
locations.AddShard(shardId, dn)
}
}
func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
glog.Infof("removing ec shard info:%+v", ecShardInfos)
t.ecShardMapLock.Lock()
defer t.ecShardMapLock.Unlock()
locations, found := t.ecShardMap[ecShardInfos.VolumeId]
if !found {
return
}
for _, shardId := range ecShardInfos.ShardIds() {
locations.DeleteShard(shardId, dn)
}
}
func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocations, found bool) {
t.ecShardMapLock.RLock()
defer t.ecShardMapLock.RUnlock()
locations, found = t.ecShardMap[vid]
return
}
func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []pb.ServerAddress) {
t.ecShardMapLock.RLock()
defer t.ecShardMapLock.RUnlock()
dateNodeMap := make(map[pb.ServerAddress]bool)
for _, ecVolumeLocation := range t.ecShardMap {
if ecVolumeLocation.Collection == collection {
for _, locations := range ecVolumeLocation.Locations {
for _, loc := range locations {
dateNodeMap[loc.ServerAddress()] = true
}
}
}
}
for k, _ := range dateNodeMap {
dataNodes = append(dataNodes, k)
}
return
}
func (t *Topology) DeleteEcCollection(collection string) {
t.ecShardMapLock.Lock()
defer t.ecShardMapLock.Unlock()
var vids []needle.VolumeId
for vid, ecVolumeLocation := range t.ecShardMap {
if ecVolumeLocation.Collection == collection {
vids = append(vids, vid)
}
}
for _, vid := range vids {
delete(t.ecShardMap, vid)
}
}