1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-08-17 01:22:47 +02:00
seaweedfs/weed/worker/tasks/ui_base.go
Chris Lu 891a2fb6eb
Admin: misc improvements on admin server and workers. EC now works. (#7055)
* initial design

* added simulation as tests

* reorganized the codebase to move the simulation framework and tests into their own dedicated package

* integration test. ec worker task

* remove "enhanced" reference

* start master, volume servers, filer

Current Status
 Master: Healthy and running (port 9333)
 Filer: Healthy and running (port 8888)
 Volume Servers: All 6 servers running (ports 8080-8085)
🔄 Admin/Workers: Will start when dependencies are ready

* generate write load

* tasks are assigned

* admin start wtih grpc port. worker has its own working directory

* Update .gitignore

* working worker and admin. Task detection is not working yet.

* compiles, detection uses volumeSizeLimitMB from master

* compiles

* worker retries connecting to admin

* build and restart

* rendering pending tasks

* skip task ID column

* sticky worker id

* test canScheduleTaskNow

* worker reconnect to admin

* clean up logs

* worker register itself first

* worker can run ec work and report status

but:
1. one volume should not be repeatedly worked on.
2. ec shards needs to be distributed and source data should be deleted.

* move ec task logic

* listing ec shards

* local copy, ec. Need to distribute.

* ec is mostly working now

* distribution of ec shards needs improvement
* need configuration to enable ec

* show ec volumes

* interval field UI component

* rename

* integration test with vauuming

* garbage percentage threshold

* fix warning

* display ec shard sizes

* fix ec volumes list

* Update ui.go

* show default values

* ensure correct default value

* MaintenanceConfig use ConfigField

* use schema defined defaults

* config

* reduce duplication

* refactor to use BaseUIProvider

* each task register its schema

* checkECEncodingCandidate use ecDetector

* use vacuumDetector

* use volumeSizeLimitMB

* remove

remove

* remove unused

* refactor

* use new framework

* remove v2 reference

* refactor

* left menu can scroll now

* The maintenance manager was not being initialized when no data directory was configured for persistent storage.

* saving config

* Update task_config_schema_templ.go

* enable/disable tasks

* protobuf encoded task configurations

* fix system settings

* use ui component

* remove logs

* interface{} Reduction

* reduce interface{}

* reduce interface{}

* avoid from/to map

* reduce interface{}

* refactor

* keep it DRY

* added logging

* debug messages

* debug level

* debug

* show the log caller line

* use configured task policy

* log level

* handle admin heartbeat response

* Update worker.go

* fix EC rack and dc count

* Report task status to admin server

* fix task logging, simplify interface checking, use erasure_coding constants

* factor in empty volume server during task planning

* volume.list adds disk id

* track disk id also

* fix locking scheduled and manual scanning

* add active topology

* simplify task detector

* ec task completed, but shards are not showing up

* implement ec in ec_typed.go

* adjust log level

* dedup

* implementing ec copying shards and only ecx files

* use disk id when distributing ec shards

🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk
📦 Task Creation: maintenance_integration.go creates ECDestination with DiskId
🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest
💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId])
📂 File System: EC shards and metadata land in the exact disk directory planned

* Delete original volume from all locations

* clean up existing shard locations

* local encoding and distributing

* Update docker/admin_integration/EC-TESTING-README.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* check volume id range

* simplify

* fix tests

* fix types

* clean up logs and tests

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-07-30 12:38:03 -07:00

184 lines
4.8 KiB
Go

