Mirror of https://github.com/chrislusf/seaweedfs
Synced 2025-09-08 20:32:47 +02:00
* volume assignment concurrency
* accurate tests
* ensure uniqueness
* reserve atomically
* address comments
* atomic
* ReserveOneVolumeForReservation
* duplicated
* Update weed/topology/node.go
  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* Update weed/topology/node.go
  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* atomic counter
* dedup
* select the appropriate functions based on the useReservations flag

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
306 lines · 10 KiB · Go
package topology

import (
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/sequence"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
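
// These tests exercise the capacity-reservation path for concurrent volume
// assignment: findEmptySlotsForOneVolume(topo, option, true) selects target
// servers and atomically reserves a slot on each, and the returned
// reservation is released with releaseAllReservations() once the simulated
// volume creation has been accounted for.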

// TestRaceConditionStress simulates the original issue scenario:
// high concurrent writes causing capacity misjudgment.
func TestRaceConditionStress(t *testing.T) {
	// Create a cluster similar to the issue description:
	// 3 volume servers, 200GB each, 5GB volume limit = 40 volumes max per server
	const (
		numServers          = 3
		volumeLimitMB       = 5000 // 5GB in MB
		storagePerServerGB  = 200  // 200GB per server
		maxVolumesPerServer = storagePerServerGB * 1024 / volumeLimitMB // 200*1024/5000 = 40
		concurrentRequests  = 50   // high concurrency, as in the issue
	)
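
	// Sanity of the sizing: numServers * maxVolumesPerServer = 3 * 40 = 120
	// cluster-wide volume slots. With single-replica placement, 50 concurrent
	// requests are well within that budget, so capacity alone should not force
	// failures, and no server may legitimately exceed 40 volumes.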

	// Create test topology
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), uint64(volumeLimitMB)*1024*1024, 5, false)

	dc := NewDataCenter("dc1")
	topo.LinkChildNode(dc)
	rack := NewRack("rack1")
	dc.LinkChildNode(rack)

	// Create 3 volume servers with realistic capacity
	servers := make([]*DataNode, numServers)
	for i := 0; i < numServers; i++ {
		dn := NewDataNode(fmt.Sprintf("server%d", i+1))
		rack.LinkChildNode(dn)

		// Set up disk with capacity for 40 volumes
		disk := NewDisk(types.HardDriveType.String())
		disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = maxVolumesPerServer
		dn.LinkChildNode(disk)

		servers[i] = dn
	}

	vg := NewDefaultVolumeGrowth()
	rp, _ := super_block.NewReplicaPlacementFromString("000") // single replica, as in the issue

	option := &VolumeGrowOption{
		Collection:       "test-bucket-large", // same collection name as the issue
		ReplicaPlacement: rp,
		DiskType:         types.HardDriveType,
	}

	// Track results
	var successfulAllocations int64
	var failedAllocations int64
	var totalVolumesCreated int64

	var wg sync.WaitGroup

	// Launch concurrent volume creation requests
	startTime := time.Now()
	for i := 0; i < concurrentRequests; i++ {
		wg.Add(1)
		go func(requestId int) {
			defer wg.Done()

			// This is the critical test: multiple goroutines trying to allocate simultaneously.
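			// The final `true` selects the reservation-aware path: capacity is
			// checked and a slot is reserved atomically on each chosen server, so
			// concurrent requests cannot be promised the same free slot.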
			targets, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)

			if err != nil {
				atomic.AddInt64(&failedAllocations, 1)
				t.Logf("Request %d failed: %v", requestId, err)
				return
			}

			// Simulate volume creation delay (as in the real scenario)
			time.Sleep(time.Millisecond * 50)

			// Simulate successful volume creation on each assigned server
			for _, server := range targets {
				disk := server.children[NodeId(types.HardDriveType.String())].(*Disk)
				deltaDiskUsage := &DiskUsageCounts{
					volumeCount: 1,
				}
				disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
				atomic.AddInt64(&totalVolumesCreated, 1)
			}

			// Release reservations (simulates successful registration)
			reservation.releaseAllReservations()
			atomic.AddInt64(&successfulAllocations, 1)
		}(i)
	}

	wg.Wait()
	duration := time.Since(startTime)

	// Verify results
	t.Logf("Test completed in %v", duration)
	t.Logf("Successful allocations: %d", successfulAllocations)
	t.Logf("Failed allocations: %d", failedAllocations)
	t.Logf("Total volumes created: %d", totalVolumesCreated)

	// Check that capacity limits are respected
	totalCapacityUsed := int64(0)
	for i, server := range servers {
		disk := server.children[NodeId(types.HardDriveType.String())].(*Disk)
		volumeCount := disk.diskUsages.getOrCreateDisk(types.HardDriveType).volumeCount
		totalCapacityUsed += volumeCount

		t.Logf("Server %d: %d volumes (max: %d)", i+1, volumeCount, maxVolumesPerServer)

		// Critical check: no server should exceed its capacity
		if volumeCount > maxVolumesPerServer {
			t.Errorf("RACE CONDITION DETECTED: Server %d exceeded capacity: %d > %d",
				i+1, volumeCount, maxVolumesPerServer)
		}
	}

	// Verify that the totals make sense
	if totalVolumesCreated != totalCapacityUsed {
		t.Errorf("Volume count mismatch: created=%d, actual=%d", totalVolumesCreated, totalCapacityUsed)
	}

	// The total should never exceed the cluster capacity (120 volumes: 3 servers × 40 each)
	maxClusterCapacity := int64(numServers * maxVolumesPerServer)
	if totalCapacityUsed > maxClusterCapacity {
		t.Errorf("RACE CONDITION DETECTED: Cluster capacity exceeded: %d > %d",
			totalCapacityUsed, maxClusterCapacity)
	}

	// With reservations, allocation should be controlled:
	// successful + failed requests should equal concurrentRequests.
	if successfulAllocations+failedAllocations != concurrentRequests {
		t.Errorf("Request count mismatch: success=%d + failed=%d != total=%d",
			successfulAllocations, failedAllocations, concurrentRequests)
	}

	t.Logf("✅ Race condition test passed: Capacity limits respected with %d concurrent requests",
		concurrentRequests)
}

// TestCapacityJudgmentAccuracy verifies that the capacity calculation is
// accurate under various load conditions.
func TestCapacityJudgmentAccuracy(t *testing.T) {
	// Create a single server with known capacity
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 5*1024*1024*1024, 5, false)

	dc := NewDataCenter("dc1")
	topo.LinkChildNode(dc)
	rack := NewRack("rack1")
	dc.LinkChildNode(rack)

	dn := NewDataNode("server1")
	rack.LinkChildNode(dn)

	// Server with capacity for exactly 10 volumes
	disk := NewDisk(types.HardDriveType.String())
	diskUsage := disk.diskUsages.getOrCreateDisk(types.HardDriveType)
	diskUsage.maxVolumeCount = 10
	dn.LinkChildNode(disk)

	// Also set the max volume count at the DataNode level (it gets propagated up)
	dn.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 10

	vg := NewDefaultVolumeGrowth()
	rp, _ := super_block.NewReplicaPlacementFromString("000")

	option := &VolumeGrowOption{
		Collection:       "test",
		ReplicaPlacement: rp,
		DiskType:         types.HardDriveType,
	}

	// Test accurate capacity reporting at each step
	for i := 0; i < 10; i++ {
		// Check available space before reservation
		availableBefore := dn.AvailableSpaceFor(option)
		availableForReservation := dn.AvailableSpaceForReservation(option)
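
		// AvailableSpaceFor counts only committed volumes, while
		// AvailableSpaceForReservation also subtracts outstanding reservations;
		// with no reservation held at this point, the two must agree.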

		expectedAvailable := int64(10 - i)
		if availableBefore != expectedAvailable {
			t.Errorf("Step %d: Expected %d available, got %d", i, expectedAvailable, availableBefore)
		}

		if availableForReservation != expectedAvailable {
			t.Errorf("Step %d: Expected %d available for reservation, got %d", i, expectedAvailable, availableForReservation)
		}

		// Try to reserve and allocate
		_, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
		if err != nil {
			t.Fatalf("Step %d: Unexpected reservation failure: %v", i, err)
		}

		// Check that the available space for reservation decreased
		availableAfterReservation := dn.AvailableSpaceForReservation(option)
		if availableAfterReservation != expectedAvailable-1 {
			t.Errorf("Step %d: Expected %d available after reservation, got %d",
				i, expectedAvailable-1, availableAfterReservation)
		}

		// Simulate successful volume creation by updating the disk usage hierarchy
		disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk)

		// Create a volume usage delta to simulate volume creation
		deltaDiskUsage := &DiskUsageCounts{
			volumeCount: 1,
		}

		// Propagate the usage up the hierarchy
		disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)

		// Debug: check the volume count after the update
		diskUsageOnNode := dn.diskUsages.getOrCreateDisk(types.HardDriveType)
		currentVolumeCount := atomic.LoadInt64(&diskUsageOnNode.volumeCount)
		t.Logf("Step %d: Volume count after update: %d", i, currentVolumeCount)

		// Release the reservation
		reservation.releaseAllReservations()

		// Verify the final state
		availableAfter := dn.AvailableSpaceFor(option)
		expectedAfter := int64(10 - i - 1)
		if availableAfter != expectedAfter {
			t.Errorf("Step %d: Expected %d available after creation, got %d",
				i, expectedAfter, availableAfter)
			// More debugging
			maxVolumes := atomic.LoadInt64(&diskUsageOnNode.maxVolumeCount)
			remoteVolumes := atomic.LoadInt64(&diskUsageOnNode.remoteVolumeCount)
			actualVolumeCount := atomic.LoadInt64(&diskUsageOnNode.volumeCount)
			t.Logf("Debug Step %d: max=%d, volume=%d, remote=%d", i, maxVolumes, actualVolumeCount, remoteVolumes)
		}
	}

	// At this point, no more reservations should succeed
	_, _, err := vg.findEmptySlotsForOneVolume(topo, option, true)
	if err == nil {
		t.Error("Expected reservation to fail when at capacity")
	}

	t.Logf("✅ Capacity judgment accuracy test passed")
}

// TestReservationSystemPerformance measures the performance impact of reservations.
func TestReservationSystemPerformance(t *testing.T) {
	// Create topology
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)

	dc := NewDataCenter("dc1")
	topo.LinkChildNode(dc)
	rack := NewRack("rack1")
	dc.LinkChildNode(rack)

	dn := NewDataNode("server1")
	rack.LinkChildNode(dn)

	disk := NewDisk(types.HardDriveType.String())
	disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 1000
	dn.LinkChildNode(disk)

	vg := NewDefaultVolumeGrowth()
	rp, _ := super_block.NewReplicaPlacementFromString("000")

	option := &VolumeGrowOption{
		Collection:       "test",
		ReplicaPlacement: rp,
		DiskType:         types.HardDriveType,
	}

	// Benchmark reservation operations
	const iterations = 1000

	startTime := time.Now()
	for i := 0; i < iterations; i++ {
		_, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
		if err != nil {
			t.Fatalf("Iteration %d failed: %v", i, err)
		}
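		// Release immediately: this loop measures reserve/release overhead, so
		// the reservation is dropped before the simulated creation below.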
		reservation.releaseAllReservations()

		// Simulate volume creation
		diskUsage := dn.diskUsages.getOrCreateDisk(types.HardDriveType)
		atomic.AddInt64(&diskUsage.volumeCount, 1)
	}
	duration := time.Since(startTime)

	avgDuration := duration / iterations
	t.Logf("Performance: %d reservations in %v (avg: %v per reservation)",
		iterations, duration, avgDuration)

	// Performance should be reasonable (less than 1ms per reservation on average)
	if avgDuration > time.Millisecond {
		t.Errorf("Reservation system performance concern: %v per reservation", avgDuration)
	} else {
		t.Logf("✅ Performance test passed: %v per reservation", avgDuration)
	}
}
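
// BenchmarkReservation is a b.N-driven variant of the timing loop in
// TestReservationSystemPerformance; a minimal sketch that reuses only the
// helpers already exercised in this file. `go test -bench=Reservation` then
// handles iteration counts and averaging instead of the manual timer above.
func BenchmarkReservation(b *testing.B) {
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
	dc := NewDataCenter("dc1")
	topo.LinkChildNode(dc)
	rack := NewRack("rack1")
	dc.LinkChildNode(rack)
	dn := NewDataNode("server1")
	rack.LinkChildNode(dn)

	disk := NewDisk(types.HardDriveType.String())
	// A large cap so the benchmark never runs out of slots: every reservation
	// below is released and no volume usage is ever committed.
	disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 1 << 30
	dn.LinkChildNode(disk)

	vg := NewDefaultVolumeGrowth()
	rp, _ := super_block.NewReplicaPlacementFromString("000")
	option := &VolumeGrowOption{
		Collection:       "bench",
		ReplicaPlacement: rp,
		DiskType:         types.HardDriveType,
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
		if err != nil {
			b.Fatalf("iteration %d failed: %v", i, err)
		}
		reservation.releaseAllReservations()
	}
}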