1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-06-29 16:22:46 +02:00
seaweedfs/weed/mq/agent/agent_grpc_publish.go
Chris Lu 02773a6107
Accumulated changes for message queue (#6600)
* rename

* set agent address

* refactor

* add agent sub

* pub messages

* grpc new client

* can publish records via agent

* send init message with session id

* fmt

* check cancelled request while waiting

* use sessionId

* handle possible nil stream

* subscriber process messages

* separate debug port

* use atomic int64

* less logs

* minor

* skip io.EOF

* rename

* remove unused

* use saved offsets

* do not reuse session, since always session id is new after restart

remove last active ts from SessionEntry

* simplify printing

* purge unused

* just proxy the subscription, skipping the session step

* adjust offset types

* subscribe offset type and possible value

* start after the known tsns

* avoid wrongly set startPosition

* move

* remove

* refactor

* typo

* fix

* fix changed path
2025-03-09 23:49:42 -07:00

44 lines
917 B
Go

package agent
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
)
func (a *MessageQueueAgent) PublishRecord(stream mq_agent_pb.SeaweedMessagingAgent_PublishRecordServer) error {
m, err := stream.Recv()
if err != nil {
return err
}
sessionId := SessionId(m.SessionId)
a.publishersLock.RLock()
publisherEntry, found := a.publishers[sessionId]
a.publishersLock.RUnlock()
if !found {
return fmt.Errorf("publish session id %d not found", sessionId)
}
defer func() {
a.publishersLock.Lock()
delete(a.publishers, sessionId)
a.publishersLock.Unlock()
}()
if m.Value != nil {
if err := publisherEntry.entry.PublishRecord(m.Key, m.Value); err != nil {
return err
}
}
for {
m, err = stream.Recv()
if err != nil {
return err
}
if m.Value == nil {
continue
}
if err := publisherEntry.entry.PublishRecord(m.Key, m.Value); err != nil {
return err
}
}
}