diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go index cd9583016..0007e7364 100644 --- a/weed/mq/pub_balancer/balancer.go +++ b/weed/mq/pub_balancer/balancer.go @@ -67,7 +67,12 @@ func (balancer *Balancer) RemoveBroker(broker string, stats *BrokerStats) { if !found { continue } - partitionSlotToBrokerList.RemoveBroker(broker) + pickedBroker := pickBrokers(balancer.Brokers, 1) + if len(pickedBroker) == 0 { + partitionSlotToBrokerList.RemoveBroker(broker) + } else { + partitionSlotToBrokerList.ReplaceBroker(broker, pickedBroker[0]) + } } balancer.onPubRemoveBroker(broker, stats) balancer.OnRemoveBroker(broker, stats) diff --git a/weed/mq/pub_balancer/partition_list_broker.go b/weed/mq/pub_balancer/partition_list_broker.go index f4180cf81..441b61898 100644 --- a/weed/mq/pub_balancer/partition_list_broker.go +++ b/weed/mq/pub_balancer/partition_list_broker.go @@ -44,9 +44,13 @@ func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broke }) } func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) { + ps.ReplaceBroker(broker, "") +} + +func (ps *PartitionSlotToBrokerList) ReplaceBroker(oldBroker string, newBroker string) { for _, partitionSlot := range ps.PartitionSlots { - if partitionSlot.AssignedBroker == broker { - partitionSlot.AssignedBroker = "" + if partitionSlot.AssignedBroker == oldBroker { + partitionSlot.AssignedBroker = newBroker } } }