package offset

import (
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/filer_client"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// SMQOffsetStorage implements LedgerStorage using SMQ's native offset persistence.
// It reuses the same filer locations and file format that SMQ brokers use.
type SMQOffsetStorage struct {
	filerClientAccessor *filer_client.FilerClientAccessor
}

// NewSMQOffsetStorage creates a storage backend that uses SMQ's native offset files.
func NewSMQOffsetStorage(filerAddress string) (*SMQOffsetStorage, error) {
	filerClientAccessor := &filer_client.FilerClientAccessor{
		GetFiler: func() pb.ServerAddress {
			return pb.ServerAddress(filerAddress)
		},
		GetGrpcDialOption: func() grpc.DialOption {
			// Plaintext (insecure) gRPC transport
			return grpc.WithTransportCredentials(insecure.NewCredentials())
		},
	}

	return &SMQOffsetStorage{
		filerClientAccessor: filerClientAccessor,
	}, nil
}
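
// Illustrative usage (a sketch; the filer address and key values are
// placeholders, not defaults):
//
//	storage, err := NewSMQOffsetStorage("localhost:8888")
//	if err != nil {
//		return err
//	}
//	defer storage.Close()
//	key := ConsumerOffsetKey{ConsumerGroup: "g1", Topic: "events", Partition: 3}
//	err = storage.SaveConsumerOffset(key, 42, time.Now().UnixNano(), 256)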

// SaveConsumerOffset saves the committed offset for a consumer group.
// The smqTimestamp and size arguments are accepted for interface
// compatibility but are not persisted; only the offset is stored.
// It uses the same file format and location as SMQ brokers:
//
//	Path:    <topic-dir>/<partition-dir>/<consumerGroup>.offset
//	Content: 8-byte big-endian offset
func (s *SMQOffsetStorage) SaveConsumerOffset(key ConsumerOffsetKey, kafkaOffset, smqTimestamp int64, size int32) error {
	t := topic.Topic{
		Namespace: "kafka", // Use the kafka namespace for Kafka topics
		Name:      key.Topic,
	}

	// Use the centralized Kafka-to-SMQ partition mapping (consistent 32-slot ranges)
	smqPartition := kafka.CreateSMQPartition(key.Partition, time.Now().UnixNano())
	p := topic.Partition{
		RingSize:   smqPartition.RingSize,
		RangeStart: smqPartition.RangeStart,
		RangeStop:  smqPartition.RangeStop,
		UnixTimeNs: smqPartition.UnixTimeNs,
	}

	partitionDir := topic.PartitionDir(t, p)
	offsetFileName := fmt.Sprintf("%s.offset", key.ConsumerGroup)

	// Use SMQ's 8-byte big-endian offset format
	offsetBytes := make([]byte, 8)
	util.Uint64toBytes(offsetBytes, uint64(kafkaOffset))

	return s.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		return filer.SaveInsideFiler(client, partitionDir, offsetFileName, offsetBytes)
	})
}
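
// For example, committing Kafka offset 42 writes the 8 bytes
// 00 00 00 00 00 00 00 2a to <partition-dir>/<consumerGroup>.offset.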

// LoadConsumerOffsets loads the committed offset for a consumer group.
// It returns at most one entry, because SMQ tracks only the current
// committed offset and not the full offset-mapping history.
func (s *SMQOffsetStorage) LoadConsumerOffsets(key ConsumerOffsetKey) ([]OffsetEntry, error) {
	offset, err := s.getCommittedOffset(key)
	if err != nil {
		return []OffsetEntry{}, nil // No committed offset found
	}

	if offset < 0 {
		return []OffsetEntry{}, nil // No valid offset
	}

	// Return a single entry representing the committed offset
	return []OffsetEntry{
		{
			KafkaOffset: offset,
			Timestamp:   0, // SMQ doesn't store timestamp mappings
			Size:        0, // SMQ doesn't store size mappings
		},
	}, nil
}

// GetConsumerHighWaterMark returns the next offset after the committed
// offset, or 0 when nothing has been committed yet.
func (s *SMQOffsetStorage) GetConsumerHighWaterMark(key ConsumerOffsetKey) (int64, error) {
	offset, err := s.getCommittedOffset(key)
	if err != nil {
		return 0, nil // Start from the beginning if no committed offset exists
	}

	if offset < 0 {
		return 0, nil // Start from the beginning
	}

	return offset + 1, nil // Next offset after the committed one
}
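
// For example, if a group last committed offset 10, GetConsumerHighWaterMark
// returns 11 and consumption resumes there; if nothing has been committed
// yet, it returns 0.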

// getCommittedOffset reads the committed offset from SMQ's filer location.
// It returns -1 together with an error when the offset file is missing or
// malformed.
func (s *SMQOffsetStorage) getCommittedOffset(key ConsumerOffsetKey) (int64, error) {
	t := topic.Topic{
		Namespace: "kafka",
		Name:      key.Topic,
	}

	// Use the centralized Kafka-to-SMQ partition mapping (consistent 32-slot ranges)
	smqPartition := kafka.CreateSMQPartition(key.Partition, time.Now().UnixNano())
	p := topic.Partition{
		RingSize:   smqPartition.RingSize,
		RangeStart: smqPartition.RangeStart,
		RangeStop:  smqPartition.RangeStop,
		UnixTimeNs: smqPartition.UnixTimeNs,
	}

	partitionDir := topic.PartitionDir(t, p)
	offsetFileName := fmt.Sprintf("%s.offset", key.ConsumerGroup)

	var offset int64 = -1
	err := s.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		data, err := filer.ReadInsideFiler(client, partitionDir, offsetFileName)
		if err != nil {
			return err
		}
		if len(data) != 8 {
			return fmt.Errorf("invalid offset file format: expected 8 bytes, got %d", len(data))
		}
		offset = int64(util.BytesToUint64(data))
		return nil
	})
	if err != nil {
		return -1, err
	}

	return offset, nil
}

// Legacy methods for backward compatibility

func (s *SMQOffsetStorage) SaveOffsetMapping(topicPartition string, kafkaOffset, smqTimestamp int64, size int32) error {
	key, err := parseTopicPartitionKey(topicPartition)
	if err != nil {
		return err
	}
	return s.SaveConsumerOffset(key, kafkaOffset, smqTimestamp, size)
}

func (s *SMQOffsetStorage) LoadOffsetMappings(topicPartition string) ([]OffsetEntry, error) {
	key, err := parseTopicPartitionKey(topicPartition)
	if err != nil {
		return nil, err
	}
	return s.LoadConsumerOffsets(key)
}

func (s *SMQOffsetStorage) GetHighWaterMark(topicPartition string) (int64, error) {
	key, err := parseTopicPartitionKey(topicPartition)
	if err != nil {
		return 0, err
	}
	return s.GetConsumerHighWaterMark(key)
}
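
// The legacy entry points identify the partition with a combined
// "topic:partition" string instead of a ConsumerOffsetKey, e.g. (with a
// hypothetical topic name):
//
//	err := storage.SaveOffsetMapping("events:3", 42, time.Now().UnixNano(), 256)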

// Close is a no-op for SMQ storage
func (s *SMQOffsetStorage) Close() error {
	return nil
}

// parseTopicPartitionKey parses the legacy "topic:partition" format into a
// ConsumerOffsetKey. The partition number is taken after the last colon, so
// topic names that themselves contain colons still parse correctly.
func parseTopicPartitionKey(topicPartition string) (ConsumerOffsetKey, error) {
	lastColonIndex := strings.LastIndex(topicPartition, ":")
	if lastColonIndex <= 0 || lastColonIndex == len(topicPartition)-1 {
		return ConsumerOffsetKey{}, fmt.Errorf("invalid legacy format: expected 'topic:partition', got '%s'", topicPartition)
	}

	topic := topicPartition[:lastColonIndex]
	if topic == "" {
		return ConsumerOffsetKey{}, fmt.Errorf("empty topic in legacy format: '%s'", topicPartition)
	}

	partitionStr := topicPartition[lastColonIndex+1:]
	partition, err := strconv.ParseInt(partitionStr, 10, 32)
	if err != nil {
		return ConsumerOffsetKey{}, fmt.Errorf("invalid partition number in legacy format '%s': %w", topicPartition, err)
	}

	if partition < 0 {
		return ConsumerOffsetKey{}, fmt.Errorf("negative partition number in legacy format: %d", partition)
	}

	// Legacy format doesn't include a consumer group, so leave it empty
	return ConsumerOffsetKey{
		ConsumerGroup: "",
		Topic:         topic,
		Partition:     int32(partition),
	}, nil
}
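
// For example, "events:3" parses to ConsumerOffsetKey{Topic: "events", Partition: 3},
// while "ns:events:3" yields Topic "ns:events" because the split happens at the
// last colon. ("events" is only an illustrative topic name.)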