// Package offset bridges Kafka offset semantics (monotonic int64 offsets)
// and SeaweedMQ (SMQ) timestamp-based partition offsets.
package offset

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// KafkaToSMQMapper handles the conversion between Kafka offsets and SMQ PartitionOffset.
type KafkaToSMQMapper struct {
	ledger *Ledger
}

// NewKafkaToSMQMapper creates a new mapper backed by the given ledger.
func NewKafkaToSMQMapper(ledger *Ledger) *KafkaToSMQMapper {
	return &KafkaToSMQMapper{
		ledger: ledger,
	}
}

// KafkaOffsetToSMQPartitionOffset converts a Kafka offset to an SMQ PartitionOffset.
// This is the core mapping function that bridges Kafka and SMQ semantics:
// the Kafka offset is resolved to the SMQ timestamp recorded in the ledger,
// and that timestamp becomes the PartitionOffset's StartTsNs.
//
// Note: the topic parameter is currently unused — kept for interface symmetry
// with other mapper methods. TODO(review): confirm whether it should be dropped.
func (m *KafkaToSMQMapper) KafkaOffsetToSMQPartitionOffset(
	kafkaOffset int64,
	topic string,
	kafkaPartition int32,
) (*schema_pb.PartitionOffset, error) {
	// Step 1: Look up the SMQ timestamp for this Kafka offset.
	smqTimestamp, _, err := m.ledger.GetRecord(kafkaOffset)
	if err != nil {
		return nil, fmt.Errorf("failed to find SMQ timestamp for Kafka offset %d: %w", kafkaOffset, err)
	}

	// Step 2: Create the SMQ Partition using the centralized utility.
	smqPartition := kafka.CreateSMQPartition(kafkaPartition, smqTimestamp)

	// Step 3: Create the PartitionOffset with the mapped timestamp.
	partitionOffset := &schema_pb.PartitionOffset{
		Partition: smqPartition,
		StartTsNs: smqTimestamp, // This is the key mapping: Kafka offset → SMQ timestamp
	}

	return partitionOffset, nil
}

