1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/integration/persistent_handler.go
chrislu 8de1ce5497 Fix compilation errors in integration modules
- Fix NewPersistentLedger calls (returns 1 value, not 2)
- Fix GetStats calls (returns 3 values, not 4)
- Remove error handling for NewPersistentLedger since it doesn't return errors
- All Kafka integration modules now compile successfully
2025-09-12 21:30:14 -07:00

326 lines
8.5 KiB
Go

package integration
import (
"fmt"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// PersistentKafkaHandler integrates Kafka protocol with persistent SMQ storage.
// It bundles an SMQ publisher and subscriber, an offset storage backend, an
// in-memory topic registry, and a cache of per-partition ledgers. Each of the
// two maps is guarded by its own RWMutex.
type PersistentKafkaHandler struct {
// brokers is the address list passed to NewPersistentKafkaHandler.
brokers []string
// SMQ integration components
publisher *SMQPublisher
subscriber *SMQSubscriber
// Offset storage backing the persistent ledgers
offsetStorage *offset.SMQOffsetStorage
// Topic registry: topics is keyed by topic name and guarded by topicsMu
topicsMu sync.RWMutex
topics map[string]*TopicInfo
// Ledgers for offset tracking (persistent); ledgers is guarded by ledgersMu
ledgersMu sync.RWMutex
ledgers map[string]*offset.PersistentLedger // key: "<topic>-<partition>"
}
// TopicInfo holds information about a Kafka topic as recorded in the
// handler's in-memory topic registry.
type TopicInfo struct {
// Name is the topic name; the registry also uses it as the map key.
Name string
// Partitions is the partition count supplied when the topic was created.
Partitions int32
// CreatedAt is the creation time in Unix nanoseconds.
CreatedAt int64
// RecordType is the schema supplied when the topic was created.
// NOTE(review): nothing here rejects a nil value — confirm whether callers rely on it being set.
RecordType *schema_pb.RecordType
}
// NewPersistentKafkaHandler creates a new handler with full SMQ integration.
//
// brokers must contain at least one address; the first entry is also used as
// the filer address for offset storage. The function builds, in order, the SMQ
// publisher, the SMQ subscriber and the offset storage. If any step fails, the
// components created so far are closed and a wrapped error is returned.
//
// It returns an error (instead of panicking on brokers[0]) when brokers is
// empty.
func NewPersistentKafkaHandler(brokers []string) (*PersistentKafkaHandler, error) {
	// Guard the brokers[0] access below: a nil or empty slice would panic.
	if len(brokers) == 0 {
		return nil, fmt.Errorf("at least one broker address is required")
	}

	// Create SMQ publisher
	publisher, err := NewSMQPublisher(brokers)
	if err != nil {
		return nil, fmt.Errorf("failed to create SMQ publisher: %w", err)
	}

	// Create SMQ subscriber
	subscriber, err := NewSMQSubscriber(brokers)
	if err != nil {
		// Best-effort cleanup; the creation error is the one worth reporting.
		_ = publisher.Close()
		return nil, fmt.Errorf("failed to create SMQ subscriber: %w", err)
	}

	// Create offset storage
	// Use first broker as filer address for offset storage
	filerAddress := brokers[0]
	offsetStorage, err := offset.NewSMQOffsetStorage(filerAddress)
	if err != nil {
		// Best-effort cleanup; the creation error is the one worth reporting.
		_ = publisher.Close()
		_ = subscriber.Close()
		return nil, fmt.Errorf("failed to create offset storage: %w", err)
	}

	return &PersistentKafkaHandler{
		brokers:       brokers,
		publisher:     publisher,
		subscriber:    subscriber,
		offsetStorage: offsetStorage,
		topics:        make(map[string]*TopicInfo),
		ledgers:       make(map[string]*offset.PersistentLedger),
	}, nil
}
// ProduceMessage handles a Kafka produce request.
//
// The topic is registered in the handler's topic registry if it is not
// already known, then the record is handed to the SMQ publisher. On success
// the offset reported by the publisher is returned; on any failure the
// result is -1 together with a wrapped error.
func (h *PersistentKafkaHandler) ProduceMessage(
	topic string,
	partition int32,
	key []byte,
	value *schema_pb.RecordValue,
	recordType *schema_pb.RecordType,
) (int64, error) {
	// Step 1: make sure the topic is present in the registry.
	topicErr := h.ensureTopicExists(topic, recordType)
	if topicErr != nil {
		return -1, fmt.Errorf("failed to ensure topic exists: %w", topicErr)
	}

	// Step 2: publish through SMQ, which reports the assigned offset.
	assigned, publishErr := h.publisher.PublishMessage(topic, partition, key, value, recordType)
	if publishErr != nil {
		return -1, fmt.Errorf("failed to publish message: %w", publishErr)
	}

	return assigned, nil
}
// FetchMessages handles a Kafka fetch request by delegating to the SMQ
// subscriber with the same arguments. The subscriber's messages are returned
// unchanged; a subscriber failure is wrapped and returned with a nil slice.
func (h *PersistentKafkaHandler) FetchMessages(
	topic string,
	partition int32,
	fetchOffset int64,
	maxBytes int32,
	consumerGroup string,
) ([]*KafkaMessage, error) {
	batch, fetchErr := h.subscriber.FetchMessages(topic, partition, fetchOffset, maxBytes, consumerGroup)
	if fetchErr == nil {
		return batch, nil
	}
	return nil, fmt.Errorf("failed to fetch messages: %w", fetchErr)
}
// GetOrCreateLedger returns the persistent ledger for a topic-partition,
// creating and caching one backed by the handler's offset storage when none
// is cached yet. The returned error is always nil.
func (h *PersistentKafkaHandler) GetOrCreateLedger(topic string, partition int32) (*offset.PersistentLedger, error) {
	ledgerKey := fmt.Sprintf("%s-%d", topic, partition)

	// Fast path: look the ledger up under the read lock only.
	h.ledgersMu.RLock()
	cached, found := h.ledgers[ledgerKey]
	h.ledgersMu.RUnlock()
	if found {
		return cached, nil
	}

	// Slow path: take the write lock and look again, because another
	// goroutine may have inserted the ledger after the read lock was released.
	h.ledgersMu.Lock()
	defer h.ledgersMu.Unlock()

	if existing, ok := h.ledgers[ledgerKey]; ok {
		return existing, nil
	}

	created := offset.NewPersistentLedger(ledgerKey, h.offsetStorage)
	h.ledgers[ledgerKey] = created
	return created, nil
}
// GetLedger looks up the cached ledger for a topic-partition without
// creating one. The result is nil when no ledger is cached for that key.
func (h *PersistentKafkaHandler) GetLedger(topic string, partition int32) *offset.PersistentLedger {
	ledgerKey := fmt.Sprintf("%s-%d", topic, partition)

	h.ledgersMu.RLock()
	cached := h.ledgers[ledgerKey]
	h.ledgersMu.RUnlock()

	return cached
}
// CreateTopic registers a Kafka topic in the handler's in-memory registry.
// If the name is already registered the existing entry is left untouched.
// The returned error is always nil.
func (h *PersistentKafkaHandler) CreateTopic(name string, partitions int32, recordType *schema_pb.RecordType) error {
	h.topicsMu.Lock()
	defer h.topicsMu.Unlock()

	// Only insert when the name is new; an existing topic is not overwritten.
	if _, registered := h.topics[name]; !registered {
		h.topics[name] = &TopicInfo{
			Name:       name,
			Partitions: partitions,
			CreatedAt:  getCurrentTimeNanos(),
			RecordType: recordType,
		}
	}
	return nil
}
// TopicExists reports whether name is present in the handler's topic registry.
func (h *PersistentKafkaHandler) TopicExists(name string) bool {
	h.topicsMu.RLock()
	_, registered := h.topics[name]
	h.topicsMu.RUnlock()

	return registered
}
// GetTopicInfo returns the registry entry for name, or nil when the topic is
// not registered. The returned pointer is the stored entry, not a copy.
func (h *PersistentKafkaHandler) GetTopicInfo(name string) *TopicInfo {
	h.topicsMu.RLock()
	info := h.topics[name]
	h.topicsMu.RUnlock()

	return info
}
// ListTopics returns the names of all registered topics. The order follows
// Go map iteration and is therefore unspecified. The result is never nil.
func (h *PersistentKafkaHandler) ListTopics() []string {
	h.topicsMu.RLock()
	defer h.topicsMu.RUnlock()

	// Allocate the exact length up front and fill by index.
	names := make([]string, len(h.topics))
	next := 0
	for topicName := range h.topics {
		names[next] = topicName
		next++
	}
	return names
}
// GetHighWaterMark reports the high water mark of the topic-partition's
// ledger, creating the ledger first if it is not cached yet. On a ledger
// lookup error it returns 0 and that error.
func (h *PersistentKafkaHandler) GetHighWaterMark(topic string, partition int32) (int64, error) {
	ledger, lookupErr := h.GetOrCreateLedger(topic, partition)
	if lookupErr == nil {
		return ledger.GetHighWaterMark(), nil
	}
	return 0, lookupErr
}
// GetEarliestOffset reports the earliest offset of the topic-partition's
// ledger, creating the ledger first if it is not cached yet. On a ledger
// lookup error it returns 0 and that error.
func (h *PersistentKafkaHandler) GetEarliestOffset(topic string, partition int32) (int64, error) {
	ledger, lookupErr := h.GetOrCreateLedger(topic, partition)
	if lookupErr == nil {
		return ledger.GetEarliestOffset(), nil
	}
	return 0, lookupErr
}
// GetLatestOffset reports the latest offset of the topic-partition's ledger,
// creating the ledger first if it is not cached yet. On a ledger lookup error
// it returns 0 and that error.
func (h *PersistentKafkaHandler) GetLatestOffset(topic string, partition int32) (int64, error) {
	ledger, lookupErr := h.GetOrCreateLedger(topic, partition)
	if lookupErr == nil {
		return ledger.GetLatestOffset(), nil
	}
	return 0, lookupErr
}
// CommitOffset commits a consumer group offset.
// It forwards all four arguments unchanged to the SMQ subscriber's
// CommitOffset and returns that call's error as-is.
// Note: the parameter named offset shadows the imported offset package
// inside this function body.
func (h *PersistentKafkaHandler) CommitOffset(
topic string,
partition int32,
offset int64,
consumerGroup string,
) error {
return h.subscriber.CommitOffset(topic, partition, offset, consumerGroup)
}
// FetchOffset retrieves a committed consumer group offset.
// Currently a stub: every argument is ignored and the result is always
// (-1, nil), so a value committed via CommitOffset cannot be read back
// through this method.
func (h *PersistentKafkaHandler) FetchOffset(
topic string,
partition int32,
consumerGroup string,
) (int64, error) {
// For now, return -1 (no committed offset)
// In a full implementation, this would query SMQ for the committed offset
return -1, nil
}
// GetStats returns comprehensive statistics about the handler.
//
// The result contains four keys:
//   - "topics": map of topic name to {"partitions", "created_at"}
//   - "topic_count": number of registered topics
//   - "ledgers": map of ledger key to {"entry_count", "earliest_time",
//     "latest_time", "next_offset", "high_water_mark"}
//   - "ledger_count": number of cached ledgers
//
// Topics and ledgers are read under their own read locks one after the
// other, so the two sections are not a single atomic snapshot.
func (h *PersistentKafkaHandler) GetStats() map[string]interface{} {
	stats := make(map[string]interface{})

	// Topic stats
	h.topicsMu.RLock()
	topicStats := make(map[string]interface{}, len(h.topics))
	for name, info := range h.topics {
		topicStats[name] = map[string]interface{}{
			"partitions": info.Partitions,
			"created_at": info.CreatedAt,
		}
	}
	h.topicsMu.RUnlock()

	stats["topics"] = topicStats
	stats["topic_count"] = len(topicStats)

	// Ledger stats
	h.ledgersMu.RLock()
	ledgerStats := make(map[string]interface{}, len(h.ledgers))
	for key, ledger := range h.ledgers {
		entryCount, earliestTime, latestTime := ledger.GetStats()
		// Read the high water mark once so "next_offset" and
		// "high_water_mark" cannot disagree if the ledger advances
		// between two separate calls.
		highWaterMark := ledger.GetHighWaterMark()
		ledgerStats[key] = map[string]interface{}{
			"entry_count":     entryCount,
			"earliest_time":   earliestTime,
			"latest_time":     latestTime,
			"next_offset":     highWaterMark,
			"high_water_mark": highWaterMark,
		}
	}
	h.ledgersMu.RUnlock()

	stats["ledgers"] = ledgerStats
	stats["ledger_count"] = len(ledgerStats)

	return stats
}
// Close shuts down the handler and all connections.
//
// The publisher, subscriber and offset storage are closed in that order, and
// every one of them is attempted even if an earlier close fails. If exactly
// one close fails, its error is returned unchanged. If several fail, the
// returned error wraps the last failure (so errors.Is/errors.As still match
// it) and includes the messages of the earlier ones, which were previously
// dropped. It returns nil when everything closed cleanly.
func (h *PersistentKafkaHandler) Close() error {
	var closeErr error

	// record folds one close result into closeErr without losing earlier failures.
	record := func(err error) {
		switch {
		case err == nil:
			// nothing to record
		case closeErr == nil:
			closeErr = err
		default:
			closeErr = fmt.Errorf("%v; %w", closeErr, err)
		}
	}

	record(h.publisher.Close())
	record(h.subscriber.Close())
	record(h.offsetStorage.Close())

	return closeErr
}
// ensureTopicExists registers the topic with a single partition when it is
// not in the registry yet, and does nothing otherwise.
func (h *PersistentKafkaHandler) ensureTopicExists(name string, recordType *schema_pb.RecordType) error {
	// Partition count used for topics that are created implicitly.
	const defaultPartitions = 1

	if !h.TopicExists(name) {
		return h.CreateTopic(name, defaultPartitions, recordType)
	}
	return nil
}
// getCurrentTimeNanos reports the current wall-clock time as the number of
// nanoseconds elapsed since the Unix epoch.
func getCurrentTimeNanos() int64 {
	now := time.Now()
	return now.UnixNano()
}
// RestoreAllLedgers restores all ledgers from persistent storage on startup.
// Currently a no-op placeholder: it performs no work and always returns nil.
func (h *PersistentKafkaHandler) RestoreAllLedgers() error {
// This would scan SMQ for all topic-partitions and restore their ledgers
// For now, ledgers are created on-demand
return nil
}