package ml import ( "context" "fmt" "path/filepath" "strings" "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" ) // TensorFormat represents different tensor file formats type TensorFormat int const ( TensorFormatUnknown TensorFormat = iota TensorFormatNumPy // .npy, .npz files TensorFormatPickle // Python pickle files TensorFormatTensorFlow // TensorFlow SavedModel, .pb files TensorFormatPyTorch // PyTorch .pt, .pth files TensorFormatONNX // ONNX .onnx files TensorFormatHDF5 // HDF5 .h5, .hdf5 files TensorFormatParquet // Apache Parquet files TensorFormatArrow // Apache Arrow files TensorFormatTensorRT // NVIDIA TensorRT engines TensorFormatCoreML // Apple CoreML models ) // TensorDataType represents tensor data types type TensorDataType int const ( TensorDataTypeUnknown TensorDataType = iota TensorDataTypeFloat32 TensorDataTypeFloat64 TensorDataTypeInt8 TensorDataTypeInt16 TensorDataTypeInt32 TensorDataTypeInt64 TensorDataTypeUInt8 TensorDataTypeUInt16 TensorDataTypeUInt32 TensorDataTypeUInt64 TensorDataTypeBool TensorDataTypeComplex64 TensorDataTypeComplex128 ) // TensorMetadata holds metadata about a tensor file type TensorMetadata struct { sync.RWMutex // File information FilePath string `json:"file_path"` FileName string `json:"file_name"` FileSize uint64 `json:"file_size"` Format TensorFormat `json:"format"` Checksum uint32 `json:"checksum"` // Tensor properties Shape []int64 `json:"shape"` // Tensor dimensions DataType TensorDataType `json:"data_type"` // Element data type ElementCount int64 `json:"element_count"` // Total number of elements ElementSize int `json:"element_size"` // Size of each element in bytes // Memory layout Strides []int64 `json:"strides"` // Memory strides ByteOrder string `json:"byte_order"` // little_endian, big_endian Alignment int `json:"alignment"` // Memory alignment Compressed bool `json:"compressed"` // Whether data is compressed // Access patterns AccessPattern AccessPattern `json:"access_pattern"` // How tensor is accessed SlicePatterns []SlicePattern `json:"slice_patterns"` // Common slice patterns HotRegions []TensorRegion `json:"hot_regions"` // Frequently accessed regions ColdRegions []TensorRegion `json:"cold_regions"` // Rarely accessed regions // Performance characteristics LoadTime time.Duration `json:"load_time"` // Time to load tensor ParseTime time.Duration `json:"parse_time"` // Time to parse metadata AccessCount int64 `json:"access_count"` // Total access count LastAccessed time.Time `json:"last_accessed"` // When last accessed // Optimization hints ShouldPreload bool `json:"should_preload"` // Should be preloaded OptimalChunkSize int64 `json:"optimal_chunk_size"` // Optimal chunk size for I/O PreferredLayout string `json:"preferred_layout"` // row_major, column_major CompressionRatio float64 `json:"compression_ratio"` // Achieved compression ratio } // SlicePattern represents a common tensor slicing pattern type SlicePattern struct { Pattern string `json:"pattern"` // e.g., "[:, 0:100, :]" Frequency int64 `json:"frequency"` // How often this pattern is used Size int64 `json:"size"` // Size of the slice in bytes Offset int64 `json:"offset"` // Starting byte offset LastUsed time.Time `json:"last_used"` // When pattern was last used } // TensorRegion represents a region of a tensor type TensorRegion struct { StartOffset int64 `json:"start_offset"` // Starting byte offset EndOffset int64 `json:"end_offset"` // Ending byte offset AccessCount int64 `json:"access_count"` // Number of accesses LastAccessed time.Time `json:"last_accessed"` // When last accessed Dimensions []int64 `json:"dimensions"` // Region dimensions } // TensorOptimizer optimizes tensor file access patterns type TensorOptimizer struct { sync.RWMutex // Configuration enabled bool // Whether tensor optimization is enabled analysisInterval time.Duration // How often to analyze patterns metadataCacheSize int // Number of metadata entries to cache compressionThreshold float64 // Compression threshold // Tensor tracking tensorMetadata map[string]*TensorMetadata // File path -> metadata formatDetectors map[TensorFormat]*FormatDetector // Format-specific detectors // Optimization state sliceCache *TensorSliceCache // Cache for tensor slices prefetchQueue []*TensorPrefetchRequest // Prefetch requests optimizationRules []*TensorOptimizationRule // Optimization rules // Performance tracking cacheHits int64 // Cache hits cacheMisses int64 // Cache misses totalBytesRead int64 // Total bytes read optimizedReads int64 // Optimized tensor reads // Background tasks ctx context.Context cancel context.CancelFunc // Metrics activeWorkloads int64 // Active tensor workloads optimizationEvents int64 // Optimization events } // FormatDetector detects and analyzes tensor file formats type FormatDetector struct { Format TensorFormat `json:"format"` FileExtensions []string `json:"file_extensions"` MagicBytes [][]byte `json:"magic_bytes"` MetadataParser func([]byte) (*TensorMetadata, error) `json:"-"` OptimalChunkSize int64 `json:"optimal_chunk_size"` } // TensorSliceCache caches tensor slices for efficient access type TensorSliceCache struct { sync.RWMutex maxSize uint64 // Maximum cache size in bytes currentSize uint64 // Current cache size in bytes entries map[string]*TensorSliceEntry // Cache entries accessOrder []string // LRU access order hitCount int64 // Cache hits missCount int64 // Cache misses } // TensorSliceEntry represents a cached tensor slice type TensorSliceEntry struct { Key string `json:"key"` // Cache key (file_path:slice_pattern) Data []byte `json:"data"` // Cached tensor data Size uint64 `json:"size"` // Size in bytes Metadata *TensorMetadata `json:"metadata"` // Associated metadata AccessCount int64 `json:"access_count"` // Access frequency LastAccess time.Time `json:"last_access"` // When last accessed ExpiryTime time.Time `json:"expiry_time"` // When cache entry expires } // TensorPrefetchRequest represents a tensor prefetch request type TensorPrefetchRequest struct { FilePath string `json:"file_path"` SlicePattern string `json:"slice_pattern"` Priority int `json:"priority"` RequestTime time.Time `json:"request_time"` EstimatedSize int64 `json:"estimated_size"` Reason string `json:"reason"` // Why prefetch was requested } // TensorOptimizationRule defines optimization rules for tensor access type TensorOptimizationRule struct { Name string `json:"name"` Condition string `json:"condition"` // shape[0] > 1000, format == numpy Action string `json:"action"` // compress, cache_slices, prefetch Parameters map[string]interface{} `json:"parameters"` FormatTypes []TensorFormat `json:"format_types"` // Applicable formats Priority int `json:"priority"` Enabled bool `json:"enabled"` } // NewTensorOptimizer creates a new tensor optimizer func NewTensorOptimizer(enabled bool) *TensorOptimizer { ctx, cancel := context.WithCancel(context.Background()) to := &TensorOptimizer{ enabled: enabled, analysisInterval: 60 * time.Second, // Analyze every minute metadataCacheSize: 1000, // Cache 1000 tensor metadata entries compressionThreshold: 0.8, // Compress if ratio > 0.8 tensorMetadata: make(map[string]*TensorMetadata), formatDetectors: make(map[TensorFormat]*FormatDetector), prefetchQueue: make([]*TensorPrefetchRequest, 0), optimizationRules: make([]*TensorOptimizationRule, 0), ctx: ctx, cancel: cancel, } // Initialize format detectors to.initializeFormatDetectors() // Initialize tensor slice cache to.sliceCache = &TensorSliceCache{ maxSize: 100 * 1024 * 1024, // 100MB cache currentSize: 0, entries: make(map[string]*TensorSliceEntry), accessOrder: make([]string, 0), } // Initialize optimization rules to.initializeTensorRules() if enabled { // Start optimization loop go to.optimizationLoop() glog.V(1).Infof("Tensor optimizer started with analysis interval %v", to.analysisInterval) } return to } // initializeFormatDetectors sets up format detectors for different tensor formats func (to *TensorOptimizer) initializeFormatDetectors() { // NumPy format detector to.formatDetectors[TensorFormatNumPy] = &FormatDetector{ Format: TensorFormatNumPy, FileExtensions: []string{".npy", ".npz"}, MagicBytes: [][]byte{{0x93, 0x4E, 0x55, 0x4D, 0x50, 0x59}}, // "\x93NUMPY" MetadataParser: to.parseNumPyMetadata, OptimalChunkSize: 64 * 1024, } // PyTorch format detector to.formatDetectors[TensorFormatPyTorch] = &FormatDetector{ Format: TensorFormatPyTorch, FileExtensions: []string{".pt", ".pth"}, MagicBytes: [][]byte{{0x50, 0x4B, 0x03, 0x04}}, // ZIP signature (PyTorch uses ZIP) MetadataParser: to.parsePyTorchMetadata, OptimalChunkSize: 128 * 1024, } // TensorFlow format detector to.formatDetectors[TensorFormatTensorFlow] = &FormatDetector{ Format: TensorFormatTensorFlow, FileExtensions: []string{".pb", ".pbtxt"}, MagicBytes: [][]byte{}, // Protocol Buffers don't have fixed magic bytes MetadataParser: to.parseTensorFlowMetadata, OptimalChunkSize: 256 * 1024, } // ONNX format detector to.formatDetectors[TensorFormatONNX] = &FormatDetector{ Format: TensorFormatONNX, FileExtensions: []string{".onnx"}, MagicBytes: [][]byte{}, // ONNX uses Protocol Buffers MetadataParser: to.parseONNXMetadata, OptimalChunkSize: 256 * 1024, } // HDF5 format detector to.formatDetectors[TensorFormatHDF5] = &FormatDetector{ Format: TensorFormatHDF5, FileExtensions: []string{".h5", ".hdf5"}, MagicBytes: [][]byte{{0x89, 0x48, 0x44, 0x46, 0x0D, 0x0A, 0x1A, 0x0A}}, // HDF5 signature MetadataParser: to.parseHDF5Metadata, OptimalChunkSize: 512 * 1024, } } // initializeTensorRules sets up default tensor optimization rules func (to *TensorOptimizer) initializeTensorRules() { // Rule 1: Cache small frequently accessed tensors to.optimizationRules = append(to.optimizationRules, &TensorOptimizationRule{ Name: "cache_small_frequent_tensors", Condition: "file_size < 10MB AND access_count > 10", Action: "cache_entire_tensor", Parameters: map[string]interface{}{"cache_ttl": "1h"}, FormatTypes: []TensorFormat{TensorFormatNumPy, TensorFormatPyTorch}, Priority: 20, Enabled: true, }) // Rule 2: Prefetch commonly sliced regions to.optimizationRules = append(to.optimizationRules, &TensorOptimizationRule{ Name: "prefetch_common_slices", Condition: "slice_pattern_frequency > 5", Action: "prefetch_slices", Parameters: map[string]interface{}{"max_prefetch_size": "50MB"}, FormatTypes: []TensorFormat{TensorFormatNumPy, TensorFormatHDF5}, Priority: 15, Enabled: true, }) // Rule 3: Compress large infrequently accessed tensors to.optimizationRules = append(to.optimizationRules, &TensorOptimizationRule{ Name: "compress_large_cold_tensors", Condition: "file_size > 100MB AND access_frequency < 0.1", Action: "enable_compression", Parameters: map[string]interface{}{"compression_algorithm": "lz4"}, FormatTypes: []TensorFormat{TensorFormatNumPy, TensorFormatTensorFlow}, Priority: 5, Enabled: true, }) // Rule 4: Optimize tensor layout for strided access to.optimizationRules = append(to.optimizationRules, &TensorOptimizationRule{ Name: "optimize_strided_access", Condition: "access_pattern == 'strided' AND shape[0] > 1000", Action: "suggest_layout_change", Parameters: map[string]interface{}{"preferred_layout": "column_major"}, FormatTypes: []TensorFormat{TensorFormatNumPy, TensorFormatPyTorch, TensorFormatHDF5}, Priority: 10, Enabled: true, }) } // AnalyzeTensorFile analyzes a tensor file and extracts metadata func (to *TensorOptimizer) AnalyzeTensorFile(filePath string, fileSize uint64) (*TensorMetadata, error) { to.Lock() defer to.Unlock() // Check if metadata already exists if metadata, exists := to.tensorMetadata[filePath]; exists { metadata.Lock() metadata.AccessCount++ metadata.LastAccessed = time.Now() metadata.Unlock() return metadata, nil } // Detect tensor format format := to.detectTensorFormat(filePath) if format == TensorFormatUnknown { return nil, fmt.Errorf("unknown tensor format for file: %s", filePath) } // Parse tensor metadata detector := to.formatDetectors[format] if detector == nil { return nil, fmt.Errorf("no detector available for format: %v", format) } // Read file header to extract metadata // In production, this would read the actual file metadata := &TensorMetadata{ FilePath: filePath, FileName: filepath.Base(filePath), FileSize: fileSize, Format: format, OptimalChunkSize: detector.OptimalChunkSize, AccessCount: 1, LastAccessed: time.Now(), AccessPattern: RandomAccess, SlicePatterns: make([]SlicePattern, 0), HotRegions: make([]TensorRegion, 0), ColdRegions: make([]TensorRegion, 0), } // Store metadata to.tensorMetadata[filePath] = metadata glog.V(2).Infof("Analyzed tensor file: %s, format: %v, size: %d bytes", filePath, format, fileSize) return metadata, nil } // detectTensorFormat detects the format of a tensor file func (to *TensorOptimizer) detectTensorFormat(filePath string) TensorFormat { ext := strings.ToLower(filepath.Ext(filePath)) // Check by file extension first for format, detector := range to.formatDetectors { for _, supportedExt := range detector.FileExtensions { if ext == supportedExt { return format } } } // TODO: In production, would also check magic bytes by reading file header return TensorFormatUnknown } // RecordTensorAccess records a tensor access for optimization analysis func (to *TensorOptimizer) RecordTensorAccess(filePath string, offset int64, size int, accessPattern AccessPattern) { to.Lock() defer to.Unlock() metadata, exists := to.tensorMetadata[filePath] if !exists { // Try to analyze the file if md, err := to.AnalyzeTensorFile(filePath, 0); err == nil { metadata = md } else { return } } metadata.Lock() metadata.AccessCount++ metadata.LastAccessed = time.Now() metadata.AccessPattern = accessPattern // Track access regions region := TensorRegion{ StartOffset: offset, EndOffset: offset + int64(size), AccessCount: 1, LastAccessed: time.Now(), } // Add to hot regions if frequently accessed to.updateHotColdRegions(metadata, region) metadata.Unlock() to.totalBytesRead += int64(size) } // updateHotColdRegions updates hot and cold regions based on access patterns func (to *TensorOptimizer) updateHotColdRegions(metadata *TensorMetadata, newRegion TensorRegion) { // Simple implementation - could be made more sophisticated const hotThreshold = 5 // Access count threshold for hot regions // Check if region overlaps with existing hot regions for i, hotRegion := range metadata.HotRegions { if to.regionsOverlap(newRegion, hotRegion) { metadata.HotRegions[i].AccessCount++ metadata.HotRegions[i].LastAccessed = time.Now() return } } // Add as new region if access count is high enough if newRegion.AccessCount >= hotThreshold { metadata.HotRegions = append(metadata.HotRegions, newRegion) } else { metadata.ColdRegions = append(metadata.ColdRegions, newRegion) } // Keep only recent regions (limit memory usage) if len(metadata.HotRegions) > 100 { metadata.HotRegions = metadata.HotRegions[len(metadata.HotRegions)-50:] } if len(metadata.ColdRegions) > 100 { metadata.ColdRegions = metadata.ColdRegions[len(metadata.ColdRegions)-50:] } } // regionsOverlap checks if two tensor regions overlap func (to *TensorOptimizer) regionsOverlap(region1, region2 TensorRegion) bool { return region1.StartOffset < region2.EndOffset && region2.StartOffset < region1.EndOffset } // GetTensorOptimization provides optimization recommendations for tensor access func (to *TensorOptimizer) GetTensorOptimization(filePath string) *TensorAccessOptimization { to.RLock() metadata := to.tensorMetadata[filePath] to.RUnlock() if metadata == nil { return &TensorAccessOptimization{ ShouldCache: false, PrefetchSize: 64 * 1024, CompressionHint: "none", } } metadata.RLock() defer metadata.RUnlock() optimization := &TensorAccessOptimization{ FilePath: filePath, Format: metadata.Format, ShouldCache: false, PrefetchSize: metadata.OptimalChunkSize, CompressionHint: "none", LayoutHint: "row_major", SliceOptimizations: make([]SliceOptimization, 0), } // Determine if tensor should be cached if metadata.FileSize < 10*1024*1024 && metadata.AccessCount > 10 { optimization.ShouldCache = true optimization.CacheTTL = time.Hour } // Suggest compression for large infrequently accessed tensors if metadata.FileSize > 100*1024*1024 && metadata.AccessCount < 5 { optimization.CompressionHint = "lz4" } // Optimize based on access patterns switch metadata.AccessPattern { case SequentialAccess: optimization.PrefetchSize *= 4 // Larger prefetch for sequential access optimization.LayoutHint = "row_major" case StridedAccess: optimization.LayoutHint = "column_major" // Better for strided access optimization.PrefetchSize /= 2 // Smaller prefetch to avoid waste case RandomAccess: optimization.PrefetchSize = 64 * 1024 // Conservative prefetch optimization.ShouldCache = metadata.AccessCount > 20 // Cache if very frequent } // Analyze slice patterns for optimization for _, pattern := range metadata.SlicePatterns { if pattern.Frequency > 3 { sliceOpt := SliceOptimization{ Pattern: pattern.Pattern, ShouldCache: true, PrefetchSize: pattern.Size, Priority: int(pattern.Frequency), } optimization.SliceOptimizations = append(optimization.SliceOptimizations, sliceOpt) } } return optimization } // TensorAccessOptimization holds optimization recommendations for tensor access type TensorAccessOptimization struct { FilePath string `json:"file_path"` Format TensorFormat `json:"format"` ShouldCache bool `json:"should_cache"` CacheTTL time.Duration `json:"cache_ttl"` PrefetchSize int64 `json:"prefetch_size"` CompressionHint string `json:"compression_hint"` LayoutHint string `json:"layout_hint"` SliceOptimizations []SliceOptimization `json:"slice_optimizations"` } // SliceOptimization holds optimization recommendations for tensor slices type SliceOptimization struct { Pattern string `json:"pattern"` ShouldCache bool `json:"should_cache"` PrefetchSize int64 `json:"prefetch_size"` Priority int `json:"priority"` } // optimizationLoop runs the main tensor optimization loop func (to *TensorOptimizer) optimizationLoop() { ticker := time.NewTicker(to.analysisInterval) defer ticker.Stop() for { select { case <-to.ctx.Done(): return case <-ticker.C: to.performTensorOptimization() } } } // performTensorOptimization performs tensor optimizations func (to *TensorOptimizer) performTensorOptimization() { to.Lock() defer to.Unlock() // Apply optimization rules for _, rule := range to.optimizationRules { if !rule.Enabled { continue } for filePath, metadata := range to.tensorMetadata { if to.evaluateTensorCondition(metadata, rule.Condition) && to.formatMatches(metadata.Format, rule.FormatTypes) { to.executeTensorAction(filePath, rule) to.optimizationEvents++ } } } // Clean up old metadata to.cleanupTensorMetadata() // Update slice cache to.updateSliceCache() } // evaluateTensorCondition evaluates a tensor optimization condition func (to *TensorOptimizer) evaluateTensorCondition(metadata *TensorMetadata, condition string) bool { metadata.RLock() defer metadata.RUnlock() if strings.Contains(condition, "file_size < 10MB") { return metadata.FileSize < 10*1024*1024 } if strings.Contains(condition, "access_count > 10") { return metadata.AccessCount > 10 } if strings.Contains(condition, "file_size > 100MB") { return metadata.FileSize > 100*1024*1024 } if strings.Contains(condition, "access_pattern == 'strided'") { return metadata.AccessPattern == StridedAccess } return false } // formatMatches checks if a format matches the allowed formats func (to *TensorOptimizer) formatMatches(format TensorFormat, allowedFormats []TensorFormat) bool { for _, allowed := range allowedFormats { if format == allowed { return true } } return false } // executeTensorAction executes a tensor optimization action func (to *TensorOptimizer) executeTensorAction(filePath string, rule *TensorOptimizationRule) { switch rule.Action { case "cache_entire_tensor": to.cacheEntireTensor(filePath, rule.Parameters) case "prefetch_slices": to.prefetchTensorSlices(filePath, rule.Parameters) case "enable_compression": to.enableTensorCompression(filePath, rule.Parameters) case "suggest_layout_change": to.suggestLayoutChange(filePath, rule.Parameters) default: glog.V(3).Infof("Unknown tensor optimization action: %s", rule.Action) } glog.V(2).Infof("Executed tensor optimization: %s -> %s for file %s", rule.Name, rule.Action, filePath) } // Action implementations func (to *TensorOptimizer) cacheEntireTensor(filePath string, params map[string]interface{}) { glog.V(3).Infof("Caching entire tensor: %s", filePath) // Implementation would cache the full tensor in memory } func (to *TensorOptimizer) prefetchTensorSlices(filePath string, params map[string]interface{}) { glog.V(3).Infof("Prefetching tensor slices for: %s", filePath) // Implementation would prefetch commonly accessed slices } func (to *TensorOptimizer) enableTensorCompression(filePath string, params map[string]interface{}) { algorithm := "lz4" if alg, ok := params["compression_algorithm"].(string); ok { algorithm = alg } glog.V(3).Infof("Enabling compression (%s) for tensor: %s", algorithm, filePath) } func (to *TensorOptimizer) suggestLayoutChange(filePath string, params map[string]interface{}) { layout := "row_major" if l, ok := params["preferred_layout"].(string); ok { layout = l } glog.V(3).Infof("Suggesting layout change (%s) for tensor: %s", layout, filePath) } // Metadata parsers for different formats func (to *TensorOptimizer) parseNumPyMetadata(data []byte) (*TensorMetadata, error) { // Simplified NumPy .npy format parsing // Real implementation would properly parse the NumPy header metadata := &TensorMetadata{ Format: TensorFormatNumPy, DataType: TensorDataTypeFloat32, // Default assumption ElementSize: 4, // 4 bytes for float32 ByteOrder: "little_endian", // NumPy default Alignment: 8, // Default alignment } return metadata, nil } func (to *TensorOptimizer) parsePyTorchMetadata(data []byte) (*TensorMetadata, error) { // Simplified PyTorch format parsing // Real implementation would parse the PyTorch pickle format metadata := &TensorMetadata{ Format: TensorFormatPyTorch, DataType: TensorDataTypeFloat32, ElementSize: 4, ByteOrder: "little_endian", Alignment: 8, } return metadata, nil } func (to *TensorOptimizer) parseTensorFlowMetadata(data []byte) (*TensorMetadata, error) { // Simplified TensorFlow format parsing // Real implementation would parse Protocol Buffer format metadata := &TensorMetadata{ Format: TensorFormatTensorFlow, DataType: TensorDataTypeFloat32, ElementSize: 4, ByteOrder: "little_endian", Alignment: 8, } return metadata, nil } func (to *TensorOptimizer) parseONNXMetadata(data []byte) (*TensorMetadata, error) { // Simplified ONNX format parsing // Real implementation would parse ONNX Protocol Buffer format metadata := &TensorMetadata{ Format: TensorFormatONNX, DataType: TensorDataTypeFloat32, ElementSize: 4, ByteOrder: "little_endian", Alignment: 8, } return metadata, nil } func (to *TensorOptimizer) parseHDF5Metadata(data []byte) (*TensorMetadata, error) { // Simplified HDF5 format parsing // Real implementation would use HDF5 library metadata := &TensorMetadata{ Format: TensorFormatHDF5, DataType: TensorDataTypeFloat64, ElementSize: 8, ByteOrder: "little_endian", Alignment: 8, } return metadata, nil } // Helper functions func (to *TensorOptimizer) cleanupTensorMetadata() { cutoffTime := time.Now().Add(-24 * time.Hour) for filePath, metadata := range to.tensorMetadata { metadata.RLock() shouldRemove := metadata.LastAccessed.Before(cutoffTime) metadata.RUnlock() if shouldRemove { delete(to.tensorMetadata, filePath) } } } func (to *TensorOptimizer) updateSliceCache() { // Update slice cache statistics to.sliceCache.Lock() // Calculate cache hit rate totalAccesses := to.sliceCache.hitCount + to.sliceCache.missCount if totalAccesses > 0 { hitRate := float64(to.sliceCache.hitCount) / float64(totalAccesses) glog.V(4).Infof("Tensor slice cache hit rate: %.2f%%", hitRate*100) } // Evict expired entries now := time.Now() for key, entry := range to.sliceCache.entries { if now.After(entry.ExpiryTime) { to.sliceCache.currentSize -= entry.Size delete(to.sliceCache.entries, key) // Remove from access order for i, k := range to.sliceCache.accessOrder { if k == key { to.sliceCache.accessOrder = append(to.sliceCache.accessOrder[:i], to.sliceCache.accessOrder[i+1:]...) break } } } } to.sliceCache.Unlock() } // GetTensorMetrics returns comprehensive tensor optimization metrics func (to *TensorOptimizer) GetTensorMetrics() TensorOptimizerMetrics { to.RLock() defer to.RUnlock() metrics := TensorOptimizerMetrics{ TrackedTensors: int64(len(to.tensorMetadata)), TotalBytesRead: to.totalBytesRead, OptimizedReads: to.optimizedReads, CacheHits: to.cacheHits, CacheMisses: to.cacheMisses, OptimizationEvents: to.optimizationEvents, FormatCounts: make(map[TensorFormat]int64), } // Calculate cache hit rate if metrics.CacheHits+metrics.CacheMisses > 0 { metrics.CacheHitRate = float64(metrics.CacheHits) / float64(metrics.CacheHits+metrics.CacheMisses) } // Count tensors by format for _, metadata := range to.tensorMetadata { metadata.RLock() metrics.FormatCounts[metadata.Format]++ metadata.RUnlock() } return metrics } // TensorOptimizerMetrics holds metrics for tensor optimization type TensorOptimizerMetrics struct { TrackedTensors int64 `json:"tracked_tensors"` TotalBytesRead int64 `json:"total_bytes_read"` OptimizedReads int64 `json:"optimized_reads"` CacheHits int64 `json:"cache_hits"` CacheMisses int64 `json:"cache_misses"` CacheHitRate float64 `json:"cache_hit_rate"` OptimizationEvents int64 `json:"optimization_events"` FormatCounts map[TensorFormat]int64 `json:"format_counts"` } // Shutdown gracefully shuts down the tensor optimizer func (to *TensorOptimizer) Shutdown() { if to.cancel != nil { to.cancel() } glog.V(1).Infof("Tensor optimizer shutdown complete") } // String methods for enums func (tf TensorFormat) String() string { switch tf { case TensorFormatNumPy: return "NumPy" case TensorFormatPickle: return "Pickle" case TensorFormatTensorFlow: return "TensorFlow" case TensorFormatPyTorch: return "PyTorch" case TensorFormatONNX: return "ONNX" case TensorFormatHDF5: return "HDF5" case TensorFormatParquet: return "Parquet" case TensorFormatArrow: return "Arrow" case TensorFormatTensorRT: return "TensorRT" case TensorFormatCoreML: return "CoreML" default: return "Unknown" } } func (tdt TensorDataType) String() string { switch tdt { case TensorDataTypeFloat32: return "Float32" case TensorDataTypeFloat64: return "Float64" case TensorDataTypeInt32: return "Int32" case TensorDataTypeInt64: return "Int64" case TensorDataTypeBool: return "Bool" default: return "Unknown" } }