mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
- Implement comprehensive rebalancing tests with multiple scenarios: * Single consumer gets all partitions * Two consumers rebalance when second joins * Consumer leave triggers rebalancing * Multiple consumers join simultaneously - Add cooperative-sticky assignment strategy with: * Sticky behavior to minimize partition movement * Fair distribution respecting target counts * Support for multiple topics and partial subscriptions * Comprehensive test coverage for all scenarios - Implement advanced rebalance timeout handling: * RebalanceTimeoutManager for sophisticated timeout logic * Member eviction based on rebalance and session timeouts * Leader eviction and automatic leader selection * Stuck rebalance detection and forced completion * Detailed rebalance status reporting with member timeout info - Add ProduceMessageToPartition method for partition-specific testing - All new functionality includes comprehensive test coverage Consumer group robustness significantly improved with production-ready timeout handling and assignment strategies.
331 lines
9.5 KiB
Go
331 lines
9.5 KiB
Go
package consumer
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestRebalanceTimeoutManager_CheckRebalanceTimeouts(t *testing.T) {
|
|
coordinator := NewGroupCoordinator()
|
|
defer coordinator.Close()
|
|
|
|
rtm := coordinator.rebalanceTimeoutManager
|
|
|
|
// Create a group with a member that has a short rebalance timeout
|
|
group := coordinator.GetOrCreateGroup("test-group")
|
|
group.Mu.Lock()
|
|
group.State = GroupStatePreparingRebalance
|
|
|
|
member := &GroupMember{
|
|
ID: "member1",
|
|
ClientID: "client1",
|
|
SessionTimeout: 30000, // 30 seconds
|
|
RebalanceTimeout: 1000, // 1 second (very short for testing)
|
|
State: MemberStatePending,
|
|
LastHeartbeat: time.Now(),
|
|
JoinedAt: time.Now().Add(-2 * time.Second), // Joined 2 seconds ago
|
|
}
|
|
group.Members["member1"] = member
|
|
group.Mu.Unlock()
|
|
|
|
// Check timeouts - member should be evicted
|
|
rtm.CheckRebalanceTimeouts()
|
|
|
|
group.Mu.RLock()
|
|
if len(group.Members) != 0 {
|
|
t.Errorf("Expected member to be evicted due to rebalance timeout, but %d members remain", len(group.Members))
|
|
}
|
|
|
|
if group.State != GroupStateEmpty {
|
|
t.Errorf("Expected group state to be Empty after member eviction, got %s", group.State.String())
|
|
}
|
|
group.Mu.RUnlock()
|
|
}
|
|
|
|
func TestRebalanceTimeoutManager_SessionTimeoutFallback(t *testing.T) {
|
|
coordinator := NewGroupCoordinator()
|
|
defer coordinator.Close()
|
|
|
|
rtm := coordinator.rebalanceTimeoutManager
|
|
|
|
// Create a group with a member that has exceeded session timeout
|
|
group := coordinator.GetOrCreateGroup("test-group")
|
|
group.Mu.Lock()
|
|
group.State = GroupStatePreparingRebalance
|
|
|
|
member := &GroupMember{
|
|
ID: "member1",
|
|
ClientID: "client1",
|
|
SessionTimeout: 1000, // 1 second
|
|
RebalanceTimeout: 30000, // 30 seconds
|
|
State: MemberStatePending,
|
|
LastHeartbeat: time.Now().Add(-2 * time.Second), // Last heartbeat 2 seconds ago
|
|
JoinedAt: time.Now(),
|
|
}
|
|
group.Members["member1"] = member
|
|
group.Mu.Unlock()
|
|
|
|
// Check timeouts - member should be evicted due to session timeout
|
|
rtm.CheckRebalanceTimeouts()
|
|
|
|
group.Mu.RLock()
|
|
if len(group.Members) != 0 {
|
|
t.Errorf("Expected member to be evicted due to session timeout, but %d members remain", len(group.Members))
|
|
}
|
|
group.Mu.RUnlock()
|
|
}
|
|
|
|
func TestRebalanceTimeoutManager_LeaderEviction(t *testing.T) {
|
|
coordinator := NewGroupCoordinator()
|
|
defer coordinator.Close()
|
|
|
|
rtm := coordinator.rebalanceTimeoutManager
|
|
|
|
// Create a group with leader and another member
|
|
group := coordinator.GetOrCreateGroup("test-group")
|
|
group.Mu.Lock()
|
|
group.State = GroupStatePreparingRebalance
|
|
group.Leader = "member1"
|
|
|
|
// Leader with expired rebalance timeout
|
|
leader := &GroupMember{
|
|
ID: "member1",
|
|
ClientID: "client1",
|
|
SessionTimeout: 30000,
|
|
RebalanceTimeout: 1000,
|
|
State: MemberStatePending,
|
|
LastHeartbeat: time.Now(),
|
|
JoinedAt: time.Now().Add(-2 * time.Second),
|
|
}
|
|
group.Members["member1"] = leader
|
|
|
|
// Another member that's still valid
|
|
member2 := &GroupMember{
|
|
ID: "member2",
|
|
ClientID: "client2",
|
|
SessionTimeout: 30000,
|
|
RebalanceTimeout: 30000,
|
|
State: MemberStatePending,
|
|
LastHeartbeat: time.Now(),
|
|
JoinedAt: time.Now(),
|
|
}
|
|
group.Members["member2"] = member2
|
|
group.Mu.Unlock()
|
|
|
|
// Check timeouts - leader should be evicted, new leader selected
|
|
rtm.CheckRebalanceTimeouts()
|
|
|
|
group.Mu.RLock()
|
|
if len(group.Members) != 1 {
|
|
t.Errorf("Expected 1 member to remain after leader eviction, got %d", len(group.Members))
|
|
}
|
|
|
|
if group.Leader != "member2" {
|
|
t.Errorf("Expected member2 to become new leader, got %s", group.Leader)
|
|
}
|
|
|
|
if group.State != GroupStatePreparingRebalance {
|
|
t.Errorf("Expected group to restart rebalancing after leader eviction, got %s", group.State.String())
|
|
}
|
|
group.Mu.RUnlock()
|
|
}
|
|
|
|
func TestRebalanceTimeoutManager_IsRebalanceStuck(t *testing.T) {
|
|
coordinator := NewGroupCoordinator()
|
|
defer coordinator.Close()
|
|
|
|
rtm := coordinator.rebalanceTimeoutManager
|
|
|
|
// Create a group that's been rebalancing for a while
|
|
group := coordinator.GetOrCreateGroup("test-group")
|
|
group.Mu.Lock()
|
|
group.State = GroupStatePreparingRebalance
|
|
group.LastActivity = time.Now().Add(-15 * time.Minute) // 15 minutes ago
|
|
group.Mu.Unlock()
|
|
|
|
// Check if rebalance is stuck (max 10 minutes)
|
|
maxDuration := 10 * time.Minute
|
|
if !rtm.IsRebalanceStuck(group, maxDuration) {
|
|
t.Error("Expected rebalance to be detected as stuck")
|
|
}
|
|
|
|
// Test with a group that's not stuck
|
|
group.Mu.Lock()
|
|
group.LastActivity = time.Now().Add(-5 * time.Minute) // 5 minutes ago
|
|
group.Mu.Unlock()
|
|
|
|
if rtm.IsRebalanceStuck(group, maxDuration) {
|
|
t.Error("Expected rebalance to not be detected as stuck")
|
|
}
|
|
|
|
// Test with stable group (should not be stuck)
|
|
group.Mu.Lock()
|
|
group.State = GroupStateStable
|
|
group.LastActivity = time.Now().Add(-15 * time.Minute)
|
|
group.Mu.Unlock()
|
|
|
|
if rtm.IsRebalanceStuck(group, maxDuration) {
|
|
t.Error("Stable group should not be detected as stuck")
|
|
}
|
|
}
|
|
|
|
func TestRebalanceTimeoutManager_ForceCompleteRebalance(t *testing.T) {
|
|
coordinator := NewGroupCoordinator()
|
|
defer coordinator.Close()
|
|
|
|
rtm := coordinator.rebalanceTimeoutManager
|
|
|
|
// Test forcing completion from PreparingRebalance
|
|
group := coordinator.GetOrCreateGroup("test-group")
|
|
group.Mu.Lock()
|
|
group.State = GroupStatePreparingRebalance
|
|
|
|
member := &GroupMember{
|
|
ID: "member1",
|
|
State: MemberStatePending,
|
|
}
|
|
group.Members["member1"] = member
|
|
group.Mu.Unlock()
|
|
|
|
rtm.ForceCompleteRebalance(group)
|
|
|
|
group.Mu.RLock()
|
|
if group.State != GroupStateCompletingRebalance {
|
|
t.Errorf("Expected group state to be CompletingRebalance, got %s", group.State.String())
|
|
}
|
|
group.Mu.RUnlock()
|
|
|
|
// Test forcing completion from CompletingRebalance
|
|
rtm.ForceCompleteRebalance(group)
|
|
|
|
group.Mu.RLock()
|
|
if group.State != GroupStateStable {
|
|
t.Errorf("Expected group state to be Stable, got %s", group.State.String())
|
|
}
|
|
|
|
if member.State != MemberStateStable {
|
|
t.Errorf("Expected member state to be Stable, got %s", member.State.String())
|
|
}
|
|
group.Mu.RUnlock()
|
|
}
|
|
|
|
func TestRebalanceTimeoutManager_GetRebalanceStatus(t *testing.T) {
|
|
coordinator := NewGroupCoordinator()
|
|
defer coordinator.Close()
|
|
|
|
rtm := coordinator.rebalanceTimeoutManager
|
|
|
|
// Test with non-existent group
|
|
status := rtm.GetRebalanceStatus("non-existent")
|
|
if status != nil {
|
|
t.Error("Expected nil status for non-existent group")
|
|
}
|
|
|
|
// Create a group with members
|
|
group := coordinator.GetOrCreateGroup("test-group")
|
|
group.Mu.Lock()
|
|
group.State = GroupStatePreparingRebalance
|
|
group.Generation = 5
|
|
group.Leader = "member1"
|
|
group.LastActivity = time.Now().Add(-2 * time.Minute)
|
|
|
|
member1 := &GroupMember{
|
|
ID: "member1",
|
|
State: MemberStatePending,
|
|
LastHeartbeat: time.Now().Add(-30 * time.Second),
|
|
JoinedAt: time.Now().Add(-2 * time.Minute),
|
|
SessionTimeout: 30000, // 30 seconds
|
|
RebalanceTimeout: 300000, // 5 minutes
|
|
}
|
|
group.Members["member1"] = member1
|
|
|
|
member2 := &GroupMember{
|
|
ID: "member2",
|
|
State: MemberStatePending,
|
|
LastHeartbeat: time.Now().Add(-10 * time.Second),
|
|
JoinedAt: time.Now().Add(-1 * time.Minute),
|
|
SessionTimeout: 60000, // 1 minute
|
|
RebalanceTimeout: 180000, // 3 minutes
|
|
}
|
|
group.Members["member2"] = member2
|
|
group.Mu.Unlock()
|
|
|
|
// Get status
|
|
status = rtm.GetRebalanceStatus("test-group")
|
|
|
|
if status == nil {
|
|
t.Fatal("Expected non-nil status")
|
|
}
|
|
|
|
if status.GroupID != "test-group" {
|
|
t.Errorf("Expected group ID 'test-group', got %s", status.GroupID)
|
|
}
|
|
|
|
if status.State != GroupStatePreparingRebalance {
|
|
t.Errorf("Expected state PreparingRebalance, got %s", status.State.String())
|
|
}
|
|
|
|
if status.Generation != 5 {
|
|
t.Errorf("Expected generation 5, got %d", status.Generation)
|
|
}
|
|
|
|
if status.MemberCount != 2 {
|
|
t.Errorf("Expected 2 members, got %d", status.MemberCount)
|
|
}
|
|
|
|
if status.Leader != "member1" {
|
|
t.Errorf("Expected leader 'member1', got %s", status.Leader)
|
|
}
|
|
|
|
if !status.IsRebalancing {
|
|
t.Error("Expected IsRebalancing to be true")
|
|
}
|
|
|
|
if len(status.Members) != 2 {
|
|
t.Errorf("Expected 2 member statuses, got %d", len(status.Members))
|
|
}
|
|
|
|
// Check member timeout calculations
|
|
for _, memberStatus := range status.Members {
|
|
if memberStatus.SessionTimeRemaining < 0 {
|
|
t.Errorf("Session time remaining should not be negative for member %s", memberStatus.MemberID)
|
|
}
|
|
|
|
if memberStatus.RebalanceTimeRemaining < 0 {
|
|
t.Errorf("Rebalance time remaining should not be negative for member %s", memberStatus.MemberID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestRebalanceTimeoutManager_DefaultRebalanceTimeout(t *testing.T) {
|
|
coordinator := NewGroupCoordinator()
|
|
defer coordinator.Close()
|
|
|
|
rtm := coordinator.rebalanceTimeoutManager
|
|
|
|
// Create a group with a member that has no rebalance timeout set (0)
|
|
group := coordinator.GetOrCreateGroup("test-group")
|
|
group.Mu.Lock()
|
|
group.State = GroupStatePreparingRebalance
|
|
|
|
member := &GroupMember{
|
|
ID: "member1",
|
|
ClientID: "client1",
|
|
SessionTimeout: 30000, // 30 seconds
|
|
RebalanceTimeout: 0, // Not set, should use default
|
|
State: MemberStatePending,
|
|
LastHeartbeat: time.Now(),
|
|
JoinedAt: time.Now().Add(-6 * time.Minute), // Joined 6 minutes ago
|
|
}
|
|
group.Members["member1"] = member
|
|
group.Mu.Unlock()
|
|
|
|
// Default rebalance timeout is 5 minutes (300000ms), so member should be evicted
|
|
rtm.CheckRebalanceTimeouts()
|
|
|
|
group.Mu.RLock()
|
|
if len(group.Members) != 0 {
|
|
t.Errorf("Expected member to be evicted using default rebalance timeout, but %d members remain", len(group.Members))
|
|
}
|
|
group.Mu.RUnlock()
|
|
}
|