diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 3b585f6f6..3ca10d258 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -150,11 +150,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) { self := b.option.BrokerAddress() - glog.V(0).Infof("broker %s load topic %v partition %v", self, t, p) // load local topic partition from configuration on filer if not found var conf *mq_pb.ConfigureTopicResponse - conf, err = b.readTopicConfFromFiler(t, p) + conf, err = b.readTopicConfFromFiler(t) if err != nil { return nil, err } @@ -177,17 +176,20 @@ func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p t return localTopicPartition, nil } -func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic, p topic.Partition) (conf *mq_pb.ConfigureTopicResponse, err error) { +func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) { + + glog.V(0).Infof("load conf for topic %v from filer", t) + topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf") if err != nil { - return fmt.Errorf("read topic %v partition %v conf: %v", t, p, err) + return fmt.Errorf("read topic %v partition %v conf: %v", t, err) } // parse into filer conf object conf = &mq_pb.ConfigureTopicResponse{} if err = jsonpb.Unmarshal(data, conf); err != nil { - return fmt.Errorf("unmarshal topic %v partition %v conf: %v", t, p, err) + return fmt.Errorf("unmarshal topic %v conf: %v", t, err) } return nil }); err != nil { diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go index 352f5fa81..94bd6b0e2 100644 --- a/weed/mq/broker/broker_grpc_sub_coordinator.go +++ b/weed/mq/broker/broker_grpc_sub_coordinator.go @@ -4,6 +4,7 @@ import ( "context" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -35,8 +36,27 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess ctx := stream.Context() - // process ack messages go func() { + // try to load the partition assignment from filer + if conf, err := b.readTopicConfFromFiler(topic.FromPbTopic(initMessage.Topic)); err == nil { + assignedPartitions := make([]*mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition, len(conf.BrokerPartitionAssignments)) + for i, assignment := range conf.BrokerPartitionAssignments { + assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{ + Partition: assignment.Partition, + Broker: assignment.LeaderBroker, + } + } + // send partition assignment to subscriber + cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{ + Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{ + Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{ + AssignedPartitions: assignedPartitions, + }, + }, + } + } + + // process ack messages for { _, err := stream.Recv() if err != nil {