mirror of https://github.com/chrislusf/seaweedfs synced 2025-08-16 00:52:48 +02:00
seaweedfs/DESIGN.md
2025-07-24 00:37:02 -07:00

# SeaweedFS Task Distribution System Design
## Overview
This document describes the design of a distributed task management system for SeaweedFS that handles Erasure Coding (EC) and vacuum operations through a scalable admin server and worker process architecture.
## System Architecture
### High-Level Components
```
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│     Master      │◄──►│   Admin Server   │◄──►│     Workers     │
│                 │    │                  │    │                 │
│ - Volume Info   │    │ - Task Discovery │    │ - Task Exec     │
│ - Shard Status  │    │ - Task Assign    │    │ - Progress      │
│ - Heartbeats    │    │ - Progress Track │    │ - Error Report  │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         │                       │                      │
         │                       │                      │
         ▼                       ▼                      ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Volume Servers  │    │ Volume Monitor   │    │ Task Execution  │
│                 │    │                  │    │                 │
│ - Store Volumes │    │ - Health Check   │    │ - EC Convert    │
│ - EC Shards     │    │ - Usage Stats    │    │ - Vacuum Clean  │
│ - Report Status │    │ - State Sync     │    │ - Status Report │
└─────────────────┘    └──────────────────┘    └─────────────────┘
```
## 1. Admin Server Design
### 1.1 Core Responsibilities
- **Task Discovery**: Scan volumes to identify EC and vacuum candidates
- **Worker Management**: Track available workers and their capabilities
- **Task Assignment**: Match tasks to optimal workers
- **Progress Tracking**: Monitor in-progress tasks for capacity planning
- **State Reconciliation**: Sync with master server for volume state updates
### 1.2 Task Discovery Engine
```go
type TaskDiscoveryEngine struct {
    masterClient  MasterClient
    volumeScanner VolumeScanner
    taskDetectors map[TaskType]TaskDetector
    scanInterval  time.Duration
}

type VolumeCandidate struct {
    VolumeID   uint32
    Server     string
    Collection string
    TaskType   TaskType
    Priority   TaskPriority
    Reason     string
    DetectedAt time.Time
    Parameters map[string]interface{}
}
```
**EC Detection Logic**:
- Find volumes >= 95% full and idle for > 1 hour
- Exclude volumes already in EC format
- Exclude volumes with ongoing operations
- Prioritize by collection and age
**Vacuum Detection Logic**:
- Find volumes with garbage ratio > 30%
- Exclude read-only volumes
- Exclude volumes with recent vacuum operations
- Prioritize by garbage percentage
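
The two rule sets above can be read as simple predicates over a per-volume snapshot. The following is a minimal sketch, not the actual detector implementation: `VolumeStats` and its fields are hypothetical, and the 24-hour `vacuumCooldown` is an assumed interpretation of "recent vacuum operations", which the design does not quantify. Prioritization is left out.

```go
package main

import (
	"fmt"
	"time"
)

// VolumeStats is a hypothetical per-volume snapshot; the field names are
// assumptions for illustration, not SeaweedFS types.
type VolumeStats struct {
	Size            uint64        // bytes currently used
	SizeLimit       uint64        // configured volume size limit in bytes
	GarbageRatio    float64       // deleted bytes / total bytes, 0.0 to 1.0
	IdleFor         time.Duration // time since the last write
	SinceLastVacuum time.Duration // time since the last vacuum finished
	IsEC            bool          // already in EC format
	ReadOnly        bool
	Busy            bool // another operation is running on the volume
}

const (
	ecFullnessThreshold = 0.95
	ecIdleThreshold     = time.Hour
	vacuumGarbageRatio  = 0.30
	vacuumCooldown      = 24 * time.Hour // assumed meaning of "recent"
)

// isECCandidate applies the EC rules: at least 95% full, idle for more
// than one hour, not already EC, and no ongoing operation.
func isECCandidate(v VolumeStats) bool {
	if v.IsEC || v.Busy || v.SizeLimit == 0 {
		return false
	}
	fullness := float64(v.Size) / float64(v.SizeLimit)
	return fullness >= ecFullnessThreshold && v.IdleFor > ecIdleThreshold
}

// isVacuumCandidate applies the vacuum rules: garbage ratio above 30%,
// not read-only, and not vacuumed within the cooldown window.
func isVacuumCandidate(v VolumeStats) bool {
	if v.ReadOnly || v.SinceLastVacuum < vacuumCooldown {
		return false
	}
	return v.GarbageRatio > vacuumGarbageRatio
}

func main() {
	v := VolumeStats{Size: 96, SizeLimit: 100, GarbageRatio: 0.4,
		IdleFor: 2 * time.Hour, SinceLastVacuum: 48 * time.Hour}
	fmt.Println(isECCandidate(v), isVacuumCandidate(v)) // prints "true true"
}
```

Note that a zero `SinceLastVacuum` is treated as "just vacuumed" in this sketch, so a real detector would need a separate way to represent volumes that have never been vacuumed.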
### 1.3 Worker Registry & Management
```go
type WorkerRegistry struct {
    workers        map[string]*Worker
    capabilities   map[TaskType][]*Worker
    lastHeartbeat  map[string]time.Time
    taskAssignment map[string]*Task
    mutex          sync.RWMutex
}

type Worker struct {
    ID            string
    Address       string
    Capabilities  []TaskType
    MaxConcurrent int
    CurrentLoad   int
    Status        WorkerStatus
    LastSeen      time.Time
    Performance   WorkerMetrics
}
```
### 1.4 Task Assignment Algorithm
```go
type TaskScheduler struct {
    registry           *WorkerRegistry
    taskQueue          *PriorityQueue
    inProgressTasks    map[string]*InProgressTask
    volumeReservations map[uint32]*VolumeReservation
}

// Worker Selection Criteria:
// 1. Has required capability (EC or Vacuum)
// 2. Available capacity (CurrentLoad < MaxConcurrent)
// 3. Best performance history for task type
// 4. Lowest current load
// 5. Geographically close to volume server (optional)
```
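
One way to read the selection criteria is as a filter followed by a sort. The sketch below covers criteria 1 to 4 only and skips the optional locality rule. The `SuccessRate` map is a hypothetical stand-in for `WorkerMetrics`, whose contents the design does not define, and treating "best performance history" as the highest success rate is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

type TaskType string

// Worker is a trimmed-down version of the registry's Worker struct.
type Worker struct {
	ID            string
	Capabilities  []TaskType
	MaxConcurrent int
	CurrentLoad   int
	SuccessRate   map[TaskType]float64 // hypothetical stand-in for WorkerMetrics
}

func (w *Worker) supports(t TaskType) bool {
	for _, c := range w.Capabilities {
		if c == t {
			return true
		}
	}
	return false
}

// selectWorker returns the best worker for the task type, or nil when no
// worker has both the capability and spare capacity.
func selectWorker(workers []*Worker, t TaskType) *Worker {
	var eligible []*Worker
	for _, w := range workers {
		// Criteria 1 and 2: capability and available capacity.
		if w.supports(t) && w.CurrentLoad < w.MaxConcurrent {
			eligible = append(eligible, w)
		}
	}
	if len(eligible) == 0 {
		return nil
	}
	sort.SliceStable(eligible, func(i, j int) bool {
		a, b := eligible[i], eligible[j]
		// Criterion 3: better history first.
		if a.SuccessRate[t] != b.SuccessRate[t] {
			return a.SuccessRate[t] > b.SuccessRate[t]
		}
		// Criterion 4: lower load breaks ties.
		return a.CurrentLoad < b.CurrentLoad
	})
	return eligible[0]
}

func main() {
	ec := TaskType("erasure_coding")
	workers := []*Worker{
		{ID: "w1", Capabilities: []TaskType{ec}, MaxConcurrent: 2, CurrentLoad: 2},
		{ID: "w2", Capabilities: []TaskType{ec}, MaxConcurrent: 4, CurrentLoad: 1,
			SuccessRate: map[TaskType]float64{ec: 0.9}},
		{ID: "w3", Capabilities: []TaskType{ec}, MaxConcurrent: 4, CurrentLoad: 0,
			SuccessRate: map[TaskType]float64{ec: 0.9}},
	}
	fmt.Println(selectWorker(workers, ec).ID) // prints "w3"
}
```

Here `w1` is skipped because it is at capacity, and `w3` wins over `w2` on load because their success rates are equal.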
## 2. Worker Process Design
### 2.1 Worker Architecture
```go
type MaintenanceWorker struct {
    id              string
    config          *WorkerConfig
    adminClient     AdminClient
    taskExecutors   map[TaskType]TaskExecutor
    currentTasks    map[string]*RunningTask
    registry        *TaskRegistry
    heartbeatTicker *time.Ticker
    requestTicker   *time.Ticker
}
```
### 2.2 Task Execution Framework
```go
type TaskExecutor interface {
    Execute(ctx context.Context, task *Task) error
    EstimateTime(task *Task) time.Duration
    ValidateResources(task *Task) error
    GetProgress() float64
    Cancel() error
}

type ErasureCodingExecutor struct {
    volumeClient VolumeServerClient
    progress     float64
    cancelled    bool
}

type VacuumExecutor struct {
    volumeClient VolumeServerClient
    progress     float64
    cancelled    bool
}
```
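
Because `GetProgress` and `Cancel` are likely to be called from a different goroutine than `Execute`, the plain `progress` and `cancelled` fields would need synchronization in practice. The sketch below shows one possible approach using a mutex and context cancellation. `stepExecutor` is a hypothetical type, and its `Execute` takes a step count instead of a `*Task` to keep the example self-contained.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// stepExecutor is a hypothetical executor that performs a fixed number of
// steps; it illustrates the concurrency pattern only.
type stepExecutor struct {
	mu       sync.Mutex
	progress float64
	cancel   context.CancelFunc
}

// Execute runs the steps, checking for cancellation before each one.
func (e *stepExecutor) Execute(ctx context.Context, steps int) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	e.mu.Lock()
	e.cancel = cancel
	e.mu.Unlock()

	for i := 1; i <= steps; i++ {
		if err := ctx.Err(); err != nil {
			return err
		}
		// Real work for step i would happen here.
		e.mu.Lock()
		e.progress = float64(i) / float64(steps)
		e.mu.Unlock()
	}
	return nil
}

// GetProgress returns the fraction of completed steps, from 0.0 to 1.0.
func (e *stepExecutor) GetProgress() float64 {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.progress
}

// Cancel asks a running Execute call to stop at its next check.
func (e *stepExecutor) Cancel() error {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.cancel != nil {
		e.cancel()
	}
	return nil
}

// runToCompletion executes all steps without interruption.
func runToCompletion(steps int) (float64, error) {
	e := &stepExecutor{}
	err := e.Execute(context.Background(), steps)
	return e.GetProgress(), err
}

// runCancelled executes with a context that is already cancelled.
func runCancelled(steps int) error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return (&stepExecutor{}).Execute(ctx, steps)
}

func main() {
	p, err := runToCompletion(4)
	fmt.Println(p, err)                                       // prints "1 <nil>"
	fmt.Println(errors.Is(runCancelled(4), context.Canceled)) // prints "true"
}
```

Deriving the executor's context from the caller's context means that either a worker shutdown or an explicit `Cancel` call stops the task through the same path.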
### 2.3 Worker Capabilities & Registration
```go
type WorkerCapabilities struct {
    SupportedTasks   []TaskType
    MaxConcurrent    int
    ResourceLimits   ResourceLimits
    PreferredServers []string // Affinity for specific volume servers
}

type ResourceLimits struct {
    MaxMemoryMB    int64
    MaxDiskSpaceMB int64
    MaxNetworkMbps int64
    MaxCPUPercent  float64
}
```
## 3. Task Lifecycle Management
### 3.1 Task States
```go
type TaskState string

const (
    TaskStatePending    TaskState = "pending"
    TaskStateAssigned   TaskState = "assigned"
    TaskStateInProgress TaskState = "in_progress"
    TaskStateCompleted  TaskState = "completed"
    TaskStateFailed     TaskState = "failed"
    TaskStateCancelled  TaskState = "cancelled"
    TaskStateStuck      TaskState = "stuck"     // Taking too long
    TaskStateDuplicate  TaskState = "duplicate" // Detected duplicate
)
```
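
The design lists the states but not which transitions between them are legal. As an illustration only, the sketch below encodes one plausible transition table and a check against it. Every entry in `allowedTransitions` is an assumption, and the real lifecycle may differ.

```go
package main

import "fmt"

type TaskState string

const (
	TaskStatePending    TaskState = "pending"
	TaskStateAssigned   TaskState = "assigned"
	TaskStateInProgress TaskState = "in_progress"
	TaskStateCompleted  TaskState = "completed"
	TaskStateFailed     TaskState = "failed"
	TaskStateCancelled  TaskState = "cancelled"
	TaskStateStuck      TaskState = "stuck"
	TaskStateDuplicate  TaskState = "duplicate"
)

// allowedTransitions is an assumed table, not taken from the design.
// States without an entry (completed, cancelled, duplicate) are terminal.
var allowedTransitions = map[TaskState][]TaskState{
	TaskStatePending:    {TaskStateAssigned, TaskStateCancelled, TaskStateDuplicate},
	TaskStateAssigned:   {TaskStateInProgress, TaskStatePending, TaskStateCancelled},
	TaskStateInProgress: {TaskStateCompleted, TaskStateFailed, TaskStateCancelled, TaskStateStuck},
	TaskStateStuck:      {TaskStatePending, TaskStateFailed, TaskStateCancelled},
	TaskStateFailed:     {TaskStatePending}, // retry
}

// canTransition reports whether moving from one state to another is
// permitted by the table above.
func canTransition(from, to TaskState) bool {
	for _, next := range allowedTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(TaskStatePending, TaskStateAssigned))     // prints "true"
	fmt.Println(canTransition(TaskStateCompleted, TaskStateInProgress)) // prints "false"
}
```

Keeping the table as data makes it easy to reject an out-of-order status report, for example a late progress update for a task that has already been rescheduled.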
### 3.2 Progress Tracking & Monitoring
```go
type InProgressTask struct {
    Task           *Task
    WorkerID       string
    StartedAt      time.Time
    LastUpdate     time.Time
    Progress       float64
    EstimatedEnd   time.Time
    VolumeReserved bool // Reserved for capacity planning
}

type TaskMonitor struct {
    inProgressTasks  map[string]*InProgressTask
    timeoutChecker   *time.Ticker
    stuckDetector    *time.Ticker
    duplicateChecker *time.Ticker
}
```
## 4. Volume Capacity Reconciliation
### 4.1 Volume State Tracking
```go
type VolumeStateManager struct {
    masterClient      MasterClient
    inProgressTasks   map[uint32]*InProgressTask // VolumeID -> Task
    committedChanges  map[uint32]*VolumeChange   // Changes not yet in master
    reconcileInterval time.Duration
}

type VolumeChange struct {
    VolumeID         uint32
    ChangeType       ChangeType // "ec_encoding", "vacuum_completed"
    OldCapacity      int64
    NewCapacity      int64
    TaskID           string
    CompletedAt      time.Time
    ReportedToMaster bool
}
```
### 4.2 Shard Assignment Integration
When the master needs to assign shards, it must consider:
1. **Current volume state** from its own records
2. **In-progress capacity changes** from admin server
3. **Committed but unreported changes** from admin server
```go
type CapacityOracle struct {
    adminServer AdminServerClient
    masterState *MasterVolumeState
    updateFreq  time.Duration
}

func (o *CapacityOracle) GetAdjustedCapacity(volumeID uint32) int64 {
    baseCapacity := o.masterState.GetCapacity(volumeID)

    // Adjust for in-progress tasks
    if task := o.adminServer.GetInProgressTask(volumeID); task != nil {
        switch task.Type {
        case TaskTypeErasureCoding:
            // EC reduces effective capacity
            return baseCapacity / 2 // Simplified
        case TaskTypeVacuum:
            // Vacuum may increase available space
            return baseCapacity + int64(float64(baseCapacity)*0.3)
        }
    }

    // Adjust for completed but unreported changes
    if change := o.adminServer.GetPendingChange(volumeID); change != nil {
        return change.NewCapacity
    }

    return baseCapacity
}
```
## 5. Error Handling & Recovery
### 5.1 Worker Failure Scenarios
```go
type FailureHandler struct {
    taskRescheduler *TaskRescheduler
    workerMonitor   *WorkerMonitor
    alertManager    *AlertManager
}

// Failure Scenarios:
// 1. Worker becomes unresponsive (heartbeat timeout)
// 2. Task execution fails (reported by worker)
// 3. Task gets stuck (progress timeout)
// 4. Duplicate task detection
// 5. Resource exhaustion
```
### 5.2 Recovery Strategies
**Worker Timeout Recovery**:
- Mark worker as inactive after 3 missed heartbeats
- Reschedule all assigned tasks to other workers
- Clean up any partial state
**Task Stuck Recovery**:
- Detect tasks with no progress for > 2x estimated time
- Cancel stuck task and mark volume for cleanup
- Reschedule if retry count < max_retries
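
As a sketch, the stuck-task rule reduces to a decision function over the time since the last progress update. The types below are hypothetical, `maxRetries = 3` is an assumed value, and marking a task as failed once retries are exhausted is an assumption, since the design only states when to reschedule.

```go
package main

import (
	"fmt"
	"time"
)

type stuckAction string

const (
	actionNone       stuckAction = "none"
	actionReschedule stuckAction = "reschedule" // cancel, clean up, and retry
	actionFail       stuckAction = "fail"       // cancel, clean up, and give up (assumed)
)

const maxRetries = 3 // assumed value

// trackedTask is a hypothetical view of an in-progress task.
type trackedTask struct {
	ID                string
	Estimated         time.Duration // estimated total run time
	SinceLastProgress time.Duration // time since the last progress update
	RetryCount        int
}

// decide applies the rule: a task with no progress for more than twice its
// estimated time is stuck, and is retried while retries remain.
func decide(t trackedTask) stuckAction {
	if t.SinceLastProgress <= 2*t.Estimated {
		return actionNone
	}
	if t.RetryCount < maxRetries {
		return actionReschedule
	}
	return actionFail
}

// demoTasks returns one healthy task, one stuck task, and one stuck task
// that has used up its retries.
func demoTasks() []trackedTask {
	est := 10 * time.Minute
	return []trackedTask{
		{ID: "healthy", Estimated: est, SinceLastProgress: 5 * time.Minute},
		{ID: "stuck", Estimated: est, SinceLastProgress: 25 * time.Minute, RetryCount: 1},
		{ID: "exhausted", Estimated: est, SinceLastProgress: 25 * time.Minute, RetryCount: maxRetries},
	}
}

func main() {
	for _, t := range demoTasks() {
		fmt.Println(t.ID, decide(t))
	}
}
```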
**Duplicate Task Prevention**:
```go
type DuplicateDetector struct {
    activeFingerprints map[string]bool // VolumeID+TaskType
    recentCompleted    *LRUCache       // Recently completed tasks
}

func (d *DuplicateDetector) IsTaskDuplicate(task *Task) bool {
    fingerprint := fmt.Sprintf("%d-%s", task.VolumeID, task.Type)
    return d.activeFingerprints[fingerprint] ||
        d.recentCompleted.Contains(fingerprint)
}
```
## 6. Simulation & Testing Framework
### 6.1 Failure Simulation
```go
type TaskSimulator struct {
    scenarios map[string]SimulationScenario
}

type SimulationScenario struct {
    Name            string
    WorkerCount     int
    VolumeCount     int
    FailurePatterns []FailurePattern
    Duration        time.Duration
}

type FailurePattern struct {
    Type        FailureType   // "worker_timeout", "task_stuck", "duplicate"
    Probability float64       // 0.0 to 1.0
    Timing      TimingSpec    // When during task execution
    Duration    time.Duration
}
```
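
For simulation runs to be reproducible, the `Probability` field is most useful when evaluated against a seeded random source. The following sketch shows that idea with a simplified `FailurePattern` that uses a plain string for the type and omits `Timing` and `Duration`. `shouldInject` and `countInjected` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"math/rand"
)

// FailurePattern is simplified from the struct above for this example.
type FailurePattern struct {
	Type        string
	Probability float64 // 0.0 to 1.0
}

// shouldInject draws one sample and reports whether to inject the failure.
// rng.Float64 returns values in [0.0, 1.0), so a probability of 0 never
// fires and a probability of 1 always fires.
func shouldInject(p FailurePattern, rng *rand.Rand) bool {
	return rng.Float64() < p.Probability
}

// countInjected runs the given number of trials with a fixed seed and
// returns how many of them injected a failure.
func countInjected(p FailurePattern, trials int, seed int64) int {
	rng := rand.New(rand.NewSource(seed))
	n := 0
	for i := 0; i < trials; i++ {
		if shouldInject(p, rng) {
			n++
		}
	}
	return n
}

func main() {
	p := FailurePattern{Type: "worker_timeout", Probability: 0.2}
	first := countInjected(p, 1000, 42)
	second := countInjected(p, 1000, 42)
	fmt.Println(first == second) // prints "true": same seed, same outcome
}
```

Recording the seed alongside each scenario result would allow a failing run to be replayed exactly.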
### 6.2 Test Scenarios
**Scenario 1: Worker Timeout During EC**
- Start EC task on 30GB volume
- Kill worker at 50% progress
- Verify task reassignment
- Verify no duplicate EC operations
**Scenario 2: Stuck Vacuum Task**
- Start vacuum on high-garbage volume
- Simulate worker hanging at 75% progress
- Verify timeout detection and cleanup
- Verify volume state consistency
**Scenario 3: Duplicate Task Prevention**
- Submit same EC task from multiple sources
- Verify only one task executes
- Verify proper conflict resolution
**Scenario 4: Master-Admin State Divergence**
- Create in-progress EC task
- Simulate master restart
- Verify state reconciliation
- Verify shard assignment accounts for in-progress work
## 7. Performance & Scalability
### 7.1 Metrics & Monitoring
```go
type SystemMetrics struct {
    TasksPerSecond    float64
    WorkerUtilization float64
    AverageTaskTime   time.Duration
    FailureRate       float64
    QueueDepth        int
    VolumeStatesSync  bool
}
```
### 7.2 Scalability Considerations
- **Horizontal Worker Scaling**: Add workers without admin server changes
- **Admin Server HA**: Run admin servers in a primary/standby configuration for fault tolerance
- **Task Partitioning**: Partition tasks by collection or datacenter
- **Batch Operations**: Group similar tasks for efficiency
## 8. Implementation Plan
### Phase 1: Core Infrastructure
1. Admin server basic framework
2. Worker registration and heartbeat
3. Simple task assignment
4. Basic progress tracking
### Phase 2: Advanced Features
1. Volume state reconciliation
2. Sophisticated worker selection
3. Failure detection and recovery
4. Duplicate prevention
### Phase 3: Optimization & Monitoring
1. Performance metrics
2. Load balancing algorithms
3. Capacity planning integration
4. Comprehensive monitoring
This design provides a robust, scalable foundation for distributed task management in SeaweedFS while maintaining consistency with the existing architecture patterns.