1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/protocol/consumer_group_metadata.go
chrislu 343dd4b2e7 fix: Handle null user data in JoinGroup protocol metadata parsing
- Handle 0xFFFFFFFF (-1) as null/empty user data per Kafka protocol convention
- Limit Metadata API to v6 for kafka-go compatibility (was v7)
- This fixes 'unreasonable user data length: 4294967295' error

Progress: Consumer group protocol and message production now working!
 Produce API - working
 JoinGroup/SyncGroup - working
 Protocol metadata parsing - fixed

Next: Fix Fetch response format (61 unread bytes during message consumption)
2025-09-15 10:46:59 -07:00

305 lines
8.8 KiB
Go

package protocol
import (
"encoding/binary"
"fmt"
"net"
"strings"
)
// ConsumerProtocolMetadata represents parsed consumer protocol metadata
type ConsumerProtocolMetadata struct {
Version int16 // Protocol metadata version
Topics []string // Subscribed topic names
UserData []byte // Optional user data
AssignmentStrategy string // Preferred assignment strategy
}
// ConnectionContext holds connection-specific information for requests
type ConnectionContext struct {
RemoteAddr net.Addr // Client's remote address
LocalAddr net.Addr // Server's local address
ConnectionID string // Connection identifier
}
// ExtractClientHost extracts the client hostname/IP from connection context
func ExtractClientHost(connCtx *ConnectionContext) string {
if connCtx == nil || connCtx.RemoteAddr == nil {
return "unknown"
}
// Extract host portion from address
if tcpAddr, ok := connCtx.RemoteAddr.(*net.TCPAddr); ok {
return tcpAddr.IP.String()
}
// Fallback: parse string representation
addrStr := connCtx.RemoteAddr.String()
if host, _, err := net.SplitHostPort(addrStr); err == nil {
return host
}
// Last resort: return full address
return addrStr
}
// ParseConsumerProtocolMetadata parses consumer protocol metadata with enhanced error handling
func ParseConsumerProtocolMetadata(metadata []byte, strategyName string) (*ConsumerProtocolMetadata, error) {
if len(metadata) < 2 {
return &ConsumerProtocolMetadata{
Version: 0,
Topics: []string{},
UserData: []byte{},
AssignmentStrategy: strategyName,
}, nil
}
result := &ConsumerProtocolMetadata{
AssignmentStrategy: strategyName,
}
offset := 0
// Parse version (2 bytes)
if len(metadata) < offset+2 {
return nil, fmt.Errorf("metadata too short for version field")
}
result.Version = int16(binary.BigEndian.Uint16(metadata[offset : offset+2]))
offset += 2
// Parse topics array
if len(metadata) < offset+4 {
return nil, fmt.Errorf("metadata too short for topics count")
}
topicsCount := binary.BigEndian.Uint32(metadata[offset : offset+4])
offset += 4
// Validate topics count (reasonable limit)
if topicsCount > 10000 {
return nil, fmt.Errorf("unreasonable topics count: %d", topicsCount)
}
result.Topics = make([]string, 0, topicsCount)
for i := uint32(0); i < topicsCount && offset < len(metadata); i++ {
// Parse topic name length
if len(metadata) < offset+2 {
return nil, fmt.Errorf("metadata too short for topic %d name length", i)
}
topicNameLength := binary.BigEndian.Uint16(metadata[offset : offset+2])
offset += 2
// Validate topic name length
if topicNameLength > 1000 {
return nil, fmt.Errorf("unreasonable topic name length: %d", topicNameLength)
}
if len(metadata) < offset+int(topicNameLength) {
return nil, fmt.Errorf("metadata too short for topic %d name data", i)
}
topicName := string(metadata[offset : offset+int(topicNameLength)])
offset += int(topicNameLength)
// Validate topic name (basic validation)
if len(topicName) == 0 {
continue // Skip empty topic names
}
result.Topics = append(result.Topics, topicName)
}
// Parse user data if remaining bytes exist
if len(metadata) >= offset+4 {
userDataLength := binary.BigEndian.Uint32(metadata[offset : offset+4])
offset += 4
// Handle -1 (0xFFFFFFFF) as null/empty user data (Kafka protocol convention)
if userDataLength == 0xFFFFFFFF {
result.UserData = []byte{}
return result, nil
}
// Validate user data length
if userDataLength > 100000 { // 100KB limit
return nil, fmt.Errorf("unreasonable user data length: %d", userDataLength)
}
if len(metadata) >= offset+int(userDataLength) {
result.UserData = make([]byte, userDataLength)
copy(result.UserData, metadata[offset:offset+int(userDataLength)])
}
}
return result, nil
}
// GenerateConsumerProtocolMetadata creates protocol metadata for a consumer subscription
func GenerateConsumerProtocolMetadata(topics []string, userData []byte) []byte {
// Calculate total size needed
size := 2 + 4 + 4 // version + topics_count + user_data_length
for _, topic := range topics {
size += 2 + len(topic) // topic_name_length + topic_name
}
size += len(userData)
metadata := make([]byte, 0, size)
// Version (2 bytes) - use version 1
metadata = append(metadata, 0, 1)
// Topics count (4 bytes)
topicsCount := make([]byte, 4)
binary.BigEndian.PutUint32(topicsCount, uint32(len(topics)))
metadata = append(metadata, topicsCount...)
// Topics (string array)
for _, topic := range topics {
topicLen := make([]byte, 2)
binary.BigEndian.PutUint16(topicLen, uint16(len(topic)))
metadata = append(metadata, topicLen...)
metadata = append(metadata, []byte(topic)...)
}
// UserData length and data (4 bytes + data)
userDataLen := make([]byte, 4)
binary.BigEndian.PutUint32(userDataLen, uint32(len(userData)))
metadata = append(metadata, userDataLen...)
metadata = append(metadata, userData...)
return metadata
}
// ValidateAssignmentStrategy checks if an assignment strategy is supported
func ValidateAssignmentStrategy(strategy string) bool {
supportedStrategies := map[string]bool{
"range": true,
"roundrobin": true,
"sticky": true,
"cooperative-sticky": false, // Not yet implemented
}
return supportedStrategies[strategy]
}
// ExtractTopicsFromMetadata extracts topic list from protocol metadata with fallback
func ExtractTopicsFromMetadata(protocols []GroupProtocol, fallbackTopics []string) []string {
for _, protocol := range protocols {
if ValidateAssignmentStrategy(protocol.Name) {
parsed, err := ParseConsumerProtocolMetadata(protocol.Metadata, protocol.Name)
if err != nil {
fmt.Printf("DEBUG: Failed to parse protocol metadata: %v\n", err)
continue
}
if len(parsed.Topics) > 0 {
fmt.Printf("DEBUG: Extracted %d topics from protocol\n", len(parsed.Topics))
return parsed.Topics
}
}
}
// Fallback to provided topics or default
if len(fallbackTopics) > 0 {
fmt.Printf("DEBUG: Using fallback topics (%d topics)\n", len(fallbackTopics))
return fallbackTopics
}
fmt.Printf("DEBUG: No topics found, using default test topic\n")
return []string{"test-topic"}
}
// SelectBestProtocol chooses the best assignment protocol from available options
func SelectBestProtocol(protocols []GroupProtocol, groupProtocols []string) string {
// Priority order: sticky > roundrobin > range
protocolPriority := []string{"sticky", "roundrobin", "range"}
// Find supported protocols in client's list
clientProtocols := make(map[string]bool)
for _, protocol := range protocols {
if ValidateAssignmentStrategy(protocol.Name) {
clientProtocols[protocol.Name] = true
}
}
// Find supported protocols in group's list
groupProtocolSet := make(map[string]bool)
for _, protocol := range groupProtocols {
groupProtocolSet[protocol] = true
}
// Select highest priority protocol that both client and group support
for _, preferred := range protocolPriority {
if clientProtocols[preferred] && (len(groupProtocols) == 0 || groupProtocolSet[preferred]) {
return preferred
}
}
// Fallback to first supported protocol from client
for _, protocol := range protocols {
if ValidateAssignmentStrategy(protocol.Name) {
return protocol.Name
}
}
// Last resort
return "range"
}
// SanitizeConsumerGroupID validates and sanitizes consumer group ID
func SanitizeConsumerGroupID(groupID string) (string, error) {
if len(groupID) == 0 {
return "", fmt.Errorf("empty group ID")
}
if len(groupID) > 255 {
return "", fmt.Errorf("group ID too long: %d characters (max 255)", len(groupID))
}
// Basic validation: no control characters
for _, char := range groupID {
if char < 32 || char == 127 {
return "", fmt.Errorf("group ID contains invalid characters")
}
}
return strings.TrimSpace(groupID), nil
}
// ProtocolMetadataDebugInfo returns debug information about protocol metadata
type ProtocolMetadataDebugInfo struct {
Strategy string
Version int16
TopicCount int
Topics []string
UserDataSize int
ParsedOK bool
ParseError string
}
// AnalyzeProtocolMetadata provides detailed debug information about protocol metadata
func AnalyzeProtocolMetadata(protocols []GroupProtocol) []ProtocolMetadataDebugInfo {
result := make([]ProtocolMetadataDebugInfo, 0, len(protocols))
for _, protocol := range protocols {
info := ProtocolMetadataDebugInfo{
Strategy: protocol.Name,
}
parsed, err := ParseConsumerProtocolMetadata(protocol.Metadata, protocol.Name)
if err != nil {
info.ParsedOK = false
info.ParseError = err.Error()
} else {
info.ParsedOK = true
info.Version = parsed.Version
info.TopicCount = len(parsed.Topics)
info.Topics = parsed.Topics
info.UserDataSize = len(parsed.UserData)
}
result = append(result, info)
}
return result
}