1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/offset/smq_mapping.go
2025-09-16 01:48:29 -07:00

211 lines
6.4 KiB
Go

package offset
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// KafkaToSMQMapper translates between Kafka offsets and SMQ PartitionOffset
// values. Every translation is answered from the wrapped offset Ledger, which
// records the SMQ timestamp associated with each Kafka offset.
type KafkaToSMQMapper struct {
	// ledger is the source of truth for the Kafka offset <-> SMQ timestamp mapping.
	ledger *Ledger
}
// NewKafkaToSMQMapper returns a mapper backed by ledger. The ledger pointer is
// stored as-is (not copied), so later changes to the ledger are visible
// through the returned mapper.
func NewKafkaToSMQMapper(ledger *Ledger) *KafkaToSMQMapper {
	mapper := new(KafkaToSMQMapper)
	mapper.ledger = ledger
	return mapper
}
// KafkaOffsetToSMQPartitionOffset maps a Kafka offset on kafkaPartition to an
// SMQ PartitionOffset. This is the core bridge between Kafka and SMQ semantics.
//
// The Kafka offset is resolved through the ledger to the SMQ timestamp that
// was recorded for it. That timestamp becomes the result's StartTsNs and is
// also the time value passed to kafka.CreateSMQPartition when building the
// SMQ partition. If the ledger has no record for kafkaOffset, the lookup
// error is returned wrapped. topic is currently unused.
//
// NOTE(review): CreateSMQSubscriptionRequest builds its partition with
// time.Now() instead of the record timestamp; confirm which is intended.
func (m *KafkaToSMQMapper) KafkaOffsetToSMQPartitionOffset(
	kafkaOffset int64,
	topic string,
	kafkaPartition int32,
) (*schema_pb.PartitionOffset, error) {
	recordTs, _, lookupErr := m.ledger.GetRecord(kafkaOffset)
	if lookupErr != nil {
		return nil, fmt.Errorf("failed to find SMQ timestamp for Kafka offset %d: %w", kafkaOffset, lookupErr)
	}

	return &schema_pb.PartitionOffset{
		Partition: kafka.CreateSMQPartition(kafkaPartition, recordTs),
		// The key mapping: Kafka offset -> SMQ timestamp.
		StartTsNs: recordTs,
	}, nil
}
// SMQPartitionOffsetToKafkaOffset converts an SMQ PartitionOffset back to the
// Kafka offset it was mapped from. It is used during Fetch to translate SMQ
// positions back into Kafka semantics.
//
// The lookup is an exact match of partitionOffset.StartTsNs against the
// ledger entries' timestamps, scanned linearly in ledger order. The first
// matching entry wins. It returns -1 and an error when partitionOffset is nil
// or no entry carries that timestamp.
func (m *KafkaToSMQMapper) SMQPartitionOffsetToKafkaOffset(
	partitionOffset *schema_pb.PartitionOffset,
) (int64, error) {
	if partitionOffset == nil {
		return -1, fmt.Errorf("cannot map nil SMQ partition offset to a Kafka offset")
	}
	smqTimestamp := partitionOffset.StartTsNs

	// A linear scan (not a binary search) is deliberate: it stays correct even
	// if ledger timestamps are not strictly increasing.
	// NOTE(review): this reads ledger.entries directly rather than through a
	// Ledger accessor; if Ledger guards entries with a lock, this read is
	// unsynchronized. Confirm against ledger.go.
	for _, entry := range m.ledger.entries {
		if entry.Timestamp == smqTimestamp {
			return entry.KafkaOffset, nil
		}
	}
	return -1, fmt.Errorf("no Kafka offset found for SMQ timestamp %d", smqTimestamp)
}
// CreateSMQSubscriptionRequest builds the SMQ PartitionOffset and OffsetType
// used to start an SMQ subscription for a Kafka fetch.
//
// startKafkaOffset may be a Kafka sentinel:
//   - -2 (EARLIEST): start at the ledger's earliestTime, RESET_TO_EARLIEST.
//   - -1 (LATEST): start at the ledger's latestTime, RESET_TO_LATEST.
//
// Any other negative value is rejected. A non-negative offset is resolved
// through the ledger to its SMQ timestamp (EXACT_TS_NS):
//   - If the ledger has no record for it and the offset is at or beyond the
//     high water mark, the subscription starts at the current wall-clock time,
//     so only future messages are delivered.
//   - If the offset is below the high water mark but cannot be resolved, an
//     error wrapping the ledger lookup error is returned.
//
// The returned partition is built with kafka.CreateSMQPartition using the
// current wall-clock time. topic and consumerGroup are currently unused.
func (m *KafkaToSMQMapper) CreateSMQSubscriptionRequest(
	topic string,
	kafkaPartition int32,
	startKafkaOffset int64,
	consumerGroup string,
) (*schema_pb.PartitionOffset, schema_pb.OffsetType, error) {
	var (
		startTimestamp int64
		offsetType     schema_pb.OffsetType
	)

	switch {
	case startKafkaOffset == -2: // EARLIEST
		startTimestamp = m.ledger.earliestTime
		offsetType = schema_pb.OffsetType_RESET_TO_EARLIEST
	case startKafkaOffset == -1: // LATEST
		startTimestamp = m.ledger.latestTime
		offsetType = schema_pb.OffsetType_RESET_TO_LATEST
	case startKafkaOffset < 0:
		return nil, 0, fmt.Errorf("invalid Kafka offset: %d", startKafkaOffset)
	default: // specific offset
		timestamp, _, err := m.ledger.GetRecord(startKafkaOffset)
		switch {
		case err == nil:
			startTimestamp = timestamp
		case startKafkaOffset >= m.ledger.GetHighWaterMark():
			// Offset not produced yet: start from now so only future messages
			// are delivered.
			startTimestamp = time.Now().UnixNano()
		default:
			// Keep the ledger's reason (e.g. truncated/out-of-range) for callers.
			return nil, 0, fmt.Errorf("Kafka offset %d not found in ledger: %w", startKafkaOffset, err)
		}
		offsetType = schema_pb.OffsetType_EXACT_TS_NS
	}

	partitionOffset := &schema_pb.PartitionOffset{
		Partition: kafka.CreateSMQPartition(kafkaPartition, time.Now().UnixNano()),
		StartTsNs: startTimestamp,
	}
	return partitionOffset, offsetType, nil
}
// ExtractKafkaPartitionFromSMQPartition returns the Kafka partition number
// encoded in smqPartition's RangeStart. Decoding is delegated to
// kafka.ExtractKafkaPartitionFromSMQRange so it stays consistent with the
// rest of the package. smqPartition is dereferenced without a nil check.
func ExtractKafkaPartitionFromSMQPartition(smqPartition *schema_pb.Partition) int32 {
	rangeStart := smqPartition.RangeStart
	return kafka.ExtractKafkaPartitionFromSMQRange(rangeStart)
}
// OffsetMappingInfo is a debugging snapshot of how a single Kafka offset maps
// onto SMQ. It is populated by KafkaToSMQMapper.GetMappingInfo.
type OffsetMappingInfo struct {
	KafkaOffset    int64 // the Kafka offset that was looked up
	SMQTimestamp   int64 // SMQ timestamp recorded in the ledger for KafkaOffset
	KafkaPartition int32 // Kafka partition the offset belongs to
	SMQRangeStart  int32 // start of the SMQ range that KafkaPartition maps to
	SMQRangeStop   int32 // stop of the SMQ range that KafkaPartition maps to
	MessageSize    int32 // record size as reported by the ledger
}
// GetMappingInfo returns a debugging snapshot of the mapping for kafkaOffset
// on kafkaPartition. It combines the ledger record (SMQ timestamp and size)
// with the SMQ range from kafka.MapKafkaPartitionToSMQRange. The ledger's
// lookup error is returned unchanged if kafkaOffset is unknown.
func (m *KafkaToSMQMapper) GetMappingInfo(kafkaOffset int64, kafkaPartition int32) (*OffsetMappingInfo, error) {
	recordTs, recordSize, lookupErr := m.ledger.GetRecord(kafkaOffset)
	if lookupErr != nil {
		return nil, lookupErr
	}

	info := &OffsetMappingInfo{
		KafkaOffset:    kafkaOffset,
		SMQTimestamp:   recordTs,
		KafkaPartition: kafkaPartition,
		MessageSize:    recordSize,
	}
	info.SMQRangeStart, info.SMQRangeStop = kafka.MapKafkaPartitionToSMQRange(kafkaPartition)
	return info, nil
}
// ValidateMapping checks that the ledger entries form a consistent
// Kafka-to-SMQ mapping. It makes two full passes, in this order:
//  1. each Kafka offset must be exactly one greater than the previous one;
//  2. each SMQ timestamp must be strictly greater than the previous one
//     (equal timestamps are rejected).
//
// The first violation found is returned as an error. Ledgers with fewer than
// two entries are always valid. topic and kafkaPartition are currently unused.
//
// NOTE(review): entries are read directly rather than through a Ledger
// accessor; confirm whether Ledger requires locking for this.
func (m *KafkaToSMQMapper) ValidateMapping(topic string, kafkaPartition int32) error {
	all := m.ledger.entries
	if len(all) < 2 {
		return nil
	}
	// prevs[i] and nexts[i] are adjacent entries (all[i], all[i+1]).
	prevs, nexts := all[:len(all)-1], all[1:]

	for i := range nexts {
		if nexts[i].KafkaOffset != prevs[i].KafkaOffset+1 {
			return fmt.Errorf("non-sequential Kafka offsets: %d -> %d",
				prevs[i].KafkaOffset, nexts[i].KafkaOffset)
		}
	}

	for i := range nexts {
		if nexts[i].Timestamp <= prevs[i].Timestamp {
			return fmt.Errorf("non-monotonic SMQ timestamps: %d -> %d",
				prevs[i].Timestamp, nexts[i].Timestamp)
		}
	}
	return nil
}
// GetOffsetRange returns the first and last Kafka offsets whose SMQ
// timestamps lie in the inclusive time range [startTime, endTime].
//
// Entries are visited in ledger order. startOffset is the first entry inside
// the range and endOffset the last. If no entry falls inside the range
// (including an empty ledger or startTime > endTime), it returns (0, 0) and an
// error, so callers never receive an inverted or half-populated range.
//
// NOTE(review): entries are read directly rather than through a Ledger
// accessor; confirm whether Ledger requires locking for this.
func (m *KafkaToSMQMapper) GetOffsetRange(startTime, endTime int64) (startOffset, endOffset int64, err error) {
	startOffset, endOffset = -1, -1
	found := false
	for _, entry := range m.ledger.entries {
		// Both bounds must hold; checking them independently could pair a
		// start after the range with an end before it.
		if entry.Timestamp < startTime || entry.Timestamp > endTime {
			continue
		}
		if !found {
			startOffset = entry.KafkaOffset
			found = true
		}
		endOffset = entry.KafkaOffset
	}
	if !found {
		return 0, 0, fmt.Errorf("no offsets found in time range [%d, %d]", startTime, endTime)
	}
	return startOffset, endOffset, nil
}
// CreatePartitionOffsetForTimeRange returns a PartitionOffset that starts
// reading at startTime (StartTsNs) on the SMQ partition for kafkaPartition.
// The partition is built via kafka.CreateSMQPartition using the current
// wall-clock time. The ledger is not consulted.
func (m *KafkaToSMQMapper) CreatePartitionOffsetForTimeRange(
	kafkaPartition int32,
	startTime int64,
) *schema_pb.PartitionOffset {
	nowNs := time.Now().UnixNano()
	result := &schema_pb.PartitionOffset{StartTsNs: startTime}
	result.Partition = kafka.CreateSMQPartition(kafkaPartition, nowNs)
	return result
}