package integration import ( "context" "encoding/binary" "fmt" "sync" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/wdclient" ) // SeaweedMQHandler integrates Kafka protocol handlers with real SeaweedMQ storage type SeaweedMQHandler struct { agentClient *AgentClient // For agent-based connections brokerClient *BrokerClient // For broker-based connections useBroker bool // Flag to determine which client to use // Master client for service discovery masterClient *wdclient.MasterClient // Discovered broker addresses (for Metadata responses) brokerAddresses []string // Topic registry - still keep track of Kafka topics topicsMu sync.RWMutex topics map[string]*KafkaTopicInfo // Offset ledgers for Kafka offset translation ledgersMu sync.RWMutex ledgers map[TopicPartitionKey]*offset.Ledger } // KafkaTopicInfo holds Kafka-specific topic information type KafkaTopicInfo struct { Name string Partitions int32 CreatedAt int64 // SeaweedMQ integration SeaweedTopic *schema_pb.Topic } // TopicPartitionKey uniquely identifies a topic partition type TopicPartitionKey struct { Topic string Partition int32 } // NewSeaweedMQHandler creates a new handler with SeaweedMQ integration func NewSeaweedMQHandler(agentAddress string) (*SeaweedMQHandler, error) { agentClient, err := NewAgentClient(agentAddress) if err != nil { return nil, fmt.Errorf("failed to create agent client: %v", err) } // Test the connection if err := agentClient.HealthCheck(); err != nil { agentClient.Close() return nil, fmt.Errorf("agent health check failed: %v", err) } return &SeaweedMQHandler{ agentClient: agentClient, useBroker: false, topics: make(map[string]*KafkaTopicInfo), ledgers: make(map[TopicPartitionKey]*offset.Ledger), }, nil } // GetStoredRecords retrieves records from SeaweedMQ storage // This implements the core integration between Kafka Fetch API and SeaweedMQ storage func (h *SeaweedMQHandler) GetStoredRecords(topic string, partition int32, fromOffset int64, maxRecords int) ([]offset.SMQRecord, error) { // Verify topic exists if !h.TopicExists(topic) { return nil, fmt.Errorf("topic %s does not exist", topic) } // Get the offset ledger to translate Kafka offsets to SeaweedMQ timestamps ledger := h.GetLedger(topic, partition) if ledger == nil { // No messages yet, return empty return nil, nil } highWaterMark := ledger.GetHighWaterMark() // If fromOffset is at or beyond high water mark, no records to return if fromOffset >= highWaterMark { return nil, nil } // Calculate how many records to fetch, respecting the limit recordsToFetch := int(highWaterMark - fromOffset) if maxRecords > 0 && recordsToFetch > maxRecords { recordsToFetch = maxRecords } if recordsToFetch > 100 { recordsToFetch = 100 // Reasonable batch size limit } // Get or create subscriber session for this topic/partition var seaweedRecords []*SeaweedRecord var err error // Read records using appropriate client (broker or agent) if h.useBroker && h.brokerClient != nil { brokerSubscriber, subErr := h.brokerClient.GetOrCreateSubscriber(topic, partition, fromOffset) if subErr != nil { return nil, fmt.Errorf("failed to get broker subscriber: %v", subErr) } seaweedRecords, err = h.brokerClient.ReadRecords(brokerSubscriber, recordsToFetch) } else if h.agentClient != nil { agentSubscriber, subErr := h.agentClient.GetOrCreateSubscriber(topic, partition, fromOffset) if subErr != nil { return nil, fmt.Errorf("failed to get agent subscriber: %v", subErr) } seaweedRecords, err = h.agentClient.ReadRecords(agentSubscriber, recordsToFetch) } else { return nil, fmt.Errorf("no SeaweedMQ client available") } if err != nil { return nil, fmt.Errorf("failed to read records: %v", err) } // Convert SeaweedMQ records to SMQRecord interface with proper Kafka offsets smqRecords := make([]offset.SMQRecord, 0, len(seaweedRecords)) for i, seaweedRecord := range seaweedRecords { kafkaOffset := fromOffset + int64(i) smqRecord := &SeaweedSMQRecord{ key: seaweedRecord.Key, value: seaweedRecord.Value, timestamp: seaweedRecord.Timestamp, offset: kafkaOffset, } smqRecords = append(smqRecords, smqRecord) } return smqRecords, nil } // SeaweedSMQRecord implements the offset.SMQRecord interface for SeaweedMQ records type SeaweedSMQRecord struct { key []byte value []byte timestamp int64 offset int64 } // GetKey returns the record key func (r *SeaweedSMQRecord) GetKey() []byte { return r.key } // GetValue returns the record value func (r *SeaweedSMQRecord) GetValue() []byte { return r.value } // GetTimestamp returns the record timestamp func (r *SeaweedSMQRecord) GetTimestamp() int64 { return r.timestamp } // GetOffset returns the Kafka offset for this record func (r *SeaweedSMQRecord) GetOffset() int64 { return r.offset } // GetFilerClient returns a filer client for accessing SeaweedMQ metadata func (h *SeaweedMQHandler) GetFilerClient() filer_pb.SeaweedFilerClient { if h.useBroker && h.brokerClient != nil { return h.brokerClient.filerClient } // Agent client doesn't have filer access return nil } // GetFilerAddress returns the filer address used by this handler func (h *SeaweedMQHandler) GetFilerAddress() string { if h.useBroker && h.brokerClient != nil { return h.brokerClient.GetFilerAddress() } // Agent client doesn't have filer access return "" } // GetBrokerAddresses returns the discovered SMQ broker addresses func (h *SeaweedMQHandler) GetBrokerAddresses() []string { return h.brokerAddresses } // Close shuts down the handler and all connections func (h *SeaweedMQHandler) Close() error { if h.useBroker && h.brokerClient != nil { return h.brokerClient.Close() } else if h.agentClient != nil { return h.agentClient.Close() } return nil } // CreateTopic creates a new topic in both Kafka registry and SeaweedMQ func (h *SeaweedMQHandler) CreateTopic(name string, partitions int32) error { h.topicsMu.Lock() defer h.topicsMu.Unlock() // Check if topic already exists if _, exists := h.topics[name]; exists { return fmt.Errorf("topic %s already exists", name) } // Create SeaweedMQ topic reference seaweedTopic := &schema_pb.Topic{ Namespace: "kafka", Name: name, } glog.V(1).Infof("🆕 Creating topic %s with %d partitions in SeaweedMQ broker", name, partitions) // Configure topic with SeaweedMQ broker via gRPC if len(h.brokerAddresses) > 0 { brokerAddress := h.brokerAddresses[0] // Use first available broker glog.V(1).Infof("📞 Configuring topic %s with broker %s", name, brokerAddress) // Create gRPC dial option for broker connection grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) err := pb.WithBrokerGrpcClient(false, brokerAddress, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ Topic: seaweedTopic, PartitionCount: partitions, }) if err != nil { glog.Errorf("❌ Failed to configure topic %s with broker: %v", name, err) return fmt.Errorf("configure topic with broker: %w", err) } glog.V(1).Infof("✅ Successfully configured topic %s with broker", name) return nil }) if err != nil { return fmt.Errorf("failed to configure topic %s with broker %s: %w", name, brokerAddress, err) } } else { glog.Warningf("⚠️ No brokers available - creating topic %s in gateway memory only (testing mode)", name) } // Create Kafka topic info topicInfo := &KafkaTopicInfo{ Name: name, Partitions: partitions, CreatedAt: time.Now().UnixNano(), SeaweedTopic: seaweedTopic, } // Store in registry h.topics[name] = topicInfo // Initialize offset ledgers for all partitions for partitionID := int32(0); partitionID < partitions; partitionID++ { key := TopicPartitionKey{Topic: name, Partition: partitionID} h.ledgersMu.Lock() h.ledgers[key] = offset.NewLedger() h.ledgersMu.Unlock() } glog.V(1).Infof("🎉 Topic %s created successfully with %d partitions", name, partitions) return nil } // DeleteTopic removes a topic from both Kafka registry and SeaweedMQ func (h *SeaweedMQHandler) DeleteTopic(name string) error { h.topicsMu.Lock() defer h.topicsMu.Unlock() topicInfo, exists := h.topics[name] if !exists { return fmt.Errorf("topic %s does not exist", name) } // Close all publisher sessions for this topic for partitionID := int32(0); partitionID < topicInfo.Partitions; partitionID++ { if h.useBroker && h.brokerClient != nil { h.brokerClient.ClosePublisher(name, partitionID) } else if h.agentClient != nil { h.agentClient.ClosePublisher(name, partitionID) } } // Remove from registry delete(h.topics, name) // Clean up offset ledgers h.ledgersMu.Lock() for partitionID := int32(0); partitionID < topicInfo.Partitions; partitionID++ { key := TopicPartitionKey{Topic: name, Partition: partitionID} delete(h.ledgers, key) } h.ledgersMu.Unlock() return nil } // TopicExists checks if a topic exists func (h *SeaweedMQHandler) TopicExists(name string) bool { h.topicsMu.RLock() defer h.topicsMu.RUnlock() _, exists := h.topics[name] return exists } // GetTopicInfo returns information about a topic func (h *SeaweedMQHandler) GetTopicInfo(name string) (*KafkaTopicInfo, bool) { h.topicsMu.RLock() defer h.topicsMu.RUnlock() info, exists := h.topics[name] return info, exists } // ListTopics returns all topic names func (h *SeaweedMQHandler) ListTopics() []string { h.topicsMu.RLock() defer h.topicsMu.RUnlock() topics := make([]string, 0, len(h.topics)) for name := range h.topics { topics = append(topics, name) } return topics } // ProduceRecord publishes a record to SeaweedMQ and updates Kafka offset tracking func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []byte, value []byte) (int64, error) { // Verify topic exists if !h.TopicExists(topic) { return 0, fmt.Errorf("topic %s does not exist", topic) } // Get current timestamp timestamp := time.Now().UnixNano() // Publish to SeaweedMQ var publishErr error if h.useBroker && h.brokerClient != nil { _, publishErr = h.brokerClient.PublishRecord(topic, partition, key, value, timestamp) } else if h.agentClient != nil { _, publishErr = h.agentClient.PublishRecord(topic, partition, key, value, timestamp) } else { publishErr = fmt.Errorf("no client available") } if publishErr != nil { return 0, fmt.Errorf("failed to publish to SeaweedMQ: %v", publishErr) } // Update Kafka offset ledger ledger := h.GetOrCreateLedger(topic, partition) kafkaOffset := ledger.AssignOffsets(1) // Assign one Kafka offset // Map SeaweedMQ sequence to Kafka offset if err := ledger.AppendRecord(kafkaOffset, timestamp, int32(len(value))); err != nil { // Log the error but don't fail the produce operation fmt.Printf("Warning: failed to update offset ledger: %v\n", err) } return kafkaOffset, nil } // GetOrCreateLedger returns the offset ledger for a topic-partition func (h *SeaweedMQHandler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger { key := TopicPartitionKey{Topic: topic, Partition: partition} // Try to get existing ledger h.ledgersMu.RLock() ledger, exists := h.ledgers[key] h.ledgersMu.RUnlock() if exists { return ledger } // Create new ledger h.ledgersMu.Lock() defer h.ledgersMu.Unlock() // Double-check after acquiring write lock if ledger, exists := h.ledgers[key]; exists { return ledger } // Create and store new ledger ledger = offset.NewLedger() h.ledgers[key] = ledger return ledger } // GetLedger returns the offset ledger for a topic-partition, or nil if not found func (h *SeaweedMQHandler) GetLedger(topic string, partition int32) *offset.Ledger { key := TopicPartitionKey{Topic: topic, Partition: partition} h.ledgersMu.RLock() defer h.ledgersMu.RUnlock() return h.ledgers[key] } // FetchRecords retrieves records from SeaweedMQ for a Kafka fetch request func (h *SeaweedMQHandler) FetchRecords(topic string, partition int32, fetchOffset int64, maxBytes int32) ([]byte, error) { // Verify topic exists if !h.TopicExists(topic) { return nil, fmt.Errorf("topic %s does not exist", topic) } ledger := h.GetLedger(topic, partition) if ledger == nil { // No messages yet, return empty record batch return []byte{}, nil } highWaterMark := ledger.GetHighWaterMark() // If fetch offset is at or beyond high water mark, no records to return if fetchOffset >= highWaterMark { return []byte{}, nil } // Get or create subscriber session for this topic/partition var seaweedRecords []*SeaweedRecord var err error // Calculate how many records to fetch recordsToFetch := int(highWaterMark - fetchOffset) if recordsToFetch > 100 { recordsToFetch = 100 // Limit batch size } // Read records using appropriate client if h.useBroker && h.brokerClient != nil { brokerSubscriber, subErr := h.brokerClient.GetOrCreateSubscriber(topic, partition, fetchOffset) if subErr != nil { return nil, fmt.Errorf("failed to get broker subscriber: %v", subErr) } seaweedRecords, err = h.brokerClient.ReadRecords(brokerSubscriber, recordsToFetch) } else if h.agentClient != nil { agentSubscriber, subErr := h.agentClient.GetOrCreateSubscriber(topic, partition, fetchOffset) if subErr != nil { return nil, fmt.Errorf("failed to get agent subscriber: %v", subErr) } seaweedRecords, err = h.agentClient.ReadRecords(agentSubscriber, recordsToFetch) } else { return nil, fmt.Errorf("no client available") } if err != nil { // If no records available, return empty batch instead of error return []byte{}, nil } // Map SeaweedMQ records to Kafka offsets and update ledger kafkaRecords, err := h.mapSeaweedToKafkaOffsets(topic, partition, seaweedRecords, fetchOffset) if err != nil { return nil, fmt.Errorf("failed to map offsets: %v", err) } // Convert mapped records to Kafka record batch format return h.convertSeaweedToKafkaRecordBatch(kafkaRecords, fetchOffset, maxBytes) } // constructKafkaRecordBatch creates a Kafka-compatible record batch func (h *SeaweedMQHandler) constructKafkaRecordBatch(ledger *offset.Ledger, fetchOffset, highWaterMark int64, maxBytes int32) ([]byte, error) { recordsToFetch := highWaterMark - fetchOffset if recordsToFetch <= 0 { return []byte{}, nil } // Limit records to prevent overly large batches if recordsToFetch > 100 { recordsToFetch = 100 } // For Phase 2, create a stub record batch with placeholder data // This represents what would come from SeaweedMQ subscriber batch := make([]byte, 0, 512) // Record batch header baseOffsetBytes := make([]byte, 8) binary.BigEndian.PutUint64(baseOffsetBytes, uint64(fetchOffset)) batch = append(batch, baseOffsetBytes...) // base offset // Batch length (placeholder, will be filled at end) batchLengthPos := len(batch) batch = append(batch, 0, 0, 0, 0) batch = append(batch, 0, 0, 0, 0) // partition leader epoch batch = append(batch, 2) // magic byte (version 2) // CRC placeholder batch = append(batch, 0, 0, 0, 0) // Batch attributes batch = append(batch, 0, 0) // Last offset delta lastOffsetDelta := uint32(recordsToFetch - 1) lastOffsetDeltaBytes := make([]byte, 4) binary.BigEndian.PutUint32(lastOffsetDeltaBytes, lastOffsetDelta) batch = append(batch, lastOffsetDeltaBytes...) // Timestamps currentTime := time.Now().UnixNano() firstTimestampBytes := make([]byte, 8) binary.BigEndian.PutUint64(firstTimestampBytes, uint64(currentTime)) batch = append(batch, firstTimestampBytes...) maxTimestamp := currentTime + recordsToFetch*1000000 // 1ms apart maxTimestampBytes := make([]byte, 8) binary.BigEndian.PutUint64(maxTimestampBytes, uint64(maxTimestamp)) batch = append(batch, maxTimestampBytes...) // Producer info (simplified) batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF) // producer ID (-1) batch = append(batch, 0xFF, 0xFF) // producer epoch (-1) batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF) // base sequence (-1) // Record count recordCountBytes := make([]byte, 4) binary.BigEndian.PutUint32(recordCountBytes, uint32(recordsToFetch)) batch = append(batch, recordCountBytes...) // Add simple records (placeholders representing SeaweedMQ data) for i := int64(0); i < recordsToFetch; i++ { record := h.constructSingleRecord(i, fetchOffset+i) recordLength := byte(len(record)) batch = append(batch, recordLength) batch = append(batch, record...) } // Fill in the batch length batchLength := uint32(len(batch) - batchLengthPos - 4) binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], batchLength) return batch, nil } // constructSingleRecord creates a single Kafka record func (h *SeaweedMQHandler) constructSingleRecord(index, offset int64) []byte { record := make([]byte, 0, 64) // Record attributes record = append(record, 0) // Timestamp delta (varint - simplified) record = append(record, byte(index)) // Offset delta (varint - simplified) record = append(record, byte(index)) // Key length (-1 = null key) record = append(record, 0xFF) // Value (represents data that would come from SeaweedMQ) value := fmt.Sprintf("seaweedmq-message-%d", offset) record = append(record, byte(len(value))) record = append(record, []byte(value)...) // Headers count (0) record = append(record, 0) return record } // mapSeaweedToKafkaOffsets maps SeaweedMQ records to proper Kafka offsets func (h *SeaweedMQHandler) mapSeaweedToKafkaOffsets(topic string, partition int32, seaweedRecords []*SeaweedRecord, startOffset int64) ([]*SeaweedRecord, error) { if len(seaweedRecords) == 0 { return seaweedRecords, nil } ledger := h.GetOrCreateLedger(topic, partition) mappedRecords := make([]*SeaweedRecord, 0, len(seaweedRecords)) // Assign the required offsets first (this ensures offsets are reserved in sequence) // Note: In a real scenario, these offsets would have been assigned during produce // but for fetch operations we need to ensure the ledger state is consistent for i, seaweedRecord := range seaweedRecords { currentKafkaOffset := startOffset + int64(i) // Create a copy of the record with proper Kafka offset assignment mappedRecord := &SeaweedRecord{ Key: seaweedRecord.Key, Value: seaweedRecord.Value, Timestamp: seaweedRecord.Timestamp, Sequence: currentKafkaOffset, // Use Kafka offset as sequence for consistency } // Update the offset ledger to track the mapping between SeaweedMQ sequence and Kafka offset recordSize := int32(len(seaweedRecord.Value)) if err := ledger.AppendRecord(currentKafkaOffset, seaweedRecord.Timestamp, recordSize); err != nil { // Log warning but continue processing fmt.Printf("Warning: failed to update offset ledger for topic %s partition %d offset %d: %v\n", topic, partition, currentKafkaOffset, err) } mappedRecords = append(mappedRecords, mappedRecord) } return mappedRecords, nil } // convertSeaweedToKafkaRecordBatch converts SeaweedMQ records to Kafka record batch format func (h *SeaweedMQHandler) convertSeaweedToKafkaRecordBatch(seaweedRecords []*SeaweedRecord, fetchOffset int64, maxBytes int32) ([]byte, error) { if len(seaweedRecords) == 0 { return []byte{}, nil } batch := make([]byte, 0, 512) // Record batch header baseOffsetBytes := make([]byte, 8) binary.BigEndian.PutUint64(baseOffsetBytes, uint64(fetchOffset)) batch = append(batch, baseOffsetBytes...) // base offset // Batch length (placeholder, will be filled at end) batchLengthPos := len(batch) batch = append(batch, 0, 0, 0, 0) batch = append(batch, 0, 0, 0, 0) // partition leader epoch batch = append(batch, 2) // magic byte (version 2) // CRC placeholder batch = append(batch, 0, 0, 0, 0) // Batch attributes batch = append(batch, 0, 0) // Last offset delta lastOffsetDelta := uint32(len(seaweedRecords) - 1) lastOffsetDeltaBytes := make([]byte, 4) binary.BigEndian.PutUint32(lastOffsetDeltaBytes, lastOffsetDelta) batch = append(batch, lastOffsetDeltaBytes...) // Timestamps - use actual timestamps from SeaweedMQ records var firstTimestamp, maxTimestamp int64 if len(seaweedRecords) > 0 { firstTimestamp = seaweedRecords[0].Timestamp maxTimestamp = firstTimestamp for _, record := range seaweedRecords { if record.Timestamp > maxTimestamp { maxTimestamp = record.Timestamp } } } firstTimestampBytes := make([]byte, 8) binary.BigEndian.PutUint64(firstTimestampBytes, uint64(firstTimestamp)) batch = append(batch, firstTimestampBytes...) maxTimestampBytes := make([]byte, 8) binary.BigEndian.PutUint64(maxTimestampBytes, uint64(maxTimestamp)) batch = append(batch, maxTimestampBytes...) // Producer info (simplified) batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF) // producer ID (-1) batch = append(batch, 0xFF, 0xFF) // producer epoch (-1) batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF) // base sequence (-1) // Record count recordCountBytes := make([]byte, 4) binary.BigEndian.PutUint32(recordCountBytes, uint32(len(seaweedRecords))) batch = append(batch, recordCountBytes...) // Add actual records from SeaweedMQ for i, seaweedRecord := range seaweedRecords { record := h.convertSingleSeaweedRecord(seaweedRecord, int64(i), fetchOffset) recordLength := byte(len(record)) batch = append(batch, recordLength) batch = append(batch, record...) // Check if we're approaching maxBytes limit if int32(len(batch)) > maxBytes*3/4 { // Leave room for remaining headers and stop adding records break } } // Fill in the batch length batchLength := uint32(len(batch) - batchLengthPos - 4) binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], batchLength) return batch, nil } // convertSingleSeaweedRecord converts a single SeaweedMQ record to Kafka format func (h *SeaweedMQHandler) convertSingleSeaweedRecord(seaweedRecord *SeaweedRecord, index, baseOffset int64) []byte { record := make([]byte, 0, 64) // Record attributes record = append(record, 0) // Timestamp delta (varint - simplified) timestampDelta := seaweedRecord.Timestamp - baseOffset // Simple delta calculation if timestampDelta < 0 { timestampDelta = 0 } record = append(record, byte(timestampDelta&0xFF)) // Simplified varint encoding // Offset delta (varint - simplified) record = append(record, byte(index)) // Key length and key if len(seaweedRecord.Key) > 0 { record = append(record, byte(len(seaweedRecord.Key))) record = append(record, seaweedRecord.Key...) } else { // Null key record = append(record, 0xFF) } // Value length and value if len(seaweedRecord.Value) > 0 { record = append(record, byte(len(seaweedRecord.Value))) record = append(record, seaweedRecord.Value...) } else { // Empty value record = append(record, 0) } // Headers count (0) record = append(record, 0) return record } // NewSeaweedMQBrokerHandler creates a new handler with SeaweedMQ broker integration func NewSeaweedMQBrokerHandler(masters string, filerGroup string, clientHost string) (*SeaweedMQHandler, error) { if masters == "" { return nil, fmt.Errorf("masters required - SeaweedMQ infrastructure must be configured") } // Parse master addresses using SeaweedFS utilities masterServerAddresses := pb.ServerAddresses(masters).ToAddresses() if len(masterServerAddresses) == 0 { return nil, fmt.Errorf("no valid master addresses provided") } // Create master client for service discovery grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) masterDiscovery := pb.ServerAddresses(masters).ToServiceDiscovery() // Use provided client host for proper gRPC connection // This is critical for MasterClient to establish streaming connections clientHostAddr := pb.ServerAddress(clientHost) masterClient := wdclient.NewMasterClient(grpcDialOption, filerGroup, "kafka-gateway", clientHostAddr, "", "", *masterDiscovery) glog.V(1).Infof("🚀 Created MasterClient with clientHost=%s, masters=%s", clientHost, masters) // Start KeepConnectedToMaster in background to maintain connection glog.V(1).Infof("🔄 Starting KeepConnectedToMaster background goroutine...") ctx, cancel := context.WithCancel(context.Background()) go func() { defer cancel() masterClient.KeepConnectedToMaster(ctx) }() // Give the connection a moment to establish time.Sleep(2 * time.Second) glog.V(1).Infof("⏱️ Initial connection delay completed") // Discover brokers from masters using master client glog.V(1).Infof("🔄 About to call discoverBrokersWithMasterClient...") brokerAddresses, err := discoverBrokersWithMasterClient(masterClient, filerGroup) if err != nil { glog.Errorf("💥 Broker discovery failed: %v", err) return nil, fmt.Errorf("failed to discover brokers: %v", err) } glog.V(1).Infof("✨ Broker discovery returned: %v", brokerAddresses) if len(brokerAddresses) == 0 { return nil, fmt.Errorf("no brokers discovered from masters") } // Discover filers from masters using master client filerAddresses, err := discoverFilersWithMasterClient(masterClient, filerGroup) if err != nil { return nil, fmt.Errorf("failed to discover filers: %v", err) } // For now, use the first broker and first filer (can be enhanced later for load balancing) brokerAddress := brokerAddresses[0] var filerAddress string if len(filerAddresses) > 0 { filerAddress = filerAddresses[0] } // Create broker client with optional filer access brokerClient, err := NewBrokerClient(brokerAddress, filerAddress) if err != nil { return nil, fmt.Errorf("failed to create broker client: %v", err) } // Test the connection if err := brokerClient.HealthCheck(); err != nil { brokerClient.Close() return nil, fmt.Errorf("broker health check failed: %v", err) } return &SeaweedMQHandler{ brokerClient: brokerClient, useBroker: true, masterClient: masterClient, topics: make(map[string]*KafkaTopicInfo), ledgers: make(map[TopicPartitionKey]*offset.Ledger), brokerAddresses: brokerAddresses, // Store all discovered broker addresses }, nil } // discoverBrokersWithMasterClient queries masters for available brokers using reusable master client func discoverBrokersWithMasterClient(masterClient *wdclient.MasterClient, filerGroup string) ([]string, error) { var brokers []string glog.V(1).Infof("🔍 Starting broker discovery with MasterClient for filer group: %q", filerGroup) err := masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { glog.V(1).Infof("📞 Inside MasterClient.WithClient callback - client obtained successfully") resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.BrokerType, FilerGroup: filerGroup, Limit: 1000, }) if err != nil { glog.Errorf("❌ ListClusterNodes gRPC call failed: %v", err) return err } glog.V(1).Infof("✅ ListClusterNodes successful - found %d cluster nodes", len(resp.ClusterNodes)) // Extract broker addresses from response for _, node := range resp.ClusterNodes { if node.Address != "" { brokers = append(brokers, node.Address) glog.V(1).Infof("🌐 Discovered broker: %s", node.Address) } } return nil }) if err != nil { glog.Errorf("❌ MasterClient.WithClient failed: %v", err) } else { glog.V(1).Infof("🎉 Broker discovery completed successfully - found %d brokers: %v", len(brokers), brokers) } return brokers, err } // discoverFilersWithMasterClient queries masters for available filers using reusable master client func discoverFilersWithMasterClient(masterClient *wdclient.MasterClient, filerGroup string) ([]string, error) { var filers []string err := masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.FilerType, FilerGroup: filerGroup, Limit: 1000, }) if err != nil { return err } // Extract filer addresses from response and convert to gRPC addresses for _, node := range resp.ClusterNodes { if node.Address != "" { // Convert HTTP address to gRPC address // SeaweedFS filer gRPC port is typically HTTP port + 10000 httpAddr := node.Address grpcAddr := pb.ServerToGrpcAddress(httpAddr) fmt.Printf("FILER DISCOVERY: Converted filer HTTP address %s to gRPC address %s\n", httpAddr, grpcAddr) filers = append(filers, grpcAddr) } } return nil }) return filers, err } // BrokerClient wraps the SeaweedMQ Broker gRPC client for Kafka gateway integration type BrokerClient struct { brokerAddress string conn *grpc.ClientConn client mq_pb.SeaweedMessagingClient // Filer client for metadata access filerAddress string filerConn *grpc.ClientConn filerClient filer_pb.SeaweedFilerClient // Publisher streams: topic-partition -> stream info publishersLock sync.RWMutex publishers map[string]*BrokerPublisherSession // Subscriber streams for offset tracking subscribersLock sync.RWMutex subscribers map[string]*BrokerSubscriberSession ctx context.Context cancel context.CancelFunc } // BrokerPublisherSession tracks a publishing stream to SeaweedMQ broker type BrokerPublisherSession struct { Topic string Partition int32 Stream mq_pb.SeaweedMessaging_PublishMessageClient } // BrokerSubscriberSession tracks a subscription stream for offset management type BrokerSubscriberSession struct { Topic string Partition int32 Stream mq_pb.SeaweedMessaging_SubscribeMessageClient } // NewBrokerClient creates a client that connects to a SeaweedMQ broker func NewBrokerClient(brokerAddress, filerAddress string) (*BrokerClient, error) { ctx, cancel := context.WithCancel(context.Background()) // Use background context for gRPC connections to prevent them from being canceled // when BrokerClient.Close() is called. This allows subscriber streams to continue // operating even during client shutdown, which is important for testing scenarios. dialCtx := context.Background() // Connect to broker conn, err := grpc.DialContext(dialCtx, brokerAddress, grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { cancel() return nil, fmt.Errorf("failed to connect to broker %s: %v", brokerAddress, err) } client := mq_pb.NewSeaweedMessagingClient(conn) // Connect to filer if address provided var filerConn *grpc.ClientConn var filerClient filer_pb.SeaweedFilerClient if filerAddress != "" { filerConn, err = grpc.DialContext(dialCtx, filerAddress, grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { conn.Close() cancel() return nil, fmt.Errorf("failed to connect to filer %s: %v", filerAddress, err) } filerClient = filer_pb.NewSeaweedFilerClient(filerConn) } return &BrokerClient{ brokerAddress: brokerAddress, conn: conn, client: client, filerAddress: filerAddress, filerConn: filerConn, filerClient: filerClient, publishers: make(map[string]*BrokerPublisherSession), subscribers: make(map[string]*BrokerSubscriberSession), ctx: ctx, cancel: cancel, }, nil } // Close shuts down the broker client and all streams func (bc *BrokerClient) Close() error { bc.cancel() // Close all publisher streams bc.publishersLock.Lock() for key := range bc.publishers { delete(bc.publishers, key) } bc.publishersLock.Unlock() // Close all subscriber streams bc.subscribersLock.Lock() for key := range bc.subscribers { delete(bc.subscribers, key) } bc.subscribersLock.Unlock() // Close filer connection if it exists if bc.filerConn != nil { bc.filerConn.Close() } return bc.conn.Close() } // GetFilerAddress returns the filer address used by this broker client func (bc *BrokerClient) GetFilerAddress() string { return bc.filerAddress } // PublishRecord publishes a single record to SeaweedMQ broker func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) { session, err := bc.getOrCreatePublisher(topic, partition) if err != nil { return 0, err } // Send data message using broker API format dataMsg := &mq_pb.DataMessage{ Key: key, Value: value, TsNs: timestamp, } if err := session.Stream.Send(&mq_pb.PublishMessageRequest{ Message: &mq_pb.PublishMessageRequest_Data{ Data: dataMsg, }, }); err != nil { return 0, fmt.Errorf("failed to send data: %v", err) } // Read acknowledgment resp, err := session.Stream.Recv() if err != nil { return 0, fmt.Errorf("failed to receive ack: %v", err) } if resp.Error != "" { return 0, fmt.Errorf("publish error: %s", resp.Error) } return resp.AckSequence, nil } // getOrCreatePublisher gets or creates a publisher stream for a topic-partition func (bc *BrokerClient) getOrCreatePublisher(topic string, partition int32) (*BrokerPublisherSession, error) { key := fmt.Sprintf("%s-%d", topic, partition) // Try to get existing publisher bc.publishersLock.RLock() if session, exists := bc.publishers[key]; exists { bc.publishersLock.RUnlock() return session, nil } bc.publishersLock.RUnlock() // Create new publisher stream bc.publishersLock.Lock() defer bc.publishersLock.Unlock() // Double-check after acquiring write lock if session, exists := bc.publishers[key]; exists { return session, nil } // Create the stream stream, err := bc.client.PublishMessage(bc.ctx) if err != nil { return nil, fmt.Errorf("failed to create publish stream: %v", err) } // Get the actual partition assignment from the broker instead of using Kafka partition mapping actualPartition, err := bc.getActualPartitionAssignment(topic, partition) if err != nil { return nil, fmt.Errorf("failed to get actual partition assignment: %v", err) } // Send init message using the actual partition structure that the broker allocated if err := stream.Send(&mq_pb.PublishMessageRequest{ Message: &mq_pb.PublishMessageRequest_Init{ Init: &mq_pb.PublishMessageRequest_InitMessage{ Topic: &schema_pb.Topic{ Namespace: "kafka", Name: topic, }, Partition: actualPartition, AckInterval: 100, PublisherName: "kafka-gateway", }, }, }); err != nil { return nil, fmt.Errorf("failed to send init message: %v", err) } session := &BrokerPublisherSession{ Topic: topic, Partition: partition, Stream: stream, } bc.publishers[key] = session return session, nil } // getActualPartitionAssignment looks up the actual partition assignment from the broker configuration func (bc *BrokerClient) getActualPartitionAssignment(topic string, kafkaPartition int32) (*schema_pb.Partition, error) { // Look up the topic configuration from the broker to get the actual partition assignments lookupResp, err := bc.client.LookupTopicBrokers(bc.ctx, &mq_pb.LookupTopicBrokersRequest{ Topic: &schema_pb.Topic{ Namespace: "kafka", Name: topic, }, }) if err != nil { return nil, fmt.Errorf("failed to lookup topic brokers: %v", err) } if len(lookupResp.BrokerPartitionAssignments) == 0 { return nil, fmt.Errorf("no partition assignments found for topic %s", topic) } totalPartitions := int32(len(lookupResp.BrokerPartitionAssignments)) if kafkaPartition >= totalPartitions { return nil, fmt.Errorf("kafka partition %d out of range, topic %s has %d partitions", kafkaPartition, topic, totalPartitions) } // Calculate expected range for this Kafka partition // Ring is divided equally among partitions, with last partition getting any remainder const ringSize = int32(2520) // MaxPartitionCount constant rangeSize := ringSize / totalPartitions expectedRangeStart := kafkaPartition * rangeSize var expectedRangeStop int32 if kafkaPartition == totalPartitions-1 { // Last partition gets the remainder to fill the entire ring expectedRangeStop = ringSize } else { expectedRangeStop = (kafkaPartition + 1) * rangeSize } glog.V(2).Infof("🔍 Looking for Kafka partition %d in topic %s: expected range [%d, %d] out of %d partitions", kafkaPartition, topic, expectedRangeStart, expectedRangeStop, totalPartitions) // Find the broker assignment that matches this range for _, assignment := range lookupResp.BrokerPartitionAssignments { if assignment.Partition == nil { continue } // Check if this assignment's range matches our expected range if assignment.Partition.RangeStart == expectedRangeStart && assignment.Partition.RangeStop == expectedRangeStop { glog.V(1).Infof("🎯 Found matching partition assignment for %s[%d]: {RingSize: %d, RangeStart: %d, RangeStop: %d, UnixTimeNs: %d}", topic, kafkaPartition, assignment.Partition.RingSize, assignment.Partition.RangeStart, assignment.Partition.RangeStop, assignment.Partition.UnixTimeNs) return assignment.Partition, nil } } // If no exact match found, log all available assignments for debugging glog.Warningf("❌ No partition assignment found for Kafka partition %d in topic %s with expected range [%d, %d]", kafkaPartition, topic, expectedRangeStart, expectedRangeStop) glog.Warningf("Available assignments:") for i, assignment := range lookupResp.BrokerPartitionAssignments { if assignment.Partition != nil { glog.Warningf(" Assignment[%d]: {RangeStart: %d, RangeStop: %d, RingSize: %d}", i, assignment.Partition.RangeStart, assignment.Partition.RangeStop, assignment.Partition.RingSize) } } return nil, fmt.Errorf("no broker assignment found for Kafka partition %d with expected range [%d, %d]", kafkaPartition, expectedRangeStart, expectedRangeStop) } // GetOrCreateSubscriber gets or creates a subscriber for offset tracking func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, startOffset int64) (*BrokerSubscriberSession, error) { key := fmt.Sprintf("%s-%d", topic, partition) bc.subscribersLock.RLock() if session, exists := bc.subscribers[key]; exists { bc.subscribersLock.RUnlock() return session, nil } bc.subscribersLock.RUnlock() // Create new subscriber stream bc.subscribersLock.Lock() defer bc.subscribersLock.Unlock() if session, exists := bc.subscribers[key]; exists { return session, nil } // Create a dedicated context for this subscriber that won't be canceled with the main BrokerClient context // This prevents subscriber streams from being canceled when BrokerClient.Close() is called during test cleanup subscriberCtx := context.Background() // Use background context instead of bc.ctx stream, err := bc.client.SubscribeMessage(subscriberCtx) if err != nil { return nil, fmt.Errorf("failed to create subscribe stream: %v", err) } // Get the actual partition assignment from the broker instead of using Kafka partition mapping actualPartition, err := bc.getActualPartitionAssignment(topic, partition) if err != nil { return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err) } // Convert Kafka offset to appropriate SeaweedMQ OffsetType and parameters var offsetType schema_pb.OffsetType var startTimestamp int64 var startOffsetValue int64 if startOffset == 0 { // For Kafka offset 0 (read from beginning), use RESET_TO_EARLIEST offsetType = schema_pb.OffsetType_RESET_TO_EARLIEST startTimestamp = 0 // Not used with RESET_TO_EARLIEST startOffsetValue = 0 // Not used with RESET_TO_EARLIEST glog.V(1).Infof("Using RESET_TO_EARLIEST for Kafka offset 0") } else if startOffset == -1 { // Kafka offset -1 typically means "latest" offsetType = schema_pb.OffsetType_RESET_TO_LATEST startTimestamp = 0 // Not used with RESET_TO_LATEST startOffsetValue = 0 // Not used with RESET_TO_LATEST glog.V(1).Infof("Using RESET_TO_LATEST for Kafka offset -1 (read latest)") } else { // For specific offsets, use native SeaweedMQ offset-based positioning offsetType = schema_pb.OffsetType_EXACT_OFFSET startTimestamp = 0 // Not used with EXACT_OFFSET startOffsetValue = startOffset // Use the Kafka offset directly glog.V(1).Infof("Using EXACT_OFFSET for Kafka offset %d (native offset-based positioning)", startOffset) } glog.V(1).Infof("🔍 Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s (timestamp=%d)", topic, partition, startOffset, offsetType, startTimestamp) // Send init message using the actual partition structure that the broker allocated if err := stream.Send(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Init{ Init: &mq_pb.SubscribeMessageRequest_InitMessage{ ConsumerGroup: "kafka-gateway", ConsumerId: fmt.Sprintf("kafka-gateway-%s-%d", topic, partition), ClientId: "kafka-gateway", Topic: &schema_pb.Topic{ Namespace: "kafka", Name: topic, }, PartitionOffset: &schema_pb.PartitionOffset{ Partition: actualPartition, StartTsNs: startTimestamp, StartOffset: startOffsetValue, }, OffsetType: offsetType, // Use the correct offset type SlidingWindowSize: 10, }, }, }); err != nil { return nil, fmt.Errorf("failed to send subscribe init: %v", err) } session := &BrokerSubscriberSession{ Topic: topic, Partition: partition, Stream: stream, } bc.subscribers[key] = session return session, nil } // ReadRecords reads available records from the subscriber stream func (bc *BrokerClient) ReadRecords(session *BrokerSubscriberSession, maxRecords int) ([]*SeaweedRecord, error) { if session == nil { return nil, fmt.Errorf("subscriber session cannot be nil") } var records []*SeaweedRecord for len(records) < maxRecords { resp, err := session.Stream.Recv() if err != nil { // If we have some records, return them; otherwise return error if len(records) > 0 { return records, nil } return nil, fmt.Errorf("failed to receive record: %v", err) } if dataMsg := resp.GetData(); dataMsg != nil { record := &SeaweedRecord{ Key: dataMsg.Key, Value: dataMsg.Value, Timestamp: dataMsg.TsNs, Sequence: 0, // Will be set by offset ledger } records = append(records, record) } } return records, nil } // HealthCheck verifies the broker connection is working func (bc *BrokerClient) HealthCheck() error { // Create a timeout context for health check ctx, cancel := context.WithTimeout(bc.ctx, 2*time.Second) defer cancel() // Try to list topics as a health check _, err := bc.client.ListTopics(ctx, &mq_pb.ListTopicsRequest{}) if err != nil { return fmt.Errorf("broker health check failed: %v", err) } return nil } // ClosePublisher closes a specific publisher session func (bc *BrokerClient) ClosePublisher(topic string, partition int32) error { key := fmt.Sprintf("%s-%d", topic, partition) bc.publishersLock.Lock() defer bc.publishersLock.Unlock() session, exists := bc.publishers[key] if !exists { return nil // Already closed or never existed } if session.Stream != nil { session.Stream.CloseSend() } delete(bc.publishers, key) return nil }