package offset import ( "fmt" "sync" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) // OffsetSubscriber handles offset-based subscription logic type OffsetSubscriber struct { mu sync.RWMutex offsetRegistry *PartitionOffsetRegistry subscriptions map[string]*OffsetSubscription } // OffsetSubscription represents an active offset-based subscription type OffsetSubscription struct { ID string Partition *schema_pb.Partition StartOffset int64 CurrentOffset int64 OffsetType schema_pb.OffsetType IsActive bool offsetRegistry *PartitionOffsetRegistry } // NewOffsetSubscriber creates a new offset-based subscriber func NewOffsetSubscriber(offsetRegistry *PartitionOffsetRegistry) *OffsetSubscriber { return &OffsetSubscriber{ offsetRegistry: offsetRegistry, subscriptions: make(map[string]*OffsetSubscription), } } // CreateSubscription creates a new offset-based subscription func (s *OffsetSubscriber) CreateSubscription( subscriptionID string, partition *schema_pb.Partition, offsetType schema_pb.OffsetType, startOffset int64, ) (*OffsetSubscription, error) { s.mu.Lock() defer s.mu.Unlock() // Check if subscription already exists if _, exists := s.subscriptions[subscriptionID]; exists { return nil, fmt.Errorf("subscription %s already exists", subscriptionID) } // Resolve the actual start offset based on type actualStartOffset, err := s.resolveStartOffset(partition, offsetType, startOffset) if err != nil { return nil, fmt.Errorf("failed to resolve start offset: %w", err) } subscription := &OffsetSubscription{ ID: subscriptionID, Partition: partition, StartOffset: actualStartOffset, CurrentOffset: actualStartOffset, OffsetType: offsetType, IsActive: true, offsetRegistry: s.offsetRegistry, } s.subscriptions[subscriptionID] = subscription return subscription, nil } // GetSubscription retrieves an existing subscription func (s *OffsetSubscriber) GetSubscription(subscriptionID string) (*OffsetSubscription, error) { s.mu.RLock() defer s.mu.RUnlock() subscription, exists := s.subscriptions[subscriptionID] if !exists { return nil, fmt.Errorf("subscription %s not found", subscriptionID) } return subscription, nil } // CloseSubscription closes and removes a subscription func (s *OffsetSubscriber) CloseSubscription(subscriptionID string) error { s.mu.Lock() defer s.mu.Unlock() subscription, exists := s.subscriptions[subscriptionID] if !exists { return fmt.Errorf("subscription %s not found", subscriptionID) } subscription.IsActive = false delete(s.subscriptions, subscriptionID) return nil } // resolveStartOffset resolves the actual start offset based on OffsetType func (s *OffsetSubscriber) resolveStartOffset( partition *schema_pb.Partition, offsetType schema_pb.OffsetType, requestedOffset int64, ) (int64, error) { switch offsetType { case schema_pb.OffsetType_EXACT_OFFSET: // Validate that the requested offset exists return s.validateAndGetOffset(partition, requestedOffset) case schema_pb.OffsetType_RESET_TO_OFFSET: // Use the requested offset, even if it doesn't exist yet return requestedOffset, nil case schema_pb.OffsetType_RESET_TO_EARLIEST: // Start from offset 0 return 0, nil case schema_pb.OffsetType_RESET_TO_LATEST: // Start from the current high water mark hwm, err := s.offsetRegistry.GetHighWaterMark(partition) if err != nil { return 0, err } return hwm, nil case schema_pb.OffsetType_RESUME_OR_EARLIEST: // Try to resume from a saved position, fallback to earliest // For now, just use earliest (consumer group position tracking will be added later) return 0, nil case schema_pb.OffsetType_RESUME_OR_LATEST: // Try to resume from a saved position, fallback to latest // For now, just use latest hwm, err := s.offsetRegistry.GetHighWaterMark(partition) if err != nil { return 0, err } return hwm, nil default: return 0, fmt.Errorf("unsupported offset type: %v", offsetType) } } // validateAndGetOffset validates that an offset exists and returns it func (s *OffsetSubscriber) validateAndGetOffset(partition *schema_pb.Partition, offset int64) (int64, error) { if offset < 0 { return 0, fmt.Errorf("offset cannot be negative: %d", offset) } // Get the current high water mark hwm, err := s.offsetRegistry.GetHighWaterMark(partition) if err != nil { return 0, fmt.Errorf("failed to get high water mark: %w", err) } // Check if offset is within valid range if offset >= hwm { return 0, fmt.Errorf("offset %d is beyond high water mark %d", offset, hwm) } return offset, nil } // SeekToOffset seeks a subscription to a specific offset func (sub *OffsetSubscription) SeekToOffset(offset int64) error { if !sub.IsActive { return fmt.Errorf("subscription is not active") } // Validate the offset if offset < 0 { return fmt.Errorf("offset cannot be negative: %d", offset) } hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Partition) if err != nil { return fmt.Errorf("failed to get high water mark: %w", err) } if offset > hwm { return fmt.Errorf("offset %d is beyond high water mark %d", offset, hwm) } sub.CurrentOffset = offset return nil } // GetNextOffset returns the next offset to read func (sub *OffsetSubscription) GetNextOffset() int64 { return sub.CurrentOffset } // AdvanceOffset advances the subscription to the next offset func (sub *OffsetSubscription) AdvanceOffset() { sub.CurrentOffset++ } // GetLag returns the lag between current position and high water mark func (sub *OffsetSubscription) GetLag() (int64, error) { if !sub.IsActive { return 0, fmt.Errorf("subscription is not active") } hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Partition) if err != nil { return 0, fmt.Errorf("failed to get high water mark: %w", err) } lag := hwm - sub.CurrentOffset if lag < 0 { lag = 0 } return lag, nil } // IsAtEnd checks if the subscription has reached the end of available data func (sub *OffsetSubscription) IsAtEnd() (bool, error) { if !sub.IsActive { return true, fmt.Errorf("subscription is not active") } hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Partition) if err != nil { return false, fmt.Errorf("failed to get high water mark: %w", err) } return sub.CurrentOffset >= hwm, nil } // OffsetRange represents a range of offsets type OffsetRange struct { StartOffset int64 EndOffset int64 Count int64 } // GetOffsetRange returns a range of offsets for batch reading func (sub *OffsetSubscription) GetOffsetRange(maxCount int64) (*OffsetRange, error) { if !sub.IsActive { return nil, fmt.Errorf("subscription is not active") } hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Partition) if err != nil { return nil, fmt.Errorf("failed to get high water mark: %w", err) } startOffset := sub.CurrentOffset endOffset := startOffset + maxCount - 1 // Don't go beyond high water mark if endOffset >= hwm { endOffset = hwm - 1 } // If start is already at or beyond HWM, return empty range if startOffset >= hwm { return &OffsetRange{ StartOffset: startOffset, EndOffset: startOffset - 1, // Empty range Count: 0, }, nil } count := endOffset - startOffset + 1 return &OffsetRange{ StartOffset: startOffset, EndOffset: endOffset, Count: count, }, nil } // AdvanceOffsetBy advances the subscription by a specific number of offsets func (sub *OffsetSubscription) AdvanceOffsetBy(count int64) { sub.CurrentOffset += count } // OffsetSeeker provides utilities for offset-based seeking type OffsetSeeker struct { offsetRegistry *PartitionOffsetRegistry } // NewOffsetSeeker creates a new offset seeker func NewOffsetSeeker(offsetRegistry *PartitionOffsetRegistry) *OffsetSeeker { return &OffsetSeeker{ offsetRegistry: offsetRegistry, } } // SeekToTimestamp finds the offset closest to a given timestamp // This bridges offset-based and timestamp-based seeking func (seeker *OffsetSeeker) SeekToTimestamp(partition *schema_pb.Partition, timestamp int64) (int64, error) { // TODO: This requires integration with the storage layer to map timestamps to offsets // For now, return an error indicating this feature needs implementation return 0, fmt.Errorf("timestamp-to-offset mapping not implemented yet") } // ValidateOffsetRange validates that an offset range is valid func (seeker *OffsetSeeker) ValidateOffsetRange(partition *schema_pb.Partition, startOffset, endOffset int64) error { if startOffset < 0 { return fmt.Errorf("start offset cannot be negative: %d", startOffset) } if endOffset < startOffset { return fmt.Errorf("end offset %d cannot be less than start offset %d", endOffset, startOffset) } hwm, err := seeker.offsetRegistry.GetHighWaterMark(partition) if err != nil { return fmt.Errorf("failed to get high water mark: %w", err) } if startOffset >= hwm { return fmt.Errorf("start offset %d is beyond high water mark %d", startOffset, hwm) } if endOffset >= hwm { return fmt.Errorf("end offset %d is beyond high water mark %d", endOffset, hwm) } return nil } // GetAvailableOffsetRange returns the range of available offsets for a partition func (seeker *OffsetSeeker) GetAvailableOffsetRange(partition *schema_pb.Partition) (*OffsetRange, error) { hwm, err := seeker.offsetRegistry.GetHighWaterMark(partition) if err != nil { return nil, fmt.Errorf("failed to get high water mark: %w", err) } if hwm == 0 { // No data available return &OffsetRange{ StartOffset: 0, EndOffset: -1, Count: 0, }, nil } return &OffsetRange{ StartOffset: 0, EndOffset: hwm - 1, Count: hwm, }, nil }