seaweedfs/weed/worker/tasks/task_log_handler.go
Chris Lu 891a2fb6eb
Admin: misc improvements on admin server and workers. EC now works. (#7055)
* initial design

* added simulation as tests

* reorganized the codebase to move the simulation framework and tests into their own dedicated package

* integration test. ec worker task

* remove "enhanced" reference

* start master, volume servers, filer

Current Status
 Master: Healthy and running (port 9333)
 Filer: Healthy and running (port 8888)
 Volume Servers: All 6 servers running (ports 8080-8085)
🔄 Admin/Workers: Will start when dependencies are ready

* generate write load

* tasks are assigned

* admin starts with grpc port. worker has its own working directory

* Update .gitignore

* working worker and admin. Task detection is not working yet.

* compiles, detection uses volumeSizeLimitMB from master

* compiles

* worker retries connecting to admin

* build and restart

* rendering pending tasks

* skip task ID column

* sticky worker id

* test canScheduleTaskNow

* worker reconnect to admin

* clean up logs

* worker register itself first

* worker can run ec work and report status

but:
1. one volume should not be repeatedly worked on.
2. ec shards need to be distributed and source data should be deleted.

* move ec task logic

* listing ec shards

* local copy, ec. Need to distribute.

* ec is mostly working now

* distribution of ec shards needs improvement
* need configuration to enable ec

* show ec volumes

* interval field UI component

* rename

* integration test with vacuuming

* garbage percentage threshold

* fix warning

* display ec shard sizes

* fix ec volumes list

* Update ui.go

* show default values

* ensure correct default value

* MaintenanceConfig use ConfigField

* use schema defined defaults

* config

* reduce duplication

* refactor to use BaseUIProvider

* each task register its schema

* checkECEncodingCandidate use ecDetector

* use vacuumDetector

* use volumeSizeLimitMB

* remove

remove

* remove unused

* refactor

* use new framework

* remove v2 reference

* refactor

* left menu can scroll now

* The maintenance manager was not being initialized when no data directory was configured for persistent storage.

* saving config

* Update task_config_schema_templ.go

* enable/disable tasks

* protobuf encoded task configurations

* fix system settings

* use ui component

* remove logs

* interface{} Reduction

* reduce interface{}

* reduce interface{}

* avoid from/to map

* reduce interface{}

* refactor

* keep it DRY

* added logging

* debug messages

* debug level

* debug

* show the log caller line

* use configured task policy

* log level

* handle admin heartbeat response

* Update worker.go

* fix EC rack and dc count

* Report task status to admin server

* fix task logging, simplify interface checking, use erasure_coding constants

* factor in empty volume server during task planning

* volume.list adds disk id

* track disk id also

* fix locking scheduled and manual scanning

* add active topology

* simplify task detector

* ec task completed, but shards are not showing up

* implement ec in ec_typed.go

* adjust log level

* dedup

* implementing ec copying shards and only ecx files

* use disk id when distributing ec shards

🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk
📦 Task Creation: maintenance_integration.go creates ECDestination with DiskId
🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest
💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId])
📂 File System: EC shards and metadata land in the exact disk directory planned
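
Below is a minimal sketch of what that flow could look like from the worker side, not the PR's actual code: it assumes the `DiskId` field on `VolumeEcShardsCopyRequest` described above, and the address, volume id, shard ids, and target disk index are placeholders.

// Sketch: ask a volume server to copy EC shards onto the disk chosen during planning.
// Assumes the DiskId field mentioned in the notes above; all concrete values are placeholders.
package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)

func main() {
	// Placeholder gRPC address of the destination volume server.
	conn, err := grpc.Dial("localhost:18080", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := volume_server_pb.NewVolumeServerClient(conn)
	_, err = client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
		VolumeId:       271,               // volume being erasure coded (placeholder)
		ShardIds:       []uint32{0, 1, 2}, // shards planned for this destination (placeholder)
		CopyEcxFile:    true,              // also copy the .ecx index file
		SourceDataNode: "localhost:8080",  // where the encoded shards currently live (placeholder)
		DiskId:         3,                 // planned target disk: vs.store.Locations[req.DiskId]
	})
	if err != nil {
		log.Fatal(err)
	}
}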

* Delete original volume from all locations

* clean up existing shard locations

* local encoding and distributing

* Update docker/admin_integration/EC-TESTING-README.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* check volume id range

* simplify

* fix tests

* fix types

* clean up logs and tests

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-07-30 12:38:03 -07:00

package tasks

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
)

// TaskLogHandler handles task log requests from admin server
type TaskLogHandler struct {
	baseLogDir string
}

// NewTaskLogHandler creates a new task log handler
func NewTaskLogHandler(baseLogDir string) *TaskLogHandler {
	if baseLogDir == "" {
		baseLogDir = "/tmp/seaweedfs/task_logs"
	}
	return &TaskLogHandler{
		baseLogDir: baseLogDir,
	}
}

// HandleLogRequest processes a task log request and returns the response
func (h *TaskLogHandler) HandleLogRequest(request *worker_pb.TaskLogRequest) *worker_pb.TaskLogResponse {
	response := &worker_pb.TaskLogResponse{
		TaskId:   request.TaskId,
		WorkerId: request.WorkerId,
		Success:  false,
	}

	// Find the task log directory
	logDir, err := h.findTaskLogDirectory(request.TaskId)
	if err != nil {
		response.ErrorMessage = fmt.Sprintf("Task log directory not found: %v", err)
		glog.Warningf("Task log request failed for %s: %v", request.TaskId, err)
		return response
	}

	// Read metadata if requested
	if request.IncludeMetadata {
		metadata, err := h.readTaskMetadata(logDir)
		if err != nil {
			response.ErrorMessage = fmt.Sprintf("Failed to read task metadata: %v", err)
			glog.Warningf("Failed to read metadata for task %s: %v", request.TaskId, err)
			return response
		}
		response.Metadata = metadata
	}

	// Read log entries
	logEntries, err := h.readTaskLogEntries(logDir, request)
	if err != nil {
		response.ErrorMessage = fmt.Sprintf("Failed to read task logs: %v", err)
		glog.Warningf("Failed to read logs for task %s: %v", request.TaskId, err)
		return response
	}
	response.LogEntries = logEntries
	response.Success = true

	glog.V(1).Infof("Successfully retrieved %d log entries for task %s", len(logEntries), request.TaskId)
	return response
}

// findTaskLogDirectory searches for the task log directory by task ID
func (h *TaskLogHandler) findTaskLogDirectory(taskID string) (string, error) {
	entries, err := os.ReadDir(h.baseLogDir)
	if err != nil {
		return "", fmt.Errorf("failed to read base log directory: %w", err)
	}

	// Look for directories that start with the task ID
	for _, entry := range entries {
		if entry.IsDir() && strings.HasPrefix(entry.Name(), taskID+"_") {
			return filepath.Join(h.baseLogDir, entry.Name()), nil
		}
	}
	return "", fmt.Errorf("task log directory not found for task ID: %s", taskID)
}

// readTaskMetadata reads task metadata from the log directory
func (h *TaskLogHandler) readTaskMetadata(logDir string) (*worker_pb.TaskLogMetadata, error) {
	metadata, err := GetTaskLogMetadata(logDir)
	if err != nil {
		return nil, err
	}

	// Convert to protobuf metadata
	pbMetadata := &worker_pb.TaskLogMetadata{
		TaskId:      metadata.TaskID,
		TaskType:    metadata.TaskType,
		WorkerId:    metadata.WorkerID,
		StartTime:   metadata.StartTime.Unix(),
		Status:      metadata.Status,
		Progress:    float32(metadata.Progress),
		VolumeId:    metadata.VolumeID,
		Server:      metadata.Server,
		Collection:  metadata.Collection,
		LogFilePath: metadata.LogFilePath,
		CreatedAt:   metadata.CreatedAt.Unix(),
		CustomData:  make(map[string]string),
	}

	// Set end time and duration if available
	if metadata.EndTime != nil {
		pbMetadata.EndTime = metadata.EndTime.Unix()
	}
	if metadata.Duration != nil {
		pbMetadata.DurationMs = metadata.Duration.Milliseconds()
	}

	// Convert custom data
	for key, value := range metadata.CustomData {
		if strValue, ok := value.(string); ok {
			pbMetadata.CustomData[key] = strValue
		} else {
			pbMetadata.CustomData[key] = fmt.Sprintf("%v", value)
		}
	}
	return pbMetadata, nil
}

// readTaskLogEntries reads and filters log entries based on the request
func (h *TaskLogHandler) readTaskLogEntries(logDir string, request *worker_pb.TaskLogRequest) ([]*worker_pb.TaskLogEntry, error) {
	entries, err := ReadTaskLogs(logDir)
	if err != nil {
		return nil, err
	}

	// Apply filters
	var filteredEntries []TaskLogEntry
	for _, entry := range entries {
		// Filter by log level
		if request.LogLevel != "" && !strings.EqualFold(entry.Level, request.LogLevel) {
			continue
		}
		// Filter by time range
		if request.StartTime > 0 && entry.Timestamp.Unix() < request.StartTime {
			continue
		}
		if request.EndTime > 0 && entry.Timestamp.Unix() > request.EndTime {
			continue
		}
		filteredEntries = append(filteredEntries, entry)
	}

	// Limit entries if requested
	if request.MaxEntries > 0 && len(filteredEntries) > int(request.MaxEntries) {
		// Take the most recent entries
		start := len(filteredEntries) - int(request.MaxEntries)
		filteredEntries = filteredEntries[start:]
	}

	// Convert to protobuf entries
	var pbEntries []*worker_pb.TaskLogEntry
	for _, entry := range filteredEntries {
		pbEntry := &worker_pb.TaskLogEntry{
			Timestamp: entry.Timestamp.Unix(),
			Level:     entry.Level,
			Message:   entry.Message,
			Fields:    make(map[string]string),
		}
		// Set progress if available
		if entry.Progress != nil {
			pbEntry.Progress = float32(*entry.Progress)
		}
		// Set status if available
		if entry.Status != nil {
			pbEntry.Status = *entry.Status
		}
		// Convert fields
		for key, value := range entry.Fields {
			if strValue, ok := value.(string); ok {
				pbEntry.Fields[key] = strValue
			} else {
				pbEntry.Fields[key] = fmt.Sprintf("%v", value)
			}
		}
		pbEntries = append(pbEntries, pbEntry)
	}
	return pbEntries, nil
}

// ListAvailableTaskLogs returns a list of available task log directories
func (h *TaskLogHandler) ListAvailableTaskLogs() ([]string, error) {
	entries, err := os.ReadDir(h.baseLogDir)
	if err != nil {
		return nil, fmt.Errorf("failed to read base log directory: %w", err)
	}

	var taskDirs []string
	for _, entry := range entries {
		if entry.IsDir() {
			taskDirs = append(taskDirs, entry.Name())
		}
	}
	return taskDirs, nil
}

// CleanupOldLogs removes old task logs beyond the specified limit
func (h *TaskLogHandler) CleanupOldLogs(maxTasks int) error {
	config := TaskLoggerConfig{
		BaseLogDir: h.baseLogDir,
		MaxTasks:   maxTasks,
	}
	// Create a temporary logger to trigger cleanup
	tempLogger := &FileTaskLogger{
		config: config,
	}
	tempLogger.cleanupOldLogs()
	return nil
}
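
For reference, a minimal sketch of how a caller might drive this handler, using only the request and response fields shown above; the task id, worker id, and entry limit are placeholders.

// Sketch: fetch recent log entries for a task through TaskLogHandler.
// Placeholder ids and limits; uses only fields defined in worker_pb above.
package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)

func main() {
	// Empty base dir falls back to /tmp/seaweedfs/task_logs.
	handler := tasks.NewTaskLogHandler("")

	resp := handler.HandleLogRequest(&worker_pb.TaskLogRequest{
		TaskId:          "task-123", // placeholder task id
		WorkerId:        "worker-1", // placeholder worker id
		IncludeMetadata: true,       // also return TaskLogMetadata
		LogLevel:        "",         // empty means all levels
		MaxEntries:      100,        // keep only the 100 most recent entries
	})
	if !resp.Success {
		fmt.Println("log request failed:", resp.ErrorMessage)
		return
	}
	for _, entry := range resp.LogEntries {
		fmt.Printf("[%s] %s\n", entry.Level, entry.Message)
	}
}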