// seaweedfs/weed/mq/kafka/protocol/handler.go
package protocol
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
// TopicInfo holds basic information about a topic
type TopicInfo struct {
Name string
Partitions int32
CreatedAt int64
}
// TopicPartitionKey uniquely identifies a topic partition
type TopicPartitionKey struct {
Topic string
Partition int32
}
// TopicMetadata holds schema and configuration information for a topic
type TopicMetadata struct {
TopicName string `json:"topic_name"`
IsSchematized bool `json:"is_schematized"`
SchemaFormat string `json:"schema_format,omitempty"`
SchemaContent string `json:"schema_content,omitempty"`
Properties map[string]string `json:"properties,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// CachedTopicMetadata holds topic metadata with cache timestamp
type CachedTopicMetadata struct {
Metadata *TopicMetadata
CachedAt time.Time
}
const (
// KafkaMetadataFile is the filename for Kafka-specific metadata. It is stored
// alongside SMQ's topic.conf but kept separate from it, and contains
// Kafka-specific schema and configuration
KafkaMetadataFile = "kafka_metadata.json"
// DefaultKafkaNamespace is the default namespace for Kafka topics in SeaweedMQ
DefaultKafkaNamespace = "kafka"
// TopicMetadataCacheTTL is how long to cache topic metadata
TopicMetadataCacheTTL = 5 * time.Minute
)
// SeaweedMQHandlerInterface defines the interface for SeaweedMQ integration
type SeaweedMQHandlerInterface interface {
TopicExists(topic string) bool
ListTopics() []string
CreateTopic(topic string, partitions int32) error
DeleteTopic(topic string) error
GetOrCreateLedger(topic string, partition int32) *offset.Ledger
GetLedger(topic string, partition int32) *offset.Ledger
ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error)
// GetStoredRecords retrieves records from SMQ storage (optional - for advanced implementations)
GetStoredRecords(topic string, partition int32, fromOffset int64, maxRecords int) ([]offset.SMQRecord, error)
// GetFilerClient returns a filer client for accessing SeaweedMQ metadata (optional)
GetFilerClient() filer_pb.SeaweedFilerClient
// GetBrokerAddresses returns the discovered SMQ broker addresses for Metadata responses
GetBrokerAddresses() []string
Close() error
}
// Handler processes Kafka protocol requests from clients using SeaweedMQ
type Handler struct {
// SeaweedMQ integration
seaweedMQHandler SeaweedMQHandlerInterface
// SMQ offset storage for consumer group offsets
smqOffsetStorage *offset.SMQOffsetStorage
// Consumer group coordination
groupCoordinator *consumer.GroupCoordinator
// Coordinator registry for distributed coordinator assignment
coordinatorRegistry CoordinatorRegistryInterface
// Schema management (optional, for schematized topics)
schemaManager *schema.Manager
useSchema bool
brokerClient *schema.BrokerClient
// Topic metadata cache with TTL
topicMetadataCache map[string]*CachedTopicMetadata
metadataCacheMu sync.RWMutex
filerClient filer_pb.SeaweedFilerClient
// SMQ broker addresses discovered from masters for Metadata responses
smqBrokerAddresses []string
// Gateway address for coordinator registry
gatewayAddress string
// Connection context for tracking client information
connContext *ConnectionContext
}
// NewHandler creates a basic Kafka handler with in-memory storage
// WARNING: This is for testing ONLY - never use in production!
// For production use with persistent storage, use NewSeaweedMQBrokerHandler instead
func NewHandler() *Handler {
// Production safety check: always panic so this in-memory handler cannot be
// used accidentally in production; tests should use NewTestHandler instead
panic("NewHandler() with in-memory storage should NEVER be used in production! Use NewSeaweedMQBrokerHandler() with SeaweedMQ masters for production, or NewTestHandler() for tests.")
}
// NewTestHandler and NewSimpleTestHandler moved to handler_test.go (test-only file)
// All test-related types and implementations moved to handler_test.go (test-only file)
// NewSeaweedMQHandler creates a new handler with SeaweedMQ integration
func NewSeaweedMQHandler(agentAddress string) (*Handler, error) {
smqHandler, err := integration.NewSeaweedMQHandler(agentAddress)
if err != nil {
return nil, err
}
return &Handler{
seaweedMQHandler: smqHandler,
groupCoordinator: consumer.NewGroupCoordinator(),
topicMetadataCache: make(map[string]*CachedTopicMetadata),
smqBrokerAddresses: nil, // Will be set by SetSMQBrokerAddresses() when server starts
}, nil
}
// NewSeaweedMQBrokerHandler creates a new handler with SeaweedMQ broker integration
func NewSeaweedMQBrokerHandler(masters string, filerGroup string, clientHost string) (*Handler, error) {
// Set up SeaweedMQ integration
smqHandler, err := integration.NewSeaweedMQBrokerHandler(masters, filerGroup, clientHost)
if err != nil {
return nil, err
}
// The integration layer already handles master address parsing and filer discovery
// Get filer client from SMQ handler for metadata access
filerClient := smqHandler.GetFilerClient()
if filerClient == nil {
return nil, fmt.Errorf("no filer client available from SMQ handler - filer discovery may have failed")
}
// Create SMQ offset storage using the proper filer address from integration layer
filerAddress := smqHandler.GetFilerAddress()
if filerAddress == "" {
return nil, fmt.Errorf("no filer address available from SMQ handler - filer discovery may have failed")
}
smqOffsetStorage, err := offset.NewSMQOffsetStorage(filerAddress)
if err != nil {
return nil, fmt.Errorf("failed to create SMQ offset storage with filer %s: %w", filerAddress, err)
}
// filerClient is already obtained above
return &Handler{
seaweedMQHandler: smqHandler,
smqOffsetStorage: smqOffsetStorage,
groupCoordinator: consumer.NewGroupCoordinator(),
topicMetadataCache: make(map[string]*CachedTopicMetadata),
filerClient: filerClient,
smqBrokerAddresses: nil, // Will be set by SetSMQBrokerAddresses() when server starts
}, nil
}
// AddTopicForTesting creates a topic for testing purposes
// This delegates to the underlying SeaweedMQ handler
func (h *Handler) AddTopicForTesting(topicName string, partitions int32) {
if h.seaweedMQHandler != nil {
h.seaweedMQHandler.CreateTopic(topicName, partitions)
}
}
// Delegate methods to SeaweedMQ handler
// GetOrCreateLedger delegates to SeaweedMQ handler
func (h *Handler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
return h.seaweedMQHandler.GetOrCreateLedger(topic, partition)
}
// GetLedger delegates to SeaweedMQ handler
func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger {
return h.seaweedMQHandler.GetLedger(topic, partition)
}
// Close shuts down the handler and all connections
func (h *Handler) Close() error {
// Close group coordinator
if h.groupCoordinator != nil {
h.groupCoordinator.Close()
}
// Close broker client if present
if h.brokerClient != nil {
if err := h.brokerClient.Close(); err != nil {
Warning("Failed to close broker client: %v", err)
}
}
// Close SeaweedMQ handler if present
if h.seaweedMQHandler != nil {
return h.seaweedMQHandler.Close()
}
return nil
}
// StoreRecordBatch stores a record batch for later retrieval during Fetch operations
func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) {
// Record batch storage is now handled by the SeaweedMQ handler
Debug("StoreRecordBatch delegated to SeaweedMQ handler - partition:%d, offset:%d",
partition, baseOffset)
}
// GetRecordBatch retrieves a stored record batch that contains the requested offset
func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64) ([]byte, bool) {
// Record batch retrieval is now handled by the SeaweedMQ handler
Debug("GetRecordBatch delegated to SeaweedMQ handler - partition:%d, offset:%d",
partition, offset)
return nil, false
}
// getRecordCountFromBatch extracts the record count from a Kafka record batch
func (h *Handler) getRecordCountFromBatch(batch []byte) int32 {
// Kafka record batch format:
// base_offset (8) + batch_length (4) + partition_leader_epoch (4) + magic (1) + crc (4) +
// attributes (2) + last_offset_delta (4) + first_timestamp (8) + max_timestamp (8) +
// producer_id (8) + producer_epoch (2) + base_sequence (4) + records_count (4) + records...
// The record count is at offset 57 (8+4+4+1+4+2+4+8+8+8+2+4 = 57)
if len(batch) < 61 { // 57 + 4 bytes for record count
return 0
}
recordCount := binary.BigEndian.Uint32(batch[57:61])
return int32(recordCount)
}
// SetSMQBrokerAddresses updates the SMQ broker addresses used in Metadata responses
func (h *Handler) SetSMQBrokerAddresses(brokerAddresses []string) {
h.smqBrokerAddresses = brokerAddresses
}
// GetSMQBrokerAddresses returns the SMQ broker addresses
func (h *Handler) GetSMQBrokerAddresses() []string {
// First try to get from the SeaweedMQ handler (preferred)
if h.seaweedMQHandler != nil {
if brokerAddresses := h.seaweedMQHandler.GetBrokerAddresses(); len(brokerAddresses) > 0 {
return brokerAddresses
}
}
// Fallback to manually set addresses
if len(h.smqBrokerAddresses) > 0 {
return h.smqBrokerAddresses
}
// Final fallback for testing
return []string{"localhost:17777"}
}
// GetGatewayAddress returns the current gateway address as a string (for coordinator registry)
func (h *Handler) GetGatewayAddress() string {
if h.gatewayAddress != "" {
return h.gatewayAddress
}
// Fallback for testing
return "localhost:9092"
}
// SetGatewayAddress sets the gateway address for coordinator registry
func (h *Handler) SetGatewayAddress(address string) {
h.gatewayAddress = address
}
// SetCoordinatorRegistry sets the coordinator registry for this handler
func (h *Handler) SetCoordinatorRegistry(registry CoordinatorRegistryInterface) {
h.coordinatorRegistry = registry
}
// GetCoordinatorRegistry returns the coordinator registry
func (h *Handler) GetCoordinatorRegistry() CoordinatorRegistryInterface {
return h.coordinatorRegistry
}
// parseBrokerAddress parses a broker address string (host:port) into host and port
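// For example, "localhost:17777" yields ("localhost", 17777). Splitting on
// the last colon also keeps bracketed IPv6 hosts intact, e.g. "[::1]:9092"
// yields ("[::1]", 9092).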
func (h *Handler) parseBrokerAddress(address string) (host string, port int, err error) {
// Split by the last colon to handle IPv6 addresses
lastColon := strings.LastIndex(address, ":")
if lastColon == -1 {
return "", 0, fmt.Errorf("invalid broker address format: %s", address)
}
host = address[:lastColon]
portStr := address[lastColon+1:]
port, err = strconv.Atoi(portStr)
if err != nil {
return "", 0, fmt.Errorf("invalid port in broker address %s: %v", address, err)
}
return host, port, nil
}
// HandleConn processes a single client connection
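// Each request is length-prefixed on the wire: size(4) + api_key(2) +
// api_version(2) + correlation_id(4) + client_id(NULLABLE_STRING) + body.
// HandleConn loops, reading and dispatching one framed request at a time,
// until the client disconnects or the context is cancelled.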
func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
connectionID := fmt.Sprintf("%s->%s", conn.RemoteAddr(), conn.LocalAddr())
// Record connection metrics
RecordConnectionMetrics()
// Set connection context for this connection
h.connContext = &ConnectionContext{
RemoteAddr: conn.RemoteAddr(),
LocalAddr: conn.LocalAddr(),
ConnectionID: connectionID,
}
defer func() {
Debug("[%s] Connection closing", connectionID)
RecordDisconnectionMetrics()
h.connContext = nil // Clear connection context
conn.Close()
}()
r := bufio.NewReader(conn)
w := bufio.NewWriter(conn)
defer w.Flush()
// Use default timeout config
timeoutConfig := DefaultTimeoutConfig()
for {
// Check if context is cancelled
select {
case <-ctx.Done():
Debug("[%s] Context cancelled, closing connection", connectionID)
return ctx.Err()
default:
}
// Set a read deadline for the connection based on context or default timeout
var readDeadline time.Time
var timeoutDuration time.Duration
if deadline, ok := ctx.Deadline(); ok {
readDeadline = deadline
timeoutDuration = time.Until(deadline)
Debug("[%s] Using context deadline: %v", connectionID, timeoutDuration)
} else {
// Use configurable read timeout instead of hardcoded 5 seconds
timeoutDuration = timeoutConfig.ReadTimeout
readDeadline = time.Now().Add(timeoutDuration)
Debug("[%s] Using config timeout: %v", connectionID, timeoutDuration)
}
if err := conn.SetReadDeadline(readDeadline); err != nil {
Debug("[%s] Failed to set read deadline: %v", connectionID, err)
return fmt.Errorf("set read deadline: %w", err)
}
// Check context before reading
select {
case <-ctx.Done():
Debug("[%s] Context cancelled before reading message header", connectionID)
// Give a small delay to ensure proper cleanup
time.Sleep(100 * time.Millisecond)
return ctx.Err()
default:
// If context is close to being cancelled, set a very short timeout
if deadline, ok := ctx.Deadline(); ok {
timeUntilDeadline := time.Until(deadline)
if timeUntilDeadline < 2*time.Second && timeUntilDeadline > 0 {
shortDeadline := time.Now().Add(500 * time.Millisecond)
if err := conn.SetReadDeadline(shortDeadline); err == nil {
Debug("[%s] Context deadline approaching, using 500ms timeout", connectionID)
}
}
}
}
// Read message size (4 bytes)
Debug("[%s] About to read message size header", connectionID)
var sizeBytes [4]byte
if _, err := io.ReadFull(r, sizeBytes[:]); err != nil {
if err == io.EOF {
Debug("[%s] Client closed connection (clean EOF)", connectionID)
return nil
}
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
// Idle timeout while waiting for next request; keep connection open
Debug("[%s] Read timeout waiting for request, continuing", connectionID)
continue
}
Debug("[%s] Read error: %v", connectionID, err)
return fmt.Errorf("read message size: %w", err)
}
// Successfully read the message size
size := binary.BigEndian.Uint32(sizeBytes[:])
Debug("[%s] Read message size header: %d bytes", connectionID, size)
if size == 0 || size > 1024*1024 { // 1MB limit
// Use standardized error for message size limit
Debug("[%s] Invalid message size: %d (limit: 1MB)", connectionID, size)
// Send error response for message too large
errorResponse := BuildErrorResponse(0, ErrorCodeMessageTooLarge) // correlation ID 0 since we can't parse it yet
if writeErr := h.writeResponseWithTimeout(w, errorResponse, timeoutConfig.WriteTimeout); writeErr != nil {
Debug("[%s] Failed to send message too large response: %v", connectionID, writeErr)
}
return fmt.Errorf("message size %d exceeds limit", size)
}
// Set read deadline for message body
if err := conn.SetReadDeadline(time.Now().Add(timeoutConfig.ReadTimeout)); err != nil {
Debug("[%s] Failed to set message read deadline: %v", connectionID, err)
}
// Read the message
messageBuf := make([]byte, size)
if _, err := io.ReadFull(r, messageBuf); err != nil {
errorCode := HandleTimeoutError(err, "read")
Debug("[%s] Error reading message body: %v (code: %d)", connectionID, err, errorCode)
return fmt.Errorf("read message: %w", err)
}
// Parse at least the basic header to get API key and correlation ID
if len(messageBuf) < 8 {
return fmt.Errorf("message too short")
}
apiKey := binary.BigEndian.Uint16(messageBuf[0:2])
apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
apiName := getAPIName(apiKey)
// Validate API version against what we support
if err := h.validateAPIVersion(apiKey, apiVersion); err != nil {
// Return proper Kafka error response for unsupported version
response, writeErr := h.buildUnsupportedVersionResponse(correlationID, apiKey, apiVersion)
if writeErr != nil {
return fmt.Errorf("build error response: %w", writeErr)
}
// Send error response and continue to next request
if writeErr := h.writeResponseWithTimeout(w, response, timeoutConfig.WriteTimeout); writeErr != nil {
Debug("[%s] Failed to send unsupported version response: %v", connectionID, writeErr)
return fmt.Errorf("send error response: %w", writeErr)
}
continue
}
// Parse header using flexible version utilities for validation and client ID extraction
header, requestBody, parseErr := ParseRequestHeader(messageBuf)
if parseErr != nil {
// Fall back to basic header parsing if flexible version parsing fails
Debug("Flexible header parsing failed, using basic parsing: %v", parseErr)
// Basic header parsing fallback (original logic)
bodyOffset := 8
if len(messageBuf) < bodyOffset+2 {
return fmt.Errorf("invalid header: missing client_id length")
}
clientIDLen := int16(binary.BigEndian.Uint16(messageBuf[bodyOffset : bodyOffset+2]))
bodyOffset += 2
if clientIDLen >= 0 {
if len(messageBuf) < bodyOffset+int(clientIDLen) {
return fmt.Errorf("invalid header: client_id truncated")
}
bodyOffset += int(clientIDLen)
}
requestBody = messageBuf[bodyOffset:]
} else {
// Validate parsed header matches what we already extracted
if header.APIKey != apiKey || header.APIVersion != apiVersion || header.CorrelationID != correlationID {
Debug("Header parsing mismatch - using basic parsing as fallback")
// Fall back to basic parsing rather than failing
bodyOffset := 8
if len(messageBuf) < bodyOffset+2 {
return fmt.Errorf("invalid header: missing client_id length")
}
clientIDLen := int16(binary.BigEndian.Uint16(messageBuf[bodyOffset : bodyOffset+2]))
bodyOffset += 2
if clientIDLen >= 0 {
if len(messageBuf) < bodyOffset+int(clientIDLen) {
return fmt.Errorf("invalid header: client_id truncated")
}
bodyOffset += int(clientIDLen)
}
requestBody = messageBuf[bodyOffset:]
} else if header.ClientID != nil {
// Log client ID if available and parsing was successful
Debug("Client ID: %s", *header.ClientID)
}
}
// Handle the request based on API key and version
var response []byte
var err error
// Record request start time for latency tracking
requestStart := time.Now()
Debug("API REQUEST - Key: %d (%s), Version: %d, Correlation: %d", apiKey, getAPIName(apiKey), apiVersion, correlationID)
switch apiKey {
case 18: // ApiVersions
response, err = h.handleApiVersions(correlationID, apiVersion)
case 3: // Metadata
response, err = h.handleMetadata(correlationID, apiVersion, requestBody)
case 2: // ListOffsets
Debug("*** LISTOFFSETS REQUEST RECEIVED *** Correlation: %d, Version: %d", correlationID, apiVersion)
response, err = h.handleListOffsets(correlationID, apiVersion, requestBody)
case 19: // CreateTopics
response, err = h.handleCreateTopics(correlationID, apiVersion, requestBody)
case 20: // DeleteTopics
response, err = h.handleDeleteTopics(correlationID, requestBody)
case 0: // Produce
response, err = h.handleProduce(correlationID, apiVersion, requestBody)
case 1: // Fetch
response, err = h.handleFetch(ctx, correlationID, apiVersion, requestBody)
case 11: // JoinGroup
response, err = h.handleJoinGroup(correlationID, apiVersion, requestBody)
case 14: // SyncGroup
response, err = h.handleSyncGroup(correlationID, apiVersion, requestBody)
case 8: // OffsetCommit
response, err = h.handleOffsetCommit(correlationID, requestBody)
case 9: // OffsetFetch
response, err = h.handleOffsetFetch(correlationID, apiVersion, requestBody)
case 10: // FindCoordinator
response, err = h.handleFindCoordinator(correlationID, apiVersion, requestBody)
case 12: // Heartbeat
response, err = h.handleHeartbeat(correlationID, requestBody)
case 13: // LeaveGroup
response, err = h.handleLeaveGroup(correlationID, apiVersion, requestBody)
case 15: // DescribeGroups
Debug("DescribeGroups request received, correlation: %d, version: %d", correlationID, apiVersion)
response, err = h.handleDescribeGroups(correlationID, apiVersion, requestBody)
case 16: // ListGroups
Debug("ListGroups request received, correlation: %d, version: %d", correlationID, apiVersion)
response, err = h.handleListGroups(correlationID, apiVersion, requestBody)
case 32: // DescribeConfigs
Debug("DescribeConfigs request received, correlation: %d, version: %d", correlationID, apiVersion)
response, err = h.handleDescribeConfigs(correlationID, apiVersion, requestBody)
default:
Warning("Unsupported API key: %d (%s) v%d - Correlation: %d", apiKey, apiName, apiVersion, correlationID)
err = fmt.Errorf("unsupported API key: %d (version %d)", apiKey, apiVersion)
}
// Record metrics based on success/error
requestLatency := time.Since(requestStart)
if err != nil {
RecordErrorMetrics(apiKey, requestLatency)
return fmt.Errorf("handle request: %w", err)
}
// Send response with timeout handling
Debug("[%s] Sending %s response: %d bytes", connectionID, getAPIName(apiKey), len(response))
if err := h.writeResponseWithTimeout(w, response, timeoutConfig.WriteTimeout); err != nil {
errorCode := HandleTimeoutError(err, "write")
Error("[%s] Error sending response: %v (code: %d)", connectionID, err, errorCode)
RecordErrorMetrics(apiKey, requestLatency)
return fmt.Errorf("send response: %w", err)
}
// Record successful request metrics
RecordRequestMetrics(apiKey, requestLatency)
// Minimal flush logging
// Debug("API %d flushed", apiKey)
}
}
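// handleApiVersions builds the ApiVersions response advertising the API keys
// this gateway supports. Response layout: correlation_id(4) + error_code(2) +
// api_keys(ARRAY of api_key(2)+min_version(2)+max_version(2)) +
// throttle_time_ms(4, v1+). Flexible versions (v3+) use compact arrays and
// append tagged fields per element and at the response level.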
func (h *Handler) handleApiVersions(correlationID uint32, apiVersion uint16) ([]byte, error) {
// Build ApiVersions response supporting flexible versions (v3+)
isFlexible := IsFlexibleVersion(18, apiVersion)
response := make([]byte, 0, 128)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Error code (0 = no error)
response = append(response, 0, 0)
// Number of API keys - use compact or regular array format based on version
apiKeysCount := uint32(16)
if isFlexible {
// Compact array format for flexible versions
response = append(response, CompactArrayLength(apiKeysCount)...)
} else {
// Regular array format for older versions
response = append(response, 0, 0, 0, 16) // 16 API keys
}
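// Note: DescribeConfigs (API key 32) is handled by the request dispatcher in
// HandleConn but is not advertised in this list.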
// API Key 18 (ApiVersions): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 18) // API key 18
response = append(response, 0, 0) // min version 0
response = append(response, 0, 3) // max version 3
if isFlexible {
// per-element tagged fields (empty)
response = append(response, 0)
}
// API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 3) // API key 3
response = append(response, 0, 0) // min version 0
response = append(response, 0, 7) // max version 7
if isFlexible {
response = append(response, 0)
}
// API Key 2 (ListOffsets): limit to v2 (implemented and tested)
response = append(response, 0, 2) // API key 2
response = append(response, 0, 0) // min version 0
response = append(response, 0, 2) // max version 2
if isFlexible {
response = append(response, 0)
}
// API Key 19 (CreateTopics): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 19) // API key 19
response = append(response, 0, 0) // min version 0
response = append(response, 0, 5) // max version 5
if isFlexible {
response = append(response, 0)
}
// API Key 20 (DeleteTopics): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 20) // API key 20
response = append(response, 0, 0) // min version 0
response = append(response, 0, 4) // max version 4
if isFlexible {
response = append(response, 0)
}
// API Key 0 (Produce): api_key(2) + min_version(2) + max_version(2)
// Support v7 for Sarama compatibility (Kafka 2.1.0)
response = append(response, 0, 0) // API key 0
response = append(response, 0, 0) // min version 0
response = append(response, 0, 7) // max version 7
if isFlexible {
response = append(response, 0)
}
// API Key 1 (Fetch): limit to v7 (current handler semantics)
response = append(response, 0, 1) // API key 1
response = append(response, 0, 0) // min version 0
response = append(response, 0, 7) // max version 7
if isFlexible {
response = append(response, 0)
}
// API Key 11 (JoinGroup): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 11) // API key 11
response = append(response, 0, 0) // min version 0
response = append(response, 0, 7) // max version 7
if isFlexible {
response = append(response, 0)
}
// API Key 14 (SyncGroup): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 14) // API key 14
response = append(response, 0, 0) // min version 0
response = append(response, 0, 5) // max version 5
if isFlexible {
response = append(response, 0)
}
// API Key 8 (OffsetCommit): limit to v2 for current implementation
response = append(response, 0, 8) // API key 8
response = append(response, 0, 0) // min version 0
response = append(response, 0, 2) // max version 2
if isFlexible {
response = append(response, 0)
}
// API Key 9 (OffsetFetch): supports up to v5 (with leader epoch and throttle time)
response = append(response, 0, 9) // API key 9
response = append(response, 0, 0) // min version 0
response = append(response, 0, 5) // max version 5
if isFlexible {
response = append(response, 0)
}
// API Key 10 (FindCoordinator): limit to v2 (implemented)
response = append(response, 0, 10) // API key 10
response = append(response, 0, 0) // min version 0
response = append(response, 0, 2) // max version 2
if isFlexible {
response = append(response, 0)
}
// API Key 12 (Heartbeat): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 12) // API key 12
response = append(response, 0, 0) // min version 0
response = append(response, 0, 4) // max version 4
if isFlexible {
response = append(response, 0)
}
// API Key 13 (LeaveGroup): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 13) // API key 13
response = append(response, 0, 0) // min version 0
response = append(response, 0, 4) // max version 4
if isFlexible {
response = append(response, 0)
}
// API Key 15 (DescribeGroups): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 15) // API key 15
response = append(response, 0, 0) // min version 0
response = append(response, 0, 5) // max version 5
if isFlexible {
response = append(response, 0)
}
// API Key 16 (ListGroups): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 16) // API key 16
response = append(response, 0, 0) // min version 0
response = append(response, 0, 4) // max version 4
if isFlexible {
response = append(response, 0)
}
// ApiVersions v1+ include throttle_time_ms
if apiVersion >= 1 {
response = append(response, 0, 0, 0, 0) // throttle_time_ms = 0
}
// Add tagged fields for flexible versions
if isFlexible {
// Empty tagged fields for now (response-level)
response = append(response, 0)
}
Debug("ApiVersions v%d response: %d bytes", apiVersion, len(response))
return response, nil
}
// HandleMetadataV0 implements the Metadata API response in version 0 format.
// v0 response layout:
// correlation_id(4) + brokers(ARRAY) + topics(ARRAY)
// broker: node_id(4) + host(STRING) + port(4)
// topic: error_code(2) + name(STRING) + partitions(ARRAY)
// partition: error_code(2) + partition_id(4) + leader(4) + replicas(ARRAY<int32>) + isr(ARRAY<int32>)
func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]byte, error) {
response := make([]byte, 0, 256)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Brokers array length (4 bytes) - 1 broker (this gateway)
response = append(response, 0, 0, 0, 1)
// Broker 0: node_id(4) + host(STRING) + port(4)
response = append(response, 0, 0, 0, 1) // node_id = 1 (consistent with partitions)
// Use gateway address for Kafka protocol compatibility
gatewayAddr := h.GetGatewayAddress()
host, port, err := h.parseGatewayAddress(gatewayAddr)
if err != nil {
Debug("Failed to parse gateway address %s: %v", gatewayAddr, err)
// Fallback to default
host, port = "localhost", 9092
}
Debug("Advertising Kafka gateway (v0) at %s:%d", host, port)
// Host (STRING: 2 bytes length + bytes)
hostLen := uint16(len(host))
response = append(response, byte(hostLen>>8), byte(hostLen))
response = append(response, []byte(host)...)
// Port (4 bytes)
portBytes := make([]byte, 4)
binary.BigEndian.PutUint32(portBytes, uint32(port))
response = append(response, portBytes...)
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
Debug("🔍 METADATA v0 REQUEST - Requested: %v (empty=all)", requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
if len(requestedTopics) == 0 {
topicsToReturn = h.seaweedMQHandler.ListTopics()
} else {
for _, name := range requestedTopics {
if h.seaweedMQHandler.TopicExists(name) {
topicsToReturn = append(topicsToReturn, name)
}
}
}
// Topics array length (4 bytes)
topicsCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(topicsCountBytes, uint32(len(topicsToReturn)))
response = append(response, topicsCountBytes...)
// Topic entries
for _, topicName := range topicsToReturn {
// error_code(2) = 0
response = append(response, 0, 0)
// name (STRING)
nameBytes := []byte(topicName)
nameLen := uint16(len(nameBytes))
response = append(response, byte(nameLen>>8), byte(nameLen))
response = append(response, nameBytes...)
// partitions array length (4 bytes) - 1 partition
response = append(response, 0, 0, 0, 1)
// partition: error_code(2) + partition_id(4) + leader(4)
response = append(response, 0, 0) // error_code
response = append(response, 0, 0, 0, 0) // partition_id = 0
response = append(response, 0, 0, 0, 1) // leader = 1 (this broker)
// replicas: array length(4) + one broker id (1)
response = append(response, 0, 0, 0, 1)
response = append(response, 0, 0, 0, 1)
// isr: array length(4) + one broker id (1)
response = append(response, 0, 0, 0, 1)
response = append(response, 0, 0, 0, 1)
}
Debug("Metadata v0 response for %d topics: %v", len(topicsToReturn), topicsToReturn)
Debug("*** METADATA v0 RESPONSE DETAILS ***")
Debug("Response size: %d bytes", len(response))
Debug("Kafka Gateway: %s", h.GetGatewayAddress())
Debug("Topics: %v", topicsToReturn)
for i, topic := range topicsToReturn {
Debug("Topic[%d]: %s (1 partition)", i, topic)
}
Debug("*** END METADATA v0 RESPONSE ***")
return response, nil
}
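// HandleMetadataV1 implements the Metadata API response in version 1 format.
// v1 extends v0 with a rack(STRING) per broker, a controller_id(4) after the
// brokers array, and an is_internal(1) flag per topic.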
func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]byte, error) {
// Simplified Metadata v1 implementation - based on working v0 + v1 additions
// v1 adds: ControllerID (after brokers), Rack (for brokers), IsInternal (for topics)
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
Debug("🔍 METADATA v1 REQUEST - Requested: %v (empty=all)", requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
if len(requestedTopics) == 0 {
topicsToReturn = h.seaweedMQHandler.ListTopics()
} else {
for _, name := range requestedTopics {
if h.seaweedMQHandler.TopicExists(name) {
topicsToReturn = append(topicsToReturn, name)
}
}
}
// Build response using same approach as v0 but with v1 additions
response := make([]byte, 0, 256)
// Correlation ID (4 bytes)
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Brokers array length (4 bytes) - 1 broker (this gateway)
response = append(response, 0, 0, 0, 1)
// Broker 0: node_id(4) + host(STRING) + port(4) + rack(STRING)
response = append(response, 0, 0, 0, 1) // node_id = 1
// Use gateway address for Kafka protocol compatibility
gatewayAddr := h.GetGatewayAddress()
host, port, err := h.parseGatewayAddress(gatewayAddr)
if err != nil {
Debug("Failed to parse gateway address %s: %v", gatewayAddr, err)
// Fallback to default
host, port = "localhost", 9092
}
Debug("Advertising Kafka gateway (v1) at %s:%d", host, port)
// Host (STRING: 2 bytes length + bytes)
hostLen := uint16(len(host))
response = append(response, byte(hostLen>>8), byte(hostLen))
response = append(response, []byte(host)...)
// Port (4 bytes)
portBytes := make([]byte, 4)
binary.BigEndian.PutUint32(portBytes, uint32(port))
response = append(response, portBytes...)
// Rack (STRING: 2 bytes length + bytes) - v1 addition, non-nullable empty string
response = append(response, 0, 0) // empty string
// ControllerID (4 bytes) - v1 addition
response = append(response, 0, 0, 0, 1) // controller_id = 1
// Topics array length (4 bytes)
topicsCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(topicsCountBytes, uint32(len(topicsToReturn)))
response = append(response, topicsCountBytes...)
// Topics
for _, topicName := range topicsToReturn {
// error_code (2 bytes)
response = append(response, 0, 0)
// topic name (STRING: 2 bytes length + bytes)
topicLen := uint16(len(topicName))
response = append(response, byte(topicLen>>8), byte(topicLen))
response = append(response, []byte(topicName)...)
// is_internal (1 byte) - v1 addition
response = append(response, 0) // false
// partitions array length (4 bytes) - 1 partition
response = append(response, 0, 0, 0, 1)
// partition 0: error_code(2) + partition_id(4) + leader_id(4) + replicas(ARRAY) + isr(ARRAY)
response = append(response, 0, 0) // error_code
response = append(response, 0, 0, 0, 0) // partition_id = 0
response = append(response, 0, 0, 0, 1) // leader_id = 1
// replicas: array length(4) + one broker id (1)
response = append(response, 0, 0, 0, 1)
response = append(response, 0, 0, 0, 1)
// isr: array length(4) + one broker id (1)
response = append(response, 0, 0, 0, 1)
response = append(response, 0, 0, 0, 1)
}
Debug("Metadata v1 response for %d topics: %v", len(topicsToReturn), topicsToReturn)
Debug("Metadata v1 response size: %d bytes", len(response))
return response, nil
}
// HandleMetadataV2 implements Metadata API v2 with ClusterID field
func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]byte, error) {
// Metadata v2 adds ClusterID field (nullable string)
// v2 response layout: correlation_id(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY)
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
Debug("🔍 METADATA v2 REQUEST - Requested: %v (empty=all)", requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
if len(requestedTopics) == 0 {
topicsToReturn = h.seaweedMQHandler.ListTopics()
} else {
for _, name := range requestedTopics {
if h.seaweedMQHandler.TopicExists(name) {
topicsToReturn = append(topicsToReturn, name)
}
}
}
var buf bytes.Buffer
// Correlation ID (4 bytes)
binary.Write(&buf, binary.BigEndian, correlationID)
// Brokers array (4 bytes length + brokers) - 1 broker (this gateway)
binary.Write(&buf, binary.BigEndian, int32(1))
// Use gateway address for Kafka protocol compatibility
gatewayAddr := h.GetGatewayAddress()
host, port, err := h.parseGatewayAddress(gatewayAddr)
if err != nil {
Debug("Failed to parse gateway address %s: %v", gatewayAddr, err)
// Fallback to default
host, port = "localhost", 9092
}
Debug("Advertising Kafka gateway (v2) at %s:%d", host, port)
nodeID := int32(1) // Single gateway node
// Broker: node_id(4) + host(STRING) + port(4) + rack(STRING)
binary.Write(&buf, binary.BigEndian, nodeID)
// Host (STRING: 2 bytes length + data)
binary.Write(&buf, binary.BigEndian, int16(len(host)))
buf.WriteString(host)
// Port (4 bytes)
binary.Write(&buf, binary.BigEndian, int32(port))
// Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable
binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string
// ClusterID (NULLABLE_STRING: 2 bytes length + data) - v2 addition
// Use -1 length to indicate null
binary.Write(&buf, binary.BigEndian, int16(-1)) // Null cluster ID
// ControllerID (4 bytes) - v1+ addition
binary.Write(&buf, binary.BigEndian, int32(1))
// Topics array (4 bytes length + topics)
binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
for _, topicName := range topicsToReturn {
// ErrorCode (2 bytes)
binary.Write(&buf, binary.BigEndian, int16(0))
// Name (STRING: 2 bytes length + data)
binary.Write(&buf, binary.BigEndian, int16(len(topicName)))
buf.WriteString(topicName)
// IsInternal (1 byte) - v1+ addition
buf.WriteByte(0) // false
// Partitions array (4 bytes length + partitions)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition
// Partition 0
binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex
binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID
// ReplicaNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
// IsrNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
}
response := buf.Bytes()
Debug("Advertising Kafka gateway: %s", h.GetGatewayAddress())
Debug("Metadata v2 response for %d topics: %v", len(topicsToReturn), topicsToReturn)
return response, nil
}
// HandleMetadataV3V4 implements Metadata API v3/v4 with ThrottleTimeMs field
func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ([]byte, error) {
// Metadata v3/v4 adds ThrottleTimeMs field at the beginning
// v3/v4 response layout: correlation_id(4) + throttle_time_ms(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY)
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
if len(requestedTopics) == 0 {
topicsToReturn = h.seaweedMQHandler.ListTopics()
} else {
for _, name := range requestedTopics {
if h.seaweedMQHandler.TopicExists(name) {
topicsToReturn = append(topicsToReturn, name)
}
}
}
var buf bytes.Buffer
// Correlation ID (4 bytes)
binary.Write(&buf, binary.BigEndian, correlationID)
// ThrottleTimeMs (4 bytes) - v3+ addition
binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling
// Brokers array (4 bytes length + brokers) - 1 broker (this gateway)
binary.Write(&buf, binary.BigEndian, int32(1))
// Use gateway address for Kafka protocol compatibility
gatewayAddr := h.GetGatewayAddress()
host, port, err := h.parseGatewayAddress(gatewayAddr)
if err != nil {
Debug("Failed to parse gateway address %s: %v", gatewayAddr, err)
// Fallback to default
host, port = "localhost", 9092
}
Debug("Advertising Kafka gateway (v3/v4) at %s:%d", host, port)
nodeID := int32(1) // Single gateway node
// Broker: node_id(4) + host(STRING) + port(4) + rack(STRING)
binary.Write(&buf, binary.BigEndian, nodeID)
// Host (STRING: 2 bytes length + data)
binary.Write(&buf, binary.BigEndian, int16(len(host)))
buf.WriteString(host)
// Port (4 bytes)
binary.Write(&buf, binary.BigEndian, int32(port))
// Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable
binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string
// ClusterID (NULLABLE_STRING: 2 bytes length + data) - v2+ addition
// Use -1 length to indicate null
binary.Write(&buf, binary.BigEndian, int16(-1)) // Null cluster ID
// ControllerID (4 bytes) - v1+ addition
binary.Write(&buf, binary.BigEndian, int32(1))
// Topics array (4 bytes length + topics)
binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
for _, topicName := range topicsToReturn {
// ErrorCode (2 bytes)
binary.Write(&buf, binary.BigEndian, int16(0))
// Name (STRING: 2 bytes length + data)
binary.Write(&buf, binary.BigEndian, int16(len(topicName)))
buf.WriteString(topicName)
// IsInternal (1 byte) - v1+ addition
buf.WriteByte(0) // false
// Partitions array (4 bytes length + partitions)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition
// Partition 0
binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex
binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID
// ReplicaNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
// IsrNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
}
response := buf.Bytes()
return response, nil
}
// HandleMetadataV5V6 implements Metadata API v5/v6 with OfflineReplicas field
func (h *Handler) HandleMetadataV5V6(correlationID uint32, requestBody []byte) ([]byte, error) {
// Metadata v5/v6 adds OfflineReplicas field to partitions
// v5/v6 response layout: correlation_id(4) + throttle_time_ms(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY)
// Each partition now includes: error_code(2) + partition_index(4) + leader_id(4) + replica_nodes(ARRAY) + isr_nodes(ARRAY) + offline_replicas(ARRAY)
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
Debug("🔍 METADATA v5/v6 REQUEST - Requested: %v (empty=all)", requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
if len(requestedTopics) == 0 {
topicsToReturn = h.seaweedMQHandler.ListTopics()
} else {
for _, name := range requestedTopics {
if h.seaweedMQHandler.TopicExists(name) {
topicsToReturn = append(topicsToReturn, name)
}
}
}
var buf bytes.Buffer
// Correlation ID (4 bytes)
binary.Write(&buf, binary.BigEndian, correlationID)
// ThrottleTimeMs (4 bytes) - v3+ addition
binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling
// Brokers array (4 bytes length + brokers) - 1 broker (this gateway)
binary.Write(&buf, binary.BigEndian, int32(1))
// Use gateway address for Kafka protocol compatibility
gatewayAddr := h.GetGatewayAddress()
host, port, err := h.parseGatewayAddress(gatewayAddr)
if err != nil {
Debug("Failed to parse gateway address %s: %v", gatewayAddr, err)
// Fallback to default
host, port = "localhost", 9092
}
Debug("Advertising Kafka gateway (v5/v6) at %s:%d", host, port)
nodeID := int32(1) // Single gateway node
// Broker: node_id(4) + host(STRING) + port(4) + rack(STRING)
binary.Write(&buf, binary.BigEndian, nodeID)
// Host (STRING: 2 bytes length + data)
binary.Write(&buf, binary.BigEndian, int16(len(host)))
buf.WriteString(host)
// Port (4 bytes)
binary.Write(&buf, binary.BigEndian, int32(port))
// Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable
binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string
// ClusterID (NULLABLE_STRING: 2 bytes length + data) - v2+ addition
// Use -1 length to indicate null
binary.Write(&buf, binary.BigEndian, int16(-1)) // Null cluster ID
// ControllerID (4 bytes) - v1+ addition
binary.Write(&buf, binary.BigEndian, int32(1))
// Topics array (4 bytes length + topics)
binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
for _, topicName := range topicsToReturn {
// ErrorCode (2 bytes)
binary.Write(&buf, binary.BigEndian, int16(0))
// Name (STRING: 2 bytes length + data)
binary.Write(&buf, binary.BigEndian, int16(len(topicName)))
buf.WriteString(topicName)
// IsInternal (1 byte) - v1+ addition
buf.WriteByte(0) // false
// Partitions array (4 bytes length + partitions)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition
// Partition 0
binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex
binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID
// ReplicaNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
// IsrNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
// OfflineReplicas array (4 bytes length + nodes) - v5+ addition
binary.Write(&buf, binary.BigEndian, int32(0)) // No offline replicas
}
response := buf.Bytes()
Debug("Advertising Kafka gateway: %s", h.GetGatewayAddress())
Debug("Metadata v5/v6 response for %d topics: %v", len(topicsToReturn), topicsToReturn)
return response, nil
}
// HandleMetadataV7 implements Metadata API v7 with LeaderEpoch field
func (h *Handler) HandleMetadataV7(correlationID uint32, requestBody []byte) ([]byte, error) {
// Metadata v7 adds LeaderEpoch field to partitions
// v7 response layout: correlation_id(4) + throttle_time_ms(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY)
// Each partition now includes: error_code(2) + partition_index(4) + leader_id(4) + leader_epoch(4) + replica_nodes(ARRAY) + isr_nodes(ARRAY) + offline_replicas(ARRAY)
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
Debug("🔍 METADATA v7 REQUEST - Requested: %v (empty=all)", requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
if len(requestedTopics) == 0 {
topicsToReturn = h.seaweedMQHandler.ListTopics()
} else {
for _, name := range requestedTopics {
if h.seaweedMQHandler.TopicExists(name) {
topicsToReturn = append(topicsToReturn, name)
}
}
}
var buf bytes.Buffer
// Correlation ID (4 bytes)
binary.Write(&buf, binary.BigEndian, correlationID)
// ThrottleTimeMs (4 bytes) - v3+ addition
binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling
// Brokers array (4 bytes length + brokers) - 1 broker (this gateway)
binary.Write(&buf, binary.BigEndian, int32(1))
// Use gateway address for Kafka protocol compatibility
gatewayAddr := h.GetGatewayAddress()
host, port, err := h.parseGatewayAddress(gatewayAddr)
if err != nil {
Debug("Failed to parse gateway address %s: %v", gatewayAddr, err)
// Fallback to default
host, port = "localhost", 9092
}
Debug("Advertising Kafka gateway (v7) at %s:%d", host, port)
nodeID := int32(1) // Single gateway node
// Broker: node_id(4) + host(STRING) + port(4) + rack(STRING)
binary.Write(&buf, binary.BigEndian, nodeID)
// Host (STRING: 2 bytes length + data)
binary.Write(&buf, binary.BigEndian, int16(len(host)))
buf.WriteString(host)
// Port (4 bytes)
binary.Write(&buf, binary.BigEndian, int32(port))
// Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable
binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string
// ClusterID (NULLABLE_STRING: 2 bytes length + data) - v2+ addition
// Use -1 length to indicate null
binary.Write(&buf, binary.BigEndian, int16(-1)) // Null cluster ID
// ControllerID (4 bytes) - v1+ addition
binary.Write(&buf, binary.BigEndian, int32(1))
// Topics array (4 bytes length + topics)
binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
for _, topicName := range topicsToReturn {
// ErrorCode (2 bytes)
binary.Write(&buf, binary.BigEndian, int16(0))
// Name (STRING: 2 bytes length + data)
binary.Write(&buf, binary.BigEndian, int16(len(topicName)))
buf.WriteString(topicName)
// IsInternal (1 byte) - v1+ addition
buf.WriteByte(0) // false
// Partitions array (4 bytes length + partitions)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition
// Partition 0
binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex
binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID
// LeaderEpoch (4 bytes) - v7+ addition
binary.Write(&buf, binary.BigEndian, int32(0)) // Leader epoch 0
// ReplicaNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
// IsrNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
// OfflineReplicas array (4 bytes length + nodes) - v5+ addition
binary.Write(&buf, binary.BigEndian, int32(0)) // No offline replicas
}
response := buf.Bytes()
Debug("Advertising Kafka gateway: %s", h.GetGatewayAddress())
Debug("Metadata v7 response for %d topics: %v", len(topicsToReturn), topicsToReturn)
return response, nil
}
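// parseMetadataTopics extracts the requested topic names from a Metadata
// request body. An empty result means "all topics", which covers both an
// empty topics array and the -1 (null array) marker.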
func (h *Handler) parseMetadataTopics(requestBody []byte) []string {
// Support two request layouts: the body may start directly with the topics
// array length (int32), or it may begin with a client_id string followed by
// the topics array. Try the direct layout first, then fall back.
if len(requestBody) < 4 {
return []string{}
}
// Try path A: interpret first 4 bytes as topics_count
offset := 0
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
if topicsCount == 0xFFFFFFFF { // -1 means all topics
return []string{}
}
if topicsCount <= 1000000 { // sane bound
offset += 4
topics := make([]string, 0, topicsCount)
for i := uint32(0); i < topicsCount && offset+2 <= len(requestBody); i++ {
nameLen := int(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
offset += 2
if offset+nameLen > len(requestBody) {
break
}
topics = append(topics, string(requestBody[offset:offset+nameLen]))
offset += nameLen
}
return topics
}
// Path B: assume leading client_id string then topics_count
if len(requestBody) < 6 {
return []string{}
}
clientIDLen := int(binary.BigEndian.Uint16(requestBody[0:2]))
offset = 2 + clientIDLen
if len(requestBody) < offset+4 {
return []string{}
}
topicsCount = binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
if topicsCount == 0xFFFFFFFF {
return []string{}
}
topics := make([]string, 0, topicsCount)
for i := uint32(0); i < topicsCount && offset+2 <= len(requestBody); i++ {
nameLen := int(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
offset += 2
if offset+nameLen > len(requestBody) {
break
}
topics = append(topics, string(requestBody[offset:offset+nameLen]))
offset += nameLen
}
return topics
}
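// handleListOffsets serves the ListOffsets API (v0-v2). For each requested
// partition it resolves the special timestamps -2 (earliest) and -1 (latest)
// against the partition ledger, or maps a concrete timestamp to an offset
// via FindOffsetByTimestamp.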
func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
Debug("ListOffsets v%d request hex dump (first 100 bytes): %x", apiVersion, requestBody[:min(100, len(requestBody))])
// Parse minimal request to understand what's being asked (header already stripped)
offset := 0
// v1+ has replica_id(4)
if apiVersion >= 1 {
if len(requestBody) < offset+4 {
return nil, fmt.Errorf("ListOffsets v%d request missing replica_id", apiVersion)
}
replicaID := int32(binary.BigEndian.Uint32(requestBody[offset : offset+4]))
offset += 4
Debug("ListOffsets v%d - replica_id: %d", apiVersion, replicaID)
}
// v2+ adds isolation_level(1)
if apiVersion >= 2 {
if len(requestBody) < offset+1 {
return nil, fmt.Errorf("ListOffsets v%d request missing isolation_level", apiVersion)
}
isolationLevel := requestBody[offset]
offset += 1
Debug("ListOffsets v%d - isolation_level: %d", apiVersion, isolationLevel)
}
if len(requestBody) < offset+4 {
return nil, fmt.Errorf("ListOffsets request missing topics count")
}
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
response := make([]byte, 0, 256)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Throttle time (4 bytes, 0 = no throttling) - v2+ only
if apiVersion >= 2 {
response = append(response, 0, 0, 0, 0)
}
// Topics count (same as request)
topicsCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(topicsCountBytes, topicsCount)
response = append(response, topicsCountBytes...)
// Process each requested topic
for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
if len(requestBody) < offset+2 {
break
}
// Parse topic name
topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
if len(requestBody) < offset+int(topicNameSize)+4 {
break
}
topicName := requestBody[offset : offset+int(topicNameSize)]
offset += int(topicNameSize)
// Parse partitions count for this topic
partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// Response: topic_name_size(2) + topic_name + partitions_array
response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
response = append(response, topicName...)
partitionsCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(partitionsCountBytes, partitionsCount)
response = append(response, partitionsCountBytes...)
// Process each partition
for j := uint32(0); j < partitionsCount && offset+12 <= len(requestBody); j++ {
// Parse partition request: partition_id(4) + timestamp(8)
partitionID := binary.BigEndian.Uint32(requestBody[offset : offset+4])
timestamp := int64(binary.BigEndian.Uint64(requestBody[offset+4 : offset+12]))
offset += 12
// Response: partition_id(4) + error_code(2) + timestamp(8) + offset(8)
partitionIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(partitionIDBytes, partitionID)
response = append(response, partitionIDBytes...)
// Error code (0 = no error)
response = append(response, 0, 0)
// Get the ledger for this topic-partition
ledger := h.GetOrCreateLedger(string(topicName), int32(partitionID))
var responseTimestamp int64
var responseOffset int64
switch timestamp {
case -2: // earliest offset
responseOffset = ledger.GetEarliestOffset()
if responseOffset == ledger.GetHighWaterMark() {
// No messages yet, return current time
responseTimestamp = time.Now().UnixNano()
} else {
// Get timestamp of earliest message
if ts, _, err := ledger.GetRecord(responseOffset); err == nil {
responseTimestamp = ts
} else {
responseTimestamp = time.Now().UnixNano()
}
}
case -1: // latest offset
responseOffset = ledger.GetLatestOffset()
if responseOffset == 0 && ledger.GetHighWaterMark() == 0 {
// No messages yet
responseTimestamp = time.Now().UnixNano()
responseOffset = 0
} else {
// Get timestamp of latest message
if ts, _, err := ledger.GetRecord(responseOffset); err == nil {
responseTimestamp = ts
} else {
responseTimestamp = time.Now().UnixNano()
}
}
default: // specific timestamp - find offset by timestamp
responseOffset = ledger.FindOffsetByTimestamp(timestamp)
responseTimestamp = timestamp
}
timestampBytes := make([]byte, 8)
binary.BigEndian.PutUint64(timestampBytes, uint64(responseTimestamp))
response = append(response, timestampBytes...)
offsetBytes := make([]byte, 8)
binary.BigEndian.PutUint64(offsetBytes, uint64(responseOffset))
response = append(response, offsetBytes...)
}
}
return response, nil
}
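// handleCreateTopics dispatches a CreateTopics request to the parser for its
// API version: v0/v1 use the regular encoding, v2-v4 are auto-detected as
// regular or compact, and v5+ use the flexible encoding with compact arrays.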
func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
Debug("*** CREATETOPICS REQUEST RECEIVED *** Correlation: %d, Version: %d", correlationID, apiVersion)
Debug("CreateTopics - Request body size: %d bytes", len(requestBody))
if len(requestBody) < 2 {
return nil, fmt.Errorf("CreateTopics request too short")
}
// Parse based on API version
switch apiVersion {
case 0, 1:
Debug("CreateTopics - Routing to v0/v1 handler")
response, err := h.handleCreateTopicsV0V1(correlationID, requestBody)
Debug("CreateTopics - v0/v1 handler returned, response size: %d bytes, err: %v", len(response), err)
return response, err
case 2, 3, 4:
// kafka-go sends v2-4 in regular format, not compact
Debug("CreateTopics - Routing to v2-4 handler")
response, err := h.handleCreateTopicsV2To4(correlationID, requestBody)
Debug("CreateTopics - v2-4 handler returned, response size: %d bytes, err: %v", len(response), err)
return response, err
case 5:
// v5+ uses flexible format with compact arrays
Debug("CreateTopics - Routing to v5+ handler")
response, err := h.handleCreateTopicsV2Plus(correlationID, apiVersion, requestBody)
Debug("CreateTopics - v5+ handler returned, response size: %d bytes, err: %v", len(response), err)
return response, err
default:
return nil, fmt.Errorf("unsupported CreateTopics API version: %d", apiVersion)
}
}
// handleCreateTopicsV2To4 handles CreateTopics API versions 2-4 (auto-detect regular vs compact format)
func (h *Handler) handleCreateTopicsV2To4(correlationID uint32, requestBody []byte) ([]byte, error) {
// Auto-detect format: kafka-go sends regular format, tests send compact format
if len(requestBody) < 1 {
return nil, fmt.Errorf("CreateTopics v2-4 request too short")
}
// Detect format by checking first byte
// Compact format: first byte is compact array length (usually 0x02 for 1 topic)
// Regular format: first 4 bytes are regular array count (usually 0x00000001 for 1 topic)
isCompactFormat := false
if len(requestBody) >= 4 {
// Check if this looks like a regular 4-byte array count
regularCount := binary.BigEndian.Uint32(requestBody[0:4])
// If the "regular count" is very large (> 1000), it's probably compact format
// Also check if first byte is small (typical compact array length)
if regularCount > 1000 || (requestBody[0] <= 10 && requestBody[0] > 0) {
isCompactFormat = true
}
} else if requestBody[0] <= 10 && requestBody[0] > 0 {
isCompactFormat = true
}
if isCompactFormat {
Debug("CreateTopics v2-4 - Detected compact format")
// Delegate to the compact format handler
response, err := h.handleCreateTopicsV2Plus(correlationID, 2, requestBody)
Debug("CreateTopics v2-4 - Compact format handler returned, response size: %d bytes, err: %v", len(response), err)
return response, err
}
Debug("CreateTopics v2-4 - Detected regular format")
// Handle regular format
offset := 0
if len(requestBody) < offset+4 {
return nil, fmt.Errorf("CreateTopics v2-4 request too short for topics array")
}
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
Debug("CreateTopics v2-4 - Topics count: %d, remaining bytes: %d", topicsCount, len(requestBody)-offset)
// Parse topics
topics := make([]struct {
name string
partitions uint32
replication uint16
}, 0, topicsCount)
for i := uint32(0); i < topicsCount; i++ {
if len(requestBody) < offset+2 {
return nil, fmt.Errorf("CreateTopics v2-4: truncated topic name length")
}
nameLen := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
if len(requestBody) < offset+int(nameLen) {
return nil, fmt.Errorf("CreateTopics v2-4: truncated topic name")
}
topicName := string(requestBody[offset : offset+int(nameLen)])
offset += int(nameLen)
if len(requestBody) < offset+4 {
return nil, fmt.Errorf("CreateTopics v2-4: truncated num_partitions")
}
numPartitions := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
if len(requestBody) < offset+2 {
return nil, fmt.Errorf("CreateTopics v2-4: truncated replication_factor")
}
replication := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
// Assignments array (array of partition assignments) - skip contents
if len(requestBody) < offset+4 {
return nil, fmt.Errorf("CreateTopics v2-4: truncated assignments count")
}
assignments := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
for j := uint32(0); j < assignments; j++ {
// partition_id (int32) + replicas (array int32)
if len(requestBody) < offset+4 {
return nil, fmt.Errorf("CreateTopics v2-4: truncated assignment partition id")
}
offset += 4
if len(requestBody) < offset+4 {
return nil, fmt.Errorf("CreateTopics v2-4: truncated replicas count")
}
replicasCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// skip replica ids
offset += int(replicasCount) * 4
}
// Configs array (array of (name,value) strings) - skip contents
if len(requestBody) < offset+4 {
return nil, fmt.Errorf("CreateTopics v2-4: truncated configs count")
}
configs := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
for j := uint32(0); j < configs; j++ {
// name (string)
if len(requestBody) < offset+2 {
return nil, fmt.Errorf("CreateTopics v2-4: truncated config name length")
}
nameLen := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2 + int(nameLen)
// value (nullable string)
if len(requestBody) < offset+2 {
return nil, fmt.Errorf("CreateTopics v2-4: truncated config value length")
}
valueLen := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
offset += 2
if valueLen >= 0 {
offset += int(valueLen)
}
}
topics = append(topics, struct {
name string
partitions uint32
replication uint16
}{topicName, numPartitions, replication})
}
// timeout_ms
if len(requestBody) >= offset+4 {
_ = binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
}
// validate_only (boolean)
if len(requestBody) >= offset+1 {
_ = requestBody[offset]
offset += 1
}
// Build response
response := make([]byte, 0, 128)
// Correlation ID
cid := make([]byte, 4)
binary.BigEndian.PutUint32(cid, correlationID)
response = append(response, cid...)
// throttle_time_ms (4 bytes)
response = append(response, 0, 0, 0, 0)
// topics array count (int32)
countBytes := make([]byte, 4)
binary.BigEndian.PutUint32(countBytes, uint32(len(topics)))
response = append(response, countBytes...)
// per-topic responses
for _, t := range topics {
// topic name (string)
nameLen := make([]byte, 2)
binary.BigEndian.PutUint16(nameLen, uint16(len(t.name)))
response = append(response, nameLen...)
response = append(response, []byte(t.name)...)
// error_code (int16)
var errCode uint16 = 0
if h.seaweedMQHandler.TopicExists(t.name) {
errCode = 36 // TOPIC_ALREADY_EXISTS
} else if t.partitions == 0 {
errCode = 37 // INVALID_PARTITIONS
} else if t.replication == 0 {
errCode = 38 // INVALID_REPLICATION_FACTOR
} else {
if err := h.seaweedMQHandler.CreateTopic(t.name, int32(t.partitions)); err != nil {
errCode = 1 // UNKNOWN_SERVER_ERROR
}
}
eb := make([]byte, 2)
binary.BigEndian.PutUint16(eb, errCode)
response = append(response, eb...)
// error_message (nullable string) -> null
response = append(response, 0xFF, 0xFF)
}
Debug("CreateTopics v2-4 - Regular format handler completed, response size: %d bytes", len(response))
return response, nil
}
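
// Illustrative layout of the response built above for one topic "t" created
// successfully:
//
//	correlation_id(4) | throttle_time_ms(4)=0 | topic_count(4)=1 |
//	name_len(2)=1 "t" | error_code(2)=0 | error_message(2)=0xFFFF (null)
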
func (h *Handler) handleCreateTopicsV0V1(correlationID uint32, requestBody []byte) ([]byte, error) {
Debug("CreateTopics v0/v1 - parsing request of %d bytes", len(requestBody))
if len(requestBody) < 4 {
return nil, fmt.Errorf("CreateTopics v0/v1 request too short")
}
offset := 0
// Parse topics array (regular array format: count + topics)
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
Debug("CreateTopics v0/v1 - Topics count: %d", topicsCount)
// Build response
response := make([]byte, 0, 256)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Topics array count (4 bytes in v0/v1)
topicsCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(topicsCountBytes, topicsCount)
response = append(response, topicsCountBytes...)
// Process each topic
for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
// Parse topic name (regular string: length + bytes)
if len(requestBody) < offset+2 {
break
}
topicNameLength := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
if len(requestBody) < offset+int(topicNameLength) {
break
}
topicName := string(requestBody[offset : offset+int(topicNameLength)])
offset += int(topicNameLength)
// Parse num_partitions (4 bytes)
if len(requestBody) < offset+4 {
break
}
numPartitions := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// Parse replication_factor (2 bytes)
if len(requestBody) < offset+2 {
break
}
replicationFactor := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
// Parse assignments array (4 bytes count, then assignments)
if len(requestBody) < offset+4 {
break
}
assignmentsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// Skip assignments for now (simplified)
for j := uint32(0); j < assignmentsCount && offset < len(requestBody); j++ {
// Skip partition_id (4 bytes)
if len(requestBody) >= offset+4 {
offset += 4
}
// Skip replicas array (4 bytes count + replica_ids)
if len(requestBody) >= offset+4 {
replicasCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
offset += int(replicasCount) * 4 // Skip replica IDs
}
}
// Parse configs array (4 bytes count, then configs)
if len(requestBody) >= offset+4 {
configsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// Skip configs (simplified)
for j := uint32(0); j < configsCount && offset < len(requestBody); j++ {
// Skip config name (string: 2 bytes length + bytes)
if len(requestBody) >= offset+2 {
configNameLength := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2 + int(configNameLength)
}
// Skip config value (string: 2 bytes length + bytes)
if len(requestBody) >= offset+2 {
configValueLength := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2 + int(configValueLength)
}
}
}
Debug("CreateTopics v0/v1 - Parsed topic: %s, partitions: %d, replication: %d",
topicName, numPartitions, replicationFactor)
// Build response for this topic
// Topic name (string: length + bytes)
topicNameLengthBytes := make([]byte, 2)
binary.BigEndian.PutUint16(topicNameLengthBytes, uint16(len(topicName)))
response = append(response, topicNameLengthBytes...)
response = append(response, []byte(topicName)...)
// Determine error code and message
var errorCode uint16 = 0
// Use SeaweedMQ integration
if h.seaweedMQHandler.TopicExists(topicName) {
errorCode = 36 // TOPIC_ALREADY_EXISTS
		} else if numPartitions == 0 {
			errorCode = 37 // INVALID_PARTITIONS
		} else if replicationFactor == 0 {
errorCode = 38 // INVALID_REPLICATION_FACTOR
} else {
// Create the topic in SeaweedMQ
if err := h.seaweedMQHandler.CreateTopic(topicName, int32(numPartitions)); err != nil {
errorCode = 1 // UNKNOWN_SERVER_ERROR
}
}
// Error code (2 bytes)
errorCodeBytes := make([]byte, 2)
binary.BigEndian.PutUint16(errorCodeBytes, errorCode)
response = append(response, errorCodeBytes...)
}
// Parse timeout_ms (4 bytes) - at the end of request
if len(requestBody) >= offset+4 {
timeoutMs := binary.BigEndian.Uint32(requestBody[offset : offset+4])
Debug("CreateTopics v0/v1 - timeout_ms: %d", timeoutMs)
offset += 4
}
// Parse validate_only (1 byte) - only in v1
if len(requestBody) >= offset+1 {
validateOnly := requestBody[offset] != 0
Debug("CreateTopics v0/v1 - validate_only: %v", validateOnly)
}
return response, nil
}
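
// Illustrative layout of the v0/v1 request body parsed above (one topic, no
// assignments or configs):
//
//	topics_count(4)=1 | name_len(2) name | num_partitions(4) |
//	replication_factor(2) | assignments_count(4)=0 | configs_count(4)=0 |
//	timeout_ms(4) | validate_only(1, v1 only)
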
// handleCreateTopicsV2Plus handles CreateTopics API versions 2+ (flexible versions with compact arrays/strings).
// It parses the flexible request and builds the (non-flexible v2-style) response
// directly; it deliberately does not delegate to handleCreateTopicsV2To4, whose
// format auto-detection would route compact requests straight back here.
func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
Debug("CreateTopics V2+ (flexible) - parsing request of %d bytes (version %d)", len(requestBody), apiVersion)
offset := 0
// Topics (compact array)
topicsCount, consumed, err := DecodeCompactArrayLength(requestBody[offset:])
if err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode topics compact array: %w", apiVersion, err)
}
offset += consumed
type topicSpec struct {
name string
partitions uint32
replication uint16
}
topics := make([]topicSpec, 0, topicsCount)
for i := uint32(0); i < topicsCount; i++ {
// Topic name (compact string)
name, consumed, err := DecodeFlexibleString(requestBody[offset:])
if err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] name: %w", apiVersion, i, err)
}
offset += consumed
if len(requestBody) < offset+6 {
return nil, fmt.Errorf("CreateTopics v%d: truncated partitions/replication for topic[%d]", apiVersion, i)
}
partitions := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
		replication := binary.BigEndian.Uint16(requestBody[offset : offset+2])
		offset += 2
		// Assignments (compact array of partition_index + broker_ids) - skip entries
		asgCount, consumed, err := DecodeCompactArrayLength(requestBody[offset:])
		if err != nil {
			return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] assignments array: %w", apiVersion, i, err)
		}
		offset += consumed
		for j := uint32(0); j < asgCount; j++ {
			// partition_index (int32)
			if len(requestBody) < offset+4 {
				return nil, fmt.Errorf("CreateTopics v%d: truncated topic[%d] assignment[%d]", apiVersion, i, j)
			}
			offset += 4
			// broker_ids (compact array of int32)
			brokerCount, consumed, err := DecodeCompactArrayLength(requestBody[offset:])
			if err != nil {
				return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] assignment[%d] broker ids: %w", apiVersion, i, j, err)
			}
			offset += consumed
			if len(requestBody) < offset+int(brokerCount)*4 {
				return nil, fmt.Errorf("CreateTopics v%d: truncated topic[%d] assignment[%d] broker ids", apiVersion, i, j)
			}
			offset += int(brokerCount) * 4
			// tagged fields for each assignment
			_, consumed, err = DecodeTaggedFields(requestBody[offset:])
			if err != nil {
				return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] assignment[%d] tagged fields: %w", apiVersion, i, j, err)
			}
			offset += consumed
		}
		// Configs (compact array) - skip entries
cfgCount, consumed, err := DecodeCompactArrayLength(requestBody[offset:])
if err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] configs array: %w", apiVersion, i, err)
}
offset += consumed
for j := uint32(0); j < cfgCount; j++ {
// name (compact string)
_, consumed, err := DecodeFlexibleString(requestBody[offset:])
if err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] config[%d] name: %w", apiVersion, i, j, err)
}
offset += consumed
// value (nullable compact string)
_, consumed, err = DecodeFlexibleString(requestBody[offset:])
if err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] config[%d] value: %w", apiVersion, i, j, err)
}
offset += consumed
// tagged fields for each config
_, consumed, err = DecodeTaggedFields(requestBody[offset:])
if err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] config[%d] tagged fields: %w", apiVersion, i, j, err)
}
offset += consumed
}
// Tagged fields for topic
_, consumed, err = DecodeTaggedFields(requestBody[offset:])
if err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] tagged fields: %w", apiVersion, i, err)
}
offset += consumed
topics = append(topics, topicSpec{name: name, partitions: partitions, replication: replication})
}
// timeout_ms (int32)
if len(requestBody) < offset+4 {
return nil, fmt.Errorf("CreateTopics v%d: missing timeout_ms", apiVersion)
}
timeoutMs := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// validate_only (boolean)
if len(requestBody) < offset+1 {
return nil, fmt.Errorf("CreateTopics v%d: missing validate_only flag", apiVersion)
}
validateOnly := requestBody[offset] != 0
offset += 1
// Tagged fields (top-level)
if _, consumed, err = DecodeTaggedFields(requestBody[offset:]); err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode top-level tagged fields: %w", apiVersion, err)
}
// offset += consumed // Not needed further
	// timeout_ms and validate_only are parsed above for protocol completeness,
	// but creation below is applied immediately; validate-only mode is not
	// honored yet.
	_ = timeoutMs
	_ = validateOnly
	// Build the response in place.
response := make([]byte, 0, 128)
// Correlation ID
cid := make([]byte, 4)
binary.BigEndian.PutUint32(cid, correlationID)
response = append(response, cid...)
// throttle_time_ms (4 bytes)
response = append(response, 0, 0, 0, 0)
// topics array count (int32)
countBytes := make([]byte, 4)
binary.BigEndian.PutUint32(countBytes, uint32(len(topics)))
response = append(response, countBytes...)
// For each topic
for _, t := range topics {
		// topic name (string, 2-byte big-endian length)
		nameLenBytes := make([]byte, 2)
		binary.BigEndian.PutUint16(nameLenBytes, uint16(len(t.name)))
		response = append(response, nameLenBytes...)
response = append(response, []byte(t.name)...)
// error_code (int16)
var errCode uint16 = 0
if h.seaweedMQHandler.TopicExists(t.name) {
errCode = 36 // TOPIC_ALREADY_EXISTS
} else if t.partitions == 0 {
errCode = 37 // INVALID_PARTITIONS
} else if t.replication == 0 {
errCode = 38 // INVALID_REPLICATION_FACTOR
} else {
if err := h.seaweedMQHandler.CreateTopic(t.name, int32(t.partitions)); err != nil {
errCode = 1 // UNKNOWN_SERVER_ERROR
}
}
eb := make([]byte, 2)
binary.BigEndian.PutUint16(eb, errCode)
response = append(response, eb...)
// error_message (nullable string) -> null
response = append(response, 0xFF, 0xFF)
}
return response, nil
}
func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ([]byte, error) {
// Parse minimal DeleteTopics request
// Request format: client_id + timeout(4) + topics_array
if len(requestBody) < 6 { // client_id_size(2) + timeout(4)
return nil, fmt.Errorf("DeleteTopics request too short")
}
// Skip client_id
clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
offset := 2 + int(clientIDSize)
if len(requestBody) < offset+8 { // timeout(4) + topics_count(4)
return nil, fmt.Errorf("DeleteTopics request missing data")
}
// Skip timeout
offset += 4
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
response := make([]byte, 0, 256)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Throttle time (4 bytes, 0 = no throttling)
response = append(response, 0, 0, 0, 0)
// Topics count (same as request)
topicsCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(topicsCountBytes, topicsCount)
response = append(response, topicsCountBytes...)
// Process each topic (using SeaweedMQ handler)
for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
if len(requestBody) < offset+2 {
break
}
// Parse topic name
topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
if len(requestBody) < offset+int(topicNameSize) {
break
}
topicName := string(requestBody[offset : offset+int(topicNameSize)])
offset += int(topicNameSize)
// Response: topic_name + error_code(2) + error_message
response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
response = append(response, []byte(topicName)...)
// Check if topic exists and delete it
		var errorCode uint16
		var errorMessage string
// Use SeaweedMQ integration
if !h.seaweedMQHandler.TopicExists(topicName) {
errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
errorMessage = "Unknown topic"
} else {
// Delete the topic from SeaweedMQ
if err := h.seaweedMQHandler.DeleteTopic(topicName); err != nil {
errorCode = 1 // UNKNOWN_SERVER_ERROR
errorMessage = err.Error()
}
}
// Error code
response = append(response, byte(errorCode>>8), byte(errorCode))
// Error message (nullable string)
if errorMessage == "" {
response = append(response, 0xFF, 0xFF) // null string
} else {
errorMsgLen := uint16(len(errorMessage))
response = append(response, byte(errorMsgLen>>8), byte(errorMsgLen))
response = append(response, []byte(errorMessage)...)
}
}
return response, nil
}
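
// Illustrative layout of the response built above for one deleted topic "t":
//
//	correlation_id(4) | throttle_time_ms(4)=0 | topic_count(4)=1 |
//	name_len(2)=1 "t" | error_code(2) | error_message (2-byte length, 0xFFFF when null)
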
// validateAPIVersion checks if we support the requested API version
func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error {
supportedVersions := map[uint16][2]uint16{
18: {0, 3}, // ApiVersions: v0-v3
3: {0, 7}, // Metadata: v0-v7
0: {0, 7}, // Produce: v0-v7
1: {0, 7}, // Fetch: v0-v7
2: {0, 2}, // ListOffsets: v0-v2
19: {0, 5}, // CreateTopics: v0-v5 (updated to match implementation)
20: {0, 4}, // DeleteTopics: v0-v4
10: {0, 2}, // FindCoordinator: v0-v2
11: {0, 7}, // JoinGroup: v0-v7
14: {0, 5}, // SyncGroup: v0-v5
8: {0, 2}, // OffsetCommit: v0-v2
9: {0, 5}, // OffsetFetch: v0-v5 (updated to match implementation)
12: {0, 4}, // Heartbeat: v0-v4
13: {0, 4}, // LeaveGroup: v0-v4
15: {0, 5}, // DescribeGroups: v0-v5
16: {0, 4}, // ListGroups: v0-v4
32: {0, 4}, // DescribeConfigs: v0-v4
}
if versionRange, exists := supportedVersions[apiKey]; exists {
minVer, maxVer := versionRange[0], versionRange[1]
if apiVersion < minVer || apiVersion > maxVer {
return fmt.Errorf("unsupported API version %d for API key %d (supported: %d-%d)",
apiVersion, apiKey, minVer, maxVer)
}
return nil
}
return fmt.Errorf("unsupported API key: %d", apiKey)
}
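
// Typical call site (illustrative): validate before dispatching so unsupported
// versions get a proper Kafka error frame instead of a dropped connection:
//
//	if err := h.validateAPIVersion(apiKey, apiVersion); err != nil {
//		return h.buildUnsupportedVersionResponse(correlationID, apiKey, apiVersion)
//	}
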
// buildUnsupportedVersionResponse creates a proper Kafka error response
func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey, apiVersion uint16) ([]byte, error) {
errorMsg := fmt.Sprintf("Unsupported version %d for API key %d", apiVersion, apiKey)
return BuildErrorResponseWithMessage(correlationID, ErrorCodeUnsupportedVersion, errorMsg), nil
}
// handleMetadata routes to the appropriate version-specific handler
func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
switch apiVersion {
case 0:
return h.HandleMetadataV0(correlationID, requestBody)
case 1:
return h.HandleMetadataV1(correlationID, requestBody)
case 2:
return h.HandleMetadataV2(correlationID, requestBody)
case 3, 4:
return h.HandleMetadataV3V4(correlationID, requestBody)
case 5, 6:
return h.HandleMetadataV5V6(correlationID, requestBody)
case 7:
return h.HandleMetadataV7(correlationID, requestBody)
default:
return nil, fmt.Errorf("metadata version %d not implemented yet", apiVersion)
}
}
// getAPIName returns a human-readable name for Kafka API keys (for debugging)
func getAPIName(apiKey uint16) string {
switch apiKey {
case 0:
return "Produce"
case 1:
return "Fetch"
case 2:
return "ListOffsets"
case 3:
return "Metadata"
case 8:
return "OffsetCommit"
case 9:
return "OffsetFetch"
case 10:
return "FindCoordinator"
case 11:
return "JoinGroup"
case 12:
return "Heartbeat"
case 13:
return "LeaveGroup"
case 14:
return "SyncGroup"
case 15:
return "DescribeGroups"
case 16:
return "ListGroups"
case 18:
return "ApiVersions"
case 19:
return "CreateTopics"
case 20:
return "DeleteTopics"
case 32:
return "DescribeConfigs"
default:
return "Unknown"
}
}
// handleDescribeConfigs handles DescribeConfigs API requests (API key 32)
func (h *Handler) handleDescribeConfigs(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
Debug("DescribeConfigs v%d - parsing request body (%d bytes)", apiVersion, len(requestBody))
// Parse request to extract resources
resources, err := h.parseDescribeConfigsRequest(requestBody)
if err != nil {
Error("DescribeConfigs parsing error: %v", err)
return nil, fmt.Errorf("failed to parse DescribeConfigs request: %w", err)
}
Debug("DescribeConfigs parsed %d resources", len(resources))
// Build response
response := make([]byte, 0, 2048)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Throttle time (0ms)
throttleBytes := make([]byte, 4)
binary.BigEndian.PutUint32(throttleBytes, 0)
response = append(response, throttleBytes...)
// Resources array length
resourcesBytes := make([]byte, 4)
binary.BigEndian.PutUint32(resourcesBytes, uint32(len(resources)))
response = append(response, resourcesBytes...)
// For each resource, return appropriate configs
for _, resource := range resources {
resourceResponse := h.buildDescribeConfigsResourceResponse(resource, apiVersion)
response = append(response, resourceResponse...)
}
Debug("DescribeConfigs v%d response constructed, size: %d bytes", apiVersion, len(response))
return response, nil
}
// writeResponseWithTimeout writes a Kafka response with timeout handling
func (h *Handler) writeResponseWithTimeout(w *bufio.Writer, response []byte, timeout time.Duration) error {
// Note: bufio.Writer doesn't support direct timeout setting
// Timeout handling should be done at the connection level before calling this function
// Write response size (4 bytes)
responseSizeBytes := make([]byte, 4)
binary.BigEndian.PutUint32(responseSizeBytes, uint32(len(response)))
if _, err := w.Write(responseSizeBytes); err != nil {
return fmt.Errorf("write response size: %w", err)
}
// Write response data
if _, err := w.Write(response); err != nil {
return fmt.Errorf("write response data: %w", err)
}
// Flush the buffer
if err := w.Flush(); err != nil {
return fmt.Errorf("flush response: %w", err)
}
return nil
}
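
// Illustrative usage, assuming conn is the accepted net.Conn: arm the write
// deadline on the connection first, since bufio.Writer carries no timeout of
// its own:
//
//	_ = conn.SetWriteDeadline(time.Now().Add(timeout))
//	err := h.writeResponseWithTimeout(bufio.NewWriter(conn), response, timeout)
//	_ = conn.SetWriteDeadline(time.Time{}) // clear the deadline afterwards
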
// EnableSchemaManagement enables schema management with the given configuration
func (h *Handler) EnableSchemaManagement(config schema.ManagerConfig) error {
manager, err := schema.NewManagerWithHealthCheck(config)
if err != nil {
return fmt.Errorf("failed to create schema manager: %w", err)
}
h.schemaManager = manager
h.useSchema = true
fmt.Printf("Schema management enabled with registry: %s\n", config.RegistryURL)
return nil
}
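
// Illustrative wiring (only RegistryURL is shown; any other ManagerConfig
// fields keep their zero values):
//
//	err := h.EnableSchemaManagement(schema.ManagerConfig{RegistryURL: "http://localhost:8081"})
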
// EnableBrokerIntegration enables mq.broker integration for schematized messages
func (h *Handler) EnableBrokerIntegration(brokers []string) error {
if !h.IsSchemaEnabled() {
return fmt.Errorf("schema management must be enabled before broker integration")
}
brokerClient := schema.NewBrokerClient(schema.BrokerClientConfig{
Brokers: brokers,
SchemaManager: h.schemaManager,
})
h.brokerClient = brokerClient
fmt.Printf("Broker integration enabled with brokers: %v\n", brokers)
return nil
}
// DisableSchemaManagement disables schema management and broker integration
func (h *Handler) DisableSchemaManagement() {
if h.brokerClient != nil {
h.brokerClient.Close()
h.brokerClient = nil
fmt.Println("Broker integration disabled")
}
h.schemaManager = nil
h.useSchema = false
fmt.Println("Schema management disabled")
}
// IsSchemaEnabled returns whether schema management is enabled
func (h *Handler) IsSchemaEnabled() bool {
return h.useSchema && h.schemaManager != nil
}
// IsBrokerIntegrationEnabled returns true if broker integration is enabled
func (h *Handler) IsBrokerIntegrationEnabled() bool {
return h.IsSchemaEnabled() && h.brokerClient != nil
}
// commitOffsetToSMQ commits offset using SMQ storage
func (h *Handler) commitOffsetToSMQ(key offset.ConsumerOffsetKey, offsetValue int64, metadata string) error {
if h.smqOffsetStorage == nil {
return fmt.Errorf("SMQ offset storage not initialized")
}
// Save to SMQ storage - use current timestamp and size 0 as placeholders
// since SMQ storage primarily tracks the committed offset
return h.smqOffsetStorage.SaveConsumerOffset(key, offsetValue, time.Now().UnixNano(), 0)
}
// fetchOffsetFromSMQ fetches offset using SMQ storage
func (h *Handler) fetchOffsetFromSMQ(key offset.ConsumerOffsetKey) (int64, string, error) {
if h.smqOffsetStorage == nil {
return -1, "", fmt.Errorf("SMQ offset storage not initialized")
}
entries, err := h.smqOffsetStorage.LoadConsumerOffsets(key)
if err != nil {
return -1, "", err
}
if len(entries) == 0 {
return -1, "", nil // No committed offset
}
// Return the committed offset (metadata is not stored in SMQ format)
return entries[0].KafkaOffset, "", nil
}
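
// Illustrative round trip, for a key already populated by the caller:
//
//	var key offset.ConsumerOffsetKey // group/topic/partition fields elided here
//	if err := h.commitOffsetToSMQ(key, 42, ""); err == nil {
//		committed, _, _ := h.fetchOffsetFromSMQ(key) // committed == 42
//	}
//
// A fetch before any commit returns (-1, "", nil) rather than an error.
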
// DescribeConfigsResource represents a resource in a DescribeConfigs request
type DescribeConfigsResource struct {
ResourceType int8 // 2 = Topic, 4 = Broker
ResourceName string
ConfigNames []string // Empty means return all configs
}
// parseDescribeConfigsRequest parses a DescribeConfigs request body
func (h *Handler) parseDescribeConfigsRequest(requestBody []byte) ([]DescribeConfigsResource, error) {
if len(requestBody) < 4 {
return nil, fmt.Errorf("request too short")
}
offset := 0
resourcesLength := int32(binary.BigEndian.Uint32(requestBody[offset : offset+4]))
offset += 4
// Validate resources length to prevent panic
if resourcesLength < 0 || resourcesLength > 100 { // Reasonable limit
return nil, fmt.Errorf("invalid resources length: %d", resourcesLength)
}
resources := make([]DescribeConfigsResource, 0, resourcesLength)
for i := int32(0); i < resourcesLength; i++ {
if offset+1 > len(requestBody) {
return nil, fmt.Errorf("insufficient data for resource type")
}
// Resource type (1 byte)
resourceType := int8(requestBody[offset])
offset++
// Resource name (string)
if offset+2 > len(requestBody) {
return nil, fmt.Errorf("insufficient data for resource name length")
}
nameLength := int(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
offset += 2
// Validate name length to prevent panic
if nameLength < 0 || nameLength > 1000 { // Reasonable limit
return nil, fmt.Errorf("invalid resource name length: %d", nameLength)
}
if offset+nameLength > len(requestBody) {
return nil, fmt.Errorf("insufficient data for resource name")
}
resourceName := string(requestBody[offset : offset+nameLength])
offset += nameLength
// Config names array (optional filter)
if offset+4 > len(requestBody) {
return nil, fmt.Errorf("insufficient data for config names length")
}
configNamesLength := int32(binary.BigEndian.Uint32(requestBody[offset : offset+4]))
offset += 4
// Validate config names length to prevent panic
// Note: -1 means null/empty array in Kafka protocol
if configNamesLength < -1 || configNamesLength > 1000 { // Reasonable limit
return nil, fmt.Errorf("invalid config names length: %d", configNamesLength)
}
// Handle null array case
if configNamesLength == -1 {
configNamesLength = 0
}
configNames := make([]string, 0, configNamesLength)
for j := int32(0); j < configNamesLength; j++ {
if offset+2 > len(requestBody) {
return nil, fmt.Errorf("insufficient data for config name length")
}
configNameLength := int(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
offset += 2
// Validate config name length to prevent panic
if configNameLength < 0 || configNameLength > 500 { // Reasonable limit
return nil, fmt.Errorf("invalid config name length: %d", configNameLength)
}
if offset+configNameLength > len(requestBody) {
return nil, fmt.Errorf("insufficient data for config name")
}
configName := string(requestBody[offset : offset+configNameLength])
offset += configNameLength
configNames = append(configNames, configName)
}
resources = append(resources, DescribeConfigsResource{
ResourceType: resourceType,
ResourceName: resourceName,
ConfigNames: configNames,
})
}
return resources, nil
}
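
// Illustrative request body accepted by the parser above: one Topic resource
// named "t" with a null config-name filter (meaning "return all configs"):
//
//	00 00 00 01  resources count = 1
//	02           resource type = 2 (Topic)
//	00 01 74     resource name: length 1, "t"
//	FF FF FF FF  config names count = -1 (null)
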
// buildDescribeConfigsResourceResponse builds the response for a single resource
func (h *Handler) buildDescribeConfigsResourceResponse(resource DescribeConfigsResource, apiVersion uint16) []byte {
response := make([]byte, 0, 512)
// Error code (0 = no error)
errorCodeBytes := make([]byte, 2)
binary.BigEndian.PutUint16(errorCodeBytes, 0)
response = append(response, errorCodeBytes...)
// Error message (null string = -1 length)
errorMsgBytes := make([]byte, 2)
binary.BigEndian.PutUint16(errorMsgBytes, 0xFFFF) // -1 as uint16
response = append(response, errorMsgBytes...)
// Resource type
response = append(response, byte(resource.ResourceType))
// Resource name
nameBytes := make([]byte, 2+len(resource.ResourceName))
binary.BigEndian.PutUint16(nameBytes[0:2], uint16(len(resource.ResourceName)))
copy(nameBytes[2:], []byte(resource.ResourceName))
response = append(response, nameBytes...)
// Get configs for this resource
configs := h.getConfigsForResource(resource)
// Config entries array length
configCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(configCountBytes, uint32(len(configs)))
response = append(response, configCountBytes...)
// Add each config entry
for _, config := range configs {
configBytes := h.buildConfigEntry(config, apiVersion)
response = append(response, configBytes...)
}
return response
}
// ConfigEntry represents a single configuration entry
type ConfigEntry struct {
Name string
Value string
ReadOnly bool
IsDefault bool
Sensitive bool
}
// getConfigsForResource returns appropriate configs for a resource
func (h *Handler) getConfigsForResource(resource DescribeConfigsResource) []ConfigEntry {
switch resource.ResourceType {
case 2: // Topic
return h.getTopicConfigs(resource.ResourceName, resource.ConfigNames)
case 4: // Broker
return h.getBrokerConfigs(resource.ConfigNames)
default:
return []ConfigEntry{}
}
}
// getTopicConfigs returns topic-level configurations.
// Note: topicName is currently unused; the same static defaults are reported
// for every topic.
func (h *Handler) getTopicConfigs(topicName string, requestedConfigs []string) []ConfigEntry {
// Default topic configs that admin clients commonly request
allConfigs := map[string]ConfigEntry{
"cleanup.policy": {
Name: "cleanup.policy",
Value: "delete",
ReadOnly: false,
IsDefault: true,
Sensitive: false,
},
"retention.ms": {
Name: "retention.ms",
Value: "604800000", // 7 days in milliseconds
ReadOnly: false,
IsDefault: true,
Sensitive: false,
},
"retention.bytes": {
Name: "retention.bytes",
Value: "-1", // Unlimited
ReadOnly: false,
IsDefault: true,
Sensitive: false,
},
"segment.ms": {
Name: "segment.ms",
Value: "86400000", // 1 day in milliseconds
ReadOnly: false,
IsDefault: true,
Sensitive: false,
},
"max.message.bytes": {
Name: "max.message.bytes",
Value: "1048588", // ~1MB
ReadOnly: false,
IsDefault: true,
Sensitive: false,
},
"min.insync.replicas": {
Name: "min.insync.replicas",
Value: "1",
ReadOnly: false,
IsDefault: true,
Sensitive: false,
},
}
// If specific configs requested, filter to those
if len(requestedConfigs) > 0 {
filteredConfigs := make([]ConfigEntry, 0, len(requestedConfigs))
for _, configName := range requestedConfigs {
if config, exists := allConfigs[configName]; exists {
filteredConfigs = append(filteredConfigs, config)
}
}
return filteredConfigs
}
// Return all configs
configs := make([]ConfigEntry, 0, len(allConfigs))
for _, config := range allConfigs {
configs = append(configs, config)
}
return configs
}
// getBrokerConfigs returns broker-level configurations
func (h *Handler) getBrokerConfigs(requestedConfigs []string) []ConfigEntry {
// Default broker configs that admin clients commonly request
allConfigs := map[string]ConfigEntry{
"log.retention.hours": {
Name: "log.retention.hours",
Value: "168", // 7 days
ReadOnly: false,
IsDefault: true,
Sensitive: false,
},
"log.segment.bytes": {
Name: "log.segment.bytes",
Value: "1073741824", // 1GB
ReadOnly: false,
IsDefault: true,
Sensitive: false,
},
"num.network.threads": {
Name: "num.network.threads",
Value: "3",
ReadOnly: true,
IsDefault: true,
Sensitive: false,
},
"num.io.threads": {
Name: "num.io.threads",
Value: "8",
ReadOnly: true,
IsDefault: true,
Sensitive: false,
},
}
// If specific configs requested, filter to those
if len(requestedConfigs) > 0 {
filteredConfigs := make([]ConfigEntry, 0, len(requestedConfigs))
for _, configName := range requestedConfigs {
if config, exists := allConfigs[configName]; exists {
filteredConfigs = append(filteredConfigs, config)
}
}
return filteredConfigs
}
// Return all configs
configs := make([]ConfigEntry, 0, len(allConfigs))
for _, config := range allConfigs {
configs = append(configs, config)
}
return configs
}
// buildConfigEntry builds the wire format for a single config entry
func (h *Handler) buildConfigEntry(config ConfigEntry, apiVersion uint16) []byte {
entry := make([]byte, 0, 256)
// Config name
nameBytes := make([]byte, 2+len(config.Name))
binary.BigEndian.PutUint16(nameBytes[0:2], uint16(len(config.Name)))
copy(nameBytes[2:], []byte(config.Name))
entry = append(entry, nameBytes...)
// Config value
valueBytes := make([]byte, 2+len(config.Value))
binary.BigEndian.PutUint16(valueBytes[0:2], uint16(len(config.Value)))
copy(valueBytes[2:], []byte(config.Value))
entry = append(entry, valueBytes...)
// Read only flag
if config.ReadOnly {
entry = append(entry, 1)
} else {
entry = append(entry, 0)
}
// Is default flag (only for version 0)
if apiVersion == 0 {
if config.IsDefault {
entry = append(entry, 1)
} else {
entry = append(entry, 0)
}
}
// Config source (for versions 1-3)
if apiVersion >= 1 && apiVersion <= 3 {
// ConfigSource: 1 = DYNAMIC_TOPIC_CONFIG, 2 = DYNAMIC_BROKER_CONFIG, 4 = STATIC_BROKER_CONFIG, 5 = DEFAULT_CONFIG
configSource := int8(5) // DEFAULT_CONFIG for all our configs since they're defaults
entry = append(entry, byte(configSource))
}
// Sensitive flag
if config.Sensitive {
entry = append(entry, 1)
} else {
entry = append(entry, 0)
}
// Config synonyms (for versions 1-3)
if apiVersion >= 1 && apiVersion <= 3 {
// Empty synonyms array (4 bytes for array length = 0)
synonymsLength := make([]byte, 4)
binary.BigEndian.PutUint32(synonymsLength, 0)
entry = append(entry, synonymsLength...)
}
// Config type (for version 3 only)
if apiVersion == 3 {
configType := int8(1) // STRING type for all our configs
entry = append(entry, byte(configType))
}
// Config documentation (for version 3 only)
if apiVersion == 3 {
// Null documentation (length = -1)
docLength := make([]byte, 2)
binary.BigEndian.PutUint16(docLength, 0xFFFF) // -1 as uint16
entry = append(entry, docLength...)
}
return entry
}
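
// Illustrative v0 encoding produced above for {Name: "x", Value: "1",
// ReadOnly: false, IsDefault: true, Sensitive: false}:
//
//	00 01 78  name: length 1, "x"
//	00 01 31  value: length 1, "1"
//	00        read_only = false
//	01        is_default = true (v0 only)
//	00        sensitive = false
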
// getTopicMetadata retrieves topic metadata from filer with TTL cache
func (h *Handler) getTopicMetadata(topicName string) (*TopicMetadata, error) {
// Check cache first
h.metadataCacheMu.RLock()
if cached, exists := h.topicMetadataCache[topicName]; exists {
if time.Since(cached.CachedAt) < TopicMetadataCacheTTL {
h.metadataCacheMu.RUnlock()
return cached.Metadata, nil
}
}
h.metadataCacheMu.RUnlock()
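	// Cache miss or expired entry: concurrent callers may fall through and
	// fetch the same topic in parallel; the last writer wins below, which is
	// acceptable for read-mostly metadata.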
// Fetch from filer
metadata, err := h.fetchTopicMetadataFromFiler(topicName)
if err != nil {
return nil, err
}
// Cache the result
h.metadataCacheMu.Lock()
h.topicMetadataCache[topicName] = &CachedTopicMetadata{
Metadata: metadata,
CachedAt: time.Now(),
}
h.metadataCacheMu.Unlock()
return metadata, nil
}
// fetchTopicMetadataFromFiler retrieves topic metadata from SeaweedMQ filer
func (h *Handler) fetchTopicMetadataFromFiler(topicName string) (*TopicMetadata, error) {
if h.filerClient == nil {
// If no filer client, return default metadata
return &TopicMetadata{
TopicName: topicName,
IsSchematized: false,
Properties: make(map[string]string),
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}, nil
}
// Create topic reference for SeaweedMQ (using default Kafka namespace)
t := topic.NewTopic(DefaultKafkaNamespace, topicName)
// Try to read Kafka metadata file (stored alongside SMQ's topic.conf)
data, err := filer.ReadInsideFiler(h.filerClient, t.Dir(), KafkaMetadataFile)
if err != nil {
// If metadata file doesn't exist, return default metadata
return &TopicMetadata{
TopicName: topicName,
IsSchematized: false,
Properties: make(map[string]string),
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}, nil
}
// Parse the metadata JSON
var metadata TopicMetadata
if err := json.Unmarshal(data, &metadata); err != nil {
return nil, fmt.Errorf("failed to parse topic metadata for topic %s: %w", topicName, err)
}
// Ensure the topic name matches
metadata.TopicName = topicName
return &metadata, nil
}
// isSchematizedTopicFromMetadata checks if a topic is schematized using cached metadata
func (h *Handler) isSchematizedTopicFromMetadata(topicName string) bool {
metadata, err := h.getTopicMetadata(topicName)
if err != nil {
// Fallback to the existing schema detection logic
return h.isSchematizedTopic(topicName)
}
return metadata.IsSchematized
}