1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-07-24 20:42:47 +02:00
seaweedfs/test/fuse_integration/concurrent_operations_test.go
Chris Lu 9982f91b4c
Add more fuse tests (#6992)
* add more tests

* move to new package

* add github action

* Update fuse-integration.yml

* Update fuse-integration.yml

* Update test/fuse_integration/README.md

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update test/fuse_integration/README.md

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update test/fuse_integration/framework.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update test/fuse_integration/README.md

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update test/fuse_integration/README.md

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix

* Update test/fuse_integration/concurrent_operations_test.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-07-16 12:43:08 -07:00

448 lines
12 KiB
Go

package fuse_test
import (
"bytes"
"crypto/rand"
"fmt"
"os"
"path/filepath"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestConcurrentFileOperations sets up one test framework instance and runs
// each concurrency scenario against it as a named subtest, in a fixed order.
func TestConcurrentFileOperations(t *testing.T) {
	framework := NewFuseTestFramework(t, DefaultTestConfig())
	defer framework.Cleanup()
	require.NoError(t, framework.Setup(DefaultTestConfig()))

	// Subtests run sequentially in the order listed here.
	scenarios := []struct {
		name string
		run  func(*testing.T, *FuseTestFramework)
	}{
		{"ConcurrentFileWrites", testConcurrentFileWrites},
		{"ConcurrentFileReads", testConcurrentFileReads},
		{"ConcurrentReadWrite", testConcurrentReadWrite},
		{"ConcurrentDirectoryOperations", testConcurrentDirectoryOperations},
		{"ConcurrentFileCreation", testConcurrentFileCreation},
	}
	for _, scenario := range scenarios {
		run := scenario.run
		t.Run(scenario.name, func(t *testing.T) {
			run(t, framework)
		})
	}
}
// testConcurrentFileWrites starts several goroutines that each write their
// own set of files and immediately read every file back for comparison.
// Once all goroutines finish, it asserts that every expected file exists.
func testConcurrentFileWrites(t *testing.T, framework *FuseTestFramework) {
	const (
		numWorkers     = 10
		filesPerWorker = 5
	)

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		failures []error
	)
	// record appends to failures under the mutex so goroutines can share it.
	record := func(err error) {
		mu.Lock()
		failures = append(failures, err)
		mu.Unlock()
	}

	// writeAndVerify writes a single file for the given worker and reads it
	// back, returning a descriptive error on the first problem found.
	writeAndVerify := func(workerID, fileIdx int) error {
		name := fmt.Sprintf("worker_%d_file_%d.txt", workerID, fileIdx)
		payload := []byte(fmt.Sprintf("Worker %d, File %d - %s", workerID, fileIdx, time.Now().String()))
		fullPath := filepath.Join(framework.GetMountPoint(), name)

		if err := os.WriteFile(fullPath, payload, 0644); err != nil {
			return fmt.Errorf("worker %d file %d: %v", workerID, fileIdx, err)
		}

		got, err := os.ReadFile(fullPath)
		switch {
		case err != nil:
			return fmt.Errorf("worker %d file %d read: %v", workerID, fileIdx, err)
		case !bytes.Equal(payload, got):
			return fmt.Errorf("worker %d file %d: content mismatch", workerID, fileIdx)
		}
		return nil
	}

	for id := 0; id < numWorkers; id++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			// A worker stops at its first failure.
			for fileIdx := 0; fileIdx < filesPerWorker; fileIdx++ {
				if err := writeAndVerify(workerID, fileIdx); err != nil {
					record(err)
					return
				}
			}
		}(id)
	}
	wg.Wait()

	require.Empty(t, failures, "Concurrent writes failed: %v", failures)

	// Every file named by the workers must now exist.
	for id := 0; id < numWorkers; id++ {
		for fileIdx := 0; fileIdx < filesPerWorker; fileIdx++ {
			framework.AssertFileExists(fmt.Sprintf("worker_%d_file_%d.txt", id, fileIdx))
		}
	}
}
// testConcurrentFileReads seeds one 1MB file with random bytes and has many
// goroutines read it back several times each, comparing every read with the
// seed data.
func testConcurrentFileReads(t *testing.T, framework *FuseTestFramework) {
	const (
		numReaders     = 20
		readsPerReader = 3
	)

	filename := "concurrent_read_test.txt"
	testData := make([]byte, 1024*1024) // 1MB
	_, err := rand.Read(testData)
	require.NoError(t, err)
	framework.CreateTestFile(filename, testData)

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		failures []error
	)
	// record appends to failures under the mutex so goroutines can share it.
	record := func(err error) {
		mu.Lock()
		failures = append(failures, err)
		mu.Unlock()
	}

	// readOnce does one whole-file read and compares it with the seed data.
	readOnce := func(path string, readerID, attempt int) error {
		got, err := os.ReadFile(path)
		if err != nil {
			return fmt.Errorf("reader %d iteration %d: %v", readerID, attempt, err)
		}
		if !bytes.Equal(testData, got) {
			return fmt.Errorf("reader %d iteration %d: data mismatch", readerID, attempt)
		}
		return nil
	}

	for id := 0; id < numReaders; id++ {
		wg.Add(1)
		go func(readerID int) {
			defer wg.Done()
			path := filepath.Join(framework.GetMountPoint(), filename)
			// A reader stops at its first failure.
			for attempt := 0; attempt < readsPerReader; attempt++ {
				if err := readOnce(path, readerID, attempt); err != nil {
					record(err)
					return
				}
			}
		}(id)
	}
	wg.Wait()

	require.Empty(t, failures, "Concurrent reads failed: %v", failures)
}
// testConcurrentReadWrite runs reader and writer goroutines against the same
// file at the same time. It only requires that no read or write returns an
// error and that the file still exists afterwards; read contents are not
// inspected.
func testConcurrentReadWrite(t *testing.T, framework *FuseTestFramework) {
	const (
		numReaders      = 5
		readsPerReader  = 10
		numWriters      = 2
		writesPerWriter = 5
	)

	filename := "concurrent_rw_test.txt"
	framework.CreateTestFile(filename, bytes.Repeat([]byte("INITIAL"), 1000))
	mountPath := filepath.Join(framework.GetMountPoint(), filename)

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		failures []error
	)
	// record appends to failures under the mutex so goroutines can share it.
	record := func(err error) {
		mu.Lock()
		failures = append(failures, err)
		mu.Unlock()
	}

	// launch runs body(id) on its own goroutine, tracked by wg.
	launch := func(id int, body func(int)) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			body(id)
		}()
	}

	// reader repeatedly reads the whole file, pausing briefly between reads.
	reader := func(readerID int) {
		for n := 0; n < readsPerReader; n++ {
			if _, err := os.ReadFile(mountPath); err != nil {
				record(fmt.Errorf("reader %d: %v", readerID, err))
				return
			}
			time.Sleep(10 * time.Millisecond)
		}
	}

	// writer repeatedly replaces the whole file with its own repeated tag.
	writer := func(writerID int) {
		payload := bytes.Repeat([]byte(fmt.Sprintf("WRITER%d", writerID)), 1000)
		for n := 0; n < writesPerWriter; n++ {
			if err := os.WriteFile(mountPath, payload, 0644); err != nil {
				record(fmt.Errorf("writer %d: %v", writerID, err))
				return
			}
			time.Sleep(50 * time.Millisecond)
		}
	}

	// Readers are started first, then writers.
	for id := 0; id < numReaders; id++ {
		launch(id, reader)
	}
	for id := 0; id < numWriters; id++ {
		launch(id, writer)
	}
	wg.Wait()

	require.Empty(t, failures, "Concurrent read/write failed: %v", failures)

	framework.AssertFileExists(filename)
}
// testConcurrentDirectoryOperations has each worker goroutine create its own
// directory with several subdirectories, each holding one file. After all
// workers finish, it checks every directory and every file's contents.
func testConcurrentDirectoryOperations(t *testing.T, framework *FuseTestFramework) {
	const (
		numWorkers       = 8
		subdirsPerWorker = 5
	)

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		failures []error
	)
	// record appends to failures under the mutex so goroutines can share it.
	record := func(err error) {
		mu.Lock()
		failures = append(failures, err)
		mu.Unlock()
	}

	// The naming helpers are shared by the build and verify phases so both
	// always agree on paths and expected contents.
	workerRoot := func(workerID int) string {
		return filepath.Join(framework.GetMountPoint(), fmt.Sprintf("worker_%d", workerID))
	}
	subdirOf := func(root string, i int) string {
		return filepath.Join(root, fmt.Sprintf("subdir_%d", i))
	}
	contentFor := func(workerID, i int) []byte {
		return []byte(fmt.Sprintf("Worker %d, Subdir %d", workerID, i))
	}

	// buildTree creates one worker's directory, its subdirectories and the
	// file inside each, stopping at the first error.
	buildTree := func(workerID int) error {
		root := workerRoot(workerID)
		if err := os.Mkdir(root, 0755); err != nil {
			return fmt.Errorf("worker %d mkdir: %v", workerID, err)
		}
		for i := 0; i < subdirsPerWorker; i++ {
			dir := subdirOf(root, i)
			if err := os.Mkdir(dir, 0755); err != nil {
				return fmt.Errorf("worker %d subdir %d: %v", workerID, i, err)
			}
			target := filepath.Join(dir, "test.txt")
			if err := os.WriteFile(target, contentFor(workerID, i), 0644); err != nil {
				return fmt.Errorf("worker %d file %d: %v", workerID, i, err)
			}
		}
		return nil
	}

	for id := 0; id < numWorkers; id++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			if err := buildTree(workerID); err != nil {
				record(err)
			}
		}(id)
	}
	wg.Wait()

	require.Empty(t, failures, "Concurrent directory operations failed: %v", failures)

	// Walk every expected tree and confirm directories and file contents.
	for workerID := 0; workerID < numWorkers; workerID++ {
		root := workerRoot(workerID)
		rootInfo, err := os.Stat(root)
		require.NoError(t, err)
		assert.True(t, rootInfo.IsDir())

		for i := 0; i < subdirsPerWorker; i++ {
			dir := subdirOf(root, i)
			dirInfo, err := os.Stat(dir)
			require.NoError(t, err)
			assert.True(t, dirInfo.IsDir())

			want := contentFor(workerID, i)
			got, err := os.ReadFile(filepath.Join(dir, "test.txt"))
			require.NoError(t, err)
			assert.Equal(t, want, got)
		}
	}
}
// testConcurrentFileCreation has many worker goroutines create uniquely
// named files inside one shared directory. It then checks that the number of
// recorded files and the number of directory entries both match the expected
// total, and that every recorded file exists.
func testConcurrentFileCreation(t *testing.T, framework *FuseTestFramework) {
	const (
		numWorkers     = 15
		filesPerWorker = 10
	)

	testDir := "concurrent_creation"
	framework.CreateTestDir(testDir)

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex // guards both failures and created
		failures []error
	)
	created := make(map[string]bool)

	// createOne writes a single file and returns its base name on success.
	createOne := func(workerID, fileIdx int) (string, error) {
		name := fmt.Sprintf("file_%d_%d.txt", workerID, fileIdx)
		rel := filepath.Join(testDir, name)
		fullPath := filepath.Join(framework.GetMountPoint(), rel)
		payload := []byte(fmt.Sprintf("Worker %d, File %d, Time: %s",
			workerID, fileIdx, time.Now().Format(time.RFC3339Nano)))

		if err := os.WriteFile(fullPath, payload, 0644); err != nil {
			return "", fmt.Errorf("worker %d file %d: %v", workerID, fileIdx, err)
		}
		return name, nil
	}

	for id := 0; id < numWorkers; id++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			// A worker stops at its first failure.
			for fileIdx := 0; fileIdx < filesPerWorker; fileIdx++ {
				name, err := createOne(workerID, fileIdx)

				mu.Lock()
				if err != nil {
					failures = append(failures, err)
				} else {
					created[name] = true
				}
				mu.Unlock()

				if err != nil {
					return
				}
			}
		}(id)
	}
	wg.Wait()

	require.Empty(t, failures, "Concurrent file creation failed: %v", failures)

	// Every worker must have recorded every one of its files.
	expectedCount := numWorkers * filesPerWorker
	assert.Equal(t, expectedCount, len(created))

	// The directory listing must contain the same number of entries.
	entries, err := os.ReadDir(filepath.Join(framework.GetMountPoint(), testDir))
	require.NoError(t, err)
	assert.Equal(t, expectedCount, len(entries))

	for name := range created {
		framework.AssertFileExists(filepath.Join(testDir, name))
	}
}
// TestStressOperations sets up one test framework instance and runs the
// high-load scenarios against it as named subtests, in a fixed order.
func TestStressOperations(t *testing.T) {
	framework := NewFuseTestFramework(t, DefaultTestConfig())
	defer framework.Cleanup()
	require.NoError(t, framework.Setup(DefaultTestConfig()))

	// Subtests run sequentially in the order listed here.
	scenarios := []struct {
		name string
		run  func(*testing.T, *FuseTestFramework)
	}{
		{"HighFrequencySmallWrites", testHighFrequencySmallWrites},
		{"ManySmallFiles", testManySmallFiles},
	}
	for _, scenario := range scenarios {
		run := scenario.run
		t.Run(scenario.name, func(t *testing.T) {
			run(t, framework)
		})
	}
}
// testHighFrequencySmallWrites issues many small sequential writes through a
// single open file handle and then checks that the size reported by os.Stat
// equals the total number of bytes written.
func testHighFrequencySmallWrites(t *testing.T, framework *FuseTestFramework) {
	filename := "high_freq_writes.txt"
	mountPath := filepath.Join(framework.GetMountPoint(), filename)

	// Open file for writing
	file, err := os.OpenFile(mountPath, os.O_CREATE|os.O_WRONLY, 0644)
	require.NoError(t, err)
	// Safety net for early exits (require.* aborts the test). After the
	// explicit Close below succeeds, this second Close only reports that the
	// file is already closed, which is deliberately ignored.
	defer file.Close()

	// Perform many small writes
	numWrites := 1000
	writeSize := 100
	// The padding is identical for every record, so build it once.
	padding := bytes.Repeat([]byte("x"), writeSize-20)

	// Each record is the prefix plus padding plus newline, which is shorter
	// than writeSize, so the expected size is accumulated from the bytes
	// actually written rather than computed as numWrites*writeSize. It is an
	// int64 so it compares equal to FileInfo.Size() under assert.Equal.
	var totalSize int64
	for i := 0; i < numWrites; i++ {
		data := []byte(fmt.Sprintf("Write %04d: %s\n", i, padding))
		n, err := file.Write(data)
		require.NoError(t, err)
		totalSize += int64(n)
	}

	// Close before stat-ing so a failure to close fails the test instead of
	// surfacing later as a confusing size mismatch.
	require.NoError(t, file.Close())

	// Verify file size
	info, err := os.Stat(mountPath)
	require.NoError(t, err)
	assert.Equal(t, totalSize, info.Size())
}
// testManySmallFiles creates a large number of small files in one directory,
// split into batches that are written concurrently (one goroutine per
// batch), and then checks that the directory listing contains exactly
// numFiles entries.
func testManySmallFiles(t *testing.T, framework *FuseTestFramework) {
	testDir := "many_small_files"
	framework.CreateTestDir(testDir)

	numFiles := 500
	batchSize := 50

	var wg sync.WaitGroup
	var mutex sync.Mutex
	errors := make([]error, 0)
	// addError appends under the mutex so batch goroutines can share the slice.
	addError := func(err error) {
		mutex.Lock()
		defer mutex.Unlock()
		errors = append(errors, err)
	}

	// Create files in batches. Stepping by batchSize (instead of looping
	// numFiles/batchSize times) ensures a trailing partial batch is still
	// created when numFiles is not an exact multiple of batchSize, so the
	// final count assertion stays valid if either value is changed.
	for start := 0; start < numFiles; start += batchSize {
		end := start + batchSize
		if end > numFiles {
			end = numFiles
		}

		wg.Add(1)
		go func(first, limit int) {
			defer wg.Done()
			// A batch stops at its first failure.
			for fileNum := first; fileNum < limit; fileNum++ {
				filename := filepath.Join(testDir, fmt.Sprintf("small_file_%04d.txt", fileNum))
				content := []byte(fmt.Sprintf("File %d content", fileNum))
				mountPath := filepath.Join(framework.GetMountPoint(), filename)
				if err := os.WriteFile(mountPath, content, 0644); err != nil {
					addError(fmt.Errorf("file %d: %v", fileNum, err))
					return
				}
			}
		}(start, end)
	}
	wg.Wait()

	require.Empty(t, errors, "Many small files creation failed: %v", errors)

	// Verify directory listing
	mountPath := filepath.Join(framework.GetMountPoint(), testDir)
	entries, err := os.ReadDir(mountPath)
	require.NoError(t, err)
	assert.Equal(t, numFiles, len(entries))
}