mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
- Add IncrementalCooperativeAssignmentStrategy with two-phase rebalancing:
  * Revocation phase: Members give up partitions that need reassignment
  * Assignment phase: Members receive new partitions after revocation
- Implement IncrementalRebalanceState to track rebalance progress:
  * Phase tracking (None, Revocation, Assignment)
  * Revocation timeout handling with configurable timeouts
  * Partition tracking for revoked and pending assignments
- Add sophisticated assignment logic:
  * Respect member topic subscriptions when distributing partitions
  * Calculate ideal assignments and determine necessary revocations
  * Support multiple topics with different subscription patterns
  * Minimize partition movement while ensuring fairness
- Add comprehensive test coverage:
  * Basic assignment scenarios without rebalancing
  * Rebalance scenarios with revocation and assignment phases
  * Multiple topic scenarios with mixed subscriptions
  * Timeout handling and forced completion
  * State transition verification
- Update GetAssignmentStrategy to support 'incremental-cooperative' protocol
- Implement monitoring methods:
  * IsRebalanceInProgress() for status checking
  * GetRebalanceState() for detailed state inspection
  * ForceCompleteRebalance() for timeout scenarios

This enables advanced rebalancing that reduces 'stop-the-world' effects by allowing consumers to incrementally give up and receive partitions during rebalancing.
399 lines
11 KiB
Go
399 lines
11 KiB
Go
package consumer
|
|
|
|
import (
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestIncrementalCooperativeAssignmentStrategy_BasicAssignment(t *testing.T) {
|
|
strategy := NewIncrementalCooperativeAssignmentStrategy()
|
|
|
|
// Create members
|
|
members := []*GroupMember{
|
|
{
|
|
ID: "member-1",
|
|
Subscription: []string{"topic-1"},
|
|
Assignment: []PartitionAssignment{}, // No existing assignment
|
|
},
|
|
{
|
|
ID: "member-2",
|
|
Subscription: []string{"topic-1"},
|
|
Assignment: []PartitionAssignment{}, // No existing assignment
|
|
},
|
|
}
|
|
|
|
// Topic partitions
|
|
topicPartitions := map[string][]int32{
|
|
"topic-1": {0, 1, 2, 3},
|
|
}
|
|
|
|
// First assignment (no existing assignments, should be direct)
|
|
assignments := strategy.Assign(members, topicPartitions)
|
|
|
|
// Verify assignments
|
|
if len(assignments) != 2 {
|
|
t.Errorf("Expected 2 member assignments, got %d", len(assignments))
|
|
}
|
|
|
|
totalPartitions := 0
|
|
for memberID, partitions := range assignments {
|
|
t.Logf("Member %s assigned %d partitions: %v", memberID, len(partitions), partitions)
|
|
totalPartitions += len(partitions)
|
|
}
|
|
|
|
if totalPartitions != 4 {
|
|
t.Errorf("Expected 4 total partitions assigned, got %d", totalPartitions)
|
|
}
|
|
|
|
// Should not be in rebalance state for initial assignment
|
|
if strategy.IsRebalanceInProgress() {
|
|
t.Error("Expected no rebalance in progress for initial assignment")
|
|
}
|
|
}
|
|
|
|
func TestIncrementalCooperativeAssignmentStrategy_RebalanceWithRevocation(t *testing.T) {
|
|
strategy := NewIncrementalCooperativeAssignmentStrategy()
|
|
|
|
// Create members with existing assignments
|
|
members := []*GroupMember{
|
|
{
|
|
ID: "member-1",
|
|
Subscription: []string{"topic-1"},
|
|
Assignment: []PartitionAssignment{
|
|
{Topic: "topic-1", Partition: 0},
|
|
{Topic: "topic-1", Partition: 1},
|
|
{Topic: "topic-1", Partition: 2},
|
|
{Topic: "topic-1", Partition: 3}, // This member has all partitions
|
|
},
|
|
},
|
|
{
|
|
ID: "member-2",
|
|
Subscription: []string{"topic-1"},
|
|
Assignment: []PartitionAssignment{}, // New member with no assignments
|
|
},
|
|
}
|
|
|
|
topicPartitions := map[string][]int32{
|
|
"topic-1": {0, 1, 2, 3},
|
|
}
|
|
|
|
// First call should start revocation phase
|
|
assignments1 := strategy.Assign(members, topicPartitions)
|
|
|
|
// Should be in revocation phase
|
|
if !strategy.IsRebalanceInProgress() {
|
|
t.Error("Expected rebalance to be in progress")
|
|
}
|
|
|
|
state := strategy.GetRebalanceState()
|
|
if state.Phase != RebalancePhaseRevocation {
|
|
t.Errorf("Expected revocation phase, got %s", state.Phase)
|
|
}
|
|
|
|
// Member-1 should have some partitions revoked
|
|
member1Assignments := assignments1["member-1"]
|
|
if len(member1Assignments) >= 4 {
|
|
t.Errorf("Expected member-1 to have fewer than 4 partitions after revocation, got %d", len(member1Assignments))
|
|
}
|
|
|
|
// Member-2 should still have no assignments during revocation
|
|
member2Assignments := assignments1["member-2"]
|
|
if len(member2Assignments) != 0 {
|
|
t.Errorf("Expected member-2 to have 0 partitions during revocation, got %d", len(member2Assignments))
|
|
}
|
|
|
|
t.Logf("Revocation phase - Member-1: %d partitions, Member-2: %d partitions",
|
|
len(member1Assignments), len(member2Assignments))
|
|
|
|
// Simulate time passing and second call (should move to assignment phase)
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
// Force move to assignment phase by setting timeout to 0
|
|
state.RevocationTimeout = 0
|
|
|
|
assignments2 := strategy.Assign(members, topicPartitions)
|
|
|
|
// Should complete rebalance
|
|
if strategy.IsRebalanceInProgress() {
|
|
t.Error("Expected rebalance to be completed")
|
|
}
|
|
|
|
// Both members should have partitions now
|
|
member1FinalAssignments := assignments2["member-1"]
|
|
member2FinalAssignments := assignments2["member-2"]
|
|
|
|
if len(member1FinalAssignments) == 0 {
|
|
t.Error("Expected member-1 to have some partitions after rebalance")
|
|
}
|
|
|
|
if len(member2FinalAssignments) == 0 {
|
|
t.Error("Expected member-2 to have some partitions after rebalance")
|
|
}
|
|
|
|
totalFinalPartitions := len(member1FinalAssignments) + len(member2FinalAssignments)
|
|
if totalFinalPartitions != 4 {
|
|
t.Errorf("Expected 4 total partitions after rebalance, got %d", totalFinalPartitions)
|
|
}
|
|
|
|
t.Logf("Final assignment - Member-1: %d partitions, Member-2: %d partitions",
|
|
len(member1FinalAssignments), len(member2FinalAssignments))
|
|
}
|
|
|
|
func TestIncrementalCooperativeAssignmentStrategy_NoRevocationNeeded(t *testing.T) {
|
|
strategy := NewIncrementalCooperativeAssignmentStrategy()
|
|
|
|
// Create members with already balanced assignments
|
|
members := []*GroupMember{
|
|
{
|
|
ID: "member-1",
|
|
Subscription: []string{"topic-1"},
|
|
Assignment: []PartitionAssignment{
|
|
{Topic: "topic-1", Partition: 0},
|
|
{Topic: "topic-1", Partition: 1},
|
|
},
|
|
},
|
|
{
|
|
ID: "member-2",
|
|
Subscription: []string{"topic-1"},
|
|
Assignment: []PartitionAssignment{
|
|
{Topic: "topic-1", Partition: 2},
|
|
{Topic: "topic-1", Partition: 3},
|
|
},
|
|
},
|
|
}
|
|
|
|
topicPartitions := map[string][]int32{
|
|
"topic-1": {0, 1, 2, 3},
|
|
}
|
|
|
|
// Assignment should not trigger rebalance
|
|
assignments := strategy.Assign(members, topicPartitions)
|
|
|
|
// Should not be in rebalance state
|
|
if strategy.IsRebalanceInProgress() {
|
|
t.Error("Expected no rebalance in progress when assignments are already balanced")
|
|
}
|
|
|
|
// Assignments should remain the same
|
|
member1Assignments := assignments["member-1"]
|
|
member2Assignments := assignments["member-2"]
|
|
|
|
if len(member1Assignments) != 2 {
|
|
t.Errorf("Expected member-1 to keep 2 partitions, got %d", len(member1Assignments))
|
|
}
|
|
|
|
if len(member2Assignments) != 2 {
|
|
t.Errorf("Expected member-2 to keep 2 partitions, got %d", len(member2Assignments))
|
|
}
|
|
}
|
|
|
|
func TestIncrementalCooperativeAssignmentStrategy_MultipleTopics(t *testing.T) {
|
|
strategy := NewIncrementalCooperativeAssignmentStrategy()
|
|
|
|
// Create members with mixed topic subscriptions
|
|
members := []*GroupMember{
|
|
{
|
|
ID: "member-1",
|
|
Subscription: []string{"topic-1", "topic-2"},
|
|
Assignment: []PartitionAssignment{
|
|
{Topic: "topic-1", Partition: 0},
|
|
{Topic: "topic-1", Partition: 1},
|
|
{Topic: "topic-2", Partition: 0},
|
|
},
|
|
},
|
|
{
|
|
ID: "member-2",
|
|
Subscription: []string{"topic-1"},
|
|
Assignment: []PartitionAssignment{
|
|
{Topic: "topic-1", Partition: 2},
|
|
},
|
|
},
|
|
{
|
|
ID: "member-3",
|
|
Subscription: []string{"topic-2"},
|
|
Assignment: []PartitionAssignment{}, // New member
|
|
},
|
|
}
|
|
|
|
topicPartitions := map[string][]int32{
|
|
"topic-1": {0, 1, 2},
|
|
"topic-2": {0, 1},
|
|
}
|
|
|
|
// Should trigger rebalance to distribute topic-2 partitions
|
|
assignments := strategy.Assign(members, topicPartitions)
|
|
|
|
// Verify all partitions are assigned
|
|
allAssignedPartitions := make(map[string]bool)
|
|
for _, memberAssignments := range assignments {
|
|
for _, assignment := range memberAssignments {
|
|
key := fmt.Sprintf("%s:%d", assignment.Topic, assignment.Partition)
|
|
allAssignedPartitions[key] = true
|
|
}
|
|
}
|
|
|
|
expectedPartitions := []string{"topic-1:0", "topic-1:1", "topic-1:2", "topic-2:0", "topic-2:1"}
|
|
for _, expected := range expectedPartitions {
|
|
if !allAssignedPartitions[expected] {
|
|
t.Errorf("Expected partition %s to be assigned", expected)
|
|
}
|
|
}
|
|
|
|
// Debug: Print all assigned partitions
|
|
t.Logf("All assigned partitions: %v", allAssignedPartitions)
|
|
}
|
|
|
|
func TestIncrementalCooperativeAssignmentStrategy_ForceComplete(t *testing.T) {
|
|
strategy := NewIncrementalCooperativeAssignmentStrategy()
|
|
|
|
// Start a rebalance - create scenario where member-1 has all partitions but member-2 joins
|
|
members := []*GroupMember{
|
|
{
|
|
ID: "member-1",
|
|
Subscription: []string{"topic-1"},
|
|
Assignment: []PartitionAssignment{
|
|
{Topic: "topic-1", Partition: 0},
|
|
{Topic: "topic-1", Partition: 1},
|
|
{Topic: "topic-1", Partition: 2},
|
|
{Topic: "topic-1", Partition: 3},
|
|
},
|
|
},
|
|
{
|
|
ID: "member-2",
|
|
Subscription: []string{"topic-1"},
|
|
Assignment: []PartitionAssignment{}, // New member
|
|
},
|
|
}
|
|
|
|
topicPartitions := map[string][]int32{
|
|
"topic-1": {0, 1, 2, 3},
|
|
}
|
|
|
|
// This should start a rebalance (member-2 needs partitions)
|
|
strategy.Assign(members, topicPartitions)
|
|
|
|
if !strategy.IsRebalanceInProgress() {
|
|
t.Error("Expected rebalance to be in progress")
|
|
}
|
|
|
|
// Force complete the rebalance
|
|
strategy.ForceCompleteRebalance()
|
|
|
|
if strategy.IsRebalanceInProgress() {
|
|
t.Error("Expected rebalance to be completed after force complete")
|
|
}
|
|
|
|
state := strategy.GetRebalanceState()
|
|
if state.Phase != RebalancePhaseNone {
|
|
t.Errorf("Expected phase to be None after force complete, got %s", state.Phase)
|
|
}
|
|
}
|
|
|
|
func TestIncrementalCooperativeAssignmentStrategy_RevocationTimeout(t *testing.T) {
|
|
strategy := NewIncrementalCooperativeAssignmentStrategy()
|
|
|
|
// Set a very short revocation timeout for testing
|
|
strategy.rebalanceState.RevocationTimeout = 1 * time.Millisecond
|
|
|
|
members := []*GroupMember{
|
|
{
|
|
ID: "member-1",
|
|
Subscription: []string{"topic-1"},
|
|
Assignment: []PartitionAssignment{
|
|
{Topic: "topic-1", Partition: 0},
|
|
{Topic: "topic-1", Partition: 1},
|
|
{Topic: "topic-1", Partition: 2},
|
|
{Topic: "topic-1", Partition: 3},
|
|
},
|
|
},
|
|
{
|
|
ID: "member-2",
|
|
Subscription: []string{"topic-1"},
|
|
Assignment: []PartitionAssignment{},
|
|
},
|
|
}
|
|
|
|
topicPartitions := map[string][]int32{
|
|
"topic-1": {0, 1, 2, 3},
|
|
}
|
|
|
|
// First call starts revocation
|
|
strategy.Assign(members, topicPartitions)
|
|
|
|
if !strategy.IsRebalanceInProgress() {
|
|
t.Error("Expected rebalance to be in progress")
|
|
}
|
|
|
|
// Wait for timeout
|
|
time.Sleep(5 * time.Millisecond)
|
|
|
|
// Second call should complete due to timeout
|
|
assignments := strategy.Assign(members, topicPartitions)
|
|
|
|
if strategy.IsRebalanceInProgress() {
|
|
t.Error("Expected rebalance to be completed after timeout")
|
|
}
|
|
|
|
// Both members should have partitions
|
|
member1Assignments := assignments["member-1"]
|
|
member2Assignments := assignments["member-2"]
|
|
|
|
if len(member1Assignments) == 0 {
|
|
t.Error("Expected member-1 to have partitions after timeout")
|
|
}
|
|
|
|
if len(member2Assignments) == 0 {
|
|
t.Error("Expected member-2 to have partitions after timeout")
|
|
}
|
|
}
|
|
|
|
func TestIncrementalCooperativeAssignmentStrategy_StateTransitions(t *testing.T) {
|
|
strategy := NewIncrementalCooperativeAssignmentStrategy()
|
|
|
|
// Initial state should be None
|
|
state := strategy.GetRebalanceState()
|
|
if state.Phase != RebalancePhaseNone {
|
|
t.Errorf("Expected initial phase to be None, got %s", state.Phase)
|
|
}
|
|
|
|
// Create scenario that requires rebalancing
|
|
members := []*GroupMember{
|
|
{
|
|
ID: "member-1",
|
|
Subscription: []string{"topic-1"},
|
|
Assignment: []PartitionAssignment{
|
|
{Topic: "topic-1", Partition: 0},
|
|
{Topic: "topic-1", Partition: 1},
|
|
{Topic: "topic-1", Partition: 2},
|
|
{Topic: "topic-1", Partition: 3},
|
|
},
|
|
},
|
|
{
|
|
ID: "member-2",
|
|
Subscription: []string{"topic-1"},
|
|
Assignment: []PartitionAssignment{}, // New member
|
|
},
|
|
}
|
|
|
|
topicPartitions := map[string][]int32{
|
|
"topic-1": {0, 1, 2, 3}, // Same partitions, but need rebalancing due to new member
|
|
}
|
|
|
|
// First call should move to revocation phase
|
|
strategy.Assign(members, topicPartitions)
|
|
state = strategy.GetRebalanceState()
|
|
if state.Phase != RebalancePhaseRevocation {
|
|
t.Errorf("Expected phase to be Revocation, got %s", state.Phase)
|
|
}
|
|
|
|
// Force timeout to move to assignment phase
|
|
state.RevocationTimeout = 0
|
|
strategy.Assign(members, topicPartitions)
|
|
|
|
// Should complete and return to None
|
|
state = strategy.GetRebalanceState()
|
|
if state.Phase != RebalancePhaseNone {
|
|
t.Errorf("Expected phase to be None after completion, got %s", state.Phase)
|
|
}
|
|
}
|