1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/offset/manager.go
2025-09-12 23:02:10 -07:00

318 lines
8.6 KiB
Go

package offset
import (
"fmt"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// PartitionOffsetManager manages sequential offset assignment for a single partition
type PartitionOffsetManager struct {
mu sync.RWMutex
partition *schema_pb.Partition
nextOffset int64
// Checkpointing for recovery
lastCheckpoint int64
checkpointInterval int64
storage OffsetStorage
}
// OffsetStorage interface for persisting offset state
type OffsetStorage interface {
// SaveCheckpoint persists the current offset state for recovery
SaveCheckpoint(partition *schema_pb.Partition, offset int64) error
// LoadCheckpoint retrieves the last saved offset state
LoadCheckpoint(partition *schema_pb.Partition) (int64, error)
// GetHighestOffset scans storage to find the highest assigned offset
GetHighestOffset(partition *schema_pb.Partition) (int64, error)
}
// NewPartitionOffsetManager creates a new offset manager for a partition
func NewPartitionOffsetManager(partition *schema_pb.Partition, storage OffsetStorage) (*PartitionOffsetManager, error) {
manager := &PartitionOffsetManager{
partition: partition,
checkpointInterval: 1, // Checkpoint every offset for immediate persistence
storage: storage,
}
// Recover offset state
if err := manager.recover(); err != nil {
return nil, fmt.Errorf("failed to recover offset state: %w", err)
}
return manager, nil
}
// AssignOffset assigns the next sequential offset
func (m *PartitionOffsetManager) AssignOffset() int64 {
var shouldCheckpoint bool
var checkpointOffset int64
m.mu.Lock()
offset := m.nextOffset
m.nextOffset++
// Check if we should checkpoint (but don't do it inside the lock)
if offset-m.lastCheckpoint >= m.checkpointInterval {
shouldCheckpoint = true
checkpointOffset = offset
}
m.mu.Unlock()
// Checkpoint outside the lock to avoid deadlock
if shouldCheckpoint {
m.checkpoint(checkpointOffset)
}
return offset
}
// AssignOffsets assigns a batch of sequential offsets
func (m *PartitionOffsetManager) AssignOffsets(count int64) (baseOffset int64, lastOffset int64) {
var shouldCheckpoint bool
var checkpointOffset int64
m.mu.Lock()
baseOffset = m.nextOffset
lastOffset = m.nextOffset + count - 1
m.nextOffset += count
// Check if we should checkpoint (but don't do it inside the lock)
if lastOffset-m.lastCheckpoint >= m.checkpointInterval {
shouldCheckpoint = true
checkpointOffset = lastOffset
}
m.mu.Unlock()
// Checkpoint outside the lock to avoid deadlock
if shouldCheckpoint {
m.checkpoint(checkpointOffset)
}
return baseOffset, lastOffset
}
// GetNextOffset returns the next offset that will be assigned
func (m *PartitionOffsetManager) GetNextOffset() int64 {
m.mu.RLock()
defer m.mu.RUnlock()
return m.nextOffset
}
// GetHighWaterMark returns the high water mark (next offset)
func (m *PartitionOffsetManager) GetHighWaterMark() int64 {
return m.GetNextOffset()
}
// recover restores offset state from storage
func (m *PartitionOffsetManager) recover() error {
var checkpointOffset int64 = -1
var highestOffset int64 = -1
// Try to load checkpoint
if offset, err := m.storage.LoadCheckpoint(m.partition); err == nil && offset >= 0 {
checkpointOffset = offset
}
// Try to scan storage for highest offset
if offset, err := m.storage.GetHighestOffset(m.partition); err == nil && offset >= 0 {
highestOffset = offset
}
// Use the higher of checkpoint or storage scan
if checkpointOffset >= 0 && highestOffset >= 0 {
if highestOffset > checkpointOffset {
m.nextOffset = highestOffset + 1
m.lastCheckpoint = highestOffset
} else {
m.nextOffset = checkpointOffset + 1
m.lastCheckpoint = checkpointOffset
}
} else if checkpointOffset >= 0 {
m.nextOffset = checkpointOffset + 1
m.lastCheckpoint = checkpointOffset
} else if highestOffset >= 0 {
m.nextOffset = highestOffset + 1
m.lastCheckpoint = highestOffset
} else {
// No data exists, start from 0
m.nextOffset = 0
m.lastCheckpoint = -1
}
return nil
}
// checkpoint saves the current offset state
func (m *PartitionOffsetManager) checkpoint(offset int64) {
if err := m.storage.SaveCheckpoint(m.partition, offset); err != nil {
// Log error but don't fail - checkpointing is for optimization
fmt.Printf("Failed to checkpoint offset %d: %v\n", offset, err)
return
}
m.mu.Lock()
m.lastCheckpoint = offset
m.mu.Unlock()
}
// PartitionOffsetRegistry manages offset managers for multiple partitions
type PartitionOffsetRegistry struct {
mu sync.RWMutex
managers map[string]*PartitionOffsetManager
storage OffsetStorage
}
// NewPartitionOffsetRegistry creates a new registry
func NewPartitionOffsetRegistry(storage OffsetStorage) *PartitionOffsetRegistry {
return &PartitionOffsetRegistry{
managers: make(map[string]*PartitionOffsetManager),
storage: storage,
}
}
// GetManager returns the offset manager for a partition, creating it if needed
func (r *PartitionOffsetRegistry) GetManager(partition *schema_pb.Partition) (*PartitionOffsetManager, error) {
key := partitionKey(partition)
r.mu.RLock()
manager, exists := r.managers[key]
r.mu.RUnlock()
if exists {
return manager, nil
}
// Create new manager
r.mu.Lock()
defer r.mu.Unlock()
// Double-check after acquiring write lock
if manager, exists := r.managers[key]; exists {
return manager, nil
}
manager, err := NewPartitionOffsetManager(partition, r.storage)
if err != nil {
return nil, err
}
r.managers[key] = manager
return manager, nil
}
// AssignOffset assigns an offset for the given partition
func (r *PartitionOffsetRegistry) AssignOffset(partition *schema_pb.Partition) (int64, error) {
manager, err := r.GetManager(partition)
if err != nil {
return 0, err
}
return manager.AssignOffset(), nil
}
// AssignOffsets assigns a batch of offsets for the given partition
func (r *PartitionOffsetRegistry) AssignOffsets(partition *schema_pb.Partition, count int64) (baseOffset, lastOffset int64, err error) {
manager, err := r.GetManager(partition)
if err != nil {
return 0, 0, err
}
baseOffset, lastOffset = manager.AssignOffsets(count)
return baseOffset, lastOffset, nil
}
// GetHighWaterMark returns the high water mark for a partition
func (r *PartitionOffsetRegistry) GetHighWaterMark(partition *schema_pb.Partition) (int64, error) {
manager, err := r.GetManager(partition)
if err != nil {
return 0, err
}
return manager.GetHighWaterMark(), nil
}
// partitionKey generates a unique key for a partition
func partitionKey(partition *schema_pb.Partition) string {
return fmt.Sprintf("ring:%d:range:%d-%d:time:%d",
partition.RingSize, partition.RangeStart, partition.RangeStop, partition.UnixTimeNs)
}
// OffsetAssignment represents an assigned offset with metadata
type OffsetAssignment struct {
Offset int64
Timestamp int64
Partition *schema_pb.Partition
}
// BatchOffsetAssignment represents a batch of assigned offsets
type BatchOffsetAssignment struct {
BaseOffset int64
LastOffset int64
Count int64
Timestamp int64
Partition *schema_pb.Partition
}
// AssignmentResult contains the result of offset assignment
type AssignmentResult struct {
Assignment *OffsetAssignment
Batch *BatchOffsetAssignment
Error error
}
// OffsetAssigner provides high-level offset assignment operations
type OffsetAssigner struct {
registry *PartitionOffsetRegistry
}
// NewOffsetAssigner creates a new offset assigner
func NewOffsetAssigner(storage OffsetStorage) *OffsetAssigner {
return &OffsetAssigner{
registry: NewPartitionOffsetRegistry(storage),
}
}
// AssignSingleOffset assigns a single offset with timestamp
func (a *OffsetAssigner) AssignSingleOffset(partition *schema_pb.Partition) *AssignmentResult {
offset, err := a.registry.AssignOffset(partition)
if err != nil {
return &AssignmentResult{Error: err}
}
return &AssignmentResult{
Assignment: &OffsetAssignment{
Offset: offset,
Timestamp: time.Now().UnixNano(),
Partition: partition,
},
}
}
// AssignBatchOffsets assigns a batch of offsets with timestamp
func (a *OffsetAssigner) AssignBatchOffsets(partition *schema_pb.Partition, count int64) *AssignmentResult {
baseOffset, lastOffset, err := a.registry.AssignOffsets(partition, count)
if err != nil {
return &AssignmentResult{Error: err}
}
return &AssignmentResult{
Batch: &BatchOffsetAssignment{
BaseOffset: baseOffset,
LastOffset: lastOffset,
Count: count,
Timestamp: time.Now().UnixNano(),
Partition: partition,
},
}
}
// GetHighWaterMark returns the high water mark for a partition
func (a *OffsetAssigner) GetHighWaterMark(partition *schema_pb.Partition) (int64, error) {
return a.registry.GetHighWaterMark(partition)
}