package tasks
import (
"reflect"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// BaseUIProvider provides common UIProvider functionality for all tasks
type BaseUIProvider struct {
taskType types.TaskType
displayName string
description string
icon string
schemaFunc func() *TaskConfigSchema
configFunc func() types.TaskConfig
applyTaskPolicyFunc func(policy *worker_pb.TaskPolicy) error
applyTaskConfigFunc func(config types.TaskConfig) error
}
// NewBaseUIProvider creates a new base UI provider
func NewBaseUIProvider(
taskType types.TaskType,
displayName string,
description string,
icon string,
schemaFunc func() *TaskConfigSchema,
configFunc func() types.TaskConfig,
applyTaskPolicyFunc func(policy *worker_pb.TaskPolicy) error,
applyTaskConfigFunc func(config types.TaskConfig) error,
) *BaseUIProvider {
return &BaseUIProvider{
taskType: taskType,
displayName: displayName,
description: description,
icon: icon,
schemaFunc: schemaFunc,
configFunc: configFunc,
applyTaskPolicyFunc: applyTaskPolicyFunc,
applyTaskConfigFunc: applyTaskConfigFunc,
}
}
// GetTaskType returns the task type
func (ui *BaseUIProvider) GetTaskType() types.TaskType {
return ui.taskType
}
// GetDisplayName returns the human-readable name
func (ui *BaseUIProvider) GetDisplayName() string {
return ui.displayName
}
// GetDescription returns a description of what this task does
func (ui *BaseUIProvider) GetDescription() string {
return ui.description
}
// GetIcon returns the icon CSS class for this task type
func (ui *BaseUIProvider) GetIcon() string {
return ui.icon
}
// GetCurrentConfig returns the current configuration as TaskConfig
func (ui *BaseUIProvider) GetCurrentConfig() types.TaskConfig {
return ui.configFunc()
}
// ApplyTaskPolicy applies protobuf TaskPolicy configuration
func (ui *BaseUIProvider) ApplyTaskPolicy(policy *worker_pb.TaskPolicy) error {
return ui.applyTaskPolicyFunc(policy)
}
// ApplyTaskConfig applies TaskConfig interface configuration
func (ui *BaseUIProvider) ApplyTaskConfig(config types.TaskConfig) error {
return ui.applyTaskConfigFunc(config)
}
// CommonConfigGetter provides a common pattern for getting current configuration
type CommonConfigGetter[T any] struct {
defaultConfig T
detectorFunc func() T
schedulerFunc func() T
}
// NewCommonConfigGetter creates a new common config getter
func NewCommonConfigGetter[T any](
defaultConfig T,
detectorFunc func() T,
schedulerFunc func() T,
) *CommonConfigGetter[T] {
return &CommonConfigGetter[T]{
defaultConfig: defaultConfig,
detectorFunc: detectorFunc,
schedulerFunc: schedulerFunc,
}
}
// GetConfig returns the merged configuration
func (cg *CommonConfigGetter[T]) GetConfig() T {
config := cg.defaultConfig
// Apply detector values if available
if cg.detectorFunc != nil {
detectorConfig := cg.detectorFunc()
mergeConfigs(&config, detectorConfig)
}
// Apply scheduler values if available
if cg.schedulerFunc != nil {
schedulerConfig := cg.schedulerFunc()
mergeConfigs(&config, schedulerConfig)
}
return config
}
// mergeConfigs merges non-zero values from source into dest
func mergeConfigs[T any](dest *T, source T) {
destValue := reflect.ValueOf(dest).Elem()
sourceValue := reflect.ValueOf(source)
if destValue.Kind() != reflect.Struct || sourceValue.Kind() != reflect.Struct {
return
}
for i := 0; i < destValue.NumField(); i++ {
destField := destValue.Field(i)
sourceField := sourceValue.Field(i)
if !destField.CanSet() {
continue
}
// Only copy non-zero values
if !sourceField.IsZero() {
if destField.Type() == sourceField.Type() {
destField.Set(sourceField)
}
}
}
}
// RegisterUIFunc provides a common registration function signature
type RegisterUIFunc[D, S any] func(uiRegistry *types.UIRegistry, detector D, scheduler S)
// CommonRegisterUI provides a common registration implementation
func CommonRegisterUI[D, S any](
taskType types.TaskType,
displayName string,
uiRegistry *types.UIRegistry,
detector D,
scheduler S,
schemaFunc func() *TaskConfigSchema,
configFunc func() types.TaskConfig,
applyTaskPolicyFunc func(policy *worker_pb.TaskPolicy) error,
applyTaskConfigFunc func(config types.TaskConfig) error,
) {
// Get metadata from schema
schema := schemaFunc()
description := "Task configuration"
icon := "fas fa-cog"
if schema != nil {
description = schema.Description
icon = schema.Icon
}
uiProvider := NewBaseUIProvider(
taskType,
displayName,
description,
icon,
schemaFunc,
configFunc,
applyTaskPolicyFunc,
applyTaskConfigFunc,
)
uiRegistry.RegisterUI(uiProvider)
glog.V(1).Infof("✅ Registered %s task UI provider", taskType)
}