seaweedfs/weed/mq/kafka/integration/smq_subscriber.go
chrislu 8de1ce5497 Fix compilation errors in integration modules
- Fix NewPersistentLedger calls (returns 1 value, not 2)
- Fix GetStats calls (returns 3 values, not 4)
- Remove error handling for NewPersistentLedger since it doesn't return errors
- All Kafka integration modules now compile successfully
2025-09-12 21:30:14 -07:00

package integration

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/proto"
)

// SMQSubscriber handles subscribing to SeaweedMQ messages for Kafka fetch requests
type SMQSubscriber struct {
	brokers        []string
	grpcDialOption grpc.DialOption
	ctx            context.Context

	// Active subscriptions
	subscriptionsLock sync.RWMutex
	subscriptions     map[string]*SubscriptionWrapper // key: topic-partition-consumerGroup

	// Offset mapping
	offsetMapper  *offset.KafkaToSMQMapper
	offsetStorage *offset.SMQOffsetStorage
}

// SubscriptionWrapper wraps a SMQ subscription with Kafka-specific metadata
type SubscriptionWrapper struct {
	subscriber     *sub_client.TopicSubscriber
	kafkaTopic     string
	kafkaPartition int32
	consumerGroup  string
	startOffset    int64

	// Message buffer for Kafka fetch responses
	messageBuffer chan *KafkaMessage
	isActive      bool
	createdAt     time.Time

	// Offset tracking
	ledger            *offset.PersistentLedger
	lastFetchedOffset int64
}

// KafkaMessage represents a message converted from SMQ to Kafka format
type KafkaMessage struct {
	Key       []byte
	Value     []byte
	Offset    int64
	Partition int32
	Timestamp int64
	Headers   map[string][]byte

	// Original SMQ data for reference
	SMQTimestamp int64
	SMQRecord    *schema_pb.RecordValue
}

// NewSMQSubscriber creates a new SMQ subscriber for Kafka messages
func NewSMQSubscriber(brokers []string) (*SMQSubscriber, error) {
	if len(brokers) == 0 {
		return nil, fmt.Errorf("at least one broker address is required")
	}

	// Create offset storage
	// Use first broker as filer address for offset storage
	filerAddress := brokers[0]
	offsetStorage, err := offset.NewSMQOffsetStorage(filerAddress)
	if err != nil {
		return nil, fmt.Errorf("failed to create offset storage: %w", err)
	}

	return &SMQSubscriber{
		brokers:        brokers,
		grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
		ctx:            context.Background(),
		subscriptions:  make(map[string]*SubscriptionWrapper),
		offsetStorage:  offsetStorage,
	}, nil
}

// Subscribe creates a subscription for Kafka fetch requests
func (s *SMQSubscriber) Subscribe(
	kafkaTopic string,
	kafkaPartition int32,
	startOffset int64,
	consumerGroup string,
) (*SubscriptionWrapper, error) {
	key := fmt.Sprintf("%s-%d-%s", kafkaTopic, kafkaPartition, consumerGroup)

	s.subscriptionsLock.Lock()
	defer s.subscriptionsLock.Unlock()

	// Check if subscription already exists
	if existing, exists := s.subscriptions[key]; exists {
		return existing, nil
	}

	// Create persistent ledger for offset mapping
	ledgerKey := fmt.Sprintf("%s-%d", kafkaTopic, kafkaPartition)
	ledger := offset.NewPersistentLedger(ledgerKey, s.offsetStorage)

	// Create offset mapper
	offsetMapper := offset.NewKafkaToSMQMapper(ledger.Ledger)
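	// The mapper translates Kafka's sequential integer offsets into SMQ's native
	// start positions for this topic-partition.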

	// Convert Kafka offset to SMQ PartitionOffset
	partitionOffset, offsetType, err := offsetMapper.CreateSMQSubscriptionRequest(
		kafkaTopic, kafkaPartition, startOffset, consumerGroup)
	if err != nil {
		return nil, fmt.Errorf("failed to create SMQ subscription request: %w", err)
	}

	// Create SMQ subscriber configuration
	subscriberConfig := &sub_client.SubscriberConfiguration{
		ConsumerGroup:           fmt.Sprintf("kafka-%s", consumerGroup),
		ConsumerGroupInstanceId: fmt.Sprintf("kafka-%s-%s-%d", consumerGroup, kafkaTopic, kafkaPartition),
		GrpcDialOption:          s.grpcDialOption,
		MaxPartitionCount:       1,
		SlidingWindowSize:       100,
	}
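
	// Kafka topics are exposed on the SMQ side under the "kafka" namespace.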
	contentConfig := &sub_client.ContentConfiguration{
		Topic:            topic.NewTopic("kafka", kafkaTopic),
		PartitionOffsets: []*schema_pb.PartitionOffset{partitionOffset},
		OffsetType:       offsetType,
	}

	// Create SMQ subscriber
	subscriber := sub_client.NewTopicSubscriber(
		s.ctx,
		s.brokers,
		subscriberConfig,
		contentConfig,
		make(chan sub_client.KeyedOffset, 100),
	)
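
	// Note: the KeyedOffset channel passed above is created inline and is not
	// consumed anywhere in this file.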

	// Create subscription wrapper
	wrapper := &SubscriptionWrapper{
		subscriber:        subscriber,
		kafkaTopic:        kafkaTopic,
		kafkaPartition:    kafkaPartition,
		consumerGroup:     consumerGroup,
		startOffset:       startOffset,
		messageBuffer:     make(chan *KafkaMessage, 1000),
		isActive:          true,
		createdAt:         time.Now(),
		ledger:            ledger,
		lastFetchedOffset: startOffset - 1,
	}
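
	// Messages delivered by SMQ are converted to Kafka format and buffered here;
	// FetchMessages later drains this buffer.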

	// Set up message handler
	subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
		kafkaMsg := s.convertSMQToKafkaMessage(m, wrapper)
		if kafkaMsg != nil {
			select {
			case wrapper.messageBuffer <- kafkaMsg:
				wrapper.lastFetchedOffset = kafkaMsg.Offset
			default:
				// Buffer full, drop message (or implement backpressure)
			}
		}
	})

	// Start subscription in background
	go func() {
		if err := subscriber.Subscribe(); err != nil {
			fmt.Printf("SMQ subscription error for %s: %v\n", key, err)
		}
	}()

	s.subscriptions[key] = wrapper
	return wrapper, nil
}

// FetchMessages retrieves messages for a Kafka fetch request
func (s *SMQSubscriber) FetchMessages(
	kafkaTopic string,
	kafkaPartition int32,
	fetchOffset int64,
	maxBytes int32,
	consumerGroup string,
) ([]*KafkaMessage, error) {
	key := fmt.Sprintf("%s-%d-%s", kafkaTopic, kafkaPartition, consumerGroup)

	s.subscriptionsLock.RLock()
	wrapper, exists := s.subscriptions[key]
	s.subscriptionsLock.RUnlock()

	if !exists {
		// Create subscription if it doesn't exist
		var err error
		wrapper, err = s.Subscribe(kafkaTopic, kafkaPartition, fetchOffset, consumerGroup)
		if err != nil {
			return nil, fmt.Errorf("failed to create subscription: %w", err)
		}
	}

	// Collect messages from buffer
	var messages []*KafkaMessage
	var totalBytes int32 = 0
	timeout := time.After(100 * time.Millisecond) // Short timeout for fetch
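
	// Note: buffered messages below fetchOffset are consumed from the buffer and
	// discarded rather than returned to the caller.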
	for totalBytes < maxBytes && len(messages) < 1000 {
		select {
		case msg := <-wrapper.messageBuffer:
			// Only include messages at or after the requested offset
			if msg.Offset >= fetchOffset {
				messages = append(messages, msg)
				totalBytes += int32(len(msg.Key) + len(msg.Value) + 50) // Estimate overhead
			}
		case <-timeout:
			// Timeout reached, return what we have
			goto done
		}
	}

done:
	return messages, nil
}

// convertSMQToKafkaMessage converts a SMQ message to Kafka format
func (s *SMQSubscriber) convertSMQToKafkaMessage(
	smqMsg *mq_pb.SubscribeMessageResponse_Data,
	wrapper *SubscriptionWrapper,
) *KafkaMessage {
	// Unmarshal SMQ record
	record := &schema_pb.RecordValue{}
	if err := proto.Unmarshal(smqMsg.Data.Value, record); err != nil {
		return nil
	}
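
	// The Kafka gateway's publish path is expected to embed _kafka_offset,
	// _kafka_partition, and _kafka_timestamp fields into each record; records
	// missing the offset or partition field are treated as non-Kafka messages.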

	// Extract Kafka metadata from the record
	kafkaOffsetField := record.Fields["_kafka_offset"]
	kafkaPartitionField := record.Fields["_kafka_partition"]
	kafkaTimestampField := record.Fields["_kafka_timestamp"]

	if kafkaOffsetField == nil || kafkaPartitionField == nil {
		// This might be a non-Kafka message, skip it
		return nil
	}

	kafkaOffset := kafkaOffsetField.GetInt64Value()
	kafkaPartition := kafkaPartitionField.GetInt32Value()
	kafkaTimestamp := smqMsg.Data.TsNs
	if kafkaTimestampField != nil {
		kafkaTimestamp = kafkaTimestampField.GetInt64Value()
	}

	// Extract original message content (remove Kafka metadata)
	originalRecord := &schema_pb.RecordValue{
		Fields: make(map[string]*schema_pb.Value),
	}
	for key, value := range record.Fields {
		if !isKafkaMetadataField(key) {
			originalRecord.Fields[key] = value
		}
	}

	// Convert record back to bytes for Kafka
	valueBytes, err := proto.Marshal(originalRecord)
	if err != nil {
		return nil
	}

	return &KafkaMessage{
		Key:          smqMsg.Data.Key,
		Value:        valueBytes,
		Offset:       kafkaOffset,
		Partition:    kafkaPartition,
		Timestamp:    kafkaTimestamp,
		Headers:      make(map[string][]byte),
		SMQTimestamp: smqMsg.Data.TsNs,
		SMQRecord:    record,
	}
}

// isKafkaMetadataField checks if a field is Kafka metadata
func isKafkaMetadataField(fieldName string) bool {
	return fieldName == "_kafka_offset" ||
		fieldName == "_kafka_partition" ||
		fieldName == "_kafka_timestamp"
}

// GetSubscriptionStats returns statistics for a subscription
func (s *SMQSubscriber) GetSubscriptionStats(
	kafkaTopic string,
	kafkaPartition int32,
	consumerGroup string,
) map[string]interface{} {
	key := fmt.Sprintf("%s-%d-%s", kafkaTopic, kafkaPartition, consumerGroup)

	s.subscriptionsLock.RLock()
	wrapper, exists := s.subscriptions[key]
	s.subscriptionsLock.RUnlock()

	if !exists {
		return map[string]interface{}{"exists": false}
	}

	return map[string]interface{}{
		"exists":              true,
		"kafka_topic":         wrapper.kafkaTopic,
		"kafka_partition":     wrapper.kafkaPartition,
		"consumer_group":      wrapper.consumerGroup,
		"start_offset":        wrapper.startOffset,
		"last_fetched_offset": wrapper.lastFetchedOffset,
		"buffer_size":         len(wrapper.messageBuffer),
		"is_active":           wrapper.isActive,
		"created_at":          wrapper.createdAt,
	}
}

// CommitOffset commits a consumer offset
func (s *SMQSubscriber) CommitOffset(
	kafkaTopic string,
	kafkaPartition int32,
	offset int64,
	consumerGroup string,
) error {
	key := fmt.Sprintf("%s-%d-%s", kafkaTopic, kafkaPartition, consumerGroup)

	s.subscriptionsLock.RLock()
	wrapper, exists := s.subscriptions[key]
	s.subscriptionsLock.RUnlock()

	if !exists {
		return fmt.Errorf("subscription not found: %s", key)
	}

	// Update the subscription's committed offset
	// In a full implementation, this would persist the offset to SMQ
	wrapper.lastFetchedOffset = offset

	return nil
}

// CloseSubscription closes a specific subscription
func (s *SMQSubscriber) CloseSubscription(
	kafkaTopic string,
	kafkaPartition int32,
	consumerGroup string,
) error {
	key := fmt.Sprintf("%s-%d-%s", kafkaTopic, kafkaPartition, consumerGroup)

	s.subscriptionsLock.Lock()
	defer s.subscriptionsLock.Unlock()

	wrapper, exists := s.subscriptions[key]
	if !exists {
		return nil // Already closed
	}

	wrapper.isActive = false
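	// Closing the buffer signals readers that no more messages will arrive; sending
	// on a closed channel panics, so the SMQ data handler is assumed to have stopped
	// delivering by this point.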
	close(wrapper.messageBuffer)
	delete(s.subscriptions, key)

	return nil
}

// Close shuts down all subscriptions
func (s *SMQSubscriber) Close() error {
	s.subscriptionsLock.Lock()
	defer s.subscriptionsLock.Unlock()

	for key, wrapper := range s.subscriptions {
		wrapper.isActive = false
		close(wrapper.messageBuffer)
		delete(s.subscriptions, key)
	}

	return s.offsetStorage.Close()
}

// GetHighWaterMark returns the high water mark for a topic-partition
func (s *SMQSubscriber) GetHighWaterMark(kafkaTopic string, kafkaPartition int32) (int64, error) {
	ledgerKey := fmt.Sprintf("%s-%d", kafkaTopic, kafkaPartition)
	return s.offsetStorage.GetHighWaterMark(ledgerKey)
}

// GetEarliestOffset returns the earliest available offset for a topic-partition
func (s *SMQSubscriber) GetEarliestOffset(kafkaTopic string, kafkaPartition int32) (int64, error) {
	ledgerKey := fmt.Sprintf("%s-%d", kafkaTopic, kafkaPartition)
	entries, err := s.offsetStorage.LoadOffsetMappings(ledgerKey)
	if err != nil {
		return 0, err
	}
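
	// Offset mappings are assumed to be ordered by Kafka offset, so the first
	// entry (when present) is the earliest.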
	if len(entries) == 0 {
		return 0, nil
	}

	return entries[0].KafkaOffset, nil
}