seaweedfs/weed/mount/ml/prefetch.go
chrislu ba318bdac3 Reorganize ML optimization into dedicated package
- Move ML components to weed/mount/ml package for better organization
- Create main MLOptimization interface with configuration
- Separate prefetch, access pattern detection, and ML reader cache components
- Add comprehensive configuration and metrics interface
- Maintain backward compatibility with existing mount package
- Package structure:
  * weed/mount/ml/prefetch.go - Prefetch manager
  * weed/mount/ml/access_pattern.go - Pattern detection
  * weed/mount/ml/ml_reader_cache.go - ML-aware reader cache
  * weed/mount/ml/ml.go - Main interface and configuration

Test status: 17/22 tests passing, core functionality solid
Package compiles cleanly with proper import structure
2025-08-30 15:09:47 -07:00


package ml

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
)

// PrefetchRequest represents a chunk prefetch request
type PrefetchRequest struct {
	FileId     string
	ChunkIndex uint32
	Offset     uint64
	Size       uint64
	Priority   int
	Timestamp  time.Time
	Callback   func([]byte, error)
	ctx        context.Context
}

// PrefetchJob tracks an active prefetch operation
type PrefetchJob struct {
	request   *PrefetchRequest
	startTime time.Time
	cancelled int32
}

// PrefetchManager manages background chunk prefetching for ML workloads
type PrefetchManager struct {
	sync.RWMutex

	// Configuration
	maxWorkers    int
	queueSize     int
	jobTimeout    time.Duration
	enableMetrics bool

	// Worker management
	workers    chan *PrefetchRequest
	activeJobs map[string]*PrefetchJob
	workerWg   sync.WaitGroup

	// Metrics
	totalRequests   int64
	successfulFetch int64
	failedFetch     int64
	duplicateReqs   int64
	timeoutReqs     int64

	// Shutdown
	shutdown chan struct{}
	done     chan struct{}
}

// NewPrefetchManager creates a new prefetch manager optimized for ML workloads
func NewPrefetchManager(maxWorkers int, queueSize int, timeout time.Duration) *PrefetchManager {
	if maxWorkers <= 0 {
		maxWorkers = 4 // Default suitable for ML workloads
	}
	if queueSize <= 0 {
		queueSize = 100
	}
	if timeout <= 0 {
		timeout = 30 * time.Second
	}

	pm := &PrefetchManager{
		maxWorkers:    maxWorkers,
		queueSize:     queueSize,
		jobTimeout:    timeout,
		enableMetrics: true,
		workers:       make(chan *PrefetchRequest, queueSize),
		activeJobs:    make(map[string]*PrefetchJob),
		shutdown:      make(chan struct{}),
		done:          make(chan struct{}),
	}

	// Start worker goroutines
	for i := 0; i < maxWorkers; i++ {
		pm.workerWg.Add(1)
		go pm.worker(i)
	}

	// Start cleanup goroutine for expired jobs
	go pm.cleanupWorker()

	glog.V(1).Infof("PrefetchManager started with %d workers, queue size %d", maxWorkers, queueSize)
	return pm
}
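
// prefetchUsageSketch is an illustrative example added for documentation; it is
// not part of the original file and is not called anywhere. It shows the typical
// lifecycle: construct a manager, queue a prefetch with a completion callback,
// and shut down. The file id and chunk geometry below are placeholder values
// chosen only for the example; callbacks run asynchronously on worker goroutines.
func prefetchUsageSketch() {
	pm := NewPrefetchManager(4, 100, 30*time.Second)
	defer pm.Shutdown()

	queued := pm.Prefetch(context.Background(), "example-file-id", 0, 0, 4096, 1,
		func(data []byte, err error) {
			if err != nil {
				glog.V(3).Infof("prefetch failed: %v", err)
				return
			}
			glog.V(4).Infof("prefetched %d bytes", len(data))
		})
	if !queued {
		// Duplicate request or full queue: the caller falls back to reading on demand.
		glog.V(4).Infof("prefetch not queued for example-file-id chunk 0")
	}
}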

// Prefetch requests background fetching of a chunk
// Returns true if request was queued, false if duplicate or queue full
func (pm *PrefetchManager) Prefetch(ctx context.Context, fileId string, chunkIndex uint32, offset, size uint64, priority int, callback func([]byte, error)) bool {
	atomic.AddInt64(&pm.totalRequests, 1)

	// Create job key for deduplication
	jobKey := pm.makeJobKey(fileId, chunkIndex)

	pm.Lock()
	// Check for duplicate requests
	if _, exists := pm.activeJobs[jobKey]; exists {
		pm.Unlock()
		atomic.AddInt64(&pm.duplicateReqs, 1)
		glog.V(4).Infof("Duplicate prefetch request for %s chunk %d", fileId, chunkIndex)
		return false
	}

	request := &PrefetchRequest{
		FileId:     fileId,
		ChunkIndex: chunkIndex,
		Offset:     offset,
		Size:       size,
		Priority:   priority,
		Timestamp:  time.Now(),
		Callback:   callback,
		ctx:        ctx,
	}
	job := &PrefetchJob{
		request:   request,
		startTime: time.Now(),
	}
	pm.activeJobs[jobKey] = job
	pm.Unlock()

	// Try to queue the request
	select {
	case pm.workers <- request:
		glog.V(4).Infof("Queued prefetch for %s chunk %d (priority %d)", fileId, chunkIndex, priority)
		return true
	default:
		// Queue is full, remove from active jobs
		pm.Lock()
		delete(pm.activeJobs, jobKey)
		pm.Unlock()
		glog.V(3).Infof("Prefetch queue full, dropping request for %s chunk %d", fileId, chunkIndex)
		return false
	}
}

// worker processes prefetch requests
func (pm *PrefetchManager) worker(workerID int) {
	defer pm.workerWg.Done()
	glog.V(4).Infof("Prefetch worker %d started", workerID)

	for {
		select {
		case request := <-pm.workers:
			pm.processRequest(workerID, request)
		case <-pm.shutdown:
			glog.V(4).Infof("Prefetch worker %d shutting down", workerID)
			return
		}
	}
}

// processRequest handles a single prefetch request
func (pm *PrefetchManager) processRequest(workerID int, request *PrefetchRequest) {
	jobKey := pm.makeJobKey(request.FileId, request.ChunkIndex)
	startTime := time.Now()

	glog.V(4).Infof("Worker %d processing prefetch for %s chunk %d", workerID, request.FileId, request.ChunkIndex)

	// Check if job was cancelled
	pm.RLock()
	job, exists := pm.activeJobs[jobKey]
	pm.RUnlock()
	if !exists {
		glog.V(4).Infof("Job %s already cancelled or completed", jobKey)
		return
	}
	if atomic.LoadInt32(&job.cancelled) == 1 {
		glog.V(4).Infof("Job %s was cancelled", jobKey)
		pm.removeJob(jobKey)
		return
	}

	// Create timeout context
	ctx, cancel := context.WithTimeout(request.ctx, pm.jobTimeout)
	defer cancel()

	// TODO: Implement actual chunk fetching logic
	// For now, simulate the work and call the callback
	data, err := pm.fetchChunk(ctx, request)

	// Update metrics
	duration := time.Since(startTime)
	if err != nil {
		atomic.AddInt64(&pm.failedFetch, 1)
		if ctx.Err() == context.DeadlineExceeded {
			atomic.AddInt64(&pm.timeoutReqs, 1)
		}
		glog.V(3).Infof("Worker %d failed to prefetch %s chunk %d after %v: %v", workerID, request.FileId, request.ChunkIndex, duration, err)
	} else {
		atomic.AddInt64(&pm.successfulFetch, 1)
		glog.V(4).Infof("Worker %d successfully prefetched %s chunk %d in %v (%d bytes)", workerID, request.FileId, request.ChunkIndex, duration, len(data))
	}

	// Call the callback if provided
	if request.Callback != nil {
		request.Callback(data, err)
	}

	// Remove job from active jobs
	pm.removeJob(jobKey)
}

// fetchChunk performs the actual chunk fetch operation
// TODO: Integrate with existing SeaweedFS chunk reading logic
func (pm *PrefetchManager) fetchChunk(ctx context.Context, request *PrefetchRequest) ([]byte, error) {
	// This is a placeholder implementation.
	// In the real implementation, this would:
	// 1. Use the existing chunk cache to check if chunk is already cached
	// 2. If not cached, fetch from volume servers using existing logic
	// 3. Store in cache for future use
	glog.V(4).Infof("Simulating fetch of %s chunk %d (offset %d, size %d)",
		request.FileId, request.ChunkIndex, request.Offset, request.Size)

	// Simulate some work
	select {
	case <-time.After(10 * time.Millisecond):
		// Return empty data for now
		return make([]byte, request.Size), nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
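
// Illustrative sketch (not part of the original file): one way the TODO in
// fetchChunk could be wired up without committing to a concrete SeaweedFS API
// in this package. ChunkSource is a hypothetical, caller-injected dependency;
// the real integration would go through the existing chunk cache and
// volume-server read path instead.
type ChunkSource interface {
	// ReadChunk returns the bytes of the chunk identified by fileId at the
	// given offset and size, or an error.
	ReadChunk(ctx context.Context, fileId string, offset, size uint64) ([]byte, error)
}

// fetchChunkVia delegates to an injected ChunkSource while preserving the
// request context (and therefore the manager's job timeout); it falls back to
// the simulated fetch when no source is provided.
func (pm *PrefetchManager) fetchChunkVia(ctx context.Context, src ChunkSource, request *PrefetchRequest) ([]byte, error) {
	if src == nil {
		return pm.fetchChunk(ctx, request)
	}
	return src.ReadChunk(ctx, request.FileId, request.Offset, request.Size)
}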

// Cancel cancels a pending or active prefetch request
func (pm *PrefetchManager) Cancel(fileId string, chunkIndex uint32) bool {
	jobKey := pm.makeJobKey(fileId, chunkIndex)

	pm.RLock()
	job, exists := pm.activeJobs[jobKey]
	pm.RUnlock()
	if !exists {
		return false
	}

	atomic.StoreInt32(&job.cancelled, 1)
	glog.V(4).Infof("Cancelled prefetch for %s chunk %d", fileId, chunkIndex)
	return true
}

// cleanupWorker periodically removes expired jobs
func (pm *PrefetchManager) cleanupWorker() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			pm.cleanup()
		case <-pm.shutdown:
			return
		}
	}
}

// cleanup removes expired jobs
func (pm *PrefetchManager) cleanup() {
	now := time.Now()
	expiredJobKeys := make([]string, 0)

	pm.RLock()
	for jobKey, job := range pm.activeJobs {
		if now.Sub(job.startTime) > pm.jobTimeout*2 { // Give extra time for cleanup
			expiredJobKeys = append(expiredJobKeys, jobKey)
		}
	}
	pm.RUnlock()

	if len(expiredJobKeys) > 0 {
		pm.Lock()
		for _, jobKey := range expiredJobKeys {
			delete(pm.activeJobs, jobKey)
		}
		pm.Unlock()
		glog.V(3).Infof("Cleaned up %d expired prefetch jobs", len(expiredJobKeys))
	}
}

// GetMetrics returns current prefetch metrics
func (pm *PrefetchManager) GetMetrics() PrefetchMetrics {
	pm.RLock()
	activeJobCount := len(pm.activeJobs)
	pm.RUnlock()

	return PrefetchMetrics{
		TotalRequests:   atomic.LoadInt64(&pm.totalRequests),
		SuccessfulFetch: atomic.LoadInt64(&pm.successfulFetch),
		FailedFetch:     atomic.LoadInt64(&pm.failedFetch),
		DuplicateReqs:   atomic.LoadInt64(&pm.duplicateReqs),
		TimeoutReqs:     atomic.LoadInt64(&pm.timeoutReqs),
		ActiveJobs:      int64(activeJobCount),
		Workers:         int64(pm.maxWorkers),
	}
}

// PrefetchMetrics holds prefetch performance metrics
type PrefetchMetrics struct {
	TotalRequests   int64
	SuccessfulFetch int64
	FailedFetch     int64
	DuplicateReqs   int64
	TimeoutReqs     int64
	ActiveJobs      int64
	Workers         int64
}
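
// SuccessRate is an illustrative helper added for documentation (not in the
// original file): it derives a success ratio from the raw counters, e.g. for
// logging or exporting a single gauge. "Completed" here means successes plus
// failures; requests still in flight are not counted.
func (m PrefetchMetrics) SuccessRate() float64 {
	completed := m.SuccessfulFetch + m.FailedFetch
	if completed == 0 {
		return 0
	}
	return float64(m.SuccessfulFetch) / float64(completed)
}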

// Shutdown gracefully shuts down the prefetch manager
func (pm *PrefetchManager) Shutdown() {
	glog.V(1).Infof("Shutting down PrefetchManager...")
	close(pm.shutdown)

	// Wait for workers to finish
	pm.workerWg.Wait()

	// Clear active jobs
	pm.Lock()
	pm.activeJobs = make(map[string]*PrefetchJob)
	pm.Unlock()

	close(pm.done)
	glog.V(1).Infof("PrefetchManager shutdown complete")
}

// Helper methods

// makeJobKey builds the deduplication key for a file/chunk pair, e.g. "fileId:42".
func (pm *PrefetchManager) makeJobKey(fileId string, chunkIndex uint32) string {
	return fmt.Sprintf("%s:%d", fileId, chunkIndex)
}

func (pm *PrefetchManager) removeJob(jobKey string) {
	pm.Lock()
	delete(pm.activeJobs, jobKey)
	pm.Unlock()
}