mirror of https://github.com/chrislusf/seaweedfs synced 2025-08-15 16:42:48 +02:00
seaweedfs/DESIGN.md
Chris Lu 891a2fb6eb
Admin: misc improvements on admin server and workers. EC now works. (#7055)
2025-07-30 12:38:03 -07:00


SeaweedFS Task Distribution System Design

Overview

This document describes the design of a distributed task management system for SeaweedFS that handles Erasure Coding (EC) and vacuum operations through a scalable admin server and worker process architecture.

System Architecture

High-Level Components

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Master        │◄──►│  Admin Server    │◄──►│   Workers       │
│                 │    │                  │    │                 │
│ - Volume Info   │    │ - Task Discovery │    │ - Task Exec     │
│ - Shard Status  │    │ - Task Assign    │    │ - Progress      │
│ - Heartbeats    │    │ - Progress Track │    │ - Error Report  │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         │                        │                        │
         │                        │                        │
         ▼                        ▼                        ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Volume Servers  │    │ Volume Monitor   │    │ Task Execution  │
│                 │    │                  │    │                 │
│ - Store Volumes │    │ - Health Check   │    │ - EC Convert    │
│ - EC Shards     │    │ - Usage Stats    │    │ - Vacuum Clean  │
│ - Report Status │    │ - State Sync     │    │ - Status Report │
└─────────────────┘    └──────────────────┘    └─────────────────┘

1. Admin Server Design

1.1 Core Responsibilities

  • Task Discovery: Scan volumes to identify EC and vacuum candidates
  • Worker Management: Track available workers and their capabilities
  • Task Assignment: Match tasks to optimal workers
  • Progress Tracking: Monitor in-progress tasks for capacity planning
  • State Reconciliation: Sync with master server for volume state updates

1.2 Task Discovery Engine

type TaskDiscoveryEngine struct {
    masterClient   MasterClient
    volumeScanner  VolumeScanner
    taskDetectors  map[TaskType]TaskDetector
    scanInterval   time.Duration
}

type VolumeCandidate struct {
    VolumeID       uint32
    Server         string
    Collection     string
    TaskType       TaskType
    Priority       TaskPriority
    Reason         string
    DetectedAt     time.Time
    Parameters     map[string]interface{}
}

EC Detection Logic:

  • Find volumes >= 95% full and idle for > 1 hour
  • Exclude volumes already in EC format
  • Exclude volumes with ongoing operations
  • Prioritize by collection and age

Vacuum Detection Logic:

  • Find volumes with garbage ratio > 30%
  • Exclude read-only volumes
  • Exclude volumes with recent vacuum operations
  • Prioritize by garbage percentage

1.3 Worker Registry & Management

type WorkerRegistry struct {
    workers        map[string]*Worker
    capabilities   map[TaskType][]*Worker
    lastHeartbeat  map[string]time.Time
    taskAssignment map[string]*Task
    mutex          sync.RWMutex
}

type Worker struct {
    ID            string
    Address       string
    Capabilities  []TaskType
    MaxConcurrent int
    CurrentLoad   int
    Status        WorkerStatus
    LastSeen      time.Time
    Performance   WorkerMetrics
}

1.4 Task Assignment Algorithm

type TaskScheduler struct {
    registry           *WorkerRegistry
    taskQueue          *PriorityQueue
    inProgressTasks    map[string]*InProgressTask
    volumeReservations map[uint32]*VolumeReservation
}

// Worker Selection Criteria:
// 1. Has required capability (EC or Vacuum)
// 2. Available capacity (CurrentLoad < MaxConcurrent)
// 3. Best performance history for task type
// 4. Lowest current load
// 5. Geographically close to volume server (optional)
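
One way to read criteria 1 through 4 is as two hard filters followed by a two-level comparison. The sketch below makes that reading concrete. `SuccessRate` is a hypothetical stand-in for the `Performance` metrics, the tie-breaking order is an assumption, and the optional locality criterion is omitted.

```go
package main

type TaskType string

// Worker mirrors a subset of the Worker struct above. SuccessRate is a
// hypothetical simplification of WorkerMetrics (per-task-type success ratio).
type Worker struct {
	ID            string
	Capabilities  []TaskType
	MaxConcurrent int
	CurrentLoad   int
	SuccessRate   map[TaskType]float64
}

func (w *Worker) supports(t TaskType) bool {
	for _, c := range w.Capabilities {
		if c == t {
			return true
		}
	}
	return false
}

// better reports whether a should be preferred over b for task type t:
// higher success rate first, then lower current load.
func better(a, b *Worker, t TaskType) bool {
	if a.SuccessRate[t] != b.SuccessRate[t] {
		return a.SuccessRate[t] > b.SuccessRate[t]
	}
	return a.CurrentLoad < b.CurrentLoad
}

// SelectWorker returns the preferred eligible worker, or nil if no worker
// has both the capability and spare capacity.
func SelectWorker(workers []*Worker, t TaskType) *Worker {
	var best *Worker
	for _, w := range workers {
		if !w.supports(t) || w.CurrentLoad >= w.MaxConcurrent {
			continue
		}
		if best == nil || better(w, best, t) {
			best = w
		}
	}
	return best
}
```

A nil return means the task stays queued until a capable worker frees a slot.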

2. Worker Process Design

2.1 Worker Architecture

type MaintenanceWorker struct {
    id              string
    config          *WorkerConfig
    adminClient     AdminClient
    taskExecutors   map[TaskType]TaskExecutor
    currentTasks    map[string]*RunningTask
    registry        *TaskRegistry
    heartbeatTicker *time.Ticker
    requestTicker   *time.Ticker
}

2.2 Task Execution Framework

type TaskExecutor interface {
    Execute(ctx context.Context, task *Task) error
    EstimateTime(task *Task) time.Duration
    ValidateResources(task *Task) error
    GetProgress() float64
    Cancel() error
}

type ErasureCodingExecutor struct {
    volumeClient VolumeServerClient
    progress     float64
    cancelled    bool
}

type VacuumExecutor struct {
    volumeClient VolumeServerClient
    progress     float64
    cancelled    bool
}

2.3 Worker Capabilities & Registration

type WorkerCapabilities struct {
    SupportedTasks   []TaskType
    MaxConcurrent    int
    ResourceLimits   ResourceLimits
    PreferredServers []string  // Affinity for specific volume servers
}

type ResourceLimits struct {
    MaxMemoryMB      int64
    MaxDiskSpaceMB   int64
    MaxNetworkMbps   int64
    MaxCPUPercent    float64
}

3. Task Lifecycle Management

3.1 Task States

type TaskState string

const (
    TaskStatePending     TaskState = "pending"
    TaskStateAssigned    TaskState = "assigned"
    TaskStateInProgress  TaskState = "in_progress"
    TaskStateCompleted   TaskState = "completed"
    TaskStateFailed      TaskState = "failed"
    TaskStateCancelled   TaskState = "cancelled"
    TaskStateStuck       TaskState = "stuck"       // Taking too long
    TaskStateDuplicate   TaskState = "duplicate"   // Detected duplicate
)

3.2 Progress Tracking & Monitoring

type InProgressTask struct {
    Task           *Task
    WorkerID       string
    StartedAt      time.Time
    LastUpdate     time.Time
    Progress       float64
    EstimatedEnd   time.Time
    VolumeReserved bool  // Reserved for capacity planning
}

type TaskMonitor struct {
    inProgressTasks  map[string]*InProgressTask
    timeoutChecker   *time.Ticker
    stuckDetector    *time.Ticker
    duplicateChecker *time.Ticker
}

4. Volume Capacity Reconciliation

4.1 Volume State Tracking

type VolumeStateManager struct {
    masterClient      MasterClient
    inProgressTasks   map[uint32]*InProgressTask  // VolumeID -> Task
    committedChanges  map[uint32]*VolumeChange    // Changes not yet in master
    reconcileInterval time.Duration
}

type VolumeChange struct {
    VolumeID         uint32
    ChangeType       ChangeType  // "ec_encoding", "vacuum_completed"
    OldCapacity      int64
    NewCapacity      int64
    TaskID           string
    CompletedAt      time.Time
    ReportedToMaster bool
}

4.2 Shard Assignment Integration

When the master needs to assign shards, it must consider:

  1. Current volume state from its own records
  2. In-progress capacity changes from admin server
  3. Committed but unreported changes from admin server

type CapacityOracle struct {
    adminServer   AdminServerClient
    masterState   *MasterVolumeState
    updateFreq    time.Duration
}

func (o *CapacityOracle) GetAdjustedCapacity(volumeID uint32) int64 {
    baseCapacity := o.masterState.GetCapacity(volumeID)
    
    // Adjust for in-progress tasks
    if task := o.adminServer.GetInProgressTask(volumeID); task != nil {
        switch task.Type {
        case TaskTypeErasureCoding:
            // EC reduces effective capacity
            return baseCapacity / 2  // Simplified
        case TaskTypeVacuum:
            // Vacuum may increase available space
            return baseCapacity + int64(float64(baseCapacity) * 0.3)
        }
    }
    
    // Adjust for completed but unreported changes
    if change := o.adminServer.GetPendingChange(volumeID); change != nil {
        return change.NewCapacity
    }
    
    return baseCapacity
}

5. Error Handling & Recovery

5.1 Worker Failure Scenarios

type FailureHandler struct {
    taskRescheduler *TaskRescheduler
    workerMonitor   *WorkerMonitor
    alertManager    *AlertManager
}

// Failure Scenarios:
// 1. Worker becomes unresponsive (heartbeat timeout)
// 2. Task execution fails (reported by worker)
// 3. Task gets stuck (progress timeout)
// 4. Duplicate task detection
// 5. Resource exhaustion

5.2 Recovery Strategies

Worker Timeout Recovery:

  • Mark worker as inactive after 3 missed heartbeats
  • Reschedule all assigned tasks to other workers
  • Clean up any partial state

Task Stuck Recovery:

  • Detect tasks with no progress for more than 2x their estimated time
  • Cancel stuck task and mark volume for cleanup
  • Reschedule if retry count < max_retries

Duplicate Task Prevention:

type DuplicateDetector struct {
    activeFingerprints map[string]bool  // VolumeID+TaskType
    recentCompleted    *LRUCache        // Recently completed tasks
}

func (d *DuplicateDetector) IsTaskDuplicate(task *Task) bool {
    fingerprint := fmt.Sprintf("%d-%s", task.VolumeID, task.Type)
    return d.activeFingerprints[fingerprint] || 
           d.recentCompleted.Contains(fingerprint)
}

6. Simulation & Testing Framework

6.1 Failure Simulation

type TaskSimulator struct {
    scenarios map[string]SimulationScenario
}

type SimulationScenario struct {
    Name            string
    WorkerCount     int
    VolumeCount     int
    FailurePatterns []FailurePattern
    Duration        time.Duration
}

type FailurePattern struct {
    Type        FailureType  // "worker_timeout", "task_stuck", "duplicate"
    Probability float64      // 0.0 to 1.0
    Timing      TimingSpec   // When during task execution
    Duration    time.Duration
}
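
To show how `Probability` might drive failure injection, here is a minimal sketch with a seeded random source so that a simulation run can be replayed. `Injector`, `NewInjector`, and `Next` are hypothetical names, the `Timing` and `Duration` fields are left out, and "first matching pattern wins" is an assumed policy.

```go
package main

import "math/rand"

type FailureType string

// FailurePattern is trimmed to the two fields this sketch uses.
type FailurePattern struct {
	Type        FailureType
	Probability float64 // 0.0 to 1.0
}

// Injector decides, per task, which failure (if any) to simulate.
type Injector struct {
	rng      *rand.Rand
	patterns []FailurePattern
}

// NewInjector builds an injector whose decisions are reproducible for a
// given seed.
func NewInjector(seed int64, patterns []FailurePattern) *Injector {
	return &Injector{rng: rand.New(rand.NewSource(seed)), patterns: patterns}
}

// Next returns the type of the first pattern that fires, or "" if none do.
// rng.Float64 returns a value in [0.0, 1.0), so a probability of 1.0 always
// fires and 0.0 never fires.
func (in *Injector) Next() FailureType {
	for _, p := range in.patterns {
		if in.rng.Float64() < p.Probability {
			return p.Type
		}
	}
	return ""
}
```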

6.2 Test Scenarios

Scenario 1: Worker Timeout During EC

  • Start EC task on 30GB volume
  • Kill worker at 50% progress
  • Verify task reassignment
  • Verify no duplicate EC operations

Scenario 2: Stuck Vacuum Task

  • Start vacuum on high-garbage volume
  • Simulate worker hanging at 75% progress
  • Verify timeout detection and cleanup
  • Verify volume state consistency

Scenario 3: Duplicate Task Prevention

  • Submit same EC task from multiple sources
  • Verify only one task executes
  • Verify proper conflict resolution

Scenario 4: Master-Admin State Divergence

  • Create in-progress EC task
  • Simulate master restart
  • Verify state reconciliation
  • Verify shard assignment accounts for in-progress work

7. Performance & Scalability

7.1 Metrics & Monitoring

type SystemMetrics struct {
    TasksPerSecond     float64
    WorkerUtilization  float64
    AverageTaskTime    time.Duration
    FailureRate        float64
    QueueDepth         int
    VolumeStatesSync   bool
}

7.2 Scalability Considerations

  • Horizontal Worker Scaling: Add workers without admin server changes
  • Admin Server HA: Run admin servers as a primary/standby pair for fault tolerance
  • Task Partitioning: Partition tasks by collection or datacenter
  • Batch Operations: Group similar tasks for efficiency

8. Implementation Plan

Phase 1: Core Infrastructure

  1. Admin server basic framework
  2. Worker registration and heartbeat
  3. Simple task assignment
  4. Basic progress tracking

Phase 2: Advanced Features

  1. Volume state reconciliation
  2. Sophisticated worker selection
  3. Failure detection and recovery
  4. Duplicate prevention

Phase 3: Optimization & Monitoring

  1. Performance metrics
  2. Load balancing algorithms
  3. Capacity planning integration
  4. Comprehensive monitoring

This design provides a robust, scalable foundation for distributed task management in SeaweedFS while maintaining consistency with the existing architecture patterns.