// SMQPartitionOffsetToKafkaOffset converts an SMQ PartitionOffset back to a Kafka offset.
// This is used during Fetch operations to convert SMQ data back to Kafka semantics.
func (m *KafkaToSMQMapper) SMQPartitionOffsetToKafkaOffset(
	partitionOffset *schema_pb.PartitionOffset,
) (int64, error) {
	smqTimestamp := partitionOffset.StartTsNs

	// Linear scan through the ledger to find the Kafka offset for this
	// timestamp. (The original comment claimed binary search; entries are
	// expected to be timestamp-ordered, so sort.Search would work here if
	// ledgers grow large enough to matter.)
	entries := m.ledger.entries
	for _, entry := range entries {
		if entry.Timestamp == smqTimestamp {
			return entry.KafkaOffset, nil
		}
	}

	return -1, fmt.Errorf("no Kafka offset found for SMQ timestamp %d", smqTimestamp)
}
CreateSMQSubscriptionRequest creates a proper SMQ subscription request for a Kafka fetch func (m *KafkaToSMQMapper) CreateSMQSubscriptionRequest( topic string, kafkaPartition int32, startKafkaOffset int64, consumerGroup string, ) (*schema_pb.PartitionOffset, schema_pb.OffsetType, error) { var startTimestamp int64 var offsetType schema_pb.OffsetType // Handle special Kafka offset values switch startKafkaOffset { case -2: // EARLIEST startTimestamp = m.ledger.earliestTime offsetType = schema_pb.OffsetType_RESET_TO_EARLIEST case -1: // LATEST startTimestamp = m.ledger.latestTime offsetType = schema_pb.OffsetType_RESET_TO_LATEST default: // Specific offset if startKafkaOffset < 0 { return nil, 0, fmt.Errorf("invalid Kafka offset: %d", startKafkaOffset) } // Look up the SMQ timestamp for this Kafka offset timestamp, _, err := m.ledger.GetRecord(startKafkaOffset) if err != nil { // If exact offset not found, use the next available timestamp if startKafkaOffset >= m.ledger.GetHighWaterMark() { startTimestamp = time.Now().UnixNano() // Start from now for future messages offsetType = schema_pb.OffsetType_EXACT_TS_NS } else { return nil, 0, fmt.Errorf("Kafka offset %d not found in ledger", startKafkaOffset) } } else { startTimestamp = timestamp offsetType = schema_pb.OffsetType_EXACT_TS_NS } } // Create SMQ partition mapping using centralized utility smqPartition := kafka.CreateSMQPartition(kafkaPartition, time.Now().UnixNano()) partitionOffset := &schema_pb.PartitionOffset{ Partition: smqPartition, StartTsNs: startTimestamp, } return partitionOffset, offsetType, nil } // ExtractKafkaPartitionFromSMQPartition extracts the Kafka partition number from SMQ Partition func ExtractKafkaPartitionFromSMQPartition(smqPartition *schema_pb.Partition) int32 { // Use centralized utility for consistent extraction return kafka.ExtractKafkaPartitionFromSMQRange(smqPartition.RangeStart) } // OffsetMappingInfo provides debugging information about the mapping type OffsetMappingInfo struct { 
KafkaOffset int64 SMQTimestamp int64 KafkaPartition int32 SMQRangeStart int32 SMQRangeStop int32 MessageSize int32 } // GetMappingInfo returns detailed mapping information for debugging func (m *KafkaToSMQMapper) GetMappingInfo(kafkaOffset int64, kafkaPartition int32) (*OffsetMappingInfo, error) { timestamp, size, err := m.ledger.GetRecord(kafkaOffset) if err != nil { return nil, err } start, stop := kafka.MapKafkaPartitionToSMQRange(kafkaPartition) return &OffsetMappingInfo{ KafkaOffset: kafkaOffset, SMQTimestamp: timestamp, KafkaPartition: kafkaPartition, SMQRangeStart: start, SMQRangeStop: stop, MessageSize: size, }, nil } // ValidateMapping checks if the Kafka-SMQ mapping is consistent func (m *KafkaToSMQMapper) ValidateMapping(topic string, kafkaPartition int32) error { // Check that offsets are sequential entries := m.ledger.entries for i := 1; i < len(entries); i++ { if entries[i].KafkaOffset != entries[i-1].KafkaOffset+1 { return fmt.Errorf("non-sequential Kafka offsets: %d -> %d", entries[i-1].KafkaOffset, entries[i].KafkaOffset) } } // Check that timestamps are monotonically increasing for i := 1; i < len(entries); i++ { if entries[i].Timestamp <= entries[i-1].Timestamp { return fmt.Errorf("non-monotonic SMQ timestamps: %d -> %d", entries[i-1].Timestamp, entries[i].Timestamp) } } return nil } // GetOffsetRange returns the Kafka offset range for a given SMQ time range func (m *KafkaToSMQMapper) GetOffsetRange(startTime, endTime int64) (startOffset, endOffset int64, err error) { startOffset = -1 endOffset = -1 entries := m.ledger.entries for _, entry := range entries { if entry.Timestamp >= startTime && startOffset == -1 { startOffset = entry.KafkaOffset } if entry.Timestamp <= endTime { endOffset = entry.KafkaOffset } } if startOffset == -1 { return 0, 0, fmt.Errorf("no offsets found in time range [%d, %d]", startTime, endTime) } return startOffset, endOffset, nil } // CreatePartitionOffsetForTimeRange creates a PartitionOffset for a specific time range 
func (m *KafkaToSMQMapper) CreatePartitionOffsetForTimeRange( kafkaPartition int32, startTime int64, ) *schema_pb.PartitionOffset { smqPartition := kafka.CreateSMQPartition(kafkaPartition, time.Now().UnixNano()) return &schema_pb.PartitionOffset{ Partition: smqPartition, StartTsNs: startTime, } }