mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-10 05:12:47 +02:00
* initial design * added simulation as tests * reorganized the codebase to move the simulation framework and tests into their own dedicated package * integration test. ec worker task * remove "enhanced" reference * start master, volume servers, filer Current Status ✅ Master: Healthy and running (port 9333) ✅ Filer: Healthy and running (port 8888) ✅ Volume Servers: All 6 servers running (ports 8080-8085) 🔄 Admin/Workers: Will start when dependencies are ready * generate write load * tasks are assigned * admin start wtih grpc port. worker has its own working directory * Update .gitignore * working worker and admin. Task detection is not working yet. * compiles, detection uses volumeSizeLimitMB from master * compiles * worker retries connecting to admin * build and restart * rendering pending tasks * skip task ID column * sticky worker id * test canScheduleTaskNow * worker reconnect to admin * clean up logs * worker register itself first * worker can run ec work and report status but: 1. one volume should not be repeatedly worked on. 2. ec shards needs to be distributed and source data should be deleted. * move ec task logic * listing ec shards * local copy, ec. Need to distribute. * ec is mostly working now * distribution of ec shards needs improvement * need configuration to enable ec * show ec volumes * interval field UI component * rename * integration test with vauuming * garbage percentage threshold * fix warning * display ec shard sizes * fix ec volumes list * Update ui.go * show default values * ensure correct default value * MaintenanceConfig use ConfigField * use schema defined defaults * config * reduce duplication * refactor to use BaseUIProvider * each task register its schema * checkECEncodingCandidate use ecDetector * use vacuumDetector * use volumeSizeLimitMB * remove remove * remove unused * refactor * use new framework * remove v2 reference * refactor * left menu can scroll now * The maintenance manager was not being initialized when no data directory was configured for persistent storage. * saving config * Update task_config_schema_templ.go * enable/disable tasks * protobuf encoded task configurations * fix system settings * use ui component * remove logs * interface{} Reduction * reduce interface{} * reduce interface{} * avoid from/to map * reduce interface{} * refactor * keep it DRY * added logging * debug messages * debug level * debug * show the log caller line * use configured task policy * log level * handle admin heartbeat response * Update worker.go * fix EC rack and dc count * Report task status to admin server * fix task logging, simplify interface checking, use erasure_coding constants * factor in empty volume server during task planning * volume.list adds disk id * track disk id also * fix locking scheduled and manual scanning * add active topology * simplify task detector * ec task completed, but shards are not showing up * implement ec in ec_typed.go * adjust log level * dedup * implementing ec copying shards and only ecx files * use disk id when distributing ec shards 🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk 📦 Task Creation: maintenance_integration.go creates ECDestination with DiskId 🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest 💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId]) 📂 File System: EC shards and metadata land in the exact disk directory planned * Delete original volume from all locations * clean up existing shard locations * local encoding and distributing * Update docker/admin_integration/EC-TESTING-README.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * check volume id range * simplify * fix tests * fix types * clean up logs and tests --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
207 lines
6.8 KiB
Go
207 lines
6.8 KiB
Go
package erasure_coding
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/admin/config"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
|
|
)
|
|
|
|
// Config extends BaseConfig with erasure coding specific settings
|
|
type Config struct {
|
|
base.BaseConfig
|
|
QuietForSeconds int `json:"quiet_for_seconds"`
|
|
FullnessRatio float64 `json:"fullness_ratio"`
|
|
CollectionFilter string `json:"collection_filter"`
|
|
MinSizeMB int `json:"min_size_mb"`
|
|
}
|
|
|
|
// NewDefaultConfig creates a new default erasure coding configuration
|
|
func NewDefaultConfig() *Config {
|
|
return &Config{
|
|
BaseConfig: base.BaseConfig{
|
|
Enabled: true,
|
|
ScanIntervalSeconds: 60 * 60, // 1 hour
|
|
MaxConcurrent: 1,
|
|
},
|
|
QuietForSeconds: 300, // 5 minutes
|
|
FullnessRatio: 0.8, // 80%
|
|
CollectionFilter: "",
|
|
MinSizeMB: 30, // 30MB (more reasonable than 100MB)
|
|
}
|
|
}
|
|
|
|
// GetConfigSpec returns the configuration schema for erasure coding tasks
|
|
func GetConfigSpec() base.ConfigSpec {
|
|
return base.ConfigSpec{
|
|
Fields: []*config.Field{
|
|
{
|
|
Name: "enabled",
|
|
JSONName: "enabled",
|
|
Type: config.FieldTypeBool,
|
|
DefaultValue: true,
|
|
Required: false,
|
|
DisplayName: "Enable Erasure Coding Tasks",
|
|
Description: "Whether erasure coding tasks should be automatically created",
|
|
HelpText: "Toggle this to enable or disable automatic erasure coding task generation",
|
|
InputType: "checkbox",
|
|
CSSClasses: "form-check-input",
|
|
},
|
|
{
|
|
Name: "scan_interval_seconds",
|
|
JSONName: "scan_interval_seconds",
|
|
Type: config.FieldTypeInterval,
|
|
DefaultValue: 60 * 60,
|
|
MinValue: 10 * 60,
|
|
MaxValue: 24 * 60 * 60,
|
|
Required: true,
|
|
DisplayName: "Scan Interval",
|
|
Description: "How often to scan for volumes needing erasure coding",
|
|
HelpText: "The system will check for volumes that need erasure coding at this interval",
|
|
Placeholder: "1",
|
|
Unit: config.UnitHours,
|
|
InputType: "interval",
|
|
CSSClasses: "form-control",
|
|
},
|
|
{
|
|
Name: "max_concurrent",
|
|
JSONName: "max_concurrent",
|
|
Type: config.FieldTypeInt,
|
|
DefaultValue: 1,
|
|
MinValue: 1,
|
|
MaxValue: 5,
|
|
Required: true,
|
|
DisplayName: "Max Concurrent Tasks",
|
|
Description: "Maximum number of erasure coding tasks that can run simultaneously",
|
|
HelpText: "Limits the number of erasure coding operations running at the same time",
|
|
Placeholder: "1 (default)",
|
|
Unit: config.UnitCount,
|
|
InputType: "number",
|
|
CSSClasses: "form-control",
|
|
},
|
|
{
|
|
Name: "quiet_for_seconds",
|
|
JSONName: "quiet_for_seconds",
|
|
Type: config.FieldTypeInterval,
|
|
DefaultValue: 300,
|
|
MinValue: 60,
|
|
MaxValue: 3600,
|
|
Required: true,
|
|
DisplayName: "Quiet Period",
|
|
Description: "Minimum time volume must be quiet before erasure coding",
|
|
HelpText: "Volume must not be modified for this duration before erasure coding",
|
|
Placeholder: "5",
|
|
Unit: config.UnitMinutes,
|
|
InputType: "interval",
|
|
CSSClasses: "form-control",
|
|
},
|
|
{
|
|
Name: "fullness_ratio",
|
|
JSONName: "fullness_ratio",
|
|
Type: config.FieldTypeFloat,
|
|
DefaultValue: 0.8,
|
|
MinValue: 0.1,
|
|
MaxValue: 1.0,
|
|
Required: true,
|
|
DisplayName: "Fullness Ratio",
|
|
Description: "Minimum fullness ratio to trigger erasure coding",
|
|
HelpText: "Only volumes with this fullness ratio or higher will be erasure coded",
|
|
Placeholder: "0.80 (80%)",
|
|
Unit: config.UnitNone,
|
|
InputType: "number",
|
|
CSSClasses: "form-control",
|
|
},
|
|
{
|
|
Name: "collection_filter",
|
|
JSONName: "collection_filter",
|
|
Type: config.FieldTypeString,
|
|
DefaultValue: "",
|
|
Required: false,
|
|
DisplayName: "Collection Filter",
|
|
Description: "Only process volumes from specific collections",
|
|
HelpText: "Leave empty to process all collections, or specify collection name",
|
|
Placeholder: "my_collection",
|
|
InputType: "text",
|
|
CSSClasses: "form-control",
|
|
},
|
|
{
|
|
Name: "min_size_mb",
|
|
JSONName: "min_size_mb",
|
|
Type: config.FieldTypeInt,
|
|
DefaultValue: 30,
|
|
MinValue: 1,
|
|
MaxValue: 1000,
|
|
Required: true,
|
|
DisplayName: "Minimum Size (MB)",
|
|
Description: "Minimum volume size to consider for erasure coding",
|
|
HelpText: "Only volumes larger than this size will be considered for erasure coding",
|
|
Placeholder: "30",
|
|
Unit: config.UnitNone,
|
|
InputType: "number",
|
|
CSSClasses: "form-control",
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
// ToTaskPolicy converts configuration to a TaskPolicy protobuf message
|
|
func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
|
|
return &worker_pb.TaskPolicy{
|
|
Enabled: c.Enabled,
|
|
MaxConcurrent: int32(c.MaxConcurrent),
|
|
RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
|
|
CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
|
|
TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
|
|
ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
|
|
FullnessRatio: float64(c.FullnessRatio),
|
|
QuietForSeconds: int32(c.QuietForSeconds),
|
|
MinVolumeSizeMb: int32(c.MinSizeMB),
|
|
CollectionFilter: c.CollectionFilter,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
// FromTaskPolicy loads configuration from a TaskPolicy protobuf message
|
|
func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
|
|
if policy == nil {
|
|
return fmt.Errorf("policy is nil")
|
|
}
|
|
|
|
// Set general TaskPolicy fields
|
|
c.Enabled = policy.Enabled
|
|
c.MaxConcurrent = int(policy.MaxConcurrent)
|
|
c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping
|
|
|
|
// Set erasure coding-specific fields from the task config
|
|
if ecConfig := policy.GetErasureCodingConfig(); ecConfig != nil {
|
|
c.FullnessRatio = float64(ecConfig.FullnessRatio)
|
|
c.QuietForSeconds = int(ecConfig.QuietForSeconds)
|
|
c.MinSizeMB = int(ecConfig.MinVolumeSizeMb)
|
|
c.CollectionFilter = ecConfig.CollectionFilter
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// LoadConfigFromPersistence loads configuration from the persistence layer if available
|
|
func LoadConfigFromPersistence(configPersistence interface{}) *Config {
|
|
config := NewDefaultConfig()
|
|
|
|
// Try to load from persistence if available
|
|
if persistence, ok := configPersistence.(interface {
|
|
LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error)
|
|
}); ok {
|
|
if policy, err := persistence.LoadErasureCodingTaskPolicy(); err == nil && policy != nil {
|
|
if err := config.FromTaskPolicy(policy); err == nil {
|
|
glog.V(1).Infof("Loaded erasure coding configuration from persistence")
|
|
return config
|
|
}
|
|
}
|
|
}
|
|
|
|
glog.V(1).Infof("Using default erasure coding configuration")
|
|
return config
|
|
}
|