# SeaweedFS Task Distribution System Design

## Overview

This document describes the design of a distributed task management system for SeaweedFS that handles Erasure Coding (EC) and vacuum operations through a scalable admin server and worker process architecture.

## System Architecture

### High-Level Components

```
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│     Master      │◄──►│   Admin Server   │◄──►│     Workers     │
│                 │    │                  │    │                 │
│ - Volume Info   │    │ - Task Discovery │    │ - Task Exec     │
│ - Shard Status  │    │ - Task Assign    │    │ - Progress      │
│ - Heartbeats    │    │ - Progress Track │    │ - Error Report  │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         │                      │                       │
         │                      │                       │
         ▼                      ▼                       ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Volume Servers  │    │  Volume Monitor  │    │ Task Execution  │
│                 │    │                  │    │                 │
│ - Store Volumes │    │ - Health Check   │    │ - EC Convert    │
│ - EC Shards     │    │ - Usage Stats    │    │ - Vacuum Clean  │
│ - Report Status │    │ - State Sync     │    │ - Status Report │
└─────────────────┘    └──────────────────┘    └─────────────────┘
```

## 1. Admin Server Design

### 1.1 Core Responsibilities

- **Task Discovery**: Scan volumes to identify EC and vacuum candidates
- **Worker Management**: Track available workers and their capabilities
- **Task Assignment**: Match each task to the most suitable worker
- **Progress Tracking**: Monitor in-progress tasks for capacity planning
- **State Reconciliation**: Sync volume state updates with the master server
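
These responsibilities map onto a small set of cooperating components. A minimal composition sketch, assuming the structures defined in the following sections (field names are illustrative only, not the actual implementation):

```go
// Illustrative composition; the actual admin server may wire these differently.
type AdminServer struct {
    discovery *TaskDiscoveryEngine // Task Discovery (Section 1.2)
    registry  *WorkerRegistry      // Worker Management (Section 1.3)
    scheduler *TaskScheduler       // Task Assignment (Section 1.4)
    monitor   *TaskMonitor         // Progress Tracking (Section 3.2)
    volumes   *VolumeStateManager  // State Reconciliation (Section 4.1)
}
```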

### 1.2 Task Discovery Engine

```go
type TaskDiscoveryEngine struct {
    masterClient  MasterClient
    volumeScanner VolumeScanner
    taskDetectors map[TaskType]TaskDetector
    scanInterval  time.Duration
}

type VolumeCandidate struct {
    VolumeID   uint32
    Server     string
    Collection string
    TaskType   TaskType
    Priority   TaskPriority
    Reason     string
    DetectedAt time.Time
    Parameters map[string]interface{}
}
```

**EC Detection Logic**:
- Find volumes that are >= 95% full and have been idle for > 1 hour
- Exclude volumes already in EC format
- Exclude volumes with ongoing operations
- Prioritize by collection and age

**Vacuum Detection Logic**:
- Find volumes with a garbage ratio > 30%
- Exclude read-only volumes
- Exclude volumes with recent vacuum operations
- Prioritize by garbage percentage

Both detectors reduce to simple predicates over per-volume statistics, as sketched below.
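
A rough illustration of the two checks; the `VolumeInfo` fields, `minVacuumInterval`, and the thresholds shown here are assumptions for the sketch, not the final API:

```go
// Sketch: EC candidate check over hypothetical per-volume statistics.
func isECCandidate(v VolumeInfo, now time.Time) bool {
    return v.FullRatio >= 0.95 && // >= 95% full
        now.Sub(v.LastModified) > time.Hour && // idle for > 1 hour
        !v.IsECVolume && // not already in EC format
        !v.HasOngoingOperation
}

// Sketch: vacuum candidate check.
func isVacuumCandidate(v VolumeInfo, now time.Time) bool {
    return v.GarbageRatio > 0.30 && // > 30% garbage
        !v.ReadOnly &&
        now.Sub(v.LastVacuumAt) > minVacuumInterval // skip recently vacuumed volumes
}
```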

### 1.3 Worker Registry & Management

```go
type WorkerRegistry struct {
    workers        map[string]*Worker
    capabilities   map[TaskType][]*Worker
    lastHeartbeat  map[string]time.Time
    taskAssignment map[string]*Task
    mutex          sync.RWMutex
}

type Worker struct {
    ID            string
    Address       string
    Capabilities  []TaskType
    MaxConcurrent int
    CurrentLoad   int
    Status        WorkerStatus
    LastSeen      time.Time
    Performance   WorkerMetrics
}
```

### 1.4 Task Assignment Algorithm

```go
type TaskScheduler struct {
    registry           *WorkerRegistry
    taskQueue          *PriorityQueue
    inProgressTasks    map[string]*InProgressTask
    volumeReservations map[uint32]*VolumeReservation
}

// Worker Selection Criteria:
// 1. Has required capability (EC or Vacuum)
// 2. Available capacity (CurrentLoad < MaxConcurrent)
// 3. Best performance history for task type
// 4. Lowest current load
// 5. Geographically close to volume server (optional)
```
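
A minimal selection pass applying criteria 1, 2, and 4 above (a sketch only; performance-history and locality tie-breaking are omitted):

```go
// Sketch: pick the least-loaded worker that can run this task type and has free capacity.
func (s *TaskScheduler) selectWorker(task *Task) *Worker {
    s.registry.mutex.RLock()
    defer s.registry.mutex.RUnlock()

    var best *Worker
    for _, w := range s.registry.capabilities[task.Type] { // 1. required capability
        if w.CurrentLoad >= w.MaxConcurrent { // 2. available capacity
            continue
        }
        if best == nil || w.CurrentLoad < best.CurrentLoad { // 4. lowest current load
            best = w
        }
    }
    return best // nil means no eligible worker; the task stays queued
}
```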

## 2. Worker Process Design

### 2.1 Worker Architecture

```go
type MaintenanceWorker struct {
    id              string
    config          *WorkerConfig
    adminClient     AdminClient
    taskExecutors   map[TaskType]TaskExecutor
    currentTasks    map[string]*RunningTask
    registry        *TaskRegistry
    heartbeatTicker *time.Ticker
    requestTicker   *time.Ticker
}
```

### 2.2 Task Execution Framework

```go
type TaskExecutor interface {
    Execute(ctx context.Context, task *Task) error
    EstimateTime(task *Task) time.Duration
    ValidateResources(task *Task) error
    GetProgress() float64
    Cancel() error
}

type ErasureCodingExecutor struct {
    volumeClient VolumeServerClient
    progress     float64
    cancelled    bool
}

type VacuumExecutor struct {
    volumeClient VolumeServerClient
    progress     float64
    cancelled    bool
}
```
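
As an illustration of how an executor could satisfy the interface, a sketch of `VacuumExecutor.Execute` with step-wise progress reporting and cooperative cancellation; the `compactVolume` and `commitVacuum` helpers are hypothetical stand-ins for the actual volume-server calls:

```go
// Sketch only: shows progress/cancellation handling, not the real vacuum RPC sequence.
func (e *VacuumExecutor) Execute(ctx context.Context, task *Task) error {
    steps := []func(context.Context, *Task) error{
        e.compactVolume, // hypothetical helper wrapping the vacuum-compact call
        e.commitVacuum,  // hypothetical helper wrapping the vacuum-commit call
    }
    for i, step := range steps {
        if e.cancelled {
            return context.Canceled
        }
        if err := step(ctx, task); err != nil {
            return err
        }
        e.progress = float64(i+1) / float64(len(steps)) * 100 // percent complete
    }
    return nil
}

func (e *VacuumExecutor) GetProgress() float64 { return e.progress }

func (e *VacuumExecutor) Cancel() error {
    e.cancelled = true
    return nil
}
```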

### 2.3 Worker Capabilities & Registration

```go
type WorkerCapabilities struct {
    SupportedTasks   []TaskType
    MaxConcurrent    int
    ResourceLimits   ResourceLimits
    PreferredServers []string // Affinity for specific volume servers
}

type ResourceLimits struct {
    MaxMemoryMB    int64
    MaxDiskSpaceMB int64
    MaxNetworkMbps int64
    MaxCPUPercent  float64
}
```
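
One way the worker could advertise these capabilities and keep its registration alive, assuming `RegisterWorker` and `Heartbeat` methods on `AdminClient` (both names are illustrative, not the actual RPC surface):

```go
// Sketch: register once, then heartbeat on a ticker until the context is cancelled.
func (w *MaintenanceWorker) run(ctx context.Context, caps WorkerCapabilities) error {
    if err := w.adminClient.RegisterWorker(ctx, w.id, caps); err != nil { // assumed call
        return err
    }
    w.heartbeatTicker = time.NewTicker(10 * time.Second) // interval is illustrative
    defer w.heartbeatTicker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-w.heartbeatTicker.C:
            // Report current load; transient errors are ignored and retried next tick.
            _ = w.adminClient.Heartbeat(ctx, w.id, len(w.currentTasks)) // assumed call
        }
    }
}
```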

## 3. Task Lifecycle Management

### 3.1 Task States

```go
type TaskState string

const (
    TaskStatePending    TaskState = "pending"
    TaskStateAssigned   TaskState = "assigned"
    TaskStateInProgress TaskState = "in_progress"
    TaskStateCompleted  TaskState = "completed"
    TaskStateFailed     TaskState = "failed"
    TaskStateCancelled  TaskState = "cancelled"
    TaskStateStuck      TaskState = "stuck"     // Taking too long
    TaskStateDuplicate  TaskState = "duplicate" // Detected duplicate
)
```
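
These states imply a small transition graph. A sketch of how the admin server could validate state updates; the exact set of legal transitions is a design choice, not fixed here:

```go
// Sketch: legal task state transitions.
var validTransitions = map[TaskState][]TaskState{
    TaskStatePending:    {TaskStateAssigned, TaskStateCancelled, TaskStateDuplicate},
    TaskStateAssigned:   {TaskStateInProgress, TaskStateFailed, TaskStateCancelled},
    TaskStateInProgress: {TaskStateCompleted, TaskStateFailed, TaskStateCancelled, TaskStateStuck},
    TaskStateStuck:      {TaskStateInProgress, TaskStateFailed, TaskStateCancelled},
}

func canTransition(from, to TaskState) bool {
    for _, next := range validTransitions[from] {
        if next == to {
            return true
        }
    }
    return false
}
```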

### 3.2 Progress Tracking & Monitoring

```go
type InProgressTask struct {
    Task           *Task
    WorkerID       string
    StartedAt      time.Time
    LastUpdate     time.Time
    Progress       float64
    EstimatedEnd   time.Time
    VolumeReserved bool // Reserved for capacity planning
}

type TaskMonitor struct {
    inProgressTasks  map[string]*InProgressTask
    timeoutChecker   *time.Ticker
    stuckDetector    *time.Ticker
    duplicateChecker *time.Ticker
}
```
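
A sketch of the stuck-task sweep the monitor could run on its `stuckDetector` ticker; the 2x factor mirrors the recovery policy in Section 5.2:

```go
// Sketch: flag tasks with no progress update for more than twice their estimated duration.
func (m *TaskMonitor) sweepStuckTasks(now time.Time) []*InProgressTask {
    var stuck []*InProgressTask
    for _, t := range m.inProgressTasks {
        estimated := t.EstimatedEnd.Sub(t.StartedAt)
        if now.Sub(t.LastUpdate) > 2*estimated {
            stuck = append(stuck, t) // caller cancels and reschedules per Section 5.2
        }
    }
    return stuck
}
```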

## 4. Volume Capacity Reconciliation

### 4.1 Volume State Tracking

```go
type VolumeStateManager struct {
    masterClient      MasterClient
    inProgressTasks   map[uint32]*InProgressTask // VolumeID -> Task
    committedChanges  map[uint32]*VolumeChange   // Changes not yet in master
    reconcileInterval time.Duration
}

type VolumeChange struct {
    VolumeID         uint32
    ChangeType       ChangeType // "ec_encoding", "vacuum_completed"
    OldCapacity      int64
    NewCapacity      int64
    TaskID           string
    CompletedAt      time.Time
    ReportedToMaster bool
}
```
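
A sketch of the periodic reconciliation pass; `ReportVolumeChange` is a placeholder for whatever interface the master exposes for this:

```go
// Sketch: push completed-but-unreported changes to the master, dropping each one on success.
func (m *VolumeStateManager) reconcile(ctx context.Context) error {
    for volumeID, change := range m.committedChanges {
        if change.ReportedToMaster {
            delete(m.committedChanges, volumeID)
            continue
        }
        if err := m.masterClient.ReportVolumeChange(ctx, change); err != nil { // placeholder call
            return err // retry on the next reconcileInterval tick
        }
        change.ReportedToMaster = true
        delete(m.committedChanges, volumeID)
    }
    return nil
}
```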

### 4.2 Shard Assignment Integration

When the master needs to assign shards, it must consider:

1. **Current volume state** from its own records
2. **In-progress capacity changes** from the admin server
3. **Committed but unreported changes** from the admin server

```go
type CapacityOracle struct {
    adminServer AdminServerClient
    masterState *MasterVolumeState
    updateFreq  time.Duration
}

func (o *CapacityOracle) GetAdjustedCapacity(volumeID uint32) int64 {
    baseCapacity := o.masterState.GetCapacity(volumeID)

    // Adjust for in-progress tasks
    if task := o.adminServer.GetInProgressTask(volumeID); task != nil {
        switch task.Type {
        case TaskTypeErasureCoding:
            // EC reduces effective capacity
            return baseCapacity / 2 // Simplified
        case TaskTypeVacuum:
            // Vacuum may increase available space
            return baseCapacity + int64(float64(baseCapacity)*0.3)
        }
    }

    // Adjust for completed but unreported changes
    if change := o.adminServer.GetPendingChange(volumeID); change != nil {
        return change.NewCapacity
    }

    return baseCapacity
}
```
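
For example, the master's shard-placement path could consult the oracle rather than its raw volume records (illustrative only; the real assignment logic weighs many more factors):

```go
// Sketch: only place a new shard on a volume whose adjusted capacity leaves enough room.
func pickVolumeForShard(oracle *CapacityOracle, candidates []uint32, shardSize int64) (uint32, bool) {
    for _, volumeID := range candidates {
        if oracle.GetAdjustedCapacity(volumeID) >= shardSize {
            return volumeID, true
        }
    }
    return 0, false // no candidate has room once in-progress work is accounted for
}
```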

## 5. Error Handling & Recovery

### 5.1 Worker Failure Scenarios

```go
type FailureHandler struct {
    taskRescheduler *TaskRescheduler
    workerMonitor   *WorkerMonitor
    alertManager    *AlertManager
}

// Failure Scenarios:
// 1. Worker becomes unresponsive (heartbeat timeout)
// 2. Task execution fails (reported by worker)
// 3. Task gets stuck (progress timeout)
// 4. Duplicate task detection
// 5. Resource exhaustion
```

### 5.2 Recovery Strategies

**Worker Timeout Recovery**:
- Mark the worker as inactive after 3 missed heartbeats
- Reschedule all of its assigned tasks to other workers
- Clean up any partial state
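
A sketch of the heartbeat sweep that would implement the first two steps; the inactive status value and the keying of `taskAssignment` by worker ID are assumptions:

```go
// Sketch: expire workers after 3 missed heartbeats and collect their tasks for rescheduling.
func (r *WorkerRegistry) expireWorkers(now time.Time, heartbeatInterval time.Duration) []*Task {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    var orphaned []*Task
    for id, w := range r.workers {
        if now.Sub(r.lastHeartbeat[id]) > 3*heartbeatInterval {
            w.Status = WorkerStatusInactive // hypothetical WorkerStatus value
            if task, ok := r.taskAssignment[id]; ok { // assumes the map is keyed by worker ID
                orphaned = append(orphaned, task)
                delete(r.taskAssignment, id)
            }
        }
    }
    return orphaned // handed to the TaskRescheduler
}
```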

**Task Stuck Recovery**:
- Detect tasks with no progress for > 2x the estimated time
- Cancel the stuck task and mark the volume for cleanup
- Reschedule if retry count < max_retries

**Duplicate Task Prevention**:

```go
type DuplicateDetector struct {
    activeFingerprints map[string]bool // VolumeID+TaskType
    recentCompleted    *LRUCache       // Recently completed tasks
}

func (d *DuplicateDetector) IsTaskDuplicate(task *Task) bool {
    fingerprint := fmt.Sprintf("%d-%s", task.VolumeID, task.Type)
    return d.activeFingerprints[fingerprint] ||
        d.recentCompleted.Contains(fingerprint)
}
```

## 6. Simulation & Testing Framework

### 6.1 Failure Simulation

```go
type TaskSimulator struct {
    scenarios map[string]SimulationScenario
}

type SimulationScenario struct {
    Name            string
    WorkerCount     int
    VolumeCount     int
    FailurePatterns []FailurePattern
    Duration        time.Duration
}

type FailurePattern struct {
    Type        FailureType // "worker_timeout", "task_stuck", "duplicate"
    Probability float64     // 0.0 to 1.0
    Timing      TimingSpec  // When during task execution
    Duration    time.Duration
}
```
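
For instance, the first test scenario below could be encoded roughly as follows (all values are illustrative, and `FailureType` is assumed to be a string-backed type, as the listed values suggest):

```go
// Sketch of a scenario definition matching "Worker Timeout During EC" in Section 6.2.
var workerTimeoutDuringEC = SimulationScenario{
    Name:        "worker_timeout_during_ec",
    WorkerCount: 3,
    VolumeCount: 10,
    Duration:    30 * time.Minute,
    FailurePatterns: []FailurePattern{
        {
            Type:        "worker_timeout",
            Probability: 1.0, // always fire, for a deterministic test
            // Timing would target ~50% task progress; TimingSpec details are omitted here.
        },
    },
}
```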

### 6.2 Test Scenarios

**Scenario 1: Worker Timeout During EC**
- Start EC task on 30GB volume
- Kill worker at 50% progress
- Verify task reassignment
- Verify no duplicate EC operations

**Scenario 2: Stuck Vacuum Task**
- Start vacuum on high-garbage volume
- Simulate worker hanging at 75% progress
- Verify timeout detection and cleanup
- Verify volume state consistency

**Scenario 3: Duplicate Task Prevention**
- Submit same EC task from multiple sources
- Verify only one task executes
- Verify proper conflict resolution

**Scenario 4: Master-Admin State Divergence**
- Create in-progress EC task
- Simulate master restart
- Verify state reconciliation
- Verify shard assignment accounts for in-progress work

## 7. Performance & Scalability

### 7.1 Metrics & Monitoring

```go
type SystemMetrics struct {
    TasksPerSecond    float64
    WorkerUtilization float64
    AverageTaskTime   time.Duration
    FailureRate       float64
    QueueDepth        int
    VolumeStatesSync  bool
}
```

### 7.2 Scalability Considerations

- **Horizontal Worker Scaling**: Add workers without admin server changes
- **Admin Server HA**: Master-slave admin servers for fault tolerance
- **Task Partitioning**: Partition tasks by collection or datacenter (one possible hashing scheme is sketched below)
- **Batch Operations**: Group similar tasks for efficiency
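
A sketch of collection-based partitioning, assuming tasks are spread across a fixed number of admin partitions (this is one option, not the chosen scheme):

```go
import "hash/fnv"

// Sketch: stable assignment of a task to one of N admin partitions by collection name.
func partitionForTask(collection string, partitions int) int {
    h := fnv.New32a()
    h.Write([]byte(collection))
    return int(h.Sum32()) % partitions
}
```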

## 8. Implementation Plan

### Phase 1: Core Infrastructure
1. Admin server basic framework
2. Worker registration and heartbeat
3. Simple task assignment
4. Basic progress tracking

### Phase 2: Advanced Features
1. Volume state reconciliation
2. Sophisticated worker selection
3. Failure detection and recovery
4. Duplicate prevention

### Phase 3: Optimization & Monitoring
1. Performance metrics
2. Load balancing algorithms
3. Capacity planning integration
4. Comprehensive monitoring

This design provides a robust, scalable foundation for distributed task management in SeaweedFS while maintaining consistency with the existing architecture patterns.