package msgclient import ( "context" "github.com/OneOfOne/xxhash" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/messaging/broker" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" ) type Publisher struct { publishClients []messaging_pb.SeaweedMessaging_PublishClient topicConfiguration *messaging_pb.TopicConfiguration messageCount uint64 publisherId string } func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) { // read topic configuration topicConfiguration := &messaging_pb.TopicConfiguration{ PartitionCount: 4, } publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { tp := broker.TopicPartition{ Namespace: namespace, Topic: topic, Partition: int32(i), } grpcClientConn, err := mc.findBroker(tp) if err != nil { return nil, err } client, err := setupPublisherClient(grpcClientConn, tp) if err != nil { return nil, err } publishClients[i] = client } return &Publisher{ publishClients: publishClients, topicConfiguration: topicConfiguration, }, nil } func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) { stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background()) if err != nil { return nil, err } // send init message err = stream.Send(&messaging_pb.PublishRequest{ Init: &messaging_pb.PublishRequest_InitMessage{ Namespace: tp.Namespace, Topic: tp.Topic, Partition: tp.Partition, }, }) if err != nil { return nil, err } // process init response initResponse, err := stream.Recv() if err != nil { return nil, err } if initResponse.Redirect != nil { // TODO follow redirection } if initResponse.Config != nil { } // setup looks for control messages doneChan := make(chan error, 1) go func() { for { in, err := stream.Recv() if err != nil { doneChan <- err return } if in.Redirect != nil { } if in.Config != nil { } } }() return stream, nil } func (p *Publisher) Publish(m *messaging_pb.Message) error { hashValue := p.messageCount p.messageCount++ if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash { if m.Key != nil { hashValue = xxhash.Checksum64(m.Key) } } else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash { hashValue = xxhash.Checksum64(m.Key) } else { // round robin } idx := int(hashValue) % len(p.publishClients) if idx < 0 { idx += len(p.publishClients) } return p.publishClients[idx].Send(&messaging_pb.PublishRequest{ Data: m, }) }