1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-06-29 16:22:46 +02:00
seaweedfs/weed/mq/client/agent_client/subscribe_session.go
Chris Lu 02773a6107
Accumulated changes for message queue (#6600)
* rename

* set agent address

* refactor

* add agent sub

* pub messages

* grpc new client

* can publish records via agent

* send init message with session id

* fmt

* check cancelled request while waiting

* use sessionId

* handle possible nil stream

* subscriber process messages

* separate debug port

* use atomic int64

* less logs

* minor

* skip io.EOF

* rename

* remove unused

* use saved offsets

* do not reuse session, since always session id is new after restart

remove last active ts from SessionEntry

* simplify printing

* purge unused

* just proxy the subscription, skipping the session step

* adjust offset types

* subscribe offset type and possible value

* start after the known tsns

* avoid wrongly set startPosition

* move

* remove

* refactor

* typo

* fix

* fix changed path
2025-03-09 23:49:42 -07:00

87 lines
2.5 KiB
Go

package agent_client
import (
	"context"
	"fmt"
	"io"

	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)
// SubscribeOption holds the caller-supplied settings for a subscription.
// NewSubscribeSession copies these values into the init request that opens
// the SubscribeRecord stream on the agent.
type SubscribeOption struct {
// ConsumerGroup is forwarded as the init request's ConsumerGroup.
ConsumerGroup string
// ConsumerGroupInstanceId is forwarded as the init request's
// ConsumerGroupInstanceId; presumably it distinguishes members of the same
// consumer group — confirm against the agent implementation.
ConsumerGroupInstanceId string
// Topic selects what to subscribe to; only its Namespace and Name are sent.
Topic topic.Topic
// OffsetType is forwarded unchanged; its meaning is defined by
// schema_pb.OffsetType.
OffsetType schema_pb.OffsetType
// OffsetTsNs is forwarded unchanged. NOTE(review): the name suggests a
// timestamp in nanoseconds used together with OffsetType — confirm.
OffsetTsNs int64
// Filter is forwarded unchanged; the filter syntax is defined by the agent.
Filter string
// MaxSubscribedPartitions is forwarded unchanged; presumably an upper bound
// on partitions consumed by this subscriber — confirm.
MaxSubscribedPartitions int32
// SlidingWindowSize is forwarded unchanged; semantics are defined by the
// agent/broker, not by this package.
SlidingWindowSize int32
}
// SubscribeSession is a client-side handle on one SubscribeRecord stream
// opened against a message queue agent.
type SubscribeSession struct {
	// Option is the configuration the session was created with.
	Option *SubscribeOption
	// stream is the bidirectional SubscribeRecord stream to the agent.
	stream grpc.BidiStreamingClient[mq_agent_pb.SubscribeRecordRequest, mq_agent_pb.SubscribeRecordResponse]
	// conn is the gRPC connection carrying stream. The session owns it and
	// releases it in CloseSession. It is nil when the session was not built
	// by NewSubscribeSession.
	conn *grpc.ClientConn
}

// NewSubscribeSession connects to the agent at agentAddress (plaintext, no
// transport security), opens a SubscribeRecord stream and sends the init
// request built from option.
//
// It returns an error when option is nil, when the client connection cannot
// be created, when the stream cannot be opened, or when the init request
// cannot be sent. On any failure the connection is closed before returning,
// so nothing is leaked. On success the caller must eventually call
// CloseSession to release the connection.
func NewSubscribeSession(agentAddress string, option *SubscribeOption) (*SubscribeSession, error) {
	if option == nil {
		// Fail with an error rather than a nil-pointer panic below.
		return nil, fmt.Errorf("subscribe option is required")
	}

	// call local agent grpc server to create a new session
	clientConn, err := grpc.NewClient(agentAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, fmt.Errorf("dial agent server %s: %w", agentAddress, err)
	}
	agentClient := mq_agent_pb.NewSeaweedMessagingAgentClient(clientConn)

	initRequest := &mq_agent_pb.SubscribeRecordRequest_InitSubscribeRecordRequest{
		ConsumerGroup:           option.ConsumerGroup,
		ConsumerGroupInstanceId: option.ConsumerGroupInstanceId,
		Topic: &schema_pb.Topic{
			Namespace: option.Topic.Namespace,
			Name:      option.Topic.Name,
		},
		OffsetType:              option.OffsetType,
		OffsetTsNs:              option.OffsetTsNs,
		MaxSubscribedPartitions: option.MaxSubscribedPartitions,
		Filter:                  option.Filter,
		SlidingWindowSize:       option.SlidingWindowSize,
	}

	stream, err := agentClient.SubscribeRecord(context.Background())
	if err != nil {
		// Best-effort cleanup; the stream error is the one worth reporting.
		_ = clientConn.Close()
		return nil, fmt.Errorf("subscribe record: %w", err)
	}

	if err = stream.Send(&mq_agent_pb.SubscribeRecordRequest{
		Init: initRequest,
	}); err != nil {
		// Best-effort cleanup; closing the connection also tears down the stream.
		_ = clientConn.Close()
		return nil, fmt.Errorf("send init request: %w", err)
	}

	return &SubscribeSession{
		Option: option,
		stream: stream,
		conn:   clientConn,
	}, nil
}

// CloseSession ends the session: it half-closes the send side of the stream
// and then closes the underlying gRPC connection, if the session owns one.
// Any receive still in progress on the stream is aborted by the connection
// shutdown. The first error encountered is returned.
func (s *SubscribeSession) CloseSession() error {
	err := s.stream.CloseSend()
	if s.conn != nil {
		// Always attempt to release the connection, even if CloseSend failed.
		if closeErr := s.conn.Close(); err == nil {
			err = closeErr
		}
	}
	return err
}
// SubscribeMessageRecord blocks, receiving records from the agent stream and
// passing each record's key and value to onEachMessageFn in arrival order.
//
// When the stream ends normally (Recv reports io.EOF), onCompletionFn is
// invoked if it is non-nil and nil is returned. Any other receive error stops
// the loop and is returned as-is, without calling onCompletionFn.
func (s *SubscribeSession) SubscribeMessageRecord(
	onEachMessageFn func(key []byte, record *schema_pb.RecordValue),
	onCompletionFn func()) error {
	for {
		resp, err := s.stream.Recv()
		if err == io.EOF {
			// gRPC signals a cleanly finished stream with a bare io.EOF;
			// that is completion, not a failure.
			if onCompletionFn != nil {
				onCompletionFn()
			}
			return nil
		}
		if err != nil {
			return err
		}
		onEachMessageFn(resp.Key, resp.Value)
	}
}