From d268fbe18a4ebe3485a117d53407638584a4b730 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 22 Jan 2024 11:57:17 -0800 Subject: [PATCH] when configure, cancel existing assignments --- weed/mq/broker/broker_grpc_configure.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index f5bcceb44..9b6cf9d2a 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -42,6 +42,11 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. if err == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) { glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments) } else { + if resp!=nil && len(resp.BrokerPartitionAssignments) > 0 { + if cancelErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, false); cancelErr != nil { + glog.V(1).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr) + } + } resp = &mq_pb.ConfigureTopicResponse{} if b.Balancer.Brokers.IsEmpty() { return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error())