package offset import ( "fmt" "strconv" "strings" "sync" ) // LedgerStorage interface for consumer offset persistence type LedgerStorage interface { SaveConsumerOffset(key ConsumerOffsetKey, kafkaOffset, smqTimestamp int64, size int32) error LoadConsumerOffsets(key ConsumerOffsetKey) ([]OffsetEntry, error) GetConsumerHighWaterMark(key ConsumerOffsetKey) (int64, error) Close() error } // ConsumerOffsetKey represents the full key for consumer offsets type ConsumerOffsetKey struct { Topic string `json:"topic"` Partition int32 `json:"partition"` ConsumerGroup string `json:"consumer_group"` ConsumerGroupInstance string `json:"consumer_group_instance,omitempty"` // Optional static membership ID } func (k ConsumerOffsetKey) String() string { if k.ConsumerGroupInstance != "" { return fmt.Sprintf("%s:%d:%s:%s", k.Topic, k.Partition, k.ConsumerGroup, k.ConsumerGroupInstance) } return fmt.Sprintf("%s:%d:%s", k.Topic, k.Partition, k.ConsumerGroup) } // OffsetEntry is already defined in ledger.go // SeaweedMQ storage implementation using SeaweedMQ's ledgers type SeaweedMQStorage struct { ledgersMu sync.RWMutex ledgers map[string]*Ledger // key: ConsumerOffsetKey.String() } // NewSeaweedMQStorage creates a SeaweedMQ-compatible storage backend func NewSeaweedMQStorage() *SeaweedMQStorage { return &SeaweedMQStorage{ ledgers: make(map[string]*Ledger), } } func (s *SeaweedMQStorage) SaveConsumerOffset(key ConsumerOffsetKey, kafkaOffset, smqTimestamp int64, size int32) error { s.ledgersMu.Lock() defer s.ledgersMu.Unlock() keyStr := key.String() ledger, exists := s.ledgers[keyStr] if !exists { ledger = NewLedger() s.ledgers[keyStr] = ledger } return ledger.AppendRecord(kafkaOffset, smqTimestamp, size) } func (s *SeaweedMQStorage) LoadConsumerOffsets(key ConsumerOffsetKey) ([]OffsetEntry, error) { s.ledgersMu.RLock() defer s.ledgersMu.RUnlock() keyStr := key.String() ledger, exists := s.ledgers[keyStr] if !exists { return []OffsetEntry{}, nil } entries := ledger.GetEntries() result := make([]OffsetEntry, len(entries)) for i, entry := range entries { result[i] = OffsetEntry{ KafkaOffset: entry.KafkaOffset, Timestamp: entry.Timestamp, Size: entry.Size, } } return result, nil } func (s *SeaweedMQStorage) GetConsumerHighWaterMark(key ConsumerOffsetKey) (int64, error) { s.ledgersMu.RLock() defer s.ledgersMu.RUnlock() keyStr := key.String() ledger, exists := s.ledgers[keyStr] if !exists { return 0, nil } return ledger.GetHighWaterMark(), nil } func (s *SeaweedMQStorage) Close() error { s.ledgersMu.Lock() defer s.ledgersMu.Unlock() // Ledgers don't need explicit closing in this implementation s.ledgers = make(map[string]*Ledger) return nil } // parseTopicPartitionToConsumerKey parses a "topic-partition" string to ConsumerOffsetKey func parseTopicPartitionToConsumerKey(topicPartition string) ConsumerOffsetKey { // Default parsing logic for "topic-partition" format // Find the last dash to separate topic from partition lastDash := strings.LastIndex(topicPartition, "-") if lastDash == -1 { // No dash found, assume entire string is topic with partition 0 return ConsumerOffsetKey{ Topic: topicPartition, Partition: 0, ConsumerGroup: "__persistent_ledger__", // Special consumer group for PersistentLedger } } topic := topicPartition[:lastDash] partitionStr := topicPartition[lastDash+1:] partition, err := strconv.Atoi(partitionStr) if err != nil { // If partition parsing fails, assume partition 0 return ConsumerOffsetKey{ Topic: topicPartition, Partition: 0, ConsumerGroup: "__persistent_ledger__", } } return ConsumerOffsetKey{ Topic: topic, Partition: int32(partition), ConsumerGroup: "__persistent_ledger__", // Special consumer group for PersistentLedger } } // PersistentLedger wraps a Ledger with SeaweedMQ persistence type PersistentLedger struct { Ledger *Ledger TopicPartition string ConsumerKey ConsumerOffsetKey Storage LedgerStorage } // NewPersistentLedger creates a new persistent ledger func NewPersistentLedger(topicPartition string, storage LedgerStorage) *PersistentLedger { // Parse topicPartition string to extract topic and partition // Format: "topic-partition" (e.g., "my-topic-0") consumerKey := parseTopicPartitionToConsumerKey(topicPartition) pl := &PersistentLedger{ Ledger: NewLedger(), TopicPartition: topicPartition, ConsumerKey: consumerKey, Storage: storage, } // Load existing mappings using new consumer offset method if entries, err := storage.LoadConsumerOffsets(consumerKey); err == nil { for _, entry := range entries { pl.Ledger.AppendRecord(entry.KafkaOffset, entry.Timestamp, entry.Size) } } return pl } // AddEntry adds an offset mapping and persists it func (pl *PersistentLedger) AddEntry(kafkaOffset, smqTimestamp int64, size int32) error { // Add to memory ledger if err := pl.Ledger.AppendRecord(kafkaOffset, smqTimestamp, size); err != nil { return err } // Persist to storage using new consumer offset method return pl.Storage.SaveConsumerOffset(pl.ConsumerKey, kafkaOffset, smqTimestamp, size) } // GetEntries returns all entries from the ledger func (pl *PersistentLedger) GetEntries() []OffsetEntry { return pl.Ledger.GetEntries() } // AssignOffsets reserves a range of consecutive Kafka offsets func (pl *PersistentLedger) AssignOffsets(count int64) int64 { return pl.Ledger.AssignOffsets(count) } // AppendRecord adds a record to the ledger (compatibility method) func (pl *PersistentLedger) AppendRecord(kafkaOffset, timestamp int64, size int32) error { return pl.AddEntry(kafkaOffset, timestamp, size) } // GetHighWaterMark returns the next offset to be assigned func (pl *PersistentLedger) GetHighWaterMark() int64 { return pl.Ledger.GetHighWaterMark() } // GetEarliestOffset returns the earliest offset in the ledger func (pl *PersistentLedger) GetEarliestOffset() int64 { return pl.Ledger.GetEarliestOffset() } // GetLatestOffset returns the latest offset in the ledger func (pl *PersistentLedger) GetLatestOffset() int64 { return pl.Ledger.GetLatestOffset() } // GetStats returns statistics about the ledger func (pl *PersistentLedger) GetStats() (count int, earliestTime, latestTime int64) { count, _, earliestTime, latestTime = pl.Ledger.GetStats() return count, earliestTime, latestTime }