Mirror of https://github.com/chrislusf/seaweedfs (synced 2025-09-09 21:02:46 +02:00)
* implement sse-c
* fix Content-Range
* adding tests
* Update s3_sse_c_test.go
* copy sse-c objects
* adding tests
* refactor
* multi reader
* remove extra write header call
* refactor
* SSE-C encrypted objects do not support HTTP Range requests
* robust
* fix server starts
* Update Makefile
* Update Makefile
* ci: remove SSE-C integration tests and workflows; delete test/s3/encryption/
* s3: SSE-C MD5 must be base64 (case-sensitive); fix validation, comparisons, metadata storage; update tests
* minor
* base64
* Update SSE-C_IMPLEMENTATION.md
  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* Update weed/s3api/s3api_object_handlers.go
  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* Update SSE-C_IMPLEMENTATION.md
  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* address comments
* fix test
* fix compilation
* Bucket Default Encryption
  To complete the SSE-KMS implementation for production use:
  - Add AWS KMS Provider - Implement weed/kms/aws/aws_kms.go using AWS SDK
  - Integrate with S3 Handlers - Update PUT/GET object handlers to use SSE-KMS
  - Add Multipart Upload Support - Extend SSE-KMS to multipart uploads
  - Configuration Integration - Add KMS configuration to filer.toml
  - Documentation - Update SeaweedFS wiki with SSE-KMS usage examples
* store bucket sse config in proto
* add more tests
* Update SSE-C_IMPLEMENTATION.md
  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* Fix rebase errors and restore structured BucketMetadata API
  Merge Conflict Fixes:
  - Fixed merge conflicts in header.go (SSE-C and SSE-KMS headers)
  - Fixed merge conflicts in s3api_errors.go (SSE-C and SSE-KMS error codes)
  - Fixed merge conflicts in s3_sse_c.go (copy strategy constants)
  - Fixed merge conflicts in s3api_object_handlers_copy.go (copy strategy usage)
  API Restoration:
  - Restored BucketMetadata struct with Tags, CORS, and Encryption fields
  - Restored structured API functions: GetBucketMetadata, SetBucketMetadata, UpdateBucketMetadata
  - Restored helper functions: UpdateBucketTags, UpdateBucketCORS, UpdateBucketEncryption
  - Restored clear functions: ClearBucketTags, ClearBucketCORS, ClearBucketEncryption
  Handler Updates:
  - Updated GetBucketTaggingHandler to use GetBucketMetadata() directly
  - Updated PutBucketTaggingHandler to use UpdateBucketTags()
  - Updated DeleteBucketTaggingHandler to use ClearBucketTags()
  - Updated CORS handlers to use UpdateBucketCORS() and ClearBucketCORS()
  - Updated loadCORSFromBucketContent to use GetBucketMetadata()
  Internal Function Updates:
  - Updated getBucketMetadata() to return *BucketMetadata struct
  - Updated setBucketMetadata() to accept *BucketMetadata struct
  - Updated getBucketEncryptionMetadata() to use GetBucketMetadata()
  - Updated setBucketEncryptionMetadata() to use SetBucketMetadata()
  Benefits:
  - Resolved all rebase conflicts while preserving both SSE-C and SSE-KMS functionality
  - Maintained consistent structured API throughout the codebase
  - Eliminated intermediate wrapper functions for cleaner code
  - Proper error handling with better granularity
  - All tests passing and build successful
  The bucket metadata system now uses a unified, type-safe, structured API that supports tags, CORS, and encryption configuration consistently.
* Fix updateEncryptionConfiguration for first-time bucket encryption setup
  - Change getBucketEncryptionMetadata to getBucketMetadata to avoid failures when no encryption config exists
  - Change setBucketEncryptionMetadata to setBucketMetadataWithEncryption for consistency
  - This fixes the critical issue where bucket encryption configuration failed for buckets without existing encryption
  Fixes: https://github.com/seaweedfs/seaweedfs/pull/7144#discussion_r2285669572
* Fix rebase conflicts and maintain structured BucketMetadata API
  Resolved Conflicts:
  - Fixed merge conflicts in s3api_bucket_config.go between structured API (HEAD) and old intermediate functions
  - Kept modern structured API approach: UpdateBucketCORS, ClearBucketCORS, UpdateBucketEncryption
  - Removed old intermediate functions: setBucketTags, deleteBucketTags, setBucketMetadataWithEncryption
  API Consistency Maintained:
  - updateCORSConfiguration: Uses UpdateBucketCORS() directly
  - removeCORSConfiguration: Uses ClearBucketCORS() directly
  - updateEncryptionConfiguration: Uses UpdateBucketEncryption() directly
  - All structured API functions preserved: GetBucketMetadata, SetBucketMetadata, UpdateBucketMetadata
  Benefits:
  - Maintains clean separation between API layers
  - Preserves atomic metadata updates with proper error handling
  - Eliminates function indirection for better performance
  - Consistent API usage pattern throughout codebase
  - All tests passing and build successful
  The bucket metadata system continues to use the unified, type-safe, structured API that properly handles tags, CORS, and encryption configuration without any intermediate wrapper functions.
* Fix complex rebase conflicts and maintain clean structured BucketMetadata API
  Resolved Complex Conflicts:
  - Fixed merge conflicts between modern structured API (HEAD) and mixed approach
  - Removed duplicate function declarations that caused compilation errors
  - Consistently chose structured API approach over intermediate functions
  Fixed Functions:
  - BucketMetadata struct: Maintained clean field alignment
  - loadCORSFromBucketContent: Uses GetBucketMetadata() directly
  - updateCORSConfiguration: Uses UpdateBucketCORS() directly
  - removeCORSConfiguration: Uses ClearBucketCORS() directly
  - getBucketMetadata: Returns *BucketMetadata struct consistently
  - setBucketMetadata: Accepts *BucketMetadata struct consistently
  Removed Duplicates:
  - Eliminated duplicate GetBucketMetadata implementations
  - Eliminated duplicate SetBucketMetadata implementations
  - Eliminated duplicate UpdateBucketMetadata implementations
  - Eliminated duplicate helper functions (UpdateBucketTags, etc.)
  API Consistency Achieved:
  - Single, unified BucketMetadata struct for all operations
  - Atomic updates through UpdateBucketMetadata with function callbacks
  - Type-safe operations with proper error handling
  - No intermediate wrapper functions cluttering the API
  Benefits:
  - Clean, maintainable codebase with no function duplication
  - Consistent structured API usage throughout all bucket operations
  - Proper error handling and type safety
  - Build successful and all tests passing
  The bucket metadata system now has a completely clean, structured API without any conflicts, duplicates, or inconsistencies.
* Update remaining functions to use new structured BucketMetadata APIs directly
  Updated functions to follow the pattern established in bucket config:
  - getEncryptionConfiguration() -> Uses GetBucketMetadata() directly
  - removeEncryptionConfiguration() -> Uses ClearBucketEncryption() directly
  Benefits:
  - Consistent API usage pattern across all bucket metadata operations
  - Simpler, more readable code that leverages the structured API
  - Eliminates calls to intermediate legacy functions
  - Better error handling and logging consistency
  - All tests pass with improved functionality
  This completes the transition to using the new structured BucketMetadata API throughout the entire bucket configuration and encryption subsystem.
* Fix GitHub PR #7144 code review comments
  Address all code review comments from Gemini Code Assist bot:
  1. **High Priority - SSE-KMS Key Validation**: Fixed ValidateSSEKMSKey to allow empty KMS key ID
     - Empty key ID now indicates use of default KMS key (consistent with AWS behavior)
     - Updated ParseSSEKMSHeaders to call validation after parsing
     - Enhanced isValidKMSKeyID to reject keys with spaces and invalid characters
  2. **Medium Priority - KMS Registry Error Handling**: Improved error collection in CloseAll
     - Now collects all provider close errors instead of only returning the last one
     - Uses proper error formatting with %w verb for error wrapping
     - Returns single error for one failure, combined message for multiple failures
  3. **Medium Priority - Local KMS Aliases Consistency**: Fixed alias handling in CreateKey
     - Now updates the aliases slice in-place to maintain consistency
     - Ensures both p.keys map and key.Aliases slice use the same prefixed format
  All changes maintain backward compatibility and improve error handling robustness. Tests updated and passing for all scenarios including edge cases.
* Use errors.Join for KMS registry error handling
  Replace manual string building with the more idiomatic errors.Join function:
  - Removed manual error message concatenation with strings.Builder
  - Simplified error handling logic by using errors.Join(allErrors...)
  - Removed unnecessary string import
  - Added errors import for errors.Join
  This approach is cleaner, more idiomatic, and automatically handles:
  - Returning nil for empty error slice
  - Returning single error for one-element slice
  - Properly formatting multiple errors with newlines
  The errors.Join function was introduced in Go 1.20 and is the recommended way to combine multiple errors.
* Update registry.go
* Fix GitHub PR #7144 latest review comments
  Address all new code review comments from Gemini Code Assist bot:
  1. **High Priority - SSE-KMS Detection Logic**: Tightened IsSSEKMSEncrypted function
     - Now relies only on the canonical x-amz-server-side-encryption header
     - Removed redundant check for x-amz-encrypted-data-key metadata
     - Prevents misinterpretation of objects with inconsistent metadata state
     - Updated test case to reflect correct behavior (encrypted data key only = false)
  2. **Medium Priority - UUID Validation**: Enhanced KMS key ID validation
     - Replaced simplistic length/hyphen count check with proper regex validation
     - Added regexp import for robust UUID format checking
     - Regex pattern: ^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$
     - Prevents invalid formats like '------------------------------------' from passing
  3. **Medium Priority - Alias Mutation Fix**: Avoided input slice modification
     - Changed CreateKey to not mutate the input aliases slice in-place
     - Uses local variable for modified alias to prevent side effects
     - Maintains backward compatibility while being safer for callers
  All changes improve code robustness and follow AWS S3 standards more closely. Tests updated and passing for all scenarios including edge cases.
* Fix failing SSE tests
  Address two failing test cases:
  1. **TestSSEHeaderConflicts**: Fixed SSE-C and SSE-KMS mutual exclusion
     - Modified IsSSECRequest to return false if SSE-KMS headers are present
     - Modified IsSSEKMSRequest to return false if SSE-C headers are present
     - This prevents both detection functions from returning true simultaneously
     - Aligns with AWS S3 behavior where SSE-C and SSE-KMS are mutually exclusive
  2. **TestBucketEncryptionEdgeCases**: Fixed XML namespace validation
     - Added namespace validation in encryptionConfigFromXMLBytes function
     - Now rejects XML with invalid namespaces (only allows empty or AWS standard namespace)
     - Validates XMLName.Space to ensure proper XML structure
     - Prevents acceptance of malformed XML with incorrect namespaces
  Both fixes improve compliance with AWS S3 standards and prevent invalid configurations from being accepted. All SSE and bucket encryption tests now pass successfully.
* Fix GitHub PR #7144 latest review comments
  Address two new code review comments from Gemini Code Assist bot:
  1. **High Priority - Race Condition in UpdateBucketMetadata**: Fixed thread safety issue
     - Added per-bucket locking mechanism to prevent race conditions
     - Introduced bucketMetadataLocks map with RWMutex for each bucket
     - Added getBucketMetadataLock helper with double-checked locking pattern
     - UpdateBucketMetadata now uses bucket-specific locks to serialize metadata updates
     - Prevents last-writer-wins scenarios when concurrent requests update different metadata parts
  2. **Medium Priority - KMS Key ARN Validation**: Improved robustness of ARN validation
     - Enhanced isValidKMSKeyID function to strictly validate ARN structure
     - Changed from 'len(parts) >= 6' to 'len(parts) != 6' for exact part count
     - Added proper resource validation for key/ and alias/ prefixes
     - Prevents malformed ARNs with incorrect structure from being accepted
     - Now validates: arn:aws:kms:region:account:key/keyid or arn:aws:kms:region:account:alias/aliasname
  Both fixes improve system reliability and prevent edge cases that could cause data corruption or security issues. All existing tests continue to pass.
* format
* address comments
* Configuration Adapter
* Regex Optimization
* Caching Integration
* add negative cache for non-existent buckets
* remove bucketMetadataLocks
* address comments
* address comments
* copying objects with sse-kms
* copying strategy
* store IV in entry metadata
* implement compression reader
* extract json map as sse kms context
* bucket key
* comments
* rotate sse chunks
* KMS Data Keys use AES-GCM + nonce
* add comments
* Update weed/s3api/s3_sse_kms.go
  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* Update s3api_object_handlers_put.go
* get IV from response header
* set sse headers
* Update s3api_object_handlers.go
* deterministic JSON marshaling
* store iv in entry metadata
* address comments
* not used
* store iv in destination metadata: ensures that SSE-C copy operations with re-encryption (decrypt/re-encrypt scenario) now properly store the destination encryption metadata
* add todo
* address comments
* SSE-S3 Deserialization
* add BucketKMSCache to BucketConfig
* fix test compilation
* already not empty
* use constants
* fix: critical metadata (encrypted data keys, encryption context, etc.) was never stored during PUT/copy operations
* address comments
* fix tests
* Fix SSE-KMS Copy Re-encryption
* Cache now persists across requests
* fix test
* iv in metadata only
* SSE-KMS copy operations should follow the same pattern as SSE-C
* fix size overhead calculation
* Filer-Side SSE Metadata Processing
* SSE Integration Tests
* fix tests
* clean up
* Update s3_sse_multipart_test.go
* add s3 sse tests
* unused
* add logs
* Update Makefile
* Update Makefile
* s3 health check
* The tests were failing because they tried to run both SSE-C and SSE-KMS tests
* Update weed/s3api/s3_sse_c.go
  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* Update Makefile
* add back
* Update Makefile
* address comments
* fix tests
* Update s3-sse-tests.yml
* Update s3-sse-tests.yml
* fix sse-kms for PUT operation
* IV
* Update auth_credentials.go
* fix multipart with kms
* constants
* multipart sse kms
  Modified handleSSEKMSResponse to detect multipart SSE-KMS objects
  Added createMultipartSSEKMSDecryptedReader to handle each chunk independently
  Each chunk now gets its own decrypted reader before combining into the final stream
* validate key id
* add SSEType
* permissive kms key format
* Update s3_sse_kms_test.go
* format
* assert equal
* uploading SSE-KMS metadata per chunk
* persist sse type and metadata
* avoid re-chunk multipart uploads
* decryption process to use stored PartOffset values
* constants
* sse-c multipart upload
* Unified Multipart SSE Copy
* purge
* fix fatalf
* avoid io.MultiReader which does not close underlying readers
* unified cross-encryption
* fix Single-object SSE-C
* adjust constants
* range read sse files
* remove debug logs
---------
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
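One of the review fixes above replaces manual error-string concatenation in the KMS registry's CloseAll with errors.Join. The Go sketch below only illustrates that pattern; the Provider and ProviderRegistry names are assumptions for the example, not the actual SeaweedFS types.

package kms

import (
	"errors"
	"fmt"
)

// Provider stands in for a KMS provider that can be shut down.
type Provider interface {
	Close() error
}

// ProviderRegistry is an illustrative registry of named providers.
type ProviderRegistry struct {
	providers map[string]Provider
}

// CloseAll closes every provider and combines all failures into one error.
// errors.Join (Go 1.20+) returns nil when no errors were collected, the error
// itself for a single element, and a newline-joined error for several.
func (r *ProviderRegistry) CloseAll() error {
	var allErrors []error
	for name, p := range r.providers {
		if err := p.Close(); err != nil {
			allErrors = append(allErrors, fmt.Errorf("close provider %q: %w", name, err))
		}
	}
	return errors.Join(allErrors...)
}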
837 lines
27 KiB
Go
package worker

import (
	"context"
	"crypto/rand"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"

	// Import task packages to trigger their auto-registration
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

// Worker represents a maintenance worker instance
type Worker struct {
	id              string
	config          *types.WorkerConfig
	registry        *tasks.TaskRegistry
	currentTasks    map[string]*types.TaskInput
	adminClient     AdminClient
	running         bool
	stopChan        chan struct{}
	mutex           sync.RWMutex
	startTime       time.Time
	tasksCompleted  int
	tasksFailed     int
	heartbeatTicker *time.Ticker
	requestTicker   *time.Ticker
	taskLogHandler  *tasks.TaskLogHandler
}

// AdminClient defines the interface for communicating with the admin server
type AdminClient interface {
	Connect() error
	Disconnect() error
	RegisterWorker(worker *types.WorkerData) error
	SendHeartbeat(workerID string, status *types.WorkerStatus) error
	RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error)
	CompleteTask(taskID string, success bool, errorMsg string) error
	CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error
	UpdateTaskProgress(taskID string, progress float64) error
	IsConnected() bool
}
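
// A no-op AdminClient can be handy when exercising Worker logic in isolation.
// The sketch below is illustrative only (the stub type is not part of this
// package); it simply satisfies the interface above with do-nothing methods:
//
//	type nopAdminClient struct{}
//
//	func (nopAdminClient) Connect() error                                  { return nil }
//	func (nopAdminClient) Disconnect() error                               { return nil }
//	func (nopAdminClient) RegisterWorker(*types.WorkerData) error          { return nil }
//	func (nopAdminClient) SendHeartbeat(string, *types.WorkerStatus) error { return nil }
//	func (nopAdminClient) RequestTask(string, []types.TaskType) (*types.TaskInput, error) {
//		return nil, nil
//	}
//	func (nopAdminClient) CompleteTask(string, bool, string) error { return nil }
//	func (nopAdminClient) CompleteTaskWithMetadata(string, bool, string, map[string]string) error {
//		return nil
//	}
//	func (nopAdminClient) UpdateTaskProgress(string, float64) error { return nil }
//	func (nopAdminClient) IsConnected() bool                        { return true }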

// GenerateOrLoadWorkerID generates a unique worker ID or loads an existing one from the working directory
func GenerateOrLoadWorkerID(workingDir string) (string, error) {
	const workerIDFile = "worker.id"

	var idFilePath string
	if workingDir != "" {
		idFilePath = filepath.Join(workingDir, workerIDFile)
	} else {
		// Use current working directory if none specified
		wd, err := os.Getwd()
		if err != nil {
			return "", fmt.Errorf("failed to get working directory: %w", err)
		}
		idFilePath = filepath.Join(wd, workerIDFile)
	}

	// Try to read existing worker ID
	if data, err := os.ReadFile(idFilePath); err == nil {
		workerID := strings.TrimSpace(string(data))
		if workerID != "" {
			glog.Infof("Loaded existing worker ID from %s: %s", idFilePath, workerID)
			return workerID, nil
		}
	}

	// Generate simplified worker ID
	hostname, _ := os.Hostname()
	if hostname == "" {
		hostname = "unknown"
	}

	// Use a short hostname: the first label before any domain, truncated to 6 chars
	shortHostname := hostname
	if len(hostname) > 6 {
		if parts := strings.Split(hostname, "."); len(parts) > 1 {
			// Use the first label before the domain (e.g., "worker1" from "worker1.example.com")
			shortHostname = parts[0]
			if len(shortHostname) > 6 {
				shortHostname = shortHostname[:6]
			}
		} else {
			// Use first 6 characters
			shortHostname = hostname[:6]
		}
	}

	// Generate random component for uniqueness (2 bytes = 4 hex chars)
	randomBytes := make([]byte, 2)
	var workerID string
	if _, err := rand.Read(randomBytes); err != nil {
		// Fallback to short timestamp if crypto/rand fails
		timestamp := time.Now().Unix() % 10000 // last 4 digits
		workerID = fmt.Sprintf("w-%s-%04d", shortHostname, timestamp)
		glog.Infof("Generated fallback worker ID: %s", workerID)
	} else {
		// Use random hex for uniqueness
		randomHex := fmt.Sprintf("%x", randomBytes)
		workerID = fmt.Sprintf("w-%s-%s", shortHostname, randomHex)
		glog.Infof("Generated new worker ID: %s", workerID)
	}

	// Save worker ID to file
	if err := os.WriteFile(idFilePath, []byte(workerID), 0644); err != nil {
		glog.Warningf("Failed to save worker ID to %s: %v", idFilePath, err)
	} else {
		glog.Infof("Saved worker ID to %s", idFilePath)
	}

	return workerID, nil
}
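
// Illustrative usage (hypothetical caller code, not part of this file): IDs
// produced above look like "w-<short-host>-<4 hex chars>", e.g. "w-work01-a3f9",
// and are persisted to <workingDir>/worker.id so a restarted worker keeps its ID.
// The directory path below is only an example.
//
//	workerID, err := GenerateOrLoadWorkerID("/var/lib/seaweedfs-worker")
//	if err != nil {
//		glog.Fatalf("failed to establish worker identity: %v", err)
//	}
//	glog.Infof("using worker ID %s", workerID)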

// NewWorker creates a new worker instance
func NewWorker(config *types.WorkerConfig) (*Worker, error) {
	if config == nil {
		config = types.DefaultWorkerConfig()
	}

	// Generate or load persistent worker ID
	workerID, err := GenerateOrLoadWorkerID(config.BaseWorkingDir)
	if err != nil {
		return nil, fmt.Errorf("failed to generate or load worker ID: %w", err)
	}

	// Use the global unified registry that already has all tasks registered
	registry := tasks.GetGlobalTaskRegistry()

	// Initialize task log handler
	logDir := filepath.Join(config.BaseWorkingDir, "task_logs")
	// Ensure the base task log directory exists to avoid errors when admin requests logs
	if err := os.MkdirAll(logDir, 0755); err != nil {
		glog.Warningf("Failed to create task log base directory %s: %v", logDir, err)
	}
	taskLogHandler := tasks.NewTaskLogHandler(logDir)

	worker := &Worker{
		id:             workerID,
		config:         config,
		registry:       registry,
		currentTasks:   make(map[string]*types.TaskInput),
		stopChan:       make(chan struct{}),
		startTime:      time.Now(),
		taskLogHandler: taskLogHandler,
	}

	glog.V(1).Infof("Worker created with %d registered task types", len(registry.GetAll()))

	return worker, nil
}
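
// Illustrative wiring (hypothetical caller code, not part of this file): a
// worker is built from a config, given an AdminClient, and then started.
//
//	worker, err := NewWorker(types.DefaultWorkerConfig())
//	if err != nil {
//		glog.Fatalf("failed to create worker: %v", err)
//	}
//	worker.SetAdminClient(adminClient) // e.g. a *GrpcAdminClient
//	if err := worker.Start(); err != nil {
//		glog.Fatalf("failed to start worker: %v", err)
//	}
//	defer worker.Stop()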

// getTaskLoggerConfig returns the task logger configuration with worker's log directory
func (w *Worker) getTaskLoggerConfig() tasks.TaskLoggerConfig {
	config := tasks.DefaultTaskLoggerConfig()

	// Use worker's configured log directory (BaseWorkingDir is guaranteed to be non-empty)
	logDir := filepath.Join(w.config.BaseWorkingDir, "task_logs")
	config.BaseLogDir = logDir

	return config
}

// ID returns the worker ID
func (w *Worker) ID() string {
	return w.id
}

// Start starts the worker
func (w *Worker) Start() error {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	if w.running {
		return fmt.Errorf("worker is already running")
	}

	if w.adminClient == nil {
		return fmt.Errorf("admin client is not set")
	}

	w.running = true
	w.startTime = time.Now()

	// Prepare worker info for registration
	workerInfo := &types.WorkerData{
		ID:            w.id,
		Capabilities:  w.config.Capabilities,
		MaxConcurrent: w.config.MaxConcurrent,
		Status:        "active",
		CurrentLoad:   0,
		LastHeartbeat: time.Now(),
	}

	// Register worker info with client first (this stores it for use during connection)
	if err := w.adminClient.RegisterWorker(workerInfo); err != nil {
		glog.V(1).Infof("Worker info stored for registration: %v", err)
		// This is expected if not connected yet
	}

	// Start connection attempt (will register immediately if successful)
	glog.Infof("🚀 WORKER STARTING: Worker %s starting with capabilities %v, max concurrent: %d",
		w.id, w.config.Capabilities, w.config.MaxConcurrent)

	// Try initial connection, but don't fail if it doesn't work immediately
	if err := w.adminClient.Connect(); err != nil {
		glog.Warningf("⚠️ INITIAL CONNECTION FAILED: Worker %s initial connection to admin server failed, will keep retrying: %v", w.id, err)
		// Don't return error - let the reconnection loop handle it
	} else {
		glog.Infof("✅ INITIAL CONNECTION SUCCESS: Worker %s successfully connected to admin server", w.id)
	}

	// Start worker loops regardless of initial connection status
	// They will handle connection failures gracefully
	glog.V(1).Infof("🔄 STARTING LOOPS: Worker %s starting background loops", w.id)
	go w.heartbeatLoop()
	go w.taskRequestLoop()
	go w.connectionMonitorLoop()
	go w.messageProcessingLoop()

	glog.Infof("✅ WORKER STARTED: Worker %s started successfully (connection attempts will continue in background)", w.id)
	return nil
}
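
// Illustrative shutdown pairing (hypothetical caller code, not part of this
// file; assumes the caller imports os/signal and syscall): Start does not fail
// on a missing admin connection, so callers typically just block until a
// signal arrives and then call Stop.
//
//	sigCh := make(chan os.Signal, 1)
//	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
//	<-sigCh
//	if err := worker.Stop(); err != nil {
//		glog.Errorf("error stopping worker: %v", err)
//	}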

// Stop stops the worker
func (w *Worker) Stop() error {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	if !w.running {
		return nil
	}

	w.running = false
	close(w.stopChan)

	// Stop tickers
	if w.heartbeatTicker != nil {
		w.heartbeatTicker.Stop()
	}
	if w.requestTicker != nil {
		w.requestTicker.Stop()
	}

	// Wait for current tasks to complete or timeout
	timeout := time.NewTimer(30 * time.Second)
	defer timeout.Stop()

waitTasks:
	for len(w.currentTasks) > 0 {
		select {
		case <-timeout.C:
			glog.Warningf("Worker %s stopping with %d tasks still running", w.id, len(w.currentTasks))
			// A bare break would only exit the select, not the wait loop
			break waitTasks
		case <-time.After(time.Second):
			// Check again
		}
	}

	// Disconnect from admin server
	if w.adminClient != nil {
		if err := w.adminClient.Disconnect(); err != nil {
			glog.Errorf("Error disconnecting from admin server: %v", err)
		}
	}

	glog.Infof("Worker %s stopped", w.id)
	return nil
}

// RegisterTask registers a task factory
func (w *Worker) RegisterTask(taskType types.TaskType, factory types.TaskFactory) {
	w.registry.Register(taskType, factory)
}

// GetCapabilities returns the worker capabilities
func (w *Worker) GetCapabilities() []types.TaskType {
	return w.config.Capabilities
}

// GetStatus returns the current worker status
func (w *Worker) GetStatus() types.WorkerStatus {
	w.mutex.RLock()
	defer w.mutex.RUnlock()

	var currentTasks []types.TaskInput
	for _, task := range w.currentTasks {
		currentTasks = append(currentTasks, *task)
	}

	status := "active"
	if len(w.currentTasks) >= w.config.MaxConcurrent {
		status = "busy"
	}

	return types.WorkerStatus{
		WorkerID:       w.id,
		Status:         status,
		Capabilities:   w.config.Capabilities,
		MaxConcurrent:  w.config.MaxConcurrent,
		CurrentLoad:    len(w.currentTasks),
		LastHeartbeat:  time.Now(),
		CurrentTasks:   currentTasks,
		Uptime:         time.Since(w.startTime),
		TasksCompleted: w.tasksCompleted,
		TasksFailed:    w.tasksFailed,
	}
}

// HandleTask handles a task execution
func (w *Worker) HandleTask(task *types.TaskInput) error {
	glog.V(1).Infof("Worker %s received task %s (type: %s, volume: %d)",
		w.id, task.ID, task.Type, task.VolumeID)

	w.mutex.Lock()
	currentLoad := len(w.currentTasks)
	if currentLoad >= w.config.MaxConcurrent {
		w.mutex.Unlock()
		glog.Errorf("❌ TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s",
			w.id, currentLoad, w.config.MaxConcurrent, task.ID)
		return fmt.Errorf("worker is at capacity")
	}

	w.currentTasks[task.ID] = task
	newLoad := len(w.currentTasks)
	w.mutex.Unlock()

	glog.Infof("✅ TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d",
		w.id, task.ID, newLoad, w.config.MaxConcurrent)

	// Execute task in goroutine
	go w.executeTask(task)

	return nil
}

// SetCapabilities sets the worker capabilities
func (w *Worker) SetCapabilities(capabilities []types.TaskType) {
	w.config.Capabilities = capabilities
}

// SetMaxConcurrent sets the maximum concurrent tasks
func (w *Worker) SetMaxConcurrent(max int) {
	w.config.MaxConcurrent = max
}

// SetHeartbeatInterval sets the heartbeat interval
func (w *Worker) SetHeartbeatInterval(interval time.Duration) {
	w.config.HeartbeatInterval = interval
}

// SetTaskRequestInterval sets the task request interval
func (w *Worker) SetTaskRequestInterval(interval time.Duration) {
	w.config.TaskRequestInterval = interval
}

// SetAdminClient sets the admin client
func (w *Worker) SetAdminClient(client AdminClient) {
	w.adminClient = client
}

// executeTask executes a task
func (w *Worker) executeTask(task *types.TaskInput) {
	startTime := time.Now()

	defer func() {
		w.mutex.Lock()
		delete(w.currentTasks, task.ID)
		currentLoad := len(w.currentTasks)
		w.mutex.Unlock()

		duration := time.Since(startTime)
		glog.Infof("🏁 TASK EXECUTION FINISHED: Worker %s finished executing task %s after %v - current load: %d/%d",
			w.id, task.ID, duration, currentLoad, w.config.MaxConcurrent)
	}()

	glog.Infof("🚀 TASK EXECUTION STARTED: Worker %s starting execution of task %s (type: %s, volume: %d, server: %s, collection: %s) at %v",
		w.id, task.ID, task.Type, task.VolumeID, task.Server, task.Collection, startTime.Format(time.RFC3339))

	// Report task start to admin server
	if err := w.adminClient.UpdateTaskProgress(task.ID, 0.0); err != nil {
		glog.V(1).Infof("Failed to report task start to admin: %v", err)
	}

	// Determine task-specific working directory (BaseWorkingDir is guaranteed to be non-empty)
	taskWorkingDir := filepath.Join(w.config.BaseWorkingDir, string(task.Type))
	glog.V(2).Infof("📁 WORKING DIRECTORY: Task %s using working directory: %s", task.ID, taskWorkingDir)

	// Check if we have typed protobuf parameters
	if task.TypedParams == nil {
		w.completeTask(task.ID, false, "task has no typed parameters - task was not properly planned")
		glog.Errorf("Worker %s rejecting task %s: no typed parameters", w.id, task.ID)
		return
	}

	// Use new task execution system with unified Task interface
	glog.V(1).Infof("Executing task %s with typed protobuf parameters", task.ID)

	// Initialize a file-based task logger so admin can retrieve logs
	// Build minimal params for logger metadata
	loggerParams := types.TaskParams{
		VolumeID:    task.VolumeID,
		Collection:  task.Collection,
		TypedParams: task.TypedParams,
	}
	loggerConfig := w.getTaskLoggerConfig()
	fileLogger, logErr := tasks.NewTaskLogger(task.ID, task.Type, w.id, loggerParams, loggerConfig)
	if logErr != nil {
		glog.Warningf("Failed to initialize file logger for task %s: %v", task.ID, logErr)
	} else {
		defer func() {
			if err := fileLogger.Close(); err != nil {
				glog.V(1).Infof("Failed to close task logger for %s: %v", task.ID, err)
			}
		}()
		fileLogger.Info("Task %s started (type=%s, server=%s, collection=%s)", task.ID, task.Type, task.Server, task.Collection)
	}

	taskFactory := w.registry.Get(task.Type)
	if taskFactory == nil {
		w.completeTask(task.ID, false, fmt.Sprintf("task factory not available for %s: task type not found", task.Type))
		glog.Errorf("Worker %s failed to get task factory for %s type %v", w.id, task.ID, task.Type)

		// Log supported task types for debugging
		allFactories := w.registry.GetAll()
		glog.Errorf("Available task types: %d", len(allFactories))
		for taskType := range allFactories {
			glog.Errorf("Supported task type: %v", taskType)
		}
		return
	}

	taskInstance, err := taskFactory.Create(task.TypedParams)
	if err != nil {
		w.completeTask(task.ID, false, fmt.Sprintf("failed to create task for %s: %v", task.Type, err))
		glog.Errorf("Worker %s failed to create task %s type %v: %v", w.id, task.ID, task.Type, err)
		return
	}

	// Task execution uses the new unified Task interface
	glog.V(2).Infof("Executing task %s in working directory: %s", task.ID, taskWorkingDir)

	// If we have a file logger, adapt it so task WithFields logs are captured into the file
	if fileLogger != nil {
		if withLogger, ok := taskInstance.(interface{ SetLogger(types.Logger) }); ok {
			withLogger.SetLogger(newTaskLoggerAdapter(fileLogger))
		}
	}

	// Set progress callback that reports to admin server
	taskInstance.SetProgressCallback(func(progress float64, stage string) {
		// Report progress updates to admin server
		glog.V(2).Infof("Task %s progress: %.1f%% - %s", task.ID, progress, stage)
		if err := w.adminClient.UpdateTaskProgress(task.ID, progress); err != nil {
			glog.V(1).Infof("Failed to report task progress to admin: %v", err)
		}
		if fileLogger != nil {
			// Use a meaningful stage description or fall back to a generic message
			message := stage
			if message == "" {
				message = fmt.Sprintf("Progress: %.1f%%", progress)
			}
			fileLogger.LogProgress(progress, message)
		}
	})

	// Execute task with context
	ctx := context.Background()
	err = taskInstance.Execute(ctx, task.TypedParams)

	// Report completion; the counters are guarded because GetStatus and
	// GetPerformanceMetrics read them from other goroutines
	if err != nil {
		w.completeTask(task.ID, false, err.Error())
		w.mutex.Lock()
		w.tasksFailed++
		w.mutex.Unlock()
		glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err)
		if fileLogger != nil {
			fileLogger.LogStatus("failed", err.Error())
			fileLogger.Error("Task %s failed: %v", task.ID, err)
		}
	} else {
		w.completeTask(task.ID, true, "")
		w.mutex.Lock()
		w.tasksCompleted++
		w.mutex.Unlock()
		glog.Infof("Worker %s completed task %s successfully", w.id, task.ID)
		if fileLogger != nil {
			fileLogger.Info("Task %s completed successfully", task.ID)
		}
	}
}

// completeTask reports task completion to admin server
func (w *Worker) completeTask(taskID string, success bool, errorMsg string) {
	if w.adminClient != nil {
		if err := w.adminClient.CompleteTask(taskID, success, errorMsg); err != nil {
			glog.Errorf("Failed to report task completion: %v", err)
		}
	}
}

// heartbeatLoop sends periodic heartbeats to the admin server
func (w *Worker) heartbeatLoop() {
	w.heartbeatTicker = time.NewTicker(w.config.HeartbeatInterval)
	defer w.heartbeatTicker.Stop()

	for {
		select {
		case <-w.stopChan:
			return
		case <-w.heartbeatTicker.C:
			w.sendHeartbeat()
		}
	}
}

// taskRequestLoop periodically requests new tasks from the admin server
func (w *Worker) taskRequestLoop() {
	w.requestTicker = time.NewTicker(w.config.TaskRequestInterval)
	defer w.requestTicker.Stop()

	for {
		select {
		case <-w.stopChan:
			return
		case <-w.requestTicker.C:
			w.requestTasks()
		}
	}
}

// sendHeartbeat sends heartbeat to admin server
func (w *Worker) sendHeartbeat() {
	if w.adminClient != nil {
		// Snapshot the current load under the read lock to avoid racing with
		// HandleTask/executeTask, which mutate the currentTasks map
		w.mutex.RLock()
		currentLoad := len(w.currentTasks)
		w.mutex.RUnlock()

		if err := w.adminClient.SendHeartbeat(w.id, &types.WorkerStatus{
			WorkerID:      w.id,
			Status:        "active",
			Capabilities:  w.config.Capabilities,
			MaxConcurrent: w.config.MaxConcurrent,
			CurrentLoad:   currentLoad,
			LastHeartbeat: time.Now(),
		}); err != nil {
			glog.Warningf("Failed to send heartbeat: %v", err)
		}
	}
}

// requestTasks requests new tasks from the admin server
func (w *Worker) requestTasks() {
	w.mutex.RLock()
	currentLoad := len(w.currentTasks)
	w.mutex.RUnlock()

	if currentLoad >= w.config.MaxConcurrent {
		glog.V(3).Infof("🚫 TASK REQUEST SKIPPED: Worker %s at capacity (%d/%d)",
			w.id, currentLoad, w.config.MaxConcurrent)
		return // Already at capacity
	}

	if w.adminClient != nil {
		glog.V(3).Infof("📞 REQUESTING TASK: Worker %s requesting task from admin server (current load: %d/%d, capabilities: %v)",
			w.id, currentLoad, w.config.MaxConcurrent, w.config.Capabilities)

		task, err := w.adminClient.RequestTask(w.id, w.config.Capabilities)
		if err != nil {
			glog.V(2).Infof("❌ TASK REQUEST FAILED: Worker %s failed to request task: %v", w.id, err)
			return
		}

		if task != nil {
			glog.Infof("📨 TASK RESPONSE RECEIVED: Worker %s received task from admin server - ID: %s, Type: %s",
				w.id, task.ID, task.Type)
			if err := w.HandleTask(task); err != nil {
				glog.Errorf("❌ TASK HANDLING FAILED: Worker %s failed to handle task %s: %v", w.id, task.ID, err)
			}
		} else {
			glog.V(3).Infof("📭 NO TASK AVAILABLE: Worker %s - admin server has no tasks available", w.id)
		}
	}
}

// GetTaskRegistry returns the task registry
func (w *Worker) GetTaskRegistry() *tasks.TaskRegistry {
	return w.registry
}

// GetCurrentTasks returns the current tasks
func (w *Worker) GetCurrentTasks() map[string]*types.TaskInput {
	w.mutex.RLock()
	defer w.mutex.RUnlock()

	tasks := make(map[string]*types.TaskInput)
	for id, task := range w.currentTasks {
		tasks[id] = task
	}
	return tasks
}

// registerWorker registers the worker with the admin server
func (w *Worker) registerWorker() {
	workerInfo := &types.WorkerData{
		ID:            w.id,
		Capabilities:  w.config.Capabilities,
		MaxConcurrent: w.config.MaxConcurrent,
		Status:        "active",
		CurrentLoad:   0,
		LastHeartbeat: time.Now(),
	}

	if err := w.adminClient.RegisterWorker(workerInfo); err != nil {
		glog.Warningf("Failed to register worker (will retry on next heartbeat): %v", err)
	} else {
		glog.Infof("Worker %s registered successfully with admin server", w.id)
	}
}

// connectionMonitorLoop monitors connection status
func (w *Worker) connectionMonitorLoop() {
	ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
	defer ticker.Stop()

	lastConnectionStatus := false

	for {
		select {
		case <-w.stopChan:
			glog.V(1).Infof("🛑 CONNECTION MONITOR STOPPING: Worker %s connection monitor loop stopping", w.id)
			return
		case <-ticker.C:
			// Monitor connection status and log changes
			currentConnectionStatus := w.adminClient != nil && w.adminClient.IsConnected()

			if currentConnectionStatus != lastConnectionStatus {
				if currentConnectionStatus {
					glog.Infof("🔗 CONNECTION RESTORED: Worker %s connection status changed: connected", w.id)
				} else {
					glog.Warningf("⚠️ CONNECTION LOST: Worker %s connection status changed: disconnected", w.id)
				}
				lastConnectionStatus = currentConnectionStatus
			} else {
				if currentConnectionStatus {
					glog.V(3).Infof("✅ CONNECTION OK: Worker %s connection status: connected", w.id)
				} else {
					glog.V(1).Infof("🔌 CONNECTION DOWN: Worker %s connection status: disconnected, reconnection in progress", w.id)
				}
			}
		}
	}
}

// GetConfig returns the worker configuration
func (w *Worker) GetConfig() *types.WorkerConfig {
	return w.config
}

// GetPerformanceMetrics returns performance metrics
func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance {
	w.mutex.RLock()
	defer w.mutex.RUnlock()

	uptime := time.Since(w.startTime)
	var successRate float64
	totalTasks := w.tasksCompleted + w.tasksFailed
	if totalTasks > 0 {
		successRate = float64(w.tasksCompleted) / float64(totalTasks) * 100
	}

	return &types.WorkerPerformance{
		TasksCompleted:  w.tasksCompleted,
		TasksFailed:     w.tasksFailed,
		AverageTaskTime: 0, // Would need to track this
		Uptime:          uptime,
		SuccessRate:     successRate,
	}
}

// messageProcessingLoop processes incoming admin messages
func (w *Worker) messageProcessingLoop() {
	glog.Infof("🔄 MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id)

	// Get access to the incoming message channel from gRPC client
	grpcClient, ok := w.adminClient.(*GrpcAdminClient)
	if !ok {
		glog.Warningf("⚠️ MESSAGE LOOP UNAVAILABLE: Worker %s admin client is not gRPC client, message processing not available", w.id)
		return
	}

	incomingChan := grpcClient.GetIncomingChannel()
	glog.V(1).Infof("📡 MESSAGE CHANNEL READY: Worker %s connected to incoming message channel", w.id)

	for {
		select {
		case <-w.stopChan:
			glog.Infof("🛑 MESSAGE LOOP STOPPING: Worker %s message processing loop stopping", w.id)
			return
		case message := <-incomingChan:
			if message != nil {
				glog.V(3).Infof("📥 MESSAGE PROCESSING: Worker %s processing incoming message", w.id)
				w.processAdminMessage(message)
			} else {
				glog.V(3).Infof("📭 NULL MESSAGE: Worker %s received nil message", w.id)
			}
		}
	}
}

// processAdminMessage processes different types of admin messages
func (w *Worker) processAdminMessage(message *worker_pb.AdminMessage) {
	glog.V(4).Infof("📫 ADMIN MESSAGE RECEIVED: Worker %s received admin message: %T", w.id, message.Message)

	switch msg := message.Message.(type) {
	case *worker_pb.AdminMessage_RegistrationResponse:
		glog.V(2).Infof("✅ REGISTRATION RESPONSE: Worker %s received registration response", w.id)
		w.handleRegistrationResponse(msg.RegistrationResponse)
	case *worker_pb.AdminMessage_HeartbeatResponse:
		glog.V(3).Infof("💓 HEARTBEAT RESPONSE: Worker %s received heartbeat response", w.id)
		w.handleHeartbeatResponse(msg.HeartbeatResponse)
	case *worker_pb.AdminMessage_TaskLogRequest:
		glog.V(1).Infof("📋 TASK LOG REQUEST: Worker %s received task log request for task %s", w.id, msg.TaskLogRequest.TaskId)
		w.handleTaskLogRequest(msg.TaskLogRequest)
	case *worker_pb.AdminMessage_TaskAssignment:
		taskAssign := msg.TaskAssignment
		glog.V(1).Infof("Worker %s received direct task assignment %s (type: %s, volume: %d)",
			w.id, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId)

		// Convert to task and handle it
		task := &types.TaskInput{
			ID:          taskAssign.TaskId,
			Type:        types.TaskType(taskAssign.TaskType),
			Status:      types.TaskStatusAssigned,
			VolumeID:    taskAssign.Params.VolumeId,
			Server:      getServerFromParams(taskAssign.Params),
			Collection:  taskAssign.Params.Collection,
			Priority:    types.TaskPriority(taskAssign.Priority),
			CreatedAt:   time.Unix(taskAssign.CreatedTime, 0),
			TypedParams: taskAssign.Params,
		}

		if err := w.HandleTask(task); err != nil {
			glog.Errorf("❌ DIRECT TASK ASSIGNMENT FAILED: Worker %s failed to handle direct task assignment %s: %v", w.id, task.ID, err)
		}
	case *worker_pb.AdminMessage_TaskCancellation:
		glog.Infof("🛑 TASK CANCELLATION: Worker %s received task cancellation for task %s", w.id, msg.TaskCancellation.TaskId)
		w.handleTaskCancellation(msg.TaskCancellation)
	case *worker_pb.AdminMessage_AdminShutdown:
		glog.Infof("🔄 ADMIN SHUTDOWN: Worker %s received admin shutdown message", w.id)
		w.handleAdminShutdown(msg.AdminShutdown)
	default:
		glog.V(1).Infof("❓ UNKNOWN MESSAGE: Worker %s received unknown admin message type: %T", w.id, message.Message)
	}
}

// handleTaskLogRequest processes task log requests from admin server
func (w *Worker) handleTaskLogRequest(request *worker_pb.TaskLogRequest) {
	glog.V(1).Infof("Worker %s handling task log request for task %s", w.id, request.TaskId)

	// Use the task log handler to process the request
	response := w.taskLogHandler.HandleLogRequest(request)

	// Send response back to admin server
	responseMsg := &worker_pb.WorkerMessage{
		WorkerId:  w.id,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_TaskLogResponse{
			TaskLogResponse: response,
		},
	}

	grpcClient, ok := w.adminClient.(*GrpcAdminClient)
	if !ok {
		glog.Errorf("Cannot send task log response: admin client is not gRPC client")
		return
	}

	select {
	case grpcClient.outgoing <- responseMsg:
		glog.V(1).Infof("Task log response sent for task %s", request.TaskId)
	case <-time.After(5 * time.Second):
		glog.Errorf("Failed to send task log response for task %s: timeout", request.TaskId)
	}
}

// handleTaskCancellation processes task cancellation requests
func (w *Worker) handleTaskCancellation(cancellation *worker_pb.TaskCancellation) {
	glog.Infof("Worker %s received task cancellation for task %s", w.id, cancellation.TaskId)

	w.mutex.Lock()
	defer w.mutex.Unlock()

	if task, exists := w.currentTasks[cancellation.TaskId]; exists {
		// TODO: Implement task cancellation logic
		glog.Infof("Cancelling task %s", task.ID)
	} else {
		glog.Warningf("Cannot cancel task %s: task not found", cancellation.TaskId)
	}
}

// handleAdminShutdown processes admin shutdown notifications
func (w *Worker) handleAdminShutdown(shutdown *worker_pb.AdminShutdown) {
	glog.Infof("Worker %s received admin shutdown notification: %s", w.id, shutdown.Reason)

	gracefulSeconds := shutdown.GracefulShutdownSeconds
	if gracefulSeconds > 0 {
		glog.Infof("Graceful shutdown in %d seconds", gracefulSeconds)
		time.AfterFunc(time.Duration(gracefulSeconds)*time.Second, func() {
			w.Stop()
		})
	} else {
		// Immediate shutdown
		go w.Stop()
	}
}

// handleRegistrationResponse processes registration response from admin server
func (w *Worker) handleRegistrationResponse(response *worker_pb.RegistrationResponse) {
	glog.V(2).Infof("Worker %s processed registration response: success=%v", w.id, response.Success)
	if !response.Success {
		glog.Warningf("Worker %s registration failed: %s", w.id, response.Message)
	}
	// Registration responses are typically handled by the gRPC client during connection setup
	// No additional action needed here
}

// handleHeartbeatResponse processes heartbeat response from admin server
func (w *Worker) handleHeartbeatResponse(response *worker_pb.HeartbeatResponse) {
	glog.V(4).Infof("Worker %s processed heartbeat response", w.id)
	// Heartbeat responses are mainly for keeping the connection alive
	// The admin may include configuration updates or status information in the future
	// For now, just acknowledge receipt
}