1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-07-24 20:42:47 +02:00
seaweedfs/weed/command/worker.go
Chris Lu aa66852304
Admin UI add maintenance menu (#6944)
* add ui for maintenance

* valid config loading. fix workers page.

* refactor

* grpc between admin and workers

* add a long-running bidirectional grpc call between admin and worker
* use the grpc call to heartbeat
* use the grpc call to communicate
* worker can remove the http client
* admin uses http port + 10000 as its default grpc port

* one task one package

* handles connection failures gracefully with exponential backoff

* grpc with insecure tls

* grpc with optional tls

* fix detecting tls

* change time config from nano seconds to seconds

* add tasks with 3 interfaces

* compiles reducing hard coded

* remove a couple of tasks

* remove hard coded references

* reduce hard coded values

* remove hard coded values

* remove hard coded from templ

* refactor maintenance package

* fix import cycle

* simplify

* simplify

* auto register

* auto register factory

* auto register task types

* self register types

* refactor

* simplify

* remove one task

* register ui

* lazy init executor factories

* use registered task types

* DefaultWorkerConfig remove hard coded task types

* remove more hard coded

* implement get maintenance task

* dynamic task configuration

* "System Settings" should only have system level settings

* adjust menu for tasks

* ensure menu not collapsed

* render job configuration well

* use templ for ui of task configuration

* fix ordering

* fix bugs

* saving duration in seconds

* use value and unit for duration

* Delete WORKER_REFACTORING_PLAN.md

* Delete maintenance.json

* Delete custom_worker_example.go

* remove address from workers

* remove old code from ec task

* remove creating collection button

* reconnect with exponential backoff

* worker use security.toml

* start admin server with tls info from security.toml

* fix "weed admin" cli description
2025-07-06 13:57:02 -07:00

182 lines
5.9 KiB
Go

package command
import (
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/worker"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
// Import task packages to trigger their auto-registration
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)
// cmdWorker describes the "worker" sub-command: its usage line and the short
// and long help texts shown to the user. Its Run function is not set here; it
// is assigned to runWorker in this file's init function.
var cmdWorker = &Command{
UsageLine: "worker -admin=<admin_server> [-capabilities=<task_types>] [-maxConcurrent=<num>]",
Short: "start a maintenance worker to process cluster maintenance tasks",
Long: `Start a maintenance worker that connects to an admin server to process
maintenance tasks like vacuum, erasure coding, remote upload, and replication fixes.
The worker ID and address are automatically generated.
The worker connects to the admin server via gRPC (admin HTTP port + 10000).
Examples:
weed worker -admin=localhost:23646
weed worker -admin=admin.example.com:23646
weed worker -admin=localhost:23646 -capabilities=vacuum,replication
weed worker -admin=localhost:23646 -maxConcurrent=4
`,
}
// Command-line flags of the worker command, registered on cmdWorker.Flag.
// Each variable is a pointer that runWorker dereferences when it runs.
var (
// workerAdminServer is the address of the admin server to connect to.
workerAdminServer = cmdWorker.Flag.String("admin", "localhost:23646", "admin server address")
// workerCapabilities is a comma-separated list of task type names. It is
// resolved by parseCapabilities, which logs a warning for and drops any name
// that does not match a registered task type or one of its aliases.
workerCapabilities = cmdWorker.Flag.String("capabilities", "vacuum,ec,remote,replication,balance", "comma-separated list of task types this worker can handle")
// workerMaxConcurrent is passed to the worker configuration as MaxConcurrent.
workerMaxConcurrent = cmdWorker.Flag.Int("maxConcurrent", 2, "maximum number of concurrent tasks")
// workerHeartbeatInterval is passed to the worker configuration as HeartbeatInterval.
workerHeartbeatInterval = cmdWorker.Flag.Duration("heartbeat", 30*time.Second, "heartbeat interval")
// workerTaskRequestInterval is passed to the worker configuration as TaskRequestInterval.
workerTaskRequestInterval = cmdWorker.Flag.Duration("taskInterval", 5*time.Second, "task request interval")
)
// init installs runWorker as the Run function of cmdWorker and then asks the
// tasks package to derive its default capabilities from the task registry.
func init() {
cmdWorker.Run = runWorker
// Set default capabilities from registered task types
// This happens after package imports have triggered auto-registration
// (the blank imports of the task packages at the top of this file).
tasks.SetDefaultCapabilitiesFromRegistry()
}
// runWorker is the Run function of the worker command.
//
// It loads the "security" configuration, resolves the requested capabilities,
// builds a worker, attaches an admin client created with the "grpc.worker"
// client TLS settings, and starts the worker. It then blocks until SIGINT or
// SIGTERM is received, stops the worker, and returns true.
//
// Every setup failure is reported through glog.Fatalf. The "return false"
// that follows each of those calls only takes effect if glog.Fatalf returns
// instead of terminating the process. The cmd and args parameters are unused.
func runWorker(cmd *Command, args []string) bool {
	util.LoadConfiguration("security", false)

	adminAddress := *workerAdminServer

	glog.Infof("Starting maintenance worker")
	glog.Infof("Admin server: %s", adminAddress)
	glog.Infof("Capabilities: %s", *workerCapabilities)

	// Resolve the capability names given on the command line.
	taskTypes := parseCapabilities(*workerCapabilities)
	if len(taskTypes) == 0 {
		glog.Fatalf("No valid capabilities specified")
		return false
	}

	// Build the worker from the flag values.
	w, err := worker.NewWorker(&types.WorkerConfig{
		AdminServer:         adminAddress,
		Capabilities:        taskTypes,
		MaxConcurrent:       *workerMaxConcurrent,
		HeartbeatInterval:   *workerHeartbeatInterval,
		TaskRequestInterval: *workerTaskRequestInterval,
	})
	if err != nil {
		glog.Fatalf("Failed to create worker: %v", err)
		return false
	}

	// The admin client uses the gRPC client TLS settings from the loaded
	// security configuration.
	dialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker")
	client, err := worker.CreateAdminClient(adminAddress, w.ID(), dialOption)
	if err != nil {
		glog.Fatalf("Failed to create admin client: %v", err)
		return false
	}
	w.SetAdminClient(client)

	if err := w.Start(); err != nil {
		glog.Fatalf("Failed to start worker: %v", err)
		return false
	}

	// Block until the process is asked to terminate.
	shutdown := make(chan os.Signal, 1)
	signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)

	glog.Infof("Maintenance worker %s started successfully", w.ID())
	glog.Infof("Press Ctrl+C to stop the worker")

	<-shutdown
	glog.Infof("Shutdown signal received, stopping worker...")

	// A failure to stop is logged but does not change the return value.
	if err := w.Stop(); err != nil {
		glog.Errorf("Error stopping worker: %v", err)
	}
	glog.Infof("Worker stopped")

	return true
}
// parseCapabilities converts a comma-separated capability string into the
// task types it names.
//
// Names are matched case-insensitively against the task types that have a
// detector in the global types registry. The aliases "ec", "remote" and
// "replication" are accepted for "erasure_coding", "remote_upload" and
// "fix_replication" respectively, but only when that task type is registered.
//
// Whitespace around each entry is ignored and empty entries (for example from
// a trailing comma) are skipped silently. A task type named more than once,
// directly or through an alias, is returned only once, at the position of its
// first mention. Names that match nothing are logged as warnings and dropped.
//
// The result is nil when capabilityStr is empty or names no known capability.
func parseCapabilities(capabilityStr string) []types.TaskType {
	if capabilityStr == "" {
		return nil
	}

	// Index the registered task types by their lower-cased name.
	capabilityMap := map[string]types.TaskType{}
	typesRegistry := tasks.GetGlobalTypesRegistry()
	for taskType := range typesRegistry.GetAllDetectors() {
		capabilityMap[strings.ToLower(string(taskType))] = taskType
	}

	// Add common aliases for convenience. An alias is only installed when the
	// task type it stands for is actually registered.
	aliases := []struct{ short, full string }{
		{"ec", "erasure_coding"},
		{"remote", "remote_upload"},
		{"replication", "fix_replication"},
	}
	for _, alias := range aliases {
		if taskType, exists := capabilityMap[alias.full]; exists {
			capabilityMap[alias.short] = taskType
		}
	}

	var capabilities []types.TaskType
	seen := make(map[types.TaskType]struct{})
	for _, part := range strings.Split(capabilityStr, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			// Stray separators such as "vacuum,,ec" or "vacuum," are not
			// capability names, so they do not deserve a warning.
			continue
		}

		// The map keys are lower-cased, so the lookup key must be as well.
		taskType, exists := capabilityMap[strings.ToLower(part)]
		if !exists {
			glog.Warningf("Unknown capability: %s", part)
			continue
		}

		// Keep only the first mention of each task type.
		if _, dup := seen[taskType]; dup {
			continue
		}
		seen[taskType] = struct{}{}
		capabilities = append(capabilities, taskType)
	}
	return capabilities
}
// Legacy compatibility types, kept for backward compatibility.
// These will be deprecated in future versions.

// WorkerStatus represents the current status of a worker. Every field is
// serialized to JSON under the snake_case name given in its struct tag.
// Nothing in the visible part of this file references the type.
//
// Deprecated: WorkerStatus is a legacy compatibility type.
type WorkerStatus struct {
WorkerID string `json:"worker_id"`
Address string `json:"address"`
Status string `json:"status"`
Capabilities []types.TaskType `json:"capabilities"`
MaxConcurrent int `json:"max_concurrent"`
CurrentLoad int `json:"current_load"`
LastHeartbeat time.Time `json:"last_heartbeat"`
CurrentTasks []types.Task `json:"current_tasks"`
Uptime time.Duration `json:"uptime"`
TasksCompleted int `json:"tasks_completed"`
TasksFailed int `json:"tasks_failed"`
}