package protocol import ( "encoding/binary" "fmt" "time" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" ) // JoinGroup API (key 11) - Consumer group protocol // Handles consumer joining a consumer group and initial coordination // JoinGroupRequest represents a JoinGroup request from a Kafka client type JoinGroupRequest struct { GroupID string SessionTimeout int32 RebalanceTimeout int32 MemberID string // Empty for new members GroupInstanceID string // Optional static membership ProtocolType string // "consumer" for regular consumers GroupProtocols []GroupProtocol } // GroupProtocol represents a supported assignment protocol type GroupProtocol struct { Name string Metadata []byte } // JoinGroupResponse represents a JoinGroup response to a Kafka client type JoinGroupResponse struct { CorrelationID uint32 ErrorCode int16 GenerationID int32 GroupProtocol string GroupLeader string MemberID string Version uint16 Members []JoinGroupMember // Only populated for group leader } // JoinGroupMember represents member info sent to group leader type JoinGroupMember struct { MemberID string GroupInstanceID string Metadata []byte } // Error codes for JoinGroup are imported from errors.go func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { // DEBUG: Hex dump the request to understand format dumpLen := len(requestBody) if dumpLen > 100 { dumpLen = 100 } fmt.Printf("DEBUG: JoinGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen]) // Parse JoinGroup request request, err := h.parseJoinGroupRequest(requestBody, apiVersion) if err != nil { fmt.Printf("DEBUG: JoinGroup parseJoinGroupRequest error: %v\n", err) return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } fmt.Printf("DEBUG: JoinGroup parsed request - GroupID: '%s', MemberID: '%s', SessionTimeout: %d\n", request.GroupID, request.MemberID, request.SessionTimeout) fmt.Printf("DEBUG: JoinGroup protocols count: %d\n", len(request.GroupProtocols)) for i, protocol := range request.GroupProtocols { fmt.Printf("DEBUG: JoinGroup protocol[%d]: name='%s', metadata_len=%d, metadata_hex=%x\n", i, protocol.Name, len(protocol.Metadata), protocol.Metadata) } // Validate request if request.GroupID == "" { return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } if !h.groupCoordinator.ValidateSessionTimeout(request.SessionTimeout) { return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidSessionTimeout), nil } // Get or create consumer group group := h.groupCoordinator.GetOrCreateGroup(request.GroupID) group.Mu.Lock() defer group.Mu.Unlock() // Update group's last activity group.LastActivity = time.Now() // Handle member ID logic with static membership support var memberID string var isNewMember bool var existingMember *consumer.GroupMember // Check for static membership first if request.GroupInstanceID != "" { existingMember = h.groupCoordinator.FindStaticMemberLocked(group, request.GroupInstanceID) if existingMember != nil { memberID = existingMember.ID isNewMember = false fmt.Printf("DEBUG: JoinGroup found existing static member ID '%s' for instance '%s'\n", memberID, request.GroupInstanceID) } else { // New static member memberID = h.groupCoordinator.GenerateMemberID(request.GroupInstanceID, "static") isNewMember = true fmt.Printf("DEBUG: JoinGroup generated new static member ID '%s' for instance '%s'\n", memberID, request.GroupInstanceID) } } else { // Dynamic membership logic clientKey := fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType) if request.MemberID == "" { // New member - check if we already have a member for this client var existingMemberID string for existingID, member := range group.Members { if member.ClientID == clientKey && !h.groupCoordinator.IsStaticMember(member) { existingMemberID = existingID break } } if existingMemberID != "" { // Reuse existing member ID for this client memberID = existingMemberID isNewMember = false fmt.Printf("DEBUG: JoinGroup reusing existing member ID '%s' for client key '%s'\n", memberID, clientKey) } else { // Generate new deterministic member ID memberID = h.groupCoordinator.GenerateMemberID(clientKey, "consumer") isNewMember = true fmt.Printf("DEBUG: JoinGroup generated new member ID '%s' for client key '%s'\n", memberID, clientKey) } } else { memberID = request.MemberID // Check if member exists if _, exists := group.Members[memberID]; !exists { // Member ID provided but doesn't exist - reject return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil } isNewMember = false fmt.Printf("DEBUG: JoinGroup using provided member ID '%s'\n", memberID) } } // Check group state fmt.Printf("DEBUG: JoinGroup current group state: %s, generation: %d\n", group.State, group.Generation) switch group.State { case consumer.GroupStateEmpty, consumer.GroupStateStable: // Can join or trigger rebalance if isNewMember || len(group.Members) == 0 { group.State = consumer.GroupStatePreparingRebalance group.Generation++ fmt.Printf("DEBUG: JoinGroup transitioned to PreparingRebalance, new generation: %d\n", group.Generation) } case consumer.GroupStatePreparingRebalance: // Rebalance in progress - if this is the leader and we have members, transition to CompletingRebalance if len(group.Members) > 0 && memberID == group.Leader { group.State = consumer.GroupStateCompletingRebalance fmt.Printf("DEBUG: JoinGroup leader '%s' transitioning group to CompletingRebalance (ready for SyncGroup)\n", memberID) } case consumer.GroupStateCompletingRebalance: // Allow join but don't change generation until SyncGroup case consumer.GroupStateDead: return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } // Extract client host from connection context clientHost := ExtractClientHost(h.connContext) fmt.Printf("DEBUG: JoinGroup extracted client host: %s\n", clientHost) // Create or update member with enhanced metadata parsing var groupInstanceID *string if request.GroupInstanceID != "" { groupInstanceID = &request.GroupInstanceID } // Use deterministic client identifier based on group + session timeout + protocol clientKey := fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType) member := &consumer.GroupMember{ ID: memberID, ClientID: clientKey, // Use deterministic client key for member identification ClientHost: clientHost, // Now extracted from actual connection GroupInstanceID: groupInstanceID, SessionTimeout: request.SessionTimeout, RebalanceTimeout: request.RebalanceTimeout, Subscription: h.extractSubscriptionFromProtocolsEnhanced(request.GroupProtocols), State: consumer.MemberStatePending, LastHeartbeat: time.Now(), JoinedAt: time.Now(), } // Store protocol metadata for leader if len(request.GroupProtocols) > 0 { if len(request.GroupProtocols[0].Metadata) == 0 { // Generate subscription metadata for available topics availableTopics := h.getAvailableTopics() fmt.Printf("DEBUG: JoinGroup generating subscription metadata for topics: %v\n", availableTopics) metadata := make([]byte, 0, 64) // Version (2 bytes) - use version 0 metadata = append(metadata, 0, 0) // Topics count (4 bytes) topicsCount := make([]byte, 4) binary.BigEndian.PutUint32(topicsCount, uint32(len(availableTopics))) metadata = append(metadata, topicsCount...) // Topics (string array) for _, topic := range availableTopics { topicLen := make([]byte, 2) binary.BigEndian.PutUint16(topicLen, uint16(len(topic))) metadata = append(metadata, topicLen...) metadata = append(metadata, []byte(topic)...) } // UserData length (4 bytes) - empty metadata = append(metadata, 0, 0, 0, 0) member.Metadata = metadata fmt.Printf("DEBUG: JoinGroup generated metadata (%d bytes): %x\n", len(metadata), metadata) } else { member.Metadata = request.GroupProtocols[0].Metadata } } // Add member to group group.Members[memberID] = member // Register static member if applicable if member.GroupInstanceID != nil && *member.GroupInstanceID != "" { h.groupCoordinator.RegisterStaticMemberLocked(group, member) fmt.Printf("DEBUG: JoinGroup registered static member '%s' with instance ID '%s'\n", memberID, *member.GroupInstanceID) } // Update group's subscribed topics h.updateGroupSubscription(group) // Select assignment protocol using enhanced selection logic existingProtocols := make([]string, 0) for _ = range group.Members { // Collect protocols from existing members (simplified - in real implementation // we'd track each member's supported protocols) existingProtocols = append(existingProtocols, "range") // placeholder } groupProtocol := SelectBestProtocol(request.GroupProtocols, existingProtocols) group.Protocol = groupProtocol fmt.Printf("DEBUG: JoinGroup selected protocol: %s (from %d client protocols)\n", groupProtocol, len(request.GroupProtocols)) // Select group leader (first member or keep existing if still present) if group.Leader == "" || group.Members[group.Leader] == nil { group.Leader = memberID fmt.Printf("DEBUG: JoinGroup elected new leader: '%s' for group '%s'\n", memberID, request.GroupID) } else { fmt.Printf("DEBUG: JoinGroup keeping existing leader: '%s' for group '%s'\n", group.Leader, request.GroupID) } // Build response response := JoinGroupResponse{ CorrelationID: correlationID, ErrorCode: ErrorCodeNone, GenerationID: group.Generation, GroupProtocol: groupProtocol, GroupLeader: group.Leader, MemberID: memberID, Version: apiVersion, } fmt.Printf("DEBUG: JoinGroup response - Generation: %d, Protocol: '%s', Leader: '%s', Member: '%s'\n", response.GenerationID, response.GroupProtocol, response.GroupLeader, response.MemberID) // If this member is the leader, include all member info if memberID == group.Leader { // TESTING: Try empty members array to see if that fixes the size issue response.Members = make([]JoinGroupMember, 0) } else { } return h.buildJoinGroupResponse(response), nil } func (h *Handler) parseJoinGroupRequest(data []byte, apiVersion uint16) (*JoinGroupRequest, error) { if len(data) < 8 { return nil, fmt.Errorf("request too short") } offset := 0 // JoinGroup v5 body starts with GroupID according to Kafka spec // GroupID (string) if offset+2 > len(data) { return nil, fmt.Errorf("missing group ID length") } groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 if offset+groupIDLength > len(data) { return nil, fmt.Errorf("invalid group ID length") } groupID := string(data[offset : offset+groupIDLength]) offset += groupIDLength // Session timeout (4 bytes) if offset+4 > len(data) { return nil, fmt.Errorf("missing session timeout") } sessionTimeout := int32(binary.BigEndian.Uint32(data[offset:])) offset += 4 // Rebalance timeout (4 bytes) - for v1+ versions rebalanceTimeout := sessionTimeout // Default to session timeout for v0 if apiVersion >= 1 && offset+4 <= len(data) { rebalanceTimeout = int32(binary.BigEndian.Uint32(data[offset:])) offset += 4 } // MemberID (string) if offset+2 > len(data) { return nil, fmt.Errorf("missing member ID length") } memberIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 memberID := "" if memberIDLength > 0 { if offset+memberIDLength > len(data) { return nil, fmt.Errorf("invalid member ID length") } memberID = string(data[offset : offset+memberIDLength]) offset += memberIDLength } // Parse Group Instance ID (nullable string) - for JoinGroup v5+ var groupInstanceID string if apiVersion >= 5 { if offset+2 > len(data) { return nil, fmt.Errorf("missing group instance ID length") } instanceIDLength := int16(binary.BigEndian.Uint16(data[offset:])) offset += 2 if instanceIDLength == -1 { groupInstanceID = "" // null string } else if instanceIDLength >= 0 { if offset+int(instanceIDLength) > len(data) { return nil, fmt.Errorf("invalid group instance ID length") } groupInstanceID = string(data[offset : offset+int(instanceIDLength)]) offset += int(instanceIDLength) } } // Parse Protocol Type if len(data) < offset+2 { return nil, fmt.Errorf("JoinGroup request missing protocol type") } protocolTypeLength := binary.BigEndian.Uint16(data[offset : offset+2]) offset += 2 if len(data) < offset+int(protocolTypeLength) { return nil, fmt.Errorf("JoinGroup request protocol type too short") } protocolType := string(data[offset : offset+int(protocolTypeLength)]) offset += int(protocolTypeLength) // Parse Group Protocols array if len(data) < offset+4 { return nil, fmt.Errorf("JoinGroup request missing group protocols") } protocolsCount := binary.BigEndian.Uint32(data[offset : offset+4]) offset += 4 fmt.Printf("DEBUG: JoinGroup - GroupID: %s, SessionTimeout: %d, RebalanceTimeout: %d, MemberID: %s, ProtocolType: %s, ProtocolsCount: %d\n", groupID, sessionTimeout, rebalanceTimeout, memberID, protocolType, protocolsCount) protocols := make([]GroupProtocol, 0, protocolsCount) for i := uint32(0); i < protocolsCount && offset < len(data); i++ { // Parse protocol name if len(data) < offset+2 { break } protocolNameLength := binary.BigEndian.Uint16(data[offset : offset+2]) offset += 2 if len(data) < offset+int(protocolNameLength) { break } protocolName := string(data[offset : offset+int(protocolNameLength)]) offset += int(protocolNameLength) // Parse protocol metadata if len(data) < offset+4 { break } metadataLength := binary.BigEndian.Uint32(data[offset : offset+4]) offset += 4 var metadata []byte if metadataLength > 0 && len(data) >= offset+int(metadataLength) { metadata = make([]byte, metadataLength) copy(metadata, data[offset:offset+int(metadataLength)]) offset += int(metadataLength) } protocols = append(protocols, GroupProtocol{ Name: protocolName, Metadata: metadata, }) fmt.Printf("DEBUG: JoinGroup - Protocol: %s, MetadataLength: %d\n", protocolName, metadataLength) } return &JoinGroupRequest{ GroupID: groupID, SessionTimeout: sessionTimeout, RebalanceTimeout: rebalanceTimeout, MemberID: memberID, GroupInstanceID: groupInstanceID, ProtocolType: protocolType, GroupProtocols: protocols, }, nil } func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte { // Estimate response size estimatedSize := 0 // CorrelationID(4) + (optional throttle 4) + error_code(2) + generation_id(4) if response.Version >= 2 { estimatedSize = 4 + 4 + 2 + 4 } else { estimatedSize = 4 + 2 + 4 } estimatedSize += 2 + len(response.GroupProtocol) // protocol string estimatedSize += 2 + len(response.GroupLeader) // leader string estimatedSize += 2 + len(response.MemberID) // member id string estimatedSize += 4 // members array count for _, member := range response.Members { // MemberID string estimatedSize += 2 + len(member.MemberID) if response.Version >= 5 { // GroupInstanceID string estimatedSize += 2 + len(member.GroupInstanceID) } // Metadata bytes (4 + len) estimatedSize += 4 + len(member.Metadata) } result := make([]byte, 0, estimatedSize) // Correlation ID (4 bytes) correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) result = append(result, correlationIDBytes...) // JoinGroup v2 adds throttle_time_ms if response.Version >= 2 { throttleTimeBytes := make([]byte, 4) binary.BigEndian.PutUint32(throttleTimeBytes, 0) // No throttling result = append(result, throttleTimeBytes...) } // Error code (2 bytes) errorCodeBytes := make([]byte, 2) binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode)) result = append(result, errorCodeBytes...) // Generation ID (4 bytes) generationBytes := make([]byte, 4) binary.BigEndian.PutUint32(generationBytes, uint32(response.GenerationID)) result = append(result, generationBytes...) // Group protocol (string) protocolLength := make([]byte, 2) binary.BigEndian.PutUint16(protocolLength, uint16(len(response.GroupProtocol))) result = append(result, protocolLength...) result = append(result, []byte(response.GroupProtocol)...) // Group leader (string) leaderLength := make([]byte, 2) binary.BigEndian.PutUint16(leaderLength, uint16(len(response.GroupLeader))) result = append(result, leaderLength...) result = append(result, []byte(response.GroupLeader)...) // Member ID (string) memberIDLength := make([]byte, 2) binary.BigEndian.PutUint16(memberIDLength, uint16(len(response.MemberID))) result = append(result, memberIDLength...) result = append(result, []byte(response.MemberID)...) // Members array (4 bytes count + members) memberCountBytes := make([]byte, 4) binary.BigEndian.PutUint32(memberCountBytes, uint32(len(response.Members))) result = append(result, memberCountBytes...) for _, member := range response.Members { // Member ID (string) memberLength := make([]byte, 2) binary.BigEndian.PutUint16(memberLength, uint16(len(member.MemberID))) result = append(result, memberLength...) result = append(result, []byte(member.MemberID)...) if response.Version >= 5 { // Group instance ID (string) - can be empty instanceIDLength := make([]byte, 2) binary.BigEndian.PutUint16(instanceIDLength, uint16(len(member.GroupInstanceID))) result = append(result, instanceIDLength...) if len(member.GroupInstanceID) > 0 { result = append(result, []byte(member.GroupInstanceID)...) } } // Metadata (bytes) metadataLength := make([]byte, 4) binary.BigEndian.PutUint32(metadataLength, uint32(len(member.Metadata))) result = append(result, metadataLength...) result = append(result, member.Metadata...) } return result } func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode int16) []byte { response := JoinGroupResponse{ CorrelationID: correlationID, ErrorCode: errorCode, GenerationID: -1, GroupProtocol: "", GroupLeader: "", MemberID: "", Version: 2, Members: []JoinGroupMember{}, } return h.buildJoinGroupResponse(response) } // buildMinimalJoinGroupResponse creates a minimal hardcoded response for testing func (h *Handler) buildMinimalJoinGroupResponse(correlationID uint32, apiVersion uint16) []byte { // Create the absolute minimal JoinGroup response that should work with kafka-go response := make([]byte, 0, 64) // Correlation ID (4 bytes) correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, correlationID) response = append(response, correlationIDBytes...) // Throttle time (4 bytes) - v2+ only if apiVersion >= 2 { response = append(response, 0, 0, 0, 0) // No throttling } // Error code (2 bytes) - 0 = success response = append(response, 0, 0) // Generation ID (4 bytes) - use 1 response = append(response, 0, 0, 0, 1) // Group protocol (STRING) - "range" response = append(response, 0, 5) // length response = append(response, []byte("range")...) // Group leader (STRING) - "test-member" response = append(response, 0, 11) // length response = append(response, []byte("test-member")...) // Member ID (STRING) - "test-member" (same as leader) response = append(response, 0, 11) // length response = append(response, []byte("test-member")...) // Members array (4 bytes count + members) response = append(response, 0, 0, 0, 1) // 1 member // Member 0: // Member ID (STRING) - "test-member" response = append(response, 0, 11) // length response = append(response, []byte("test-member")...) // Member metadata (BYTES) - empty response = append(response, 0, 0, 0, 0) // 0 bytes fmt.Printf("DEBUG: JoinGroup minimal response (%d bytes): %x\n", len(response), response) return response } // extractSubscriptionFromProtocols - legacy method for backward compatibility func (h *Handler) extractSubscriptionFromProtocols(protocols []GroupProtocol) []string { return h.extractSubscriptionFromProtocolsEnhanced(protocols) } // extractSubscriptionFromProtocolsEnhanced uses improved metadata parsing with better error handling func (h *Handler) extractSubscriptionFromProtocolsEnhanced(protocols []GroupProtocol) []string { // Analyze protocol metadata for debugging debugInfo := AnalyzeProtocolMetadata(protocols) for _, info := range debugInfo { if info.ParsedOK { fmt.Printf("DEBUG: Protocol %s parsed successfully: version=%d, topics=%v\n", info.Strategy, info.Version, info.Topics) } else { fmt.Printf("DEBUG: Protocol %s parse failed: %s\n", info.Strategy, info.ParseError) } } // Extract topics using enhanced parsing topics := ExtractTopicsFromMetadata(protocols, h.getAvailableTopics()) fmt.Printf("DEBUG: Enhanced subscription extraction result: %v\n", topics) return topics } func (h *Handler) parseConsumerProtocolMetadata(metadata []byte) []string { if len(metadata) < 6 { // version(2) + topics_count(4) return nil } offset := 0 // Parse version (2 bytes) version := binary.BigEndian.Uint16(metadata[offset : offset+2]) offset += 2 // Parse topics array if len(metadata) < offset+4 { return nil } topicsCount := binary.BigEndian.Uint32(metadata[offset : offset+4]) offset += 4 fmt.Printf("DEBUG: Consumer protocol metadata - Version: %d, TopicsCount: %d\n", version, topicsCount) topics := make([]string, 0, topicsCount) for i := uint32(0); i < topicsCount && offset < len(metadata); i++ { // Parse topic name if len(metadata) < offset+2 { break } topicNameLength := binary.BigEndian.Uint16(metadata[offset : offset+2]) offset += 2 if len(metadata) < offset+int(topicNameLength) { break } topicName := string(metadata[offset : offset+int(topicNameLength)]) offset += int(topicNameLength) topics = append(topics, topicName) fmt.Printf("DEBUG: Consumer subscribed to topic: %s\n", topicName) } return topics } func (h *Handler) updateGroupSubscription(group *consumer.ConsumerGroup) { // Update group's subscribed topics from all members group.SubscribedTopics = make(map[string]bool) for _, member := range group.Members { for _, topic := range member.Subscription { group.SubscribedTopics[topic] = true } } } // SyncGroup API (key 14) - Consumer group coordination completion // Called by group members after JoinGroup to get partition assignments // SyncGroupRequest represents a SyncGroup request from a Kafka client type SyncGroupRequest struct { GroupID string GenerationID int32 MemberID string GroupInstanceID string GroupAssignments []GroupAssignment // Only from group leader } // GroupAssignment represents partition assignment for a group member type GroupAssignment struct { MemberID string Assignment []byte // Serialized assignment data } // SyncGroupResponse represents a SyncGroup response to a Kafka client type SyncGroupResponse struct { CorrelationID uint32 ErrorCode int16 Assignment []byte // Serialized partition assignment for this member } // Additional error codes for SyncGroup // Error codes for SyncGroup are imported from errors.go func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { // DEBUG: Hex dump the request to understand format dumpLen := len(requestBody) if dumpLen > 100 { dumpLen = 100 } fmt.Printf("DEBUG: SyncGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen]) // Parse SyncGroup request request, err := h.parseSyncGroupRequest(requestBody, apiVersion) if err != nil { fmt.Printf("DEBUG: SyncGroup parseSyncGroupRequest error: %v\n", err) return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil } fmt.Printf("DEBUG: SyncGroup parsed request - GroupID: '%s', MemberID: '%s', GenerationID: %d\n", request.GroupID, request.MemberID, request.GenerationID) // Validate request if request.GroupID == "" || request.MemberID == "" { return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil } // Get consumer group group := h.groupCoordinator.GetGroup(request.GroupID) if group == nil { return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil } group.Mu.Lock() defer group.Mu.Unlock() // Update group's last activity group.LastActivity = time.Now() // Validate member exists member, exists := group.Members[request.MemberID] if !exists { return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID, apiVersion), nil } // Validate generation if request.GenerationID != group.Generation { return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration, apiVersion), nil } // Check if this is the group leader with assignments if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 { // Leader is providing assignments - process and store them err = h.processGroupAssignments(group, request.GroupAssignments) if err != nil { return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol, apiVersion), nil } // Move group to stable state group.State = consumer.GroupStateStable // Mark all members as stable for _, m := range group.Members { m.State = consumer.MemberStateStable } } else if group.State == consumer.GroupStateCompletingRebalance { // Non-leader member waiting for assignments // Assignments should already be processed by leader } else { // Trigger partition assignment using built-in strategy topicPartitions := h.getTopicPartitions(group) group.AssignPartitions(topicPartitions) group.State = consumer.GroupStateStable for _, m := range group.Members { m.State = consumer.MemberStateStable } } // Get assignment for this member assignment := h.serializeMemberAssignment(member.Assignment) // Build response response := SyncGroupResponse{ CorrelationID: correlationID, ErrorCode: ErrorCodeNone, Assignment: assignment, } return h.buildSyncGroupResponse(response, apiVersion), nil } func (h *Handler) parseSyncGroupRequest(data []byte, apiVersion uint16) (*SyncGroupRequest, error) { if len(data) < 8 { return nil, fmt.Errorf("request too short") } offset := 0 // SyncGroup v3 body starts with GroupID according to Kafka spec // GroupID (string) groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 if offset+groupIDLength > len(data) { return nil, fmt.Errorf("invalid group ID length") } groupID := string(data[offset : offset+groupIDLength]) offset += groupIDLength // Generation ID (4 bytes) if offset+4 > len(data) { return nil, fmt.Errorf("missing generation ID") } generationID := int32(binary.BigEndian.Uint32(data[offset:])) offset += 4 // MemberID (string) if offset+2 > len(data) { return nil, fmt.Errorf("missing member ID length") } memberIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 if offset+memberIDLength > len(data) { return nil, fmt.Errorf("invalid member ID length") } memberID := string(data[offset : offset+memberIDLength]) offset += memberIDLength // GroupInstanceID (nullable string) - for SyncGroup v3+ var groupInstanceID string if apiVersion >= 3 { if offset+2 > len(data) { return nil, fmt.Errorf("missing group instance ID length") } instanceIDLength := int16(binary.BigEndian.Uint16(data[offset:])) offset += 2 if instanceIDLength == -1 { groupInstanceID = "" // null string } else if instanceIDLength >= 0 { if offset+int(instanceIDLength) > len(data) { return nil, fmt.Errorf("invalid group instance ID length") } groupInstanceID = string(data[offset : offset+int(instanceIDLength)]) offset += int(instanceIDLength) } } // For simplicity, we'll parse basic fields // In a full implementation, we'd parse the full group assignments array return &SyncGroupRequest{ GroupID: groupID, GenerationID: generationID, MemberID: memberID, GroupInstanceID: groupInstanceID, GroupAssignments: []GroupAssignment{}, }, nil } func (h *Handler) buildSyncGroupResponse(response SyncGroupResponse, apiVersion uint16) []byte { estimatedSize := 16 + len(response.Assignment) result := make([]byte, 0, estimatedSize) // Correlation ID (4 bytes) correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) result = append(result, correlationIDBytes...) // SyncGroup v1+ has throttle_time_ms at the beginning // SyncGroup v0 does NOT include throttle_time_ms if apiVersion >= 1 { // Throttle time (4 bytes, 0 = no throttling) result = append(result, 0, 0, 0, 0) } // Error code (2 bytes) errorCodeBytes := make([]byte, 2) binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode)) result = append(result, errorCodeBytes...) // Assignment (bytes) assignmentLength := make([]byte, 4) binary.BigEndian.PutUint32(assignmentLength, uint32(len(response.Assignment))) result = append(result, assignmentLength...) result = append(result, response.Assignment...) return result } func (h *Handler) buildSyncGroupErrorResponse(correlationID uint32, errorCode int16, apiVersion uint16) []byte { response := SyncGroupResponse{ CorrelationID: correlationID, ErrorCode: errorCode, Assignment: []byte{}, } return h.buildSyncGroupResponse(response, apiVersion) } func (h *Handler) processGroupAssignments(group *consumer.ConsumerGroup, assignments []GroupAssignment) error { // In a full implementation, we'd deserialize the assignment data // and update each member's partition assignment // For now, we'll trigger our own assignment logic topicPartitions := h.getTopicPartitions(group) group.AssignPartitions(topicPartitions) return nil } func (h *Handler) getTopicPartitions(group *consumer.ConsumerGroup) map[string][]int32 { topicPartitions := make(map[string][]int32) // Get partition info for all subscribed topics for topic := range group.SubscribedTopics { // Check if topic exists using SeaweedMQ handler if h.seaweedMQHandler.TopicExists(topic) { // For now, assume 1 partition per topic (can be extended later) // In a real implementation, this would query SeaweedMQ for actual partition count partitions := []int32{0} topicPartitions[topic] = partitions } else { // Default to single partition if topic not found topicPartitions[topic] = []int32{0} } } return topicPartitions } func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssignment) []byte { // Build ConsumerGroupMemberAssignment format exactly as Sarama expects: // Version(2) + Topics array + UserData bytes // Group assignments by topic topicAssignments := make(map[string][]int32) for _, assignment := range assignments { topicAssignments[assignment.Topic] = append(topicAssignments[assignment.Topic], assignment.Partition) } result := make([]byte, 0, 64) // Version (2 bytes) - use version 1 result = append(result, 0, 1) // Number of topics (4 bytes) - array length numTopicsBytes := make([]byte, 4) binary.BigEndian.PutUint32(numTopicsBytes, uint32(len(topicAssignments))) result = append(result, numTopicsBytes...) // Topics - each topic follows Kafka string + int32 array format for topic, partitions := range topicAssignments { // Topic name as Kafka string: length(2) + content topicLenBytes := make([]byte, 2) binary.BigEndian.PutUint16(topicLenBytes, uint16(len(topic))) result = append(result, topicLenBytes...) result = append(result, []byte(topic)...) // Partitions as int32 array: length(4) + elements numPartitionsBytes := make([]byte, 4) binary.BigEndian.PutUint32(numPartitionsBytes, uint32(len(partitions))) result = append(result, numPartitionsBytes...) // Partitions (4 bytes each) for _, partition := range partitions { partitionBytes := make([]byte, 4) binary.BigEndian.PutUint32(partitionBytes, uint32(partition)) result = append(result, partitionBytes...) } } // UserData as Kafka bytes: length(4) + data (empty in our case) // For empty user data, just put length = 0 result = append(result, 0, 0, 0, 0) fmt.Printf("DEBUG: Generated assignment bytes (%d): %x\n", len(result), result) return result } // getAvailableTopics returns list of available topics for subscription metadata func (h *Handler) getAvailableTopics() []string { return h.seaweedMQHandler.ListTopics() }