1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/protocol/flexible_versions.go
2025-09-13 18:49:04 -07:00

402 lines
10 KiB
Go

package protocol
import (
"encoding/binary"
"fmt"
)
// FlexibleVersions provides utilities for handling Kafka flexible versions protocol
// Flexible versions use compact arrays/strings and tagged fields for backward compatibility
// CompactArrayLength encodes a length for compact arrays
// Compact arrays encode length as length+1, where 0 means empty array
func CompactArrayLength(length uint32) []byte {
if length == 0 {
return []byte{0} // Empty array
}
return EncodeUvarint(length + 1)
}
// DecodeCompactArrayLength decodes a compact array length
// Returns the actual length and number of bytes consumed
func DecodeCompactArrayLength(data []byte) (uint32, int, error) {
if len(data) == 0 {
return 0, 0, fmt.Errorf("no data for compact array length")
}
if data[0] == 0 {
return 0, 1, nil // Empty array
}
length, consumed, err := DecodeUvarint(data)
if err != nil {
return 0, 0, fmt.Errorf("decode compact array length: %w", err)
}
if length == 0 {
return 0, consumed, fmt.Errorf("invalid compact array length encoding")
}
return length - 1, consumed, nil
}
// CompactStringLength encodes a length for compact strings
// Compact strings encode length as length+1, where 0 means null string
func CompactStringLength(length int) []byte {
if length < 0 {
return []byte{0} // Null string
}
return EncodeUvarint(uint32(length + 1))
}
// DecodeCompactStringLength decodes a compact string length
// Returns the actual length (-1 for null), and number of bytes consumed
func DecodeCompactStringLength(data []byte) (int, int, error) {
if len(data) == 0 {
return 0, 0, fmt.Errorf("no data for compact string length")
}
if data[0] == 0 {
return -1, 1, nil // Null string
}
length, consumed, err := DecodeUvarint(data)
if err != nil {
return 0, 0, fmt.Errorf("decode compact string length: %w", err)
}
if length == 0 {
return 0, consumed, fmt.Errorf("invalid compact string length encoding")
}
return int(length - 1), consumed, nil
}
// EncodeUvarint encodes an unsigned integer using variable-length encoding
// This is used for compact arrays, strings, and tagged fields
func EncodeUvarint(value uint32) []byte {
var buf []byte
for value >= 0x80 {
buf = append(buf, byte(value)|0x80)
value >>= 7
}
buf = append(buf, byte(value))
return buf
}
// DecodeUvarint decodes a variable-length unsigned integer
// Returns the decoded value and number of bytes consumed
func DecodeUvarint(data []byte) (uint32, int, error) {
var value uint32
var shift uint
var consumed int
for i, b := range data {
consumed = i + 1
value |= uint32(b&0x7F) << shift
if (b & 0x80) == 0 {
return value, consumed, nil
}
shift += 7
if shift >= 32 {
return 0, consumed, fmt.Errorf("uvarint overflow")
}
}
return 0, consumed, fmt.Errorf("incomplete uvarint")
}
// TaggedField represents a tagged field in flexible versions
type TaggedField struct {
Tag uint32
Data []byte
}
// TaggedFields represents a collection of tagged fields
type TaggedFields struct {
Fields []TaggedField
}
// EncodeTaggedFields encodes tagged fields for flexible versions
func (tf *TaggedFields) Encode() []byte {
if len(tf.Fields) == 0 {
return []byte{0} // Empty tagged fields
}
var buf []byte
// Number of tagged fields
buf = append(buf, EncodeUvarint(uint32(len(tf.Fields)))...)
for _, field := range tf.Fields {
// Tag
buf = append(buf, EncodeUvarint(field.Tag)...)
// Size
buf = append(buf, EncodeUvarint(uint32(len(field.Data)))...)
// Data
buf = append(buf, field.Data...)
}
return buf
}
// DecodeTaggedFields decodes tagged fields from flexible versions
func DecodeTaggedFields(data []byte) (*TaggedFields, int, error) {
if len(data) == 0 {
return &TaggedFields{}, 0, fmt.Errorf("no data for tagged fields")
}
if data[0] == 0 {
return &TaggedFields{}, 1, nil // Empty tagged fields
}
offset := 0
// Number of tagged fields
numFields, consumed, err := DecodeUvarint(data[offset:])
if err != nil {
return nil, 0, fmt.Errorf("decode tagged fields count: %w", err)
}
offset += consumed
fields := make([]TaggedField, numFields)
for i := uint32(0); i < numFields; i++ {
// Tag
tag, consumed, err := DecodeUvarint(data[offset:])
if err != nil {
return nil, 0, fmt.Errorf("decode tagged field %d tag: %w", i, err)
}
offset += consumed
// Size
size, consumed, err := DecodeUvarint(data[offset:])
if err != nil {
return nil, 0, fmt.Errorf("decode tagged field %d size: %w", i, err)
}
offset += consumed
// Data
if offset+int(size) > len(data) {
return nil, 0, fmt.Errorf("tagged field %d data truncated", i)
}
fields[i] = TaggedField{
Tag: tag,
Data: data[offset : offset+int(size)],
}
offset += int(size)
}
return &TaggedFields{Fields: fields}, offset, nil
}
// IsFlexibleVersion determines if an API version uses flexible versions
// This is API-specific and based on when each API adopted flexible versions
func IsFlexibleVersion(apiKey, apiVersion uint16) bool {
switch apiKey {
case 18: // ApiVersions
return apiVersion >= 3
case 3: // Metadata
return apiVersion >= 9
case 1: // Fetch
return apiVersion >= 12
case 0: // Produce
return apiVersion >= 9
case 11: // JoinGroup
return apiVersion >= 6
case 14: // SyncGroup
return apiVersion >= 4
case 8: // OffsetCommit
return apiVersion >= 8
case 9: // OffsetFetch
return apiVersion >= 6
case 10: // FindCoordinator
return apiVersion >= 3
case 12: // Heartbeat
return apiVersion >= 4
case 13: // LeaveGroup
return apiVersion >= 4
case 19: // CreateTopics
return apiVersion >= 2
case 20: // DeleteTopics
return apiVersion >= 4
default:
return false
}
}
// FlexibleString encodes a string for flexible versions (compact format)
func FlexibleString(s string) []byte {
if s == "" {
return []byte{0} // Null string
}
var buf []byte
buf = append(buf, CompactStringLength(len(s))...)
buf = append(buf, []byte(s)...)
return buf
}
// FlexibleNullableString encodes a nullable string for flexible versions
func FlexibleNullableString(s *string) []byte {
if s == nil {
return []byte{0} // Null string
}
return FlexibleString(*s)
}
// DecodeFlexibleString decodes a flexible string
// Returns the string (empty for null) and bytes consumed
func DecodeFlexibleString(data []byte) (string, int, error) {
length, consumed, err := DecodeCompactStringLength(data)
if err != nil {
return "", 0, err
}
if length < 0 {
return "", consumed, nil // Null string -> empty string
}
if consumed+length > len(data) {
return "", 0, fmt.Errorf("string data truncated")
}
return string(data[consumed : consumed+length]), consumed + length, nil
}
// FlexibleVersionHeader handles the request header parsing for flexible versions
type FlexibleVersionHeader struct {
APIKey uint16
APIVersion uint16
CorrelationID uint32
ClientID *string
TaggedFields *TaggedFields
}
// parseRegularHeader parses a regular (non-flexible) Kafka request header
func parseRegularHeader(data []byte) (*FlexibleVersionHeader, []byte, error) {
if len(data) < 8 {
return nil, nil, fmt.Errorf("header too short")
}
header := &FlexibleVersionHeader{}
offset := 0
// API Key (2 bytes)
header.APIKey = binary.BigEndian.Uint16(data[offset : offset+2])
offset += 2
// API Version (2 bytes)
header.APIVersion = binary.BigEndian.Uint16(data[offset : offset+2])
offset += 2
// Correlation ID (4 bytes)
header.CorrelationID = binary.BigEndian.Uint32(data[offset : offset+4])
offset += 4
// Regular versions use standard strings
if len(data) < offset+2 {
return nil, nil, fmt.Errorf("missing client_id length")
}
clientIDLen := int16(binary.BigEndian.Uint16(data[offset : offset+2]))
offset += 2
if clientIDLen >= 0 {
if len(data) < offset+int(clientIDLen) {
return nil, nil, fmt.Errorf("client_id truncated")
}
clientID := string(data[offset : offset+int(clientIDLen)])
header.ClientID = &clientID
offset += int(clientIDLen)
}
return header, data[offset:], nil
}
// ParseRequestHeader parses a Kafka request header, handling both regular and flexible versions
func ParseRequestHeader(data []byte) (*FlexibleVersionHeader, []byte, error) {
if len(data) < 8 {
return nil, nil, fmt.Errorf("header too short")
}
header := &FlexibleVersionHeader{}
offset := 0
// API Key (2 bytes)
header.APIKey = binary.BigEndian.Uint16(data[offset : offset+2])
offset += 2
// API Version (2 bytes)
header.APIVersion = binary.BigEndian.Uint16(data[offset : offset+2])
offset += 2
// Correlation ID (4 bytes)
header.CorrelationID = binary.BigEndian.Uint32(data[offset : offset+4])
offset += 4
// Client ID handling depends on flexible version
isFlexible := IsFlexibleVersion(header.APIKey, header.APIVersion)
if isFlexible {
// Flexible versions use compact strings
clientID, consumed, err := DecodeFlexibleString(data[offset:])
if err != nil {
return nil, nil, fmt.Errorf("decode flexible client_id: %w", err)
}
offset += consumed
if clientID != "" {
header.ClientID = &clientID
}
// Parse tagged fields in header
taggedFields, consumed, err := DecodeTaggedFields(data[offset:])
if err != nil {
// If tagged fields parsing fails, this might be a regular header sent by kafka-go
// Fall back to regular header parsing
return parseRegularHeader(data)
}
offset += consumed
header.TaggedFields = taggedFields
} else {
// Regular versions use standard strings
if len(data) < offset+2 {
return nil, nil, fmt.Errorf("missing client_id length")
}
clientIDLen := int16(binary.BigEndian.Uint16(data[offset : offset+2]))
offset += 2
if clientIDLen >= 0 {
if len(data) < offset+int(clientIDLen) {
return nil, nil, fmt.Errorf("client_id truncated")
}
clientID := string(data[offset : offset+int(clientIDLen)])
header.ClientID = &clientID
offset += int(clientIDLen)
}
// No tagged fields in regular versions
}
return header, data[offset:], nil
}
// EncodeFlexibleResponse encodes a response with proper flexible version formatting
func EncodeFlexibleResponse(correlationID uint32, data []byte, hasTaggedFields bool) []byte {
response := make([]byte, 4)
binary.BigEndian.PutUint32(response, correlationID)
response = append(response, data...)
if hasTaggedFields {
// Add empty tagged fields for flexible responses
response = append(response, 0)
}
return response
}