mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
242 lines
8 KiB
Go
242 lines
8 KiB
Go
package broker
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/offset"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
|
|
)
|
|
|
|
// SubscribeWithOffset handles subscription requests with offset-based positioning
|
|
// TODO: This extends the broker with offset-aware subscription support
|
|
// ASSUMPTION: This will eventually be integrated into the main SubscribeMessage method
|
|
func (b *MessageQueueBroker) SubscribeWithOffset(
|
|
ctx context.Context,
|
|
req *mq_pb.SubscribeMessageRequest,
|
|
stream mq_pb.SeaweedMessaging_SubscribeMessageServer,
|
|
offsetType schema_pb.OffsetType,
|
|
startOffset int64,
|
|
) error {
|
|
|
|
initMessage := req.GetInit()
|
|
if initMessage == nil {
|
|
return fmt.Errorf("missing init message")
|
|
}
|
|
|
|
// Extract partition information from the request
|
|
t := topic.FromPbTopic(initMessage.Topic)
|
|
|
|
// Get partition from the request's partition_offset field
|
|
if initMessage.PartitionOffset == nil || initMessage.PartitionOffset.Partition == nil {
|
|
return fmt.Errorf("missing partition information in request")
|
|
}
|
|
|
|
// Use the partition information from the request
|
|
p := topic.Partition{
|
|
RingSize: initMessage.PartitionOffset.Partition.RingSize,
|
|
RangeStart: initMessage.PartitionOffset.Partition.RangeStart,
|
|
RangeStop: initMessage.PartitionOffset.Partition.RangeStop,
|
|
UnixTimeNs: initMessage.PartitionOffset.Partition.UnixTimeNs,
|
|
}
|
|
|
|
// Create offset-based subscription
|
|
subscriptionID := fmt.Sprintf("%s-%s-%d", initMessage.ConsumerGroup, initMessage.ConsumerId, startOffset)
|
|
subscription, err := b.offsetManager.CreateSubscription(subscriptionID, t, p, offsetType, startOffset)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create offset subscription: %w", err)
|
|
}
|
|
|
|
defer func() {
|
|
if closeErr := b.offsetManager.CloseSubscription(subscriptionID); closeErr != nil {
|
|
glog.V(0).Infof("Failed to close subscription %s: %v", subscriptionID, closeErr)
|
|
}
|
|
}()
|
|
|
|
// Get local partition for reading
|
|
localTopicPartition, err := b.GetOrGenerateLocalPartition(t, p)
|
|
if err != nil {
|
|
return fmt.Errorf("topic %v partition %v not found: %v", t, p, err)
|
|
}
|
|
|
|
// Subscribe to messages using offset-based positioning
|
|
return b.subscribeWithOffsetSubscription(ctx, localTopicPartition, subscription, stream, initMessage)
|
|
}
|
|
|
|
// subscribeWithOffsetSubscription handles the actual message consumption with offset tracking
|
|
func (b *MessageQueueBroker) subscribeWithOffsetSubscription(
|
|
ctx context.Context,
|
|
localPartition *topic.LocalPartition,
|
|
subscription *offset.OffsetSubscription,
|
|
stream mq_pb.SeaweedMessaging_SubscribeMessageServer,
|
|
initMessage *mq_pb.SubscribeMessageRequest_InitMessage,
|
|
) error {
|
|
|
|
clientName := fmt.Sprintf("%s-%s", initMessage.ConsumerGroup, initMessage.ConsumerId)
|
|
|
|
// TODO: Implement offset-based message reading
|
|
// ASSUMPTION: For now, we'll use the existing subscription mechanism and track offsets separately
|
|
// This should be replaced with proper offset-based reading from storage
|
|
|
|
// Convert the subscription's current offset to a proper MessagePosition
|
|
startPosition, err := b.convertOffsetToMessagePosition(subscription)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to convert offset to message position: %w", err)
|
|
}
|
|
|
|
return localPartition.Subscribe(clientName,
|
|
startPosition,
|
|
func() bool {
|
|
// Check if subscription is still active and not at end
|
|
if !subscription.IsActive {
|
|
return false
|
|
}
|
|
|
|
atEnd, err := subscription.IsAtEnd()
|
|
if err != nil {
|
|
glog.V(0).Infof("Error checking if subscription at end: %v", err)
|
|
return false
|
|
}
|
|
|
|
return !atEnd
|
|
},
|
|
func(logEntry *filer_pb.LogEntry) (bool, error) {
|
|
// Check if this message matches our offset requirements
|
|
currentOffset := subscription.GetNextOffset()
|
|
|
|
// TODO: Map LogEntry to offset - for now using timestamp as proxy
|
|
// ASSUMPTION: LogEntry.Offset field should be populated by the publish flow
|
|
if logEntry.Offset < currentOffset {
|
|
// Skip messages before our current offset
|
|
return false, nil
|
|
}
|
|
|
|
// Send message to client
|
|
if err := stream.Send(&mq_pb.SubscribeMessageResponse{
|
|
Message: &mq_pb.SubscribeMessageResponse_Data{
|
|
Data: &mq_pb.DataMessage{
|
|
Key: logEntry.Key,
|
|
Value: logEntry.Data,
|
|
TsNs: logEntry.TsNs,
|
|
},
|
|
},
|
|
}); err != nil {
|
|
glog.Errorf("Error sending data to %s: %v", clientName, err)
|
|
return false, err
|
|
}
|
|
|
|
// Advance subscription offset
|
|
subscription.AdvanceOffset()
|
|
|
|
// Check context for cancellation
|
|
select {
|
|
case <-ctx.Done():
|
|
return true, ctx.Err()
|
|
default:
|
|
return false, nil
|
|
}
|
|
})
|
|
}
|
|
|
|
// GetSubscriptionInfo returns information about an active subscription
|
|
func (b *MessageQueueBroker) GetSubscriptionInfo(subscriptionID string) (map[string]interface{}, error) {
|
|
subscription, err := b.offsetManager.GetSubscription(subscriptionID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
lag, err := subscription.GetLag()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
atEnd, err := subscription.IsAtEnd()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return map[string]interface{}{
|
|
"subscription_id": subscription.ID,
|
|
"start_offset": subscription.StartOffset,
|
|
"current_offset": subscription.CurrentOffset,
|
|
"offset_type": subscription.OffsetType.String(),
|
|
"is_active": subscription.IsActive,
|
|
"lag": lag,
|
|
"at_end": atEnd,
|
|
}, nil
|
|
}
|
|
|
|
// ListActiveSubscriptions returns information about all active subscriptions
|
|
func (b *MessageQueueBroker) ListActiveSubscriptions() ([]map[string]interface{}, error) {
|
|
subscriptions, err := b.offsetManager.ListActiveSubscriptions()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result := make([]map[string]interface{}, len(subscriptions))
|
|
for i, subscription := range subscriptions {
|
|
lag, _ := subscription.GetLag()
|
|
atEnd, _ := subscription.IsAtEnd()
|
|
|
|
result[i] = map[string]interface{}{
|
|
"subscription_id": subscription.ID,
|
|
"start_offset": subscription.StartOffset,
|
|
"current_offset": subscription.CurrentOffset,
|
|
"offset_type": subscription.OffsetType.String(),
|
|
"is_active": subscription.IsActive,
|
|
"lag": lag,
|
|
"at_end": atEnd,
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// SeekSubscription seeks an existing subscription to a specific offset
|
|
func (b *MessageQueueBroker) SeekSubscription(subscriptionID string, offset int64) error {
|
|
subscription, err := b.offsetManager.GetSubscription(subscriptionID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return subscription.SeekToOffset(offset)
|
|
}
|
|
|
|
// convertOffsetToMessagePosition converts a subscription's current offset to a MessagePosition for log_buffer
|
|
func (b *MessageQueueBroker) convertOffsetToMessagePosition(subscription *offset.OffsetSubscription) (log_buffer.MessagePosition, error) {
|
|
currentOffset := subscription.GetNextOffset()
|
|
|
|
// Handle special offset cases
|
|
switch subscription.OffsetType {
|
|
case schema_pb.OffsetType_RESET_TO_EARLIEST:
|
|
return log_buffer.NewMessagePosition(1, -3), nil
|
|
|
|
case schema_pb.OffsetType_RESET_TO_LATEST:
|
|
return log_buffer.NewMessagePosition(time.Now().UnixNano(), -4), nil
|
|
|
|
case schema_pb.OffsetType_EXACT_OFFSET, schema_pb.OffsetType_EXACT_TS_NS:
|
|
// For exact offsets, we need to convert the Kafka offset to a SeaweedMQ timestamp
|
|
// TODO: This should use proper offset ledger lookup: ledger.GetRecord(currentOffset)
|
|
// For now, implement proper offset-based timestamp approximation
|
|
|
|
// Use current time as base and subtract offset-based duration
|
|
// Higher offsets get older timestamps (further back in time)
|
|
baseTime := time.Now()
|
|
|
|
// Approximate each offset as 1 millisecond back in time
|
|
// This ensures proper progression: higher offset = older timestamp
|
|
timestampNs := baseTime.Add(-time.Duration(currentOffset) * time.Millisecond).UnixNano()
|
|
|
|
return log_buffer.NewMessagePosition(timestampNs, -2), nil
|
|
|
|
default:
|
|
// Default to starting from current time for unknown offset types
|
|
return log_buffer.NewMessagePosition(time.Now().UnixNano(), -2), nil
|
|
}
|
|
}
|