package command

import (
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"syscall"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/security"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/worker"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"

	// Import task packages to trigger their auto-registration
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

var cmdWorker = &Command{
	UsageLine: "worker -admin=<admin_server> [-capabilities=<task_types>] [-maxConcurrent=<num>] [-workingDir=<path>]",
	Short:     "start a maintenance worker to process cluster maintenance tasks",
	Long: `Start a maintenance worker that connects to an admin server to process
maintenance tasks like vacuum, erasure coding, remote upload, and replication fixes.

The worker ID and address are automatically generated.
The worker connects to the admin server via gRPC (admin HTTP port + 10000).

Examples:
  weed worker -admin=localhost:23646
  weed worker -admin=admin.example.com:23646
  weed worker -admin=localhost:23646 -capabilities=vacuum,replication
  weed worker -admin=localhost:23646 -maxConcurrent=4
  weed worker -admin=localhost:23646 -workingDir=/tmp/worker
`,
}

var (
	workerAdminServer         = cmdWorker.Flag.String("admin", "localhost:23646", "admin server address")
	workerCapabilities        = cmdWorker.Flag.String("capabilities", "vacuum,ec,remote,replication,balance", "comma-separated list of task types this worker can handle")
	workerMaxConcurrent       = cmdWorker.Flag.Int("maxConcurrent", 2, "maximum number of concurrent tasks")
	workerHeartbeatInterval   = cmdWorker.Flag.Duration("heartbeat", 30*time.Second, "heartbeat interval")
	workerTaskRequestInterval = cmdWorker.Flag.Duration("taskInterval", 5*time.Second, "task request interval")
	workerWorkingDir          = cmdWorker.Flag.String("workingDir", "", "working directory for the worker")
)

func init() {
	cmdWorker.Run = runWorker

	// Set default capabilities from registered task types.
	// This happens after the package imports have triggered auto-registration.
	tasks.SetDefaultCapabilitiesFromRegistry()
}

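// runWorker parses the requested capabilities, prepares the working
// directory, connects to the admin server over gRPC, and runs the
// maintenance worker until an interrupt or termination signal arrives.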
func runWorker(cmd *Command, args []string) bool {
	util.LoadConfiguration("security", false)

	glog.Infof("Starting maintenance worker")
	glog.Infof("Admin server: %s", *workerAdminServer)
	glog.Infof("Capabilities: %s", *workerCapabilities)

	// Parse capabilities
	capabilities := parseCapabilities(*workerCapabilities)
	if len(capabilities) == 0 {
		glog.Fatalf("No valid capabilities specified")
		return false
	}

	// Set working directory and create task-specific subdirectories
	var baseWorkingDir string
	if *workerWorkingDir != "" {
		glog.Infof("Setting working directory to: %s", *workerWorkingDir)
		if err := os.Chdir(*workerWorkingDir); err != nil {
			glog.Fatalf("Failed to change working directory: %v", err)
			return false
		}
		wd, err := os.Getwd()
		if err != nil {
			glog.Fatalf("Failed to get working directory: %v", err)
			return false
		}
		baseWorkingDir = wd
		glog.Infof("Current working directory: %s", baseWorkingDir)

		// Create task-specific subdirectories
		for _, capability := range capabilities {
			taskDir := filepath.Join(baseWorkingDir, string(capability))
			if err := os.MkdirAll(taskDir, 0755); err != nil {
				glog.Fatalf("Failed to create task directory %s: %v", taskDir, err)
				return false
			}
			glog.Infof("Created task directory: %s", taskDir)
		}
	}

	// Create gRPC dial option using TLS configuration
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker")

	// Create worker configuration
	config := &types.WorkerConfig{
		AdminServer:         *workerAdminServer,
		Capabilities:        capabilities,
		MaxConcurrent:       *workerMaxConcurrent,
		HeartbeatInterval:   *workerHeartbeatInterval,
		TaskRequestInterval: *workerTaskRequestInterval,
		BaseWorkingDir:      baseWorkingDir,
		GrpcDialOption:      grpcDialOption,
	}

	// Create worker instance
	workerInstance, err := worker.NewWorker(config)
	if err != nil {
		glog.Fatalf("Failed to create worker: %v", err)
		return false
	}
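
	// Create the admin client the worker uses to talk to the admin server over gRPC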
	adminClient, err := worker.CreateAdminClient(*workerAdminServer, workerInstance.ID(), grpcDialOption)
	if err != nil {
		glog.Fatalf("Failed to create admin client: %v", err)
		return false
	}

	// Set admin client
	workerInstance.SetAdminClient(adminClient)

	// Set working directory
	if *workerWorkingDir != "" {
		glog.Infof("Setting working directory to: %s", *workerWorkingDir)
		if err := os.Chdir(*workerWorkingDir); err != nil {
			glog.Fatalf("Failed to change working directory: %v", err)
			return false
		}
		wd, err := os.Getwd()
		if err != nil {
			glog.Fatalf("Failed to get working directory: %v", err)
			return false
		}
		glog.Infof("Current working directory: %s", wd)
	}

	// Start the worker
	err = workerInstance.Start()
	if err != nil {
		glog.Errorf("Failed to start worker: %v", err)
		return false
	}

	// Set up signal handling
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	glog.Infof("Maintenance worker %s started successfully", workerInstance.ID())
	glog.Infof("Press Ctrl+C to stop the worker")

	// Wait for shutdown signal
	<-sigChan
	glog.Infof("Shutdown signal received, stopping worker...")

	// Gracefully stop the worker
	err = workerInstance.Stop()
	if err != nil {
		glog.Errorf("Error stopping worker: %v", err)
	}
	glog.Infof("Worker stopped")

	return true
}

// parseCapabilities converts a comma-separated capability string into task types
func parseCapabilities(capabilityStr string) []types.TaskType {
	if capabilityStr == "" {
		return nil
	}

	capabilityMap := map[string]types.TaskType{}

	// Populate capabilityMap with registered task types
	typesRegistry := tasks.GetGlobalTypesRegistry()
	for taskType := range typesRegistry.GetAllDetectors() {
		// Use the task type string directly as the key
		capabilityMap[strings.ToLower(string(taskType))] = taskType
	}

	// Add common aliases for convenience
	if taskType, exists := capabilityMap["erasure_coding"]; exists {
		capabilityMap["ec"] = taskType
	}
	if taskType, exists := capabilityMap["remote_upload"]; exists {
		capabilityMap["remote"] = taskType
	}
	if taskType, exists := capabilityMap["fix_replication"]; exists {
		capabilityMap["replication"] = taskType
	}

	var capabilities []types.TaskType
	parts := strings.Split(capabilityStr, ",")

	for _, part := range parts {
		part = strings.TrimSpace(part)
		if taskType, exists := capabilityMap[part]; exists {
			capabilities = append(capabilities, taskType)
		} else {
			glog.Warningf("Unknown capability: %s", part)
		}
	}

	return capabilities
}

// Legacy types kept for backward compatibility.
// These will be deprecated in future versions.

// WorkerStatus represents the current status of a worker (deprecated)
type WorkerStatus struct {
	WorkerID       string           `json:"worker_id"`
	Address        string           `json:"address"`
	Status         string           `json:"status"`
	Capabilities   []types.TaskType `json:"capabilities"`
	MaxConcurrent  int              `json:"max_concurrent"`
	CurrentLoad    int              `json:"current_load"`
	LastHeartbeat  time.Time        `json:"last_heartbeat"`
	CurrentTasks   []types.Task     `json:"current_tasks"`
	Uptime         time.Duration    `json:"uptime"`
	TasksCompleted int              `json:"tasks_completed"`
	TasksFailed    int              `json:"tasks_failed"`
}