mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-10 05:12:47 +02:00
577 lines
15 KiB
Go
577 lines
15 KiB
Go
package ml
|
|
|
|
import (
	"strings"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
|
|
|
|
// ChunkMetadata contains metadata about a cached chunk
|
|
type ChunkMetadata struct {
|
|
FileId string // Chunk file ID
|
|
Offset uint64 // Offset within the file
|
|
Size uint64 // Size of the chunk
|
|
CacheLevel int // 0=memory, 1=disk, 2=not cached
|
|
LastAccess time.Time // Last access time
|
|
AccessCount int64 // Number of times accessed
|
|
IsHot bool // Whether this chunk is frequently accessed
|
|
Pattern AccessPattern // Access pattern for this chunk
|
|
}
|
|
|
|
// OpenFileInfo contains comprehensive information about an open file
|
|
type OpenFileInfo struct {
|
|
sync.RWMutex
|
|
|
|
// Basic file information
|
|
Inode uint64 // File inode
|
|
Entry *filer_pb.Entry // File entry from filer
|
|
OpenCount int // Number of open handles
|
|
OpenTime time.Time // When file was first opened
|
|
LastAccess time.Time // Last access time
|
|
|
|
// Chunk-level caching
|
|
ChunkCache map[uint32]*ChunkMetadata // chunk index -> metadata
|
|
ChunkCount uint32 // Total number of chunks in file
|
|
ChunkSize int64 // Size of each chunk
|
|
|
|
// Access pattern tracking
|
|
AccessInfo *AccessInfo // Access pattern information
|
|
ReadPattern AccessPattern // Overall file access pattern
|
|
PrefetchState PrefetchState // Current prefetch state
|
|
|
|
// ML-specific optimizations
|
|
IsMLFile bool // Whether this is likely an ML-related file
|
|
FileType MLFileType // Type of ML file (dataset, model, etc.)
|
|
BatchSize int // Detected batch size for training data
|
|
EpochCount int // Number of epochs detected
|
|
|
|
// Performance tracking
|
|
TotalBytesRead int64 // Total bytes read from this file
|
|
CacheHitCount int64 // Number of cache hits
|
|
CacheMissCount int64 // Number of cache misses
|
|
PrefetchHitCount int64 // Number of prefetch hits
|
|
}
|
|
|
|
// PrefetchState is the prefetching status recorded for an open file.
type PrefetchState int

// Prefetch states. Values are numbered explicitly, starting at zero.
const (
	PrefetchIdle      PrefetchState = 0
	PrefetchActive    PrefetchState = 1
	PrefetchComplete  PrefetchState = 2
	PrefetchSuspended PrefetchState = 3
)
|
|
|
|
// MLFileType is the category assigned to a file by ML file detection.
type MLFileType int

// ML file categories. Values are numbered explicitly; zero means unknown.
const (
	MLFileUnknown MLFileType = 0
	MLFileDataset MLFileType = 1 // training/validation dataset
	MLFileModel   MLFileType = 2 // model checkpoint/weights
	MLFileConfig  MLFileType = 3 // configuration files
	MLFileTensor  MLFileType = 4 // individual tensor files
	MLFileLog     MLFileType = 5 // training logs
)
|
|
|
|
// OpenFileCache manages open file information with ML-aware optimizations
|
|
type OpenFileCache struct {
|
|
sync.RWMutex
|
|
|
|
// Configuration
|
|
maxFiles int // Maximum number of files to track
|
|
ttl time.Duration // TTL for inactive files
|
|
cleanupInterval time.Duration // Cleanup interval
|
|
|
|
// File tracking
|
|
files map[uint64]*OpenFileInfo // inode -> file info
|
|
accessOrder []uint64 // LRU order for eviction
|
|
|
|
// ML-specific configuration
|
|
enableMLOptimization bool
|
|
mlFileDetector *MLFileDetector
|
|
|
|
// Metrics
|
|
totalFiles int64
|
|
evictedFiles int64
|
|
cacheHits int64
|
|
cacheMisses int64
|
|
|
|
// Background cleanup
|
|
shutdown chan struct{}
|
|
done chan struct{}
|
|
}
|
|
|
|
// MLFileDetector holds the heuristics DetectMLFile uses to decide whether a
// file is ML-related: extension sets, path fragments and size thresholds.
type MLFileDetector struct {
	// Extension sets, keyed by extension without the leading dot.
	datasetExtensions map[string]bool
	modelExtensions   map[string]bool
	configExtensions  map[string]bool

	// Path fragments matched as substrings of the full path.
	datasetPaths []string
	modelPaths   []string

	// Size heuristics.
	modelMinSize    int64 // files must exceed this size for the name-based model heuristic
	datasetMaxItems int   // maximum items expected in a dataset directory; not consulted by DetectMLFile
}
|
|
|
|
// NewOpenFileCache creates a new open file cache optimized for ML workloads
|
|
func NewOpenFileCache(maxFiles int, ttl time.Duration) *OpenFileCache {
|
|
if maxFiles <= 0 {
|
|
maxFiles = 1000 // Default suitable for ML workloads
|
|
}
|
|
if ttl <= 0 {
|
|
ttl = 30 * time.Minute // Default TTL
|
|
}
|
|
|
|
ofc := &OpenFileCache{
|
|
maxFiles: maxFiles,
|
|
ttl: ttl,
|
|
cleanupInterval: 5 * time.Minute,
|
|
files: make(map[uint64]*OpenFileInfo),
|
|
accessOrder: make([]uint64, 0, maxFiles),
|
|
enableMLOptimization: true,
|
|
mlFileDetector: newMLFileDetector(),
|
|
shutdown: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
}
|
|
|
|
// Start background cleanup
|
|
go ofc.cleanupWorker()
|
|
|
|
glog.V(1).Infof("OpenFileCache initialized: maxFiles=%d, ttl=%v", maxFiles, ttl)
|
|
return ofc
|
|
}
|
|
|
|
// newMLFileDetector creates a new ML file detector with common patterns
|
|
func newMLFileDetector() *MLFileDetector {
|
|
return &MLFileDetector{
|
|
datasetExtensions: map[string]bool{
|
|
"jpg": true, "jpeg": true, "png": true, "bmp": true, "tiff": true,
|
|
"wav": true, "mp3": true, "flac": true,
|
|
"txt": true, "csv": true, "json": true, "jsonl": true,
|
|
"parquet": true, "arrow": true, "h5": true, "hdf5": true,
|
|
"tfrecord": true, "tfrecords": true,
|
|
},
|
|
modelExtensions: map[string]bool{
|
|
"pt": true, "pth": true, "pkl": true, "pickle": true,
|
|
"h5": true, "hdf5": true, "pb": true, "pbtxt": true,
|
|
"onnx": true, "tflite": true, "caffemodel": true,
|
|
"bin": true, "safetensors": true,
|
|
},
|
|
configExtensions: map[string]bool{
|
|
"yaml": true, "yml": true, "json": true, "toml": true,
|
|
"cfg": true, "config": true, "conf": true,
|
|
},
|
|
datasetPaths: []string{
|
|
"/datasets", "/data", "/train", "/test", "/val", "/validation",
|
|
"/images", "/audio", "/text", "/corpus",
|
|
},
|
|
modelPaths: []string{
|
|
"/models", "/checkpoints", "/weights", "/pretrained",
|
|
"/saved_models", "/exports",
|
|
},
|
|
modelMinSize: 1024 * 1024, // 1MB minimum for model files
|
|
datasetMaxItems: 1000000, // 1M max items in dataset directory
|
|
}
|
|
}
|
|
|
|
// OpenFile registers a file as opened and initializes tracking
|
|
func (ofc *OpenFileCache) OpenFile(inode uint64, entry *filer_pb.Entry, fullPath string) *OpenFileInfo {
|
|
ofc.Lock()
|
|
defer ofc.Unlock()
|
|
|
|
// Get or create file info
|
|
fileInfo := ofc.files[inode]
|
|
if fileInfo == nil {
|
|
fileInfo = &OpenFileInfo{
|
|
Inode: inode,
|
|
Entry: entry,
|
|
OpenTime: time.Now(),
|
|
ChunkCache: make(map[uint32]*ChunkMetadata),
|
|
AccessInfo: &AccessInfo{Inode: inode},
|
|
ReadPattern: RandomAccess,
|
|
PrefetchState: PrefetchIdle,
|
|
}
|
|
|
|
// Detect ML file type
|
|
if ofc.enableMLOptimization {
|
|
fileInfo.IsMLFile, fileInfo.FileType = ofc.mlFileDetector.DetectMLFile(entry, fullPath)
|
|
if fileInfo.IsMLFile {
|
|
glog.V(3).Infof("ML file detected: inode=%d, type=%v, path=%s",
|
|
inode, fileInfo.FileType, fullPath)
|
|
}
|
|
}
|
|
|
|
ofc.files[inode] = fileInfo
|
|
ofc.totalFiles++
|
|
|
|
// Update access order for LRU
|
|
ofc.updateAccessOrder(inode)
|
|
|
|
// Evict if necessary
|
|
if len(ofc.files) > ofc.maxFiles {
|
|
ofc.evictLRU()
|
|
}
|
|
}
|
|
|
|
fileInfo.OpenCount++
|
|
fileInfo.LastAccess = time.Now()
|
|
ofc.updateAccessOrder(inode)
|
|
|
|
glog.V(4).Infof("File opened: inode=%d, openCount=%d, isML=%v",
|
|
inode, fileInfo.OpenCount, fileInfo.IsMLFile)
|
|
|
|
return fileInfo
|
|
}
|
|
|
|
// CloseFile decrements the open count and potentially cleans up
|
|
func (ofc *OpenFileCache) CloseFile(inode uint64) bool {
|
|
ofc.Lock()
|
|
defer ofc.Unlock()
|
|
|
|
fileInfo := ofc.files[inode]
|
|
if fileInfo == nil {
|
|
return true // Already cleaned up
|
|
}
|
|
|
|
fileInfo.OpenCount--
|
|
glog.V(4).Infof("File closed: inode=%d, openCount=%d", inode, fileInfo.OpenCount)
|
|
|
|
// Return true if file can be evicted (no more open handles)
|
|
return fileInfo.OpenCount <= 0
|
|
}
|
|
|
|
// GetFileInfo retrieves file information if cached
|
|
func (ofc *OpenFileCache) GetFileInfo(inode uint64) *OpenFileInfo {
|
|
ofc.RLock()
|
|
defer ofc.RUnlock()
|
|
|
|
fileInfo := ofc.files[inode]
|
|
if fileInfo != nil {
|
|
fileInfo.LastAccess = time.Now()
|
|
ofc.cacheHits++
|
|
return fileInfo
|
|
}
|
|
|
|
ofc.cacheMisses++
|
|
return nil
|
|
}
|
|
|
|
// UpdateChunkCache updates chunk metadata for a file
|
|
func (ofc *OpenFileCache) UpdateChunkCache(inode uint64, chunkIndex uint32, metadata *ChunkMetadata) {
|
|
ofc.RLock()
|
|
fileInfo := ofc.files[inode]
|
|
ofc.RUnlock()
|
|
|
|
if fileInfo == nil {
|
|
return
|
|
}
|
|
|
|
fileInfo.Lock()
|
|
defer fileInfo.Unlock()
|
|
|
|
fileInfo.ChunkCache[chunkIndex] = metadata
|
|
metadata.LastAccess = time.Now()
|
|
metadata.AccessCount++
|
|
|
|
glog.V(4).Infof("Updated chunk cache: inode=%d, chunk=%d, level=%d",
|
|
inode, chunkIndex, metadata.CacheLevel)
|
|
}
|
|
|
|
// GetChunkMetadata retrieves chunk metadata if available
|
|
func (ofc *OpenFileCache) GetChunkMetadata(inode uint64, chunkIndex uint32) (*ChunkMetadata, bool) {
|
|
ofc.RLock()
|
|
fileInfo := ofc.files[inode]
|
|
ofc.RUnlock()
|
|
|
|
if fileInfo == nil {
|
|
return nil, false
|
|
}
|
|
|
|
fileInfo.RLock()
|
|
defer fileInfo.RUnlock()
|
|
|
|
metadata, exists := fileInfo.ChunkCache[chunkIndex]
|
|
if exists {
|
|
metadata.LastAccess = time.Now()
|
|
metadata.AccessCount++
|
|
}
|
|
|
|
return metadata, exists
|
|
}
|
|
|
|
// updateAccessOrder updates the LRU access order
|
|
func (ofc *OpenFileCache) updateAccessOrder(inode uint64) {
|
|
// Remove from current position
|
|
for i, ino := range ofc.accessOrder {
|
|
if ino == inode {
|
|
ofc.accessOrder = append(ofc.accessOrder[:i], ofc.accessOrder[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
|
|
// Add to front (most recently used)
|
|
ofc.accessOrder = append([]uint64{inode}, ofc.accessOrder...)
|
|
}
|
|
|
|
// evictLRU evicts the least recently used file
|
|
func (ofc *OpenFileCache) evictLRU() {
|
|
if len(ofc.accessOrder) == 0 {
|
|
return
|
|
}
|
|
|
|
// Find LRU file that can be evicted (not currently open)
|
|
for i := len(ofc.accessOrder) - 1; i >= 0; i-- {
|
|
inode := ofc.accessOrder[i]
|
|
fileInfo := ofc.files[inode]
|
|
|
|
if fileInfo != nil && fileInfo.OpenCount <= 0 {
|
|
// Evict this file
|
|
delete(ofc.files, inode)
|
|
ofc.accessOrder = append(ofc.accessOrder[:i], ofc.accessOrder[i+1:]...)
|
|
ofc.evictedFiles++
|
|
|
|
glog.V(3).Infof("Evicted file from cache: inode=%d, chunks=%d",
|
|
inode, len(fileInfo.ChunkCache))
|
|
return
|
|
}
|
|
}
|
|
|
|
// If no files can be evicted, just log a warning
|
|
glog.V(2).Infof("Warning: Could not evict any files from cache (all files are open)")
|
|
}
|
|
|
|
// cleanupWorker periodically cleans up expired entries
|
|
func (ofc *OpenFileCache) cleanupWorker() {
|
|
ticker := time.NewTicker(ofc.cleanupInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
ofc.cleanup()
|
|
case <-ofc.shutdown:
|
|
close(ofc.done)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanup removes expired file entries
|
|
func (ofc *OpenFileCache) cleanup() {
|
|
ofc.Lock()
|
|
defer ofc.Unlock()
|
|
|
|
now := time.Now()
|
|
toRemove := make([]uint64, 0)
|
|
|
|
for inode, fileInfo := range ofc.files {
|
|
// Only cleanup files that are not open and have expired
|
|
if fileInfo.OpenCount <= 0 && now.Sub(fileInfo.LastAccess) > ofc.ttl {
|
|
toRemove = append(toRemove, inode)
|
|
}
|
|
}
|
|
|
|
// Remove expired files
|
|
for _, inode := range toRemove {
|
|
delete(ofc.files, inode)
|
|
// Remove from access order
|
|
for i, ino := range ofc.accessOrder {
|
|
if ino == inode {
|
|
ofc.accessOrder = append(ofc.accessOrder[:i], ofc.accessOrder[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(toRemove) > 0 {
|
|
glog.V(3).Infof("Cleaned up %d expired file cache entries", len(toRemove))
|
|
}
|
|
}
|
|
|
|
// GetMetrics returns cache metrics
|
|
func (ofc *OpenFileCache) GetMetrics() OpenFileCacheMetrics {
|
|
ofc.RLock()
|
|
defer ofc.RUnlock()
|
|
|
|
var totalChunks int64
|
|
var mlFiles int64
|
|
fileTypes := make(map[MLFileType]int)
|
|
patterns := make(map[AccessPattern]int)
|
|
|
|
for _, fileInfo := range ofc.files {
|
|
totalChunks += int64(len(fileInfo.ChunkCache))
|
|
if fileInfo.IsMLFile {
|
|
mlFiles++
|
|
fileTypes[fileInfo.FileType]++
|
|
}
|
|
patterns[fileInfo.ReadPattern]++
|
|
}
|
|
|
|
return OpenFileCacheMetrics{
|
|
TotalFiles: int64(len(ofc.files)),
|
|
MLFiles: mlFiles,
|
|
TotalChunks: totalChunks,
|
|
CacheHits: ofc.cacheHits,
|
|
CacheMisses: ofc.cacheMisses,
|
|
EvictedFiles: ofc.evictedFiles,
|
|
FileTypes: fileTypes,
|
|
AccessPatterns: patterns,
|
|
}
|
|
}
|
|
|
|
// OpenFileCacheMetrics holds metrics for the open file cache
|
|
type OpenFileCacheMetrics struct {
|
|
TotalFiles int64 `json:"total_files"`
|
|
MLFiles int64 `json:"ml_files"`
|
|
TotalChunks int64 `json:"total_chunks"`
|
|
CacheHits int64 `json:"cache_hits"`
|
|
CacheMisses int64 `json:"cache_misses"`
|
|
EvictedFiles int64 `json:"evicted_files"`
|
|
FileTypes map[MLFileType]int `json:"file_types"`
|
|
AccessPatterns map[AccessPattern]int `json:"access_patterns"`
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the open file cache
|
|
func (ofc *OpenFileCache) Shutdown() {
|
|
glog.V(1).Infof("Shutting down OpenFileCache...")
|
|
|
|
close(ofc.shutdown)
|
|
|
|
// Wait for cleanup worker to finish
|
|
<-ofc.done
|
|
|
|
// Print final metrics
|
|
metrics := ofc.GetMetrics()
|
|
glog.V(1).Infof("OpenFileCache final metrics: files=%d, chunks=%d, hits=%d, misses=%d",
|
|
metrics.TotalFiles, metrics.TotalChunks, metrics.CacheHits, metrics.CacheMisses)
|
|
}
|
|
|
|
// MLFileDetector methods
|
|
|
|
// DetectMLFile determines if a file is ML-related and its type
|
|
func (detector *MLFileDetector) DetectMLFile(entry *filer_pb.Entry, fullPath string) (bool, MLFileType) {
|
|
if entry == nil {
|
|
return false, MLFileUnknown
|
|
}
|
|
|
|
name := entry.Name
|
|
size := int64(entry.Attributes.FileSize)
|
|
|
|
// Check file extension
|
|
if ext := getFileExtension(name); ext != "" {
|
|
if detector.datasetExtensions[ext] {
|
|
return true, MLFileDataset
|
|
}
|
|
if detector.modelExtensions[ext] {
|
|
return true, MLFileModel
|
|
}
|
|
if detector.configExtensions[ext] {
|
|
return true, MLFileConfig
|
|
}
|
|
}
|
|
|
|
// Check path patterns
|
|
for _, path := range detector.datasetPaths {
|
|
if contains(fullPath, path) {
|
|
return true, MLFileDataset
|
|
}
|
|
}
|
|
|
|
for _, path := range detector.modelPaths {
|
|
if contains(fullPath, path) {
|
|
return true, MLFileModel
|
|
}
|
|
}
|
|
|
|
// Check size heuristics
|
|
if size > detector.modelMinSize {
|
|
// Large files in certain contexts might be models
|
|
if contains(fullPath, "model") || contains(fullPath, "checkpoint") || contains(fullPath, "weight") {
|
|
return true, MLFileModel
|
|
}
|
|
}
|
|
|
|
// Check for tensor files
|
|
if contains(name, "tensor") || contains(name, ".pt") || contains(name, ".npy") {
|
|
return true, MLFileTensor
|
|
}
|
|
|
|
// Check for log files
|
|
if contains(name, "log") || contains(name, "tensorboard") || contains(fullPath, "logs") {
|
|
return true, MLFileLog
|
|
}
|
|
|
|
return false, MLFileUnknown
|
|
}
|
|
|
|
// Helper functions
|
|
|
|
// getFileExtension returns the text after the last '.' in filename, without
// the dot. It returns "" when filename has no '.'. The result keeps its
// original case, e.g. "a.tar.gz" yields "gz" and "notes." yields "".
func getFileExtension(filename string) string {
	lastDot := -1
	for pos := 0; pos < len(filename); pos++ {
		if filename[pos] == '.' {
			lastDot = pos
		}
	}
	if lastDot < 0 {
		return ""
	}
	return filename[lastDot+1:]
}
|
|
|
|
// contains reports whether substr occurs within str. An empty substr is
// contained in every string, including the empty string.
func contains(str, substr string) bool {
	return strings.Contains(str, substr)
}

// findSubstring reports whether substr occurs within str. It is kept for
// existing callers and is equivalent to contains.
func findSubstring(str, substr string) bool {
	return strings.Contains(str, substr)
}
|
|
|
|
// String methods for enums
|
|
|
|
func (ps PrefetchState) String() string {
|
|
switch ps {
|
|
case PrefetchIdle:
|
|
return "Idle"
|
|
case PrefetchActive:
|
|
return "Active"
|
|
case PrefetchComplete:
|
|
return "Complete"
|
|
case PrefetchSuspended:
|
|
return "Suspended"
|
|
default:
|
|
return "Unknown"
|
|
}
|
|
}
|
|
|
|
func (ft MLFileType) String() string {
|
|
switch ft {
|
|
case MLFileDataset:
|
|
return "Dataset"
|
|
case MLFileModel:
|
|
return "Model"
|
|
case MLFileConfig:
|
|
return "Config"
|
|
case MLFileTensor:
|
|
return "Tensor"
|
|
case MLFileLog:
|
|
return "Log"
|
|
default:
|
|
return "Unknown"
|
|
}
|
|
}
|