1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-10 05:12:47 +02:00
seaweedfs/weed/mount/ml/open_file_cache_test.go
2025-08-30 15:32:00 -07:00

617 lines
15 KiB
Go

package ml
import (
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
// TestOpenFileCache_Basic opens a single file and checks that the returned
// file info carries the requested inode and an open count of one.
func TestOpenFileCache_Basic(t *testing.T) {
	c := NewOpenFileCache(10, 5*time.Minute)
	defer c.Shutdown()

	const ino = uint64(1)
	info := c.OpenFile(ino, &filer_pb.Entry{
		Name:       "test.txt",
		Attributes: &filer_pb.FuseAttributes{FileSize: 1024},
	}, "/test/test.txt")

	// Every later check dereferences info, so a nil result ends the test.
	if info == nil {
		t.Fatal("OpenFile should return file info")
	}

	if got := info.Inode; got != ino {
		t.Errorf("Expected inode %d, got %d", ino, got)
	}
	if got := info.OpenCount; got != 1 {
		t.Errorf("Expected open count 1, got %d", got)
	}
}
// TestOpenFileCache_MLFileDetection opens files with ML-looking and ordinary
// paths and compares the IsMLFile flag and FileType stored on the returned
// file info with the expectation of each table row. Rows that expect
// MLFileUnknown must not be flagged as ML files; all other rows must be
// flagged and carry the expected type.
func TestOpenFileCache_MLFileDetection(t *testing.T) {
	cache := NewOpenFileCache(10, 5*time.Minute)
	defer cache.Shutdown()

	testCases := []struct {
		name     string
		path     string
		filename string
		size     uint64
		expected MLFileType
	}{
		{"PyTorch model", "/models/checkpoint.pt", "checkpoint.pt", 100 * 1024 * 1024, MLFileModel},
		{"Dataset image", "/datasets/train/image001.jpg", "image001.jpg", 2 * 1024 * 1024, MLFileDataset},
		{"Config file", "/config/training.yaml", "training.yaml", 1024, MLFileConfig},
		{"Tensor file", "/tensors/weights.safetensors", "weights.safetensors", 50 * 1024 * 1024, MLFileModel},
		{"Log file", "/logs/training.log", "training.log", 10 * 1024, MLFileLog},
		{"Regular file", "/documents/readme.txt", "readme.txt", 5 * 1024, MLFileUnknown},
	}

	for i, tc := range testCases {
		// Derive the inode from the table index. A wall-clock timestamp is not
		// guaranteed to differ between two back-to-back subtests, and a
		// repeated inode would be handled as a re-open of the previous row's
		// file instead of a fresh detection.
		inode := uint64(i + 1)

		t.Run(tc.name, func(t *testing.T) {
			entry := &filer_pb.Entry{
				Name: tc.filename,
				Attributes: &filer_pb.FuseAttributes{
					FileSize: tc.size,
				},
			}

			fileInfo := cache.OpenFile(inode, entry, tc.path)
			// Fail cleanly rather than panic on the dereferences below.
			if fileInfo == nil {
				t.Fatalf("OpenFile returned nil for %s", tc.path)
			}

			if tc.expected == MLFileUnknown {
				if fileInfo.IsMLFile {
					t.Errorf("File %s should not be detected as ML file", tc.path)
				}
				return
			}

			if !fileInfo.IsMLFile {
				t.Errorf("File %s should be detected as ML file", tc.path)
			}
			if fileInfo.FileType != tc.expected {
				t.Errorf("Expected file type %v, got %v", tc.expected, fileInfo.FileType)
			}
		})
	}
}
// TestOpenFileCache_ChunkMetadata stores chunk metadata for an open file via
// UpdateChunkCache and reads it back with GetChunkMetadata, checking the
// FileId round-trips and that the access count read back is 2 after starting
// at 1.
func TestOpenFileCache_ChunkMetadata(t *testing.T) {
	cache := NewOpenFileCache(10, 5*time.Minute)
	defer cache.Shutdown()

	inode := uint64(1)
	entry := &filer_pb.Entry{
		Name: "data.bin",
		Attributes: &filer_pb.FuseAttributes{
			FileSize: 10240,
		},
	}
	fullPath := "/data/data.bin"
	cache.OpenFile(inode, entry, fullPath)

	// Test updating chunk metadata
	chunkIndex := uint32(0)
	metadata := &ChunkMetadata{
		FileId:      "chunk_0",
		Offset:      0,
		Size:        1024,
		CacheLevel:  0,
		LastAccess:  time.Now(),
		AccessCount: 1,
		Pattern:     SequentialAccess,
	}
	cache.UpdateChunkCache(inode, chunkIndex, metadata)

	// Test retrieving chunk metadata
	retrieved, exists := cache.GetChunkMetadata(inode, chunkIndex)
	if !exists {
		// Must be fatal: the assertions below dereference retrieved, which
		// would turn a plain test failure into a panic when the lookup misses.
		t.Fatal("Chunk metadata should exist")
	}

	if retrieved.FileId != metadata.FileId {
		t.Errorf("Expected FileId %s, got %s", metadata.FileId, retrieved.FileId)
	}
	if retrieved.AccessCount != 2 { // Should be incremented during retrieval
		t.Errorf("Expected access count 2, got %d", retrieved.AccessCount)
	}
}
// TestOpenFileCache_LRUEviction opens and immediately closes three files in a
// cache constructed with a first argument of 3, then opens a fourth. It
// expects the eviction counter to be non-zero, inode 1 to be gone, and inode 4
// to be present.
func TestOpenFileCache_LRUEviction(t *testing.T) {
	c := NewOpenFileCache(3, 5*time.Minute) // deliberately tiny
	defer c.Shutdown()

	attrs := func() *filer_pb.FuseAttributes {
		return &filer_pb.FuseAttributes{FileSize: 1024}
	}

	// Populate up to the limit; closing right away leaves nothing pinned open.
	for n := 1; n <= 3; n++ {
		digit := string(rune('0' + n))
		e := &filer_pb.Entry{
			Name:       "file" + digit + ".txt",
			Attributes: attrs(),
		}
		ino := uint64(n)
		c.OpenFile(ino, e, "/test/file"+digit+".txt")
		c.CloseFile(ino)
	}

	// One entry past the limit.
	c.OpenFile(uint64(4), &filer_pb.Entry{
		Name:       "file4.txt",
		Attributes: attrs(),
	}, "/test/file4.txt")

	if m := c.GetMetrics(); m.EvictedFiles == 0 {
		t.Error("Should have evicted at least one file")
	}

	// The oldest entry is the expected victim.
	if c.GetFileInfo(uint64(1)) != nil {
		t.Error("File 1 should have been evicted")
	}

	// The newcomer must have survived.
	if c.GetFileInfo(uint64(4)) == nil {
		t.Error("File 4 should still be in cache")
	}
}
// TestOpenFileCache_TTLCleanup opens and closes a file in a cache built with
// a 100ms duration, sleeps past that duration, invokes cleanup directly, and
// expects the file to be gone from the cache afterwards.
func TestOpenFileCache_TTLCleanup(t *testing.T) {
	cache := NewOpenFileCache(10, 100*time.Millisecond) // Short TTL for testing
	defer cache.Shutdown()

	inode := uint64(1)
	entry := &filer_pb.Entry{
		Name: "test.txt",
		Attributes: &filer_pb.FuseAttributes{
			FileSize: 1024,
		},
	}

	// Confirm the file really entered the cache. Without this precondition the
	// final "gone after cleanup" assertion would pass vacuously if OpenFile
	// had never stored anything.
	if cache.OpenFile(inode, entry, "/test/test.txt") == nil {
		t.Fatal("OpenFile should return file info")
	}
	cache.CloseFile(inode) // Close so it can be cleaned up

	// Wait for TTL to expire
	time.Sleep(150 * time.Millisecond)

	// Trigger cleanup manually rather than waiting for any background pass.
	cache.cleanup()

	// File should be cleaned up
	if cache.GetFileInfo(inode) != nil {
		t.Error("File should have been cleaned up after TTL expiration")
	}
}
// TestOpenFileCache_MultipleOpens opens the same inode twice and expects both
// calls to hand back the same file info with an open count of 2. It then
// closes twice: the first CloseFile must report false and leave the count at
// 1, the second must report true.
func TestOpenFileCache_MultipleOpens(t *testing.T) {
	c := NewOpenFileCache(10, 5*time.Minute)
	defer c.Shutdown()

	const (
		ino  = uint64(1)
		path = "/test/shared.txt"
	)
	shared := &filer_pb.Entry{
		Name:       "shared.txt",
		Attributes: &filer_pb.FuseAttributes{FileSize: 1024},
	}

	// Two opens of one inode.
	first := c.OpenFile(ino, shared, path)
	second := c.OpenFile(ino, shared, path)

	if first != second {
		t.Error("Multiple opens of same file should return same file info")
	}
	if got := first.OpenCount; got != 2 {
		t.Errorf("Expected open count 2, got %d", got)
	}

	// First close: one opener remains, so eviction must not be allowed.
	if c.CloseFile(ino) {
		t.Error("Should not be able to evict file with open count > 0")
	}
	if got := first.OpenCount; got != 1 {
		t.Errorf("Expected open count 1 after first close, got %d", got)
	}

	// Second close: nobody holds the file any more.
	if !c.CloseFile(ino) {
		t.Error("Should be able to evict file with open count 0")
	}
}
// TestOpenFileCache_Metrics opens four files of different kinds, attaches one
// chunk of metadata to each, and checks the aggregate numbers reported by
// GetMetrics: total files, ML file count, total chunks, and the per-type
// counts for model and dataset files.
func TestOpenFileCache_Metrics(t *testing.T) {
	cache := NewOpenFileCache(10, 5*time.Minute)
	defer cache.Shutdown()

	// Add some files of different types
	files := []struct {
		inode    uint64
		filename string
		path     string
		size     uint64
	}{
		{1, "model.pt", "/models/model.pt", 100 * 1024 * 1024},
		{2, "data.jpg", "/datasets/data.jpg", 2 * 1024 * 1024},
		{3, "config.yaml", "/config/config.yaml", 1024},
		{4, "regular.txt", "/docs/regular.txt", 5 * 1024},
	}

	for _, file := range files {
		entry := &filer_pb.Entry{
			Name: file.filename,
			Attributes: &filer_pb.FuseAttributes{
				FileSize: file.size,
			},
		}
		cache.OpenFile(file.inode, entry, file.path)

		// Add some chunk metadata. The inode is offset from '0' so the FileId
		// ends in a readable digit ("chunk_1".."chunk_4"); converting the raw
		// inode to a rune would yield control characters U+0001..U+0004.
		// Inodes here are single-digit, so the offset stays within '0'..'9'.
		metadata := &ChunkMetadata{
			FileId:     "chunk_" + string(rune('0'+file.inode)),
			Offset:     0,
			Size:       1024,
			CacheLevel: 0,
		}
		cache.UpdateChunkCache(file.inode, 0, metadata)
	}

	metrics := cache.GetMetrics()

	if metrics.TotalFiles != 4 {
		t.Errorf("Expected 4 total files, got %d", metrics.TotalFiles)
	}
	if metrics.MLFiles < 2 { // Should detect at least model and dataset
		t.Errorf("Expected at least 2 ML files, got %d", metrics.MLFiles)
	}
	if metrics.TotalChunks != 4 {
		t.Errorf("Expected 4 total chunks, got %d", metrics.TotalChunks)
	}

	// Check file type counts
	if metrics.FileTypes[MLFileModel] == 0 {
		t.Error("Should detect at least one model file")
	}
	if metrics.FileTypes[MLFileDataset] == 0 {
		t.Error("Should detect at least one dataset file")
	}
}
// TestOpenFileCache_ConcurrentAccess runs ten goroutines, each repeatedly
// opening its own inode, writing and reading chunk metadata, and closing the
// file. After all goroutines report completion it expects the cache to still
// report at least one file. Run with -race for it to be most useful.
func TestOpenFileCache_ConcurrentAccess(t *testing.T) {
	cache := NewOpenFileCache(100, 5*time.Minute)
	defer cache.Shutdown()

	// Test concurrent access to the cache
	numGoroutines := 10
	// Buffered to numGoroutines so no worker blocks on its completion signal.
	done := make(chan bool, numGoroutines)

	for i := 0; i < numGoroutines; i++ {
		go func(id int) {
			defer func() { done <- true }()

			inode := uint64(id)
			// id and j both stay in 0..9, so '0'+n maps each to one digit.
			idDigit := string(rune('0' + id))
			entry := &filer_pb.Entry{
				Name: "file" + idDigit + ".txt",
				Attributes: &filer_pb.FuseAttributes{
					FileSize: 1024,
				},
			}
			fullPath := "/test/file" + idDigit + ".txt"

			// Perform multiple operations
			for j := 0; j < 10; j++ {
				cache.OpenFile(inode, entry, fullPath)

				// Build a readable FileId such as "chunk_3_7". Converting the
				// raw integers to runes would embed control characters
				// (U+0000..U+0009) in the identifier.
				metadata := &ChunkMetadata{
					FileId:     "chunk_" + idDigit + "_" + string(rune('0'+j)),
					Offset:     uint64(j * 1024),
					Size:       1024,
					CacheLevel: 0,
				}
				cache.UpdateChunkCache(inode, uint32(j), metadata)
				cache.GetChunkMetadata(inode, uint32(j))

				cache.CloseFile(inode)
			}
		}(i)
	}

	// Wait for all goroutines to complete
	for i := 0; i < numGoroutines; i++ {
		<-done
	}

	// Verify cache state
	metrics := cache.GetMetrics()
	if metrics.TotalFiles == 0 {
		t.Error("Should have some files in cache after concurrent operations")
	}
}
// TestMLFileDetector_Extensions feeds the detector file names with various
// extensions. Rows that expect a concrete ML type must be reported as ML
// files of that type; rows that expect MLFileUnknown are only logged.
func TestMLFileDetector_Extensions(t *testing.T) {
	d := newMLFileDetector()

	type extCase struct {
		filename string
		path     string
		expected MLFileType
	}
	cases := []extCase{
		{"model.pt", "/models/model.pt", MLFileModel},
		{"weights.pth", "/models/weights.pth", MLFileModel},
		{"data.jpg", "/datasets/data.jpg", MLFileDataset},
		{"config.yaml", "/config/config.yaml", MLFileConfig},
		{"tensor.safetensors", "/tensors/tensor.safetensors", MLFileModel},
		{"training.log", "/logs/training.log", MLFileLog},
		{"document.txt", "/docs/document.txt", MLFileUnknown},
	}

	for _, c := range cases {
		t.Run(c.filename, func(t *testing.T) {
			isML, kind := d.DetectMLFile(&filer_pb.Entry{
				Name:       c.filename,
				Attributes: &filer_pb.FuseAttributes{FileSize: 1024},
			}, c.path)

			// No assertion for unknown files: either verdict is tolerated,
			// so the result is only recorded in the test log.
			if c.expected == MLFileUnknown {
				t.Logf("File %s: isML=%v, type=%v", c.filename, isML, kind)
				return
			}

			if !isML {
				t.Errorf("File %s should be detected as ML file", c.filename)
			}
			if kind != c.expected {
				t.Errorf("File %s: expected type %v, got %v", c.filename, c.expected, kind)
			}
		})
	}
}
// TestMLFileDetector_PathPatterns checks detection for files whose directory
// path, not their name, is the distinguishing input. Rows that expect
// MLFileUnknown are logged without assertions; every other row must be
// reported as an ML file of the expected type.
func TestMLFileDetector_PathPatterns(t *testing.T) {
	detector := newMLFileDetector()

	rows := []struct {
		path     string
		filename string
		expected MLFileType
	}{
		{"/datasets/train/file.bin", "file.bin", MLFileDataset},
		{"/models/checkpoint/weights", "weights", MLFileModel},
		{"/data/validation/sample.dat", "sample.dat", MLFileDataset},
		{"/checkpoints/model_v1.bin", "model_v1.bin", MLFileModel},
		{"/documents/report.pdf", "report.pdf", MLFileUnknown},
	}

	for _, row := range rows {
		t.Run(row.path, func(t *testing.T) {
			attrs := &filer_pb.FuseAttributes{FileSize: 1024}
			e := &filer_pb.Entry{Name: row.filename, Attributes: attrs}

			detected, gotType := detector.DetectMLFile(e, row.path)

			if row.expected != MLFileUnknown {
				if !detected {
					t.Errorf("Path %s should be detected as ML file", row.path)
				}
				if gotType != row.expected {
					t.Errorf("Path %s: expected type %v, got %v", row.path, row.expected, gotType)
				}
				return
			}

			// Unknown expectation: report what the detector said, assert nothing.
			t.Logf("Path %s: isML=%v, type=%v", row.path, detected, gotType)
		})
	}
}
func TestMLFileDetector_SizeHeuristics(t *testing.T) {
detector := newMLFileDetector()
// Large file with model-related name should be detected as model
largeModelEntry := &filer_pb.Entry{
Name: "large_model.bin",
Attributes: &filer_pb.FuseAttributes{
FileSize: 500 * 1024 * 1024, // 500MB
},
}
isML, fileType := detector.DetectMLFile(largeModelEntry, "/checkpoints/large_model.bin")
if !isML {
t.Error("Large model file should be detected as ML file")
}
if fileType != MLFileModel {
t.Errorf("Large model file should be detected as model, got %v", fileType)
}
}
// TestOpenFileCache_EvictionProtection opens two files in a cache constructed
// with a first argument of 2 and leaves them open, then opens a third. It
// expects all three inodes to remain retrievable afterwards.
func TestOpenFileCache_EvictionProtection(t *testing.T) {
	c := NewOpenFileCache(2, 5*time.Minute) // Very small cache
	defer c.Shutdown()

	// open registers a 1 KiB file and intentionally never closes it.
	open := func(ino uint64, name, path string) {
		c.OpenFile(ino, &filer_pb.Entry{
			Name:       name,
			Attributes: &filer_pb.FuseAttributes{FileSize: 1024},
		}, path)
	}

	// Two files that stay open for the rest of the test.
	for n := 1; n <= 2; n++ {
		digit := string(rune('0' + n))
		open(uint64(n), "file"+digit+".txt", "/test/file"+digit+".txt")
	}

	// A third open must not push out either of the files still in use.
	open(uint64(3), "file3.txt", "/test/file3.txt")

	// Nothing was evictable, so every inode should still resolve.
	for n := 1; n <= 3; n++ {
		if c.GetFileInfo(uint64(n)) == nil {
			t.Errorf("File %d should still be in cache (eviction protection)", n)
		}
	}
}
// TestOpenFileCache_GetFileInfo_CacheHitMiss looks up an inode before and
// after opening it. The first lookup must return nil and leave a non-zero
// miss counter; the second must return file info and raise the hit counter
// above the value captured after the miss.
func TestOpenFileCache_GetFileInfo_CacheHitMiss(t *testing.T) {
	c := NewOpenFileCache(10, 5*time.Minute)
	defer c.Shutdown()

	const ino = uint64(1)

	// Lookup before anything has been opened.
	if c.GetFileInfo(ino) != nil {
		t.Error("Should return nil for non-existent file")
	}
	before := c.GetMetrics()
	if before.CacheMisses == 0 {
		t.Error("Should record cache miss")
	}

	// Put the file into the cache.
	c.OpenFile(ino, &filer_pb.Entry{
		Name:       "test.txt",
		Attributes: &filer_pb.FuseAttributes{FileSize: 1024},
	}, "/test/test.txt")

	// Same lookup again, now expected to succeed.
	if c.GetFileInfo(ino) == nil {
		t.Error("Should return file info for existing file")
	}
	after := c.GetMetrics()
	if after.CacheHits == 0 {
		t.Error("Should record cache hit")
	}
	if after.CacheHits <= before.CacheHits {
		t.Error("Cache hits should increase")
	}
}
// TestOpenFileCache_Shutdown opens three files and then calls Shutdown from a
// separate goroutine, reporting an error if the call has not returned within
// five seconds.
func TestOpenFileCache_Shutdown(t *testing.T) {
	c := NewOpenFileCache(10, 5*time.Minute)

	// Give the cache something to tear down.
	for n := 1; n <= 3; n++ {
		digit := string(rune('0' + n))
		c.OpenFile(uint64(n), &filer_pb.Entry{
			Name:       "file" + digit + ".txt",
			Attributes: &filer_pb.FuseAttributes{FileSize: 1024},
		}, "/test/file"+digit+".txt")
	}

	// Run Shutdown off the test goroutine so a hang can be detected.
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		c.Shutdown()
	}()

	select {
	case <-time.After(5 * time.Second):
		t.Error("Shutdown took too long")
	case <-stopped:
		// Shutdown returned in time.
	}
}
// Benchmark tests
// BenchmarkOpenFileCache_OpenFile measures OpenFile while cycling through 100
// inode numbers, all sharing one entry and one path.
func BenchmarkOpenFileCache_OpenFile(b *testing.B) {
	c := NewOpenFileCache(1000, 30*time.Minute)
	defer c.Shutdown()

	const (
		distinctInodes = 100
		path           = "/test/benchmark.txt"
	)
	shared := &filer_pb.Entry{
		Name:       "benchmark.txt",
		Attributes: &filer_pb.FuseAttributes{FileSize: 1024},
	}

	// Exclude the setup above from the measurement.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		c.OpenFile(uint64(n%distinctInodes), shared, path)
	}
}
// BenchmarkOpenFileCache_GetFileInfo measures GetFileInfo against a cache that
// was pre-filled with inodes 0..99 before the timer was reset.
func BenchmarkOpenFileCache_GetFileInfo(b *testing.B) {
	c := NewOpenFileCache(1000, 30*time.Minute)
	defer c.Shutdown()

	const preloaded = 100

	// Warm the cache so every timed lookup targets an inode opened here.
	shared := &filer_pb.Entry{
		Name:       "benchmark.txt",
		Attributes: &filer_pb.FuseAttributes{FileSize: 1024},
	}
	for n := 0; n < preloaded; n++ {
		c.OpenFile(uint64(n), shared, "/test/benchmark.txt")
	}

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		c.GetFileInfo(uint64(n % preloaded))
	}
}