seaweedfs/weed/admin/maintenance/pending_operations.go
Chris Lu 891a2fb6eb
Admin: misc improvements on admin server and workers. EC now works. (#7055)
* initial design

* added simulation as tests

* reorganized the codebase to move the simulation framework and tests into their own dedicated package

* integration test. ec worker task

* remove "enhanced" reference

* start master, volume servers, filer

Current Status
✅ Master: Healthy and running (port 9333)
✅ Filer: Healthy and running (port 8888)
✅ Volume Servers: All 6 servers running (ports 8080-8085)
🔄 Admin/Workers: Will start when dependencies are ready

* generate write load

* tasks are assigned

* admin starts with grpc port. worker has its own working directory

* Update .gitignore

* working worker and admin. Task detection is not working yet.

* compiles, detection uses volumeSizeLimitMB from master

* compiles

* worker retries connecting to admin

* build and restart

* rendering pending tasks

* skip task ID column

* sticky worker id

* test canScheduleTaskNow

* worker reconnect to admin

* clean up logs

* worker registers itself first

* worker can run ec work and report status

but:
1. one volume should not be repeatedly worked on.
2. ec shards need to be distributed and source data should be deleted.

* move ec task logic

* listing ec shards

* local copy, ec. Need to distribute.

* ec is mostly working now

* distribution of ec shards needs improvement
* need configuration to enable ec

* show ec volumes

* interval field UI component

* rename

* integration test with vacuuming

* garbage percentage threshold

* fix warning

* display ec shard sizes

* fix ec volumes list

* Update ui.go

* show default values

* ensure correct default value

* MaintenanceConfig use ConfigField

* use schema defined defaults

* config

* reduce duplication

* refactor to use BaseUIProvider

* each task register its schema

* checkECEncodingCandidate uses ecDetector

* use vacuumDetector

* use volumeSizeLimitMB

* remove

* remove unused

* refactor

* use new framework

* remove v2 reference

* refactor

* left menu can scroll now

* The maintenance manager was not being initialized when no data directory was configured for persistent storage.

* saving config

* Update task_config_schema_templ.go

* enable/disable tasks

* protobuf encoded task configurations

* fix system settings

* use ui component

* remove logs

* interface{} reduction

* reduce interface{}

* reduce interface{}

* avoid from/to map

* reduce interface{}

* refactor

* keep it DRY

* added logging

* debug messages

* debug level

* debug

* show the log caller line

* use configured task policy

* log level

* handle admin heartbeat response

* Update worker.go

* fix EC rack and dc count

* Report task status to admin server

* fix task logging, simplify interface checking, use erasure_coding constants

* factor in empty volume server during task planning

* volume.list adds disk id

* track disk id also

* fix locking for scheduled and manual scanning

* add active topology

* simplify task detector

* ec task completed, but shards are not showing up

* implement ec in ec_typed.go

* adjust log level

* dedup

* implementing ec copying shards and only ecx files

* use disk id when distributing ec shards

🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk
📦 Task Creation: maintenance_integration.go creates ECDestination with DiskId
🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest
💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId])
📂 File System: EC shards and metadata land in the exact disk directory planned
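
In code form (a sketch only; the request type and the new DiskId field come from this PR, while the variable names here are illustrative):

    // EC task: forward the planned disk id to the volume server
    req := &volume_server_pb.VolumeEcShardsCopyRequest{
        VolumeId: volumeId,
        DiskId:   destination.DiskId, // from ActiveTopology's DestinationPlan
        // collection, shard ids, source node, etc. omitted here
    }
    // volume server: route the copy to the planned disk
    // location := vs.store.Locations[req.DiskId]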

* Delete original volume from all locations

* clean up existing shard locations

* local encoding and distributing

* Update docker/admin_integration/EC-TESTING-README.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* check volume id range

* simplify

* fix tests

* fix types

* clean up logs and tests

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-07-30 12:38:03 -07:00

311 lines · 9.5 KiB · Go

package maintenance

import (
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// PendingOperationType represents the type of pending operation
type PendingOperationType string

const (
	OpTypeVolumeMove    PendingOperationType = "volume_move"
	OpTypeVolumeBalance PendingOperationType = "volume_balance"
	OpTypeErasureCoding PendingOperationType = "erasure_coding"
	OpTypeVacuum        PendingOperationType = "vacuum"
	OpTypeReplication   PendingOperationType = "replication"
)

// PendingOperation represents a pending volume/shard operation
type PendingOperation struct {
	VolumeID      uint32               `json:"volume_id"`
	OperationType PendingOperationType `json:"operation_type"`
	SourceNode    string               `json:"source_node"`
	DestNode      string               `json:"dest_node,omitempty"` // Empty for non-movement operations
	TaskID        string               `json:"task_id"`
	StartTime     time.Time            `json:"start_time"`
	EstimatedSize uint64               `json:"estimated_size"` // Bytes
	Collection    string               `json:"collection"`
	Status        string               `json:"status"` // "assigned", "in_progress", "completing"
}

// PendingOperations tracks all pending volume/shard operations
type PendingOperations struct {
	// Operations by volume ID for conflict detection
	byVolumeID map[uint32]*PendingOperation

	// Operations by task ID for updates
	byTaskID map[string]*PendingOperation

	// Operations by node for capacity calculations
	bySourceNode map[string][]*PendingOperation
	byDestNode   map[string][]*PendingOperation

	mutex sync.RWMutex
}

// NewPendingOperations creates a new pending operations tracker
func NewPendingOperations() *PendingOperations {
	return &PendingOperations{
		byVolumeID:   make(map[uint32]*PendingOperation),
		byTaskID:     make(map[string]*PendingOperation),
		bySourceNode: make(map[string][]*PendingOperation),
		byDestNode:   make(map[string][]*PendingOperation),
	}
}
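
// Example (illustrative, not part of the original file): a typical admin-side
// lifecycle, assuming volumeID and taskID come from a maintenance scan:
//
//	pending := NewPendingOperations()
//	if !pending.WouldConflictWithPending(volumeID, OpTypeErasureCoding) {
//		pending.AddOperation(&PendingOperation{
//			VolumeID:      volumeID,
//			OperationType: OpTypeErasureCoding,
//			SourceNode:    "volume-server-1:8080",
//			TaskID:        taskID,
//			StartTime:     time.Now(),
//			EstimatedSize: 30 << 30, // ~30 GiB, e.g. from volume metrics
//		})
//	}
//	// ... when the worker reports completion:
//	pending.RemoveOperation(taskID)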

// AddOperation adds a pending operation
func (po *PendingOperations) AddOperation(op *PendingOperation) {
	po.mutex.Lock()
	defer po.mutex.Unlock()

	// Check for existing operation on this volume
	if existing, exists := po.byVolumeID[op.VolumeID]; exists {
		glog.V(1).Infof("Replacing existing pending operation on volume %d: %s -> %s",
			op.VolumeID, existing.TaskID, op.TaskID)
		po.removeOperationUnlocked(existing)
	}

	// Add new operation
	po.byVolumeID[op.VolumeID] = op
	po.byTaskID[op.TaskID] = op

	// Add to node indexes
	po.bySourceNode[op.SourceNode] = append(po.bySourceNode[op.SourceNode], op)
	if op.DestNode != "" {
		po.byDestNode[op.DestNode] = append(po.byDestNode[op.DestNode], op)
	}

	glog.V(2).Infof("Added pending operation: volume %d, type %s, task %s, %s -> %s",
		op.VolumeID, op.OperationType, op.TaskID, op.SourceNode, op.DestNode)
}

// RemoveOperation removes a completed operation
func (po *PendingOperations) RemoveOperation(taskID string) {
	po.mutex.Lock()
	defer po.mutex.Unlock()

	if op, exists := po.byTaskID[taskID]; exists {
		po.removeOperationUnlocked(op)
		glog.V(2).Infof("Removed completed operation: volume %d, task %s", op.VolumeID, taskID)
	}
}

// removeOperationUnlocked removes an operation (must hold lock)
func (po *PendingOperations) removeOperationUnlocked(op *PendingOperation) {
	delete(po.byVolumeID, op.VolumeID)
	delete(po.byTaskID, op.TaskID)

	// Remove from source node list
	if ops, exists := po.bySourceNode[op.SourceNode]; exists {
		for i, other := range ops {
			if other.TaskID == op.TaskID {
				po.bySourceNode[op.SourceNode] = append(ops[:i], ops[i+1:]...)
				break
			}
		}
	}

	// Remove from dest node list
	if op.DestNode != "" {
		if ops, exists := po.byDestNode[op.DestNode]; exists {
			for i, other := range ops {
				if other.TaskID == op.TaskID {
					po.byDestNode[op.DestNode] = append(ops[:i], ops[i+1:]...)
					break
				}
			}
		}
	}
}

// HasPendingOperationOnVolume checks if a volume has a pending operation
func (po *PendingOperations) HasPendingOperationOnVolume(volumeID uint32) bool {
	po.mutex.RLock()
	defer po.mutex.RUnlock()

	_, exists := po.byVolumeID[volumeID]
	return exists
}

// GetPendingOperationOnVolume returns the pending operation on a volume
func (po *PendingOperations) GetPendingOperationOnVolume(volumeID uint32) *PendingOperation {
	po.mutex.RLock()
	defer po.mutex.RUnlock()

	return po.byVolumeID[volumeID]
}

// WouldConflictWithPending checks if a new operation would conflict with pending ones
func (po *PendingOperations) WouldConflictWithPending(volumeID uint32, opType PendingOperationType) bool {
	po.mutex.RLock()
	defer po.mutex.RUnlock()

	if existing, exists := po.byVolumeID[volumeID]; exists {
		// Volume already has a pending operation
		glog.V(3).Infof("Volume %d conflict: already has %s operation (task %s)",
			volumeID, existing.OperationType, existing.TaskID)
		return true
	}
	return false
}

// GetPendingCapacityImpactForNode calculates pending capacity changes for a node
func (po *PendingOperations) GetPendingCapacityImpactForNode(nodeID string) (incoming uint64, outgoing uint64) {
	po.mutex.RLock()
	defer po.mutex.RUnlock()

	// Calculate outgoing capacity (volumes leaving this node)
	if ops, exists := po.bySourceNode[nodeID]; exists {
		for _, op := range ops {
			// Only count movement operations
			if op.DestNode != "" {
				outgoing += op.EstimatedSize
			}
		}
	}

	// Calculate incoming capacity (volumes coming to this node)
	if ops, exists := po.byDestNode[nodeID]; exists {
		for _, op := range ops {
			incoming += op.EstimatedSize
		}
	}

	return incoming, outgoing
}

// FilterVolumeMetricsExcludingPending filters out volumes with pending operations
func (po *PendingOperations) FilterVolumeMetricsExcludingPending(metrics []*types.VolumeHealthMetrics) []*types.VolumeHealthMetrics {
	po.mutex.RLock()
	defer po.mutex.RUnlock()

	var filtered []*types.VolumeHealthMetrics
	excludedCount := 0

	for _, metric := range metrics {
		if _, hasPending := po.byVolumeID[metric.VolumeID]; !hasPending {
			filtered = append(filtered, metric)
		} else {
			excludedCount++
			glog.V(3).Infof("Excluding volume %d from scan due to pending operation", metric.VolumeID)
		}
	}

	if excludedCount > 0 {
		glog.V(1).Infof("Filtered out %d volumes with pending operations from %d total volumes",
			excludedCount, len(metrics))
	}

	return filtered
}
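
// Example (illustrative): a detection pass can exclude busy volumes before
// choosing candidates, assuming metrics came from a topology scan:
//
//	candidates := pending.FilterVolumeMetricsExcludingPending(metrics)
//	for _, m := range candidates {
//		// consider m.VolumeID for vacuum / EC / balance work
//	}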

// GetNodeCapacityProjection calculates projected capacity for a node
func (po *PendingOperations) GetNodeCapacityProjection(nodeID string, currentUsed uint64, totalCapacity uint64) NodeCapacityProjection {
	incoming, outgoing := po.GetPendingCapacityImpactForNode(nodeID)

	projectedUsed := currentUsed + incoming - outgoing
	projectedFree := totalCapacity - projectedUsed

	return NodeCapacityProjection{
		NodeID:          nodeID,
		CurrentUsed:     currentUsed,
		TotalCapacity:   totalCapacity,
		PendingIncoming: incoming,
		PendingOutgoing: outgoing,
		ProjectedUsed:   projectedUsed,
		ProjectedFree:   projectedFree,
	}
}
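
// For example (illustrative numbers): with currentUsed = 800 GiB, pending
// incoming = 200 GiB, and pending outgoing = 100 GiB, ProjectedUsed is
// 900 GiB, and on a 1 TiB (1024 GiB) node ProjectedFree is 124 GiB. Note the
// arithmetic is unsigned, so the projection assumes outgoing never exceeds
// currentUsed+incoming and projectedUsed stays within totalCapacity.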

// GetAllPendingOperations returns all pending operations
func (po *PendingOperations) GetAllPendingOperations() []*PendingOperation {
	po.mutex.RLock()
	defer po.mutex.RUnlock()

	var operations []*PendingOperation
	for _, op := range po.byVolumeID {
		operations = append(operations, op)
	}
	return operations
}

// UpdateOperationStatus updates the status of a pending operation
func (po *PendingOperations) UpdateOperationStatus(taskID string, status string) {
	po.mutex.Lock()
	defer po.mutex.Unlock()

	if op, exists := po.byTaskID[taskID]; exists {
		op.Status = status
		glog.V(3).Infof("Updated operation status: task %s, volume %d -> %s", taskID, op.VolumeID, status)
	}
}

// CleanupStaleOperations removes operations that have been running too long
func (po *PendingOperations) CleanupStaleOperations(maxAge time.Duration) int {
	po.mutex.Lock()
	defer po.mutex.Unlock()

	cutoff := time.Now().Add(-maxAge)
	var staleOps []*PendingOperation

	for _, op := range po.byVolumeID {
		if op.StartTime.Before(cutoff) {
			staleOps = append(staleOps, op)
		}
	}

	for _, op := range staleOps {
		po.removeOperationUnlocked(op)
		glog.Warningf("Removed stale pending operation: volume %d, task %s, age %v",
			op.VolumeID, op.TaskID, time.Since(op.StartTime))
	}

	return len(staleOps)
}
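
// Example (illustrative): stale entries can be reaped periodically, e.g. from
// a goroutine in the admin server; the interval and max age are assumptions:
//
//	go func() {
//		for range time.Tick(time.Minute) {
//			if removed := pending.CleanupStaleOperations(2 * time.Hour); removed > 0 {
//				glog.V(1).Infof("Removed %d stale pending operations", removed)
//			}
//		}
//	}()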

// NodeCapacityProjection represents projected capacity for a node
type NodeCapacityProjection struct {
	NodeID          string `json:"node_id"`
	CurrentUsed     uint64 `json:"current_used"`
	TotalCapacity   uint64 `json:"total_capacity"`
	PendingIncoming uint64 `json:"pending_incoming"`
	PendingOutgoing uint64 `json:"pending_outgoing"`
	ProjectedUsed   uint64 `json:"projected_used"`
	ProjectedFree   uint64 `json:"projected_free"`
}

// GetStats returns statistics about pending operations
func (po *PendingOperations) GetStats() PendingOperationsStats {
	po.mutex.RLock()
	defer po.mutex.RUnlock()

	stats := PendingOperationsStats{
		TotalOperations: len(po.byVolumeID),
		ByType:          make(map[PendingOperationType]int),
		ByStatus:        make(map[string]int),
	}

	var totalSize uint64
	for _, op := range po.byVolumeID {
		stats.ByType[op.OperationType]++
		stats.ByStatus[op.Status]++
		totalSize += op.EstimatedSize
	}
	stats.TotalEstimatedSize = totalSize

	return stats
}

// PendingOperationsStats provides statistics about pending operations
type PendingOperationsStats struct {
	TotalOperations    int                          `json:"total_operations"`
	ByType             map[PendingOperationType]int `json:"by_type"`
	ByStatus           map[string]int               `json:"by_status"`
	TotalEstimatedSize uint64                       `json:"total_estimated_size"`
}