Mirror of https://github.com/chrislusf/seaweedfs (synced 2025-09-09 21:02:46 +02:00)
* refactoring
* add ec shard size
* address comments
* passing task id

  There seems to be a disconnect between the pending tasks created in ActiveTopology and the TaskDetectionResult returned by this function. A taskID is generated locally and used to create pending tasks via AddPendingECShardTask, but this taskID is not stored in the TaskDetectionResult or passed along in any way. This makes it impossible for the worker that eventually executes the task to know which pending task in ActiveTopology it corresponds to. Without the correct taskID, the worker cannot call AssignTask or CompleteTask on the master, breaking the entire task lifecycle and capacity management feature. A potential solution is to add a TaskID field to TaskDetectionResult and worker_pb.TaskParams, ensuring the ID is propagated from detection to execution.

* 1 source multiple destinations
* task supports multi source and destination
* ec needs to clean up previous shards
* use erasure coding constants
* getPlanningCapacityUnsafe and getEffectiveAvailableCapacityUnsafe should return StorageSlotChange for calculation
* use CanAccommodate to calculate
* remove dead code
* address comments
* fix Mutex Copying in Protobuf Structs
* use constants
* fix estimatedSize

  The calculation for estimatedSize only considers source.EstimatedSize and dest.StorageChange, but omits dest.EstimatedSize. The TaskDestination struct has an EstimatedSize field, which seems to be ignored here. This could lead to an incorrect estimation of the total size of data involved in tasks on a disk. The loop should probably also include estimatedSize += dest.EstimatedSize.

* at.assignTaskToDisk(task)
* refactoring
* Update weed/admin/topology/internal.go

  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fail fast
* fix compilation
* Update weed/worker/tasks/erasure_coding/detection.go

  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* indexes for volume and shard locations
* dedup with ToVolumeSlots
* return an additional boolean to indicate success, or an error
* Update abstract_sql_store.go
* fix
* Update weed/worker/tasks/erasure_coding/detection.go

  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/admin/topology/task_management.go

  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* faster findVolumeDisk
* Update weed/worker/tasks/erasure_coding/detection.go

  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/admin/topology/storage_slot_test.go

  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* refactor
* simplify
* remove unused GetDiskStorageImpact function
* refactor
* add comments
* Update weed/admin/topology/storage_impact.go

  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/admin/topology/storage_slot_test.go

  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update storage_impact.go
* AddPendingTask

  The unified AddPendingTask function now serves as the single entry point for all task creation, consolidating the previously separate functions while keeping full functionality and improving code organization.

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
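
The taskID disconnect called out above suggests threading one identifier from detection through to execution. Below is a minimal, hypothetical sketch of that propagation, assuming a TaskID field is added to TaskDetectionResult and echoed back by the worker; the type and function names here are illustrative, not the actual SeaweedFS definitions.

package main

import (
	"fmt"
	"time"
)

// TaskDetectionResult is a simplified stand-in for the detection result, with
// the proposed TaskID field identifying the pending task in ActiveTopology.
type TaskDetectionResult struct {
	TaskID   string
	TaskType string
	VolumeID uint32
}

func detectECTask(volumeID uint32) *TaskDetectionResult {
	// Generate the ID once, register the pending task under it (e.g. via an
	// AddPendingTask-style call), and carry it along in the detection result.
	taskID := fmt.Sprintf("ec-%d-%d", volumeID, time.Now().UnixNano())
	return &TaskDetectionResult{TaskID: taskID, TaskType: "erasure_coding", VolumeID: volumeID}
}

func main() {
	result := detectECTask(42)
	// The worker would later send result.TaskID back so the master can match its
	// AssignTask / CompleteTask calls to the pending task that reserved capacity.
	fmt.Println(result.TaskID)
}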
300 lines
10 KiB
Go
package topology

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)

// GetEffectiveAvailableCapacity returns the effective available capacity for a disk.
// This considers BOTH pending and assigned tasks for capacity reservation.
//
// Formula: BaseAvailable - (VolumeSlots + ShardSlots/ShardsPerVolumeSlot) from all tasks
//
// The calculation includes:
// - Pending tasks: reserve capacity immediately when added
// - Assigned tasks: continue to reserve capacity during execution
// - Recently completed tasks are NOT counted against capacity
func (at *ActiveTopology) GetEffectiveAvailableCapacity(nodeID string, diskID uint32) int64 {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return 0
	}

	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return 0
	}

	// Use the same logic as getEffectiveAvailableCapacityUnsafe, but with locking.
	capacity := at.getEffectiveAvailableCapacityUnsafe(disk)
	return int64(capacity.VolumeSlots)
}
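
// Example (editor's sketch, not part of the original file): querying remaining
// capacity for one disk before planning work. The node address and disk ID are
// placeholders; an ActiveTopology already populated from the master topology is
// assumed.
func exampleQueryDiskCapacity(at *ActiveTopology) {
	// Coarse check: whole volume slots still available on this node/disk.
	if at.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) < 1 {
		return // no free volume slot, look elsewhere
	}

	// Detailed check: remaining volume slots and shard slots as a StorageSlotChange.
	detailed := at.GetEffectiveAvailableCapacityDetailed("10.0.0.1:8080", 0)
	fmt.Printf("available: %d volume slots, %d shard slots\n", detailed.VolumeSlots, detailed.ShardSlots)
}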

// GetEffectiveAvailableCapacityDetailed returns detailed available capacity as StorageSlotChange.
// This provides granular information about available volume slots and shard slots.
func (at *ActiveTopology) GetEffectiveAvailableCapacityDetailed(nodeID string, diskID uint32) StorageSlotChange {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return StorageSlotChange{}
	}

	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	return at.getEffectiveAvailableCapacityUnsafe(disk)
}

// GetEffectiveCapacityImpact returns the StorageSlotChange impact for a disk.
// This shows the net impact from all pending and assigned tasks.
func (at *ActiveTopology) GetEffectiveCapacityImpact(nodeID string, diskID uint32) StorageSlotChange {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return StorageSlotChange{}
	}

	return at.getEffectiveCapacityUnsafe(disk)
}

// GetDisksWithEffectiveCapacity returns disks with sufficient effective capacity.
// This method considers BOTH pending and assigned tasks for capacity reservation using StorageSlotChange.
//
// Parameters:
// - taskType: type of task to check compatibility for
// - excludeNodeID: node to exclude from results
// - minCapacity: minimum effective capacity required (in volume slots)
//
// Returns: DiskInfo objects where VolumeCount reflects capacity reserved by all tasks
func (at *ActiveTopology) GetDisksWithEffectiveCapacity(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo

	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}

		if at.isDiskAvailable(disk, taskType) {
			effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)

			// Only include disks that meet the minimum capacity requirement
			if int64(effectiveCapacity.VolumeSlots) >= minCapacity {
				// Create a new DiskInfo with current capacity information
				diskCopy := DiskInfo{
					NodeID:     disk.DiskInfo.NodeID,
					DiskID:     disk.DiskInfo.DiskID,
					DiskType:   disk.DiskInfo.DiskType,
					DataCenter: disk.DiskInfo.DataCenter,
					Rack:       disk.DiskInfo.Rack,
					LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks), // Count all tasks
				}

				// Create a new protobuf DiskInfo to avoid modifying the original
				diskInfoCopy := &master_pb.DiskInfo{
					DiskId:            disk.DiskInfo.DiskInfo.DiskId,
					MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
					VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(effectiveCapacity.VolumeSlots),
					VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
					EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
					RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
					ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
					FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
				}
				diskCopy.DiskInfo = diskInfoCopy

				available = append(available, &diskCopy)
			}
		}
	}

	return available
}
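
// Example (editor's sketch, not part of the original file): collecting candidate
// destination disks for newly generated EC shards. The task type is passed in
// because the concrete TaskType constants live elsewhere in this package;
// sourceNode is excluded so shards do not land back on the volume's server.
func exampleSelectECDestinations(at *ActiveTopology, ecTaskType TaskType, sourceNode string) []*DiskInfo {
	// Require at least one effective volume slot per candidate disk.
	candidates := at.GetDisksWithEffectiveCapacity(ecTaskType, sourceNode, 1)

	// A real placement policy would additionally spread shards across the
	// DataCenter/Rack fields and prefer low LoadCount; this sketch only filters
	// out disks already at the per-disk task limit.
	targets := make([]*DiskInfo, 0, len(candidates))
	for _, d := range candidates {
		if d.LoadCount < MaxTotalTaskLoadPerDisk {
			targets = append(targets, d)
		}
	}
	return targets
}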

// GetDisksForPlanning returns disks considering both active and pending tasks for planning decisions.
// This helps avoid over-scheduling tasks to the same disk.
func (at *ActiveTopology) GetDisksForPlanning(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo

	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}

		// Consider both pending and active tasks for scheduling decisions
		if at.isDiskAvailableForPlanning(disk, taskType) {
			// Check if the disk can accommodate a new task considering pending tasks
			planningCapacity := at.getPlanningCapacityUnsafe(disk)

			if int64(planningCapacity.VolumeSlots) >= minCapacity {
				// Create a new DiskInfo with planning information
				diskCopy := DiskInfo{
					NodeID:     disk.DiskInfo.NodeID,
					DiskID:     disk.DiskInfo.DiskID,
					DiskType:   disk.DiskInfo.DiskType,
					DataCenter: disk.DiskInfo.DataCenter,
					Rack:       disk.DiskInfo.Rack,
					LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks),
				}

				// Create a new protobuf DiskInfo to avoid modifying the original
				diskInfoCopy := &master_pb.DiskInfo{
					DiskId:            disk.DiskInfo.DiskInfo.DiskId,
					MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
					VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(planningCapacity.VolumeSlots),
					VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
					EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
					RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
					ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
					FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
				}
				diskCopy.DiskInfo = diskInfoCopy

				available = append(available, &diskCopy)
			}
		}
	}

	return available
}
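
// Example (editor's sketch, not part of the original file): a detection pass can
// use GetDisksForPlanning to avoid piling several not-yet-assigned tasks onto
// the same disk, then re-validate with CanAccommodateTask once the task is
// actually handed to a worker. Assumes DiskInfo.NodeID/DiskID carry the server
// address and uint32 disk id expected by CanAccommodateTask.
func examplePlanThenAssign(at *ActiveTopology, taskType TaskType, sourceNode string) bool {
	planningTargets := at.GetDisksForPlanning(taskType, sourceNode, 1)
	if len(planningTargets) == 0 {
		return false // no disk can take this task right now
	}

	chosen := planningTargets[0]
	// Pending load may have changed since planning, so check again at assignment time.
	return at.CanAccommodateTask(chosen.NodeID, chosen.DiskID, taskType, 1)
}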

// CanAccommodateTask checks if a disk can accommodate a new task considering all constraints.
func (at *ActiveTopology) CanAccommodateTask(nodeID string, diskID uint32, taskType TaskType, volumesNeeded int64) bool {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return false
	}

	// Check basic availability
	if !at.isDiskAvailable(disk, taskType) {
		return false
	}

	// Check effective capacity
	effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
	return int64(effectiveCapacity.VolumeSlots) >= volumesNeeded
}
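
// Worked example (editor's note, not part of the original file): for a disk with
// MaxVolumeCount=100 and VolumeCount=90 the base availability is 10 volume
// slots. If ToVolumeSlots over the summed task impact on that disk evaluates to
// 3, the effective availability is 10 - 3 = 7, so CanAccommodateTask returns
// true for volumesNeeded <= 7 and false otherwise, provided the disk also
// passes the isDiskAvailable check.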

// getPlanningCapacityUnsafe considers both pending and active tasks for planning.
func (at *ActiveTopology) getPlanningCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	baseAvailableVolumes := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount

	// Use the centralized helper function to calculate task storage impact
	totalImpact := at.calculateTaskStorageImpact(disk)

	// Calculate available capacity considering impact (negative impact reduces availability)
	availableVolumeSlots := baseAvailableVolumes - totalImpact.ToVolumeSlots()
	if availableVolumeSlots < 0 {
		availableVolumeSlots = 0
	}

	// Return detailed capacity information
	return StorageSlotChange{
		VolumeSlots: int32(availableVolumeSlots),
		ShardSlots:  -totalImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability)
	}
}

// isDiskAvailableForPlanning checks if a disk can accept new tasks considering pending load.
func (at *ActiveTopology) isDiskAvailableForPlanning(disk *activeDisk, taskType TaskType) bool {
	// Check total load including pending tasks
	totalLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
	if totalLoad >= MaxTotalTaskLoadPerDisk {
		return false
	}

	// Check for conflicting task types in active tasks only
	for _, task := range disk.assignedTasks {
		if at.areTaskTypesConflicting(task.TaskType, taskType) {
			return false
		}
	}

	return true
}

// calculateTaskStorageImpact is a helper function that calculates the total storage impact
// from all tasks (pending and assigned) on a given disk. This eliminates code duplication
// between multiple capacity calculation functions.
func (at *ActiveTopology) calculateTaskStorageImpact(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	totalImpact := StorageSlotChange{}

	// Process both pending and assigned tasks with identical logic
	taskLists := [][]*taskState{disk.pendingTasks, disk.assignedTasks}

	for _, taskList := range taskLists {
		for _, task := range taskList {
			// Calculate impact for all source locations
			for _, source := range task.Sources {
				if source.SourceServer == disk.NodeID && source.SourceDisk == disk.DiskID {
					totalImpact.AddInPlace(source.StorageChange)
				}
			}

			// Calculate impact for all destination locations
			for _, dest := range task.Destinations {
				if dest.TargetServer == disk.NodeID && dest.TargetDisk == disk.DiskID {
					totalImpact.AddInPlace(dest.StorageChange)
				}
			}
		}
	}

	return totalImpact
}
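
// Worked sketch (editor's note, not part of the original file): if this disk is
// the destination of one pending task with StorageChange A and the source of
// one assigned task with StorageChange B, the loops above accumulate A + B via
// AddInPlace. Callers such as getPlanningCapacityUnsafe and
// getEffectiveAvailableCapacityUnsafe then fold that sum into volume-slot units
// with ToVolumeSlots and subtract it from the disk's base availability; both
// helpers, and the sign convention of StorageSlotChange, are defined elsewhere
// in this package.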

// getEffectiveCapacityUnsafe returns the effective capacity impact without locking (for internal use).
// Returns StorageSlotChange representing the net impact from all tasks.
func (at *ActiveTopology) getEffectiveCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	return at.calculateTaskStorageImpact(disk)
}

// getEffectiveAvailableCapacityUnsafe returns detailed available capacity as StorageSlotChange.
func (at *ActiveTopology) getEffectiveAvailableCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	baseAvailable := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount
	netImpact := at.getEffectiveCapacityUnsafe(disk)

	// Calculate available volume slots (negative impact reduces availability)
	availableVolumeSlots := baseAvailable - netImpact.ToVolumeSlots()
	if availableVolumeSlots < 0 {
		availableVolumeSlots = 0
	}

	// Return detailed capacity information
	return StorageSlotChange{
		VolumeSlots: int32(availableVolumeSlots),
		ShardSlots:  -netImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability)
	}
}