1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-10 05:12:47 +02:00
seaweedfs/weed/worker/tasks/erasure_coding/config.go
Chris Lu 891a2fb6eb
Admin: misc improvements on admin server and workers. EC now works. (#7055)
* initial design

* added simulation as tests

* reorganized the codebase to move the simulation framework and tests into their own dedicated package

* integration test. ec worker task

* remove "enhanced" reference

* start master, volume servers, filer

Current Status
 Master: Healthy and running (port 9333)
 Filer: Healthy and running (port 8888)
 Volume Servers: All 6 servers running (ports 8080-8085)
🔄 Admin/Workers: Will start when dependencies are ready

* generate write load

* tasks are assigned

* admin start wtih grpc port. worker has its own working directory

* Update .gitignore

* working worker and admin. Task detection is not working yet.

* compiles, detection uses volumeSizeLimitMB from master

* compiles

* worker retries connecting to admin

* build and restart

* rendering pending tasks

* skip task ID column

* sticky worker id

* test canScheduleTaskNow

* worker reconnect to admin

* clean up logs

* worker register itself first

* worker can run ec work and report status

but:
1. one volume should not be repeatedly worked on.
2. ec shards needs to be distributed and source data should be deleted.

* move ec task logic

* listing ec shards

* local copy, ec. Need to distribute.

* ec is mostly working now

* distribution of ec shards needs improvement
* need configuration to enable ec

* show ec volumes

* interval field UI component

* rename

* integration test with vauuming

* garbage percentage threshold

* fix warning

* display ec shard sizes

* fix ec volumes list

* Update ui.go

* show default values

* ensure correct default value

* MaintenanceConfig use ConfigField

* use schema defined defaults

* config

* reduce duplication

* refactor to use BaseUIProvider

* each task register its schema

* checkECEncodingCandidate use ecDetector

* use vacuumDetector

* use volumeSizeLimitMB

* remove

remove

* remove unused

* refactor

* use new framework

* remove v2 reference

* refactor

* left menu can scroll now

* The maintenance manager was not being initialized when no data directory was configured for persistent storage.

* saving config

* Update task_config_schema_templ.go

* enable/disable tasks

* protobuf encoded task configurations

* fix system settings

* use ui component

* remove logs

* interface{} Reduction

* reduce interface{}

* reduce interface{}

* avoid from/to map

* reduce interface{}

* refactor

* keep it DRY

* added logging

* debug messages

* debug level

* debug

* show the log caller line

* use configured task policy

* log level

* handle admin heartbeat response

* Update worker.go

* fix EC rack and dc count

* Report task status to admin server

* fix task logging, simplify interface checking, use erasure_coding constants

* factor in empty volume server during task planning

* volume.list adds disk id

* track disk id also

* fix locking scheduled and manual scanning

* add active topology

* simplify task detector

* ec task completed, but shards are not showing up

* implement ec in ec_typed.go

* adjust log level

* dedup

* implementing ec copying shards and only ecx files

* use disk id when distributing ec shards

🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk
📦 Task Creation: maintenance_integration.go creates ECDestination with DiskId
🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest
💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId])
📂 File System: EC shards and metadata land in the exact disk directory planned

* Delete original volume from all locations

* clean up existing shard locations

* local encoding and distributing

* Update docker/admin_integration/EC-TESTING-README.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* check volume id range

* simplify

* fix tests

* fix types

* clean up logs and tests

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-07-30 12:38:03 -07:00

207 lines
6.8 KiB
Go

