From be0c426dc70765fba0916bae6f2a4ec4162f8784 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 16 Jan 2024 09:30:46 -0800 Subject: [PATCH] simplify to LookupTopicPartitions(topic) --- weed/mq/broker/broker_grpc_configure.go | 24 ++++++++++++++++++++++- weed/mq/broker/broker_grpc_lookup.go | 13 ++----------- weed/mq/pub_balancer/lookup.go | 26 +------------------------ 3 files changed, 26 insertions(+), 37 deletions(-) diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index e8b70a0ce..83a26446c 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -1,6 +1,7 @@ package broker import ( + "bytes" "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -38,7 +39,28 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. } ret := &mq_pb.ConfigureTopicResponse{} - ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, request.PartitionCount) + existingAssignments := b.Balancer.LookupTopicPartitions(request.Topic) + if len(existingAssignments) == int(request.PartitionCount) { + glog.V(0).Infof("existing topic partitions %d: %+v", len(existingAssignments), existingAssignments) + ret.BrokerPartitionAssignments = existingAssignments + } else { + if b.Balancer.Brokers.IsEmpty() { + return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error()) + } + ret.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount) + + // save the topic configuration on filer + topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, request.Topic.Namespace, request.Topic.Name) + if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + var buf bytes.Buffer + filer.ProtoToText(&buf, ret) + return filer.SaveInsideFiler(client, topicDir, "topic.conf", buf.Bytes()) + }); err != nil { + return nil, fmt.Errorf("create topic %s: %v", topicDir, err) + } + + b.Balancer.OnPartitionChange(request.Topic, ret.BrokerPartitionAssignments) + } for _, bpa := range ret.BrokerPartitionAssignments { fmt.Printf("create topic %s partition %+v on %s\n", request.Topic, bpa.Partition, bpa.LeaderBroker) diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 74456c6e3..fbf85dde7 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -8,16 +8,7 @@ import ( "google.golang.org/grpc/status" ) -// FindTopicBrokers returns the brokers that are serving the topic -// -// 1. lock the topic -// -// 2. find the topic partitions on the filer -// 2.1 if the topic is not found, return error -// 2.1.2 if the request is_for_publish, create the topic -// 2.2 if the topic is found, return the brokers -// -// 3. unlock the topic +// LookupTopicBrokers returns the brokers that are serving the topic func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) { if b.currentBalancer == "" { return nil, status.Errorf(codes.Unavailable, "no balancer") @@ -35,7 +26,7 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq ret := &mq_pb.LookupTopicBrokersResponse{} ret.Topic = request.Topic - ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, -1) + ret.BrokerPartitionAssignments = b.Balancer.LookupTopicPartitions(ret.Topic) return ret, err } diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go index b74909729..052932c04 100644 --- a/weed/mq/pub_balancer/lookup.go +++ b/weed/mq/pub_balancer/lookup.go @@ -2,7 +2,6 @@ package pub_balancer import ( "errors" - "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) @@ -10,10 +9,7 @@ var ( ErrNoBroker = errors.New("no broker") ) -func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, alreadyExists bool, err error) { - if partitionCount == 0 { - partitionCount = 6 - } +func (balancer *Balancer) LookupTopicPartitions(topic *mq_pb.Topic) (assignments []*mq_pb.BrokerPartitionAssignment) { // find existing topic partition assignments for brokerStatsItem := range balancer.Brokers.IterBuffered() { broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val @@ -35,25 +31,5 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pa } } } - if len(assignments) > 0 { - glog.V(0).Infof("existing topic partitions %d: %+v", len(assignments), assignments) - return assignments, true, nil - } - if partitionCount < 0 { - return nil, false, nil - } - - // find the topic partitions on the filer - // if the topic is not found - // if the request is_for_publish - // create the topic - // if the request is_for_subscribe - // return error not found - // t := topic.FromPbTopic(request.Topic) - if balancer.Brokers.IsEmpty() { - return nil, alreadyExists, ErrNoBroker - } - assignments = AllocateTopicPartitions(balancer.Brokers, partitionCount) - balancer.OnPartitionChange(topic, assignments) return }