package broker import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "time" ) func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error { localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.Cursor.Topic), topic.FromPbPartition(req.Cursor.Partition)) if localTopicPartition == nil { stream.Send(&mq_pb.SubscribeResponse{ Message: &mq_pb.SubscribeResponse_Ctrl{ Ctrl: &mq_pb.SubscribeResponse_CtrlMessage{ Error: "not initialized", }, }, }) return nil } clientName := fmt.Sprintf("%s/%s-%s", req.Consumer.ConsumerGroup, req.Consumer.ConsumerId, req.Consumer.ClientId) localTopicPartition.Subscribe(clientName, time.Now(), func(logEntry *filer_pb.LogEntry) error { value := logEntry.GetData() if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{ Data: &mq_pb.DataMessage{ Key: []byte(fmt.Sprintf("key-%d", logEntry.PartitionKeyHash)), Value: value, }, }}); err != nil { glog.Errorf("Error sending setup response: %v", err) return err } return nil }) return nil }