package msgclient import ( "context" "fmt" "log" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/messaging/broker" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" ) type MessagingClient struct { bootstrapBrokers []string grpcConnections map[broker.TopicPartition]*grpc.ClientConn grpcDialOption grpc.DialOption } func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient { return &MessagingClient{ bootstrapBrokers: bootstrapBrokers, grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn), grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"), } } func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) { for _, broker := range mc.bootstrapBrokers { grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption) if err != nil { log.Printf("dial broker %s: %v", broker, err) continue } defer grpcConnection.Close() resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(), &messaging_pb.FindBrokerRequest{ Namespace: tp.Namespace, Topic: tp.Topic, Parition: tp.Partition, }) if err != nil { return nil, err } targetBroker := resp.Broker return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption) } return nil, fmt.Errorf("no broker found for %+v", tp) }