package integration

import (
	"fmt"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// PersistentKafkaHandler integrates the Kafka protocol with persistent SMQ storage.
type PersistentKafkaHandler struct {
	brokers []string

	// SMQ integration components
	publisher  *SMQPublisher
	subscriber *SMQSubscriber

	// Offset storage
	offsetStorage *offset.SMQOffsetStorage

	// Topic registry
	topicsMu sync.RWMutex
	topics   map[string]*TopicInfo

	// Ledgers for persistent offset tracking, keyed by "topic-partition"
	ledgersMu sync.RWMutex
	ledgers   map[string]*offset.PersistentLedger
}

// TopicInfo holds information about a Kafka topic.
type TopicInfo struct {
	Name       string
	Partitions int32
	CreatedAt  int64
	RecordType *schema_pb.RecordType
}

// NewPersistentKafkaHandler creates a new handler with full SMQ integration.
func NewPersistentKafkaHandler(brokers []string) (*PersistentKafkaHandler, error) {
	// Create the SMQ publisher
	publisher, err := NewSMQPublisher(brokers)
	if err != nil {
		return nil, fmt.Errorf("failed to create SMQ publisher: %w", err)
	}

	// Create the SMQ subscriber
	subscriber, err := NewSMQSubscriber(brokers)
	if err != nil {
		publisher.Close()
		return nil, fmt.Errorf("failed to create SMQ subscriber: %w", err)
	}

	// Create offset storage, using the first broker as the filer address
	filerAddress := brokers[0]
	offsetStorage, err := offset.NewSMQOffsetStorage(filerAddress)
	if err != nil {
		publisher.Close()
		subscriber.Close()
		return nil, fmt.Errorf("failed to create offset storage: %w", err)
	}

	return &PersistentKafkaHandler{
		brokers:       brokers,
		publisher:     publisher,
		subscriber:    subscriber,
		offsetStorage: offsetStorage,
		topics:        make(map[string]*TopicInfo),
		ledgers:       make(map[string]*offset.PersistentLedger),
	}, nil
}

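// A minimal lifecycle sketch (illustrative only; the broker address below is
// an assumption, not a value defined in this package):
//
//	handler, err := NewPersistentKafkaHandler([]string{"localhost:17777"})
//	if err != nil {
//		return err
//	}
//	defer handler.Close()
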
// ProduceMessage handles Kafka produce requests with persistent offset tracking.
func (h *PersistentKafkaHandler) ProduceMessage(
	topic string,
	partition int32,
	key []byte,
	value *schema_pb.RecordValue,
	recordType *schema_pb.RecordType,
) (int64, error) {
	// Ensure the topic exists before publishing
	if err := h.ensureTopicExists(topic, recordType); err != nil {
		return -1, fmt.Errorf("failed to ensure topic exists: %w", err)
	}

	// Publish to SMQ with offset tracking
	kafkaOffset, err := h.publisher.PublishMessage(topic, partition, key, value, recordType)
	if err != nil {
		return -1, fmt.Errorf("failed to publish message: %w", err)
	}

	return kafkaOffset, nil
}

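// A produce-side sketch (the topic name and key are hypothetical; the record
// value and type would normally come from decoding the Kafka payload):
//
//	var record *schema_pb.RecordValue     // decoded elsewhere
//	var recordType *schema_pb.RecordType  // schema of the record
//	kafkaOffset, err := handler.ProduceMessage("events", 0, []byte("k1"), record, recordType)
//	if err != nil {
//		return err
//	}
//	_ = kafkaOffset // the Kafka offset assigned to the message
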
// FetchMessages handles Kafka fetch requests via SMQ subscription.
func (h *PersistentKafkaHandler) FetchMessages(
	topic string,
	partition int32,
	fetchOffset int64,
	maxBytes int32,
	consumerGroup string,
) ([]*KafkaMessage, error) {
	// Fetch messages from the SMQ subscriber
	messages, err := h.subscriber.FetchMessages(topic, partition, fetchOffset, maxBytes, consumerGroup)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch messages: %w", err)
	}

	return messages, nil
}

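// A fetch-side sketch (topic, starting offset, byte limit, and consumer
// group are all illustrative values):
//
//	msgs, err := handler.FetchMessages("events", 0, 0, 1<<20, "my-group")
//	if err != nil {
//		return err
//	}
//	for _, msg := range msgs {
//		_ = msg // process each fetched *KafkaMessage
//	}
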
// GetOrCreateLedger returns the persistent ledger for a topic-partition,
// creating it on first use.
func (h *PersistentKafkaHandler) GetOrCreateLedger(topic string, partition int32) (*offset.PersistentLedger, error) {
	key := fmt.Sprintf("%s-%d", topic, partition)

	// Fast path: the ledger already exists
	h.ledgersMu.RLock()
	if ledger, exists := h.ledgers[key]; exists {
		h.ledgersMu.RUnlock()
		return ledger, nil
	}
	h.ledgersMu.RUnlock()

	h.ledgersMu.Lock()
	defer h.ledgersMu.Unlock()

	// Double-check after acquiring the write lock
	if ledger, exists := h.ledgers[key]; exists {
		return ledger, nil
	}

	// Create the persistent ledger
	ledger := offset.NewPersistentLedger(key, h.offsetStorage)
	h.ledgers[key] = ledger
	return ledger, nil
}

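// Design note: GetOrCreateLedger uses double-checked locking. The common
// case takes only the read lock; the write lock is acquired, and the map
// re-checked, only when the ledger has to be created.
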
// GetLedger returns the ledger for a topic-partition, or nil if none exists.
func (h *PersistentKafkaHandler) GetLedger(topic string, partition int32) *offset.PersistentLedger {
	key := fmt.Sprintf("%s-%d", topic, partition)

	h.ledgersMu.RLock()
	defer h.ledgersMu.RUnlock()

	return h.ledgers[key]
}

// CreateTopic registers a new Kafka topic; it is a no-op if the topic
// already exists.
func (h *PersistentKafkaHandler) CreateTopic(name string, partitions int32, recordType *schema_pb.RecordType) error {
	h.topicsMu.Lock()
	defer h.topicsMu.Unlock()

	if _, exists := h.topics[name]; exists {
		return nil // Topic already exists
	}

	h.topics[name] = &TopicInfo{
		Name:       name,
		Partitions: partitions,
		CreatedAt:  getCurrentTimeNanos(),
		RecordType: recordType,
	}

	return nil
}

// TopicExists checks whether a topic exists.
func (h *PersistentKafkaHandler) TopicExists(name string) bool {
	h.topicsMu.RLock()
	defer h.topicsMu.RUnlock()

	_, exists := h.topics[name]
	return exists
}

// GetTopicInfo returns information about a topic, or nil if it does not exist.
func (h *PersistentKafkaHandler) GetTopicInfo(name string) *TopicInfo {
	h.topicsMu.RLock()
	defer h.topicsMu.RUnlock()

	return h.topics[name]
}

// ListTopics returns all topic names.
func (h *PersistentKafkaHandler) ListTopics() []string {
	h.topicsMu.RLock()
	defer h.topicsMu.RUnlock()

	topics := make([]string, 0, len(h.topics))
	for name := range h.topics {
		topics = append(topics, name)
	}
	return topics
}

// GetHighWaterMark returns the high water mark for a topic-partition.
func (h *PersistentKafkaHandler) GetHighWaterMark(topic string, partition int32) (int64, error) {
	ledger, err := h.GetOrCreateLedger(topic, partition)
	if err != nil {
		return 0, err
	}
	return ledger.GetHighWaterMark(), nil
}

// GetEarliestOffset returns the earliest offset for a topic-partition.
func (h *PersistentKafkaHandler) GetEarliestOffset(topic string, partition int32) (int64, error) {
	ledger, err := h.GetOrCreateLedger(topic, partition)
	if err != nil {
		return 0, err
	}
	return ledger.GetEarliestOffset(), nil
}

// GetLatestOffset returns the latest offset for a topic-partition.
func (h *PersistentKafkaHandler) GetLatestOffset(topic string, partition int32) (int64, error) {
	ledger, err := h.GetOrCreateLedger(topic, partition)
	if err != nil {
		return 0, err
	}
	return ledger.GetLatestOffset(), nil
}

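// A sketch combining the offset accessors (by Kafka convention, the high
// water mark is the offset the next produced message will receive):
//
//	earliest, _ := handler.GetEarliestOffset("events", 0)
//	hwm, _ := handler.GetHighWaterMark("events", 0)
//	available := hwm - earliest // messages currently fetchable, assuming no gaps
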
// CommitOffset commits a consumer group offset.
func (h *PersistentKafkaHandler) CommitOffset(
	topic string,
	partition int32,
	offset int64,
	consumerGroup string,
) error {
	return h.subscriber.CommitOffset(topic, partition, offset, consumerGroup)
}

// FetchOffset retrieves a committed consumer group offset.
//
// Not yet implemented: it always returns -1 (no committed offset). A full
// implementation would query SMQ for the group's committed offset.
func (h *PersistentKafkaHandler) FetchOffset(
	topic string,
	partition int32,
	consumerGroup string,
) (int64, error) {
	return -1, nil
}

// GetStats returns comprehensive statistics about the handler.
func (h *PersistentKafkaHandler) GetStats() map[string]interface{} {
	stats := make(map[string]interface{})

	// Topic stats
	h.topicsMu.RLock()
	topicStats := make(map[string]interface{})
	for name, info := range h.topics {
		topicStats[name] = map[string]interface{}{
			"partitions": info.Partitions,
			"created_at": info.CreatedAt,
		}
	}
	h.topicsMu.RUnlock()

	stats["topics"] = topicStats
	stats["topic_count"] = len(topicStats)

	// Ledger stats
	h.ledgersMu.RLock()
	ledgerStats := make(map[string]interface{})
	for key, ledger := range h.ledgers {
		entryCount, earliestTime, latestTime := ledger.GetStats()
		// The high water mark doubles as the next offset to be assigned.
		nextOffset := ledger.GetHighWaterMark()
		ledgerStats[key] = map[string]interface{}{
			"entry_count":     entryCount,
			"earliest_time":   earliestTime,
			"latest_time":     latestTime,
			"next_offset":     nextOffset,
			"high_water_mark": nextOffset,
		}
	}
	h.ledgersMu.RUnlock()

	stats["ledgers"] = ledgerStats
	stats["ledger_count"] = len(ledgerStats)

	return stats
}

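// A sketch of reading the returned stats map (keys match those set above):
//
//	stats := handler.GetStats()
//	fmt.Printf("topics=%d ledgers=%d\n", stats["topic_count"], stats["ledger_count"])
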
// Close shuts down the handler and all of its connections, returning the
// last error encountered (if any).
func (h *PersistentKafkaHandler) Close() error {
	var lastErr error

	if err := h.publisher.Close(); err != nil {
		lastErr = err
	}

	if err := h.subscriber.Close(); err != nil {
		lastErr = err
	}

	if err := h.offsetStorage.Close(); err != nil {
		lastErr = err
	}

	return lastErr
}

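// Design note: Close attempts all three shutdowns even when one fails, so a
// single failure does not leak the remaining connections; only the last
// error is reported. Callers needing every failure could aggregate them with
// errors.Join (Go 1.20+).
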
// ensureTopicExists creates a topic if it doesn't exist.
func (h *PersistentKafkaHandler) ensureTopicExists(name string, recordType *schema_pb.RecordType) error {
	if h.TopicExists(name) {
		return nil
	}

	return h.CreateTopic(name, 1, recordType) // Default to 1 partition
}

// getCurrentTimeNanos returns the current time in nanoseconds.
func getCurrentTimeNanos() int64 {
	return time.Now().UnixNano()
}

// RestoreAllLedgers restores all ledgers from persistent storage on startup.
//
// Not yet implemented: ledgers are currently created on demand. A full
// implementation would scan SMQ for all topic-partitions and restore their
// ledgers.
func (h *PersistentKafkaHandler) RestoreAllLedgers() error {
	return nil
}