seaweedfs/weed/admin/topology/capacity.go
Chris Lu 0ecb466eda
Admin: refactoring active topology (#7073)
* refactoring

* add ec shard size

* address comments

* passing task id

There seems to be a disconnect between the pending tasks created in ActiveTopology and the TaskDetectionResult returned by this function. A taskID is generated locally and used to create pending tasks via AddPendingECShardTask, but this taskID is not stored in the TaskDetectionResult or passed along in any way.

This makes it impossible for the worker that eventually executes the task to know which pending task in ActiveTopology it corresponds to. Without the correct taskID, the worker cannot call AssignTask or CompleteTask on the master, breaking the entire task lifecycle and capacity management feature.

A potential solution is to add a TaskID field to TaskDetectionResult and worker_pb.TaskParams, ensuring the ID is propagated from detection to execution.
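
A minimal sketch of that approach follows, using stand-in types: only TaskDetectionResult, worker_pb.TaskParams, AddPendingECShardTask, AssignTask, and CompleteTask are named above, and every other identifier below is a hypothetical placeholder.

package main

import "fmt"

// Stand-in for the real TaskDetectionResult, with the proposed TaskID field added.
type TaskDetectionResult struct {
	TaskID   string // new: identifies the pending task registered in ActiveTopology
	VolumeID uint32
}

// Stand-in for worker_pb.TaskParams, also carrying the task ID to the worker.
type TaskParams struct {
	TaskID   string
	VolumeID uint32
}

// detectECTask mimics the detection step: generate the ID, register the pending
// task, and return the ID so it travels with the detection result.
func detectECTask(volumeID uint32) TaskDetectionResult {
	taskID := fmt.Sprintf("ec-vol-%d", volumeID) // however the real ID is generated
	// at.AddPendingECShardTask(taskID, ...) would be called here
	return TaskDetectionResult{TaskID: taskID, VolumeID: volumeID}
}

func main() {
	result := detectECTask(42)
	params := TaskParams{TaskID: result.TaskID, VolumeID: result.VolumeID}
	// The worker can now call AssignTask/CompleteTask on the master with params.TaskID.
	fmt.Println("dispatching task", params.TaskID)
}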

* 1 source multiple destinations

* task supports multi source and destination

* ec needs to clean up previous shards

* use erasure coding constants

* getPlanningCapacityUnsafe and getEffectiveAvailableCapacityUnsafe should return StorageSlotChange for calculation

* use CanAccommodate to calculate

* remove dead code

* address comments

* fix Mutex Copying in Protobuf Structs

* use constants

* fix estimatedSize

The calculation for estimatedSize only considers source.EstimatedSize and dest.StorageChange, but omits dest.EstimatedSize. The TaskDestination struct has an EstimatedSize field, which seems to be ignored here. This could lead to an incorrect estimation of the total size of data involved in tasks on a disk. The loop should probably also include estimatedSize += dest.EstimatedSize.
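
A sketch of the corrected accumulation, using stand-in struct definitions (only the EstimatedSize fields matter here; the real types carry more fields):

package main

import "fmt"

// Stand-ins for the real source and destination types.
type TaskSource struct{ EstimatedSize int64 }
type TaskDestination struct{ EstimatedSize int64 }

// totalEstimatedSize sums sources and destinations; the destination loop is the
// part the original calculation omitted.
func totalEstimatedSize(sources []TaskSource, dests []TaskDestination) int64 {
	var estimatedSize int64
	for _, source := range sources {
		estimatedSize += source.EstimatedSize
	}
	for _, dest := range dests {
		estimatedSize += dest.EstimatedSize // previously missing
	}
	return estimatedSize
}

func main() {
	sources := []TaskSource{{EstimatedSize: 1 << 30}}
	dests := []TaskDestination{{EstimatedSize: 1 << 30}}
	fmt.Println(totalEstimatedSize(sources, dests)) // 2147483648, not 1073741824
}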

* at.assignTaskToDisk(task)

* refactoring

* Update weed/admin/topology/internal.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fail fast

* fix compilation

* Update weed/worker/tasks/erasure_coding/detection.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* indexes for volume and shard locations

* dedup with ToVolumeSlots

* return an additional boolean to indicate success, or an error

* Update abstract_sql_store.go

* fix

* Update weed/worker/tasks/erasure_coding/detection.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/admin/topology/task_management.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* faster findVolumeDisk

* Update weed/worker/tasks/erasure_coding/detection.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/admin/topology/storage_slot_test.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* refactor

* simplify

* remove unused GetDiskStorageImpact function

* refactor

* add comments

* Update weed/admin/topology/storage_impact.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/admin/topology/storage_slot_test.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update storage_impact.go

* AddPendingTask

The unified AddPendingTask function now serves as the single entry point for all task creation, successfully consolidating the previously separate functions while maintaining full functionality and improving code organization.

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-08-03 01:35:38 -07:00

300 lines · 10 KiB · Go

package topology

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)

// GetEffectiveAvailableCapacity returns the effective available capacity for a disk
// This considers BOTH pending and assigned tasks for capacity reservation.
//
// Formula: BaseAvailable - (VolumeSlots + ShardSlots/ShardsPerVolumeSlot) from all tasks
//
// The calculation includes:
// - Pending tasks: Reserve capacity immediately when added
// - Assigned tasks: Continue to reserve capacity during execution
// - Recently completed tasks are NOT counted against capacity
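//
// Example: with MaxVolumeCount=100 and VolumeCount=90, the base availability is
// 10 volume slots; if pending and assigned tasks together reserve 3 volume-slot
// equivalents (volume slots plus shard slots converted via ShardsPerVolumeSlot),
// this function reports 7.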
func (at *ActiveTopology) GetEffectiveAvailableCapacity(nodeID string, diskID uint32) int64 {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return 0
	}

	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return 0
	}

	// Use the same logic as getEffectiveAvailableCapacityUnsafe but with locking
	capacity := at.getEffectiveAvailableCapacityUnsafe(disk)
	return int64(capacity.VolumeSlots)
}

// GetEffectiveAvailableCapacityDetailed returns detailed available capacity as StorageSlotChange
// This provides granular information about available volume slots and shard slots
func (at *ActiveTopology) GetEffectiveAvailableCapacityDetailed(nodeID string, diskID uint32) StorageSlotChange {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return StorageSlotChange{}
	}

	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	return at.getEffectiveAvailableCapacityUnsafe(disk)
}

// GetEffectiveCapacityImpact returns the StorageSlotChange impact for a disk
// This shows the net impact from all pending and assigned tasks
func (at *ActiveTopology) GetEffectiveCapacityImpact(nodeID string, diskID uint32) StorageSlotChange {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return StorageSlotChange{}
	}

	return at.getEffectiveCapacityUnsafe(disk)
}

// GetDisksWithEffectiveCapacity returns disks with sufficient effective capacity
// This method considers BOTH pending and assigned tasks for capacity reservation using StorageSlotChange.
//
// Parameters:
// - taskType: type of task to check compatibility for
// - excludeNodeID: node to exclude from results
// - minCapacity: minimum effective capacity required (in volume slots)
//
// Returns: DiskInfo objects where VolumeCount reflects capacity reserved by all tasks
func (at *ActiveTopology) GetDisksWithEffectiveCapacity(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo

	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}

		if at.isDiskAvailable(disk, taskType) {
			effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)

			// Only include disks that meet minimum capacity requirement
			if int64(effectiveCapacity.VolumeSlots) >= minCapacity {
				// Create a new DiskInfo with current capacity information
				diskCopy := DiskInfo{
					NodeID:     disk.DiskInfo.NodeID,
					DiskID:     disk.DiskInfo.DiskID,
					DiskType:   disk.DiskInfo.DiskType,
					DataCenter: disk.DiskInfo.DataCenter,
					Rack:       disk.DiskInfo.Rack,
					LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks), // Count all tasks
				}

				// Create a new protobuf DiskInfo to avoid modifying the original
				diskInfoCopy := &master_pb.DiskInfo{
					DiskId:            disk.DiskInfo.DiskInfo.DiskId,
					MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
					VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(effectiveCapacity.VolumeSlots),
					VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
					EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
					RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
					ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
					FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
				}
				diskCopy.DiskInfo = diskInfoCopy

				available = append(available, &diskCopy)
			}
		}
	}

	return available
}

// GetDisksForPlanning returns disks considering both active and pending tasks for planning decisions
// This helps avoid over-scheduling tasks to the same disk
func (at *ActiveTopology) GetDisksForPlanning(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo

	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}

		// Consider both pending and active tasks for scheduling decisions
		if at.isDiskAvailableForPlanning(disk, taskType) {
			// Check if disk can accommodate new task considering pending tasks
			planningCapacity := at.getPlanningCapacityUnsafe(disk)
			if int64(planningCapacity.VolumeSlots) >= minCapacity {
				// Create a new DiskInfo with planning information
				diskCopy := DiskInfo{
					NodeID:     disk.DiskInfo.NodeID,
					DiskID:     disk.DiskInfo.DiskID,
					DiskType:   disk.DiskInfo.DiskType,
					DataCenter: disk.DiskInfo.DataCenter,
					Rack:       disk.DiskInfo.Rack,
					LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks),
				}

				// Create a new protobuf DiskInfo to avoid modifying the original
				diskInfoCopy := &master_pb.DiskInfo{
					DiskId:            disk.DiskInfo.DiskInfo.DiskId,
					MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
					VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(planningCapacity.VolumeSlots),
					VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
					EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
					RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
					ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
					FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
				}
				diskCopy.DiskInfo = diskInfoCopy

				available = append(available, &diskCopy)
			}
		}
	}

	return available
}

// CanAccommodateTask checks if a disk can accommodate a new task considering all constraints
func (at *ActiveTopology) CanAccommodateTask(nodeID string, diskID uint32, taskType TaskType, volumesNeeded int64) bool {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return false
	}

	// Check basic availability
	if !at.isDiskAvailable(disk, taskType) {
		return false
	}

	// Check effective capacity
	effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
	return int64(effectiveCapacity.VolumeSlots) >= volumesNeeded
}

// getPlanningCapacityUnsafe considers both pending and active tasks for planning
func (at *ActiveTopology) getPlanningCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	baseAvailableVolumes := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount

	// Use the centralized helper function to calculate task storage impact
	totalImpact := at.calculateTaskStorageImpact(disk)

	// Calculate available capacity considering impact (negative impact reduces availability)
	availableVolumeSlots := baseAvailableVolumes - totalImpact.ToVolumeSlots()
	if availableVolumeSlots < 0 {
		availableVolumeSlots = 0
	}

	// Return detailed capacity information
	return StorageSlotChange{
		VolumeSlots: int32(availableVolumeSlots),
		ShardSlots:  -totalImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability)
	}
}

// isDiskAvailableForPlanning checks if disk can accept new tasks considering pending load
func (at *ActiveTopology) isDiskAvailableForPlanning(disk *activeDisk, taskType TaskType) bool {
	// Check total load including pending tasks
	totalLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
	if totalLoad >= MaxTotalTaskLoadPerDisk {
		return false
	}

	// Check for conflicting task types in active tasks only
	for _, task := range disk.assignedTasks {
		if at.areTaskTypesConflicting(task.TaskType, taskType) {
			return false
		}
	}

	return true
}

// calculateTaskStorageImpact is a helper function that calculates the total storage impact
// from all tasks (pending and assigned) on a given disk. This eliminates code duplication
// between multiple capacity calculation functions.
func (at *ActiveTopology) calculateTaskStorageImpact(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	totalImpact := StorageSlotChange{}

	// Process both pending and assigned tasks with identical logic
	taskLists := [][]*taskState{disk.pendingTasks, disk.assignedTasks}
	for _, taskList := range taskLists {
		for _, task := range taskList {
			// Calculate impact for all source locations
			for _, source := range task.Sources {
				if source.SourceServer == disk.NodeID && source.SourceDisk == disk.DiskID {
					totalImpact.AddInPlace(source.StorageChange)
				}
			}

			// Calculate impact for all destination locations
			for _, dest := range task.Destinations {
				if dest.TargetServer == disk.NodeID && dest.TargetDisk == disk.DiskID {
					totalImpact.AddInPlace(dest.StorageChange)
				}
			}
		}
	}

	return totalImpact
}

// getEffectiveCapacityUnsafe returns effective capacity impact without locking (for internal use)
// Returns StorageSlotChange representing the net impact from all tasks
func (at *ActiveTopology) getEffectiveCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	return at.calculateTaskStorageImpact(disk)
}

// getEffectiveAvailableCapacityUnsafe returns detailed available capacity as StorageSlotChange
func (at *ActiveTopology) getEffectiveAvailableCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	baseAvailable := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount
	netImpact := at.getEffectiveCapacityUnsafe(disk)

	// Calculate available volume slots (negative impact reduces availability)
	availableVolumeSlots := baseAvailable - netImpact.ToVolumeSlots()
	if availableVolumeSlots < 0 {
		availableVolumeSlots = 0
	}

	// Return detailed capacity information
	return StorageSlotChange{
		VolumeSlots: int32(availableVolumeSlots),
		ShardSlots:  -netImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability)
	}
}
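
For orientation, here is a minimal usage sketch of the helpers above. It assumes an *ActiveTopology that has already been built and refreshed elsewhere in this package (construction is not shown in this file), a caller-supplied TaskType value, and that DiskInfo.DiskID is the uint32 disk ID expected by CanAccommodateTask; the helper name pickDiskForTask is illustrative and not part of capacity.go.

// pickDiskForTask (illustrative) selects the first disk, excluding sourceNode, that
// still has at least volumesNeeded effective volume slots once pending and assigned
// task reservations are applied.
func pickDiskForTask(at *ActiveTopology, taskType TaskType, sourceNode string, volumesNeeded int64) (*DiskInfo, bool) {
	for _, disk := range at.GetDisksWithEffectiveCapacity(taskType, sourceNode, volumesNeeded) {
		// Redundant with the filter above, but shows the single-disk entry point.
		if at.CanAccommodateTask(disk.NodeID, disk.DiskID, taskType, volumesNeeded) {
			return disk, true
		}
	}
	return nil, false
}

When shard-slot granularity matters, GetEffectiveAvailableCapacityDetailed returns the same availability as a StorageSlotChange instead of a bare volume-slot count.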