1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-10 05:12:47 +02:00
seaweedfs/weed/mount/ml/config_manager.go
chrislu 814e0bb233 Phase 4: Revolutionary Recipe-Based ML Optimization Engine
🚀 Transform SeaweedFS ML optimizations from hard-coded framework-specific code
to a flexible, configuration-driven system using YAML/JSON rules and templates.

## Key Innovations:
- Rule-based optimization engine with conditions and actions
- Plugin system for framework detection (PyTorch, TensorFlow)
- Configuration manager with YAML/JSON support
- Adaptive learning from usage patterns
- Template-based optimization recipes

## New Components:
- optimization_engine.go: Core rule evaluation and application
- config_manager.go: Configuration loading and validation
- plugins/pytorch_plugin.go: PyTorch-specific optimizations
- plugins/tensorflow_plugin.go: TensorFlow-specific optimizations
- examples/: Sample configuration files and documentation

## Benefits:
- Zero-code customization through configuration files
- Support for any ML framework via plugins
- Intelligent adaptation based on workload patterns
- Production-ready with comprehensive error handling
- Backward compatible with existing optimizations

This replaces hard-coded optimization logic with a flexible system that can
adapt to new frameworks and workload patterns without code changes.
2025-08-30 16:49:12 -07:00

626 lines
19 KiB
Go

