1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-09 21:02:46 +02:00
seaweedfs/weed/admin/topology/internal.go
Chris Lu 0ecb466eda
Admin: refactoring active topology (#7073)
* refactoring

* add ec shard size

* address comments

* passing task id

There seems to be a disconnect between the pending tasks created in ActiveTopology and the TaskDetectionResult returned by this function. A taskID is generated locally and used to create pending tasks via AddPendingECShardTask, but this taskID is not stored in the TaskDetectionResult or passed along in any way.

This makes it impossible for the worker that eventually executes the task to know which pending task in ActiveTopology it corresponds to. Without the correct taskID, the worker cannot call AssignTask or CompleteTask on the master, breaking the entire task lifecycle and capacity management feature.

A potential solution is to add a TaskID field to TaskDetectionResult and worker_pb.TaskParams, ensuring the ID is propagated from detection to execution.

* 1 source multiple destinations

* task supports multi source and destination

* ec needs to clean up previous shards

* use erasure coding constants

* getPlanningCapacityUnsafe and getEffectiveAvailableCapacityUnsafe should return StorageSlotChange for calculation

* use CanAccommodate to calculate

* remove dead code

* address comments

* fix Mutex Copying in Protobuf Structs

* use constants

* fix estimatedSize

The calculation for estimatedSize only considers source.EstimatedSize and dest.StorageChange, but omits dest.EstimatedSize. The TaskDestination struct has an EstimatedSize field, which seems to be ignored here. This could lead to an incorrect estimation of the total size of data involved in tasks on a disk. The loop should probably also include estimatedSize += dest.EstimatedSize.

* at.assignTaskToDisk(task)

* refactoring

* Update weed/admin/topology/internal.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fail fast

* fix compilation

* Update weed/worker/tasks/erasure_coding/detection.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* indexes for volume and shard locations

* dedup with ToVolumeSlots

* return an additional boolean to indicate success, or an error

* Update abstract_sql_store.go

* fix

* Update weed/worker/tasks/erasure_coding/detection.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/admin/topology/task_management.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* faster findVolumeDisk

* Update weed/worker/tasks/erasure_coding/detection.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/admin/topology/storage_slot_test.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* refactor

* simplify

* remove unused GetDiskStorageImpact function

* refactor

* add comments

* Update weed/admin/topology/storage_impact.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/admin/topology/storage_slot_test.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update storage_impact.go

* AddPendingTask

The unified AddPendingTask function now serves as the single entry point for all task creation, successfully consolidating the previously separate functions while maintaining full functionality and improving code organization.

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-08-03 01:35:38 -07:00

114 lines
3 KiB
Go

package topology
import (
"fmt"
"time"
)
// reassignTaskStates rebuilds the per-disk task lists from the topology-level
// pendingTasks, assignedTasks and recentTasks collections.
func (at *ActiveTopology) reassignTaskStates() {
	// Start from a clean slate: forget everything each disk currently tracks.
	for _, d := range at.disks {
		d.pendingTasks, d.assignedTasks, d.recentTasks = nil, nil, nil
	}

	// Feed every known task back through assignTaskToDisk, keeping the
	// collection order: pending first, then assigned, then recent.
	for _, pending := range at.pendingTasks {
		at.assignTaskToDisk(pending)
	}
	for _, assigned := range at.assignedTasks {
		at.assignTaskToDisk(assigned)
	}
	for _, recent := range at.recentTasks {
		at.assignTaskToDisk(recent)
	}
}
// assignTaskToDisk records task on every disk it names as a source or a
// destination, provided that disk is present in at.disks. The list the task is
// appended to follows task.Status: pending tasks go to pendingTasks,
// in-progress tasks to assignedTasks and completed tasks to recentTasks.
// A task with any other status is not appended to any list.
func (at *ActiveTopology) assignTaskToDisk(task *taskState) {
	// Keys of disks already handled, so a disk that is both a source and a
	// destination receives the task only once.
	seen := make(map[string]bool)

	// attach records the task on one disk. It ignores empty server names,
	// disks already handled for this task, and disks unknown to the topology.
	attach := func(server string, diskID uint32) {
		if server == "" {
			return
		}
		diskKey := fmt.Sprintf("%s:%d", server, diskID)
		if seen[diskKey] {
			return
		}
		target, found := at.disks[diskKey]
		if !found {
			return
		}

		switch task.Status {
		case TaskStatusPending:
			target.pendingTasks = append(target.pendingTasks, task)
		case TaskStatusInProgress:
			target.assignedTasks = append(target.assignedTasks, task)
		case TaskStatusCompleted:
			target.recentTasks = append(target.recentTasks, task)
		}
		seen[diskKey] = true
	}

	// Sources are processed before destinations.
	for _, src := range task.Sources {
		attach(src.SourceServer, src.SourceDisk)
	}
	for _, dst := range task.Destinations {
		attach(dst.TargetServer, dst.TargetDisk)
	}
}
// isDiskAvailable reports whether disk can take on another task of taskType.
// It returns false when the number of pending plus assigned tasks on the disk
// has reached MaxConcurrentTasksPerDisk, or when any task already assigned to
// the disk has a type that conflicts with taskType.
func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool {
	// Capacity gate: pending and assigned tasks both count toward the limit.
	if len(disk.pendingTasks)+len(disk.assignedTasks) >= MaxConcurrentTasksPerDisk {
		return false
	}

	// Conflict gate: only assigned tasks are compared against the new type.
	for i := range disk.assignedTasks {
		if at.areTaskTypesConflicting(disk.assignedTasks[i].TaskType, taskType) {
			return false
		}
	}
	return true
}
// areTaskTypesConflicting reports whether a task of type new conflicts with a
// task of type existing.
//
// Vacuum, balance and erasure-coding tasks conflict with one another, but a
// type never conflicts with itself. Any existing type outside those three is
// treated as non-conflicting.
//
// The rules are expressed as a switch rather than a lookup table so that no
// map or slices are allocated on each call.
func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool {
	switch existing {
	case TaskTypeVacuum:
		return new == TaskTypeBalance || new == TaskTypeErasureCoding
	case TaskTypeBalance:
		return new == TaskTypeVacuum || new == TaskTypeErasureCoding
	case TaskTypeErasureCoding:
		return new == TaskTypeVacuum || new == TaskTypeBalance
	default:
		return false
	}
}
// cleanupRecentTasks removes entries from at.recentTasks whose CompletedAt is
// earlier than the current time minus recentTaskWindowSeconds seconds.
func (at *ActiveTopology) cleanupRecentTasks() {
	window := time.Duration(at.recentTaskWindowSeconds) * time.Second
	// The threshold is computed once so every task is judged against the same instant.
	oldest := time.Now().Add(-window)

	for id, finished := range at.recentTasks {
		if !finished.CompletedAt.Before(oldest) {
			continue // still inside the retention window
		}
		delete(at.recentTasks, id)
	}
}