1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/protocol/joingroup.go
chrislu b30834cc95 kafka: fix deadlock issues in static membership tests
- Fix deadlock in FindStaticMember by adding FindStaticMemberLocked version
- Fix deadlock in RegisterStaticMember by adding RegisterStaticMemberLocked version
- Fix deadlock in UnregisterStaticMember by adding UnregisterStaticMemberLocked version
- Fix GroupInstanceID parsing in parseLeaveGroupRequest method
- All static membership tests now pass without deadlocks:
  - JoinGroup static membership (join, reconnection, dynamic members)
  - LeaveGroup static membership (leave, wrong instance ID validation)
  - DescribeGroups static membership

The deadlocks occurred because protocol handlers were calling GroupCoordinator
methods that tried to acquire locks on groups that were already locked by the
calling handler. The fix introduces *Locked versions of these methods that
assume the group is already locked by the caller.
2025-09-15 21:42:32 -07:00

972 lines
32 KiB
Go

package protocol
import (
"encoding/binary"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)
// JoinGroup API (key 11) - consumer group protocol.
// These types model a consumer's request to join a group and the broker's reply,
// which carries the generation, the chosen assignment protocol and the group leader.
type (
	// JoinGroupRequest is the decoded body of a JoinGroup request sent by a Kafka client.
	JoinGroupRequest struct {
		GroupID          string
		SessionTimeout   int32
		RebalanceTimeout int32
		MemberID         string // empty when the client has no member ID yet
		GroupInstanceID  string // non-empty only for static membership
		ProtocolType     string // "consumer" for regular consumers
		GroupProtocols   []GroupProtocol
	}

	// GroupProtocol is one assignment protocol offered by a member, with its metadata blob.
	GroupProtocol struct {
		Name     string
		Metadata []byte
	}

	// JoinGroupResponse holds the fields encoded into a JoinGroup response.
	JoinGroupResponse struct {
		CorrelationID uint32
		ErrorCode     int16
		GenerationID  int32
		GroupProtocol string
		GroupLeader   string
		MemberID      string
		Version       uint16
		Members       []JoinGroupMember // populated only for the group leader
	}

	// JoinGroupMember describes one group member as reported to the leader.
	JoinGroupMember struct {
		MemberID        string
		GroupInstanceID string
		Metadata        []byte
	}
)
// Error codes used by JoinGroup (ErrorCode*) are declared in errors.go.

