# SeaweedFS FUSE ML Optimization Plan

## Analysis Summary

Based on an examination of JuiceFS's 600 most recent commits and the current SeaweedFS FUSE implementation, this plan identifies key ML-focused optimizations that can be ported to SeaweedFS.

### Key JuiceFS Optimizations for ML Workloads:

1. **Smart Prefetching System** (`pkg/chunk/prefetch.go`)
   - Concurrent prefetch workers (configurable parallelism)
   - Deduplication of concurrent requests for the same chunk
   - Background chunk fetching

2. **Advanced Caching Architecture**
   - Multi-tiered caching (memory + disk, with size-based tiers)
   - Open-file cache with chunk-level caching (`pkg/meta/openfile.go`)
   - Intelligent cache eviction based on access patterns

3. **Performance Optimizations**
   - Support for writeback cache mode
   - Memory cache optimization with separate allocation
   - Better cache-hit detection and metrics

### Current SeaweedFS Limitations:

1. **Basic Caching**: a simple tiered cache without smart prefetching
2. **No Sequential Access Detection**: readahead optimizations are missing
3. **Limited Concurrency Control**: a basic reader cache without pattern detection
4. **No ML-Specific Optimizations**: no awareness of batch processing

## Implementation Plan

### Phase 1: Smart Prefetching System (Priority: High)

**1.1 Create Prefetch Worker Pool**

```go
// Location: weed/mount/prefetch.go (new file)
type PrefetchManager struct {
	workers    chan *PrefetchRequest
	activeJobs map[string]*PrefetchJob
	maxWorkers int
	jobTimeout time.Duration
}

type PrefetchRequest struct {
	FileId     string
	ChunkIndex uint32
	Priority   int
	Callback   func([]byte, error)
}
```

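To make the worker pool and deduplication concrete, here is a minimal, self-contained sketch of how they could fit together. It repeats the structs above with one assumed addition (a `mu` mutex guarding `activeJobs`); `PrefetchJob`'s shape and the `fetchChunk` stub are also illustrative assumptions, not existing SeaweedFS APIs:

```go
package mount

import (
	"fmt"
	"sync"
	"time"
)

type PrefetchJob struct{} // placeholder job record

type PrefetchManager struct {
	workers    chan *PrefetchRequest
	activeJobs map[string]*PrefetchJob
	mu         sync.Mutex // assumed addition: guards activeJobs
	maxWorkers int
	jobTimeout time.Duration
}

type PrefetchRequest struct {
	FileId     string
	ChunkIndex uint32
	Priority   int
	Callback   func([]byte, error)
}

func NewPrefetchManager(maxWorkers int, timeout time.Duration) *PrefetchManager {
	pm := &PrefetchManager{
		workers:    make(chan *PrefetchRequest, maxWorkers*2),
		activeJobs: make(map[string]*PrefetchJob),
		maxWorkers: maxWorkers,
		jobTimeout: timeout,
	}
	for i := 0; i < maxWorkers; i++ {
		go pm.worker()
	}
	return pm
}

// Submit drops requests for chunks that are already in flight and never
// blocks the foreground reader: if the queue is full, the hint is discarded.
func (pm *PrefetchManager) Submit(req *PrefetchRequest) {
	key := fmt.Sprintf("%s/%d", req.FileId, req.ChunkIndex)
	pm.mu.Lock()
	if _, inFlight := pm.activeJobs[key]; inFlight {
		pm.mu.Unlock()
		return // deduplicated: a worker is already fetching this chunk
	}
	pm.activeJobs[key] = &PrefetchJob{}
	pm.mu.Unlock()

	select {
	case pm.workers <- req:
	default: // queue full: prefetch is best-effort
		pm.mu.Lock()
		delete(pm.activeJobs, key)
		pm.mu.Unlock()
	}
}

func (pm *PrefetchManager) worker() {
	for req := range pm.workers {
		data, err := fetchChunk(req) // stub standing in for the real chunk read
		if req.Callback != nil {
			req.Callback(data, err)
		}
		pm.mu.Lock()
		delete(pm.activeJobs, fmt.Sprintf("%s/%d", req.FileId, req.ChunkIndex))
		pm.mu.Unlock()
	}
}

func fetchChunk(req *PrefetchRequest) ([]byte, error) { return nil, nil } // stub
```
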
**1.2 Sequential Access Detection**

```go
// Location: weed/mount/access_pattern.go (new file)
type AccessPatternDetector struct {
	recentAccesses      []AccessInfo
	sequentialThreshold int
	readaheadSize       int64
}

// Integration point in weedfs_file_read.go
func (fh *FileHandle) detectSequentialAccess(offset int64, size int) bool {
	// Detect whether the current read continues a sequential pattern;
	// if it does, trigger prefetch for the next chunks.
	return false // placeholder
}
```

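One deterministic way to implement the detection, sketched under the assumption that the detector keeps a bounded sliding window of recent reads; the `AccessInfo` fields below are filled in for illustration:

```go
package mount

// AccessInfo records one read for pattern analysis (illustrative fields).
type AccessInfo struct {
	Offset int64
	Size   int
}

type AccessPatternDetector struct {
	recentAccesses      []AccessInfo
	sequentialThreshold int   // contiguous reads required to call it sequential
	readaheadSize       int64 // how far ahead to prefetch once sequential
}

// Record appends a read and reports whether the last sequentialThreshold
// reads are contiguous: each one starts where the previous one ended.
func (d *AccessPatternDetector) Record(offset int64, size int) bool {
	d.recentAccesses = append(d.recentAccesses, AccessInfo{Offset: offset, Size: size})
	if len(d.recentAccesses) > 32 { // keep the window bounded
		d.recentAccesses = d.recentAccesses[1:]
	}
	if d.sequentialThreshold < 2 || len(d.recentAccesses) < d.sequentialThreshold {
		return false
	}
	tail := d.recentAccesses[len(d.recentAccesses)-d.sequentialThreshold:]
	for i := 1; i < len(tail); i++ {
		if tail[i].Offset != tail[i-1].Offset+int64(tail[i-1].Size) {
			return false
		}
	}
	return true
}
```
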
**1.3 Enhanced Reader Cache with Prefetching**

```go
// Location: weed/filer/reader_cache.go (enhancement)
func (rc *ReaderCache) MaybePrefetch(chunkViews *Interval[*ChunkView]) {
	// Enhanced version with sequential detection:
	// prefetch multiple chunks ahead for sequential reads,
	// using ML-aware heuristics for the prefetch distance.
}
```

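For the prefetch-distance heuristic, a ramp-up/reset window (in the spirit of kernel readahead) is one option. This is a sketch; the type and its bounds are proposals:

```go
// prefetchWindow grows the number of chunks to fetch ahead while reads
// stay sequential and collapses it the moment the pattern breaks.
type prefetchWindow struct {
	chunksAhead    int
	maxChunksAhead int
}

func (w *prefetchWindow) next(isSequential bool) int {
	if !isSequential {
		w.chunksAhead = 0 // random access: prefetching would waste bandwidth
		return 0
	}
	switch {
	case w.chunksAhead == 0:
		w.chunksAhead = 1 // first confirmed sequential read: start small
	case w.chunksAhead < w.maxChunksAhead:
		w.chunksAhead *= 2 // ramp up as confidence grows
	}
	if w.chunksAhead > w.maxChunksAhead {
		w.chunksAhead = w.maxChunksAhead
	}
	return w.chunksAhead
}
```
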
### Phase 2: Enhanced Caching (Priority: High)

**2.1 Open File Cache with Chunk Metadata**

```go
// Location: weed/mount/open_file_cache.go (new file)
type OpenFileCache struct {
	files    map[uint64]*OpenFile // inode -> OpenFile
	mutex    sync.RWMutex
	maxFiles int
	ttl      time.Duration
}

type OpenFile struct {
	Inode       uint64
	ChunkCache  map[uint32]*ChunkMetadata
	AccessTime  time.Time
	ReadPattern AccessPattern
}

type ChunkMetadata struct {
	Offset     uint64
	Size       uint64
	CacheLevel int // 0 = memory, 1 = disk, 2 = not cached
	LastAccess time.Time
}
```

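A small sketch of how lookups with lazy TTL expiry could behave, building on the structs above (so `sync` and `time` are assumed imported); the eviction path is deliberately simplified:

```go
// Get returns a cached OpenFile, dropping it lazily once its TTL expires.
func (c *OpenFileCache) Get(inode uint64) (*OpenFile, bool) {
	c.mutex.RLock()
	of, ok := c.files[inode]
	c.mutex.RUnlock()
	if !ok {
		return nil, false
	}
	if time.Since(of.AccessTime) > c.ttl {
		c.mutex.Lock()
		delete(c.files, inode) // stale entry: evict on access
		c.mutex.Unlock()
		return nil, false
	}
	return of, true
}
```
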
**2.2 ML-Aware Cache Eviction Policy**

```go
// Location: weed/util/chunk_cache/ml_cache_policy.go (new file)
type MLCachePolicy struct {
	// Factors in:
	// - file access recency
	// - sequential vs. random access patterns
	// - file size (prefer caching smaller, frequently accessed files)
	// - training vs. inference workload detection
}

func (policy *MLCachePolicy) ShouldEvict(chunk *CacheEntry) bool {
	// ML-specific eviction logic:
	// keep chunks that belong to training datasets longer,
	// and prioritize model checkpoints during inference.
	return false // placeholder
}
```

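One way to turn those factors into a decision is a single eviction score. This is a sketch: the `CacheEntry` fields (`LastAccess`, `Pattern`, `Size`) and the thresholds are assumptions, and `SequentialAccess` reuses the `AccessPattern` constants from Phase 3:

```go
// evictionScore: higher means more evictable. Combines recency with
// pattern- and size-based adjustments (multipliers are placeholders).
func (policy *MLCachePolicy) evictionScore(chunk *CacheEntry, now time.Time) float64 {
	score := now.Sub(chunk.LastAccess).Seconds() // older = more evictable
	if chunk.Pattern == SequentialAccess {
		score *= 0.5 // dataset chunks are likely re-read on the next epoch
	}
	if chunk.Size > 64<<20 {
		score *= 1.5 // very large chunks cost more memory to keep resident
	}
	return score
}
```

`ShouldEvict` would then compare scores across candidates rather than deciding per chunk in isolation.
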
**2.3 Writeback Cache Support**

```go
// Location: weed/mount/weedfs.go (enhancement)
func (wfs *WFS) configureFuseOptions() {
	// Add support for FOPEN_KEEP_CACHE,
	// implement writeback caching similar to JuiceFS,
	// and enable kernel caching for read-heavy ML workloads.
}
```

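For the `FOPEN_KEEP_CACHE` part, the go-fuse bindings let an Open handler keep the kernel page cache across opens. A minimal sketch; the `mlReadHeavy` gate is a hypothetical mount option, and write paths would still need explicit invalidation:

```go
// Inside the FUSE Open handler (sketch): let the kernel keep cached pages
// for this file between opens, which suits read-mostly training data.
if wfs.option.mlReadHeavy { // hypothetical mount option
	out.OpenFlags |= fuse.FOPEN_KEEP_CACHE
}
```
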
### Phase 3: ML Pattern Detection (Priority: Medium)

**3.1 Training Data Access Pattern Detection**

```go
// Location: weed/mount/ml_patterns.go (new file)
type MLWorkloadDetector struct {
	accessHistory []AccessEvent
	patterns      []AccessPattern
}

type AccessPattern int

const (
	RandomAccess AccessPattern = iota
	SequentialAccess
	StridedAccess // common in image datasets
	BatchAccess   // multiple files accessed together
	EpochAccess   // dataset restart patterns
)

func (detector *MLWorkloadDetector) DetectPattern(accesses []AccessEvent) AccessPattern {
	// Analyze access patterns to detect:
	// - image dataset traversal (often sequential with restarts)
	// - model checkpoint loading (large sequential reads)
	// - tensor file access patterns
	return RandomAccess // placeholder
}
```

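As a sketch of what the body could look like, here is a delta-based heuristic. It assumes `AccessEvent` carries `Offset` and `Size` as `int64`, and the thresholds are placeholders; the offset-rewind test doubles as the epoch-boundary detection needed in 3.2:

```go
// Classify by the deltas between consecutive offsets: a constant delta
// equal to the read size is sequential, another constant delta is strided,
// and rewinds to offset 0 suggest epoch restarts over a dataset.
func classifyAccesses(accesses []AccessEvent) AccessPattern {
	if len(accesses) < 3 {
		return RandomAccess
	}
	stride := accesses[1].Offset - accesses[0].Offset
	constantStride, restarts := true, 0
	for i := 1; i < len(accesses); i++ {
		if accesses[i].Offset-accesses[i-1].Offset != stride {
			constantStride = false
		}
		if accesses[i].Offset == 0 && accesses[i-1].Offset > 0 {
			restarts++ // jumped back to the start: possible new epoch
		}
	}
	switch {
	case restarts > 0:
		return EpochAccess
	case constantStride && stride == accesses[0].Size:
		return SequentialAccess
	case constantStride && stride != 0:
		return StridedAccess
	default:
		return RandomAccess
	}
}
```
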
**3.2 Dataset Traversal Optimization**

```go
// Location: weed/mount/dataset_optimizer.go (new file)
func (opt *DatasetOptimizer) OptimizeForTraining() {
	// Pre-load dataset metadata,
	// prefetch the next batch of files while the current batch is processed,
	// and detect epoch boundaries to warm the cache.
}
```

### Phase 4: Batch Optimization (Priority: Medium)

**4.1 Batch Read Aggregation**

```go
// Location: weed/mount/batch_reader.go (new file)
type BatchReader struct {
	pendingReads []ReadRequest
	batchSize    int
	timeout      time.Duration
}

func (br *BatchReader) AggregateReads() {
	// Combine multiple small reads into larger requests,
	// optimize for common ML access patterns,
	// and reduce network overhead for distributed training.
}
```

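The core of the aggregation is range coalescing. A sketch, assuming `ReadRequest` carries `Offset` and `Size` as `int64` and that the pending reads target the same file:

```go
import "sort"

// coalesce merges adjacent or overlapping reads into fewer, larger ones.
func coalesce(reads []ReadRequest) []ReadRequest {
	if len(reads) == 0 {
		return nil
	}
	sort.Slice(reads, func(i, j int) bool { return reads[i].Offset < reads[j].Offset })
	merged := []ReadRequest{reads[0]}
	for _, r := range reads[1:] {
		last := &merged[len(merged)-1]
		if r.Offset <= last.Offset+last.Size { // touching or overlapping
			if end := r.Offset + r.Size; end > last.Offset+last.Size {
				last.Size = end - last.Offset
			}
			continue
		}
		merged = append(merged, r)
	}
	return merged
}
```

For example, reads at (0, 4 KiB), (4 KiB, 4 KiB), and (1 MiB, 4 KiB) coalesce into two requests.
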
**4.2 Tensor File Optimization**

```go
// Location: weed/mount/tensor_optimizer.go (new file)
func (to *TensorOptimizer) OptimizeForTensorFlow() {
	// Detect TFRecord and PyTorch .pt files,
	// optimize chunk sizes for tensor data,
	// and implement tensor-aware prefetching.
}
```

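Detection can start with simple extension matching. A sketch; the extension list is a proposal, and a real implementation might also sniff magic bytes such as TFRecord's length-prefixed framing:

```go
import (
	"path/filepath"
	"strings"
)

// tensorExtensions lists file types that get tensor-aware handling.
var tensorExtensions = map[string]bool{
	".tfrecord":    true,
	".pt":          true,
	".pth":         true,
	".npy":         true,
	".safetensors": true,
}

func isTensorFile(name string) bool {
	return tensorExtensions[strings.ToLower(filepath.Ext(name))]
}
```
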
### Phase 5: Configuration and Monitoring (Priority: Low)

**5.1 ML-Specific Mount Options**

```go
// Location: weed/command/mount.go (enhancement)
var mlOptions struct {
	enableMLOptimization *bool
	prefetchWorkers      *int
	mlCacheSize          *int64
	trainingMode         *bool
	datasetPath          *string
}

// New mount flags:
//   -ml.optimization=true
//   -ml.prefetchWorkers=8
//   -ml.cacheSize=1GB
//   -ml.trainingMode=true
//   -ml.datasetPath=/datasets
```

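Registration could follow the pattern the existing mount flags use. A sketch with proposed names and defaults; the cache size is spelled out in bytes because the standard `flag` package has no size parser:

```go
func init() {
	mlOptions.enableMLOptimization = cmdMount.Flag.Bool("ml.optimization", false, "enable ML-aware prefetching and caching")
	mlOptions.prefetchWorkers = cmdMount.Flag.Int("ml.prefetchWorkers", 8, "number of concurrent prefetch workers")
	mlOptions.mlCacheSize = cmdMount.Flag.Int64("ml.cacheSize", 1<<30, "ML cache size in bytes (default 1 GiB)")
	mlOptions.trainingMode = cmdMount.Flag.Bool("ml.trainingMode", false, "optimize caching for training workloads")
	mlOptions.datasetPath = cmdMount.Flag.String("ml.datasetPath", "", "path prefix treated as dataset storage")
}
```
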
**5.2 Performance Metrics**

```go
// Location: weed/mount/ml_metrics.go (new file)
type MLMetrics struct {
	PrefetchHitRate    float64
	SequentialDetected int64
	CacheHitsByPattern map[AccessPattern]int64
	BatchEfficiency    float64
}

func (metrics *MLMetrics) Export() {
	// Export to Prometheus/Grafana for monitoring,
	// tracking ML-specific performance indicators.
}
```

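Since SeaweedFS already exposes Prometheus metrics, the export could register counters with the Prometheus Go client. A sketch with proposed metric names (assumes `github.com/prometheus/client_golang/prometheus` is imported):

```go
var (
	prefetchHits = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "seaweedfs_mount_ml_prefetch_hits_total",
		Help: "Reads served from prefetched chunks",
	})
	sequentialDetected = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "seaweedfs_mount_ml_sequential_detected_total",
		Help: "Reads classified as sequential",
	})
)

func registerMLMetrics(reg prometheus.Registerer) {
	reg.MustRegister(prefetchHits, sequentialDetected)
}
```
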
## Testing Plan

### Unit Testing Strategy

#### Phase 1 Tests

1. **Prefetch Manager Tests**

```go
// Location: weed/mount/prefetch_test.go
func TestPrefetchManager_WorkerPool(t *testing.T)
func TestPrefetchManager_DuplicateRequests(t *testing.T)
func TestPrefetchManager_PriorityQueue(t *testing.T)
func TestPrefetchManager_Timeout(t *testing.T)
```

2. **Access Pattern Detection Tests**

```go
// Location: weed/mount/access_pattern_test.go
func TestSequentialDetection(t *testing.T)
func TestRandomAccessDetection(t *testing.T)
func TestStridedAccessDetection(t *testing.T)
func TestPatternTransition(t *testing.T)
```

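As one concrete shape for these, a sequential-detection test against the sliding-window sketch from Phase 1.2 (a threshold of three contiguous reads is assumed):

```go
func TestSequentialDetection(t *testing.T) {
	d := &AccessPatternDetector{sequentialThreshold: 3}
	if d.Record(0, 1024) || d.Record(1024, 1024) {
		t.Fatal("should not report sequential before the threshold is met")
	}
	if !d.Record(2048, 1024) {
		t.Fatal("three contiguous reads should be detected as sequential")
	}
	if d.Record(1<<20, 1024) {
		t.Fatal("an offset jump should break the sequential run")
	}
}
```
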
#### Phase 2 Tests

3. **Open File Cache Tests**

```go
// Location: weed/mount/open_file_cache_test.go
func TestOpenFileCache_Basic(t *testing.T)
func TestOpenFileCache_Eviction(t *testing.T)
func TestOpenFileCache_ChunkMetadata(t *testing.T)
func TestOpenFileCache_Concurrent(t *testing.T)
```

4. **ML Cache Policy Tests**

```go
// Location: weed/util/chunk_cache/ml_cache_policy_test.go
func TestMLCachePolicy_TrainingWorkload(t *testing.T)
func TestMLCachePolicy_InferenceWorkload(t *testing.T)
func TestMLCachePolicy_EvictionHeuristics(t *testing.T)
```

#### Phase 3 Tests

5. **ML Pattern Detection Tests**

```go
// Location: weed/mount/ml_patterns_test.go
func TestMLWorkloadDetector_ImageDataset(t *testing.T)
func TestMLWorkloadDetector_TextDataset(t *testing.T)
func TestMLWorkloadDetector_ModelCheckpoints(t *testing.T)
func TestMLWorkloadDetector_EpochBoundary(t *testing.T)
```

#### Phase 4 Tests

6. **Batch Optimization Tests**

```go
// Location: weed/mount/batch_reader_test.go
func TestBatchReader_Aggregation(t *testing.T)
func TestBatchReader_Timeout(t *testing.T)
func TestBatchReader_TensorFiles(t *testing.T)
```

### Integration Testing

#### Test Environment Setup

```bash
#!/bin/bash
# test/ml_integration/setup.sh

# Set up a SeaweedFS cluster for ML testing
make clean
make

# Start the master server
./weed master &
sleep 2

# Start the volume servers
./weed volume -dir=./vol1 -mserver=localhost:9333 -port=8080 &
./weed volume -dir=./vol2 -mserver=localhost:9333 -port=8081 &
sleep 2

# Start the filer
./weed filer -master=localhost:9333 &
sleep 2
```

#### ML Workload Simulation

```go
// Location: test/ml_integration/ml_workload_test.go
func TestMLWorkloadSimulation(t *testing.T) {
	// Simulate PyTorch DataLoader access patterns,
	// test with an ImageNet-style dataset structure,
	// and measure cache hit rates and throughput.
}

func TestSequentialDatasetTraversal(t *testing.T) {
	// Test epoch-based dataset iteration,
	// verify prefetch effectiveness,
	// and check memory usage patterns.
}

func TestConcurrentTrainingWorkers(t *testing.T) {
	// Simulate multiple training processes,
	// test batch read aggregation,
	// and verify there are no cache conflicts.
}
```

#### Performance Benchmarks

```go
// Location: test/ml_integration/benchmark_test.go
func BenchmarkSequentialRead(b *testing.B) {
	// Compare before/after optimization;
	// measure throughput improvements.
}

func BenchmarkRandomRead(b *testing.B) {
	// Test cache effectiveness for random access.
}

func BenchmarkConcurrentReads(b *testing.B) {
	// Test scalability with multiple readers.
}
```

### Load Testing

#### Test Datasets

1. **Image Dataset**: 100K images, 224x224 RGB (common CNN input)
2. **Text Dataset**: 10M text samples (NLP training data)
3. **Model Checkpoints**: large PyTorch/TensorFlow model files
4. **Mixed Workload**: a combination of training and inference access patterns

#### Load Test Scenarios

```go
// Location: test/ml_load/scenarios.go

type LoadTestScenario struct {
	Name            string
	Workers         int
	Duration        time.Duration
	AccessPattern   AccessPattern
	DatasetType     string
	ExpectedMetrics PerformanceMetrics
}

var scenarios = []LoadTestScenario{
	{
		Name:          "CNN Training",
		Workers:       4,
		Duration:      5 * time.Minute,
		AccessPattern: SequentialAccess,
		DatasetType:   "ImageDataset",
	},
	{
		Name:          "NLP Training",
		Workers:       8,
		Duration:      10 * time.Minute,
		AccessPattern: BatchAccess,
		DatasetType:   "TextDataset",
	},
	// More scenarios...
}
```

### Continuous Integration Tests

#### GitHub Actions Workflow

```yaml
# Location: .github/workflows/ml-optimization-test.yml
name: ML Optimization Tests

on: [push, pull_request]

jobs:
  ml-unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-go@v2
        with:
          go-version: '1.21'
      - run: go test ./weed/mount/... -tags=ml_optimization

  ml-integration-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - run: make
      - run: ./test/ml_integration/run_tests.sh

  ml-performance-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - run: go test -bench=. ./test/ml_integration/
```

## Implementation Timeline

### Week 1-2: Foundation + Testing Setup
- Implement the basic prefetch worker pool
- Add sequential access detection
- Create the access pattern detector
- **Testing**: unit tests for the prefetch manager and access pattern detection
- **Commit**: "Phase 1: Add smart prefetching foundation with tests"

### Week 3-4: Enhanced Caching + Integration Tests
- Implement the open file cache with chunk metadata
- Add ML-aware cache eviction policies
- Enable writeback cache support
- **Testing**: integration tests for the caching system
- **Commit**: "Phase 2: Enhanced ML-aware caching with comprehensive tests"

### Week 5-6: ML Patterns + Load Testing
- Create the ML workload detector
- Implement dataset traversal optimization
- Add training-specific optimizations
- **Testing**: ML pattern detection tests and load testing setup
- **Commit**: "Phase 3: ML pattern detection with load testing framework"

### Week 7-8: Batch Optimization + Performance Testing
- Implement batch read aggregation
- Add tensor file optimizations
- Integration testing and performance tuning
- **Testing**: performance benchmarks and optimization verification
- **Commit**: "Phase 4: Batch optimization with performance benchmarks"

### Week 9-10: Configuration, Monitoring & CI
- Add ML-specific mount options
- Implement performance metrics
- Documentation and final testing
- **Testing**: end-to-end testing and CI pipeline setup
- **Commit**: "Phase 5: ML monitoring and configuration with full test suite"

## Expected Performance Improvements

1. **Sequential Read Throughput**: 3-5x improvement for large-file streaming
2. **Training Data Loading**: 2-3x faster dataset iteration
3. **Cache Hit Rate**: 40-60% improvement with ML-aware caching
4. **Memory Efficiency**: 20-30% reduction in memory usage through better eviction
5. **Network Overhead**: 50% reduction through batch aggregation

## Testing Success Criteria

### Performance Benchmarks
- [ ] Sequential read throughput >= 3x baseline
- [ ] Cache hit rate >= 60% for training workloads
- [ ] Memory usage increase <= 20% despite the additional caching
- [ ] Prefetch accuracy >= 80% for sequential access

### Functional Tests
- [ ] All unit tests pass with >= 90% code coverage
- [ ] Integration tests pass for common ML frameworks
- [ ] Load tests complete without memory leaks
- [ ] Concurrent access tests show no data corruption

### Compatibility Tests
- [ ] Existing FUSE functionality is unaffected
- [ ] No performance regression for non-ML workloads
- [ ] Works with PyTorch, TensorFlow, and generic file access
- [ ] Cross-platform compatibility (Linux, macOS)