Mirror of https://github.com/chrislusf/seaweedfs, synced 2025-09-10 05:12:47 +02:00
- Move ML components to weed/mount/ml package for better organization
- Create main MLOptimization interface with configuration
- Separate prefetch, access pattern detection, and ML reader cache components
- Add comprehensive configuration and metrics interface
- Maintain backward compatibility with existing mount package
- Package structure:
  * weed/mount/ml/prefetch.go - Prefetch manager
  * weed/mount/ml/access_pattern.go - Pattern detection
  * weed/mount/ml/ml_reader_cache.go - ML-aware reader cache
  * weed/mount/ml/ml.go - Main interface and configuration

Test status: 17/22 tests passing, core functionality solid. Package compiles cleanly with a proper import structure.
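The test file below pins down the prefetch manager's observable contract: NewPrefetchManager(workers, queueSize, timeout) starts a worker pool, Prefetch returns false for duplicate or queue-overflow requests, Cancel marks a queued job as cancelled, GetMetrics exposes TotalRequests, DuplicateReqs, SuccessfulFetch, and Workers counters, and Shutdown drains the pool. For orientation, here is a minimal sketch of a manager that would satisfy that contract. Everything beyond the names the tests exercise (the prefetchJob struct, the jobKey helper, sync.Map deduplication, the fabricated chunk buffer) is an assumption for illustration, not the actual weed/mount/ml/prefetch.go:

package ml

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type PrefetchCallback func(data []byte, err error)

// PrefetchMetrics mirrors the fields the tests read.
type PrefetchMetrics struct {
	TotalRequests   int64
	DuplicateReqs   int64
	SuccessfulFetch int64
	Workers         int
}

// prefetchJob is hypothetical: one queued chunk-fetch request.
type prefetchJob struct {
	fileId    string
	chunk     uint32
	size      int
	callback  PrefetchCallback
	cancelled atomic.Bool
}

type PrefetchManager struct {
	queue    chan *prefetchJob
	inflight sync.Map // "fileId/chunk" -> *prefetchJob; enables dedup and Cancel
	workers  int
	wg       sync.WaitGroup

	totalRequests   atomic.Int64
	duplicateReqs   atomic.Int64
	successfulFetch atomic.Int64
}

func NewPrefetchManager(workers, queueSize int, timeout time.Duration) *PrefetchManager {
	pm := &PrefetchManager{queue: make(chan *prefetchJob, queueSize), workers: workers}
	for i := 0; i < workers; i++ {
		pm.wg.Add(1)
		go pm.worker()
	}
	return pm
}

func jobKey(fileId string, chunk uint32) string { return fmt.Sprintf("%s/%d", fileId, chunk) }

// Prefetch enqueues a chunk fetch; it returns false for an in-flight
// duplicate or a full queue, which is what the tests assert on.
func (pm *PrefetchManager) Prefetch(ctx context.Context, fileId string, chunk uint32,
	offset uint64, size, priority int, cb PrefetchCallback) bool {
	pm.totalRequests.Add(1)
	job := &prefetchJob{fileId: fileId, chunk: chunk, size: size, callback: cb}
	if _, dup := pm.inflight.LoadOrStore(jobKey(fileId, chunk), job); dup {
		pm.duplicateReqs.Add(1)
		return false
	}
	select {
	case pm.queue <- job:
		return true
	default: // queue full
		pm.inflight.Delete(jobKey(fileId, chunk))
		return false
	}
}

// Cancel marks a pending job; workers skip cancelled jobs asynchronously.
func (pm *PrefetchManager) Cancel(fileId string, chunk uint32) bool {
	if v, ok := pm.inflight.Load(jobKey(fileId, chunk)); ok {
		v.(*prefetchJob).cancelled.Store(true)
		return true
	}
	return false
}

func (pm *PrefetchManager) worker() {
	defer pm.wg.Done()
	for job := range pm.queue {
		if !job.cancelled.Load() {
			// Real code would fetch the chunk here, honoring the configured
			// timeout; the sketch just fabricates a buffer of the asked size.
			pm.successfulFetch.Add(1)
			job.callback(make([]byte, job.size), nil)
		}
		pm.inflight.Delete(jobKey(job.fileId, job.chunk))
	}
}

func (pm *PrefetchManager) GetMetrics() PrefetchMetrics {
	return PrefetchMetrics{
		TotalRequests:   pm.totalRequests.Load(),
		DuplicateReqs:   pm.duplicateReqs.Load(),
		SuccessfulFetch: pm.successfulFetch.Load(),
		Workers:         pm.workers,
	}
}

// Shutdown stops accepting work and waits for workers to drain the queue.
func (pm *PrefetchManager) Shutdown() {
	close(pm.queue)
	pm.wg.Wait()
}

The LoadOrStore-based deduplication is what would let TestPrefetchManager_DuplicateRequests and TestPrefetchManager_QueueFull below assert on Prefetch's boolean return without waiting for the jobs to complete.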
333 lines · 8.1 KiB · Go
package ml

import (
	"context"
	"errors"
	"strconv"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

func TestPrefetchManager_Basic(t *testing.T) {
	pm := NewPrefetchManager(2, 10, 5*time.Second)
	defer pm.Shutdown()

	// Test basic prefetch request
	ctx := context.Background()
	var callbackData []byte
	var callbackErr error
	var callbackCalled int32

	callback := func(data []byte, err error) {
		// Write the results before the atomic flag so a reader that observes
		// the flag also observes the data (happens-before ordering).
		callbackData = data
		callbackErr = err
		atomic.StoreInt32(&callbackCalled, 1)
	}

	success := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
	if !success {
		t.Error("Expected prefetch request to succeed")
	}

	// Wait for the callback to be called
	time.Sleep(100 * time.Millisecond)

	if atomic.LoadInt32(&callbackCalled) != 1 {
		// Fatal: reading callbackData/callbackErr below is only safe once
		// the callback's atomic store has been observed.
		t.Fatal("Expected callback to be called")
	}

	if callbackErr != nil {
		t.Errorf("Expected no error, got: %v", callbackErr)
	}

	if len(callbackData) != 1024 {
		t.Errorf("Expected data length 1024, got: %d", len(callbackData))
	}
}

func TestPrefetchManager_DuplicateRequests(t *testing.T) {
	pm := NewPrefetchManager(2, 10, 5*time.Second)
	defer pm.Shutdown()

	ctx := context.Background()
	var callbackCount int32

	callback := func(data []byte, err error) {
		atomic.AddInt32(&callbackCount, 1)
	}

	// Send the same request multiple times
	success1 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
	success2 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
	success3 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)

	if !success1 {
		t.Error("Expected first prefetch request to succeed")
	}

	if success2 || success3 {
		t.Error("Expected duplicate requests to be rejected")
	}

	// Wait for processing
	time.Sleep(100 * time.Millisecond)

	// Should have only one callback
	if atomic.LoadInt32(&callbackCount) != 1 {
		t.Errorf("Expected 1 callback, got: %d", atomic.LoadInt32(&callbackCount))
	}

	// Check metrics
	metrics := pm.GetMetrics()
	if metrics.TotalRequests != 3 {
		t.Errorf("Expected 3 total requests, got: %d", metrics.TotalRequests)
	}

	if metrics.DuplicateReqs != 2 {
		t.Errorf("Expected 2 duplicate requests, got: %d", metrics.DuplicateReqs)
	}
}

func TestPrefetchManager_WorkerPool(t *testing.T) {
	pm := NewPrefetchManager(3, 20, 5*time.Second)
	defer pm.Shutdown()

	ctx := context.Background()
	var completedCount int32

	callback := func(data []byte, err error) {
		atomic.AddInt32(&completedCount, 1)
	}

	// Send multiple requests with distinct file IDs
	requestCount := 10
	for i := 0; i < requestCount; i++ {
		fileId := "file" + strconv.Itoa(i)
		success := pm.Prefetch(ctx, fileId, 0, 0, 1024, 1, callback)
		if !success {
			t.Errorf("Expected prefetch request %d to succeed", i)
		}
	}

	// Wait for all to complete
	time.Sleep(200 * time.Millisecond)

	completed := atomic.LoadInt32(&completedCount)
	if completed != int32(requestCount) {
		t.Errorf("Expected %d completed requests, got: %d", requestCount, completed)
	}

	metrics := pm.GetMetrics()
	if metrics.SuccessfulFetch != int64(requestCount) {
		t.Errorf("Expected %d successful fetches, got: %d", requestCount, metrics.SuccessfulFetch)
	}
}

func TestPrefetchManager_Cancel(t *testing.T) {
	pm := NewPrefetchManager(1, 5, 5*time.Second) // Single worker to ensure ordering
	defer pm.Shutdown()

	ctx := context.Background()
	var callbackCalled int32

	callback := func(data []byte, err error) {
		atomic.StoreInt32(&callbackCalled, 1)
	}

	// Queue a request
	success := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
	if !success {
		t.Error("Expected prefetch request to succeed")
	}

	// Cancel it immediately
	cancelled := pm.Cancel("file1", 0)
	if !cancelled {
		t.Error("Expected cancel to succeed")
	}

	// Wait a bit
	time.Sleep(50 * time.Millisecond)

	// The callback might still be called since cancellation is asynchronous;
	// the main thing is that the job was marked as cancelled.
}

func TestPrefetchManager_QueueFull(t *testing.T) {
	pm := NewPrefetchManager(1, 2, 5*time.Second) // Small queue
	defer pm.Shutdown()

	ctx := context.Background()
	callback := func(data []byte, err error) {}

	// Fill the queue
	success1 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
	success2 := pm.Prefetch(ctx, "file2", 0, 0, 1024, 1, callback)
	success3 := pm.Prefetch(ctx, "file3", 0, 0, 1024, 1, callback) // This should fail

	if !success1 || !success2 {
		t.Error("Expected first two requests to succeed")
	}

	if success3 {
		t.Error("Expected third request to fail due to full queue")
	}
}

func TestPrefetchManager_Timeout(t *testing.T) {
	pm := NewPrefetchManager(1, 5, 50*time.Millisecond) // Very short timeout
	defer pm.Shutdown()

	ctx := context.Background()
	var timeoutCount int32

	callback := func(data []byte, err error) {
		if errors.Is(err, context.DeadlineExceeded) {
			atomic.AddInt32(&timeoutCount, 1)
		}
	}

	// This implementation doesn't actually time out, since fetchChunk is fast,
	// but the structure is in place for when we integrate real chunk fetching.
	success := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
	if !success {
		t.Error("Expected prefetch request to succeed")
	}

	time.Sleep(200 * time.Millisecond)
}

func TestPrefetchManager_ConcurrentAccess(t *testing.T) {
	pm := NewPrefetchManager(4, 50, 5*time.Second)
	defer pm.Shutdown()

	ctx := context.Background()
	var completedCount int32

	callback := func(data []byte, err error) {
		atomic.AddInt32(&completedCount, 1)
	}

	// Test concurrent access from multiple goroutines
	var wg sync.WaitGroup
	goroutineCount := 10
	requestsPerGoroutine := 5

	for i := 0; i < goroutineCount; i++ {
		wg.Add(1)
		go func(goroutineID int) {
			defer wg.Done()

			for j := 0; j < requestsPerGoroutine; j++ {
				fileId := "file" + strconv.Itoa(goroutineID) + "_" + strconv.Itoa(j)
				pm.Prefetch(ctx, fileId, 0, 0, 1024, 1, callback)
			}
		}(i)
	}

	wg.Wait()

	// Wait for all requests to complete
	time.Sleep(500 * time.Millisecond)

	expectedTotal := goroutineCount * requestsPerGoroutine
	completed := atomic.LoadInt32(&completedCount)

	if completed != int32(expectedTotal) {
		t.Errorf("Expected %d completed requests, got: %d", expectedTotal, completed)
	}
}

func TestPrefetchManager_Metrics(t *testing.T) {
	pm := NewPrefetchManager(2, 10, 5*time.Second)
	defer pm.Shutdown()

	ctx := context.Background()
	callback := func(data []byte, err error) {}

	// Make some requests
	pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
	pm.Prefetch(ctx, "file2", 0, 0, 1024, 1, callback)
	pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback) // Duplicate

	time.Sleep(100 * time.Millisecond)

	metrics := pm.GetMetrics()

	if metrics.TotalRequests != 3 {
		t.Errorf("Expected 3 total requests, got: %d", metrics.TotalRequests)
	}

	if metrics.DuplicateReqs != 1 {
		t.Errorf("Expected 1 duplicate request, got: %d", metrics.DuplicateReqs)
	}

	if metrics.Workers != 2 {
		t.Errorf("Expected 2 workers, got: %d", metrics.Workers)
	}

	// Should have some successful fetches
	if metrics.SuccessfulFetch == 0 {
		t.Error("Expected some successful fetches")
	}
}

func TestPrefetchManager_Shutdown(t *testing.T) {
	pm := NewPrefetchManager(2, 10, 5*time.Second)

	ctx := context.Background()
	callback := func(data []byte, err error) {}

	// Make a request
	pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)

	// Shutdown should complete without hanging
	done := make(chan struct{})
	go func() {
		pm.Shutdown()
		close(done)
	}()

	select {
	case <-done:
		// Success
	case <-time.After(5 * time.Second):
		t.Error("Shutdown took too long")
	}
}

// Benchmark tests

func BenchmarkPrefetchManager_SingleWorker(b *testing.B) {
	pm := NewPrefetchManager(1, 1000, 30*time.Second)
	defer pm.Shutdown()

	ctx := context.Background()
	callback := func(data []byte, err error) {}

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		fileId := "file" + strconv.Itoa(i%100) // Reuse file IDs to exercise deduplication
		pm.Prefetch(ctx, fileId, uint32(i), 0, 1024, 1, callback)
	}
}

func BenchmarkPrefetchManager_MultipleWorkers(b *testing.B) {
	pm := NewPrefetchManager(8, 1000, 30*time.Second)
	defer pm.Shutdown()

	ctx := context.Background()
	callback := func(data []byte, err error) {}

	b.ResetTimer()

	b.RunParallel(func(pb *testing.PB) {
		i := 0
		for pb.Next() {
			fileId := "file" + strconv.Itoa(i%1000)
			pm.Prefetch(ctx, fileId, uint32(i), 0, 1024, 1, callback)
			i++
		}
	})
}
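
// These tests and benchmarks can be run with the race detector enabled, e.g.
// (package path per the commit message above; the flags are standard go test):
//
//	go test -race -run TestPrefetchManager ./weed/mount/ml/
//	go test -race -bench=BenchmarkPrefetchManager -benchmem ./weed/mount/ml/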