mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
351 lines
10 KiB
Go
351 lines
10 KiB
Go
package integration
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/IBM/sarama"
|
|
"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
|
|
)
|
|
|
|
// TestConsumerGroups tests consumer group functionality
|
|
// This test requires SeaweedFS masters to be running and will skip if not available
|
|
func TestConsumerGroups(t *testing.T) {
|
|
gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQRequired)
|
|
defer gateway.CleanupAndClose()
|
|
|
|
addr := gateway.StartAndWait()
|
|
|
|
t.Logf("Running consumer group tests with SMQ backend for offset persistence")
|
|
|
|
t.Run("BasicFunctionality", func(t *testing.T) {
|
|
testConsumerGroupBasicFunctionality(t, addr)
|
|
})
|
|
|
|
t.Run("OffsetCommitAndFetch", func(t *testing.T) {
|
|
testConsumerGroupOffsetCommitAndFetch(t, addr)
|
|
})
|
|
|
|
t.Run("Rebalancing", func(t *testing.T) {
|
|
testConsumerGroupRebalancing(t, addr)
|
|
})
|
|
}
|
|
|
|
func testConsumerGroupBasicFunctionality(t *testing.T, addr string) {
|
|
topicName := testutil.GenerateUniqueTopicName("consumer-group-basic")
|
|
groupID := testutil.GenerateUniqueGroupID("basic-group")
|
|
|
|
client := testutil.NewSaramaClient(t, addr)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
// Create topic and produce messages
|
|
err := client.CreateTopic(topicName, 1, 1)
|
|
testutil.AssertNoError(t, err, "Failed to create topic")
|
|
|
|
messages := msgGen.GenerateStringMessages(9) // 3 messages per consumer
|
|
err = client.ProduceMessages(topicName, messages)
|
|
testutil.AssertNoError(t, err, "Failed to produce messages")
|
|
|
|
// Test with multiple consumers in the same group
|
|
numConsumers := 3
|
|
handler := &ConsumerGroupHandler{
|
|
messages: make(chan *sarama.ConsumerMessage, len(messages)),
|
|
ready: make(chan bool),
|
|
t: t,
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
consumerErrors := make(chan error, numConsumers)
|
|
|
|
for i := 0; i < numConsumers; i++ {
|
|
wg.Add(1)
|
|
go func(consumerID int) {
|
|
defer wg.Done()
|
|
|
|
consumerGroup, err := sarama.NewConsumerGroup([]string{addr}, groupID, client.GetConfig())
|
|
if err != nil {
|
|
consumerErrors <- fmt.Errorf("consumer %d: failed to create consumer group: %v", consumerID, err)
|
|
return
|
|
}
|
|
defer consumerGroup.Close()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
err = consumerGroup.Consume(ctx, []string{topicName}, handler)
|
|
if err != nil && err != context.DeadlineExceeded {
|
|
consumerErrors <- fmt.Errorf("consumer %d: consumption error: %v", consumerID, err)
|
|
return
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
// Wait for consumers to be ready
|
|
readyCount := 0
|
|
for readyCount < numConsumers {
|
|
select {
|
|
case <-handler.ready:
|
|
readyCount++
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("Timeout waiting for consumers to be ready")
|
|
}
|
|
}
|
|
|
|
// Collect consumed messages
|
|
consumedMessages := make([]*sarama.ConsumerMessage, 0, len(messages))
|
|
messageTimeout := time.After(10 * time.Second)
|
|
|
|
for len(consumedMessages) < len(messages) {
|
|
select {
|
|
case msg := <-handler.messages:
|
|
consumedMessages = append(consumedMessages, msg)
|
|
case err := <-consumerErrors:
|
|
t.Fatalf("Consumer error: %v", err)
|
|
case <-messageTimeout:
|
|
t.Fatalf("Timeout waiting for messages. Got %d/%d messages", len(consumedMessages), len(messages))
|
|
}
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// Verify all messages were consumed exactly once
|
|
testutil.AssertEqual(t, len(messages), len(consumedMessages), "Message count mismatch")
|
|
|
|
// Verify message uniqueness (no duplicates)
|
|
messageKeys := make(map[string]bool)
|
|
for _, msg := range consumedMessages {
|
|
key := string(msg.Key)
|
|
if messageKeys[key] {
|
|
t.Errorf("Duplicate message key: %s", key)
|
|
}
|
|
messageKeys[key] = true
|
|
}
|
|
}
|
|
|
|
func testConsumerGroupOffsetCommitAndFetch(t *testing.T, addr string) {
|
|
topicName := testutil.GenerateUniqueTopicName("offset-commit-test")
|
|
groupID := testutil.GenerateUniqueGroupID("offset-group")
|
|
|
|
client := testutil.NewSaramaClient(t, addr)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
// Create topic and produce messages
|
|
err := client.CreateTopic(topicName, 1, 1)
|
|
testutil.AssertNoError(t, err, "Failed to create topic")
|
|
|
|
messages := msgGen.GenerateStringMessages(5)
|
|
err = client.ProduceMessages(topicName, messages)
|
|
testutil.AssertNoError(t, err, "Failed to produce messages")
|
|
|
|
// First consumer: consume first 3 messages and commit offsets
|
|
handler1 := &OffsetTestHandler{
|
|
messages: make(chan *sarama.ConsumerMessage, len(messages)),
|
|
ready: make(chan bool),
|
|
stopAfter: 3,
|
|
t: t,
|
|
}
|
|
|
|
consumerGroup1, err := sarama.NewConsumerGroup([]string{addr}, groupID, client.GetConfig())
|
|
testutil.AssertNoError(t, err, "Failed to create first consumer group")
|
|
|
|
ctx1, cancel1 := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel1()
|
|
|
|
go func() {
|
|
err := consumerGroup1.Consume(ctx1, []string{topicName}, handler1)
|
|
if err != nil && err != context.DeadlineExceeded {
|
|
t.Logf("First consumer error: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Wait for first consumer to be ready and consume messages
|
|
<-handler1.ready
|
|
consumedCount := 0
|
|
for consumedCount < 3 {
|
|
select {
|
|
case <-handler1.messages:
|
|
consumedCount++
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("Timeout waiting for first consumer messages")
|
|
}
|
|
}
|
|
|
|
consumerGroup1.Close()
|
|
cancel1()
|
|
time.Sleep(500 * time.Millisecond) // Wait for cleanup
|
|
|
|
// Stop the first consumer after N messages
|
|
// Allow a brief moment for commit/heartbeat to flush
|
|
time.Sleep(1 * time.Second)
|
|
|
|
// Start a second consumer in the same group to verify resumption from committed offset
|
|
handler2 := &OffsetTestHandler{
|
|
messages: make(chan *sarama.ConsumerMessage, len(messages)),
|
|
ready: make(chan bool),
|
|
stopAfter: 2,
|
|
t: t,
|
|
}
|
|
consumerGroup2, err := sarama.NewConsumerGroup([]string{addr}, groupID, client.GetConfig())
|
|
testutil.AssertNoError(t, err, "Failed to create second consumer group")
|
|
defer consumerGroup2.Close()
|
|
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel2()
|
|
|
|
go func() {
|
|
err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
|
|
if err != nil && err != context.DeadlineExceeded {
|
|
t.Logf("Second consumer error: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Wait for second consumer and collect remaining messages
|
|
<-handler2.ready
|
|
secondConsumerMessages := make([]*sarama.ConsumerMessage, 0)
|
|
consumedCount = 0
|
|
for consumedCount < 2 {
|
|
select {
|
|
case msg := <-handler2.messages:
|
|
consumedCount++
|
|
secondConsumerMessages = append(secondConsumerMessages, msg)
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("Timeout waiting for second consumer messages. Got %d/2", consumedCount)
|
|
}
|
|
}
|
|
|
|
// Verify second consumer started from correct offset
|
|
if len(secondConsumerMessages) > 0 {
|
|
firstMessageOffset := secondConsumerMessages[0].Offset
|
|
if firstMessageOffset < 3 {
|
|
t.Fatalf("Second consumer should start from offset >= 3: got %d", firstMessageOffset)
|
|
}
|
|
}
|
|
}
|
|
|
|
func testConsumerGroupRebalancing(t *testing.T, addr string) {
|
|
topicName := testutil.GenerateUniqueTopicName("rebalancing-test")
|
|
groupID := testutil.GenerateUniqueGroupID("rebalance-group")
|
|
|
|
client := testutil.NewSaramaClient(t, addr)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
// Create topic with multiple partitions for rebalancing
|
|
err := client.CreateTopic(topicName, 4, 1) // 4 partitions
|
|
testutil.AssertNoError(t, err, "Failed to create topic")
|
|
|
|
// Produce messages to all partitions
|
|
messages := msgGen.GenerateStringMessages(12) // 3 messages per partition
|
|
for i, msg := range messages {
|
|
partition := int32(i % 4)
|
|
err = client.ProduceMessageToPartition(topicName, partition, msg)
|
|
testutil.AssertNoError(t, err, "Failed to produce message")
|
|
}
|
|
|
|
t.Logf("Produced %d messages across 4 partitions", len(messages))
|
|
|
|
// Test scenario 1: Single consumer gets all partitions
|
|
t.Run("SingleConsumerAllPartitions", func(t *testing.T) {
|
|
testSingleConsumerAllPartitions(t, addr, topicName, groupID+"-single")
|
|
})
|
|
|
|
// Test scenario 2: Add second consumer, verify rebalancing
|
|
t.Run("TwoConsumersRebalance", func(t *testing.T) {
|
|
testTwoConsumersRebalance(t, addr, topicName, groupID+"-two")
|
|
})
|
|
|
|
// Test scenario 3: Remove consumer, verify rebalancing
|
|
t.Run("ConsumerLeaveRebalance", func(t *testing.T) {
|
|
testConsumerLeaveRebalance(t, addr, topicName, groupID+"-leave")
|
|
})
|
|
|
|
// Test scenario 4: Multiple consumers join simultaneously
|
|
t.Run("MultipleConsumersJoin", func(t *testing.T) {
|
|
testMultipleConsumersJoin(t, addr, topicName, groupID+"-multi")
|
|
})
|
|
}
|
|
|
|
// ConsumerGroupHandler implements sarama.ConsumerGroupHandler
|
|
type ConsumerGroupHandler struct {
|
|
messages chan *sarama.ConsumerMessage
|
|
ready chan bool
|
|
readyOnce sync.Once
|
|
t *testing.T
|
|
}
|
|
|
|
func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
|
|
h.t.Logf("Consumer group session setup")
|
|
h.readyOnce.Do(func() {
|
|
close(h.ready)
|
|
})
|
|
return nil
|
|
}
|
|
|
|
func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
|
|
h.t.Logf("Consumer group session cleanup")
|
|
return nil
|
|
}
|
|
|
|
func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
|
for {
|
|
select {
|
|
case message := <-claim.Messages():
|
|
if message == nil {
|
|
return nil
|
|
}
|
|
h.messages <- message
|
|
session.MarkMessage(message, "")
|
|
case <-session.Context().Done():
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// OffsetTestHandler implements sarama.ConsumerGroupHandler for offset testing
|
|
type OffsetTestHandler struct {
|
|
messages chan *sarama.ConsumerMessage
|
|
ready chan bool
|
|
readyOnce sync.Once
|
|
stopAfter int
|
|
consumed int
|
|
t *testing.T
|
|
}
|
|
|
|
func (h *OffsetTestHandler) Setup(sarama.ConsumerGroupSession) error {
|
|
h.t.Logf("Offset test consumer setup")
|
|
h.readyOnce.Do(func() {
|
|
close(h.ready)
|
|
})
|
|
return nil
|
|
}
|
|
|
|
func (h *OffsetTestHandler) Cleanup(sarama.ConsumerGroupSession) error {
|
|
h.t.Logf("Offset test consumer cleanup")
|
|
return nil
|
|
}
|
|
|
|
func (h *OffsetTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
|
for {
|
|
select {
|
|
case message := <-claim.Messages():
|
|
if message == nil {
|
|
return nil
|
|
}
|
|
h.consumed++
|
|
h.messages <- message
|
|
session.MarkMessage(message, "")
|
|
|
|
// Stop after consuming the specified number of messages
|
|
if h.consumed >= h.stopAfter {
|
|
h.t.Logf("Stopping consumer after %d messages", h.consumed)
|
|
// Ensure commits are flushed before exiting the claim
|
|
session.Commit()
|
|
return nil
|
|
}
|
|
case <-session.Context().Done():
|
|
return nil
|
|
}
|
|
}
|
|
}
|