mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-08-16 17:12:46 +02:00
* initial design * added simulation as tests * reorganized the codebase to move the simulation framework and tests into their own dedicated package * integration test. ec worker task * remove "enhanced" reference * start master, volume servers, filer Current Status ✅ Master: Healthy and running (port 9333) ✅ Filer: Healthy and running (port 8888) ✅ Volume Servers: All 6 servers running (ports 8080-8085) 🔄 Admin/Workers: Will start when dependencies are ready * generate write load * tasks are assigned * admin start wtih grpc port. worker has its own working directory * Update .gitignore * working worker and admin. Task detection is not working yet. * compiles, detection uses volumeSizeLimitMB from master * compiles * worker retries connecting to admin * build and restart * rendering pending tasks * skip task ID column * sticky worker id * test canScheduleTaskNow * worker reconnect to admin * clean up logs * worker register itself first * worker can run ec work and report status but: 1. one volume should not be repeatedly worked on. 2. ec shards needs to be distributed and source data should be deleted. * move ec task logic * listing ec shards * local copy, ec. Need to distribute. * ec is mostly working now * distribution of ec shards needs improvement * need configuration to enable ec * show ec volumes * interval field UI component * rename * integration test with vauuming * garbage percentage threshold * fix warning * display ec shard sizes * fix ec volumes list * Update ui.go * show default values * ensure correct default value * MaintenanceConfig use ConfigField * use schema defined defaults * config * reduce duplication * refactor to use BaseUIProvider * each task register its schema * checkECEncodingCandidate use ecDetector * use vacuumDetector * use volumeSizeLimitMB * remove remove * remove unused * refactor * use new framework * remove v2 reference * refactor * left menu can scroll now * The maintenance manager was not being initialized when no data directory was configured for persistent storage. * saving config * Update task_config_schema_templ.go * enable/disable tasks * protobuf encoded task configurations * fix system settings * use ui component * remove logs * interface{} Reduction * reduce interface{} * reduce interface{} * avoid from/to map * reduce interface{} * refactor * keep it DRY * added logging * debug messages * debug level * debug * show the log caller line * use configured task policy * log level * handle admin heartbeat response * Update worker.go * fix EC rack and dc count * Report task status to admin server * fix task logging, simplify interface checking, use erasure_coding constants * factor in empty volume server during task planning * volume.list adds disk id * track disk id also * fix locking scheduled and manual scanning * add active topology * simplify task detector * ec task completed, but shards are not showing up * implement ec in ec_typed.go * adjust log level * dedup * implementing ec copying shards and only ecx files * use disk id when distributing ec shards 🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk 📦 Task Creation: maintenance_integration.go creates ECDestination with DiskId 🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest 💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId]) 📂 File System: EC shards and metadata land in the exact disk directory planned * Delete original volume from all locations * clean up existing shard locations * local encoding and distributing * Update docker/admin_integration/EC-TESTING-README.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * check volume id range * simplify * fix tests * fix types * clean up logs and tests --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
183 lines
4.8 KiB
Go
183 lines
4.8 KiB
Go
package topology
|
|
|
|
import (
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
)
|
|
|
|
type EcShardLocations struct {
|
|
Collection string
|
|
Locations [erasure_coding.TotalShardsCount][]*DataNode
|
|
}
|
|
|
|
func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
|
|
// convert into in memory struct storage.VolumeInfo
|
|
var shards []*erasure_coding.EcVolumeInfo
|
|
for _, shardInfo := range shardInfos {
|
|
shards = append(shards,
|
|
erasure_coding.NewEcVolumeInfo(
|
|
shardInfo.DiskType,
|
|
shardInfo.Collection,
|
|
needle.VolumeId(shardInfo.Id),
|
|
erasure_coding.ShardBits(shardInfo.EcIndexBits),
|
|
shardInfo.ExpireAtSec,
|
|
shardInfo.DiskId))
|
|
}
|
|
// find out the delta volumes
|
|
newShards, deletedShards = dn.UpdateEcShards(shards)
|
|
for _, v := range newShards {
|
|
t.RegisterEcShards(v, dn)
|
|
}
|
|
for _, v := range deletedShards {
|
|
t.UnRegisterEcShards(v, dn)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) {
|
|
// convert into in memory struct storage.VolumeInfo
|
|
var newShards, deletedShards []*erasure_coding.EcVolumeInfo
|
|
for _, shardInfo := range newEcShards {
|
|
newShards = append(newShards,
|
|
erasure_coding.NewEcVolumeInfo(
|
|
shardInfo.DiskType,
|
|
shardInfo.Collection,
|
|
needle.VolumeId(shardInfo.Id),
|
|
erasure_coding.ShardBits(shardInfo.EcIndexBits),
|
|
shardInfo.ExpireAtSec,
|
|
shardInfo.DiskId))
|
|
}
|
|
for _, shardInfo := range deletedEcShards {
|
|
deletedShards = append(deletedShards,
|
|
erasure_coding.NewEcVolumeInfo(
|
|
shardInfo.DiskType,
|
|
shardInfo.Collection,
|
|
needle.VolumeId(shardInfo.Id),
|
|
erasure_coding.ShardBits(shardInfo.EcIndexBits),
|
|
shardInfo.ExpireAtSec,
|
|
shardInfo.DiskId))
|
|
}
|
|
|
|
dn.DeltaUpdateEcShards(newShards, deletedShards)
|
|
|
|
for _, v := range newShards {
|
|
t.RegisterEcShards(v, dn)
|
|
}
|
|
for _, v := range deletedShards {
|
|
t.UnRegisterEcShards(v, dn)
|
|
}
|
|
return
|
|
}
|
|
|
|
func NewEcShardLocations(collection string) *EcShardLocations {
|
|
return &EcShardLocations{
|
|
Collection: collection,
|
|
}
|
|
}
|
|
|
|
func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool) {
|
|
dataNodes := loc.Locations[shardId]
|
|
for _, n := range dataNodes {
|
|
if n.Id() == dn.Id() {
|
|
return false
|
|
}
|
|
}
|
|
loc.Locations[shardId] = append(dataNodes, dn)
|
|
return true
|
|
}
|
|
|
|
func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) {
|
|
dataNodes := loc.Locations[shardId]
|
|
foundIndex := -1
|
|
for index, n := range dataNodes {
|
|
if n.Id() == dn.Id() {
|
|
foundIndex = index
|
|
}
|
|
}
|
|
if foundIndex < 0 {
|
|
return false
|
|
}
|
|
loc.Locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...)
|
|
return true
|
|
}
|
|
|
|
func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
|
|
|
|
t.ecShardMapLock.Lock()
|
|
defer t.ecShardMapLock.Unlock()
|
|
|
|
locations, found := t.ecShardMap[ecShardInfos.VolumeId]
|
|
if !found {
|
|
locations = NewEcShardLocations(ecShardInfos.Collection)
|
|
t.ecShardMap[ecShardInfos.VolumeId] = locations
|
|
}
|
|
for _, shardId := range ecShardInfos.ShardIds() {
|
|
locations.AddShard(shardId, dn)
|
|
}
|
|
}
|
|
|
|
func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
|
|
glog.Infof("removing ec shard info:%+v", ecShardInfos)
|
|
t.ecShardMapLock.Lock()
|
|
defer t.ecShardMapLock.Unlock()
|
|
|
|
locations, found := t.ecShardMap[ecShardInfos.VolumeId]
|
|
if !found {
|
|
return
|
|
}
|
|
for _, shardId := range ecShardInfos.ShardIds() {
|
|
locations.DeleteShard(shardId, dn)
|
|
}
|
|
}
|
|
|
|
func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocations, found bool) {
|
|
t.ecShardMapLock.RLock()
|
|
defer t.ecShardMapLock.RUnlock()
|
|
|
|
locations, found = t.ecShardMap[vid]
|
|
|
|
return
|
|
}
|
|
|
|
func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []pb.ServerAddress) {
|
|
t.ecShardMapLock.RLock()
|
|
defer t.ecShardMapLock.RUnlock()
|
|
|
|
dateNodeMap := make(map[pb.ServerAddress]bool)
|
|
for _, ecVolumeLocation := range t.ecShardMap {
|
|
if ecVolumeLocation.Collection == collection {
|
|
for _, locations := range ecVolumeLocation.Locations {
|
|
for _, loc := range locations {
|
|
dateNodeMap[loc.ServerAddress()] = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for k, _ := range dateNodeMap {
|
|
dataNodes = append(dataNodes, k)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (t *Topology) DeleteEcCollection(collection string) {
|
|
t.ecShardMapLock.Lock()
|
|
defer t.ecShardMapLock.Unlock()
|
|
|
|
var vids []needle.VolumeId
|
|
for vid, ecVolumeLocation := range t.ecShardMap {
|
|
if ecVolumeLocation.Collection == collection {
|
|
vids = append(vids, vid)
|
|
}
|
|
}
|
|
|
|
for _, vid := range vids {
|
|
delete(t.ecShardMap, vid)
|
|
}
|
|
|
|
return
|
|
}
|