From 1492bf7552e4b8dea59c4ade4b0b467565213e79 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 24 Sep 2023 23:05:41 -0700 Subject: [PATCH] fix listing topics --- weed/mq/balancer/balancer.go | 31 +++++++++++++++----------- weed/mq/balancer/lookup.go | 7 ++---- weed/mq/broker/broker_grpc_balancer.go | 5 +++-- weed/mq/broker/broker_grpc_create.go | 14 +++++++++++- weed/mq/broker/broker_grpc_lookup.go | 7 ++++-- weed/mq/broker/broker_stats.go | 1 + weed/mq/topic/local_manager.go | 9 +++++++- weed/mq/topic/topic_partition.go | 14 ++++++++++++ 8 files changed, 64 insertions(+), 24 deletions(-) create mode 100644 weed/mq/topic/topic_partition.go diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go index 4da735bd3..bc919d4e3 100644 --- a/weed/mq/balancer/balancer.go +++ b/weed/mq/balancer/balancer.go @@ -3,6 +3,7 @@ package balancer import ( "fmt" cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) @@ -30,7 +31,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { currentTopicPartitions := bs.Stats.Items() for _, topicPartitionStats := range stats.Stats { tps := &TopicPartitionStats{ - TopicPartition: TopicPartition{ + TopicPartition: topic.TopicPartition{ Namespace: topicPartitionStats.Topic.Namespace, Topic: topicPartitionStats.Topic.Name, RangeStart: topicPartitionStats.Partition.RangeStart, @@ -52,20 +53,28 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { } +func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition) { + tps := &TopicPartitionStats{ + TopicPartition: topic.TopicPartition{ + Namespace: t.Namespace, + Topic: t.Name, + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + }, + ConsumerCount: 0, + IsLeader: true, + } + key := tps.TopicPartition.String() + bs.Stats.Set(key, tps) +} + func (bs *BrokerStats) String() string { return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}", bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.Stats.Items()) } -type TopicPartition struct { - Namespace string - Topic string - RangeStart int32 - RangeStop int32 -} - type TopicPartitionStats struct { - TopicPartition + topic.TopicPartition ConsumerCount int32 IsLeader bool } @@ -81,7 +90,3 @@ func NewBrokerStats() *BrokerStats { Stats: cmap.New[*TopicPartitionStats](), } } - -func (tp *TopicPartition) String() string { - return fmt.Sprintf("%v.%v-%04d-%04d", tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop) -} diff --git a/weed/mq/balancer/lookup.go b/weed/mq/balancer/lookup.go index 5a14b3317..7362fbab7 100644 --- a/weed/mq/balancer/lookup.go +++ b/weed/mq/balancer/lookup.go @@ -24,11 +24,8 @@ func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish b RangeStop: topicPartitionStat.RangeStop, }, } - if topicPartitionStat.IsLeader { - assignment.LeaderBroker = broker - } else { - assignment.FollowerBrokers = append(assignment.FollowerBrokers, broker) - } + // TODO fix follower setting + assignment.LeaderBroker = broker assignments = append(assignments, assignment) } } diff --git a/weed/mq/broker/broker_grpc_balancer.go b/weed/mq/broker/broker_grpc_balancer.go index e82fc47bf..63dda7d69 100644 --- a/weed/mq/broker/broker_grpc_balancer.go +++ b/weed/mq/broker/broker_grpc_balancer.go @@ -24,6 +24,7 @@ func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessagin if initMessage != nil { broker.Balancer.Brokers.Set(initMessage.Broker, brokerStats) } else { + // TODO fix this return status.Errorf(codes.InvalidArgument, "balancer init message is empty") } defer func() { @@ -42,8 +43,8 @@ func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessagin if receivedStats := req.GetStats(); receivedStats != nil { brokerStats.UpdateStats(receivedStats) - glog.V(3).Infof("broker %s stats: %+v", initMessage.Broker, brokerStats) - glog.V(3).Infof("received stats: %+v", receivedStats) + glog.V(4).Infof("broker %s stats: %+v", initMessage.Broker, brokerStats) + glog.V(4).Infof("received stats: %+v", receivedStats) } } diff --git a/weed/mq/broker/broker_grpc_create.go b/weed/mq/broker/broker_grpc_create.go index cb9c91f28..44c2d4ee9 100644 --- a/weed/mq/broker/broker_grpc_create.go +++ b/weed/mq/broker/broker_grpc_create.go @@ -2,6 +2,8 @@ package broker import ( "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/mq/balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -29,12 +31,22 @@ func (broker *MessageQueueBroker) CreateTopic(ctx context.Context, request *mq_p ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount) for _, bpa := range ret.BrokerPartitionAssignments { + // fmt.Printf("create topic %s on %s\n", request.Topic, bpa.LeaderBroker) if doCreateErr := broker.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error { _, doCreateErr := client.DoCreateTopic(ctx, &mq_pb.DoCreateTopicRequest{ Topic: request.Topic, Partition: bpa.Partition, }) - return doCreateErr + if doCreateErr != nil { + return fmt.Errorf("do create topic %s on %s: %v", request.Topic, bpa.LeaderBroker, doCreateErr) + } + brokerStats, found := broker.Balancer.Brokers.Get(bpa.LeaderBroker) + if !found { + brokerStats = balancer.NewBrokerStats() + broker.Balancer.Brokers.Set(bpa.LeaderBroker, brokerStats) + } + brokerStats.RegisterAssignment(request.Topic, bpa.Partition) + return nil }); doCreateErr != nil { return nil, doCreateErr } diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index d6495e410..93586e22d 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -2,6 +2,7 @@ package broker import ( "context" + "fmt" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -62,7 +63,7 @@ func (broker *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb } ret := &mq_pb.ListTopicsResponse{} - knownTopics := make(map[*mq_pb.Topic]struct{}) + knownTopics := make(map[string]struct{}) for brokerStatsItem := range broker.Balancer.Brokers.IterBuffered() { _, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() { @@ -71,9 +72,11 @@ func (broker *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb Namespace: topicPartitionStat.TopicPartition.Namespace, Name: topicPartitionStat.TopicPartition.Topic, } - if _, found := knownTopics[topic]; found { + topicKey := fmt.Sprintf("%s/%s", topic.Namespace, topic.Name) + if _, found := knownTopics[topicKey]; found { continue } + knownTopics[topicKey] = struct{}{} ret.Topics = append(ret.Topics, topic) } } diff --git a/weed/mq/broker/broker_stats.go b/weed/mq/broker/broker_stats.go index 3314f49a4..2c06f4668 100644 --- a/weed/mq/broker/broker_stats.go +++ b/weed/mq/broker/broker_stats.go @@ -61,6 +61,7 @@ func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error { if err != nil { return fmt.Errorf("send stats message: %v", err) } + glog.V(4).Infof("sent stats: %+v", stats) time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond) } diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index 0f5ef3496..432fa4153 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -63,7 +63,13 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br } manager.topics.IterCb(func(topic string, localTopic *LocalTopic) { for _, localPartition := range localTopic.Partitions { - stats.Stats[topic] = &mq_pb.TopicPartitionStats{ + topicPartition := &TopicPartition{ + Namespace: string(localTopic.Namespace), + Topic: localTopic.Name, + RangeStart: localPartition.RangeStart, + RangeStop: localPartition.RangeStop, + } + stats.Stats[topicPartition.String()] = &mq_pb.TopicPartitionStats{ Topic: &mq_pb.Topic{ Namespace: string(localTopic.Namespace), Name: localTopic.Name, @@ -75,6 +81,7 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br }, ConsumerCount: localPartition.ConsumerCount, } + // fmt.Printf("collect topic %+v partition %+v\n", topicPartition, localPartition.Partition) } }) diff --git a/weed/mq/topic/topic_partition.go b/weed/mq/topic/topic_partition.go new file mode 100644 index 000000000..3d927b1d8 --- /dev/null +++ b/weed/mq/topic/topic_partition.go @@ -0,0 +1,14 @@ +package topic + +import "fmt" + +type TopicPartition struct { + Namespace string + Topic string + RangeStart int32 + RangeStop int32 +} + +func (tp *TopicPartition) String() string { + return fmt.Sprintf("%v.%v-%04d-%04d", tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop) +}