// Mirror of https://github.com/chrislusf/seaweedfs
// Synced 2025-09-10 05:12:47 +02:00 (617 lines, 15 KiB, Go)
package ml
|
|
|
|
import (
	"strconv"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
|
|
|
|
func TestOpenFileCache_Basic(t *testing.T) {
|
|
cache := NewOpenFileCache(10, 5*time.Minute)
|
|
defer cache.Shutdown()
|
|
|
|
// Test opening a file
|
|
entry := &filer_pb.Entry{
|
|
Name: "test.txt",
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 1024,
|
|
},
|
|
}
|
|
|
|
inode := uint64(1)
|
|
fullPath := "/test/test.txt"
|
|
fileInfo := cache.OpenFile(inode, entry, fullPath)
|
|
|
|
if fileInfo == nil {
|
|
t.Fatal("OpenFile should return file info")
|
|
}
|
|
|
|
if fileInfo.Inode != inode {
|
|
t.Errorf("Expected inode %d, got %d", inode, fileInfo.Inode)
|
|
}
|
|
|
|
if fileInfo.OpenCount != 1 {
|
|
t.Errorf("Expected open count 1, got %d", fileInfo.OpenCount)
|
|
}
|
|
}
|
|
|
|
func TestOpenFileCache_MLFileDetection(t *testing.T) {
|
|
cache := NewOpenFileCache(10, 5*time.Minute)
|
|
defer cache.Shutdown()
|
|
|
|
testCases := []struct {
|
|
name string
|
|
path string
|
|
filename string
|
|
size uint64
|
|
expected MLFileType
|
|
}{
|
|
{"PyTorch model", "/models/checkpoint.pt", "checkpoint.pt", 100 * 1024 * 1024, MLFileModel},
|
|
{"Dataset image", "/datasets/train/image001.jpg", "image001.jpg", 2 * 1024 * 1024, MLFileDataset},
|
|
{"Config file", "/config/training.yaml", "training.yaml", 1024, MLFileConfig},
|
|
{"Tensor file", "/tensors/weights.safetensors", "weights.safetensors", 50 * 1024 * 1024, MLFileModel},
|
|
{"Log file", "/logs/training.log", "training.log", 10 * 1024, MLFileLog},
|
|
{"Regular file", "/documents/readme.txt", "readme.txt", 5 * 1024, MLFileUnknown},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
entry := &filer_pb.Entry{
|
|
Name: tc.filename,
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: tc.size,
|
|
},
|
|
}
|
|
|
|
inode := uint64(time.Now().UnixNano()) // Unique inode
|
|
fileInfo := cache.OpenFile(inode, entry, tc.path)
|
|
|
|
if tc.expected == MLFileUnknown {
|
|
if fileInfo.IsMLFile {
|
|
t.Errorf("File %s should not be detected as ML file", tc.path)
|
|
}
|
|
} else {
|
|
if !fileInfo.IsMLFile {
|
|
t.Errorf("File %s should be detected as ML file", tc.path)
|
|
}
|
|
|
|
if fileInfo.FileType != tc.expected {
|
|
t.Errorf("Expected file type %v, got %v", tc.expected, fileInfo.FileType)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestOpenFileCache_ChunkMetadata(t *testing.T) {
|
|
cache := NewOpenFileCache(10, 5*time.Minute)
|
|
defer cache.Shutdown()
|
|
|
|
inode := uint64(1)
|
|
entry := &filer_pb.Entry{
|
|
Name: "data.bin",
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 10240,
|
|
},
|
|
}
|
|
fullPath := "/data/data.bin"
|
|
|
|
cache.OpenFile(inode, entry, fullPath)
|
|
|
|
// Test updating chunk metadata
|
|
chunkIndex := uint32(0)
|
|
metadata := &ChunkMetadata{
|
|
FileId: "chunk_0",
|
|
Offset: 0,
|
|
Size: 1024,
|
|
CacheLevel: 0,
|
|
LastAccess: time.Now(),
|
|
AccessCount: 1,
|
|
Pattern: SequentialAccess,
|
|
}
|
|
|
|
cache.UpdateChunkCache(inode, chunkIndex, metadata)
|
|
|
|
// Test retrieving chunk metadata
|
|
retrieved, exists := cache.GetChunkMetadata(inode, chunkIndex)
|
|
if !exists {
|
|
t.Error("Chunk metadata should exist")
|
|
}
|
|
|
|
if retrieved.FileId != metadata.FileId {
|
|
t.Errorf("Expected FileId %s, got %s", metadata.FileId, retrieved.FileId)
|
|
}
|
|
|
|
if retrieved.AccessCount != 2 { // Should be incremented during retrieval
|
|
t.Errorf("Expected access count 2, got %d", retrieved.AccessCount)
|
|
}
|
|
}
|
|
|
|
func TestOpenFileCache_LRUEviction(t *testing.T) {
|
|
cache := NewOpenFileCache(3, 5*time.Minute) // Small cache for testing
|
|
defer cache.Shutdown()
|
|
|
|
// Fill cache to capacity
|
|
for i := 1; i <= 3; i++ {
|
|
entry := &filer_pb.Entry{
|
|
Name: "file" + string(rune('0'+i)) + ".txt",
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 1024,
|
|
},
|
|
}
|
|
fullPath := "/test/file" + string(rune('0'+i)) + ".txt"
|
|
cache.OpenFile(uint64(i), entry, fullPath)
|
|
cache.CloseFile(uint64(i)) // Close immediately so they can be evicted
|
|
}
|
|
|
|
// Add one more file - should trigger eviction
|
|
entry4 := &filer_pb.Entry{
|
|
Name: "file4.txt",
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 1024,
|
|
},
|
|
}
|
|
cache.OpenFile(uint64(4), entry4, "/test/file4.txt")
|
|
|
|
metrics := cache.GetMetrics()
|
|
if metrics.EvictedFiles == 0 {
|
|
t.Error("Should have evicted at least one file")
|
|
}
|
|
|
|
// File 1 should be evicted (oldest)
|
|
file1Info := cache.GetFileInfo(uint64(1))
|
|
if file1Info != nil {
|
|
t.Error("File 1 should have been evicted")
|
|
}
|
|
|
|
// File 4 should still be there
|
|
file4Info := cache.GetFileInfo(uint64(4))
|
|
if file4Info == nil {
|
|
t.Error("File 4 should still be in cache")
|
|
}
|
|
}
|
|
|
|
func TestOpenFileCache_TTLCleanup(t *testing.T) {
|
|
cache := NewOpenFileCache(10, 100*time.Millisecond) // Short TTL for testing
|
|
defer cache.Shutdown()
|
|
|
|
inode := uint64(1)
|
|
entry := &filer_pb.Entry{
|
|
Name: "test.txt",
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 1024,
|
|
},
|
|
}
|
|
|
|
fileInfo := cache.OpenFile(inode, entry, "/test/test.txt")
|
|
cache.CloseFile(inode) // Close so it can be cleaned up
|
|
|
|
// Wait for TTL to expire
|
|
time.Sleep(150 * time.Millisecond)
|
|
|
|
// Trigger cleanup manually
|
|
cache.cleanup()
|
|
|
|
// File should be cleaned up
|
|
retrievedInfo := cache.GetFileInfo(inode)
|
|
if retrievedInfo != nil {
|
|
t.Error("File should have been cleaned up after TTL expiration")
|
|
}
|
|
|
|
_ = fileInfo // Avoid unused variable warning
|
|
}
|
|
|
|
func TestOpenFileCache_MultipleOpens(t *testing.T) {
|
|
cache := NewOpenFileCache(10, 5*time.Minute)
|
|
defer cache.Shutdown()
|
|
|
|
inode := uint64(1)
|
|
entry := &filer_pb.Entry{
|
|
Name: "shared.txt",
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 1024,
|
|
},
|
|
}
|
|
fullPath := "/test/shared.txt"
|
|
|
|
// Open file multiple times
|
|
fileInfo1 := cache.OpenFile(inode, entry, fullPath)
|
|
fileInfo2 := cache.OpenFile(inode, entry, fullPath)
|
|
|
|
if fileInfo1 != fileInfo2 {
|
|
t.Error("Multiple opens of same file should return same file info")
|
|
}
|
|
|
|
if fileInfo1.OpenCount != 2 {
|
|
t.Errorf("Expected open count 2, got %d", fileInfo1.OpenCount)
|
|
}
|
|
|
|
// Close once
|
|
canEvict1 := cache.CloseFile(inode)
|
|
if canEvict1 {
|
|
t.Error("Should not be able to evict file with open count > 0")
|
|
}
|
|
|
|
if fileInfo1.OpenCount != 1 {
|
|
t.Errorf("Expected open count 1 after first close, got %d", fileInfo1.OpenCount)
|
|
}
|
|
|
|
// Close again
|
|
canEvict2 := cache.CloseFile(inode)
|
|
if !canEvict2 {
|
|
t.Error("Should be able to evict file with open count 0")
|
|
}
|
|
}
|
|
|
|
func TestOpenFileCache_Metrics(t *testing.T) {
|
|
cache := NewOpenFileCache(10, 5*time.Minute)
|
|
defer cache.Shutdown()
|
|
|
|
// Add some files of different types
|
|
files := []struct {
|
|
inode uint64
|
|
filename string
|
|
path string
|
|
size uint64
|
|
}{
|
|
{1, "model.pt", "/models/model.pt", 100 * 1024 * 1024},
|
|
{2, "data.jpg", "/datasets/data.jpg", 2 * 1024 * 1024},
|
|
{3, "config.yaml", "/config/config.yaml", 1024},
|
|
{4, "regular.txt", "/docs/regular.txt", 5 * 1024},
|
|
}
|
|
|
|
for _, file := range files {
|
|
entry := &filer_pb.Entry{
|
|
Name: file.filename,
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: file.size,
|
|
},
|
|
}
|
|
cache.OpenFile(file.inode, entry, file.path)
|
|
|
|
// Add some chunk metadata
|
|
metadata := &ChunkMetadata{
|
|
FileId: "chunk_" + string(rune(file.inode)),
|
|
Offset: 0,
|
|
Size: 1024,
|
|
CacheLevel: 0,
|
|
}
|
|
cache.UpdateChunkCache(file.inode, 0, metadata)
|
|
}
|
|
|
|
metrics := cache.GetMetrics()
|
|
|
|
if metrics.TotalFiles != 4 {
|
|
t.Errorf("Expected 4 total files, got %d", metrics.TotalFiles)
|
|
}
|
|
|
|
if metrics.MLFiles < 2 { // Should detect at least model and dataset
|
|
t.Errorf("Expected at least 2 ML files, got %d", metrics.MLFiles)
|
|
}
|
|
|
|
if metrics.TotalChunks != 4 {
|
|
t.Errorf("Expected 4 total chunks, got %d", metrics.TotalChunks)
|
|
}
|
|
|
|
// Check file type counts
|
|
if metrics.FileTypes[MLFileModel] == 0 {
|
|
t.Error("Should detect at least one model file")
|
|
}
|
|
|
|
if metrics.FileTypes[MLFileDataset] == 0 {
|
|
t.Error("Should detect at least one dataset file")
|
|
}
|
|
}
|
|
|
|
func TestOpenFileCache_ConcurrentAccess(t *testing.T) {
|
|
cache := NewOpenFileCache(100, 5*time.Minute)
|
|
defer cache.Shutdown()
|
|
|
|
// Test concurrent access to the cache
|
|
numGoroutines := 10
|
|
done := make(chan bool, numGoroutines)
|
|
|
|
for i := 0; i < numGoroutines; i++ {
|
|
go func(id int) {
|
|
defer func() { done <- true }()
|
|
|
|
inode := uint64(id)
|
|
entry := &filer_pb.Entry{
|
|
Name: "file" + string(rune('0'+id)) + ".txt",
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 1024,
|
|
},
|
|
}
|
|
fullPath := "/test/file" + string(rune('0'+id)) + ".txt"
|
|
|
|
// Perform multiple operations
|
|
for j := 0; j < 10; j++ {
|
|
cache.OpenFile(inode, entry, fullPath)
|
|
|
|
metadata := &ChunkMetadata{
|
|
FileId: "chunk_" + string(rune(id)) + "_" + string(rune(j)),
|
|
Offset: uint64(j * 1024),
|
|
Size: 1024,
|
|
CacheLevel: 0,
|
|
}
|
|
cache.UpdateChunkCache(inode, uint32(j), metadata)
|
|
|
|
cache.GetChunkMetadata(inode, uint32(j))
|
|
cache.CloseFile(inode)
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
// Wait for all goroutines to complete
|
|
for i := 0; i < numGoroutines; i++ {
|
|
<-done
|
|
}
|
|
|
|
// Verify cache state
|
|
metrics := cache.GetMetrics()
|
|
if metrics.TotalFiles == 0 {
|
|
t.Error("Should have some files in cache after concurrent operations")
|
|
}
|
|
}
|
|
|
|
func TestMLFileDetector_Extensions(t *testing.T) {
|
|
detector := newMLFileDetector()
|
|
|
|
testCases := []struct {
|
|
filename string
|
|
path string
|
|
expected MLFileType
|
|
}{
|
|
{"model.pt", "/models/model.pt", MLFileModel},
|
|
{"weights.pth", "/models/weights.pth", MLFileModel},
|
|
{"data.jpg", "/datasets/data.jpg", MLFileDataset},
|
|
{"config.yaml", "/config/config.yaml", MLFileConfig},
|
|
{"tensor.safetensors", "/tensors/tensor.safetensors", MLFileModel},
|
|
{"training.log", "/logs/training.log", MLFileLog},
|
|
{"document.txt", "/docs/document.txt", MLFileUnknown},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.filename, func(t *testing.T) {
|
|
entry := &filer_pb.Entry{
|
|
Name: tc.filename,
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 1024,
|
|
},
|
|
}
|
|
|
|
isML, fileType := detector.DetectMLFile(entry, tc.path)
|
|
|
|
if tc.expected == MLFileUnknown {
|
|
// For unknown files, either ML detection result is acceptable
|
|
t.Logf("File %s: isML=%v, type=%v", tc.filename, isML, fileType)
|
|
} else {
|
|
if !isML {
|
|
t.Errorf("File %s should be detected as ML file", tc.filename)
|
|
}
|
|
|
|
if fileType != tc.expected {
|
|
t.Errorf("File %s: expected type %v, got %v", tc.filename, tc.expected, fileType)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestMLFileDetector_PathPatterns(t *testing.T) {
|
|
detector := newMLFileDetector()
|
|
|
|
testCases := []struct {
|
|
path string
|
|
filename string
|
|
expected MLFileType
|
|
}{
|
|
{"/datasets/train/file.bin", "file.bin", MLFileDataset},
|
|
{"/models/checkpoint/weights", "weights", MLFileModel},
|
|
{"/data/validation/sample.dat", "sample.dat", MLFileDataset},
|
|
{"/checkpoints/model_v1.bin", "model_v1.bin", MLFileModel},
|
|
{"/documents/report.pdf", "report.pdf", MLFileUnknown},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.path, func(t *testing.T) {
|
|
entry := &filer_pb.Entry{
|
|
Name: tc.filename,
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 1024,
|
|
},
|
|
}
|
|
|
|
isML, fileType := detector.DetectMLFile(entry, tc.path)
|
|
|
|
if tc.expected == MLFileUnknown {
|
|
t.Logf("Path %s: isML=%v, type=%v", tc.path, isML, fileType)
|
|
} else {
|
|
if !isML {
|
|
t.Errorf("Path %s should be detected as ML file", tc.path)
|
|
}
|
|
|
|
if fileType != tc.expected {
|
|
t.Errorf("Path %s: expected type %v, got %v", tc.path, tc.expected, fileType)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestMLFileDetector_SizeHeuristics(t *testing.T) {
|
|
detector := newMLFileDetector()
|
|
|
|
// Large file with model-related name should be detected as model
|
|
largeModelEntry := &filer_pb.Entry{
|
|
Name: "large_model.bin",
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 500 * 1024 * 1024, // 500MB
|
|
},
|
|
}
|
|
|
|
isML, fileType := detector.DetectMLFile(largeModelEntry, "/checkpoints/large_model.bin")
|
|
|
|
if !isML {
|
|
t.Error("Large model file should be detected as ML file")
|
|
}
|
|
|
|
if fileType != MLFileModel {
|
|
t.Errorf("Large model file should be detected as model, got %v", fileType)
|
|
}
|
|
}
|
|
|
|
func TestOpenFileCache_EvictionProtection(t *testing.T) {
|
|
cache := NewOpenFileCache(2, 5*time.Minute) // Very small cache
|
|
defer cache.Shutdown()
|
|
|
|
// Open two files and keep them open
|
|
for i := 1; i <= 2; i++ {
|
|
entry := &filer_pb.Entry{
|
|
Name: "file" + string(rune('0'+i)) + ".txt",
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 1024,
|
|
},
|
|
}
|
|
fullPath := "/test/file" + string(rune('0'+i)) + ".txt"
|
|
cache.OpenFile(uint64(i), entry, fullPath)
|
|
// Don't close - keep them open
|
|
}
|
|
|
|
// Try to open a third file - should not evict open files
|
|
entry3 := &filer_pb.Entry{
|
|
Name: "file3.txt",
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 1024,
|
|
},
|
|
}
|
|
cache.OpenFile(uint64(3), entry3, "/test/file3.txt")
|
|
|
|
// All files should still be there since none could be evicted
|
|
for i := 1; i <= 3; i++ {
|
|
fileInfo := cache.GetFileInfo(uint64(i))
|
|
if fileInfo == nil {
|
|
t.Errorf("File %d should still be in cache (eviction protection)", i)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestOpenFileCache_GetFileInfo_CacheHitMiss(t *testing.T) {
|
|
cache := NewOpenFileCache(10, 5*time.Minute)
|
|
defer cache.Shutdown()
|
|
|
|
inode := uint64(1)
|
|
|
|
// Test cache miss
|
|
fileInfo := cache.GetFileInfo(inode)
|
|
if fileInfo != nil {
|
|
t.Error("Should return nil for non-existent file")
|
|
}
|
|
|
|
initialMetrics := cache.GetMetrics()
|
|
if initialMetrics.CacheMisses == 0 {
|
|
t.Error("Should record cache miss")
|
|
}
|
|
|
|
// Add file to cache
|
|
entry := &filer_pb.Entry{
|
|
Name: "test.txt",
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 1024,
|
|
},
|
|
}
|
|
cache.OpenFile(inode, entry, "/test/test.txt")
|
|
|
|
// Test cache hit
|
|
fileInfo = cache.GetFileInfo(inode)
|
|
if fileInfo == nil {
|
|
t.Error("Should return file info for existing file")
|
|
}
|
|
|
|
finalMetrics := cache.GetMetrics()
|
|
if finalMetrics.CacheHits == 0 {
|
|
t.Error("Should record cache hit")
|
|
}
|
|
|
|
if finalMetrics.CacheHits <= initialMetrics.CacheHits {
|
|
t.Error("Cache hits should increase")
|
|
}
|
|
}
|
|
|
|
func TestOpenFileCache_Shutdown(t *testing.T) {
|
|
cache := NewOpenFileCache(10, 5*time.Minute)
|
|
|
|
// Add some files
|
|
for i := 1; i <= 3; i++ {
|
|
entry := &filer_pb.Entry{
|
|
Name: "file" + string(rune('0'+i)) + ".txt",
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 1024,
|
|
},
|
|
}
|
|
fullPath := "/test/file" + string(rune('0'+i)) + ".txt"
|
|
cache.OpenFile(uint64(i), entry, fullPath)
|
|
}
|
|
|
|
// Test graceful shutdown
|
|
done := make(chan struct{})
|
|
go func() {
|
|
cache.Shutdown()
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
// Success
|
|
case <-time.After(5 * time.Second):
|
|
t.Error("Shutdown took too long")
|
|
}
|
|
}
|
|
|
|
// Benchmark tests
|
|
|
|
func BenchmarkOpenFileCache_OpenFile(b *testing.B) {
|
|
cache := NewOpenFileCache(1000, 30*time.Minute)
|
|
defer cache.Shutdown()
|
|
|
|
entry := &filer_pb.Entry{
|
|
Name: "benchmark.txt",
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 1024,
|
|
},
|
|
}
|
|
fullPath := "/test/benchmark.txt"
|
|
|
|
b.ResetTimer()
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
inode := uint64(i % 100) // Cycle through 100 files
|
|
cache.OpenFile(inode, entry, fullPath)
|
|
}
|
|
}
|
|
|
|
func BenchmarkOpenFileCache_GetFileInfo(b *testing.B) {
|
|
cache := NewOpenFileCache(1000, 30*time.Minute)
|
|
defer cache.Shutdown()
|
|
|
|
// Pre-populate cache
|
|
entry := &filer_pb.Entry{
|
|
Name: "benchmark.txt",
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: 1024,
|
|
},
|
|
}
|
|
fullPath := "/test/benchmark.txt"
|
|
|
|
for i := 0; i < 100; i++ {
|
|
cache.OpenFile(uint64(i), entry, fullPath)
|
|
}
|
|
|
|
b.ResetTimer()
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
inode := uint64(i % 100)
|
|
cache.GetFileInfo(inode)
|
|
}
|
|
}
|