mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-08-17 01:22:47 +02:00
* initial design * added simulation as tests * reorganized the codebase to move the simulation framework and tests into their own dedicated package * integration test. ec worker task * remove "enhanced" reference * start master, volume servers, filer Current Status ✅ Master: Healthy and running (port 9333) ✅ Filer: Healthy and running (port 8888) ✅ Volume Servers: All 6 servers running (ports 8080-8085) 🔄 Admin/Workers: Will start when dependencies are ready * generate write load * tasks are assigned * admin start with grpc port. worker has its own working directory * Update .gitignore * working worker and admin. Task detection is not working yet. * compiles, detection uses volumeSizeLimitMB from master * compiles * worker retries connecting to admin * build and restart * rendering pending tasks * skip task ID column * sticky worker id * test canScheduleTaskNow * worker reconnect to admin * clean up logs * worker register itself first * worker can run ec work and report status but: 1. one volume should not be repeatedly worked on. 2. ec shards needs to be distributed and source data should be deleted. * move ec task logic * listing ec shards * local copy, ec. Need to distribute. 
* ec is mostly working now * distribution of ec shards needs improvement * need configuration to enable ec * show ec volumes * interval field UI component * rename * integration test with vacuuming * garbage percentage threshold * fix warning * display ec shard sizes * fix ec volumes list * Update ui.go * show default values * ensure correct default value * MaintenanceConfig use ConfigField * use schema defined defaults * config * reduce duplication * refactor to use BaseUIProvider * each task register its schema * checkECEncodingCandidate use ecDetector * use vacuumDetector * use volumeSizeLimitMB * remove remove * remove unused * refactor * use new framework * remove v2 reference * refactor * left menu can scroll now * The maintenance manager was not being initialized when no data directory was configured for persistent storage. * saving config * Update task_config_schema_templ.go * enable/disable tasks * protobuf encoded task configurations * fix system settings * use ui component * remove logs * interface{} Reduction * reduce interface{} * reduce interface{} * avoid from/to map * reduce interface{} * refactor * keep it DRY * added logging * debug messages * debug level * debug * show the log caller line * use configured task policy * log level * handle admin heartbeat response * Update worker.go * fix EC rack and dc count * Report task status to admin server * fix task logging, simplify interface checking, use erasure_coding constants * factor in empty volume server during task planning * volume.list adds disk id * track disk id also * fix locking scheduled and manual scanning * add active topology * simplify task detector * ec task completed, but shards are not showing up * implement ec in ec_typed.go * adjust log level * dedup * implementing ec copying shards and only ecx files * use disk id when distributing ec shards 🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk 📦 Task Creation: maintenance_integration.go creates ECDestination 
with DiskId 🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest 💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId]) 📂 File System: EC shards and metadata land in the exact disk directory planned * Delete original volume from all locations * clean up existing shard locations * local encoding and distributing * Update docker/admin_integration/EC-TESTING-README.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * check volume id range * simplify * fix tests * fix types * clean up logs and tests --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
389 lines
11 KiB
Go
389 lines
11 KiB
Go
package handlers
|
|
|
|
import (
|
|
"net/url"
|
|
"testing"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/admin/config"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
|
|
)
|
|
|
|
func TestParseTaskConfigFromForm_WithEmbeddedStruct(t *testing.T) {
|
|
// Create a maintenance handlers instance for testing
|
|
h := &MaintenanceHandlers{}
|
|
|
|
// Test with balance config
|
|
t.Run("Balance Config", func(t *testing.T) {
|
|
// Simulate form data
|
|
formData := url.Values{
|
|
"enabled": {"on"}, // checkbox field
|
|
"scan_interval_seconds_value": {"30"}, // interval field
|
|
"scan_interval_seconds_unit": {"minutes"}, // interval unit
|
|
"max_concurrent": {"2"}, // number field
|
|
"imbalance_threshold": {"0.15"}, // float field
|
|
"min_server_count": {"3"}, // number field
|
|
}
|
|
|
|
// Get schema
|
|
schema := tasks.GetTaskConfigSchema("balance")
|
|
if schema == nil {
|
|
t.Fatal("Failed to get balance schema")
|
|
}
|
|
|
|
// Create config instance
|
|
config := &balance.Config{}
|
|
|
|
// Parse form data
|
|
err := h.parseTaskConfigFromForm(formData, schema, config)
|
|
if err != nil {
|
|
t.Fatalf("Failed to parse form data: %v", err)
|
|
}
|
|
|
|
// Verify embedded struct fields were set correctly
|
|
if !config.Enabled {
|
|
t.Errorf("Expected Enabled=true, got %v", config.Enabled)
|
|
}
|
|
|
|
if config.ScanIntervalSeconds != 1800 { // 30 minutes * 60
|
|
t.Errorf("Expected ScanIntervalSeconds=1800, got %v", config.ScanIntervalSeconds)
|
|
}
|
|
|
|
if config.MaxConcurrent != 2 {
|
|
t.Errorf("Expected MaxConcurrent=2, got %v", config.MaxConcurrent)
|
|
}
|
|
|
|
// Verify balance-specific fields were set correctly
|
|
if config.ImbalanceThreshold != 0.15 {
|
|
t.Errorf("Expected ImbalanceThreshold=0.15, got %v", config.ImbalanceThreshold)
|
|
}
|
|
|
|
if config.MinServerCount != 3 {
|
|
t.Errorf("Expected MinServerCount=3, got %v", config.MinServerCount)
|
|
}
|
|
})
|
|
|
|
// Test with vacuum config
|
|
t.Run("Vacuum Config", func(t *testing.T) {
|
|
// Simulate form data
|
|
formData := url.Values{
|
|
// "enabled" field omitted to simulate unchecked checkbox
|
|
"scan_interval_seconds_value": {"4"}, // interval field
|
|
"scan_interval_seconds_unit": {"hours"}, // interval unit
|
|
"max_concurrent": {"3"}, // number field
|
|
"garbage_threshold": {"0.4"}, // float field
|
|
"min_volume_age_seconds_value": {"2"}, // interval field
|
|
"min_volume_age_seconds_unit": {"days"}, // interval unit
|
|
"min_interval_seconds_value": {"1"}, // interval field
|
|
"min_interval_seconds_unit": {"days"}, // interval unit
|
|
}
|
|
|
|
// Get schema
|
|
schema := tasks.GetTaskConfigSchema("vacuum")
|
|
if schema == nil {
|
|
t.Fatal("Failed to get vacuum schema")
|
|
}
|
|
|
|
// Create config instance
|
|
config := &vacuum.Config{}
|
|
|
|
// Parse form data
|
|
err := h.parseTaskConfigFromForm(formData, schema, config)
|
|
if err != nil {
|
|
t.Fatalf("Failed to parse form data: %v", err)
|
|
}
|
|
|
|
// Verify embedded struct fields were set correctly
|
|
if config.Enabled {
|
|
t.Errorf("Expected Enabled=false, got %v", config.Enabled)
|
|
}
|
|
|
|
if config.ScanIntervalSeconds != 14400 { // 4 hours * 3600
|
|
t.Errorf("Expected ScanIntervalSeconds=14400, got %v", config.ScanIntervalSeconds)
|
|
}
|
|
|
|
if config.MaxConcurrent != 3 {
|
|
t.Errorf("Expected MaxConcurrent=3, got %v", config.MaxConcurrent)
|
|
}
|
|
|
|
// Verify vacuum-specific fields were set correctly
|
|
if config.GarbageThreshold != 0.4 {
|
|
t.Errorf("Expected GarbageThreshold=0.4, got %v", config.GarbageThreshold)
|
|
}
|
|
|
|
if config.MinVolumeAgeSeconds != 172800 { // 2 days * 86400
|
|
t.Errorf("Expected MinVolumeAgeSeconds=172800, got %v", config.MinVolumeAgeSeconds)
|
|
}
|
|
|
|
if config.MinIntervalSeconds != 86400 { // 1 day * 86400
|
|
t.Errorf("Expected MinIntervalSeconds=86400, got %v", config.MinIntervalSeconds)
|
|
}
|
|
})
|
|
|
|
// Test with erasure coding config
|
|
t.Run("Erasure Coding Config", func(t *testing.T) {
|
|
// Simulate form data
|
|
formData := url.Values{
|
|
"enabled": {"on"}, // checkbox field
|
|
"scan_interval_seconds_value": {"2"}, // interval field
|
|
"scan_interval_seconds_unit": {"hours"}, // interval unit
|
|
"max_concurrent": {"1"}, // number field
|
|
"quiet_for_seconds_value": {"10"}, // interval field
|
|
"quiet_for_seconds_unit": {"minutes"}, // interval unit
|
|
"fullness_ratio": {"0.85"}, // float field
|
|
"collection_filter": {"test_collection"}, // string field
|
|
"min_size_mb": {"50"}, // number field
|
|
}
|
|
|
|
// Get schema
|
|
schema := tasks.GetTaskConfigSchema("erasure_coding")
|
|
if schema == nil {
|
|
t.Fatal("Failed to get erasure_coding schema")
|
|
}
|
|
|
|
// Create config instance
|
|
config := &erasure_coding.Config{}
|
|
|
|
// Parse form data
|
|
err := h.parseTaskConfigFromForm(formData, schema, config)
|
|
if err != nil {
|
|
t.Fatalf("Failed to parse form data: %v", err)
|
|
}
|
|
|
|
// Verify embedded struct fields were set correctly
|
|
if !config.Enabled {
|
|
t.Errorf("Expected Enabled=true, got %v", config.Enabled)
|
|
}
|
|
|
|
if config.ScanIntervalSeconds != 7200 { // 2 hours * 3600
|
|
t.Errorf("Expected ScanIntervalSeconds=7200, got %v", config.ScanIntervalSeconds)
|
|
}
|
|
|
|
if config.MaxConcurrent != 1 {
|
|
t.Errorf("Expected MaxConcurrent=1, got %v", config.MaxConcurrent)
|
|
}
|
|
|
|
// Verify erasure coding-specific fields were set correctly
|
|
if config.QuietForSeconds != 600 { // 10 minutes * 60
|
|
t.Errorf("Expected QuietForSeconds=600, got %v", config.QuietForSeconds)
|
|
}
|
|
|
|
if config.FullnessRatio != 0.85 {
|
|
t.Errorf("Expected FullnessRatio=0.85, got %v", config.FullnessRatio)
|
|
}
|
|
|
|
if config.CollectionFilter != "test_collection" {
|
|
t.Errorf("Expected CollectionFilter='test_collection', got %v", config.CollectionFilter)
|
|
}
|
|
|
|
if config.MinSizeMB != 50 {
|
|
t.Errorf("Expected MinSizeMB=50, got %v", config.MinSizeMB)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestConfigurationValidation(t *testing.T) {
|
|
// Test that config structs can be validated and converted to protobuf format
|
|
taskTypes := []struct {
|
|
name string
|
|
config interface{}
|
|
}{
|
|
{
|
|
"balance",
|
|
&balance.Config{
|
|
BaseConfig: base.BaseConfig{
|
|
Enabled: true,
|
|
ScanIntervalSeconds: 2400,
|
|
MaxConcurrent: 3,
|
|
},
|
|
ImbalanceThreshold: 0.18,
|
|
MinServerCount: 4,
|
|
},
|
|
},
|
|
{
|
|
"vacuum",
|
|
&vacuum.Config{
|
|
BaseConfig: base.BaseConfig{
|
|
Enabled: false,
|
|
ScanIntervalSeconds: 7200,
|
|
MaxConcurrent: 2,
|
|
},
|
|
GarbageThreshold: 0.35,
|
|
MinVolumeAgeSeconds: 86400,
|
|
MinIntervalSeconds: 604800,
|
|
},
|
|
},
|
|
{
|
|
"erasure_coding",
|
|
&erasure_coding.Config{
|
|
BaseConfig: base.BaseConfig{
|
|
Enabled: true,
|
|
ScanIntervalSeconds: 3600,
|
|
MaxConcurrent: 1,
|
|
},
|
|
QuietForSeconds: 900,
|
|
FullnessRatio: 0.9,
|
|
CollectionFilter: "important",
|
|
MinSizeMB: 100,
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, test := range taskTypes {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
// Test that configs can be converted to protobuf TaskPolicy
|
|
switch cfg := test.config.(type) {
|
|
case *balance.Config:
|
|
policy := cfg.ToTaskPolicy()
|
|
if policy == nil {
|
|
t.Fatal("ToTaskPolicy returned nil")
|
|
}
|
|
if policy.Enabled != cfg.Enabled {
|
|
t.Errorf("Expected Enabled=%v, got %v", cfg.Enabled, policy.Enabled)
|
|
}
|
|
if policy.MaxConcurrent != int32(cfg.MaxConcurrent) {
|
|
t.Errorf("Expected MaxConcurrent=%v, got %v", cfg.MaxConcurrent, policy.MaxConcurrent)
|
|
}
|
|
case *vacuum.Config:
|
|
policy := cfg.ToTaskPolicy()
|
|
if policy == nil {
|
|
t.Fatal("ToTaskPolicy returned nil")
|
|
}
|
|
if policy.Enabled != cfg.Enabled {
|
|
t.Errorf("Expected Enabled=%v, got %v", cfg.Enabled, policy.Enabled)
|
|
}
|
|
if policy.MaxConcurrent != int32(cfg.MaxConcurrent) {
|
|
t.Errorf("Expected MaxConcurrent=%v, got %v", cfg.MaxConcurrent, policy.MaxConcurrent)
|
|
}
|
|
case *erasure_coding.Config:
|
|
policy := cfg.ToTaskPolicy()
|
|
if policy == nil {
|
|
t.Fatal("ToTaskPolicy returned nil")
|
|
}
|
|
if policy.Enabled != cfg.Enabled {
|
|
t.Errorf("Expected Enabled=%v, got %v", cfg.Enabled, policy.Enabled)
|
|
}
|
|
if policy.MaxConcurrent != int32(cfg.MaxConcurrent) {
|
|
t.Errorf("Expected MaxConcurrent=%v, got %v", cfg.MaxConcurrent, policy.MaxConcurrent)
|
|
}
|
|
default:
|
|
t.Fatalf("Unknown config type: %T", test.config)
|
|
}
|
|
|
|
// Test that configs can be validated
|
|
switch cfg := test.config.(type) {
|
|
case *balance.Config:
|
|
if err := cfg.Validate(); err != nil {
|
|
t.Errorf("Validation failed: %v", err)
|
|
}
|
|
case *vacuum.Config:
|
|
if err := cfg.Validate(); err != nil {
|
|
t.Errorf("Validation failed: %v", err)
|
|
}
|
|
case *erasure_coding.Config:
|
|
if err := cfg.Validate(); err != nil {
|
|
t.Errorf("Validation failed: %v", err)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestParseFieldFromForm_EdgeCases(t *testing.T) {
|
|
h := &MaintenanceHandlers{}
|
|
|
|
// Test checkbox parsing (boolean fields)
|
|
t.Run("Checkbox Fields", func(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
formData url.Values
|
|
expectedValue bool
|
|
}{
|
|
{"Checked checkbox", url.Values{"test_field": {"on"}}, true},
|
|
{"Unchecked checkbox", url.Values{}, false},
|
|
{"Empty value checkbox", url.Values{"test_field": {""}}, true}, // Present but empty means checked
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
schema := &tasks.TaskConfigSchema{
|
|
Schema: config.Schema{
|
|
Fields: []*config.Field{
|
|
{
|
|
JSONName: "test_field",
|
|
Type: config.FieldTypeBool,
|
|
InputType: "checkbox",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
type TestConfig struct {
|
|
TestField bool `json:"test_field"`
|
|
}
|
|
|
|
config := &TestConfig{}
|
|
err := h.parseTaskConfigFromForm(test.formData, schema, config)
|
|
if err != nil {
|
|
t.Fatalf("parseTaskConfigFromForm failed: %v", err)
|
|
}
|
|
|
|
if config.TestField != test.expectedValue {
|
|
t.Errorf("Expected %v, got %v", test.expectedValue, config.TestField)
|
|
}
|
|
})
|
|
}
|
|
})
|
|
|
|
// Test interval parsing
|
|
t.Run("Interval Fields", func(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
value string
|
|
unit string
|
|
expectedSecs int
|
|
}{
|
|
{"Minutes", "30", "minutes", 1800},
|
|
{"Hours", "2", "hours", 7200},
|
|
{"Days", "1", "days", 86400},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
formData := url.Values{
|
|
"test_field_value": {test.value},
|
|
"test_field_unit": {test.unit},
|
|
}
|
|
|
|
schema := &tasks.TaskConfigSchema{
|
|
Schema: config.Schema{
|
|
Fields: []*config.Field{
|
|
{
|
|
JSONName: "test_field",
|
|
Type: config.FieldTypeInterval,
|
|
InputType: "interval",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
type TestConfig struct {
|
|
TestField int `json:"test_field"`
|
|
}
|
|
|
|
config := &TestConfig{}
|
|
err := h.parseTaskConfigFromForm(formData, schema, config)
|
|
if err != nil {
|
|
t.Fatalf("parseTaskConfigFromForm failed: %v", err)
|
|
}
|
|
|
|
if config.TestField != test.expectedSecs {
|
|
t.Errorf("Expected %d seconds, got %d", test.expectedSecs, config.TestField)
|
|
}
|
|
})
|
|
}
|
|
})
|
|
}
|