// handleJoinGroup serves JoinGroup (API key 11). It decodes the request, resolves the
// caller's member ID (static membership via GroupInstanceID, otherwise dynamic
// membership keyed by group / session timeout / protocol type), advances the group
// state machine, stores the member, and replies with the generation, selected
// protocol, leader and member ID.
//
// Error replies are encoded with the request's apiVersion, so v0/v1 clients (whose
// responses have no throttle_time_ms field) can decode them.
func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
	// fail encodes a JoinGroup error for the client's version. buildJoinGroupErrorResponse
	// always encodes as v2 (with throttle_time_ms), which misaligns v0/v1 decoders.
	fail := func(errorCode int16) ([]byte, error) {
		return h.buildJoinGroupResponse(JoinGroupResponse{
			CorrelationID: correlationID,
			ErrorCode:     errorCode,
			GenerationID:  -1,
			Version:       apiVersion,
			Members:       []JoinGroupMember{},
		}), nil
	}

	// DEBUG: hex dump of (at most) the first 100 bytes of the request.
	dumpLen := len(requestBody)
	if dumpLen > 100 {
		dumpLen = 100
	}
	fmt.Printf("DEBUG: JoinGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])

	request, err := h.parseJoinGroupRequest(requestBody, apiVersion)
	if err != nil {
		fmt.Printf("DEBUG: JoinGroup parseJoinGroupRequest error: %v\n", err)
		return fail(ErrorCodeInvalidGroupID)
	}
	fmt.Printf("DEBUG: JoinGroup parsed request - GroupID: '%s', MemberID: '%s', SessionTimeout: %d\n",
		request.GroupID, request.MemberID, request.SessionTimeout)
	fmt.Printf("DEBUG: JoinGroup protocols count: %d\n", len(request.GroupProtocols))
	for i, protocol := range request.GroupProtocols {
		fmt.Printf("DEBUG: JoinGroup protocol[%d]: name='%s', metadata_len=%d, metadata_hex=%x\n",
			i, protocol.Name, len(protocol.Metadata), protocol.Metadata)
	}

	if request.GroupID == "" {
		return fail(ErrorCodeInvalidGroupID)
	}
	if !h.groupCoordinator.ValidateSessionTimeout(request.SessionTimeout) {
		return fail(ErrorCodeInvalidSessionTimeout)
	}

	group := h.groupCoordinator.GetOrCreateGroup(request.GroupID)
	group.Mu.Lock()
	defer group.Mu.Unlock()
	group.LastActivity = time.Now()

	// Deterministic client key (group + session timeout + protocol type). It identifies a
	// returning dynamic member that sent no member ID, and is stored as the member's ClientID.
	clientKey := fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType)

	var memberID string
	isNewMember := false
	switch {
	case request.GroupInstanceID != "":
		// Static membership. The group lock is already held, so the *Locked coordinator
		// variant is used.
		if existing := h.groupCoordinator.FindStaticMemberLocked(group, request.GroupInstanceID); existing != nil {
			memberID = existing.ID
			fmt.Printf("DEBUG: JoinGroup found existing static member ID '%s' for instance '%s'\n", memberID, request.GroupInstanceID)
		} else {
			memberID = h.groupCoordinator.GenerateMemberID(request.GroupInstanceID, "static")
			isNewMember = true
			fmt.Printf("DEBUG: JoinGroup generated new static member ID '%s' for instance '%s'\n", memberID, request.GroupInstanceID)
		}
	case request.MemberID == "":
		// Dynamic member without an ID: reuse the ID of a dynamic member with the same client key.
		for existingID, m := range group.Members {
			if m.ClientID == clientKey && !h.groupCoordinator.IsStaticMember(m) {
				memberID = existingID
				break
			}
		}
		if memberID != "" {
			fmt.Printf("DEBUG: JoinGroup reusing existing member ID '%s' for client key '%s'\n", memberID, clientKey)
		} else {
			memberID = h.groupCoordinator.GenerateMemberID(clientKey, "consumer")
			isNewMember = true
			fmt.Printf("DEBUG: JoinGroup generated new member ID '%s' for client key '%s'\n", memberID, clientKey)
		}
	default:
		// A member ID was supplied; it must already belong to the group.
		memberID = request.MemberID
		if _, exists := group.Members[memberID]; !exists {
			return fail(ErrorCodeUnknownMemberID)
		}
		fmt.Printf("DEBUG: JoinGroup using provided member ID '%s'\n", memberID)
	}

	fmt.Printf("DEBUG: JoinGroup current group state: %s, generation: %d\n", group.State, group.Generation)
	switch group.State {
	case consumer.GroupStateEmpty, consumer.GroupStateStable:
		// A new member (or an empty group) starts a new rebalance generation.
		if isNewMember || len(group.Members) == 0 {
			group.State = consumer.GroupStatePreparingRebalance
			group.Generation++
			fmt.Printf("DEBUG: JoinGroup transitioned to PreparingRebalance, new generation: %d\n", group.Generation)
		}
	case consumer.GroupStatePreparingRebalance:
		// When the current leader (re)joins, the group moves on to await SyncGroup.
		if len(group.Members) > 0 && memberID == group.Leader {
			group.State = consumer.GroupStateCompletingRebalance
			fmt.Printf("DEBUG: JoinGroup leader '%s' transitioning group to CompletingRebalance (ready for SyncGroup)\n", memberID)
		}
	case consumer.GroupStateCompletingRebalance:
		// Joining is allowed; the generation is left unchanged until SyncGroup.
	case consumer.GroupStateDead:
		return fail(ErrorCodeInvalidGroupID)
	}

	clientHost := ExtractClientHost(h.connContext)
	fmt.Printf("DEBUG: JoinGroup extracted client host: %s\n", clientHost)

	var groupInstanceID *string
	if request.GroupInstanceID != "" {
		groupInstanceID = &request.GroupInstanceID
	}
	now := time.Now()
	member := &consumer.GroupMember{
		ID:               memberID,
		ClientID:         clientKey,
		ClientHost:       clientHost,
		GroupInstanceID:  groupInstanceID,
		SessionTimeout:   request.SessionTimeout,
		RebalanceTimeout: request.RebalanceTimeout,
		Subscription:     h.extractSubscriptionFromProtocolsEnhanced(request.GroupProtocols),
		State:            consumer.MemberStatePending,
		LastHeartbeat:    now,
		JoinedAt:         now,
	}

	// Keep the first protocol's metadata for the leader. If the client sent none, synthesize
	// a subscription to every available topic:
	// version INT16 (0), topics ARRAY(STRING), user_data BYTES (empty).
	if len(request.GroupProtocols) > 0 {
		if clientMetadata := request.GroupProtocols[0].Metadata; len(clientMetadata) > 0 {
			member.Metadata = clientMetadata
		} else {
			availableTopics := h.getAvailableTopics()
			fmt.Printf("DEBUG: JoinGroup generating subscription metadata for topics: %v\n", availableTopics)
			metadata := make([]byte, 0, 64)
			metadata = append(metadata, 0, 0) // version 0
			var word [4]byte
			binary.BigEndian.PutUint32(word[:], uint32(len(availableTopics)))
			metadata = append(metadata, word[:]...)
			for _, topic := range availableTopics {
				binary.BigEndian.PutUint16(word[:2], uint16(len(topic)))
				metadata = append(metadata, word[:2]...)
				metadata = append(metadata, topic...)
			}
			metadata = append(metadata, 0, 0, 0, 0) // empty user_data
			member.Metadata = metadata
			fmt.Printf("DEBUG: JoinGroup generated metadata (%d bytes): %x\n", len(metadata), metadata)
		}
	}

	group.Members[memberID] = member
	if member.GroupInstanceID != nil && *member.GroupInstanceID != "" {
		h.groupCoordinator.RegisterStaticMemberLocked(group, member)
		fmt.Printf("DEBUG: JoinGroup registered static member '%s' with instance ID '%s'\n", memberID, *member.GroupInstanceID)
	}
	h.updateGroupSubscription(group)

	// Per-member protocol lists are not tracked yet, so every member contributes a
	// "range" placeholder to the protocol selection.
	existingProtocols := make([]string, 0, len(group.Members))
	for range group.Members {
		existingProtocols = append(existingProtocols, "range")
	}
	groupProtocol := SelectBestProtocol(request.GroupProtocols, existingProtocols)
	group.Protocol = groupProtocol
	fmt.Printf("DEBUG: JoinGroup selected protocol: %s (from %d client protocols)\n",
		groupProtocol, len(request.GroupProtocols))

	// Keep the current leader while it is still a member; otherwise this member leads.
	if group.Leader == "" || group.Members[group.Leader] == nil {
		group.Leader = memberID
		fmt.Printf("DEBUG: JoinGroup elected new leader: '%s' for group '%s'\n", memberID, request.GroupID)
	} else {
		fmt.Printf("DEBUG: JoinGroup keeping existing leader: '%s' for group '%s'\n", group.Leader, request.GroupID)
	}

	response := JoinGroupResponse{
		CorrelationID: correlationID,
		ErrorCode:     ErrorCodeNone,
		GenerationID:  group.Generation,
		GroupProtocol: groupProtocol,
		GroupLeader:   group.Leader,
		MemberID:      memberID,
		Version:       apiVersion,
	}
	fmt.Printf("DEBUG: JoinGroup response - Generation: %d, Protocol: '%s', Leader: '%s', Member: '%s'\n",
		response.GenerationID, response.GroupProtocol, response.GroupLeader, response.MemberID)

	// NOTE: the leader currently receives an empty member list (left over from debugging
	// a response-size issue), so it gets no member metadata to compute assignments from.
	if memberID == group.Leader {
		response.Members = make([]JoinGroupMember, 0)
	}
	return h.buildJoinGroupResponse(response), nil
}
// parseJoinGroupRequest decodes a JoinGroup request body in the classic (non-compact)
// encoding:
//
//	group_id STRING, session_timeout INT32, rebalance_timeout INT32 (apiVersion >= 1),
//	member_id STRING, group_instance_id NULLABLE_STRING (apiVersion >= 5),
//	protocol_type STRING, protocols ARRAY(name STRING, metadata BYTES)
//
// Header fields must be well formed or an error is returned. The protocols array is
// parsed leniently: a truncated entry ends the list. An entry whose metadata is cut
// short is kept with nil metadata, but parsing stops because the remaining bytes can
// no longer be aligned.
func (h *Handler) parseJoinGroupRequest(data []byte, apiVersion uint16) (*JoinGroupRequest, error) {
	if len(data) < 8 {
		return nil, fmt.Errorf("request too short")
	}
	offset := 0

	// group_id STRING
	if offset+2 > len(data) {
		return nil, fmt.Errorf("missing group ID length")
	}
	groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
	offset += 2
	if offset+groupIDLength > len(data) {
		return nil, fmt.Errorf("invalid group ID length")
	}
	groupID := string(data[offset : offset+groupIDLength])
	offset += groupIDLength

	// session_timeout INT32
	if offset+4 > len(data) {
		return nil, fmt.Errorf("missing session timeout")
	}
	sessionTimeout := int32(binary.BigEndian.Uint32(data[offset:]))
	offset += 4

	// rebalance_timeout INT32 (v1+); v0 has none, so it defaults to the session timeout.
	rebalanceTimeout := sessionTimeout
	if apiVersion >= 1 && offset+4 <= len(data) {
		rebalanceTimeout = int32(binary.BigEndian.Uint32(data[offset:]))
		offset += 4
	}

	// member_id STRING
	if offset+2 > len(data) {
		return nil, fmt.Errorf("missing member ID length")
	}
	memberIDLength := int(binary.BigEndian.Uint16(data[offset:]))
	offset += 2
	memberID := ""
	if memberIDLength > 0 {
		if offset+memberIDLength > len(data) {
			return nil, fmt.Errorf("invalid member ID length")
		}
		memberID = string(data[offset : offset+memberIDLength])
		offset += memberIDLength
	}

	// group_instance_id NULLABLE_STRING (v5+): -1 is null, other negative lengths are invalid.
	var groupInstanceID string
	if apiVersion >= 5 {
		if offset+2 > len(data) {
			return nil, fmt.Errorf("missing group instance ID length")
		}
		instanceIDLength := int(int16(binary.BigEndian.Uint16(data[offset:])))
		offset += 2
		switch {
		case instanceIDLength == -1:
			// null: no static membership
		case instanceIDLength < -1:
			return nil, fmt.Errorf("invalid group instance ID length")
		default:
			if offset+instanceIDLength > len(data) {
				return nil, fmt.Errorf("invalid group instance ID length")
			}
			groupInstanceID = string(data[offset : offset+instanceIDLength])
			offset += instanceIDLength
		}
	}

	// protocol_type STRING
	if len(data) < offset+2 {
		return nil, fmt.Errorf("JoinGroup request missing protocol type")
	}
	protocolTypeLength := int(binary.BigEndian.Uint16(data[offset:]))
	offset += 2
	if len(data) < offset+protocolTypeLength {
		return nil, fmt.Errorf("JoinGroup request protocol type too short")
	}
	protocolType := string(data[offset : offset+protocolTypeLength])
	offset += protocolTypeLength

	// protocols ARRAY(name STRING, metadata BYTES)
	if len(data) < offset+4 {
		return nil, fmt.Errorf("JoinGroup request missing group protocols")
	}
	protocolsCount := binary.BigEndian.Uint32(data[offset:])
	offset += 4
	fmt.Printf("DEBUG: JoinGroup - GroupID: %s, SessionTimeout: %d, RebalanceTimeout: %d, MemberID: %s, ProtocolType: %s, ProtocolsCount: %d\n",
		groupID, sessionTimeout, rebalanceTimeout, memberID, protocolType, protocolsCount)

	// protocolsCount is client-controlled. Each entry needs at least 6 bytes (name length +
	// metadata length), so bound the preallocation by what the remaining bytes can hold.
	capacity := protocolsCount
	if maxEntries := uint32((len(data) - offset) / 6); capacity > maxEntries {
		capacity = maxEntries
	}
	protocols := make([]GroupProtocol, 0, capacity)
	for i := uint32(0); i < protocolsCount && offset < len(data); i++ {
		if len(data) < offset+2 {
			break
		}
		protocolNameLength := int(binary.BigEndian.Uint16(data[offset:]))
		offset += 2
		if len(data) < offset+protocolNameLength {
			break
		}
		protocolName := string(data[offset : offset+protocolNameLength])
		offset += protocolNameLength

		if len(data) < offset+4 {
			break
		}
		metadataLength := binary.BigEndian.Uint32(data[offset:])
		offset += 4

		var metadata []byte
		truncated := false
		if metadataLength > 0 {
			// Compare in uint64 so a huge length cannot overflow int on 32-bit platforms.
			if uint64(metadataLength) > uint64(len(data)-offset) {
				truncated = true
			} else {
				metadata = make([]byte, metadataLength)
				copy(metadata, data[offset:offset+int(metadataLength)])
				offset += int(metadataLength)
			}
		}
		protocols = append(protocols, GroupProtocol{
			Name:     protocolName,
			Metadata: metadata,
		})
		fmt.Printf("DEBUG: JoinGroup - Protocol: %s, MetadataLength: %d\n", protocolName, metadataLength)
		if truncated {
			// The metadata bytes are missing, so the next entry's position is unknown.
			break
		}
	}

	return &JoinGroupRequest{
		GroupID:          groupID,
		SessionTimeout:   sessionTimeout,
		RebalanceTimeout: rebalanceTimeout,
		MemberID:         memberID,
		GroupInstanceID:  groupInstanceID,
		ProtocolType:     protocolType,
		GroupProtocols:   protocols,
	}, nil
}
// buildJoinGroupResponse encodes a JoinGroup response in the classic (non-compact)
// layout selected by response.Version:
//
//	correlation_id INT32, throttle_time_ms INT32 (Version >= 2, always 0),
//	error_code INT16, generation_id INT32, protocol_name STRING, leader STRING,
//	member_id STRING, members ARRAY(member_id STRING,
//	group_instance_id NULLABLE_STRING (Version >= 5), metadata BYTES)
//
// An empty member GroupInstanceID is written as null (length -1). The field is
// nullable, and dynamic members have no instance ID.
func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte {
	// Pre-size the buffer from the fields that will be written.
	size := 4 + 2 + 4 // correlation_id + error_code + generation_id
	if response.Version >= 2 {
		size += 4 // throttle_time_ms
	}
	size += 2 + len(response.GroupProtocol)
	size += 2 + len(response.GroupLeader)
	size += 2 + len(response.MemberID)
	size += 4 // members count
	for _, member := range response.Members {
		size += 2 + len(member.MemberID) + 4 + len(member.Metadata)
		if response.Version >= 5 {
			size += 2 + len(member.GroupInstanceID)
		}
	}
	result := make([]byte, 0, size)

	var word [4]byte
	appendUint16 := func(v uint16) {
		binary.BigEndian.PutUint16(word[:2], v)
		result = append(result, word[:2]...)
	}
	appendUint32 := func(v uint32) {
		binary.BigEndian.PutUint32(word[:], v)
		result = append(result, word[:]...)
	}
	appendString := func(s string) {
		appendUint16(uint16(len(s)))
		result = append(result, s...)
	}

	appendUint32(response.CorrelationID)
	if response.Version >= 2 {
		appendUint32(0) // throttle_time_ms: no throttling
	}
	appendUint16(uint16(response.ErrorCode))
	appendUint32(uint32(response.GenerationID))
	appendString(response.GroupProtocol)
	appendString(response.GroupLeader)
	appendString(response.MemberID)

	appendUint32(uint32(len(response.Members)))
	for _, member := range response.Members {
		appendString(member.MemberID)
		if response.Version >= 5 {
			if member.GroupInstanceID == "" {
				result = append(result, 0xFF, 0xFF) // null group_instance_id
			} else {
				appendString(member.GroupInstanceID)
			}
		}
		appendUint32(uint32(len(member.Metadata)))
		result = append(result, member.Metadata...)
	}
	return result
}
// buildJoinGroupErrorResponse encodes a JoinGroup failure carrying only errorCode:
// generation -1, empty protocol/leader/member ID and no members.
//
// NOTE: the response is always encoded as version 2 (it includes throttle_time_ms),
// whatever version the client used.
func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode int16) []byte {
	return h.buildJoinGroupResponse(JoinGroupResponse{
		Version:       2,
		CorrelationID: correlationID,
		ErrorCode:     errorCode,
		GenerationID:  -1,
		Members:       []JoinGroupMember{},
	})
}
// buildMinimalJoinGroupResponse builds a fixed JoinGroup success response for testing.
// The response has generation 1, protocol "range", and a single member "test-member"
// that is also the leader and has empty metadata.
//
// throttle_time_ms is included for apiVersion >= 2. For apiVersion >= 5 the member
// entry carries a null group_instance_id, which the v5 member layout requires.
func (h *Handler) buildMinimalJoinGroupResponse(correlationID uint32, apiVersion uint16) []byte {
	response := make([]byte, 0, 72)

	// correlation_id INT32
	correlationIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
	response = append(response, correlationIDBytes...)

	if apiVersion >= 2 {
		response = append(response, 0, 0, 0, 0) // throttle_time_ms: no throttling
	}
	response = append(response, 0, 0)       // error_code: none
	response = append(response, 0, 0, 0, 1) // generation_id: 1

	response = append(response, 0, 5) // protocol_name
	response = append(response, "range"...)
	response = append(response, 0, 11) // leader
	response = append(response, "test-member"...)
	response = append(response, 0, 11) // member_id (same as leader)
	response = append(response, "test-member"...)

	// members ARRAY: one entry
	response = append(response, 0, 0, 0, 1)
	response = append(response, 0, 11)
	response = append(response, "test-member"...)
	if apiVersion >= 5 {
		response = append(response, 0xFF, 0xFF) // group_instance_id: null
	}
	response = append(response, 0, 0, 0, 0) // metadata: empty

	fmt.Printf("DEBUG: JoinGroup minimal response (%d bytes): %x\n", len(response), response)
	return response
}
// extractSubscriptionFromProtocols is kept for backward compatibility; it delegates
// to extractSubscriptionFromProtocolsEnhanced and returns its result unchanged.
//
// Deprecated: call extractSubscriptionFromProtocolsEnhanced directly.
func (h *Handler) extractSubscriptionFromProtocols(protocols []GroupProtocol) []string {
	topics := h.extractSubscriptionFromProtocolsEnhanced(protocols)
	return topics
}
// extractSubscriptionFromProtocolsEnhanced returns the topics the member subscribes to.
// It first logs how each protocol's metadata parsed (via AnalyzeProtocolMetadata).
// It then returns ExtractTopicsFromMetadata(protocols, available), where available is
// the handler's topic list from getAvailableTopics.
func (h *Handler) extractSubscriptionFromProtocolsEnhanced(protocols []GroupProtocol) []string {
	for _, info := range AnalyzeProtocolMetadata(protocols) {
		if !info.ParsedOK {
			fmt.Printf("DEBUG: Protocol %s parse failed: %s\n", info.Strategy, info.ParseError)
			continue
		}
		fmt.Printf("DEBUG: Protocol %s parsed successfully: version=%d, topics=%v\n",
			info.Strategy, info.Version, info.Topics)
	}

	available := h.getAvailableTopics()
	subscribed := ExtractTopicsFromMetadata(protocols, available)
	fmt.Printf("DEBUG: Enhanced subscription extraction result: %v\n", subscribed)
	return subscribed
}
// parseConsumerProtocolMetadata decodes the topic list from consumer-protocol
// subscription metadata: version INT16, topics ARRAY(STRING); later fields are ignored.
//
// It returns nil when metadata is shorter than the 6-byte version+count header.
// Parsing is lenient: a truncated topic entry ends the list, and the topics decoded so
// far are returned.
func (h *Handler) parseConsumerProtocolMetadata(metadata []byte) []string {
	if len(metadata) < 6 { // version(2) + topics_count(4)
		return nil
	}
	version := binary.BigEndian.Uint16(metadata[0:2])
	topicsCount := binary.BigEndian.Uint32(metadata[2:6])
	offset := 6
	fmt.Printf("DEBUG: Consumer protocol metadata - Version: %d, TopicsCount: %d\n", version, topicsCount)

	// topicsCount is client-controlled. Every topic needs at least a 2-byte length prefix,
	// so bound the preallocation by what the remaining bytes could hold.
	capacity := topicsCount
	if maxTopics := uint32((len(metadata) - offset) / 2); capacity > maxTopics {
		capacity = maxTopics
	}
	topics := make([]string, 0, capacity)
	for i := uint32(0); i < topicsCount && offset < len(metadata); i++ {
		if len(metadata) < offset+2 {
			break
		}
		topicNameLength := int(binary.BigEndian.Uint16(metadata[offset:]))
		offset += 2
		if len(metadata) < offset+topicNameLength {
			break
		}
		topicName := string(metadata[offset : offset+topicNameLength])
		offset += topicNameLength
		topics = append(topics, topicName)
		fmt.Printf("DEBUG: Consumer subscribed to topic: %s\n", topicName)
	}
	return topics
}
// updateGroupSubscription rebuilds group.SubscribedTopics as the union of every
// member's Subscription. It installs a fresh map rather than mutating the old one.
func (h *Handler) updateGroupSubscription(group *consumer.ConsumerGroup) {
	union := make(map[string]bool)
	for _, m := range group.Members {
		for _, topic := range m.Subscription {
			union[topic] = true
		}
	}
	group.SubscribedTopics = union
}
// SyncGroup API (key 14) - consumer group coordination completion.
// Members call SyncGroup after JoinGroup to receive their partition assignments.
type (
	// SyncGroupRequest is the decoded body of a SyncGroup request from a Kafka client.
	SyncGroupRequest struct {
		GroupID          string
		GenerationID     int32
		MemberID         string
		GroupInstanceID  string
		GroupAssignments []GroupAssignment // supplied only by the group leader
	}

	// GroupAssignment pairs a member ID with its serialized partition assignment.
	GroupAssignment struct {
		MemberID   string
		Assignment []byte
	}

	// SyncGroupResponse holds the fields encoded into a SyncGroup response.
	SyncGroupResponse struct {
		CorrelationID uint32
		ErrorCode     int16
		Assignment    []byte // this member's serialized partition assignment
	}
)
// Error codes used by SyncGroup (ErrorCode*) are declared in errors.go.

