mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-07-25 13:02:47 +02:00
221 lines
5.2 KiB
Go
221 lines
5.2 KiB
Go
package webhook
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/ThreeDotsLabs/watermill"
|
|
"github.com/ThreeDotsLabs/watermill/message"
|
|
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
|
|
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
|
|
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/notification"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
func init() {
|
|
notification.MessageQueues = append(notification.MessageQueues, &Queue{})
|
|
}
|
|
|
|
type Queue struct {
|
|
router *message.Router
|
|
queueChannel *gochannel.GoChannel
|
|
config *config
|
|
client client
|
|
filter *filter
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
func (w *Queue) GetName() string {
|
|
return queueName
|
|
}
|
|
|
|
func (w *Queue) SendMessage(key string, msg proto.Message) error {
|
|
eventNotification, ok := msg.(*filer_pb.EventNotification)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
if w.filter != nil && !w.filter.shouldPublish(key, eventNotification) {
|
|
return nil
|
|
}
|
|
|
|
m := newWebhookMessage(key, msg)
|
|
if m == nil {
|
|
return nil
|
|
}
|
|
|
|
wMsg, err := m.toWaterMillMessage()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return w.queueChannel.Publish(pubSubTopicName, wMsg)
|
|
}
|
|
|
|
func (w *webhookMessage) toWaterMillMessage() (*message.Message, error) {
|
|
payload, err := proto.Marshal(w.Notification)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
msg := message.NewMessage(watermill.NewUUID(), payload)
|
|
// Set event type and key as metadata
|
|
msg.Metadata.Set("event_type", w.EventType)
|
|
msg.Metadata.Set("key", w.Key)
|
|
|
|
return msg, nil
|
|
}
|
|
|
|
func (w *Queue) Initialize(configuration util.Configuration, prefix string) error {
|
|
c := newConfigWithDefaults(configuration, prefix)
|
|
|
|
if err := c.validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return w.initialize(c)
|
|
}
|
|
|
|
func (w *Queue) initialize(cfg *config) error {
|
|
w.ctx, w.cancel = context.WithCancel(context.Background())
|
|
w.config = cfg
|
|
w.filter = newFilter(cfg)
|
|
|
|
hClient, err := newHTTPClient(cfg)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create webhook http client: %w", err)
|
|
}
|
|
w.client = hClient
|
|
|
|
if err = w.setupWatermillQueue(cfg); err != nil {
|
|
return fmt.Errorf("failed to setup watermill queue: %w", err)
|
|
}
|
|
if err = w.logDeadLetterMessages(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *Queue) setupWatermillQueue(cfg *config) error {
|
|
logger := watermill.NewStdLogger(false, false)
|
|
pubSubConfig := gochannel.Config{
|
|
OutputChannelBuffer: int64(cfg.bufferSize),
|
|
Persistent: false,
|
|
}
|
|
w.queueChannel = gochannel.NewGoChannel(pubSubConfig, logger)
|
|
|
|
router, err := message.NewRouter(
|
|
message.RouterConfig{
|
|
CloseTimeout: 60 * time.Second,
|
|
},
|
|
logger,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create router: %w", err)
|
|
}
|
|
w.router = router
|
|
|
|
retryMiddleware := middleware.Retry{
|
|
MaxRetries: cfg.maxRetries,
|
|
InitialInterval: time.Duration(cfg.backoffSeconds) * time.Second,
|
|
MaxInterval: time.Duration(cfg.maxBackoffSeconds) * time.Second,
|
|
Multiplier: 2.0,
|
|
RandomizationFactor: 0.3,
|
|
Logger: logger,
|
|
}.Middleware
|
|
|
|
poisonQueue, err := middleware.PoisonQueue(w.queueChannel, deadLetterTopic)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create poison queue: %w", err)
|
|
}
|
|
|
|
router.AddPlugin(plugin.SignalsHandler)
|
|
router.AddMiddleware(retryMiddleware, poisonQueue)
|
|
|
|
for i := 0; i < cfg.nWorkers; i++ {
|
|
router.AddNoPublisherHandler(
|
|
pubSubHandlerNameTemplate(i),
|
|
pubSubTopicName,
|
|
w.queueChannel,
|
|
w.handleWebhook,
|
|
)
|
|
}
|
|
|
|
go func() {
|
|
// cancels the queue context so the dead letter logger exists in case context not canceled by the shutdown signal already
|
|
defer w.cancel()
|
|
|
|
if err := router.Run(w.ctx); err != nil && !errors.Is(err, context.Canceled) {
|
|
glog.Errorf("webhook pubsub worker stopped with error: %v", err)
|
|
}
|
|
|
|
glog.Info("webhook pubsub worker stopped")
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *Queue) handleWebhook(msg *message.Message) error {
|
|
var n filer_pb.EventNotification
|
|
if err := proto.Unmarshal(msg.Payload, &n); err != nil {
|
|
glog.Errorf("failed to unmarshal protobuf message: %v", err)
|
|
return err
|
|
}
|
|
|
|
// Reconstruct webhook message from metadata and payload
|
|
webhookMsg := &webhookMessage{
|
|
Key: msg.Metadata.Get("key"),
|
|
EventType: msg.Metadata.Get("event_type"),
|
|
Notification: &n,
|
|
}
|
|
|
|
if err := w.client.sendMessage(webhookMsg); err != nil {
|
|
glog.Errorf("failed to send message to webhook %s: %v", webhookMsg.Key, err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *Queue) logDeadLetterMessages() error {
|
|
ch, err := w.queueChannel.Subscribe(w.ctx, deadLetterTopic)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case msg := <-ch:
|
|
if msg == nil {
|
|
glog.Errorf("received nil message from dead letter channel")
|
|
continue
|
|
}
|
|
key := "unknown"
|
|
if msg.Metadata != nil {
|
|
if keyValue, exists := msg.Metadata["key"]; exists {
|
|
key = keyValue
|
|
}
|
|
}
|
|
payload := ""
|
|
if msg.Payload != nil {
|
|
payload = string(msg.Payload)
|
|
}
|
|
glog.Errorf("received dead letter message: %s, key: %s", payload, key)
|
|
case <-w.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|