package broker

import (
	"context"
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/offset"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

// SubscribeWithOffset handles subscription requests with offset-based positioning.
// TODO: This extends the broker with offset-aware subscription support.
// ASSUMPTION: This will eventually be integrated into the main SubscribeMessage method.
func (b *MessageQueueBroker) SubscribeWithOffset(
	ctx context.Context,
	req *mq_pb.SubscribeMessageRequest,
	stream mq_pb.SeaweedMessaging_SubscribeMessageServer,
	offsetType schema_pb.OffsetType,
	startOffset int64,
) error {
	initMessage := req.GetInit()
	if initMessage == nil {
		return fmt.Errorf("missing init message")
	}

	// Extract the topic from the request
	t := topic.FromPbTopic(initMessage.Topic)

	// The partition must be supplied in the request's partition_offset field
	if initMessage.PartitionOffset == nil || initMessage.PartitionOffset.Partition == nil {
		return fmt.Errorf("missing partition information in request")
	}

	// Use the partition information from the request
	p := topic.Partition{
		RingSize:   initMessage.PartitionOffset.Partition.RingSize,
		RangeStart: initMessage.PartitionOffset.Partition.RangeStart,
		RangeStop:  initMessage.PartitionOffset.Partition.RangeStop,
		UnixTimeNs: initMessage.PartitionOffset.Partition.UnixTimeNs,
	}

	// Create an offset-based subscription
	subscriptionID := fmt.Sprintf("%s-%s-%d", initMessage.ConsumerGroup, initMessage.ConsumerId, startOffset)
	subscription, err := b.offsetManager.CreateSubscription(subscriptionID, t, p, offsetType, startOffset)
	if err != nil {
		return fmt.Errorf("failed to create offset subscription: %w", err)
	}
	defer func() {
		if closeErr := b.offsetManager.CloseSubscription(subscriptionID); closeErr != nil {
			glog.V(0).Infof("Failed to close subscription %s: %v", subscriptionID, closeErr)
		}
	}()

	// Get the local partition for reading
	localTopicPartition, err := b.GetOrGenerateLocalPartition(t, p)
	if err != nil {
		return fmt.Errorf("topic %v partition %v not found: %w", t, p, err)
	}

	// Subscribe to messages using offset-based positioning
	return b.subscribeWithOffsetSubscription(ctx, localTopicPartition, subscription, stream, initMessage)
}

// subscribeWithOffsetSubscription handles the actual message consumption with offset tracking.
func (b *MessageQueueBroker) subscribeWithOffsetSubscription(
	ctx context.Context,
	localPartition *topic.LocalPartition,
	subscription *offset.OffsetSubscription,
	stream mq_pb.SeaweedMessaging_SubscribeMessageServer,
	initMessage *mq_pb.SubscribeMessageRequest_InitMessage,
) error {
	clientName := fmt.Sprintf("%s-%s", initMessage.ConsumerGroup, initMessage.ConsumerId)

	// TODO: Implement offset-based message reading.
	// ASSUMPTION: For now we reuse the existing subscription mechanism and track offsets separately.
	// This should be replaced with proper offset-based reading from storage.

	// Convert the subscription's current offset to a proper MessagePosition
	startPosition, err := b.convertOffsetToMessagePosition(subscription)
	if err != nil {
		return fmt.Errorf("failed to convert offset to message position: %w", err)
	}

	return localPartition.Subscribe(clientName, startPosition, func() bool {
		// Keep reading only while the subscription is active and not yet at the end
		if !subscription.IsActive {
			return false
		}
		atEnd, err := subscription.IsAtEnd()
		if err != nil {
			glog.V(0).Infof("Error checking if subscription at end: %v", err)
			return false
		}
		return !atEnd
	}, func(logEntry *filer_pb.LogEntry) (bool, error) {
		// Check whether this message matches our offset requirements
		currentOffset := subscription.GetNextOffset()

		// TODO: Map LogEntry to offset properly.
		// ASSUMPTION: LogEntry.Offset is populated by the publish flow.
		if logEntry.Offset < currentOffset {
			// Skip messages before our current offset
			return false, nil
		}

		// Send the message to the client
		if err := stream.Send(&mq_pb.SubscribeMessageResponse{
			Message: &mq_pb.SubscribeMessageResponse_Data{
				Data: &mq_pb.DataMessage{
					Key:   logEntry.Key,
					Value: logEntry.Data,
					TsNs:  logEntry.TsNs,
				},
			},
		}); err != nil {
			glog.Errorf("Error sending data to %s: %v", clientName, err)
			return false, err
		}

		// Advance the subscription offset
		subscription.AdvanceOffset()

		// Check the context for cancellation
		select {
		case <-ctx.Done():
			return true, ctx.Err()
		default:
			return false, nil
		}
	})
}

// GetSubscriptionInfo returns information about an active subscription.
func (b *MessageQueueBroker) GetSubscriptionInfo(subscriptionID string) (map[string]interface{}, error) {
	subscription, err := b.offsetManager.GetSubscription(subscriptionID)
	if err != nil {
		return nil, err
	}

	lag, err := subscription.GetLag()
	if err != nil {
		return nil, err
	}

	atEnd, err := subscription.IsAtEnd()
	if err != nil {
		return nil, err
	}

	return map[string]interface{}{
		"subscription_id": subscription.ID,
		"start_offset":    subscription.StartOffset,
		"current_offset":  subscription.CurrentOffset,
		"offset_type":     subscription.OffsetType.String(),
		"is_active":       subscription.IsActive,
		"lag":             lag,
		"at_end":          atEnd,
	}, nil
}

// ListActiveSubscriptions returns information about all active subscriptions.
func (b *MessageQueueBroker) ListActiveSubscriptions() ([]map[string]interface{}, error) {
	subscriptions, err := b.offsetManager.ListActiveSubscriptions()
	if err != nil {
		return nil, err
	}

	result := make([]map[string]interface{}, len(subscriptions))
	for i, subscription := range subscriptions {
		lag, _ := subscription.GetLag()
		atEnd, _ := subscription.IsAtEnd()
		result[i] = map[string]interface{}{
			"subscription_id": subscription.ID,
			"start_offset":    subscription.StartOffset,
			"current_offset":  subscription.CurrentOffset,
			"offset_type":     subscription.OffsetType.String(),
			"is_active":       subscription.IsActive,
			"lag":             lag,
			"at_end":          atEnd,
		}
	}
	return result, nil
}

// SeekSubscription seeks an existing subscription to a specific offset.
func (b *MessageQueueBroker) SeekSubscription(subscriptionID string, offset int64) error {
	subscription, err := b.offsetManager.GetSubscription(subscriptionID)
	if err != nil {
		return err
	}
	return subscription.SeekToOffset(offset)
}

// convertOffsetToMessagePosition converts a subscription's current offset to a MessagePosition for log_buffer.
func (b *MessageQueueBroker) convertOffsetToMessagePosition(subscription *offset.OffsetSubscription) (log_buffer.MessagePosition, error) {
	currentOffset := subscription.GetNextOffset()

	// Handle special offset cases
	switch subscription.OffsetType {
	case schema_pb.OffsetType_RESET_TO_EARLIEST:
		return log_buffer.NewMessagePosition(1, -3), nil
	case schema_pb.OffsetType_RESET_TO_LATEST:
		return log_buffer.NewMessagePosition(time.Now().UnixNano(), -4), nil
	case schema_pb.OffsetType_EXACT_OFFSET, schema_pb.OffsetType_EXACT_TS_NS:
		// For exact offsets, the Kafka-style offset needs to be converted to a SeaweedMQ timestamp.
		// TODO: This should use a proper offset ledger lookup, e.g. ledger.GetRecord(currentOffset).
		// For now, approximate the timestamp from the offset: start at the current time and step
		// back one millisecond per offset, so a higher offset maps to an older timestamp.
		baseTime := time.Now()
		timestampNs := baseTime.Add(-time.Duration(currentOffset) * time.Millisecond).UnixNano()
		return log_buffer.NewMessagePosition(timestampNs, -2), nil
	default:
		// Default to starting from the current time for unknown offset types
		return log_buffer.NewMessagePosition(time.Now().UnixNano(), -2), nil
	}
}
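
// Usage sketch (assumption): until the integration mentioned in the TODO on
// SubscribeWithOffset lands, a caller that already knows the desired start
// offset could invoke the offset-aware path directly. The concrete offset value
// (42) and the choice of OffsetType_EXACT_OFFSET below are illustrative only,
// not part of the current broker API surface.
//
//	// inside a gRPC handler that has ctx, req, and stream in scope:
//	if err := b.SubscribeWithOffset(ctx, req, stream,
//		schema_pb.OffsetType_EXACT_OFFSET, // resume from a concrete offset
//		42,                                // startOffset, e.g. last committed offset + 1
//	); err != nil {
//		glog.Errorf("offset-based subscribe failed: %v", err)
//	}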