seaweedfs/weed/admin/topology/capacity.go
chrislu 751cfac7d7 Implement volume-aware task conflict checking
MAJOR IMPROVEMENT: Tasks now conflict by volume ID, not globally by task type

Changes:
- PRIMARY RULE: Tasks on the same volume ID always conflict (prevents race conditions)
- SECONDARY RULE: Minimal global task type conflicts (currently none)
- Add isDiskAvailableForVolume() for volume-specific availability checking
- Add GetAvailableDisksForVolume() and GetDisksWithEffectiveCapacityForVolume()
- Remove overly restrictive global task type conflicts
- Update planning functions to focus on capacity, not conflicts

Benefits:
- Multiple vacuum tasks can run on different volumes simultaneously
- Balance and erasure coding can run on different volumes
- Still prevents dangerous concurrent operations on the same volume
- Much more efficient resource utilization
- Maintains data integrity and prevents race conditions

This addresses the user feedback that task conflicts should be volume-specific
rather than global restrictions by task type; a sketch of the per-volume check follows below.
2025-08-10 18:02:42 -07:00
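
As a rough illustration of the rules above, here is a minimal sketch of a volume-aware
availability check. It assumes the package's taskState carries the volume ID it operates on
and reuses the MaxTotalTaskLoadPerDisk limit seen later in this file; it is not the actual
isDiskAvailableForVolume implementation, which lives elsewhere in this package.

// A minimal sketch, assuming taskState records the volume ID it operates on.
func (at *ActiveTopology) isDiskAvailableForVolumeSketch(disk *activeDisk, taskType TaskType, volumeID uint32) bool {
	// PRIMARY RULE: any pending or assigned task already touching this volume ID conflicts.
	for _, taskList := range [][]*taskState{disk.pendingTasks, disk.assignedTasks} {
		for _, task := range taskList {
			if task.VolumeID == volumeID { // assumes a VolumeID field on taskState
				return false
			}
		}
	}
	// SECONDARY RULE: currently no global task-type conflicts; only the per-disk load limit applies.
	return len(disk.pendingTasks)+len(disk.assignedTasks) < MaxTotalTaskLoadPerDisk
}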


package topology

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)

// GetEffectiveAvailableCapacity returns the effective available capacity for a disk
// This considers BOTH pending and assigned tasks for capacity reservation.
//
// Formula: BaseAvailable - (VolumeSlots + ShardSlots/ShardsPerVolumeSlot) from all tasks
//
// The calculation includes:
// - Pending tasks: Reserve capacity immediately when added
// - Assigned tasks: Continue to reserve capacity during execution
// - Recently completed tasks are NOT counted against capacity
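//
// Illustrative example (hypothetical numbers): with MaxVolumeCount=100, VolumeCount=80, and
// pending/assigned tasks reserving a net 5 volume slots on this disk, the effective available
// capacity returned here is 100 - 80 - 5 = 15 volume slots.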
func (at *ActiveTopology) GetEffectiveAvailableCapacity(nodeID string, diskID uint32) int64 {
	at.mutex.RLock()
	defer at.mutex.RUnlock()
	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return 0
	}
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return 0
	}
	// Use the same logic as getEffectiveAvailableCapacityUnsafe but with locking
	capacity := at.getEffectiveAvailableCapacityUnsafe(disk)
	return int64(capacity.VolumeSlots)
}

// GetEffectiveAvailableCapacityDetailed returns detailed available capacity as StorageSlotChange
// This provides granular information about available volume slots and shard slots
func (at *ActiveTopology) GetEffectiveAvailableCapacityDetailed(nodeID string, diskID uint32) StorageSlotChange {
	at.mutex.RLock()
	defer at.mutex.RUnlock()
	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return StorageSlotChange{}
	}
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}
	return at.getEffectiveAvailableCapacityUnsafe(disk)
}

// GetEffectiveCapacityImpact returns the StorageSlotChange impact for a disk
// This shows the net impact from all pending and assigned tasks
func (at *ActiveTopology) GetEffectiveCapacityImpact(nodeID string, diskID uint32) StorageSlotChange {
	at.mutex.RLock()
	defer at.mutex.RUnlock()
	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return StorageSlotChange{}
	}
	return at.getEffectiveCapacityUnsafe(disk)
}

// GetDisksWithEffectiveCapacity returns disks with sufficient effective capacity
// This method considers BOTH pending and assigned tasks for capacity reservation using StorageSlotChange.
//
// Parameters:
// - taskType: type of task to check compatibility for
// - excludeNodeID: node to exclude from results
// - minCapacity: minimum effective capacity required (in volume slots)
//
// Returns: DiskInfo objects where VolumeCount reflects capacity reserved by all tasks
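//
// Example (hypothetical call, assuming a TaskTypeBalance constant defined elsewhere in this package):
//
//	candidates := at.GetDisksWithEffectiveCapacity(TaskTypeBalance, "10.0.0.1:8080", 1)
//
// returns every disk outside node 10.0.0.1:8080 with at least one effective volume slot free.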
func (at *ActiveTopology) GetDisksWithEffectiveCapacity(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()
	var available []*DiskInfo
	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}
		if at.isDiskAvailable(disk, taskType) {
			effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
			// Only include disks that meet minimum capacity requirement
			if int64(effectiveCapacity.VolumeSlots) >= minCapacity {
				// Create a new DiskInfo with current capacity information
				diskCopy := DiskInfo{
					NodeID:     disk.DiskInfo.NodeID,
					DiskID:     disk.DiskInfo.DiskID,
					DiskType:   disk.DiskInfo.DiskType,
					DataCenter: disk.DiskInfo.DataCenter,
					Rack:       disk.DiskInfo.Rack,
					LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks), // Count all tasks
				}
				// Create a new protobuf DiskInfo to avoid modifying the original
				diskInfoCopy := &master_pb.DiskInfo{
					DiskId:            disk.DiskInfo.DiskInfo.DiskId,
					MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
					VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(effectiveCapacity.VolumeSlots),
					VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
					EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
					RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
					ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
					FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
				}
				diskCopy.DiskInfo = diskInfoCopy
				available = append(available, &diskCopy)
			}
		}
	}
	return available
}

// GetDisksForPlanning returns disks considering both active and pending tasks for planning decisions
// This helps avoid over-scheduling tasks to the same disk
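//
// Example (hypothetical call, task type constant as above): when proposing a batch of tasks,
// use planning capacity so that work already pending on a disk counts against it:
//
//	planTargets := at.GetDisksForPlanning(TaskTypeBalance, "", 1)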
func (at *ActiveTopology) GetDisksForPlanning(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()
	var available []*DiskInfo
	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}
		// Consider both pending and active tasks for scheduling decisions
		if at.isDiskAvailableForPlanning(disk, taskType) {
			// Check if disk can accommodate new task considering pending tasks
			planningCapacity := at.getPlanningCapacityUnsafe(disk)
			if int64(planningCapacity.VolumeSlots) >= minCapacity {
				// Create a new DiskInfo with planning information
				diskCopy := DiskInfo{
					NodeID:     disk.DiskInfo.NodeID,
					DiskID:     disk.DiskInfo.DiskID,
					DiskType:   disk.DiskInfo.DiskType,
					DataCenter: disk.DiskInfo.DataCenter,
					Rack:       disk.DiskInfo.Rack,
					LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks),
				}
				// Create a new protobuf DiskInfo to avoid modifying the original
				diskInfoCopy := &master_pb.DiskInfo{
					DiskId:            disk.DiskInfo.DiskInfo.DiskId,
					MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
					VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(planningCapacity.VolumeSlots),
					VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
					EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
					RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
					ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
					FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
				}
				diskCopy.DiskInfo = diskInfoCopy
				available = append(available, &diskCopy)
			}
		}
	}
	return available
}

// CanAccommodateTask checks if a disk can accommodate a new task considering all constraints
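//
// Example (hypothetical values): a scheduler might gate assignment with
//
//	if at.CanAccommodateTask("10.0.0.2:8080", 0, TaskTypeVacuum, 1) { ... }
//
// where the node address and task type constant are illustrative only.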
func (at *ActiveTopology) CanAccommodateTask(nodeID string, diskID uint32, taskType TaskType, volumesNeeded int64) bool {
	at.mutex.RLock()
	defer at.mutex.RUnlock()
	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return false
	}
	// Check basic availability
	if !at.isDiskAvailable(disk, taskType) {
		return false
	}
	// Check effective capacity
	effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
	return int64(effectiveCapacity.VolumeSlots) >= volumesNeeded
}

// getPlanningCapacityUnsafe considers both pending and active tasks for planning
func (at *ActiveTopology) getPlanningCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}
	baseAvailableVolumes := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount
	// Use the centralized helper function to calculate task storage impact
	totalImpact := at.calculateTaskStorageImpact(disk)
	// Calculate available capacity considering impact (negative impact reduces availability)
	availableVolumeSlots := baseAvailableVolumes - totalImpact.ToVolumeSlots()
	if availableVolumeSlots < 0 {
		availableVolumeSlots = 0
	}
	// Return detailed capacity information
	return StorageSlotChange{
		VolumeSlots: int32(availableVolumeSlots),
		ShardSlots:  -totalImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability)
	}
}

// isDiskAvailableForPlanning checks if disk can accept new tasks considering pending load
func (at *ActiveTopology) isDiskAvailableForPlanning(disk *activeDisk, taskType TaskType) bool {
	// Check total load including pending tasks
	totalLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
	if totalLoad >= MaxTotalTaskLoadPerDisk {
		return false
	}
	// For planning purposes, we only check capacity constraints
	// Volume-specific conflicts will be checked when the actual task is scheduled
	// with knowledge of the specific volume ID
	return true
}

// calculateTaskStorageImpact is a helper function that calculates the total storage impact
// from all tasks (pending and assigned) on a given disk. This eliminates code duplication
// between multiple capacity calculation functions.
func (at *ActiveTopology) calculateTaskStorageImpact(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}
	totalImpact := StorageSlotChange{}
	// Process both pending and assigned tasks with identical logic
	taskLists := [][]*taskState{disk.pendingTasks, disk.assignedTasks}
	for _, taskList := range taskLists {
		for _, task := range taskList {
			// Calculate impact for all source locations
			for _, source := range task.Sources {
				if source.SourceServer == disk.NodeID && source.SourceDisk == disk.DiskID {
					totalImpact.AddInPlace(source.StorageChange)
				}
			}
			// Calculate impact for all destination locations
			for _, dest := range task.Destinations {
				if dest.TargetServer == disk.NodeID && dest.TargetDisk == disk.DiskID {
					totalImpact.AddInPlace(dest.StorageChange)
				}
			}
		}
	}
	return totalImpact
}

// getEffectiveCapacityUnsafe returns effective capacity impact without locking (for internal use)
// Returns StorageSlotChange representing the net impact from all tasks
func (at *ActiveTopology) getEffectiveCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	return at.calculateTaskStorageImpact(disk)
}

// getEffectiveAvailableCapacityUnsafe returns detailed available capacity as StorageSlotChange
func (at *ActiveTopology) getEffectiveAvailableCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}
	baseAvailable := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount
	netImpact := at.getEffectiveCapacityUnsafe(disk)
	// Calculate available volume slots (negative impact reduces availability)
	availableVolumeSlots := baseAvailable - netImpact.ToVolumeSlots()
	if availableVolumeSlots < 0 {
		availableVolumeSlots = 0
	}
	// Return detailed capacity information
	return StorageSlotChange{
		VolumeSlots: int32(availableVolumeSlots),
		ShardSlots:  -netImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability)
	}
}

// GetDisksWithEffectiveCapacityForVolume returns disks with effective capacity for a specific volume
// Uses volume-aware conflict checking to prevent race conditions on the same volume
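//
// Example (hypothetical values, task type constant as above): planning work on volume 123 is
// unaffected by a task already running on volume 456, because conflicts are checked per volume ID:
//
//	targets := at.GetDisksWithEffectiveCapacityForVolume(TaskTypeVacuum, 123, "", 1)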
func (at *ActiveTopology) GetDisksWithEffectiveCapacityForVolume(taskType TaskType, volumeID uint32, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()
	var available []*DiskInfo
	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}
		if at.isDiskAvailableForVolume(disk, taskType, volumeID) {
			effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
			// Only include disks that meet minimum capacity requirement
			if int64(effectiveCapacity.VolumeSlots) >= minCapacity {
				// Create a new DiskInfo with current capacity information
				diskCopy := DiskInfo{
					NodeID:     disk.DiskInfo.NodeID,
					DiskID:     disk.DiskInfo.DiskID,
					DiskType:   disk.DiskInfo.DiskType,
					DataCenter: disk.DiskInfo.DataCenter,
					Rack:       disk.DiskInfo.Rack,
					LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks), // Count all tasks
				}
				// Create a new protobuf DiskInfo to avoid modifying the original
				diskInfoCopy := &master_pb.DiskInfo{
					DiskId:            disk.DiskInfo.DiskInfo.DiskId,
					MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
					VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(effectiveCapacity.VolumeSlots),
					VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
					EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
					RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
					ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
					FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
				}
				diskCopy.DiskInfo = diskInfoCopy
				available = append(available, &diskCopy)
			}
		}
	}
	return available
}