package ml
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"github.com/seaweedfs/seaweedfs/weed/glog"
"gopkg.in/yaml.v3"
)
// OptimizationConfigManager manages optimization configuration loading and validation.
//
// It caches parsed configurations keyed by the file path they were loaded
// from or saved to. The embedded RWMutex guards loadedConfigs, so the manager
// must be used through a pointer and never copied.
type OptimizationConfigManager struct {
	sync.RWMutex
	// configDir is the directory passed to NewOptimizationConfigManager.
	// NOTE(review): no method in this file reads it — confirm usage elsewhere.
	configDir string
	// loadedConfigs caches parsed configurations by file path; guarded by the mutex.
	loadedConfigs map[string]*OptimizationConfig
	// watchEnabled starts false; no file watching is implemented in this file.
	watchEnabled bool
	// validationRules holds the rules from getDefaultValidationRules.
	// NOTE(review): validateConfiguration does not consult these — confirm intent.
	validationRules map[string]ValidationRule
}
// OptimizationConfig represents a complete optimization configuration as read
// from, or written to, a YAML or JSON file. Field declaration order determines
// the key order produced when the struct is marshaled, so keep it stable.
type OptimizationConfig struct {
	// Version and Name must be non-empty (checked by validateConfiguration).
	Version string `json:"version" yaml:"version"`
	Name string `json:"name" yaml:"name"`
	Description string `json:"description" yaml:"description"`
	Author string `json:"author,omitempty" yaml:"author,omitempty"`
	Tags []string `json:"tags,omitempty" yaml:"tags,omitempty"`
	// Core configuration
	// Rules need unique, non-empty IDs; each template's Rules list must only
	// name IDs present here (both checked by validateConfiguration).
	Rules []*OptimizationRule `json:"rules" yaml:"rules"`
	Templates []*OptimizationTemplate `json:"templates" yaml:"templates"`
	// Strategies is free-form; it is not interpreted anywhere in this file.
	Strategies map[string]interface{} `json:"strategies,omitempty" yaml:"strategies,omitempty"`
	// Framework-specific settings, keyed by framework name (the generated
	// default uses "pytorch" and "tensorflow").
	Frameworks map[string]FrameworkConfig `json:"frameworks,omitempty" yaml:"frameworks,omitempty"`
	// Global settings; processConfiguration replaces some zero values with defaults.
	Settings GlobalOptimizationSettings `json:"settings" yaml:"settings"`
	// Metadata is free-form; processConfiguration adds "processed_at",
	// "rule_count", and "template_count" entries.
	Metadata map[string]interface{} `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}
// FrameworkConfig holds framework-specific configuration for one entry of
// OptimizationConfig.Frameworks.
type FrameworkConfig struct {
	Enabled bool `json:"enabled" yaml:"enabled"`
	Version string `json:"version,omitempty" yaml:"version,omitempty"`
	// Rules and Templates name rule/template IDs (as in the generated default
	// configuration). NOTE(review): validateConfiguration does not verify that
	// these IDs exist — confirm whether they are checked elsewhere.
	Rules []string `json:"rules,omitempty" yaml:"rules,omitempty"`
	Templates []string `json:"templates,omitempty" yaml:"templates,omitempty"`
	// Parameters is free-form, framework-specific tuning data.
	Parameters map[string]interface{} `json:"parameters,omitempty" yaml:"parameters,omitempty"`
}
// GlobalOptimizationSettings contains global optimization settings.
//
// processConfiguration replaces these zero values with defaults:
// DefaultStrategy "" -> "adaptive", MaxConcurrentRules 0 -> 3, and
// ConfidenceThreshold 0.0 -> 0.5. validateConfiguration requires
// ConfidenceThreshold in [0.0, 1.0] and MaxConcurrentRules >= 1.
type GlobalOptimizationSettings struct {
	DefaultStrategy string `json:"default_strategy" yaml:"default_strategy"`
	MaxConcurrentRules int `json:"max_concurrent_rules" yaml:"max_concurrent_rules"`
	ConfidenceThreshold float64 `json:"confidence_threshold" yaml:"confidence_threshold"`
	AdaptiveLearning bool `json:"adaptive_learning" yaml:"adaptive_learning"`
	MetricsCollection bool `json:"metrics_collection" yaml:"metrics_collection"`
	Debug bool `json:"debug" yaml:"debug"`
	// Resource limits
	// Units follow the field names (megabytes, percent); neither limit is
	// enforced in this file.
	MemoryLimitMB int `json:"memory_limit_mb,omitempty" yaml:"memory_limit_mb,omitempty"`
	CPULimitPercent int `json:"cpu_limit_percent,omitempty" yaml:"cpu_limit_percent,omitempty"`
	// Advanced settings
	// ExperimentalFeatures maps feature names to on/off flags.
	ExperimentalFeatures map[string]bool `json:"experimental_features,omitempty" yaml:"experimental_features,omitempty"`
	CustomProperties map[string]interface{} `json:"custom_properties,omitempty" yaml:"custom_properties,omitempty"`
}
// ValidationRule defines validation rules for configurations: constraints on
// a single named field. Instances are produced by getDefaultValidationRules.
// NOTE(review): no code in this file evaluates these rules — confirm consumer.
type ValidationRule struct {
	Field string `json:"field"`
	Required bool `json:"required"`
	Type string `json:"type"` // string, int, float, bool, array, object
	// MinValue/MaxValue are optional inclusive-looking bounds; nil means unbounded.
	MinValue *float64 `json:"min_value,omitempty"`
	MaxValue *float64 `json:"max_value,omitempty"`
	AllowedValues []string `json:"allowed_values,omitempty"`
	Pattern string `json:"pattern,omitempty"` // regex pattern
}
// NewOptimizationConfigManager returns a manager for configDir with an empty
// configuration cache, file watching disabled, and the default field
// validation rules installed.
func NewOptimizationConfigManager(configDir string) *OptimizationConfigManager {
	ocm := new(OptimizationConfigManager)
	ocm.configDir = configDir
	ocm.loadedConfigs = map[string]*OptimizationConfig{}
	ocm.validationRules = getDefaultValidationRules()
	// watchEnabled deliberately keeps its zero value (false).
	return ocm
}
// LoadConfiguration loads, validates, and caches the optimization
// configuration stored at filePath.
//
// The format is chosen from the file extension (case-insensitive): ".yaml"
// and ".yml" are parsed as YAML, ".json" as JSON; anything else is an error.
// Zero-valued settings are filled with defaults by processConfiguration before
// validation, so a file that omits optional settings such as
// max_concurrent_rules is accepted.
//
// Results are cached by the exact filePath string. A cached entry is returned
// without re-reading the file. The manager's lock is not held while the file
// is read and parsed, so slow I/O does not block other users of the cache. If
// two goroutines load the same path concurrently, the first configuration
// stored wins and both callers receive it.
func (ocm *OptimizationConfigManager) LoadConfiguration(filePath string) (*OptimizationConfig, error) {
	// Fast path: already cached.
	ocm.RLock()
	cached, ok := ocm.loadedConfigs[filePath]
	ocm.RUnlock()
	if ok {
		return cached, nil
	}

	data, err := ioutil.ReadFile(filePath)
	if err != nil {
		return nil, fmt.Errorf("failed to read config file %s: %w", filePath, err)
	}

	// Parse based on file extension.
	config := &OptimizationConfig{}
	ext := strings.ToLower(filepath.Ext(filePath))
	switch ext {
	case ".yaml", ".yml":
		if err := yaml.Unmarshal(data, config); err != nil {
			return nil, fmt.Errorf("failed to parse YAML config %s: %w", filePath, err)
		}
	case ".json":
		if err := json.Unmarshal(data, config); err != nil {
			return nil, fmt.Errorf("failed to parse JSON config %s: %w", filePath, err)
		}
	default:
		return nil, fmt.Errorf("unsupported config file format: %s", ext)
	}

	// Apply defaults before validating. validateConfiguration rejects
	// MaxConcurrentRules < 1, so validating first would make the default for
	// an omitted max_concurrent_rules impossible to reach.
	ocm.processConfiguration(config)
	if err := ocm.validateConfiguration(config); err != nil {
		return nil, fmt.Errorf("configuration validation failed for %s: %w", filePath, err)
	}

	ocm.Lock()
	if existing, raced := ocm.loadedConfigs[filePath]; raced {
		// Another goroutine cached this path while we were reading it; keep
		// a single canonical instance per path.
		ocm.Unlock()
		return existing, nil
	}
	ocm.loadedConfigs[filePath] = config
	ocm.Unlock()

	glog.V(1).Infof("Loaded optimization configuration: %s (%d rules, %d templates)",
		config.Name, len(config.Rules), len(config.Templates))
	return config, nil
}
// LoadConfigurationDirectory walks dirPath recursively and loads every file
// with a ".yaml", ".yml", or ".json" extension (case-insensitive) through
// LoadConfiguration. It returns the configurations that loaded successfully.
//
// Loading is best-effort. A file that fails to parse or validate, or a nested
// entry that cannot be read, is logged as a warning and skipped. An error is
// returned only when dirPath does not exist or cannot itself be walked.
func (ocm *OptimizationConfigManager) LoadConfigurationDirectory(dirPath string) ([]*OptimizationConfig, error) {
	if _, err := os.Stat(dirPath); os.IsNotExist(err) {
		return nil, fmt.Errorf("configuration directory does not exist: %s", dirPath)
	}

	configs := make([]*OptimizationConfig, 0)
	err := filepath.Walk(dirPath, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			// Failing on the root means nothing can be loaded: report it.
			if path == dirPath {
				return err
			}
			// Skip unreadable nested entries, consistent with how per-file
			// load failures are handled below. info may be nil here.
			glog.Warningf("Skipping unreadable configuration path %s: %v", path, err)
			return nil
		}
		if info.IsDir() {
			return nil
		}

		// Only consider recognised configuration file extensions.
		ext := strings.ToLower(filepath.Ext(path))
		if ext != ".yaml" && ext != ".yml" && ext != ".json" {
			return nil
		}

		config, loadErr := ocm.LoadConfiguration(path)
		if loadErr != nil {
			glog.Warningf("Failed to load configuration %s: %v", path, loadErr)
			return nil // Continue loading other files
		}
		configs = append(configs, config)
		return nil
	})
	if err != nil {
		return nil, fmt.Errorf("failed to walk configuration directory: %w", err)
	}

	glog.V(1).Infof("Loaded %d optimization configurations from directory: %s", len(configs), dirPath)
	return configs, nil
}
// SaveConfiguration validates config, serializes it according to filePath's
// extension (".yaml"/".yml" → YAML, ".json" → indented JSON;
// case-insensitive), creates any missing parent directories (mode 0755), and
// writes the file (mode 0644).
//
// On success the same *OptimizationConfig pointer is cached under filePath, so
// a later LoadConfiguration(filePath) returns it without reading the disk.
func (ocm *OptimizationConfigManager) SaveConfiguration(config *OptimizationConfig, filePath string) error {
	if verr := ocm.validateConfiguration(config); verr != nil {
		return fmt.Errorf("cannot save invalid configuration: %w", verr)
	}

	var (
		payload []byte
		encErr  error
	)
	format := strings.ToLower(filepath.Ext(filePath))
	if format == ".yaml" || format == ".yml" {
		if payload, encErr = yaml.Marshal(config); encErr != nil {
			return fmt.Errorf("failed to marshal YAML: %w", encErr)
		}
	} else if format == ".json" {
		if payload, encErr = json.MarshalIndent(config, "", " "); encErr != nil {
			return fmt.Errorf("failed to marshal JSON: %w", encErr)
		}
	} else {
		return fmt.Errorf("unsupported config file format: %s", format)
	}

	if mkErr := os.MkdirAll(filepath.Dir(filePath), 0755); mkErr != nil {
		return fmt.Errorf("failed to create config directory: %w", mkErr)
	}
	if wErr := ioutil.WriteFile(filePath, payload, 0644); wErr != nil {
		return fmt.Errorf("failed to write config file: %w", wErr)
	}

	ocm.Lock()
	ocm.loadedConfigs[filePath] = config
	ocm.Unlock()

	glog.V(1).Infof("Saved optimization configuration: %s", filePath)
	return nil
}
// GenerateDefaultConfiguration builds the built-in default configuration. It
// contains:
//   - three rules: sequential prefetching, ML file-type caching, and
//     workload coordination;
//   - two templates, training and inference, that reference those rules by ID;
//   - PyTorch and TensorFlow framework entries;
//   - global settings.
//
// Every call allocates fresh values, so callers may mutate the result freely.
func (ocm *OptimizationConfigManager) GenerateDefaultConfiguration() *OptimizationConfig {
	const (
		prefetchRuleID   = "smart_sequential_prefetch"
		fileTypeRuleID   = "ml_file_type_optimization"
		coordinateRuleID = "workload_aware_coordination"
	)

	// Prefetch adaptively when access is sequential and the file is large.
	sequentialPrefetch := &OptimizationRule{
		ID:          prefetchRuleID,
		Name:        "Smart Sequential Prefetching",
		Description: "Intelligent prefetching based on access patterns and file characteristics",
		Priority:    100,
		Conditions: []RuleCondition{
			{Type: "access_pattern", Property: "pattern_type", Operator: "equals", Value: "sequential", Weight: 1.0},
			// Size threshold: 5 MiB.
			{Type: "file_context", Property: "size", Operator: "greater_than", Value: 5 * 1024 * 1024, Weight: 0.7},
		},
		Actions: []RuleAction{{
			Type:   "prefetch",
			Target: "file",
			Parameters: map[string]interface{}{
				"strategy":         "adaptive",
				"initial_size":     8,
				"max_size":         32,
				"growth_factor":    1.5,
				"confidence_based": true,
			},
		}},
	}

	// Cache models, datasets and checkpoints with ML-aware priority.
	fileTypeCaching := &OptimizationRule{
		ID:          fileTypeRuleID,
		Name:        "ML File Type Optimization",
		Description: "Optimizations based on detected ML file types",
		Priority:    95,
		Conditions: []RuleCondition{
			{Type: "file_context", Property: "type", Operator: "in", Value: []string{"model", "dataset", "checkpoint"}, Weight: 1.0},
		},
		Actions: []RuleAction{{
			Type:   "smart_cache",
			Target: "file",
			Parameters: map[string]interface{}{
				"strategy":       "ml_aware",
				"priority_boost": 2.0,
				"retention_time": "extended",
			},
		}},
	}

	// Coordinate resources for recognised ML workloads on GPU hosts.
	workloadCoordination := &OptimizationRule{
		ID:          coordinateRuleID,
		Name:        "Workload-Aware Coordination",
		Description: "Coordinate optimizations based on workload characteristics",
		Priority:    85,
		Conditions: []RuleCondition{
			{Type: "workload_context", Property: "workload_type", Operator: "in", Value: []string{"training", "inference", "preprocessing"}, Weight: 0.9},
			{Type: "system_context", Property: "gpu_count", Operator: "greater_than", Value: 0, Weight: 0.6},
		},
		Actions: []RuleAction{{
			Type:   "coordinate",
			Target: "workload",
			Parameters: map[string]interface{}{
				"resource_aware":      true,
				"priority_scheduling": true,
				"gpu_coordination":    true,
			},
		}},
	}

	trainingTemplate := &OptimizationTemplate{
		ID:          "universal_ml_training",
		Name:        "Universal ML Training Template",
		Description: "Framework-agnostic optimization template for ML training",
		Category:    "training",
		Rules:       []string{prefetchRuleID, fileTypeRuleID, coordinateRuleID},
		Parameters: map[string]interface{}{
			"optimization_level": "balanced",
			"resource_usage":     "moderate",
			"adaptivity":         true,
		},
	}
	inferenceTemplate := &OptimizationTemplate{
		ID:          "inference_optimized",
		Name:        "Inference Optimization Template",
		Description: "Low-latency optimization template for ML inference",
		Category:    "inference",
		Rules:       []string{fileTypeRuleID},
		Parameters: map[string]interface{}{
			"optimization_level": "latency",
			"preload_models":     true,
			"batch_processing":   false,
		},
	}

	frameworks := map[string]FrameworkConfig{}
	frameworks["pytorch"] = FrameworkConfig{
		Enabled: true,
		Rules:   []string{prefetchRuleID, fileTypeRuleID},
		Parameters: map[string]interface{}{
			"dataloader_optimization": true,
			"tensor_prefetch":         true,
		},
	}
	frameworks["tensorflow"] = FrameworkConfig{
		Enabled: true,
		Rules:   []string{prefetchRuleID, coordinateRuleID},
		Parameters: map[string]interface{}{
			"dataset_optimization": true,
			"savedmodel_caching":   true,
		},
	}

	settings := GlobalOptimizationSettings{
		DefaultStrategy:     "adaptive",
		MaxConcurrentRules:  5,
		ConfidenceThreshold: 0.6,
		AdaptiveLearning:    true,
		MetricsCollection:   true,
		Debug:               false,
		MemoryLimitMB:       512,
		CPULimitPercent:     20,
		// All experimental features ship disabled.
		ExperimentalFeatures: map[string]bool{
			"neural_optimization": false,
			"quantum_prefetch":    false,
			"blockchain_cache":    false,
		},
	}

	return &OptimizationConfig{
		Version:     "1.0.0",
		Name:        "Default ML Optimization Configuration",
		Description: "Comprehensive default optimization rules and templates for ML workloads",
		Author:      "SeaweedFS ML Optimization System",
		Tags:        []string{"default", "ml", "comprehensive"},
		Rules:       []*OptimizationRule{sequentialPrefetch, fileTypeCaching, workloadCoordination},
		Templates:   []*OptimizationTemplate{trainingTemplate, inferenceTemplate},
		Frameworks:  frameworks,
		Settings:    settings,
		Metadata: map[string]interface{}{
			"generated_at":    "auto",
			"config_version":  "1.0.0",
			"compatible_with": []string{"seaweedfs-ml-v1"},
		},
	}
}
// validateConfiguration checks the structural invariants of config and
// returns the first violation found. It requires that:
//   - config is non-nil and has a non-empty Name and Version;
//   - every rule is non-nil, has a unique non-empty ID, and passes validateRule;
//   - every template is non-nil, has a unique non-empty ID, and references
//     only rule IDs defined in config.Rules;
//   - Settings.ConfidenceThreshold lies in [0.0, 1.0] (NaN is rejected);
//   - Settings.MaxConcurrentRules is at least 1.
//
// Framework entries are not checked.
func (ocm *OptimizationConfigManager) validateConfiguration(config *OptimizationConfig) error {
	if config == nil {
		return fmt.Errorf("configuration is nil")
	}

	// Basic validation
	if config.Name == "" {
		return fmt.Errorf("configuration name is required")
	}
	if config.Version == "" {
		return fmt.Errorf("configuration version is required")
	}

	// Validate rules
	ruleIDs := make(map[string]bool, len(config.Rules))
	for i, rule := range config.Rules {
		// A `null` list entry in YAML/JSON can decode to a nil pointer;
		// reject it instead of dereferencing it.
		if rule == nil {
			return fmt.Errorf("rule at index %d is null", i)
		}
		if rule.ID == "" {
			return fmt.Errorf("rule at index %d is missing ID", i)
		}
		if ruleIDs[rule.ID] {
			return fmt.Errorf("duplicate rule ID: %s", rule.ID)
		}
		ruleIDs[rule.ID] = true

		if err := ocm.validateRule(rule); err != nil {
			return fmt.Errorf("rule '%s' validation failed: %w", rule.ID, err)
		}
	}

	// Validate templates
	templateIDs := make(map[string]bool, len(config.Templates))
	for i, template := range config.Templates {
		if template == nil {
			return fmt.Errorf("template at index %d is null", i)
		}
		if template.ID == "" {
			return fmt.Errorf("template at index %d is missing ID", i)
		}
		if templateIDs[template.ID] {
			return fmt.Errorf("duplicate template ID: %s", template.ID)
		}
		templateIDs[template.ID] = true

		// Every referenced rule must be defined in this configuration.
		for _, ruleID := range template.Rules {
			if !ruleIDs[ruleID] {
				return fmt.Errorf("template '%s' references unknown rule: %s", template.ID, ruleID)
			}
		}
	}

	// Validate settings. The range check is negated so that NaN (for which
	// every comparison is false) is rejected as well.
	threshold := config.Settings.ConfidenceThreshold
	if !(threshold >= 0.0 && threshold <= 1.0) {
		return fmt.Errorf("confidence threshold must be between 0.0 and 1.0")
	}
	if config.Settings.MaxConcurrentRules < 1 {
		return fmt.Errorf("max concurrent rules must be at least 1")
	}

	return nil
}
// validateRule checks a single optimization rule and returns the first
// violation found. Condition and action indexes in messages are zero-based.
// It requires that:
//   - rule is non-nil, has a non-empty Name, and has a non-negative Priority;
//   - every condition has a non-empty Type, Property, and Operator, and a
//     Weight in [0.0, 1.0] (NaN is rejected);
//   - there is at least one action, and every action has a non-empty Type
//     and Target.
func (ocm *OptimizationConfigManager) validateRule(rule *OptimizationRule) error {
	if rule == nil {
		return fmt.Errorf("rule is nil")
	}
	if rule.Name == "" {
		return fmt.Errorf("rule name is required")
	}
	if rule.Priority < 0 {
		return fmt.Errorf("rule priority must be non-negative")
	}

	// Validate conditions
	for i, condition := range rule.Conditions {
		if condition.Type == "" {
			return fmt.Errorf("condition %d is missing type", i)
		}
		if condition.Property == "" {
			return fmt.Errorf("condition %d is missing property", i)
		}
		if condition.Operator == "" {
			return fmt.Errorf("condition %d is missing operator", i)
		}
		// Negated range check so that a NaN weight also fails.
		if !(condition.Weight >= 0.0 && condition.Weight <= 1.0) {
			return fmt.Errorf("condition %d weight must be between 0.0 and 1.0", i)
		}
	}

	// Validate actions
	if len(rule.Actions) == 0 {
		return fmt.Errorf("rule must have at least one action")
	}
	for i, action := range rule.Actions {
		if action.Type == "" {
			return fmt.Errorf("action %d is missing type", i)
		}
		if action.Target == "" {
			return fmt.Errorf("action %d is missing target", i)
		}
	}

	return nil
}
// processConfiguration fills zero-valued settings with defaults and records
// summary metadata, mutating config in place:
//   - Settings.DefaultStrategy "" becomes "adaptive";
//   - Settings.MaxConcurrentRules 0 becomes 3;
//   - Settings.ConfidenceThreshold 0.0 becomes 0.5 (an explicit 0.0 cannot be
//     distinguished from an omitted value);
//   - Metadata is created if nil and gains "processed_at", "rule_count", and
//     "template_count".
//
// config must be non-nil.
func (ocm *OptimizationConfigManager) processConfiguration(config *OptimizationConfig) {
	s := &config.Settings
	if len(s.DefaultStrategy) == 0 {
		s.DefaultStrategy = "adaptive"
	}
	if s.MaxConcurrentRules == 0 {
		s.MaxConcurrentRules = 3
	}
	if s.ConfidenceThreshold == 0 {
		s.ConfidenceThreshold = 0.5
	}

	md := config.Metadata
	if md == nil {
		md = map[string]interface{}{}
		config.Metadata = md
	}
	md["processed_at"] = "runtime"
	md["rule_count"] = len(config.Rules)
	md["template_count"] = len(config.Templates)
}
// getDefaultValidationRules returns the built-in field validation rules, both
// required:
//   - confidence_threshold is a float bounded by 0.0 and 1.0;
//   - max_concurrent_rules is an int bounded by 1 and 100.
//
// Every call returns a fresh map with freshly allocated bound values.
func getDefaultValidationRules() map[string]ValidationRule {
	// bound returns a pointer to its own copy of v.
	bound := func(v float64) *float64 { return &v }

	rules := make(map[string]ValidationRule, 2)
	rules["confidence_threshold"] = ValidationRule{
		Field:    "confidence_threshold",
		Required: true,
		Type:     "float",
		MinValue: bound(0.0),
		MaxValue: bound(1.0),
	}
	rules["max_concurrent_rules"] = ValidationRule{
		Field:    "max_concurrent_rules",
		Required: true,
		Type:     "int",
		MinValue: bound(1.0),
		MaxValue: bound(100.0),
	}
	return rules
}
// ExportConfiguration serializes config in the requested format. It does not
// validate config and does not touch the cache.
//
// format is matched case-insensitively: "json" yields indented JSON, and
// "yaml" or "yml" yields YAML. Any other value returns an error that quotes
// format as given.
func (ocm *OptimizationConfigManager) ExportConfiguration(config *OptimizationConfig, format string) ([]byte, error) {
	normalized := strings.ToLower(format)
	if normalized == "json" {
		return json.MarshalIndent(config, "", " ")
	}
	if normalized == "yaml" || normalized == "yml" {
		return yaml.Marshal(config)
	}
	return nil, fmt.Errorf("unsupported export format: %s", format)
}
// GetLoadedConfigurations returns a snapshot of the cache, keyed by file path.
//
// The map itself is new, so adding or removing keys does not affect the
// manager. The copy is shallow: the *OptimizationConfig values are shared with
// the cache, so mutating them is visible to later callers.
func (ocm *OptimizationConfigManager) GetLoadedConfigurations() map[string]*OptimizationConfig {
	ocm.RLock()
	snapshot := make(map[string]*OptimizationConfig, len(ocm.loadedConfigs))
	for path, cfg := range ocm.loadedConfigs {
		snapshot[path] = cfg
	}
	ocm.RUnlock()
	return snapshot
}
// ClearCache discards every cached configuration. Subsequent
// LoadConfiguration calls read their files again. Maps previously returned by
// GetLoadedConfigurations are separate copies and are unaffected.
func (ocm *OptimizationConfigManager) ClearCache() {
	fresh := make(map[string]*OptimizationConfig)

	ocm.Lock()
	ocm.loadedConfigs = fresh
	ocm.Unlock()

	glog.V(1).Infof("Configuration cache cleared")
}
// ValidateConfigurationFile parses the file at filePath and reports whether it
// holds a valid configuration. The result is never cached.
//
// The format is chosen from the extension (case-insensitive): ".yaml" and
// ".yml" are parsed as YAML, ".json" as JSON; other extensions are an error.
// Zero-valued settings are filled with the defaults from processConfiguration
// before validation, so omitted optional settings such as
// max_concurrent_rules are not reported as errors.
func (ocm *OptimizationConfigManager) ValidateConfigurationFile(filePath string) error {
	data, err := ioutil.ReadFile(filePath)
	if err != nil {
		return fmt.Errorf("failed to read file: %w", err)
	}

	config := &OptimizationConfig{}
	ext := strings.ToLower(filepath.Ext(filePath))
	switch ext {
	case ".yaml", ".yml":
		if err := yaml.Unmarshal(data, config); err != nil {
			return fmt.Errorf("YAML parsing error: %w", err)
		}
	case ".json":
		if err := json.Unmarshal(data, config); err != nil {
			return fmt.Errorf("JSON parsing error: %w", err)
		}
	default:
		return fmt.Errorf("unsupported file format: %s", ext)
	}

	// validateConfiguration rejects MaxConcurrentRules < 1. Without filling
	// defaults first, a file that simply omits max_concurrent_rules would be
	// reported invalid. config is a private copy, so mutating it is harmless.
	ocm.processConfiguration(config)
	return ocm.validateConfiguration(config)
}