mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-06-29 08:12:47 +02:00
* rename * set agent address * refactor * add agent sub * pub messages * grpc new client * can publish records via agent * send init message with session id * fmt * check cancelled request while waiting * use sessionId * handle possible nil stream * subscriber process messages * separate debug port * use atomic int64 * less logs * minor * skip io.EOF * rename * remove unused * use saved offsets * do not reuse session, since always session id is new after restart remove last active ts from SessionEntry * simplify printing * purge unused * just proxy the subscription, skipping the session step * adjust offset types * subscribe offset type and possible value * start after the known tsns * avoid wrongly set startPosition * move * remove * refactor * typo * fix * fix changed path
55 lines
1.5 KiB
Go
55 lines
1.5 KiB
Go
package agent
|
|
|
|
import (
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
|
|
"google.golang.org/grpc"
|
|
"sync"
|
|
)
|
|
|
|
type SessionId int64
|
|
type SessionEntry[T any] struct {
|
|
entry T
|
|
}
|
|
|
|
type MessageQueueAgentOptions struct {
|
|
SeedBrokers []pb.ServerAddress
|
|
}
|
|
|
|
type MessageQueueAgent struct {
|
|
mq_agent_pb.UnimplementedSeaweedMessagingAgentServer
|
|
option *MessageQueueAgentOptions
|
|
brokers []pb.ServerAddress
|
|
grpcDialOption grpc.DialOption
|
|
publishers map[SessionId]*SessionEntry[*pub_client.TopicPublisher]
|
|
publishersLock sync.RWMutex
|
|
subscribers map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]
|
|
subscribersLock sync.RWMutex
|
|
}
|
|
|
|
func NewMessageQueueAgent(option *MessageQueueAgentOptions, grpcDialOption grpc.DialOption) *MessageQueueAgent {
|
|
|
|
// initialize brokers which may change later
|
|
var brokers []pb.ServerAddress
|
|
for _, broker := range option.SeedBrokers {
|
|
brokers = append(brokers, broker)
|
|
}
|
|
|
|
return &MessageQueueAgent{
|
|
option: option,
|
|
brokers: brokers,
|
|
grpcDialOption: grpcDialOption,
|
|
publishers: make(map[SessionId]*SessionEntry[*pub_client.TopicPublisher]),
|
|
subscribers: make(map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]),
|
|
}
|
|
}
|
|
|
|
func (a *MessageQueueAgent) brokersList() []string {
|
|
var brokers []string
|
|
for _, broker := range a.brokers {
|
|
brokers = append(brokers, broker.String())
|
|
}
|
|
return brokers
|
|
}
|