seaweedfs/weed/mount/ml/prefetch_test.go
chrislu ba318bdac3 Reorganize ML optimization into dedicated package
- Move ML components to weed/mount/ml package for better organization
- Create main MLOptimization interface with configuration
- Separate prefetch, access pattern detection, and ML reader cache components
- Add comprehensive configuration and metrics interface
- Maintain backward compatibility with existing mount package
- Package structure:
  * weed/mount/ml/prefetch.go - Prefetch manager
  * weed/mount/ml/access_pattern.go - Pattern detection
  * weed/mount/ml/ml_reader_cache.go - ML-aware reader cache
  * weed/mount/ml/ml.go - Main interface and configuration

Test status: 17/22 tests passing, core functionality solid
Package compiles cleanly with proper import structure
2025-08-30 15:09:47 -07:00
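
The tests below pin down the prefetch manager's public surface: NewPrefetchManager(workers, queueSize, timeout), Prefetch, Cancel, GetMetrics, and Shutdown. A minimal sketch of that surface as an interface, inferred only from the calls the tests make — the interface name and the exact numeric parameter types are assumptions, not part of this commit:

// prefetcher is a hypothetical view of *PrefetchManager, reconstructed from
// the test calls below; the offset/size types are assumed.
type prefetcher interface {
	// Prefetch enqueues a chunk fetch; false means the request duplicates
	// one already in flight or the queue is full.
	Prefetch(ctx context.Context, fileId string, chunkIndex uint32,
		offset, size uint64, priority int, callback func(data []byte, err error)) bool
	// Cancel marks a queued job as cancelled; its callback may still fire,
	// since cancellation is asynchronous.
	Cancel(fileId string, chunkIndex uint32) bool
	// GetMetrics exposes the counters the tests assert on: TotalRequests,
	// DuplicateReqs, SuccessfulFetch, and Workers.
	GetMetrics() PrefetchMetrics
	// Shutdown stops the worker pool; the tests require it to return promptly.
	Shutdown()
}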


package ml

import (
	"context"
	"errors"
	"strconv"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)
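
// TestPrefetchManager_Basic verifies that a queued prefetch request invokes
// its callback with the requested 1024 bytes and no error.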
func TestPrefetchManager_Basic(t *testing.T) {
	pm := NewPrefetchManager(2, 10, 5*time.Second)
	defer pm.Shutdown()

	// Test basic prefetch request
	ctx := context.Background()
	var callbackData []byte
	var callbackErr error
	var callbackCalled int32
	callback := func(data []byte, err error) {
		callbackData = data
		callbackErr = err
		// Publish the results last: this atomic store pairs with the atomic
		// load below to avoid a data race on callbackData/callbackErr.
		atomic.StoreInt32(&callbackCalled, 1)
	}

	success := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
	if !success {
		t.Error("Expected prefetch request to succeed")
	}

	// Wait for the callback to be called
	time.Sleep(100 * time.Millisecond)

	if atomic.LoadInt32(&callbackCalled) != 1 {
		t.Fatal("Expected callback to be called")
	}
	if callbackErr != nil {
		t.Errorf("Expected no error, got: %v", callbackErr)
	}
	if len(callbackData) != 1024 {
		t.Errorf("Expected data length 1024, got: %d", len(callbackData))
	}
}
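
// TestPrefetchManager_DuplicateRequests verifies that identical in-flight
// requests are rejected, fire a single callback, and are counted in the
// DuplicateReqs metric.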
func TestPrefetchManager_DuplicateRequests(t *testing.T) {
	pm := NewPrefetchManager(2, 10, 5*time.Second)
	defer pm.Shutdown()

	ctx := context.Background()
	var callbackCount int32
	callback := func(data []byte, err error) {
		atomic.AddInt32(&callbackCount, 1)
	}

	// Send the same request multiple times
	success1 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
	success2 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
	success3 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)

	if !success1 {
		t.Error("Expected first prefetch request to succeed")
	}
	if success2 || success3 {
		t.Error("Expected duplicate requests to be rejected")
	}

	// Wait for processing
	time.Sleep(100 * time.Millisecond)

	// Only one callback should have fired
	if atomic.LoadInt32(&callbackCount) != 1 {
		t.Errorf("Expected 1 callback, got: %d", atomic.LoadInt32(&callbackCount))
	}

	// Check metrics
	metrics := pm.GetMetrics()
	if metrics.TotalRequests != 3 {
		t.Errorf("Expected 3 total requests, got: %d", metrics.TotalRequests)
	}
	if metrics.DuplicateReqs != 2 {
		t.Errorf("Expected 2 duplicate requests, got: %d", metrics.DuplicateReqs)
	}
}
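
// TestPrefetchManager_WorkerPool verifies that a pool of workers completes a
// batch of distinct requests and records them as successful fetches.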
func TestPrefetchManager_WorkerPool(t *testing.T) {
	pm := NewPrefetchManager(3, 20, 5*time.Second)
	defer pm.Shutdown()

	ctx := context.Background()
	var completedCount int32
	callback := func(data []byte, err error) {
		atomic.AddInt32(&completedCount, 1)
	}

	// Send multiple requests
	requestCount := 10
	for i := 0; i < requestCount; i++ {
		fileId := "file" + strconv.Itoa(i)
		success := pm.Prefetch(ctx, fileId, 0, 0, 1024, 1, callback)
		if !success {
			t.Errorf("Expected prefetch request %d to succeed", i)
		}
	}

	// Wait for all to complete
	time.Sleep(200 * time.Millisecond)

	completed := atomic.LoadInt32(&completedCount)
	if completed != int32(requestCount) {
		t.Errorf("Expected %d completed requests, got: %d", requestCount, completed)
	}

	metrics := pm.GetMetrics()
	if metrics.SuccessfulFetch != int64(requestCount) {
		t.Errorf("Expected %d successful fetches, got: %d", requestCount, metrics.SuccessfulFetch)
	}
}
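
// TestPrefetchManager_Cancel verifies that a queued job can be cancelled;
// its callback may still fire because cancellation is asynchronous.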
func TestPrefetchManager_Cancel(t *testing.T) {
	pm := NewPrefetchManager(1, 5, 5*time.Second) // Single worker to ensure ordering
	defer pm.Shutdown()

	ctx := context.Background()
	var callbackCalled int32
	callback := func(data []byte, err error) {
		atomic.StoreInt32(&callbackCalled, 1)
	}

	// Queue a request
	success := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
	if !success {
		t.Error("Expected prefetch request to succeed")
	}

	// Cancel it immediately
	cancelled := pm.Cancel("file1", 0)
	if !cancelled {
		t.Error("Expected cancel to succeed")
	}

	// Wait a bit
	time.Sleep(50 * time.Millisecond)

	// The callback might still be called since cancellation is asynchronous;
	// what matters is that the job was marked as cancelled.
}
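
// TestPrefetchManager_QueueFull verifies that requests are rejected once the
// bounded queue is full.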
func TestPrefetchManager_QueueFull(t *testing.T) {
	pm := NewPrefetchManager(1, 2, 5*time.Second) // Small queue
	defer pm.Shutdown()

	ctx := context.Background()
	callback := func(data []byte, err error) {}

	// Fill the queue
	success1 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
	success2 := pm.Prefetch(ctx, "file2", 0, 0, 1024, 1, callback)
	success3 := pm.Prefetch(ctx, "file3", 0, 0, 1024, 1, callback) // This should fail

	if !success1 || !success2 {
		t.Error("Expected first two requests to succeed")
	}
	if success3 {
		t.Error("Expected third request to fail due to full queue")
	}
}
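
// TestPrefetchManager_Timeout wires a very short timeout through the callback
// path. It asserts only that the request is accepted: the current fetch path
// completes too quickly to ever time out.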
func TestPrefetchManager_Timeout(t *testing.T) {
	pm := NewPrefetchManager(1, 5, 50*time.Millisecond) // Very short timeout
	defer pm.Shutdown()

	ctx := context.Background()
	var timeoutCount int32
	callback := func(data []byte, err error) {
		if errors.Is(err, context.DeadlineExceeded) {
			atomic.AddInt32(&timeoutCount, 1)
		}
	}

	// This implementation doesn't actually time out since fetchChunk is fast,
	// but the structure is in place for integration with real chunk fetching.
	success := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
	if !success {
		t.Error("Expected prefetch request to succeed")
	}

	time.Sleep(200 * time.Millisecond)
}
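
// TestPrefetchManager_ConcurrentAccess verifies that Prefetch calls racing in
// from many goroutines all complete.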
func TestPrefetchManager_ConcurrentAccess(t *testing.T) {
	pm := NewPrefetchManager(4, 50, 5*time.Second)
	defer pm.Shutdown()

	ctx := context.Background()
	var completedCount int32
	callback := func(data []byte, err error) {
		atomic.AddInt32(&completedCount, 1)
	}

	// Test concurrent access from multiple goroutines
	var wg sync.WaitGroup
	goroutineCount := 10
	requestsPerGoroutine := 5
	for i := 0; i < goroutineCount; i++ {
		wg.Add(1)
		go func(goroutineID int) {
			defer wg.Done()
			for j := 0; j < requestsPerGoroutine; j++ {
				fileId := "file" + strconv.Itoa(goroutineID) + "_" + strconv.Itoa(j)
				pm.Prefetch(ctx, fileId, 0, 0, 1024, 1, callback)
			}
		}(i)
	}
	wg.Wait()

	// Wait for all requests to complete
	time.Sleep(500 * time.Millisecond)

	expectedTotal := goroutineCount * requestsPerGoroutine
	completed := atomic.LoadInt32(&completedCount)
	if completed != int32(expectedTotal) {
		t.Errorf("Expected %d completed requests, got: %d", expectedTotal, completed)
	}
}
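
// TestPrefetchManager_Metrics verifies the counters reported by GetMetrics:
// total requests, duplicates, worker count, and successful fetches.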
func TestPrefetchManager_Metrics(t *testing.T) {
	pm := NewPrefetchManager(2, 10, 5*time.Second)
	defer pm.Shutdown()

	ctx := context.Background()
	callback := func(data []byte, err error) {}

	// Make some requests
	pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
	pm.Prefetch(ctx, "file2", 0, 0, 1024, 1, callback)
	pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback) // Duplicate

	time.Sleep(100 * time.Millisecond)

	metrics := pm.GetMetrics()
	if metrics.TotalRequests != 3 {
		t.Errorf("Expected 3 total requests, got: %d", metrics.TotalRequests)
	}
	if metrics.DuplicateReqs != 1 {
		t.Errorf("Expected 1 duplicate request, got: %d", metrics.DuplicateReqs)
	}
	if metrics.Workers != 2 {
		t.Errorf("Expected 2 workers, got: %d", metrics.Workers)
	}
	// Should have some successful fetches by now
	if metrics.SuccessfulFetch == 0 {
		t.Error("Expected some successful fetches")
	}
}
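
// TestPrefetchManager_Shutdown verifies that Shutdown returns promptly even
// with work queued.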
func TestPrefetchManager_Shutdown(t *testing.T) {
	pm := NewPrefetchManager(2, 10, 5*time.Second)

	ctx := context.Background()
	callback := func(data []byte, err error) {}

	// Make a request
	pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)

	// Shutdown should complete without hanging
	done := make(chan struct{})
	go func() {
		pm.Shutdown()
		close(done)
	}()

	select {
	case <-done:
		// Success
	case <-time.After(5 * time.Second):
		t.Error("Shutdown took too long")
	}
}

// Benchmark tests
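
// BenchmarkPrefetchManager_SingleWorker measures enqueue throughput with a
// single worker, cycling through 100 file IDs so the deduplication path is
// exercised.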
func BenchmarkPrefetchManager_SingleWorker(b *testing.B) {
	pm := NewPrefetchManager(1, 1000, 30*time.Second)
	defer pm.Shutdown()

	ctx := context.Background()
	callback := func(data []byte, err error) {}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		fileId := "file" + strconv.Itoa(i%100) // Reuse file IDs to test deduplication
		pm.Prefetch(ctx, fileId, uint32(i), 0, 1024, 1, callback)
	}
}
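
// BenchmarkPrefetchManager_MultipleWorkers measures enqueue throughput from
// parallel goroutines against an 8-worker pool.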
func BenchmarkPrefetchManager_MultipleWorkers(b *testing.B) {
	pm := NewPrefetchManager(8, 1000, 30*time.Second)
	defer pm.Shutdown()

	ctx := context.Background()
	callback := func(data []byte, err error) {}

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		i := 0
		for pb.Next() {
			fileId := "file" + strconv.Itoa(i%1000)
			pm.Prefetch(ctx, fileId, uint32(i), 0, 1024, 1, callback)
			i++
		}
	})
}
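
For illustration, a sketch of how a read path might hand read-ahead off to the manager. The helper below is hypothetical; only the shape of the Prefetch call comes from the tests above, and the offset/size types are assumed:

// readAheadNext is a hypothetical helper: after serving chunk n of a file,
// warm chunk n+1 in the background. A false return from Prefetch only means
// the chunk is already queued or the queue is full, so it is ignored and the
// foreground read proceeds without the warm-up.
func readAheadNext(pm *PrefetchManager, fileId string, nextChunk uint32, chunkSize uint64) {
	offset := uint64(nextChunk) * chunkSize
	pm.Prefetch(context.Background(), fileId, nextChunk, offset, chunkSize, 1,
		func(data []byte, err error) {
			// Best-effort warm-up: nothing to do on completion.
		})
}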