1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-07-13 07:02:48 +02:00
seaweedfs/weed/mq/broker/broker_grpc_lookup.go
Chris Lu 51543bbb87
Admin UI: Add message queue to admin UI (#6958)
* add a menu item "Message Queue"

* add a menu item "Message Queue"
  * move the "brokers" link under it.
  * add "topics", "subscribers". Add pages for them.

* refactor

* show topic details

* admin display publisher and subscriber info

* remove publisher and subscribers from the topic row pull down

* collecting more stats from publishers and subscribers

* fix layout

* fix publisher name

* add local listeners for mq broker and agent

* render consumer group offsets

* remove subscribers from left menu

* topic with retention

* support editing topic retention

* show retention when listing topics

* create bucket

* Update s3_buckets_templ.go

* embed the static assets into the binary

fix https://github.com/seaweedfs/seaweedfs/issues/6964
2025-07-11 10:19:27 -07:00

312 lines
11 KiB
Go

package broker
import (
"context"
"fmt"
"strings"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// LookupTopicBrokers returns the brokers that are serving the topic
func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.LookupTopicBrokers(ctx, request)
return nil
})
if proxyErr != nil {
return nil, proxyErr
}
return resp, err
}
t := topic.FromPbTopic(request.Topic)
ret := &mq_pb.LookupTopicBrokersResponse{}
conf := &mq_pb.ConfigureTopicResponse{}
ret.Topic = request.Topic
if conf, err = b.fca.ReadTopicConfFromFiler(t); err != nil {
glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err)
} else {
err = b.ensureTopicActiveAssignments(t, conf)
ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments
}
return ret, err
}
func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.ListTopics(ctx, request)
return nil
})
if proxyErr != nil {
return nil, proxyErr
}
return resp, err
}
ret := &mq_pb.ListTopicsResponse{}
// Scan the filer directory structure to find all topics
err = b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// List all namespaces under /topics
stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
Directory: filer.TopicsDir,
Limit: 1000,
})
if err != nil {
glog.V(0).Infof("list namespaces in %s: %v", filer.TopicsDir, err)
return err
}
// Process each namespace
for {
resp, err := stream.Recv()
if err != nil {
if err.Error() == "EOF" {
break
}
return err
}
if !resp.Entry.IsDirectory {
continue
}
namespaceName := resp.Entry.Name
namespacePath := fmt.Sprintf("%s/%s", filer.TopicsDir, namespaceName)
// List all topics in this namespace
topicStream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
Directory: namespacePath,
Limit: 1000,
})
if err != nil {
glog.V(0).Infof("list topics in namespace %s: %v", namespacePath, err)
continue
}
// Process each topic in the namespace
for {
topicResp, err := topicStream.Recv()
if err != nil {
if err.Error() == "EOF" {
break
}
glog.V(0).Infof("error reading topic stream in namespace %s: %v", namespaceName, err)
break
}
if !topicResp.Entry.IsDirectory {
continue
}
topicName := topicResp.Entry.Name
// Check if topic.conf exists
topicPath := fmt.Sprintf("%s/%s", namespacePath, topicName)
confResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
Directory: topicPath,
Name: filer.TopicConfFile,
})
if err != nil {
glog.V(0).Infof("lookup topic.conf in %s: %v", topicPath, err)
continue
}
if confResp.Entry != nil {
// This is a valid topic
topic := &schema_pb.Topic{
Namespace: namespaceName,
Name: topicName,
}
ret.Topics = append(ret.Topics, topic)
}
}
}
return nil
})
if err != nil {
glog.V(0).Infof("list topics from filer: %v", err)
// Return empty response on error
return &mq_pb.ListTopicsResponse{}, nil
}
return ret, nil
}
// GetTopicConfiguration returns the complete configuration of a topic including schema and partition assignments
func (b *MessageQueueBroker) GetTopicConfiguration(ctx context.Context, request *mq_pb.GetTopicConfigurationRequest) (resp *mq_pb.GetTopicConfigurationResponse, err error) {
if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.GetTopicConfiguration(ctx, request)
return nil
})
if proxyErr != nil {
return nil, proxyErr
}
return resp, err
}
t := topic.FromPbTopic(request.Topic)
var conf *mq_pb.ConfigureTopicResponse
var createdAtNs, modifiedAtNs int64
if conf, createdAtNs, modifiedAtNs, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
glog.V(0).Infof("get topic configuration %s: %v", request.Topic, err)
return nil, fmt.Errorf("failed to read topic configuration: %v", err)
}
// Ensure topic assignments are active
err = b.ensureTopicActiveAssignments(t, conf)
if err != nil {
glog.V(0).Infof("ensure topic active assignments %s: %v", request.Topic, err)
return nil, fmt.Errorf("failed to ensure topic assignments: %v", err)
}
// Build the response with complete configuration including metadata
ret := &mq_pb.GetTopicConfigurationResponse{
Topic: request.Topic,
PartitionCount: int32(len(conf.BrokerPartitionAssignments)),
RecordType: conf.RecordType,
BrokerPartitionAssignments: conf.BrokerPartitionAssignments,
CreatedAtNs: createdAtNs,
LastUpdatedNs: modifiedAtNs,
Retention: conf.Retention,
}
return ret, nil
}
// GetTopicPublishers returns the active publishers for a topic
func (b *MessageQueueBroker) GetTopicPublishers(ctx context.Context, request *mq_pb.GetTopicPublishersRequest) (resp *mq_pb.GetTopicPublishersResponse, err error) {
if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.GetTopicPublishers(ctx, request)
return nil
})
if proxyErr != nil {
return nil, proxyErr
}
return resp, err
}
t := topic.FromPbTopic(request.Topic)
var publishers []*mq_pb.TopicPublisher
// Get topic configuration to find partition assignments
var conf *mq_pb.ConfigureTopicResponse
if conf, _, _, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
glog.V(0).Infof("get topic configuration for publishers %s: %v", request.Topic, err)
return nil, fmt.Errorf("failed to read topic configuration: %v", err)
}
// Collect publishers from each partition that is hosted on this broker
for _, assignment := range conf.BrokerPartitionAssignments {
// Only collect from partitions where this broker is the leader
if assignment.LeaderBroker == b.option.BrokerAddress().String() {
partition := topic.FromPbPartition(assignment.Partition)
if localPartition := b.localTopicManager.GetLocalPartition(t, partition); localPartition != nil {
// Get publisher information from local partition
localPartition.Publishers.ForEachPublisher(func(clientName string, publisher *topic.LocalPublisher) {
connectTimeNs, lastSeenTimeNs := publisher.GetTimestamps()
lastPublishedOffset, lastAckedOffset := publisher.GetOffsets()
publishers = append(publishers, &mq_pb.TopicPublisher{
PublisherName: clientName,
ClientId: clientName, // For now, client name is used as client ID
Partition: assignment.Partition,
ConnectTimeNs: connectTimeNs,
LastSeenTimeNs: lastSeenTimeNs,
Broker: assignment.LeaderBroker,
IsActive: true,
LastPublishedOffset: lastPublishedOffset,
LastAckedOffset: lastAckedOffset,
})
})
}
}
}
return &mq_pb.GetTopicPublishersResponse{
Publishers: publishers,
}, nil
}
// GetTopicSubscribers returns the active subscribers for a topic
func (b *MessageQueueBroker) GetTopicSubscribers(ctx context.Context, request *mq_pb.GetTopicSubscribersRequest) (resp *mq_pb.GetTopicSubscribersResponse, err error) {
if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.GetTopicSubscribers(ctx, request)
return nil
})
if proxyErr != nil {
return nil, proxyErr
}
return resp, err
}
t := topic.FromPbTopic(request.Topic)
var subscribers []*mq_pb.TopicSubscriber
// Get topic configuration to find partition assignments
var conf *mq_pb.ConfigureTopicResponse
if conf, _, _, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
glog.V(0).Infof("get topic configuration for subscribers %s: %v", request.Topic, err)
return nil, fmt.Errorf("failed to read topic configuration: %v", err)
}
// Collect subscribers from each partition that is hosted on this broker
for _, assignment := range conf.BrokerPartitionAssignments {
// Only collect from partitions where this broker is the leader
if assignment.LeaderBroker == b.option.BrokerAddress().String() {
partition := topic.FromPbPartition(assignment.Partition)
if localPartition := b.localTopicManager.GetLocalPartition(t, partition); localPartition != nil {
// Get subscriber information from local partition
localPartition.Subscribers.ForEachSubscriber(func(clientName string, subscriber *topic.LocalSubscriber) {
// Parse client name to extract consumer group and consumer ID
// Format is typically: "consumerGroup/consumerID"
consumerGroup := "default"
consumerID := clientName
if idx := strings.Index(clientName, "/"); idx != -1 {
consumerGroup = clientName[:idx]
consumerID = clientName[idx+1:]
}
connectTimeNs, lastSeenTimeNs := subscriber.GetTimestamps()
lastReceivedOffset, lastAckedOffset := subscriber.GetOffsets()
subscribers = append(subscribers, &mq_pb.TopicSubscriber{
ConsumerGroup: consumerGroup,
ConsumerId: consumerID,
ClientId: clientName, // Full client name as client ID
Partition: assignment.Partition,
ConnectTimeNs: connectTimeNs,
LastSeenTimeNs: lastSeenTimeNs,
Broker: assignment.LeaderBroker,
IsActive: true,
CurrentOffset: lastAckedOffset, // for compatibility
LastReceivedOffset: lastReceivedOffset,
})
})
}
}
}
return &mq_pb.GetTopicSubscribersResponse{
Subscribers: subscribers,
}, nil
}
func (b *MessageQueueBroker) isLockOwner() bool {
return b.lockAsBalancer.LockOwner() == b.option.BrokerAddress().String()
}