1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-07-26 21:42:48 +02:00
seaweedfs/weed/worker/tasks/vacuum/vacuum_scheduler.go
Chris Lu aa66852304
Admin UI add maintenance menu (#6944)
* add ui for maintenance

* valid config loading. fix workers page.

* refactor

* grpc between admin and workers

* add a long-running bidirectional grpc call between admin and worker
* use the grpc call to heartbeat
* use the grpc call to communicate
* worker can remove the http client
* admin uses http port + 10000 as its default grpc port

* one task one package

* handles connection failures gracefully with exponential backoff

* grpc with insecure tls

* grpc with optional tls

* fix detecting tls

* change time config from nano seconds to seconds

* add tasks with 3 interfaces

* compiles reducing hard coded

* remove a couple of tasks

* remove hard coded references

* reduce hard coded values

* remove hard coded values

* remove hard coded from templ

* refactor maintenance package

* fix import cycle

* simplify

* simplify

* auto register

* auto register factory

* auto register task types

* self register types

* refactor

* simplify

* remove one task

* register ui

* lazy init executor factories

* use registered task types

* DefaultWorkerConfig remove hard coded task types

* remove more hard coded

* implement get maintenance task

* dynamic task configuration

* "System Settings" should only have system level settings

* adjust menu for tasks

* ensure menu not collapsed

* render job configuration well

* use templ for ui of task configuration

* fix ordering

* fix bugs

* saving duration in seconds

* use value and unit for duration

* Delete WORKER_REFACTORING_PLAN.md

* Delete maintenance.json

* Delete custom_worker_example.go

* remove address from workers

* remove old code from ec task

* remove creating collection button

* reconnect with exponential backoff

* worker use security.toml

* start admin server with tls info from security.toml

* fix "weed admin" cli description
2025-07-06 13:57:02 -07:00

111 lines
2.6 KiB
Go

package vacuum
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// VacuumScheduler holds the scheduling policy for vacuum tasks. The policy is
// expressed directly in code rather than through a schema.
type VacuumScheduler struct {
	// enabled gates scheduling: CanScheduleNow rejects every task while false.
	enabled bool
	// maxConcurrent is the cap on simultaneously running vacuum tasks that
	// CanScheduleNow enforces.
	maxConcurrent int
	// minInterval is the duration reported by GetDefaultRepeatInterval and
	// GetMinInterval.
	minInterval time.Duration
}
// Compile-time check that *VacuumScheduler satisfies types.TaskScheduler.
var _ types.TaskScheduler = (*VacuumScheduler)(nil)
// NewVacuumScheduler returns a scheduler initialised with the default policy:
// enabled, at most two concurrent vacuum tasks, and a six-hour interval.
func NewVacuumScheduler() *VacuumScheduler {
	scheduler := new(VacuumScheduler)
	scheduler.enabled = true
	scheduler.maxConcurrent = 2
	scheduler.minInterval = 6 * time.Hour
	return scheduler
}
// GetTaskType identifies the kind of task this scheduler handles; it is
// always types.TaskTypeVacuum.
func (s *VacuumScheduler) GetTaskType() types.TaskType {
	return types.TaskTypeVacuum
}
// CanScheduleNow reports whether a vacuum task may be started at this moment.
//
// It returns true only when all of the following hold:
//   - the scheduler is enabled;
//   - fewer than maxConcurrent entries in runningTasks are vacuum tasks;
//   - at least one entry in availableWorkers has CurrentLoad below its
//     MaxConcurrent and lists the vacuum task type among its Capabilities.
//
// The task argument itself is not consulted by the current implementation.
func (s *VacuumScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
	if !s.enabled {
		return false
	}

	// Tally the vacuum tasks already in flight and compare against the cap.
	active := 0
	for i := range runningTasks {
		if runningTasks[i].Type == types.TaskTypeVacuum {
			active++
		}
	}
	if active >= s.maxConcurrent {
		return false
	}

	// Look for any worker that has spare capacity and can run vacuum tasks.
	for _, w := range availableWorkers {
		if w.CurrentLoad >= w.MaxConcurrent {
			continue // no free slot on this worker
		}
		for _, c := range w.Capabilities {
			if c == types.TaskTypeVacuum {
				return true
			}
		}
	}

	return false
}
// GetPriority picks the priority to use for task. When the task's
// "garbage_ratio" parameter is a float64 greater than 0.8 the task is
// promoted to types.TaskPriorityHigh; in every other case the task's own
// Priority is returned unchanged.
func (s *VacuumScheduler) GetPriority(task *types.Task) types.TaskPriority {
	ratio, isFloat := task.Parameters["garbage_ratio"].(float64)
	if isFloat && ratio > 0.8 {
		return types.TaskPriorityHigh
	}
	return task.Priority
}
// GetMaxConcurrent reports the configured cap on concurrently running
// vacuum tasks.
func (s *VacuumScheduler) GetMaxConcurrent() int {
	return s.maxConcurrent
}
// GetDefaultRepeatInterval reports how long to wait before a vacuum task is
// repeated. It returns the scheduler's configured minimum interval.
func (s *VacuumScheduler) GetDefaultRepeatInterval() time.Duration {
	return s.minInterval
}
// IsEnabled reports whether the scheduler is currently switched on.
func (s *VacuumScheduler) IsEnabled() bool {
	return s.enabled
}
// Configuration setters

// SetEnabled switches the scheduler on (true) or off (false).
func (s *VacuumScheduler) SetEnabled(enabled bool) {
	s.enabled = enabled
}
// SetMaxConcurrent replaces the cap on concurrently running vacuum tasks.
// The value is stored as given; no range validation is performed.
func (s *VacuumScheduler) SetMaxConcurrent(max int) {
	s.maxConcurrent = max
}
// SetMinInterval replaces the minimum interval between vacuum runs.
// The value is stored as given; no range validation is performed.
func (s *VacuumScheduler) SetMinInterval(interval time.Duration) {
	s.minInterval = interval
}
// GetMinInterval reports the configured minimum interval between vacuum runs.
func (s *VacuumScheduler) GetMinInterval() time.Duration {
	return s.minInterval
}