<!-- Mirror of https://github.com/chrislusf/seaweedfs, synced 2025-08-16 -->
# SeaweedFS Task Distribution System Design
|
|
|
|
## Overview
|
|
|
|
This document describes the design of a distributed task management system for SeaweedFS that handles Erasure Coding (EC) and vacuum operations through a scalable admin server and worker process architecture.
|
|
|
|
## System Architecture
|
|
|
|
### High-Level Components
|
|
|
|
```
|
|
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
|
|
│ Master │◄──►│ Admin Server │◄──►│ Workers │
|
|
│ │ │ │ │ │
|
|
│ - Volume Info │ │ - Task Discovery │ │ - Task Exec │
|
|
│ - Shard Status │ │ - Task Assign │ │ - Progress │
|
|
│ - Heartbeats │ │ - Progress Track │ │ - Error Report │
|
|
└─────────────────┘ └──────────────────┘ └─────────────────┘
|
|
│ │ │
|
|
│ │ │
|
|
▼ ▼ ▼
|
|
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
|
|
│ Volume Servers │ │ Volume Monitor │ │ Task Execution │
|
|
│ │ │ │ │ │
|
|
│ - Store Volumes │ │ - Health Check │ │ - EC Convert │
|
|
│ - EC Shards │ │ - Usage Stats │ │ - Vacuum Clean │
|
|
│ - Report Status │ │ - State Sync │ │ - Status Report │
|
|
└─────────────────┘ └──────────────────┘ └─────────────────┘
|
|
```
|
|
|
|
## 1. Admin Server Design
|
|
|
|
### 1.1 Core Responsibilities
|
|
|
|
- **Task Discovery**: Scan volumes to identify EC and vacuum candidates
|
|
- **Worker Management**: Track available workers and their capabilities
|
|
- **Task Assignment**: Match tasks to optimal workers
|
|
- **Progress Tracking**: Monitor in-progress tasks for capacity planning
|
|
- **State Reconciliation**: Sync with master server for volume state updates
|
|
|
|
### 1.2 Task Discovery Engine
|
|
|
|
```go
|
|
type TaskDiscoveryEngine struct {
|
|
masterClient MasterClient
|
|
volumeScanner VolumeScanner
|
|
taskDetectors map[TaskType]TaskDetector
|
|
scanInterval time.Duration
|
|
}
|
|
|
|
type VolumeCandidate struct {
|
|
VolumeID uint32
|
|
Server string
|
|
Collection string
|
|
TaskType TaskType
|
|
Priority TaskPriority
|
|
Reason string
|
|
DetectedAt time.Time
|
|
Parameters map[string]interface{}
|
|
}
|
|
```
|
|
|
|
**EC Detection Logic**:
|
|
- Find volumes >= 95% full and idle for > 1 hour
|
|
- Exclude volumes already in EC format
|
|
- Exclude volumes with ongoing operations
|
|
- Prioritize by collection and age
|
|
|
|
**Vacuum Detection Logic**:
|
|
- Find volumes with garbage ratio > 30%
|
|
- Exclude read-only volumes
|
|
- Exclude volumes with recent vacuum operations
|
|
- Prioritize by garbage percentage
|
|
|
|
### 1.3 Worker Registry & Management
|
|
|
|
```go
|
|
type WorkerRegistry struct {
|
|
workers map[string]*Worker
|
|
capabilities map[TaskType][]*Worker
|
|
lastHeartbeat map[string]time.Time
|
|
taskAssignment map[string]*Task
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
type Worker struct {
|
|
ID string
|
|
Address string
|
|
Capabilities []TaskType
|
|
MaxConcurrent int
|
|
CurrentLoad int
|
|
Status WorkerStatus
|
|
LastSeen time.Time
|
|
Performance WorkerMetrics
|
|
}
|
|
```
|
|
|
|
### 1.4 Task Assignment Algorithm
|
|
|
|
```go
|
|
type TaskScheduler struct {
|
|
registry *WorkerRegistry
|
|
taskQueue *PriorityQueue
|
|
inProgressTasks map[string]*InProgressTask
|
|
volumeReservations map[uint32]*VolumeReservation
|
|
}
|
|
|
|
// Worker Selection Criteria:
|
|
// 1. Has required capability (EC or Vacuum)
|
|
// 2. Available capacity (CurrentLoad < MaxConcurrent)
|
|
// 3. Best performance history for task type
|
|
// 4. Lowest current load
|
|
// 5. Geographically close to volume server (optional)
|
|
```
|
|
|
|
## 2. Worker Process Design
|
|
|
|
### 2.1 Worker Architecture
|
|
|
|
```go
|
|
type MaintenanceWorker struct {
|
|
id string
|
|
config *WorkerConfig
|
|
adminClient AdminClient
|
|
taskExecutors map[TaskType]TaskExecutor
|
|
currentTasks map[string]*RunningTask
|
|
registry *TaskRegistry
|
|
heartbeatTicker *time.Ticker
|
|
requestTicker *time.Ticker
|
|
}
|
|
```
|
|
|
|
### 2.2 Task Execution Framework
|
|
|
|
```go
|
|
type TaskExecutor interface {
|
|
Execute(ctx context.Context, task *Task) error
|
|
EstimateTime(task *Task) time.Duration
|
|
ValidateResources(task *Task) error
|
|
GetProgress() float64
|
|
Cancel() error
|
|
}
|
|
|
|
type ErasureCodingExecutor struct {
|
|
volumeClient VolumeServerClient
|
|
progress float64
|
|
cancelled bool
|
|
}
|
|
|
|
type VacuumExecutor struct {
|
|
volumeClient VolumeServerClient
|
|
progress float64
|
|
cancelled bool
|
|
}
|
|
```
|
|
|
|
### 2.3 Worker Capabilities & Registration
|
|
|
|
```go
|
|
type WorkerCapabilities struct {
|
|
SupportedTasks []TaskType
|
|
MaxConcurrent int
|
|
ResourceLimits ResourceLimits
|
|
PreferredServers []string // Affinity for specific volume servers
|
|
}
|
|
|
|
type ResourceLimits struct {
|
|
MaxMemoryMB int64
|
|
MaxDiskSpaceMB int64
|
|
MaxNetworkMbps int64
|
|
MaxCPUPercent float64
|
|
}
|
|
```
|
|
|
|
## 3. Task Lifecycle Management
|
|
|
|
### 3.1 Task States
|
|
|
|
```go
|
|
type TaskState string
|
|
|
|
const (
|
|
TaskStatePending TaskState = "pending"
|
|
TaskStateAssigned TaskState = "assigned"
|
|
TaskStateInProgress TaskState = "in_progress"
|
|
TaskStateCompleted TaskState = "completed"
|
|
TaskStateFailed TaskState = "failed"
|
|
TaskStateCancelled TaskState = "cancelled"
|
|
TaskStateStuck TaskState = "stuck" // Taking too long
|
|
TaskStateDuplicate TaskState = "duplicate" // Detected duplicate
|
|
)
|
|
```
|
|
|
|
### 3.2 Progress Tracking & Monitoring
|
|
|
|
```go
|
|
type InProgressTask struct {
|
|
Task *Task
|
|
WorkerID string
|
|
StartedAt time.Time
|
|
LastUpdate time.Time
|
|
Progress float64
|
|
EstimatedEnd time.Time
|
|
VolumeReserved bool // Reserved for capacity planning
|
|
}
|
|
|
|
type TaskMonitor struct {
|
|
inProgressTasks map[string]*InProgressTask
|
|
timeoutChecker *time.Ticker
|
|
stuckDetector *time.Ticker
|
|
duplicateChecker *time.Ticker
|
|
}
|
|
```
|
|
|
|
## 4. Volume Capacity Reconciliation
|
|
|
|
### 4.1 Volume State Tracking
|
|
|
|
```go
|
|
type VolumeStateManager struct {
|
|
masterClient MasterClient
|
|
inProgressTasks map[uint32]*InProgressTask // VolumeID -> Task
|
|
committedChanges map[uint32]*VolumeChange // Changes not yet in master
|
|
reconcileInterval time.Duration
|
|
}
|
|
|
|
type VolumeChange struct {
|
|
VolumeID uint32
|
|
ChangeType ChangeType // "ec_encoding", "vacuum_completed"
|
|
OldCapacity int64
|
|
NewCapacity int64
|
|
TaskID string
|
|
CompletedAt time.Time
|
|
ReportedToMaster bool
|
|
}
|
|
```
|
|
|
|
### 4.2 Shard Assignment Integration
|
|
|
|
When the master needs to assign shards, it must consider:
|
|
1. **Current volume state** from its own records
|
|
2. **In-progress capacity changes** from admin server
|
|
3. **Committed but unreported changes** from admin server
|
|
|
|
```go
|
|
type CapacityOracle struct {
|
|
adminServer AdminServerClient
|
|
masterState *MasterVolumeState
|
|
updateFreq time.Duration
|
|
}
|
|
|
|
func (o *CapacityOracle) GetAdjustedCapacity(volumeID uint32) int64 {
|
|
baseCapacity := o.masterState.GetCapacity(volumeID)
|
|
|
|
// Adjust for in-progress tasks
|
|
if task := o.adminServer.GetInProgressTask(volumeID); task != nil {
|
|
switch task.Type {
|
|
case TaskTypeErasureCoding:
|
|
// EC reduces effective capacity
|
|
return baseCapacity / 2 // Simplified
|
|
case TaskTypeVacuum:
|
|
// Vacuum may increase available space
|
|
return baseCapacity + int64(float64(baseCapacity) * 0.3)
|
|
}
|
|
}
|
|
|
|
// Adjust for completed but unreported changes
|
|
if change := o.adminServer.GetPendingChange(volumeID); change != nil {
|
|
return change.NewCapacity
|
|
}
|
|
|
|
return baseCapacity
|
|
}
|
|
```
|
|
|
|
## 5. Error Handling & Recovery
|
|
|
|
### 5.1 Worker Failure Scenarios
|
|
|
|
```go
|
|
type FailureHandler struct {
|
|
taskRescheduler *TaskRescheduler
|
|
workerMonitor *WorkerMonitor
|
|
alertManager *AlertManager
|
|
}
|
|
|
|
// Failure Scenarios:
|
|
// 1. Worker becomes unresponsive (heartbeat timeout)
|
|
// 2. Task execution fails (reported by worker)
|
|
// 3. Task gets stuck (progress timeout)
|
|
// 4. Duplicate task detection
|
|
// 5. Resource exhaustion
|
|
```
|
|
|
|
### 5.2 Recovery Strategies
|
|
|
|
**Worker Timeout Recovery**:
|
|
- Mark worker as inactive after 3 missed heartbeats
|
|
- Reschedule all assigned tasks to other workers
|
|
- Cleanup any partial state
|
|
|
|
**Task Stuck Recovery**:
|
|
- Detect tasks with no progress for > 2x estimated time
|
|
- Cancel stuck task and mark volume for cleanup
|
|
- Reschedule if retry count < max_retries
|
|
|
|
**Duplicate Task Prevention**:
|
|
```go
|
|
type DuplicateDetector struct {
|
|
activeFingerprints map[string]bool // VolumeID+TaskType
|
|
recentCompleted *LRUCache // Recently completed tasks
|
|
}
|
|
|
|
func (d *DuplicateDetector) IsTaskDuplicate(task *Task) bool {
|
|
fingerprint := fmt.Sprintf("%d-%s", task.VolumeID, task.Type)
|
|
return d.activeFingerprints[fingerprint] ||
|
|
d.recentCompleted.Contains(fingerprint)
|
|
}
|
|
```
|
|
|
|
## 6. Simulation & Testing Framework
|
|
|
|
### 6.1 Failure Simulation
|
|
|
|
```go
|
|
type TaskSimulator struct {
|
|
scenarios map[string]SimulationScenario
|
|
}
|
|
|
|
type SimulationScenario struct {
|
|
Name string
|
|
WorkerCount int
|
|
VolumeCount int
|
|
FailurePatterns []FailurePattern
|
|
Duration time.Duration
|
|
}
|
|
|
|
type FailurePattern struct {
|
|
Type FailureType // "worker_timeout", "task_stuck", "duplicate"
|
|
Probability float64 // 0.0 to 1.0
|
|
Timing TimingSpec // When during task execution
|
|
Duration time.Duration
|
|
}
|
|
```
|
|
|
|
### 6.2 Test Scenarios
|
|
|
|
**Scenario 1: Worker Timeout During EC**
|
|
- Start EC task on 30GB volume
|
|
- Kill worker at 50% progress
|
|
- Verify task reassignment
|
|
- Verify no duplicate EC operations
|
|
|
|
**Scenario 2: Stuck Vacuum Task**
|
|
- Start vacuum on high-garbage volume
|
|
- Simulate worker hanging at 75% progress
|
|
- Verify timeout detection and cleanup
|
|
- Verify volume state consistency
|
|
|
|
**Scenario 3: Duplicate Task Prevention**
|
|
- Submit same EC task from multiple sources
|
|
- Verify only one task executes
|
|
- Verify proper conflict resolution
|
|
|
|
**Scenario 4: Master-Admin State Divergence**
|
|
- Create in-progress EC task
|
|
- Simulate master restart
|
|
- Verify state reconciliation
|
|
- Verify shard assignment accounts for in-progress work
|
|
|
|
## 7. Performance & Scalability
|
|
|
|
### 7.1 Metrics & Monitoring
|
|
|
|
```go
|
|
type SystemMetrics struct {
|
|
TasksPerSecond float64
|
|
WorkerUtilization float64
|
|
AverageTaskTime time.Duration
|
|
FailureRate float64
|
|
QueueDepth int
|
|
VolumeStatesSync bool
|
|
}
|
|
```
|
|
|
|
### 7.2 Scalability Considerations
|
|
|
|
- **Horizontal Worker Scaling**: Add workers without admin server changes
|
|
- **Admin Server HA**: Master-slave admin servers for fault tolerance
|
|
- **Task Partitioning**: Partition tasks by collection or datacenter
|
|
- **Batch Operations**: Group similar tasks for efficiency
|
|
|
|
## 8. Implementation Plan
|
|
|
|
### Phase 1: Core Infrastructure
|
|
1. Admin server basic framework
|
|
2. Worker registration and heartbeat
|
|
3. Simple task assignment
|
|
4. Basic progress tracking
|
|
|
|
### Phase 2: Advanced Features
|
|
1. Volume state reconciliation
|
|
2. Sophisticated worker selection
|
|
3. Failure detection and recovery
|
|
4. Duplicate prevention
|
|
|
|
### Phase 3: Optimization & Monitoring
|
|
1. Performance metrics
|
|
2. Load balancing algorithms
|
|
3. Capacity planning integration
|
|
4. Comprehensive monitoring
|
|
|
|
This design provides a robust, scalable foundation for distributed task management in SeaweedFS while maintaining consistency with the existing architecture patterns.