package sub_coordinator import ( cmap "github.com/orcaman/concurrent-map/v2" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) type TopicConsumerGroups struct { // map a consumer group name to a consumer group ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup] } // Coordinator coordinates the instances in the consumer group for one topic. // It is responsible for: // 1. (Maybe) assigning partitions when a consumer instance is up/down. type Coordinator struct { // map topic name to consumer groups TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups] balancer *pub_balancer.Balancer } func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator { return &Coordinator{ TopicSubscribers: cmap.New[*TopicConsumerGroups](), balancer: balancer, } } func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic) *TopicConsumerGroups { topicName := toTopicName(topic) tcg, _ := c.TopicSubscribers.Get(topicName) if tcg == nil { tcg = &TopicConsumerGroups{ ConsumerGroups: cmap.New[*ConsumerGroup](), } c.TopicSubscribers.Set(topicName, tcg) } return tcg } func (c *Coordinator) RemoveTopic(topic *mq_pb.Topic) { topicName := toTopicName(topic) c.TopicSubscribers.Remove(topicName) } func toTopicName(topic *mq_pb.Topic) string { topicName := topic.Namespace + "." + topic.Name return topicName } func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance { tcg := c.GetTopicConsumerGroups(topic) cg, _ := tcg.ConsumerGroups.Get(consumerGroup) if cg == nil { cg = NewConsumerGroup() tcg.ConsumerGroups.Set(consumerGroup, cg) } cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance) if cgi == nil { cgi = NewConsumerGroupInstance(consumerGroupInstance) cg.ConsumerGroupInstances.Set(consumerGroupInstance, cgi) } cg.OnAddConsumerGroupInstance(consumerGroupInstance, topic) return cgi } func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) { tcg, _ := c.TopicSubscribers.Get(toTopicName(topic)) if tcg == nil { return } cg, _ := tcg.ConsumerGroups.Get(consumerGroup) if cg == nil { return } cg.ConsumerGroupInstances.Remove(consumerGroupInstance) cg.OnRemoveConsumerGroupInstance(consumerGroupInstance, topic) if cg.ConsumerGroupInstances.Count() == 0 { tcg.ConsumerGroups.Remove(consumerGroup) } if tcg.ConsumerGroups.Count() == 0 { c.RemoveTopic(topic) } }