diff --git a/weed/command/command.go b/weed/command/command.go index 7635405dc..abd1b63e9 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -33,7 +33,7 @@ var Commands = []*Command{ cmdMount, cmdS3, cmdIam, - cmdMsgBroker, + cmdMqBroker, cmdScaffold, cmdServer, cmdShell, diff --git a/weed/command/msg_broker.go b/weed/command/mq_broker.go similarity index 56% rename from weed/command/msg_broker.go rename to weed/command/mq_broker.go index 3274f599b..a5a6e3566 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/mq_broker.go @@ -19,10 +19,10 @@ import ( ) var ( - messageBrokerStandaloneOptions MessageBrokerOptions + mqBrokerStandaloneOptions MessageQueueBrokerOptions ) -type MessageBrokerOptions struct { +type MessageQueueBrokerOptions struct { filer *string ip *string port *int @@ -31,16 +31,16 @@ type MessageBrokerOptions struct { } func init() { - cmdMsgBroker.Run = runMsgBroker // break init cycle - messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address") - messageBrokerStandaloneOptions.ip = cmdMsgBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") - messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "broker gRPC listen port") - messageBrokerStandaloneOptions.cpuprofile = cmdMsgBroker.Flag.String("cpuprofile", "", "cpu profile output file") - messageBrokerStandaloneOptions.memprofile = cmdMsgBroker.Flag.String("memprofile", "", "memory profile output file") + cmdMqBroker.Run = runMqBroker // break init cycle + mqBrokerStandaloneOptions.filer = cmdMqBroker.Flag.String("filer", "localhost:8888", "filer server address") + mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") + mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port") + mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file") + mqBrokerStandaloneOptions.memprofile = cmdMqBroker.Flag.String("memprofile", "", "memory profile output file") } -var cmdMsgBroker = &Command{ - UsageLine: "msgBroker [-port=17777] [-filer=]", +var cmdMqBroker = &Command{ + UsageLine: "mq.broker [-port=17777] [-filer=]", Short: "start a message queue broker", Long: `start a message queue broker @@ -50,19 +50,19 @@ var cmdMsgBroker = &Command{ `, } -func runMsgBroker(cmd *Command, args []string) bool { +func runMqBroker(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) - return messageBrokerStandaloneOptions.startQueueServer() + return mqBrokerStandaloneOptions.startQueueServer() } -func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { +func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool { - grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile) + grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile) - filerAddress := pb.ServerAddress(*msgBrokerOpt.filer) + filerAddress := pb.ServerAddress(*mqBrokerOpt.filer) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") cipher := false @@ -77,10 +77,10 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress()) time.Sleep(time.Second) } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) + glog.V(0).Infof("connected to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress()) break } } @@ -89,15 +89,15 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { Filers: []pb.ServerAddress{filerAddress}, DefaultReplication: "", MaxMB: 0, - Ip: *msgBrokerOpt.ip, - Port: *msgBrokerOpt.port, + Ip: *mqBrokerOpt.ip, + Port: *mqBrokerOpt.port, Cipher: cipher, }, grpcDialOption) // start grpc listener - grpcL, _, err := util.NewIpAndLocalListeners("", *msgBrokerOpt.port, 0) + grpcL, _, err := util.NewIpAndLocalListeners("", *mqBrokerOpt.port, 0) if err != nil { - glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err) + glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err) } grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs) diff --git a/weed/command/server.go b/weed/command/server.go index b1812bb9b..7c14fd14f 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -24,13 +24,13 @@ type ServerOptions struct { } var ( - serverOptions ServerOptions - masterOptions MasterOptions - filerOptions FilerOptions - s3Options S3Options - iamOptions IamOptions - webdavOptions WebDavOption - msgBrokerOptions MessageBrokerOptions + serverOptions ServerOptions + masterOptions MasterOptions + filerOptions FilerOptions + s3Options S3Options + iamOptions IamOptions + webdavOptions WebDavOption + mqBrokerOptions MessageQueueBrokerOptions ) func init() { @@ -74,7 +74,7 @@ var ( isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") isStartingIam = cmdServer.Flag.Bool("iam", false, "whether to start IAM service") isStartingWebDav = cmdServer.Flag.Bool("webdav", false, "whether to start WebDAV gateway") - isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker") + isStartingMqBroker = cmdServer.Flag.Bool("mq.broker", false, "whether to start message queue broker") serverWhiteList []string @@ -155,7 +155,7 @@ func init() { webdavOptions.cacheDir = cmdServer.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks") webdavOptions.cacheSizeMB = cmdServer.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB") - msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port") + mqBrokerOptions.port = cmdServer.Flag.Int("mq.broker.port", 17777, "message queue broker gRPC listen port") } @@ -179,7 +179,7 @@ func runServer(cmd *Command, args []string) bool { if *isStartingWebDav { *isStartingFiler = true } - if *isStartingMsgBroker { + if *isStartingMqBroker { *isStartingFiler = true } @@ -208,7 +208,7 @@ func runServer(cmd *Command, args []string) bool { serverOptions.v.idleConnectionTimeout = serverTimeout serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack - msgBrokerOptions.ip = serverIp + mqBrokerOptions.ip = serverIp // serverOptions.v.pulseSeconds = pulseSeconds // masterOptions.pulseSeconds = pulseSeconds @@ -224,7 +224,7 @@ func runServer(cmd *Command, args []string) bool { s3Options.filer = &filerAddress iamOptions.filer = &filerAddress webdavOptions.filer = &filerAddress - msgBrokerOptions.filer = &filerAddress + mqBrokerOptions.filer = &filerAddress go stats_collect.StartMetricsServer(*serverMetricsHttpPort) @@ -276,10 +276,10 @@ func runServer(cmd *Command, args []string) bool { }() } - if *isStartingMsgBroker { + if *isStartingMqBroker { go func() { time.Sleep(2 * time.Second) - msgBrokerOptions.startQueueServer() + mqBrokerOptions.startQueueServer() }() }