mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-07-27 05:52:48 +02:00
* add ui for maintenance * valid config loading. fix workers page. * refactor * grpc between admin and workers * add a long-running bidirectional grpc call between admin and worker * use the grpc call to heartbeat * use the grpc call to communicate * worker can remove the http client * admin uses http port + 10000 as its default grpc port * one task one package * handles connection failures gracefully with exponential backoff * grpc with insecure tls * grpc with optional tls * fix detecting tls * change time config from nano seconds to seconds * add tasks with 3 interfaces * compiles reducing hard coded * remove a couple of tasks * remove hard coded references * reduce hard coded values * remove hard coded values * remove hard coded from templ * refactor maintenance package * fix import cycle * simplify * simplify * auto register * auto register factory * auto register task types * self register types * refactor * simplify * remove one task * register ui * lazy init executor factories * use registered task types * DefaultWorkerConfig remove hard coded task types * remove more hard coded * implement get maintenance task * dynamic task configuration * "System Settings" should only have system level settings * adjust menu for tasks * ensure menu not collapsed * render job configuration well * use templ for ui of task configuration * fix ordering * fix bugs * saving duration in seconds * use value and unit for duration * Delete WORKER_REFACTORING_PLAN.md * Delete maintenance.json * Delete custom_worker_example.go * remove address from workers * remove old code from ec task * remove creating collection button * reconnect with exponential backoff * worker use security.toml * start admin server with tls info from security.toml * fix "weed admin" cli description
114 lines
3 KiB
Go
114 lines
3 KiB
Go
package erasure_coding
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
|
)
|
|
|
|
// Scheduler implements erasure coding task scheduling
type Scheduler struct {
	maxConcurrent int  // upper bound on simultaneously running EC tasks
	enabled       bool // master switch; when false no EC task is ever scheduled
}
|
// NewScheduler creates a new erasure coding scheduler
|
|
func NewScheduler() *Scheduler {
|
|
return &Scheduler{
|
|
maxConcurrent: 1, // Conservative default
|
|
enabled: false, // Conservative default
|
|
}
|
|
}
|
|
|
|
// GetTaskType returns the task type
|
|
func (s *Scheduler) GetTaskType() types.TaskType {
|
|
return types.TaskTypeErasureCoding
|
|
}
|
|
|
|
// CanScheduleNow determines if an erasure coding task can be scheduled now
|
|
func (s *Scheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
|
|
if !s.enabled {
|
|
return false
|
|
}
|
|
|
|
// Check if we have available workers
|
|
if len(availableWorkers) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Count running EC tasks
|
|
runningCount := 0
|
|
for _, runningTask := range runningTasks {
|
|
if runningTask.Type == types.TaskTypeErasureCoding {
|
|
runningCount++
|
|
}
|
|
}
|
|
|
|
// Check concurrency limit
|
|
if runningCount >= s.maxConcurrent {
|
|
glog.V(3).Infof("EC scheduler: at concurrency limit (%d/%d)", runningCount, s.maxConcurrent)
|
|
return false
|
|
}
|
|
|
|
// Check if any worker can handle EC tasks
|
|
for _, worker := range availableWorkers {
|
|
for _, capability := range worker.Capabilities {
|
|
if capability == types.TaskTypeErasureCoding {
|
|
glog.V(3).Infof("EC scheduler: can schedule task for volume %d", task.VolumeID)
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// GetMaxConcurrent returns the maximum number of concurrent tasks
|
|
func (s *Scheduler) GetMaxConcurrent() int {
|
|
return s.maxConcurrent
|
|
}
|
|
|
|
// GetDefaultRepeatInterval returns the default interval to wait before repeating EC tasks
|
|
func (s *Scheduler) GetDefaultRepeatInterval() time.Duration {
|
|
return 24 * time.Hour // Don't repeat EC for 24 hours
|
|
}
|
|
|
|
// GetPriority returns the priority for this task
|
|
func (s *Scheduler) GetPriority(task *types.Task) types.TaskPriority {
|
|
return types.TaskPriorityLow // EC is not urgent
|
|
}
|
|
|
|
// WasTaskRecentlyCompleted checks if a similar task was recently completed
|
|
func (s *Scheduler) WasTaskRecentlyCompleted(task *types.Task, completedTasks []*types.Task, now time.Time) bool {
|
|
// Don't repeat EC for 24 hours
|
|
interval := 24 * time.Hour
|
|
cutoff := now.Add(-interval)
|
|
|
|
for _, completedTask := range completedTasks {
|
|
if completedTask.Type == types.TaskTypeErasureCoding &&
|
|
completedTask.VolumeID == task.VolumeID &&
|
|
completedTask.Server == task.Server &&
|
|
completedTask.Status == types.TaskStatusCompleted &&
|
|
completedTask.CompletedAt != nil &&
|
|
completedTask.CompletedAt.After(cutoff) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// IsEnabled returns whether this task type is enabled
|
|
func (s *Scheduler) IsEnabled() bool {
|
|
return s.enabled
|
|
}
|
|
|
|
// Configuration setters

// SetEnabled toggles whether erasure coding tasks may be scheduled at all.
// NOTE(review): not synchronized — assumes configuration updates and
// scheduling decisions happen on the same goroutine; confirm with callers.
func (s *Scheduler) SetEnabled(enabled bool) {
	s.enabled = enabled
}
|
// SetMaxConcurrent sets the maximum number of concurrently running EC tasks.
// NOTE(review): no validation — a value <= 0 makes CanScheduleNow reject
// every task; confirm whether callers rely on that as a "pause" mechanism.
func (s *Scheduler) SetMaxConcurrent(max int) {
	s.maxConcurrent = max
}