1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/protocol/offset_management_test.go
2025-09-13 13:03:51 -07:00

589 lines
16 KiB
Go

package protocol
import (
"encoding/binary"
"net"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)
func TestHandler_handleOffsetCommit(t *testing.T) {
h := NewTestHandler()
defer h.Close()
// Create a consumer group with a stable member
group := h.groupCoordinator.GetOrCreateGroup("test-group")
group.Mu.Lock()
group.State = consumer.GroupStateStable
group.Generation = 1
group.Members["member1"] = &consumer.GroupMember{
ID: "member1",
State: consumer.MemberStateStable,
Assignment: []consumer.PartitionAssignment{
{Topic: "test-topic", Partition: 0},
},
}
group.Mu.Unlock()
// Create a basic offset commit request
requestBody := createOffsetCommitRequestBody("test-group", 1, "member1")
correlationID := uint32(123)
response, err := h.handleOffsetCommit(correlationID, requestBody)
if err != nil {
t.Fatalf("handleOffsetCommit failed: %v", err)
}
if len(response) < 8 {
t.Fatalf("response too short: %d bytes", len(response))
}
// Check correlation ID in response
respCorrelationID := binary.BigEndian.Uint32(response[0:4])
if respCorrelationID != correlationID {
t.Errorf("expected correlation ID %d, got %d", correlationID, respCorrelationID)
}
// Verify offset was committed
group.Mu.RLock()
if group.OffsetCommits == nil || group.OffsetCommits["test-topic"] == nil {
t.Error("offset commit was not stored")
} else {
commit, exists := group.OffsetCommits["test-topic"][0]
if !exists {
t.Error("offset commit for partition 0 was not stored")
} else if commit.Offset != 0 {
t.Errorf("expected offset 0, got %d", commit.Offset)
}
}
group.Mu.RUnlock()
}
func TestHandler_handleOffsetCommit_InvalidGroup(t *testing.T) {
h := NewTestHandler()
defer h.Close()
// Request for non-existent group
requestBody := createOffsetCommitRequestBody("nonexistent-group", 1, "member1")
correlationID := uint32(124)
response, err := h.handleOffsetCommit(correlationID, requestBody)
if err != nil {
t.Fatalf("handleOffsetCommit failed: %v", err)
}
// Should get error response
if len(response) < 8 {
t.Fatalf("error response too short: %d bytes", len(response))
}
// Response should have correlation ID
respCorrelationID := binary.BigEndian.Uint32(response[0:4])
if respCorrelationID != correlationID {
t.Errorf("expected correlation ID %d, got %d", correlationID, respCorrelationID)
}
}
func TestHandler_handleOffsetCommit_WrongGeneration(t *testing.T) {
h := NewTestHandler()
defer h.Close()
// Create a consumer group with generation 2
group := h.groupCoordinator.GetOrCreateGroup("test-group")
group.Mu.Lock()
group.State = consumer.GroupStateStable
group.Generation = 2
group.Members["member1"] = &consumer.GroupMember{
ID: "member1",
State: consumer.MemberStateStable,
Assignment: []consumer.PartitionAssignment{
{Topic: "test-topic", Partition: 0},
},
}
group.Mu.Unlock()
// Request with wrong generation (1 instead of 2)
requestBody := createOffsetCommitRequestBody("test-group", 1, "member1")
correlationID := uint32(125)
response, err := h.handleOffsetCommit(correlationID, requestBody)
if err != nil {
t.Fatalf("handleOffsetCommit failed: %v", err)
}
if len(response) < 8 {
t.Fatalf("response too short: %d bytes", len(response))
}
// Verify no offset was committed due to generation mismatch
group.Mu.RLock()
if group.OffsetCommits != nil && group.OffsetCommits["test-topic"] != nil {
if _, exists := group.OffsetCommits["test-topic"][0]; exists {
t.Error("offset should not have been committed with wrong generation")
}
}
group.Mu.RUnlock()
}
func TestHandler_handleOffsetFetch(t *testing.T) {
h := NewTestHandler()
defer h.Close()
// Create a consumer group with committed offsets
group := h.groupCoordinator.GetOrCreateGroup("test-group")
group.Mu.Lock()
group.State = consumer.GroupStateStable
group.Generation = 1
// Pre-populate with committed offset
group.OffsetCommits = map[string]map[int32]consumer.OffsetCommit{
"test-topic": {
0: {
Offset: 42,
Metadata: "test-metadata",
Timestamp: time.Now(),
},
},
}
group.Mu.Unlock()
// Create a basic offset fetch request
requestBody := createOffsetFetchRequestBody("test-group")
correlationID := uint32(126)
response, err := h.handleOffsetFetch(correlationID, 2, requestBody)
if err != nil {
t.Fatalf("handleOffsetFetch failed: %v", err)
}
if len(response) < 8 {
t.Fatalf("response too short: %d bytes", len(response))
}
// Check correlation ID in response
respCorrelationID := binary.BigEndian.Uint32(response[0:4])
if respCorrelationID != correlationID {
t.Errorf("expected correlation ID %d, got %d", correlationID, respCorrelationID)
}
}
func TestHandler_handleOffsetFetch_NoCommittedOffset(t *testing.T) {
h := NewTestHandler()
defer h.Close()
// Create a consumer group without committed offsets
group := h.groupCoordinator.GetOrCreateGroup("test-group")
group.Mu.Lock()
group.State = consumer.GroupStateStable
group.Generation = 1
// No offset commits
group.Mu.Unlock()
requestBody := createOffsetFetchRequestBody("test-group")
correlationID := uint32(127)
response, err := h.handleOffsetFetch(correlationID, 2, requestBody)
if err != nil {
t.Fatalf("handleOffsetFetch failed: %v", err)
}
if len(response) < 8 {
t.Fatalf("response too short: %d bytes", len(response))
}
// Should get valid response even with no committed offsets
respCorrelationID := binary.BigEndian.Uint32(response[0:4])
if respCorrelationID != correlationID {
t.Errorf("expected correlation ID %d, got %d", correlationID, respCorrelationID)
}
}
func TestHandler_commitOffset(t *testing.T) {
h := NewTestHandler()
defer h.Close()
group := &consumer.ConsumerGroup{
ID: "test-group",
OffsetCommits: nil,
}
// Test committing an offset
err := h.commitOffset(group, "test-topic", 0, 100, "test-metadata")
if err != nil {
t.Fatalf("commitOffset failed: %v", err)
}
// Verify offset was stored
if group.OffsetCommits == nil {
t.Fatal("OffsetCommits map was not initialized")
}
topicOffsets, exists := group.OffsetCommits["test-topic"]
if !exists {
t.Fatal("topic offsets not found")
}
commit, exists := topicOffsets[0]
if !exists {
t.Fatal("partition offset not found")
}
if commit.Offset != 100 {
t.Errorf("expected offset 100, got %d", commit.Offset)
}
if commit.Metadata != "test-metadata" {
t.Errorf("expected metadata 'test-metadata', got '%s'", commit.Metadata)
}
// Test updating existing offset
err = h.commitOffset(group, "test-topic", 0, 200, "updated-metadata")
if err != nil {
t.Fatalf("commitOffset update failed: %v", err)
}
updatedCommit := group.OffsetCommits["test-topic"][0]
if updatedCommit.Offset != 200 {
t.Errorf("expected updated offset 200, got %d", updatedCommit.Offset)
}
if updatedCommit.Metadata != "updated-metadata" {
t.Errorf("expected updated metadata 'updated-metadata', got '%s'", updatedCommit.Metadata)
}
}
func TestHandler_fetchOffset(t *testing.T) {
h := NewTestHandler()
defer h.Close()
// Test fetching from empty group
emptyGroup := &consumer.ConsumerGroup{
ID: "empty-group",
OffsetCommits: nil,
}
offset, metadata, err := h.fetchOffset(emptyGroup, "test-topic", 0)
if err != nil {
t.Errorf("fetchOffset should not error on empty group: %v", err)
}
if offset != -1 {
t.Errorf("expected offset -1 for empty group, got %d", offset)
}
if metadata != "" {
t.Errorf("expected empty metadata for empty group, got '%s'", metadata)
}
// Test fetching from group with committed offsets
group := &consumer.ConsumerGroup{
ID: "test-group",
OffsetCommits: map[string]map[int32]consumer.OffsetCommit{
"test-topic": {
0: {
Offset: 42,
Metadata: "test-metadata",
Timestamp: time.Now(),
},
},
},
}
offset, metadata, err = h.fetchOffset(group, "test-topic", 0)
if err != nil {
t.Errorf("fetchOffset failed: %v", err)
}
if offset != 42 {
t.Errorf("expected offset 42, got %d", offset)
}
if metadata != "test-metadata" {
t.Errorf("expected metadata 'test-metadata', got '%s'", metadata)
}
// Test fetching non-existent partition
offset, metadata, err = h.fetchOffset(group, "test-topic", 1)
if err != nil {
t.Errorf("fetchOffset should not error on non-existent partition: %v", err)
}
if offset != -1 {
t.Errorf("expected offset -1 for non-existent partition, got %d", offset)
}
// Test fetching non-existent topic
offset, metadata, err = h.fetchOffset(group, "nonexistent-topic", 0)
if err != nil {
t.Errorf("fetchOffset should not error on non-existent topic: %v", err)
}
if offset != -1 {
t.Errorf("expected offset -1 for non-existent topic, got %d", offset)
}
}
func TestHandler_OffsetCommitFetch_EndToEnd(t *testing.T) {
// Create two handlers connected via pipe to simulate client-server
server := NewTestHandler()
defer server.Close()
client := NewTestHandler()
defer client.Close()
serverConn, clientConn := net.Pipe()
defer serverConn.Close()
defer clientConn.Close()
// Setup consumer group on server
group := server.groupCoordinator.GetOrCreateGroup("test-group")
group.Mu.Lock()
group.State = consumer.GroupStateStable
group.Generation = 1
group.Members["member1"] = &consumer.GroupMember{
ID: "member1",
State: consumer.MemberStateStable,
Assignment: []consumer.PartitionAssignment{
{Topic: "test-topic", Partition: 0},
},
}
group.Mu.Unlock()
// Test offset commit
commitRequestBody := createOffsetCommitRequestBody("test-group", 1, "member1")
commitResponse, err := server.handleOffsetCommit(456, commitRequestBody)
if err != nil {
t.Fatalf("offset commit failed: %v", err)
}
if len(commitResponse) < 8 {
t.Fatalf("commit response too short: %d bytes", len(commitResponse))
}
// Test offset fetch
fetchRequestBody := createOffsetFetchRequestBody("test-group")
fetchResponse, err := server.handleOffsetFetch(457, 2, fetchRequestBody)
if err != nil {
t.Fatalf("offset fetch failed: %v", err)
}
if len(fetchResponse) < 8 {
t.Fatalf("fetch response too short: %d bytes", len(fetchResponse))
}
// Verify the committed offset is present
group.Mu.RLock()
if group.OffsetCommits == nil || group.OffsetCommits["test-topic"] == nil {
t.Error("offset commit was not stored")
} else {
commit, exists := group.OffsetCommits["test-topic"][0]
if !exists {
t.Error("offset commit for partition 0 was not found")
} else if commit.Offset != 0 {
t.Errorf("expected committed offset 0, got %d", commit.Offset)
}
}
group.Mu.RUnlock()
}
func TestHandler_parseOffsetCommitRequest(t *testing.T) {
h := NewTestHandler()
defer h.Close()
requestBody := createOffsetCommitRequestBody("test-group", 1, "member1")
request, err := h.parseOffsetCommitRequest(requestBody)
if err != nil {
t.Fatalf("parseOffsetCommitRequest failed: %v", err)
}
if request.GroupID != "test-group" {
t.Errorf("expected group ID 'test-group', got '%s'", request.GroupID)
}
if request.GenerationID != 1 {
t.Errorf("expected generation ID 1, got %d", request.GenerationID)
}
if request.MemberID != "member1" {
t.Errorf("expected member ID 'member1', got '%s'", request.MemberID)
}
}
func TestHandler_parseOffsetFetchRequest(t *testing.T) {
h := NewTestHandler()
defer h.Close()
requestBody := createOffsetFetchRequestBody("test-group")
request, err := h.parseOffsetFetchRequest(requestBody)
if err != nil {
t.Fatalf("parseOffsetFetchRequest failed: %v", err)
}
if request.GroupID != "test-group" {
t.Errorf("expected group ID 'test-group', got '%s'", request.GroupID)
}
if len(request.Topics) == 0 {
t.Error("expected at least one topic in request")
} else {
if request.Topics[0].Name != "test-topic" {
t.Errorf("expected topic name 'test-topic', got '%s'", request.Topics[0].Name)
}
}
}
func TestHandler_buildOffsetCommitResponse(t *testing.T) {
h := NewTestHandler()
defer h.Close()
response := OffsetCommitResponse{
CorrelationID: 123,
Topics: []OffsetCommitTopicResponse{
{
Name: "test-topic",
Partitions: []OffsetCommitPartitionResponse{
{Index: 0, ErrorCode: ErrorCodeNone},
{Index: 1, ErrorCode: ErrorCodeOffsetMetadataTooLarge},
},
},
},
}
responseBytes := h.buildOffsetCommitResponse(response)
if len(responseBytes) < 16 {
t.Fatalf("response too short: %d bytes", len(responseBytes))
}
// Check correlation ID
correlationID := binary.BigEndian.Uint32(responseBytes[0:4])
if correlationID != 123 {
t.Errorf("expected correlation ID 123, got %d", correlationID)
}
}
func TestHandler_buildOffsetFetchResponse(t *testing.T) {
h := NewTestHandler()
defer h.Close()
response := OffsetFetchResponse{
CorrelationID: 124,
Topics: []OffsetFetchTopicResponse{
{
Name: "test-topic",
Partitions: []OffsetFetchPartitionResponse{
{
Index: 0,
Offset: 42,
LeaderEpoch: -1,
Metadata: "test-metadata",
ErrorCode: ErrorCodeNone,
},
},
},
},
ErrorCode: ErrorCodeNone,
}
responseBytes := h.buildOffsetFetchResponse(response, 5) // Use API version 5 (includes leader epoch)
if len(responseBytes) < 20 {
t.Fatalf("response too short: %d bytes", len(responseBytes))
}
// Check correlation ID
correlationID := binary.BigEndian.Uint32(responseBytes[0:4])
if correlationID != 124 {
t.Errorf("expected correlation ID 124, got %d", correlationID)
}
}
// Helper functions for creating test request bodies
func createOffsetCommitRequestBody(groupID string, generationID int32, memberID string) []byte {
body := make([]byte, 0, 128)
// Group ID (string)
groupIDBytes := []byte(groupID)
groupIDLength := make([]byte, 2)
binary.BigEndian.PutUint16(groupIDLength, uint16(len(groupIDBytes)))
body = append(body, groupIDLength...)
body = append(body, groupIDBytes...)
// Generation ID (4 bytes)
generationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(generationIDBytes, uint32(generationID))
body = append(body, generationIDBytes...)
// Member ID (string)
memberIDBytes := []byte(memberID)
memberIDLength := make([]byte, 2)
binary.BigEndian.PutUint16(memberIDLength, uint16(len(memberIDBytes)))
body = append(body, memberIDLength...)
body = append(body, memberIDBytes...)
// RetentionTime (8 bytes)
body = append(body, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF)
// Topics count (1)
body = append(body, 0, 0, 0, 1)
// Topic name: "test-topic"
topic := "test-topic"
topicBytes := []byte(topic)
topicLen := make([]byte, 2)
binary.BigEndian.PutUint16(topicLen, uint16(len(topicBytes)))
body = append(body, topicLen...)
body = append(body, topicBytes...)
// Partitions count (1)
body = append(body, 0, 0, 0, 1)
// Partition 0 fields: index(4) + offset(8) + leader_epoch(4) + metadata(NULLABLE STRING)
body = append(body, 0, 0, 0, 0) // partition index 0
body = append(body, 0, 0, 0, 0, 0, 0, 0, 0) // offset 0
body = append(body, 0xFF, 0xFF, 0xFF, 0xFF) // leader epoch -1
// metadata: null (-1)
body = append(body, 0xFF, 0xFF)
return body
}
func createOffsetFetchRequestBody(groupID string) []byte {
body := make([]byte, 0, 64)
// Group ID (string)
groupIDBytes := []byte(groupID)
groupIDLength := make([]byte, 2)
binary.BigEndian.PutUint16(groupIDLength, uint16(len(groupIDBytes)))
body = append(body, groupIDLength...)
body = append(body, groupIDBytes...)
// Topics count (1)
body = append(body, 0, 0, 0, 1)
// Topic name: "test-topic"
topic := "test-topic"
topicBytes := []byte(topic)
topicLen := make([]byte, 2)
binary.BigEndian.PutUint16(topicLen, uint16(len(topicBytes)))
body = append(body, topicLen...)
body = append(body, topicBytes...)
// Partitions count (1)
body = append(body, 0, 0, 0, 1)
// Partition 0 index
body = append(body, 0, 0, 0, 0)
return body
}