package erasure_coding
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
)
// Config extends BaseConfig with erasure coding specific settings
type Config struct {
base.BaseConfig
QuietForSeconds int `json:"quiet_for_seconds"`
FullnessRatio float64 `json:"fullness_ratio"`
CollectionFilter string `json:"collection_filter"`
MinSizeMB int `json:"min_size_mb"`
}
// NewDefaultConfig creates a new default erasure coding configuration
func NewDefaultConfig() *Config {
return &Config{
BaseConfig: base.BaseConfig{
Enabled: true,
ScanIntervalSeconds: 60 * 60, // 1 hour
MaxConcurrent: 1,
},
QuietForSeconds: 300, // 5 minutes
FullnessRatio: 0.8, // 80%
CollectionFilter: "",
MinSizeMB: 30, // 30MB (more reasonable than 100MB)
}
}
// GetConfigSpec returns the configuration schema for erasure coding tasks
func GetConfigSpec() base.ConfigSpec {
return base.ConfigSpec{
Fields: []*config.Field{
{
Name: "enabled",
JSONName: "enabled",
Type: config.FieldTypeBool,
DefaultValue: true,
Required: false,
DisplayName: "Enable Erasure Coding Tasks",
Description: "Whether erasure coding tasks should be automatically created",
HelpText: "Toggle this to enable or disable automatic erasure coding task generation",
InputType: "checkbox",
CSSClasses: "form-check-input",
},
{
Name: "scan_interval_seconds",
JSONName: "scan_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 60 * 60,
MinValue: 10 * 60,
MaxValue: 24 * 60 * 60,
Required: true,
DisplayName: "Scan Interval",
Description: "How often to scan for volumes needing erasure coding",
HelpText: "The system will check for volumes that need erasure coding at this interval",
Placeholder: "1",
Unit: config.UnitHours,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "max_concurrent",
JSONName: "max_concurrent",
Type: config.FieldTypeInt,
DefaultValue: 1,
MinValue: 1,
MaxValue: 5,
Required: true,
DisplayName: "Max Concurrent Tasks",
Description: "Maximum number of erasure coding tasks that can run simultaneously",
HelpText: "Limits the number of erasure coding operations running at the same time",
Placeholder: "1 (default)",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "quiet_for_seconds",
JSONName: "quiet_for_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 300,
MinValue: 60,
MaxValue: 3600,
Required: true,
DisplayName: "Quiet Period",
Description: "Minimum time volume must be quiet before erasure coding",
HelpText: "Volume must not be modified for this duration before erasure coding",
Placeholder: "5",
Unit: config.UnitMinutes,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "fullness_ratio",
JSONName: "fullness_ratio",
Type: config.FieldTypeFloat,
DefaultValue: 0.8,
MinValue: 0.1,
MaxValue: 1.0,
Required: true,
DisplayName: "Fullness Ratio",
Description: "Minimum fullness ratio to trigger erasure coding",
HelpText: "Only volumes with this fullness ratio or higher will be erasure coded",
Placeholder: "0.80 (80%)",
Unit: config.UnitNone,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "collection_filter",
JSONName: "collection_filter",
Type: config.FieldTypeString,
DefaultValue: "",
Required: false,
DisplayName: "Collection Filter",
Description: "Only process volumes from specific collections",
HelpText: "Leave empty to process all collections, or specify collection name",
Placeholder: "my_collection",
InputType: "text",
CSSClasses: "form-control",
},
{
Name: "min_size_mb",
JSONName: "min_size_mb",
Type: config.FieldTypeInt,
DefaultValue: 30,
MinValue: 1,
MaxValue: 1000,
Required: true,
DisplayName: "Minimum Size (MB)",
Description: "Minimum volume size to consider for erasure coding",
HelpText: "Only volumes larger than this size will be considered for erasure coding",
Placeholder: "30",
Unit: config.UnitNone,
InputType: "number",
CSSClasses: "form-control",
},
},
}
}
// ToTaskPolicy converts configuration to a TaskPolicy protobuf message
func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
return &worker_pb.TaskPolicy{
Enabled: c.Enabled,
MaxConcurrent: int32(c.MaxConcurrent),
RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
FullnessRatio: float64(c.FullnessRatio),
QuietForSeconds: int32(c.QuietForSeconds),
MinVolumeSizeMb: int32(c.MinSizeMB),
CollectionFilter: c.CollectionFilter,
},
},
}
}
// FromTaskPolicy loads configuration from a TaskPolicy protobuf message
func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
if policy == nil {
return fmt.Errorf("policy is nil")
}
// Set general TaskPolicy fields
c.Enabled = policy.Enabled
c.MaxConcurrent = int(policy.MaxConcurrent)
c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping
// Set erasure coding-specific fields from the task config
if ecConfig := policy.GetErasureCodingConfig(); ecConfig != nil {
c.FullnessRatio = float64(ecConfig.FullnessRatio)
c.QuietForSeconds = int(ecConfig.QuietForSeconds)
c.MinSizeMB = int(ecConfig.MinVolumeSizeMb)
c.CollectionFilter = ecConfig.CollectionFilter
}
return nil
}
// LoadConfigFromPersistence loads configuration from the persistence layer if available
func LoadConfigFromPersistence(configPersistence interface{}) *Config {
config := NewDefaultConfig()
// Try to load from persistence if available
if persistence, ok := configPersistence.(interface {
LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error)
}); ok {
if policy, err := persistence.LoadErasureCodingTaskPolicy(); err == nil && policy != nil {
if err := config.FromTaskPolicy(policy); err == nil {
glog.V(1).Infof("Loaded erasure coding configuration from persistence")
return config
}
}
}
glog.V(1).Infof("Using default erasure coding configuration")
return config
}