1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-07-24 20:42:47 +02:00
seaweedfs/weed/command/mq_agent.go
Chris Lu 51543bbb87
Admin UI: Add message queue to admin UI (#6958)
* add a menu item "Message Queue"

* add a menu item "Message Queue"
  * move the "brokers" link under it.
  * add "topics", "subscribers". Add pages for them.

* refactor

* show topic details

* admin display publisher and subscriber info

* remove publisher and subscribers from the topic row pull down

* collecting more stats from publishers and subscribers

* fix layout

* fix publisher name

* add local listeners for mq broker and agent

* render consumer group offsets

* remove subscribers from left menu

* topic with retention

* support editing topic retention

* show retention when listing topics

* create bucket

* Update s3_buckets_templ.go

* embed the static assets into the binary

fix https://github.com/seaweedfs/seaweedfs/issues/6964
2025-07-11 10:19:27 -07:00

91 lines
2.6 KiB
Go

package command
import (
"github.com/seaweedfs/seaweedfs/weed/mq/agent"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
"google.golang.org/grpc/reflection"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// mqAgentOptions holds the settings of the mq.agent command. Its flag-backed
// fields are bound in init() and read when the command runs.
var mqAgentOptions MessageQueueAgentOptions
// MessageQueueAgentOptions collects the command-line settings used to start a
// message queue agent.
type MessageQueueAgentOptions struct {
// brokers is the parsed form of brokersString; it is filled in by
// runMqAgent before the agent is started.
brokers []pb.ServerAddress
// brokersString is the raw value of the -broker flag (comma-separated
// broker addresses).
brokersString *string
// filerGroup is not bound to any flag in this file.
// NOTE(review): it stays nil unless assigned elsewhere — confirm before
// dereferencing.
filerGroup *string
// ip is the value of the -ip flag: the host address the agent listens on.
ip *string
// port is the value of the -port flag: the agent's gRPC server port.
port *int
}
// init binds the mq.agent command-line flags to mqAgentOptions and attaches
// the command's Run handler.
func init() {
	// Network settings for the agent's own gRPC endpoint.
	mqAgentOptions.ip = cmdMqAgent.Flag.String("ip", "", "message queue agent host address")
	mqAgentOptions.port = cmdMqAgent.Flag.Int("port", 16777, "message queue agent gRPC server port")

	// Upstream brokers the agent talks to.
	mqAgentOptions.brokersString = cmdMqAgent.Flag.String("broker", "localhost:17777", "comma-separated message queue brokers")

	// Run is assigned here instead of in the cmdMqAgent literal to break the
	// initialization cycle between the command and its handler.
	cmdMqAgent.Run = runMqAgent
}
// cmdMqAgent describes the "mq.agent" command (usage line and help text).
// Its Run handler is not set in this literal; init() assigns runMqAgent to
// break an initialization cycle.
var cmdMqAgent = &Command{
UsageLine: "mq.agent [-port=16777] [-broker=<ip:port>]",
Short: "<WIP> start a message queue agent",
Long: `start a message queue agent
The agent runs on local server to accept gRPC calls to write or read messages.
The messages are sent to message queue brokers.
`,
}
// runMqAgent is the Run handler of the mq.agent command. It loads the security
// configuration, converts the -broker flag value into a list of broker
// addresses, and then starts the agent, returning whatever startQueueAgent
// returns. The cmd and args parameters are not used.
func runMqAgent(cmd *Command, args []string) bool {
	util.LoadSecurityConfiguration()

	// Parse the comma-separated -broker value once and keep the result on the
	// shared options so startQueueAgent can use it as the seed broker list.
	brokerList := pb.ServerAddresses(*mqAgentOptions.brokersString)
	mqAgentOptions.brokers = brokerList.ToAddresses()

	return mqAgentOptions.startQueueAgent()
}
// startQueueAgent creates the message queue agent from the configured seed
// brokers and serves it over gRPC on the configured ip and port. When
// util.NewIpAndLocalListeners also returns a localhost listener, a second gRPC
// server with the same agent registered is served on it in a background
// goroutine.
//
// The call blocks in the main server's Serve. A failure to open the listeners
// terminates the process via glog.Fatalf. An error returned by either Serve
// call is logged; the function then returns true, as it always has.
func (mqAgentOpt *MessageQueueAgentOptions) startQueueAgent() bool {
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_agent")

	agentServer := agent.NewMessageQueueAgent(&agent.MessageQueueAgentOptions{
		SeedBrokers: mqAgentOpt.brokers,
	}, grpcDialOption)

	// start grpc listener
	grpcL, localL, err := util.NewIpAndLocalListeners(*mqAgentOpt.ip, *mqAgentOpt.port, 0)
	if err != nil {
		glog.Fatalf("failed to listen on grpc port %d: %v", *mqAgentOpt.port, err)
	}

	// Create main gRPC server
	grpcS := pb.NewGrpcServer()
	mq_agent_pb.RegisterSeaweedMessagingAgentServer(grpcS, agentServer)
	reflection.Register(grpcS)

	// Start localhost listener if available. It gets its own gRPC server
	// instance, backed by the same agentServer, and runs until Serve returns.
	if localL != nil {
		localGrpcS := pb.NewGrpcServer()
		mq_agent_pb.RegisterSeaweedMessagingAgentServer(localGrpcS, agentServer)
		reflection.Register(localGrpcS)
		go func() {
			glog.V(0).Infof("MQ Agent listening on localhost:%d", *mqAgentOpt.port)
			if localErr := localGrpcS.Serve(localL); localErr != nil {
				glog.Errorf("MQ Agent localhost listener error: %v", localErr)
			}
		}()
	}

	glog.Infof("Start Seaweed Message Queue Agent on %s:%d", *mqAgentOpt.ip, *mqAgentOpt.port)

	// Serve blocks until the server stops. Report why it stopped instead of
	// silently dropping the error, matching the localhost listener above.
	if serveErr := grpcS.Serve(grpcL); serveErr != nil {
		glog.Errorf("MQ Agent gRPC server error: %v", serveErr)
	}
	return true
}