diff --git a/go.mod b/go.mod index 52a61a9c7..14a814ff1 100644 --- a/go.mod +++ b/go.mod @@ -266,6 +266,7 @@ require ( github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/putdotio/go-putio/putio v0.0.0-20200123120452-16d982cac2b8 // indirect github.com/rclone/ftp v0.0.0-20230327202000-dadc1f64e87d // indirect + github.com/rdleal/intervalst v0.0.0-20221028215511-a098aa0d2cb8 // indirect github.com/rfjakob/eme v1.1.2 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/shirou/gopsutil/v3 v3.23.5 // indirect diff --git a/go.sum b/go.sum index 84925174e..10dfe33e1 100644 --- a/go.sum +++ b/go.sum @@ -733,6 +733,8 @@ github.com/rclone/rclone v1.63.1 h1:iITCUNBfAXnguHjRPFq+w/gGIW0L0las78h4H5CH2Ms= github.com/rclone/rclone v1.63.1/go.mod h1:eUQaKsf1wJfHKB0RDoM8RaPAeRB2eI/Qw+Vc9Ho5FGM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rdleal/intervalst v0.0.0-20221028215511-a098aa0d2cb8 h1:5jSBlCYQYquRF8Zch4QrimJcX7/H1qbWQEAzYNxMubc= +github.com/rdleal/intervalst v0.0.0-20221028215511-a098aa0d2cb8/go.mod h1:xO89Z6BC+LQDH+IPQQw/OESt5UADgFD41tYMUINGpxQ= github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE= github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps= github.com/rekby/fixenv v0.3.2/go.mod h1:/b5LRc06BYJtslRtHKxsPWFT/ySpHV+rWvzTg+XWk4c= diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 4f9eb5182..96448be83 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -17,7 +17,7 @@ import ( // 2.2 if the topic is found, return the brokers // // 3. unlock the topic -func (broker *MessageQueueBroker) FindTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (*mq_pb.LookupTopicBrokersResponse, error) { +func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (*mq_pb.LookupTopicBrokersResponse, error) { ret := &mq_pb.LookupTopicBrokersResponse{} // TODO lock the topic diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 6e769b2fa..d8f33c2a5 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -79,16 +79,17 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS } response := &mq_pb.PublishResponse{} // TODO check whether current broker should be the leader for the topic partition - if initMessage := req.GetInit(); initMessage != nil { - localTopicPartition = broker.localTopicManager.GetTopicPartition( - topic.FromPbTopic(initMessage.Topic), - topic.FromPbPartition(initMessage.Partition), - ) + initMessage := req.GetInit() + if initMessage != nil { + t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) + localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p) if localTopicPartition == nil { - response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition) - glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition) - return stream.Send(response) + localTopicPartition = topic.NewLocalPartition(t, p, true, nil) } + } else { + response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition) + glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition) + return stream.Send(response) } // process each published messages @@ -104,7 +105,7 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS AckSequence: sequence, } if dataMessage := req.GetData(); dataMessage != nil { - print('+') + print("+") localTopicPartition.Publish(dataMessage) } if err := stream.Send(response); err != nil { diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go new file mode 100644 index 000000000..a540143a4 --- /dev/null +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -0,0 +1,29 @@ +package main + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" +) + +func main() { + + publisher := pub_client.NewTopicPublisher( + "test", "test") + if err := publisher.Connect("localhost:17777"); err != nil { + fmt.Println(err) + return + } + + for i := 0; i < 10; i++ { + if dataErr := publisher.Publish( + []byte(fmt.Sprintf("key-%d", i)), + []byte(fmt.Sprintf("value-%d", i)), + ); dataErr != nil { + fmt.Println(dataErr) + return + } + } + + fmt.Println("done publishing") + +} diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go similarity index 100% rename from weed/mq/client/sub_client/subscriber.go rename to weed/mq/client/cmd/weed_sub/subscriber.go diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go new file mode 100644 index 000000000..c54b2687d --- /dev/null +++ b/weed/mq/client/pub_client/lookup.go @@ -0,0 +1,74 @@ +package pub_client + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc" +) + +func (p *TopicPublisher) doLookup( + brokerAddress string, grpcDialOption grpc.DialOption) error { + err := pb.WithBrokerGrpcClient(true, + brokerAddress, + grpcDialOption, + func(client mq_pb.SeaweedMessagingClient) error { + lookupResp, err := client.LookupTopicBrokers(context.Background(), + &mq_pb.LookupTopicBrokersRequest{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + IsForPublish: true, + }) + if err != nil { + return err + } + for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments { + // partition => broker + p.partition2Broker.Insert( + brokerPartitionAssignment.Partition.RangeStart, + brokerPartitionAssignment.Partition.RangeStop, + brokerPartitionAssignment.LeaderBroker) + + // broker => publish client + // send init message + // save the publishing client + brokerAddress := brokerPartitionAssignment.LeaderBroker + grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, grpcDialOption) + if err != nil { + return fmt.Errorf("dial broker %s: %v", brokerAddress, err) + } + brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) + publishClient, err := brokerClient.Publish(context.Background()) + if err != nil { + return fmt.Errorf("create publish client: %v", err) + } + p.broker2PublishClient.Set(brokerAddress, publishClient) + if err = publishClient.Send(&mq_pb.PublishRequest{ + Message: &mq_pb.PublishRequest_Init{ + Init: &mq_pb.PublishRequest_InitMessage{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + Partition: &mq_pb.Partition{ + RingSize: brokerPartitionAssignment.Partition.RingSize, + RangeStart: brokerPartitionAssignment.Partition.RangeStart, + RangeStop: brokerPartitionAssignment.Partition.RangeStop, + }, + }, + }, + }); err != nil { + return fmt.Errorf("send init message: %v", err) + } + } + return nil + }) + + if err != nil { + return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err) + } + return nil +} diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go new file mode 100644 index 000000000..0ecb55c9b --- /dev/null +++ b/weed/mq/client/pub_client/publish.go @@ -0,0 +1,34 @@ +package pub_client + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/mq/broker" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +func (p *TopicPublisher) Publish(key, value []byte) error { + hashKey := util.HashToInt32(key) % broker.MaxPartitionCount + if hashKey < 0 { + hashKey = -hashKey + } + brokerAddress, found := p.partition2Broker.Floor(hashKey, hashKey) + if !found { + return fmt.Errorf("no broker found for key %d", hashKey) + } + publishClient, found := p.broker2PublishClient.Get(brokerAddress) + if !found { + return fmt.Errorf("no publish client found for broker %s", brokerAddress) + } + if err := publishClient.Send(&mq_pb.PublishRequest{ + Message: &mq_pb.PublishRequest_Data{ + Data: &mq_pb.DataMessage{ + Key: key, + Value: value, + }, + }, + }); err != nil { + return fmt.Errorf("send publish request: %v", err) + } + return nil +} diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index 8be027ac7..171b5ebd7 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -1,59 +1,36 @@ -package main +package pub_client import ( - "context" - "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/rdleal/intervalst/interval" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) -func main() { - - err := pb.WithBrokerGrpcClient(true, - "localhost:17777", - grpc.WithTransportCredentials(insecure.NewCredentials()), - func(client mq_pb.SeaweedMessagingClient) error { - pubClient, err := client.Publish(context.Background()) - if err != nil { - return err - } - if initErr := pubClient.Send(&mq_pb.PublishRequest{ - Message: &mq_pb.PublishRequest_Init{ - Init: &mq_pb.PublishRequest_InitMessage{ - Topic: &mq_pb.Topic{ - Namespace: "test", - Name: "test", - }, - Partition: &mq_pb.Partition{ - RangeStart: 0, - RangeStop: 1, - RingSize: 1, - }, - }, - }, - }); initErr != nil { - return initErr - } - - for i := 0; i < 10; i++ { - if dataErr := pubClient.Send(&mq_pb.PublishRequest{ - Message: &mq_pb.PublishRequest_Data{ - Data: &mq_pb.DataMessage{ - Key: []byte(fmt.Sprintf("key-%d", i)), - Value: []byte(fmt.Sprintf("value-%d", i)), - }, - }, - }); dataErr != nil { - return dataErr - } - } - return nil - }) - - if err != nil { - fmt.Println(err) - } - +type PublisherConfiguration struct { +} +type TopicPublisher struct { + namespace string + topic string + partition2Broker *interval.SearchTree[string, int32] + broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient] +} + +func NewTopicPublisher(namespace, topic string) *TopicPublisher { + return &TopicPublisher{ + namespace: namespace, + topic: topic, + partition2Broker: interval.NewSearchTree[string](func(a, b int32) int { + return int(a - b) + }), + broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](), + } +} + +func (p *TopicPublisher) Connect(bootstrapBroker string) error { + if err := p.doLookup(bootstrapBroker, grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil { + return err + } + return nil } diff --git a/weed/mq/client/sub_client/lookup.go b/weed/mq/client/sub_client/lookup.go new file mode 100644 index 000000000..89d3d2c45 --- /dev/null +++ b/weed/mq/client/sub_client/lookup.go @@ -0,0 +1,74 @@ +package sub_client + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc" +) + +func (p *TopicSubscriber) doLookup( + brokerAddress string, grpcDialOption grpc.DialOption) error { + err := pb.WithBrokerGrpcClient(true, + brokerAddress, + grpcDialOption, + func(client mq_pb.SeaweedMessagingClient) error { + lookupResp, err := client.LookupTopicBrokers(context.Background(), + &mq_pb.LookupTopicBrokersRequest{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + IsForPublish: true, + }) + if err != nil { + return err + } + for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments { + // partition => broker + p.partition2Broker.Insert( + brokerPartitionAssignment.Partition.RangeStart, + brokerPartitionAssignment.Partition.RangeStop, + brokerPartitionAssignment.LeaderBroker) + + // broker => publish client + // send init message + // save the publishing client + brokerAddress := brokerPartitionAssignment.LeaderBroker + grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, grpcDialOption) + if err != nil { + return fmt.Errorf("dial broker %s: %v", brokerAddress, err) + } + brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) + publishClient, err := brokerClient.Publish(context.Background()) + if err != nil { + return fmt.Errorf("create publish client: %v", err) + } + p.broker2PublishClient.Set(brokerAddress, publishClient) + if err = publishClient.Send(&mq_pb.PublishRequest{ + Message: &mq_pb.PublishRequest_Init{ + Init: &mq_pb.PublishRequest_InitMessage{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + Partition: &mq_pb.Partition{ + RingSize: brokerPartitionAssignment.Partition.RingSize, + RangeStart: brokerPartitionAssignment.Partition.RangeStart, + RangeStop: brokerPartitionAssignment.Partition.RangeStop, + }, + }, + }, + }); err != nil { + return fmt.Errorf("send init message: %v", err) + } + } + return nil + }) + + if err != nil { + return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err) + } + return nil +} diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go new file mode 100644 index 000000000..158c93010 --- /dev/null +++ b/weed/mq/client/sub_client/subscribe.go @@ -0,0 +1,28 @@ +package sub_client + +import ( + cmap "github.com/orcaman/concurrent-map" + "github.com/rdleal/intervalst/interval" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +type SubscriberConfiguration struct { +} + +type TopicSubscriber struct { + namespace string + topic string + partition2Broker *interval.SearchTree[string, int32] + broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient] +} + +func NewTopicSubscriber(config *SubscriberConfiguration, namespace, topic string) *TopicSubscriber { + return &TopicSubscriber{ + namespace: namespace, + topic: topic, + partition2Broker: interval.NewSearchTree[string](func(a, b int32) int { + return int(a - b) + }), + broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](), + } +} diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index a87922d9c..eaedb9f20 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -1,6 +1,7 @@ package topic import ( + "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -15,6 +16,24 @@ type LocalPartition struct { logBuffer *log_buffer.LogBuffer } +func NewLocalPartition(topic Topic, partition Partition, isLeader bool, followerBrokers []pb.ServerAddress) *LocalPartition { + return &LocalPartition{ + Partition: partition, + isLeader: isLeader, + FollowerBrokers: followerBrokers, + logBuffer: log_buffer.NewLogBuffer( + fmt.Sprintf("%s/%s/%4d-%4d", topic.Namespace, topic.Name, partition.RangeStart, partition.RangeStop), + 2*time.Minute, + func(startTime, stopTime time.Time, buf []byte) { + + }, + func() { + + }, + ), + } +} + type OnEachMessageFn func(logEntry *filer_pb.LogEntry) error func (p LocalPartition) Publish(message *mq_pb.DataMessage) {