mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
191 lines
5 KiB
Go
191 lines
5 KiB
Go
package offset
|
|
|
|
import (
|
|
"fmt"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// SMQRecord is the read-only view of a single record held in SeaweedMQ storage.
//
// The interface is declared in this package so that the protocol and
// integration packages can both refer to it without importing each other.
type SMQRecord interface {
	// GetKey returns the record key as raw bytes.
	GetKey() []byte
	// GetValue returns the record value as raw bytes.
	GetValue() []byte
	// GetTimestamp returns the record timestamp.
	// NOTE(review): the unit is not visible in this file; presumably
	// nanoseconds like OffsetEntry.Timestamp — confirm with implementations.
	GetTimestamp() int64
	// GetOffset returns the record offset.
	GetOffset() int64
}
|
|
|
|
// OffsetEntry is one row of the ledger: it ties a Kafka offset to the
// timestamp and size of the message recorded at that offset.
type OffsetEntry struct {
	// KafkaOffset is the Kafka offset (sequential integer).
	KafkaOffset int64
	// Timestamp is the SeaweedMQ timestamp, in nanoseconds.
	Timestamp int64
	// Size is the message size in bytes.
	Size int32
}
|
|
|
|
// Ledger maintains the mapping between Kafka offsets and SeaweedMQ timestamps
|
|
// for a single topic partition
|
|
type Ledger struct {
|
|
mu sync.RWMutex
|
|
entries []OffsetEntry // sorted by KafkaOffset
|
|
nextOffset int64 // next offset to assign
|
|
earliestTime int64 // timestamp of earliest message
|
|
latestTime int64 // timestamp of latest message
|
|
}
|
|
|
|
// NewLedger creates a new offset ledger starting from offset 0
|
|
func NewLedger() *Ledger {
|
|
return &Ledger{
|
|
entries: make([]OffsetEntry, 0, 1000), // pre-allocate for performance
|
|
nextOffset: 0,
|
|
}
|
|
}
|
|
|
|
// AssignOffsets reserves a range of consecutive Kafka offsets
|
|
// Returns the base offset of the reserved range
|
|
func (l *Ledger) AssignOffsets(count int64) int64 {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
baseOffset := l.nextOffset
|
|
l.nextOffset += count
|
|
return baseOffset
|
|
}
|
|
|
|
// AppendRecord adds a new offset entry to the ledger
|
|
// The kafkaOffset should be from a previous AssignOffsets call
|
|
func (l *Ledger) AppendRecord(kafkaOffset, timestamp int64, size int32) error {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
// Validate offset is in expected range
|
|
if kafkaOffset < 0 || kafkaOffset >= l.nextOffset {
|
|
return fmt.Errorf("invalid offset %d, expected 0 <= offset < %d", kafkaOffset, l.nextOffset)
|
|
}
|
|
|
|
// Check for duplicate offset (shouldn't happen in normal operation)
|
|
if len(l.entries) > 0 && l.entries[len(l.entries)-1].KafkaOffset >= kafkaOffset {
|
|
return fmt.Errorf("offset %d already exists or is out of order", kafkaOffset)
|
|
}
|
|
|
|
entry := OffsetEntry{
|
|
KafkaOffset: kafkaOffset,
|
|
Timestamp: timestamp,
|
|
Size: size,
|
|
}
|
|
|
|
l.entries = append(l.entries, entry)
|
|
|
|
// Update earliest/latest timestamps
|
|
if l.earliestTime == 0 || timestamp < l.earliestTime {
|
|
l.earliestTime = timestamp
|
|
}
|
|
if timestamp > l.latestTime {
|
|
l.latestTime = timestamp
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetRecord retrieves the record information for a given Kafka offset
|
|
func (l *Ledger) GetRecord(kafkaOffset int64) (timestamp int64, size int32, err error) {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
|
|
// Binary search for the offset
|
|
idx := sort.Search(len(l.entries), func(i int) bool {
|
|
return l.entries[i].KafkaOffset >= kafkaOffset
|
|
})
|
|
|
|
if idx >= len(l.entries) || l.entries[idx].KafkaOffset != kafkaOffset {
|
|
return 0, 0, fmt.Errorf("offset %d not found", kafkaOffset)
|
|
}
|
|
|
|
entry := l.entries[idx]
|
|
return entry.Timestamp, entry.Size, nil
|
|
}
|
|
|
|
// GetEarliestOffset returns the smallest Kafka offset in the ledger
|
|
func (l *Ledger) GetEarliestOffset() int64 {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
|
|
if len(l.entries) == 0 {
|
|
return 0 // no messages yet, earliest is 0
|
|
}
|
|
return l.entries[0].KafkaOffset
|
|
}
|
|
|
|
// GetLatestOffset returns the largest Kafka offset in the ledger
|
|
func (l *Ledger) GetLatestOffset() int64 {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
|
|
if len(l.entries) == 0 {
|
|
return 0 // no messages yet, latest is 0
|
|
}
|
|
return l.entries[len(l.entries)-1].KafkaOffset
|
|
}
|
|
|
|
// GetHighWaterMark returns the next offset that will be assigned
|
|
// (i.e., one past the latest offset)
|
|
func (l *Ledger) GetHighWaterMark() int64 {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return l.nextOffset
|
|
}
|
|
|
|
// GetEntries returns all offset entries in the ledger
|
|
func (l *Ledger) GetEntries() []OffsetEntry {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
|
|
// Return a copy to prevent external modification
|
|
result := make([]OffsetEntry, len(l.entries))
|
|
copy(result, l.entries)
|
|
return result
|
|
}
|
|
|
|
// FindOffsetByTimestamp returns the first offset with a timestamp >= target
|
|
// Used for timestamp-based offset lookup
|
|
func (l *Ledger) FindOffsetByTimestamp(targetTimestamp int64) int64 {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
|
|
if len(l.entries) == 0 {
|
|
return 0
|
|
}
|
|
|
|
// Binary search for first entry with timestamp >= targetTimestamp
|
|
idx := sort.Search(len(l.entries), func(i int) bool {
|
|
return l.entries[i].Timestamp >= targetTimestamp
|
|
})
|
|
|
|
if idx >= len(l.entries) {
|
|
// Target timestamp is after all entries, return high water mark
|
|
return l.nextOffset
|
|
}
|
|
|
|
return l.entries[idx].KafkaOffset
|
|
}
|
|
|
|
// GetStats returns basic statistics about the ledger
|
|
func (l *Ledger) GetStats() (entryCount int, earliestTime, latestTime, nextOffset int64) {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
|
|
return len(l.entries), l.earliestTime, l.latestTime, l.nextOffset
|
|
}
|
|
|
|
// GetTimestampRange returns the time range covered by this ledger
|
|
func (l *Ledger) GetTimestampRange() (earliest, latest int64) {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
|
|
if len(l.entries) == 0 {
|
|
now := time.Now().UnixNano()
|
|
return now, now // stub values when no data
|
|
}
|
|
|
|
return l.earliestTime, l.latestTime
|
|
}
|