diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index e8238a5f7..3b585f6f6 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -149,26 +149,17 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) { + self := b.option.BrokerAddress() + glog.V(0).Infof("broker %s load topic %v partition %v", self, t, p) + // load local topic partition from configuration on filer if not found var conf *mq_pb.ConfigureTopicResponse - topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) - if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf") - if err != nil { - return fmt.Errorf("read topic %v partition %v conf: %v", t, p, err) - } - // parse into filer conf object - conf = &mq_pb.ConfigureTopicResponse{} - if err = jsonpb.Unmarshal(data, conf); err != nil { - return fmt.Errorf("unmarshal topic %v partition %v conf: %v", t, p, err) - } - return nil - }); err != nil { + conf, err = b.readTopicConfFromFiler(t, p) + if err != nil { return nil, err } // create local topic partition - self := b.option.BrokerAddress() var hasCreated bool for _, assignment := range conf.BrokerPartitionAssignments { if assignment.LeaderBroker == string(self) && p.Equals(topic.FromPbPartition(assignment.Partition)) { @@ -186,6 +177,25 @@ func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p t return localTopicPartition, nil } +func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic, p topic.Partition) (conf *mq_pb.ConfigureTopicResponse, err error) { + topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) + if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf") + if err != nil { + return fmt.Errorf("read topic %v partition %v conf: %v", t, p, err) + } + // parse into filer conf object + conf = &mq_pb.ConfigureTopicResponse{} + if err = jsonpb.Unmarshal(data, conf); err != nil { + return fmt.Errorf("unmarshal topic %v partition %v conf: %v", t, p, err) + } + return nil + }); err != nil { + return nil, err + } + return conf, err +} + // duplicated from master_grpc_server.go func findClientAddress(ctx context.Context) string { // fmt.Printf("FromContext %+v\n", ctx)