From e8b05ecc917464bba42c839ec2ddea7fd3a22e58 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 22 Jan 2024 10:47:39 -0800 Subject: [PATCH] add/remove assigned partitions --- weed/mq/broker/broker_grpc_assign.go | 52 ++++++++++++ weed/mq/broker/broker_grpc_configure.go | 106 +++++++++--------------- weed/mq/broker/broker_grpc_lookup.go | 10 ++- weed/mq/pub_balancer/broker_stats.go | 8 +- 4 files changed, 104 insertions(+), 72 deletions(-) create mode 100644 weed/mq/broker/broker_grpc_assign.go diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go new file mode 100644 index 000000000..323c0055c --- /dev/null +++ b/weed/mq/broker/broker_grpc_assign.go @@ -0,0 +1,52 @@ +package broker + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment +func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) { + ret := &mq_pb.AssignTopicPartitionsResponse{} + self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port)) + + // drain existing topic partition subscriptions + for _, assignment := range request.BrokerPartitionAssignments { + t := topic.FromPbTopic(request.Topic) + partition := topic.FromPbPartition(assignment.Partition) + b.accessLock.Lock() + if request.IsDraining { + // TODO drain existing topic partition subscriptions + b.localTopicManager.RemoveTopicPartition(t, partition) + } else { + var localPartition *topic.LocalPartition + if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil { + localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) + b.localTopicManager.AddTopicPartition(t, localPartition) + } + } + b.accessLock.Unlock() + } + + // if is leader, notify the followers to drain existing topic partition subscriptions + if request.IsLeader { + for _, brokerPartition := range request.BrokerPartitionAssignments { + for _, follower := range brokerPartition.FollowerBrokers { + err := pb.WithBrokerGrpcClient(false, follower, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + _, err := client.AssignTopicPartitions(context.Background(), request) + return err + }) + if err != nil { + return ret, err + } + } + } + } + + glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments) + return ret, nil +} diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index e0f9319a4..9292a6184 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -10,6 +10,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "sync" ) // ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer @@ -54,7 +55,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments) } - if assignErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments); assignErr != nil { + if assignErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, true); assignErr != nil { return nil, assignErr } @@ -63,77 +64,46 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. return resp, err } -func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, t *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) error { +func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, t *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment, isAdd bool) error { + // notify the brokers to create the topic partitions in parallel + var wg sync.WaitGroup for _, bpa := range assignments { - fmt.Printf("create topic %s partition %+v on %s\n", t, bpa.Partition, bpa.LeaderBroker) - if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error { - _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{ - Topic: t, - BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{ - { - Partition: bpa.Partition, + wg.Add(1) + go func(bpa *mq_pb.BrokerPartitionAssignment) { + defer wg.Done() + if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error { + _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{ + Topic: t, + BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{ + { + Partition: bpa.Partition, + }, }, - }, - IsLeader: true, - IsDraining: false, - }) - if doCreateErr != nil { - return fmt.Errorf("do create topic %s on %s: %v", t, bpa.LeaderBroker, doCreateErr) - } - brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker) - if !found { - brokerStats = pub_balancer.NewBrokerStats() - if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) { - brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker) + IsLeader: true, + IsDraining: !isAdd, + }) + if doCreateErr != nil { + if !isAdd { + return fmt.Errorf("drain topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr) + } else { + return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr) + } } + brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker) + if !found { + brokerStats = pub_balancer.NewBrokerStats() + if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) { + brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker) + } + } + brokerStats.RegisterAssignment(t, bpa.Partition, isAdd) + return nil + }); doCreateErr != nil { + glog.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr) } - brokerStats.RegisterAssignment(t, bpa.Partition) - return nil - }); doCreateErr != nil { - return doCreateErr - } + }(bpa) } + wg.Wait() + return nil } - -// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment -func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) { - ret := &mq_pb.AssignTopicPartitionsResponse{} - self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port)) - - // drain existing topic partition subscriptions - for _, assignment := range request.BrokerPartitionAssignments { - t := topic.FromPbTopic(request.Topic) - partition := topic.FromPbPartition(assignment.Partition) - b.accessLock.Lock() - if request.IsDraining { - // TODO drain existing topic partition subscriptions - b.localTopicManager.RemoveTopicPartition(t, partition) - } else { - var localPartition *topic.LocalPartition - if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil { - localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) - b.localTopicManager.AddTopicPartition(t, localPartition) - } - } - b.accessLock.Unlock() - } - - // if is leader, notify the followers to drain existing topic partition subscriptions - if request.IsLeader { - for _, brokerPartition := range request.BrokerPartitionAssignments { - for _, follower := range brokerPartition.FollowerBrokers { - err := pb.WithBrokerGrpcClient(false, follower, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { - _, err := client.AssignTopicPartitions(context.Background(), request) - return err - }) - if err != nil { - return ret, err - } - } - } - } - - glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments) - return ret, nil -} diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 0ba0b628c..4ba1a0f75 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -3,6 +3,7 @@ package broker import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/codes" @@ -25,12 +26,17 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq return resp, err } + t := topic.FromPbTopic(request.Topic) ret := &mq_pb.LookupTopicBrokersResponse{} + conf := &mq_pb.ConfigureTopicResponse{} ret.Topic = request.Topic - conf, err := b.readTopicConfFromFiler(topic.FromPbTopic(request.Topic)) - if err == nil { + if conf, err = b.readTopicConfFromFiler(t); err != nil { + glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err) ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments + } else { + err = b.ensureTopicActiveAssignments(t, conf) } + return ret, err } diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go index 2ae123822..45c5271df 100644 --- a/weed/mq/pub_balancer/broker_stats.go +++ b/weed/mq/pub_balancer/broker_stats.go @@ -63,7 +63,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { } -func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition) { +func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) { tps := &TopicPartitionStats{ TopicPartition: topic.TopicPartition{ Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name}, @@ -78,5 +78,9 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti IsLeader: true, } key := tps.TopicPartition.String() - bs.TopicPartitionStats.Set(key, tps) + if isAdd { + bs.TopicPartitionStats.SetIfAbsent(key, tps) + } else { + bs.TopicPartitionStats.Remove(key) + } }