seaweedfs/weed/worker/tasks/task_logger.go
Chris Lu 891a2fb6eb
Admin: misc improvements on admin server and workers. EC now works. (#7055)
* initial design

* added simulation as tests

* reorganized the codebase to move the simulation framework and tests into their own dedicated package

* integration test. ec worker task

* remove "enhanced" reference

* start master, volume servers, filer

Current Status:
- Master: Healthy and running (port 9333)
- Filer: Healthy and running (port 8888)
- Volume Servers: All 6 servers running (ports 8080-8085)
- 🔄 Admin/Workers: Will start when dependencies are ready

* generate write load

* tasks are assigned

* admin starts with grpc port; worker has its own working directory

* Update .gitignore

* working worker and admin. Task detection is not working yet.

* compiles, detection uses volumeSizeLimitMB from master

* compiles

* worker retries connecting to admin

* build and restart

* rendering pending tasks

* skip task ID column

* sticky worker id

* test canScheduleTaskNow

* worker reconnect to admin

* clean up logs

* worker registers itself first

* worker can run ec work and report status

but:
1. one volume should not be repeatedly worked on.
2. ec shards need to be distributed and source data should be deleted.

* move ec task logic

* listing ec shards

* local copy, ec. Need to distribute.

* ec is mostly working now

* distribution of ec shards needs improvement
* need configuration to enable ec

* show ec volumes

* interval field UI component

* rename

* integration test with vacuuming

* garbage percentage threshold

* fix warning

* display ec shard sizes

* fix ec volumes list

* Update ui.go

* show default values

* ensure correct default value

* MaintenanceConfig use ConfigField

* use schema defined defaults

* config

* reduce duplication

* refactor to use BaseUIProvider

* each task registers its schema

* checkECEncodingCandidate use ecDetector

* use vacuumDetector

* use volumeSizeLimitMB

* remove

* remove unused

* refactor

* use new framework

* remove v2 reference

* refactor

* left menu can scroll now

* The maintenance manager was not being initialized when no data directory was configured for persistent storage.

* saving config

* Update task_config_schema_templ.go

* enable/disable tasks

* protobuf encoded task configurations

* fix system settings

* use ui component

* remove logs

* interface{} reduction

* reduce interface{}

* reduce interface{}

* avoid from/to map

* reduce interface{}

* refactor

* keep it DRY

* added logging

* debug messages

* debug level

* debug

* show the log caller line

* use configured task policy

* log level

* handle admin heartbeat response

* Update worker.go

* fix EC rack and dc count

* Report task status to admin server

* fix task logging, simplify interface checking, use erasure_coding constants

* factor in empty volume server during task planning

* volume.list adds disk id

* track disk id also

* fix locking scheduled and manual scanning

* add active topology

* simplify task detector

* ec task completed, but shards are not showing up

* implement ec in ec_typed.go

* adjust log level

* dedup

* implementing ec copying shards and only ecx files

* use disk id when distributing ec shards

🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk
📦 Task Creation: maintenance_integration.go creates ECDestination with DiskId
🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest
💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId])
📂 File System: EC shards and metadata land in the exact disk directory planned

* Delete original volume from all locations

* clean up existing shard locations

* local encoding and distributing

* Update docker/admin_integration/EC-TESTING-README.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* check volume id range

* simplify

* fix tests

* fix types

* clean up logs and tests

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-07-30 12:38:03 -07:00


package tasks

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// TaskLogger provides file-based logging for individual tasks
type TaskLogger interface {
	// Log methods
	Info(message string, args ...interface{})
	Warning(message string, args ...interface{})
	Error(message string, args ...interface{})
	Debug(message string, args ...interface{})

	// Progress and status logging
	LogProgress(progress float64, message string)
	LogStatus(status string, message string)

	// Structured logging
	LogWithFields(level string, message string, fields map[string]interface{})

	// Lifecycle
	Close() error
	GetLogDir() string
}
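
// A minimal usage sketch, assuming the worker framework supplies the task
// and worker IDs (the IDs and values below are hypothetical):
//
//	logger, err := NewTaskLogger("task-123", types.TaskType("erasure_coding"),
//		"worker-1", types.TaskParams{VolumeID: 42}, DefaultTaskLoggerConfig())
//	if err != nil {
//		return err
//	}
//	defer logger.Close()
//	logger.Info("encoding volume %d", 42)
//	logger.LogProgress(50.0, "generated half of the shards")
//	logger.LogStatus("completed", "all shards distributed")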

// LoggerProvider interface for tasks that support logging
type LoggerProvider interface {
	InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error
	GetTaskLogger() TaskLogger
}

// TaskLoggerConfig holds configuration for task logging
type TaskLoggerConfig struct {
	BaseLogDir    string
	MaxTasks      int  // Maximum number of task logs to keep
	MaxLogSizeMB  int  // Maximum log file size in MB
	EnableConsole bool // Also log to console
}

// FileTaskLogger implements TaskLogger using file-based logging
type FileTaskLogger struct {
	taskID   string
	taskType types.TaskType
	workerID string
	logDir   string
	logFile  *os.File
	mutex    sync.Mutex
	config   TaskLoggerConfig
	metadata *TaskLogMetadata
	closed   bool
}

// TaskLogMetadata contains metadata about the task execution
type TaskLogMetadata struct {
	TaskID      string                 `json:"task_id"`
	TaskType    string                 `json:"task_type"`
	WorkerID    string                 `json:"worker_id"`
	StartTime   time.Time              `json:"start_time"`
	EndTime     *time.Time             `json:"end_time,omitempty"`
	Duration    *time.Duration         `json:"duration,omitempty"`
	Status      string                 `json:"status"`
	Progress    float64                `json:"progress"`
	VolumeID    uint32                 `json:"volume_id,omitempty"`
	Server      string                 `json:"server,omitempty"`
	Collection  string                 `json:"collection,omitempty"`
	CustomData  map[string]interface{} `json:"custom_data,omitempty"`
	LogFilePath string                 `json:"log_file_path"`
	CreatedAt   time.Time              `json:"created_at"`
}
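
// metadata.json for a finished task looks roughly like this (values are
// illustrative; Duration is serialized by encoding/json as nanoseconds):
//
//	{
//	  "task_id": "task-123",
//	  "task_type": "erasure_coding",
//	  "worker_id": "worker-1",
//	  "start_time": "2025-07-30T12:00:00Z",
//	  "end_time": "2025-07-30T12:05:00Z",
//	  "duration": 300000000000,
//	  "status": "completed",
//	  "progress": 100,
//	  "volume_id": 42,
//	  "log_file_path": ".../task.log",
//	  "created_at": "2025-07-30T12:00:00Z"
//	}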

// TaskLogEntry represents a single log entry
type TaskLogEntry struct {
	Timestamp time.Time              `json:"timestamp"`
	Level     string                 `json:"level"`
	Message   string                 `json:"message"`
	Fields    map[string]interface{} `json:"fields,omitempty"`
	Progress  *float64               `json:"progress,omitempty"`
	Status    *string                `json:"status,omitempty"`
}
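
// task.log is newline-delimited JSON, one TaskLogEntry per line. Two
// illustrative lines (omitempty drops unset optional fields):
//
//	{"timestamp":"2025-07-30T12:00:01Z","level":"INFO","message":"Task parameters","fields":{"volume_id":42,"server":"localhost:8080","collection":""}}
//	{"timestamp":"2025-07-30T12:03:00Z","level":"INFO","message":"generated half of the shards","progress":50}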

// DefaultTaskLoggerConfig returns default configuration
func DefaultTaskLoggerConfig() TaskLoggerConfig {
	return TaskLoggerConfig{
		BaseLogDir:    "/data/task_logs", // Use persistent data directory
		MaxTasks:      100,               // Keep last 100 task logs
		MaxLogSizeMB:  10,
		EnableConsole: true,
	}
}
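
// Callers can start from the defaults and override individual fields; a
// hypothetical override for local testing:
//
//	cfg := DefaultTaskLoggerConfig()
//	cfg.BaseLogDir = "/tmp/task_logs"
//	cfg.MaxTasks = 20
//	cfg.EnableConsole = false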

// NewTaskLogger creates a new file-based task logger
func NewTaskLogger(taskID string, taskType types.TaskType, workerID string, params types.TaskParams, config TaskLoggerConfig) (TaskLogger, error) {
	// Create unique directory name with timestamp
	timestamp := time.Now().Format("20060102_150405")
	dirName := fmt.Sprintf("%s_%s_%s_%s", taskID, taskType, workerID, timestamp)
	logDir := filepath.Join(config.BaseLogDir, dirName)

	// Create log directory
	if err := os.MkdirAll(logDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create log directory %s: %w", logDir, err)
	}

	// Create log file
	logFilePath := filepath.Join(logDir, "task.log")
	logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		return nil, fmt.Errorf("failed to create log file %s: %w", logFilePath, err)
	}

	// Create metadata
	metadata := &TaskLogMetadata{
		TaskID:      taskID,
		TaskType:    string(taskType),
		WorkerID:    workerID,
		StartTime:   time.Now(),
		Status:      "started",
		Progress:    0.0,
		VolumeID:    params.VolumeID,
		Server:      params.Server,
		Collection:  params.Collection,
		CustomData:  make(map[string]interface{}),
		LogFilePath: logFilePath,
		CreatedAt:   time.Now(),
	}

	logger := &FileTaskLogger{
		taskID:   taskID,
		taskType: taskType,
		workerID: workerID,
		logDir:   logDir,
		logFile:  logFile,
		config:   config,
		metadata: metadata,
		closed:   false,
	}

	// Write initial log entry
	logger.Info("Task logger initialized for %s (type: %s, worker: %s)", taskID, taskType, workerID)
	logger.LogWithFields("INFO", "Task parameters", map[string]interface{}{
		"volume_id":  params.VolumeID,
		"server":     params.Server,
		"collection": params.Collection,
	})

	// Save initial metadata
	if err := logger.saveMetadata(); err != nil {
		glog.Warningf("Failed to save initial task metadata: %v", err)
	}

	// Clean up old task logs
	go logger.cleanupOldLogs()

	return logger, nil
}
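
// Each logger therefore owns one directory under BaseLogDir (IDs and
// timestamp below are illustrative):
//
//	/data/task_logs/task-123_erasure_coding_worker-1_20250730_120000/
//	├── task.log       (newline-delimited JSON log entries)
//	└── metadata.json  (TaskLogMetadata, rewritten as progress/status change)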

// Info logs an info message
func (l *FileTaskLogger) Info(message string, args ...interface{}) {
	l.log("INFO", message, args...)
}

// Warning logs a warning message
func (l *FileTaskLogger) Warning(message string, args ...interface{}) {
	l.log("WARNING", message, args...)
}

// Error logs an error message
func (l *FileTaskLogger) Error(message string, args ...interface{}) {
	l.log("ERROR", message, args...)
}

// Debug logs a debug message
func (l *FileTaskLogger) Debug(message string, args ...interface{}) {
	l.log("DEBUG", message, args...)
}

// LogProgress logs task progress
func (l *FileTaskLogger) LogProgress(progress float64, message string) {
	l.mutex.Lock()
	l.metadata.Progress = progress
	// Persist the updated metadata while the mutex is held so concurrent
	// progress/status updates do not race on l.metadata
	if err := l.saveMetadata(); err != nil {
		glog.Warningf("Failed to save task metadata: %v", err)
	}
	l.mutex.Unlock()

	entry := TaskLogEntry{
		Timestamp: time.Now(),
		Level:     "INFO",
		Message:   message,
		Progress:  &progress,
	}
	l.writeLogEntry(entry)
}

// LogStatus logs task status change
func (l *FileTaskLogger) LogStatus(status string, message string) {
	l.mutex.Lock()
	l.metadata.Status = status
	// Persist the updated metadata while the mutex is held so concurrent
	// progress/status updates do not race on l.metadata
	if err := l.saveMetadata(); err != nil {
		glog.Warningf("Failed to save task metadata: %v", err)
	}
	l.mutex.Unlock()

	entry := TaskLogEntry{
		Timestamp: time.Now(),
		Level:     "INFO",
		Message:   message,
		Status:    &status,
	}
	l.writeLogEntry(entry)
}

// LogWithFields logs a message with structured fields
func (l *FileTaskLogger) LogWithFields(level string, message string, fields map[string]interface{}) {
	entry := TaskLogEntry{
		Timestamp: time.Now(),
		Level:     level,
		Message:   message,
		Fields:    fields,
	}
	l.writeLogEntry(entry)
}

// Close closes the logger and finalizes metadata
func (l *FileTaskLogger) Close() error {
	// Log before taking the mutex: Info ends up in writeLogEntry, which
	// locks the same non-reentrant mutex and would deadlock below
	l.Info("Task logger closing for %s", l.taskID)

	l.mutex.Lock()
	defer l.mutex.Unlock()

	if l.closed {
		return nil
	}

	// Finalize metadata
	endTime := time.Now()
	duration := endTime.Sub(l.metadata.StartTime)
	l.metadata.EndTime = &endTime
	l.metadata.Duration = &duration
	if l.metadata.Status == "started" {
		l.metadata.Status = "completed"
	}

	// Save final metadata
	if err := l.saveMetadata(); err != nil {
		glog.Warningf("Failed to save final task metadata: %v", err)
	}

	// Close log file
	if l.logFile != nil {
		if err := l.logFile.Close(); err != nil {
			return fmt.Errorf("failed to close log file: %w", err)
		}
	}
	l.closed = true

	return nil
}

// GetLogDir returns the log directory path
func (l *FileTaskLogger) GetLogDir() string {
	return l.logDir
}

// log is the internal logging method
func (l *FileTaskLogger) log(level string, message string, args ...interface{}) {
	formattedMessage := fmt.Sprintf(message, args...)
	entry := TaskLogEntry{
		Timestamp: time.Now(),
		Level:     level,
		Message:   formattedMessage,
	}
	l.writeLogEntry(entry)
}

// writeLogEntry writes a log entry to the file
func (l *FileTaskLogger) writeLogEntry(entry TaskLogEntry) {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	if l.closed || l.logFile == nil {
		return
	}

	// Format as JSON line
	jsonData, err := json.Marshal(entry)
	if err != nil {
		glog.Errorf("Failed to marshal log entry: %v", err)
		return
	}

	// Write to file
	if _, err := l.logFile.WriteString(string(jsonData) + "\n"); err != nil {
		glog.Errorf("Failed to write log entry: %v", err)
		return
	}

	// Flush to disk
	if err := l.logFile.Sync(); err != nil {
		glog.Errorf("Failed to sync log file: %v", err)
	}

	// Also log to console and stderr if enabled
	if l.config.EnableConsole {
		// Log to glog with proper call depth to show the actual source location:
		// depth 3 skips writeLogEntry -> log -> Info/Warning/Error to reach the original caller
		formattedMsg := fmt.Sprintf("[TASK-%s] %s: %s", l.taskID, entry.Level, entry.Message)
		switch entry.Level {
		case "ERROR":
			glog.ErrorDepth(3, formattedMsg)
		case "WARNING":
			glog.WarningDepth(3, formattedMsg)
		default: // INFO, DEBUG, etc.
			glog.InfoDepth(3, formattedMsg)
		}
		// Also log to stderr for immediate visibility
		fmt.Fprintf(os.Stderr, "[TASK-%s] %s: %s\n", l.taskID, entry.Level, entry.Message)
	}
}

// saveMetadata saves task metadata to metadata.json in the log directory;
// callers serialize access to l.metadata
func (l *FileTaskLogger) saveMetadata() error {
	metadataPath := filepath.Join(l.logDir, "metadata.json")
	data, err := json.MarshalIndent(l.metadata, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal metadata: %w", err)
	}
	return os.WriteFile(metadataPath, data, 0644)
}

// cleanupOldLogs removes old task log directories to maintain the limit
func (l *FileTaskLogger) cleanupOldLogs() {
	baseDir := l.config.BaseLogDir
	entries, err := os.ReadDir(baseDir)
	if err != nil {
		glog.Warningf("Failed to read log directory %s: %v", baseDir, err)
		return
	}

	// Filter for directories only
	var dirs []os.DirEntry
	for _, entry := range entries {
		if entry.IsDir() {
			dirs = append(dirs, entry)
		}
	}

	// If we're under the limit, nothing to clean
	if len(dirs) <= l.config.MaxTasks {
		return
	}

	// Sort by modification time (oldest first)
	sort.Slice(dirs, func(i, j int) bool {
		infoI, errI := dirs[i].Info()
		infoJ, errJ := dirs[j].Info()
		if errI != nil || errJ != nil {
			return false
		}
		return infoI.ModTime().Before(infoJ.ModTime())
	})

	// Remove the oldest directories
	numToRemove := len(dirs) - l.config.MaxTasks
	for i := 0; i < numToRemove; i++ {
		dirPath := filepath.Join(baseDir, dirs[i].Name())
		if err := os.RemoveAll(dirPath); err != nil {
			glog.Warningf("Failed to remove old log directory %s: %v", dirPath, err)
		} else {
			glog.V(1).Infof("Cleaned up old task log directory: %s", dirPath)
		}
	}
	glog.V(1).Infof("Task log cleanup completed: removed %d old directories", numToRemove)
}

// GetTaskLogMetadata reads metadata from a task log directory
func GetTaskLogMetadata(logDir string) (*TaskLogMetadata, error) {
	metadataPath := filepath.Join(logDir, "metadata.json")
	data, err := os.ReadFile(metadataPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read metadata file: %w", err)
	}

	var metadata TaskLogMetadata
	if err := json.Unmarshal(data, &metadata); err != nil {
		return nil, fmt.Errorf("failed to unmarshal metadata: %w", err)
	}
	return &metadata, nil
}

// ReadTaskLogs reads all log entries from a task log file
func ReadTaskLogs(logDir string) ([]TaskLogEntry, error) {
	logPath := filepath.Join(logDir, "task.log")
	file, err := os.Open(logPath)
	if err != nil {
		return nil, fmt.Errorf("failed to open log file: %w", err)
	}
	defer file.Close()

	// The file is newline-delimited JSON; the decoder consumes one
	// value at a time until EOF
	var entries []TaskLogEntry
	decoder := json.NewDecoder(file)
	for {
		var entry TaskLogEntry
		if err := decoder.Decode(&entry); err != nil {
			if err == io.EOF {
				break
			}
			return nil, fmt.Errorf("failed to decode log entry: %w", err)
		}
		entries = append(entries, entry)
	}
	return entries, nil
}
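
// A sketch of reading a finished task's logs back, assuming logDir came from
// TaskLogger.GetLogDir or from scanning BaseLogDir (the path is hypothetical):
//
//	logDir := "/data/task_logs/task-123_erasure_coding_worker-1_20250730_120000"
//	meta, err := GetTaskLogMetadata(logDir)
//	if err != nil {
//		return err
//	}
//	fmt.Printf("task %s finished with status %s\n", meta.TaskID, meta.Status)
//
//	entries, err := ReadTaskLogs(logDir)
//	if err != nil {
//		return err
//	}
//	for _, e := range entries {
//		fmt.Println(e.Timestamp.Format(time.RFC3339), e.Level, e.Message)
//	}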