// handleSyncGroup serves SyncGroup (API key 14), which members send after JoinGroup to
// obtain their partition assignment. After validating the group, member and generation,
// it does one of three things:
//   - the leader sent assignments: they are processed and the group becomes Stable;
//   - a non-leader syncs while the group is CompletingRebalance: it receives its
//     current assignment;
//   - otherwise: partitions are assigned on the broker and the group becomes Stable.
//
// The leader syncing during CompletingRebalance without assignments falls into the
// last case. Nothing else would complete that rebalance, so the group would otherwise
// stay in CompletingRebalance.
func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
	fail := func(errorCode int16) ([]byte, error) {
		return h.buildSyncGroupErrorResponse(correlationID, errorCode, apiVersion), nil
	}

	// DEBUG: hex dump of (at most) the first 100 bytes of the request.
	dumpLen := len(requestBody)
	if dumpLen > 100 {
		dumpLen = 100
	}
	fmt.Printf("DEBUG: SyncGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])

	request, err := h.parseSyncGroupRequest(requestBody, apiVersion)
	if err != nil {
		fmt.Printf("DEBUG: SyncGroup parseSyncGroupRequest error: %v\n", err)
		return fail(ErrorCodeInvalidGroupID)
	}
	fmt.Printf("DEBUG: SyncGroup parsed request - GroupID: '%s', MemberID: '%s', GenerationID: %d\n",
		request.GroupID, request.MemberID, request.GenerationID)

	if request.GroupID == "" || request.MemberID == "" {
		return fail(ErrorCodeInvalidGroupID)
	}
	group := h.groupCoordinator.GetGroup(request.GroupID)
	if group == nil {
		return fail(ErrorCodeInvalidGroupID)
	}
	group.Mu.Lock()
	defer group.Mu.Unlock()
	group.LastActivity = time.Now()

	member, exists := group.Members[request.MemberID]
	if !exists {
		return fail(ErrorCodeUnknownMemberID)
	}
	if request.GenerationID != group.Generation {
		return fail(ErrorCodeIllegalGeneration)
	}

	// markStable completes a rebalance: the group and all of its members become Stable.
	markStable := func() {
		group.State = consumer.GroupStateStable
		for _, m := range group.Members {
			m.State = consumer.MemberStateStable
		}
	}

	isLeader := request.MemberID == group.Leader
	switch {
	case isLeader && len(request.GroupAssignments) > 0:
		if err := h.processGroupAssignments(group, request.GroupAssignments); err != nil {
			return fail(ErrorCodeInconsistentGroupProtocol)
		}
		markStable()
	case group.State == consumer.GroupStateCompletingRebalance && !isLeader:
		// A follower during CompletingRebalance receives its current assignment; the
		// leader's SyncGroup is what completes the rebalance.
	default:
		// No leader-supplied assignments: compute them on the broker. This includes the
		// leader syncing during CompletingRebalance, the only sync that can finish it.
		group.AssignPartitions(h.getTopicPartitions(group))
		markStable()
	}

	response := SyncGroupResponse{
		CorrelationID: correlationID,
		ErrorCode:     ErrorCodeNone,
		Assignment:    h.serializeMemberAssignment(member.Assignment),
	}
	return h.buildSyncGroupResponse(response, apiVersion), nil
}
// parseSyncGroupRequest decodes a SyncGroup request body in the classic (non-compact)
// encoding:
//
//	group_id STRING, generation_id INT32, member_id STRING,
//	group_instance_id NULLABLE_STRING (apiVersion >= 3),
//	assignments ARRAY(member_id STRING, assignment BYTES)
//
// The assignments array is decoded only for apiVersion <= 3; later versions use a layout
// this parser does not handle. The array is optional: when it is absent or malformed,
// GroupAssignments is returned empty rather than failing the request.
func (h *Handler) parseSyncGroupRequest(data []byte, apiVersion uint16) (*SyncGroupRequest, error) {
	if len(data) < 8 {
		return nil, fmt.Errorf("request too short")
	}
	offset := 0

	// group_id STRING (its 2-byte length prefix is present because len(data) >= 8)
	groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
	offset += 2
	if offset+groupIDLength > len(data) {
		return nil, fmt.Errorf("invalid group ID length")
	}
	groupID := string(data[offset : offset+groupIDLength])
	offset += groupIDLength

	// generation_id INT32
	if offset+4 > len(data) {
		return nil, fmt.Errorf("missing generation ID")
	}
	generationID := int32(binary.BigEndian.Uint32(data[offset:]))
	offset += 4

	// member_id STRING
	if offset+2 > len(data) {
		return nil, fmt.Errorf("missing member ID length")
	}
	memberIDLength := int(binary.BigEndian.Uint16(data[offset:]))
	offset += 2
	if offset+memberIDLength > len(data) {
		return nil, fmt.Errorf("invalid member ID length")
	}
	memberID := string(data[offset : offset+memberIDLength])
	offset += memberIDLength

	// group_instance_id NULLABLE_STRING (v3+): -1 is null, other negative lengths are invalid.
	var groupInstanceID string
	if apiVersion >= 3 {
		if offset+2 > len(data) {
			return nil, fmt.Errorf("missing group instance ID length")
		}
		instanceIDLength := int(int16(binary.BigEndian.Uint16(data[offset:])))
		offset += 2
		switch {
		case instanceIDLength == -1:
			// null: no static membership
		case instanceIDLength < -1:
			return nil, fmt.Errorf("invalid group instance ID length")
		default:
			if offset+instanceIDLength > len(data) {
				return nil, fmt.Errorf("invalid group instance ID length")
			}
			groupInstanceID = string(data[offset : offset+instanceIDLength])
			offset += instanceIDLength
		}
	}

	// decodeAssignments parses assignments ARRAY(member_id STRING, assignment BYTES) from
	// buf. It reports ok=false when the array is absent or malformed.
	decodeAssignments := func(buf []byte) ([]GroupAssignment, bool) {
		if len(buf) < 4 {
			return nil, false
		}
		count := binary.BigEndian.Uint32(buf)
		pos := 4
		// The count is client-controlled. Each entry needs at least 6 bytes (two length
		// prefixes), so a count the remaining bytes cannot hold is malformed.
		if uint64(count) > uint64((len(buf)-pos)/6) {
			return nil, false
		}
		out := make([]GroupAssignment, 0, count)
		for i := uint32(0); i < count; i++ {
			if pos+2 > len(buf) {
				return nil, false
			}
			idLength := int(binary.BigEndian.Uint16(buf[pos:]))
			pos += 2
			if pos+idLength > len(buf) {
				return nil, false
			}
			id := string(buf[pos : pos+idLength])
			pos += idLength
			if pos+4 > len(buf) {
				return nil, false
			}
			blobLength := int64(int32(binary.BigEndian.Uint32(buf[pos:])))
			pos += 4
			if blobLength < 0 || blobLength > int64(len(buf)-pos) {
				return nil, false
			}
			var blob []byte
			if blobLength > 0 {
				blob = make([]byte, blobLength)
				copy(blob, buf[pos:])
				pos += int(blobLength)
			}
			out = append(out, GroupAssignment{MemberID: id, Assignment: blob})
		}
		return out, true
	}

	assignments := []GroupAssignment{}
	if apiVersion <= 3 {
		if parsed, ok := decodeAssignments(data[offset:]); ok {
			assignments = parsed
		}
	}

	return &SyncGroupRequest{
		GroupID:          groupID,
		GenerationID:     generationID,
		MemberID:         memberID,
		GroupInstanceID:  groupInstanceID,
		GroupAssignments: assignments,
	}, nil
}
// buildSyncGroupResponse encodes a SyncGroup response:
// correlation_id INT32, throttle_time_ms INT32 (apiVersion >= 1, always 0),
// error_code INT16, assignment BYTES.
func (h *Handler) buildSyncGroupResponse(response SyncGroupResponse, apiVersion uint16) []byte {
	out := make([]byte, 4, 16+len(response.Assignment))
	binary.BigEndian.PutUint32(out, response.CorrelationID)

	// v0 has no throttle_time_ms field.
	if apiVersion >= 1 {
		out = append(out, 0, 0, 0, 0)
	}

	var scratch [4]byte
	binary.BigEndian.PutUint16(scratch[:2], uint16(response.ErrorCode))
	out = append(out, scratch[:2]...)

	binary.BigEndian.PutUint32(scratch[:], uint32(len(response.Assignment)))
	out = append(out, scratch[:]...)
	return append(out, response.Assignment...)
}
// buildSyncGroupErrorResponse encodes a SyncGroup failure for the client's apiVersion,
// with errorCode and an empty assignment.
func (h *Handler) buildSyncGroupErrorResponse(correlationID uint32, errorCode int16, apiVersion uint16) []byte {
	return h.buildSyncGroupResponse(SyncGroupResponse{
		CorrelationID: correlationID,
		ErrorCode:     errorCode,
		Assignment:    []byte{},
	}, apiVersion)
}
// processGroupAssignments is the hook for applying leader-computed assignments.
// The assignments argument is not decoded yet. Instead the group is reassigned with
// its own AssignPartitions over getTopicPartitions(group). It always returns nil.
func (h *Handler) processGroupAssignments(group *consumer.ConsumerGroup, assignments []GroupAssignment) error {
	// TODO: decode each member's assignment blob and apply it instead of recomputing.
	group.AssignPartitions(h.getTopicPartitions(group))
	return nil
}
// getTopicPartitions returns, for every topic in group.SubscribedTopics, the partition
// IDs to assign. Each topic is currently treated as having a single partition (0).
//
// The previous per-topic TopicExists lookup was dropped: both of its branches produced
// the same []int32{0}, so it only cost a backend call per topic.
// TODO: look up the actual partition count from SeaweedMQ.
func (h *Handler) getTopicPartitions(group *consumer.ConsumerGroup) map[string][]int32 {
	topicPartitions := make(map[string][]int32, len(group.SubscribedTopics))
	for topic := range group.SubscribedTopics {
		topicPartitions[topic] = []int32{0}
	}
	return topicPartitions
}
// serializeMemberAssignment encodes a member's partitions in the consumer-protocol
// assignment format expected by clients such as Sarama:
// version INT16 (1), topics ARRAY(topic STRING, partitions ARRAY(INT32)),
// user_data BYTES (always empty).
//
// Topics are written in the order they first appear in assignments, and each topic's
// partitions keep their input order, so the same input always yields the same bytes.
// (Ranging over a map would randomize topic order between calls.)
func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssignment) []byte {
	partitionsByTopic := make(map[string][]int32)
	topicOrder := make([]string, 0)
	for _, a := range assignments {
		if _, seen := partitionsByTopic[a.Topic]; !seen {
			topicOrder = append(topicOrder, a.Topic)
		}
		partitionsByTopic[a.Topic] = append(partitionsByTopic[a.Topic], a.Partition)
	}

	result := make([]byte, 0, 64)
	var word [4]byte

	result = append(result, 0, 1) // version 1

	binary.BigEndian.PutUint32(word[:], uint32(len(topicOrder)))
	result = append(result, word[:]...)

	for _, topic := range topicOrder {
		partitions := partitionsByTopic[topic]

		// topic STRING
		binary.BigEndian.PutUint16(word[:2], uint16(len(topic)))
		result = append(result, word[:2]...)
		result = append(result, topic...)

		// partitions ARRAY(INT32)
		binary.BigEndian.PutUint32(word[:], uint32(len(partitions)))
		result = append(result, word[:]...)
		for _, partition := range partitions {
			binary.BigEndian.PutUint32(word[:], uint32(partition))
			result = append(result, word[:]...)
		}
	}

	result = append(result, 0, 0, 0, 0) // user_data: empty
	fmt.Printf("DEBUG: Generated assignment bytes (%d): %x\n", len(result), result)
	return result
}
// getAvailableTopics returns the topic names reported by the SeaweedMQ handler's
// ListTopics. JoinGroup uses them to synthesize subscription metadata, and they are
// passed to ExtractTopicsFromMetadata during subscription extraction.
func (h *Handler) getAvailableTopics() []string {
	topics := h.seaweedMQHandler.ListTopics()
	return topics
}