mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-07-24 20:42:47 +02:00
* add more tests * move to new package * add github action * Update fuse-integration.yml * Update fuse-integration.yml * Update test/fuse_integration/README.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update test/fuse_integration/README.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update test/fuse_integration/framework.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update test/fuse_integration/README.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update test/fuse_integration/README.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix * Update test/fuse_integration/concurrent_operations_test.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
448 lines
12 KiB
Go
448 lines
12 KiB
Go
package fuse_test
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/rand"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// TestConcurrentFileOperations tests concurrent file operations
|
|
func TestConcurrentFileOperations(t *testing.T) {
|
|
framework := NewFuseTestFramework(t, DefaultTestConfig())
|
|
defer framework.Cleanup()
|
|
|
|
require.NoError(t, framework.Setup(DefaultTestConfig()))
|
|
|
|
t.Run("ConcurrentFileWrites", func(t *testing.T) {
|
|
testConcurrentFileWrites(t, framework)
|
|
})
|
|
|
|
t.Run("ConcurrentFileReads", func(t *testing.T) {
|
|
testConcurrentFileReads(t, framework)
|
|
})
|
|
|
|
t.Run("ConcurrentReadWrite", func(t *testing.T) {
|
|
testConcurrentReadWrite(t, framework)
|
|
})
|
|
|
|
t.Run("ConcurrentDirectoryOperations", func(t *testing.T) {
|
|
testConcurrentDirectoryOperations(t, framework)
|
|
})
|
|
|
|
t.Run("ConcurrentFileCreation", func(t *testing.T) {
|
|
testConcurrentFileCreation(t, framework)
|
|
})
|
|
}
|
|
|
|
// testConcurrentFileWrites tests multiple goroutines writing to different files
|
|
func testConcurrentFileWrites(t *testing.T, framework *FuseTestFramework) {
|
|
numWorkers := 10
|
|
filesPerWorker := 5
|
|
var wg sync.WaitGroup
|
|
var mutex sync.Mutex
|
|
errors := make([]error, 0)
|
|
|
|
// Function to collect errors safely
|
|
addError := func(err error) {
|
|
mutex.Lock()
|
|
defer mutex.Unlock()
|
|
errors = append(errors, err)
|
|
}
|
|
|
|
// Start concurrent workers
|
|
for worker := 0; worker < numWorkers; worker++ {
|
|
wg.Add(1)
|
|
go func(workerID int) {
|
|
defer wg.Done()
|
|
|
|
for file := 0; file < filesPerWorker; file++ {
|
|
filename := fmt.Sprintf("worker_%d_file_%d.txt", workerID, file)
|
|
content := []byte(fmt.Sprintf("Worker %d, File %d - %s", workerID, file, time.Now().String()))
|
|
|
|
mountPath := filepath.Join(framework.GetMountPoint(), filename)
|
|
if err := os.WriteFile(mountPath, content, 0644); err != nil {
|
|
addError(fmt.Errorf("worker %d file %d: %v", workerID, file, err))
|
|
return
|
|
}
|
|
|
|
// Verify file was written correctly
|
|
readContent, err := os.ReadFile(mountPath)
|
|
if err != nil {
|
|
addError(fmt.Errorf("worker %d file %d read: %v", workerID, file, err))
|
|
return
|
|
}
|
|
|
|
if !bytes.Equal(content, readContent) {
|
|
addError(fmt.Errorf("worker %d file %d: content mismatch", workerID, file))
|
|
return
|
|
}
|
|
}
|
|
}(worker)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// Check for errors
|
|
require.Empty(t, errors, "Concurrent writes failed: %v", errors)
|
|
|
|
// Verify all files exist and have correct content
|
|
for worker := 0; worker < numWorkers; worker++ {
|
|
for file := 0; file < filesPerWorker; file++ {
|
|
filename := fmt.Sprintf("worker_%d_file_%d.txt", worker, file)
|
|
framework.AssertFileExists(filename)
|
|
}
|
|
}
|
|
}
|
|
|
|
// testConcurrentFileReads tests multiple goroutines reading from the same file
|
|
func testConcurrentFileReads(t *testing.T, framework *FuseTestFramework) {
|
|
// Create a test file
|
|
filename := "concurrent_read_test.txt"
|
|
testData := make([]byte, 1024*1024) // 1MB
|
|
_, err := rand.Read(testData)
|
|
require.NoError(t, err)
|
|
|
|
framework.CreateTestFile(filename, testData)
|
|
|
|
numReaders := 20
|
|
var wg sync.WaitGroup
|
|
var mutex sync.Mutex
|
|
errors := make([]error, 0)
|
|
|
|
addError := func(err error) {
|
|
mutex.Lock()
|
|
defer mutex.Unlock()
|
|
errors = append(errors, err)
|
|
}
|
|
|
|
// Start concurrent readers
|
|
for reader := 0; reader < numReaders; reader++ {
|
|
wg.Add(1)
|
|
go func(readerID int) {
|
|
defer wg.Done()
|
|
|
|
mountPath := filepath.Join(framework.GetMountPoint(), filename)
|
|
|
|
// Read multiple times
|
|
for i := 0; i < 3; i++ {
|
|
readData, err := os.ReadFile(mountPath)
|
|
if err != nil {
|
|
addError(fmt.Errorf("reader %d iteration %d: %v", readerID, i, err))
|
|
return
|
|
}
|
|
|
|
if !bytes.Equal(testData, readData) {
|
|
addError(fmt.Errorf("reader %d iteration %d: data mismatch", readerID, i))
|
|
return
|
|
}
|
|
}
|
|
}(reader)
|
|
}
|
|
|
|
wg.Wait()
|
|
require.Empty(t, errors, "Concurrent reads failed: %v", errors)
|
|
}
|
|
|
|
// testConcurrentReadWrite tests simultaneous read and write operations
|
|
func testConcurrentReadWrite(t *testing.T, framework *FuseTestFramework) {
|
|
filename := "concurrent_rw_test.txt"
|
|
initialData := bytes.Repeat([]byte("INITIAL"), 1000)
|
|
framework.CreateTestFile(filename, initialData)
|
|
|
|
var wg sync.WaitGroup
|
|
var mutex sync.Mutex
|
|
errors := make([]error, 0)
|
|
|
|
addError := func(err error) {
|
|
mutex.Lock()
|
|
defer mutex.Unlock()
|
|
errors = append(errors, err)
|
|
}
|
|
|
|
mountPath := filepath.Join(framework.GetMountPoint(), filename)
|
|
|
|
// Start readers
|
|
numReaders := 5
|
|
for i := 0; i < numReaders; i++ {
|
|
wg.Add(1)
|
|
go func(readerID int) {
|
|
defer wg.Done()
|
|
|
|
for j := 0; j < 10; j++ {
|
|
_, err := os.ReadFile(mountPath)
|
|
if err != nil {
|
|
addError(fmt.Errorf("reader %d: %v", readerID, err))
|
|
return
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
// Start writers
|
|
numWriters := 2
|
|
for i := 0; i < numWriters; i++ {
|
|
wg.Add(1)
|
|
go func(writerID int) {
|
|
defer wg.Done()
|
|
|
|
for j := 0; j < 5; j++ {
|
|
newData := bytes.Repeat([]byte(fmt.Sprintf("WRITER%d", writerID)), 1000)
|
|
err := os.WriteFile(mountPath, newData, 0644)
|
|
if err != nil {
|
|
addError(fmt.Errorf("writer %d: %v", writerID, err))
|
|
return
|
|
}
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
require.Empty(t, errors, "Concurrent read/write failed: %v", errors)
|
|
|
|
// Verify file still exists and is readable
|
|
framework.AssertFileExists(filename)
|
|
}
|
|
|
|
// testConcurrentDirectoryOperations tests concurrent directory operations
|
|
func testConcurrentDirectoryOperations(t *testing.T, framework *FuseTestFramework) {
|
|
numWorkers := 8
|
|
var wg sync.WaitGroup
|
|
var mutex sync.Mutex
|
|
errors := make([]error, 0)
|
|
|
|
addError := func(err error) {
|
|
mutex.Lock()
|
|
defer mutex.Unlock()
|
|
errors = append(errors, err)
|
|
}
|
|
|
|
// Each worker creates a directory tree
|
|
for worker := 0; worker < numWorkers; worker++ {
|
|
wg.Add(1)
|
|
go func(workerID int) {
|
|
defer wg.Done()
|
|
|
|
// Create worker directory
|
|
workerDir := fmt.Sprintf("worker_%d", workerID)
|
|
mountPath := filepath.Join(framework.GetMountPoint(), workerDir)
|
|
|
|
if err := os.Mkdir(mountPath, 0755); err != nil {
|
|
addError(fmt.Errorf("worker %d mkdir: %v", workerID, err))
|
|
return
|
|
}
|
|
|
|
// Create subdirectories and files
|
|
for i := 0; i < 5; i++ {
|
|
subDir := filepath.Join(mountPath, fmt.Sprintf("subdir_%d", i))
|
|
if err := os.Mkdir(subDir, 0755); err != nil {
|
|
addError(fmt.Errorf("worker %d subdir %d: %v", workerID, i, err))
|
|
return
|
|
}
|
|
|
|
// Create file in subdirectory
|
|
testFile := filepath.Join(subDir, "test.txt")
|
|
content := []byte(fmt.Sprintf("Worker %d, Subdir %d", workerID, i))
|
|
if err := os.WriteFile(testFile, content, 0644); err != nil {
|
|
addError(fmt.Errorf("worker %d file %d: %v", workerID, i, err))
|
|
return
|
|
}
|
|
}
|
|
}(worker)
|
|
}
|
|
|
|
wg.Wait()
|
|
require.Empty(t, errors, "Concurrent directory operations failed: %v", errors)
|
|
|
|
// Verify all structures were created
|
|
for worker := 0; worker < numWorkers; worker++ {
|
|
workerDir := fmt.Sprintf("worker_%d", worker)
|
|
mountPath := filepath.Join(framework.GetMountPoint(), workerDir)
|
|
|
|
info, err := os.Stat(mountPath)
|
|
require.NoError(t, err)
|
|
assert.True(t, info.IsDir())
|
|
|
|
// Check subdirectories
|
|
for i := 0; i < 5; i++ {
|
|
subDir := filepath.Join(mountPath, fmt.Sprintf("subdir_%d", i))
|
|
info, err := os.Stat(subDir)
|
|
require.NoError(t, err)
|
|
assert.True(t, info.IsDir())
|
|
|
|
testFile := filepath.Join(subDir, "test.txt")
|
|
expectedContent := []byte(fmt.Sprintf("Worker %d, Subdir %d", worker, i))
|
|
actualContent, err := os.ReadFile(testFile)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, expectedContent, actualContent)
|
|
}
|
|
}
|
|
}
|
|
|
|
// testConcurrentFileCreation tests concurrent creation of files in same directory
|
|
func testConcurrentFileCreation(t *testing.T, framework *FuseTestFramework) {
|
|
// Create test directory
|
|
testDir := "concurrent_creation"
|
|
framework.CreateTestDir(testDir)
|
|
|
|
numWorkers := 15
|
|
filesPerWorker := 10
|
|
var wg sync.WaitGroup
|
|
var mutex sync.Mutex
|
|
errors := make([]error, 0)
|
|
createdFiles := make(map[string]bool)
|
|
|
|
addError := func(err error) {
|
|
mutex.Lock()
|
|
defer mutex.Unlock()
|
|
errors = append(errors, err)
|
|
}
|
|
|
|
addFile := func(filename string) {
|
|
mutex.Lock()
|
|
defer mutex.Unlock()
|
|
createdFiles[filename] = true
|
|
}
|
|
|
|
// Create files concurrently
|
|
for worker := 0; worker < numWorkers; worker++ {
|
|
wg.Add(1)
|
|
go func(workerID int) {
|
|
defer wg.Done()
|
|
|
|
for file := 0; file < filesPerWorker; file++ {
|
|
filename := fmt.Sprintf("file_%d_%d.txt", workerID, file)
|
|
relativePath := filepath.Join(testDir, filename)
|
|
mountPath := filepath.Join(framework.GetMountPoint(), relativePath)
|
|
|
|
content := []byte(fmt.Sprintf("Worker %d, File %d, Time: %s",
|
|
workerID, file, time.Now().Format(time.RFC3339Nano)))
|
|
|
|
if err := os.WriteFile(mountPath, content, 0644); err != nil {
|
|
addError(fmt.Errorf("worker %d file %d: %v", workerID, file, err))
|
|
return
|
|
}
|
|
|
|
addFile(filename)
|
|
}
|
|
}(worker)
|
|
}
|
|
|
|
wg.Wait()
|
|
require.Empty(t, errors, "Concurrent file creation failed: %v", errors)
|
|
|
|
// Verify all files were created
|
|
expectedCount := numWorkers * filesPerWorker
|
|
assert.Equal(t, expectedCount, len(createdFiles))
|
|
|
|
// Read directory and verify count
|
|
mountPath := filepath.Join(framework.GetMountPoint(), testDir)
|
|
entries, err := os.ReadDir(mountPath)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, expectedCount, len(entries))
|
|
|
|
// Verify each file exists and has content
|
|
for filename := range createdFiles {
|
|
relativePath := filepath.Join(testDir, filename)
|
|
framework.AssertFileExists(relativePath)
|
|
}
|
|
}
|
|
|
|
// TestStressOperations tests high-load scenarios
|
|
func TestStressOperations(t *testing.T) {
|
|
framework := NewFuseTestFramework(t, DefaultTestConfig())
|
|
defer framework.Cleanup()
|
|
|
|
require.NoError(t, framework.Setup(DefaultTestConfig()))
|
|
|
|
t.Run("HighFrequencySmallWrites", func(t *testing.T) {
|
|
testHighFrequencySmallWrites(t, framework)
|
|
})
|
|
|
|
t.Run("ManySmallFiles", func(t *testing.T) {
|
|
testManySmallFiles(t, framework)
|
|
})
|
|
}
|
|
|
|
// testHighFrequencySmallWrites tests many small writes to the same file
|
|
func testHighFrequencySmallWrites(t *testing.T, framework *FuseTestFramework) {
|
|
filename := "high_freq_writes.txt"
|
|
mountPath := filepath.Join(framework.GetMountPoint(), filename)
|
|
|
|
// Open file for writing
|
|
file, err := os.OpenFile(mountPath, os.O_CREATE|os.O_WRONLY, 0644)
|
|
require.NoError(t, err)
|
|
defer file.Close()
|
|
|
|
// Perform many small writes
|
|
numWrites := 1000
|
|
writeSize := 100
|
|
|
|
for i := 0; i < numWrites; i++ {
|
|
data := []byte(fmt.Sprintf("Write %04d: %s\n", i, bytes.Repeat([]byte("x"), writeSize-20)))
|
|
_, err := file.Write(data)
|
|
require.NoError(t, err)
|
|
}
|
|
file.Close()
|
|
|
|
// Verify file size
|
|
info, err := os.Stat(mountPath)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, totalSize, info.Size())
|
|
}
|
|
|
|
// testManySmallFiles tests creating many small files
|
|
func testManySmallFiles(t *testing.T, framework *FuseTestFramework) {
|
|
testDir := "many_small_files"
|
|
framework.CreateTestDir(testDir)
|
|
|
|
numFiles := 500
|
|
var wg sync.WaitGroup
|
|
var mutex sync.Mutex
|
|
errors := make([]error, 0)
|
|
|
|
addError := func(err error) {
|
|
mutex.Lock()
|
|
defer mutex.Unlock()
|
|
errors = append(errors, err)
|
|
}
|
|
|
|
// Create files in batches
|
|
batchSize := 50
|
|
for batch := 0; batch < numFiles/batchSize; batch++ {
|
|
wg.Add(1)
|
|
go func(batchID int) {
|
|
defer wg.Done()
|
|
|
|
for i := 0; i < batchSize; i++ {
|
|
fileNum := batchID*batchSize + i
|
|
filename := filepath.Join(testDir, fmt.Sprintf("small_file_%04d.txt", fileNum))
|
|
content := []byte(fmt.Sprintf("File %d content", fileNum))
|
|
|
|
mountPath := filepath.Join(framework.GetMountPoint(), filename)
|
|
if err := os.WriteFile(mountPath, content, 0644); err != nil {
|
|
addError(fmt.Errorf("file %d: %v", fileNum, err))
|
|
return
|
|
}
|
|
}
|
|
}(batch)
|
|
}
|
|
|
|
wg.Wait()
|
|
require.Empty(t, errors, "Many small files creation failed: %v", errors)
|
|
|
|
// Verify directory listing
|
|
mountPath := filepath.Join(framework.GetMountPoint(), testDir)
|
|
entries, err := os.ReadDir(mountPath)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, numFiles, len(entries))
|
|
}
|