1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-07-25 21:12:47 +02:00
seaweedfs/weed/notification/webhook/webhook_queue_test.go
Ibrahim Konsowa d78aa3d2de
[Notifications] Improving webhook notifications (#6965)
* worker setup

* fix tests

* start worker

* graceful worker drain

* retry queue

* migrate queue to watermill

* adding filters and improvements

* add the event type to the webhook message

* eliminating redundant JSON serialization

* resolve review comments

* trigger actions

* fix tests

* typo fixes

* read max_backoff_seconds from config

* add more context to the dead letter

* close the http response on errors

* drain the http response body in case not empty

* eliminate exported typesπ
2025-07-15 10:49:37 -07:00

536 lines
12 KiB
Go

package webhook
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"google.golang.org/protobuf/proto"
)
func TestConfigValidation(t *testing.T) {
tests := []struct {
name string
config *config
wantErr bool
errMsg string
}{
{
name: "valid config",
config: &config{
endpoint: "https://example.com/webhook",
authBearerToken: "test-token",
timeoutSeconds: 30,
maxRetries: 3,
backoffSeconds: 5,
maxBackoffSeconds: 30,
nWorkers: 5,
bufferSize: 10000,
},
wantErr: false,
},
{
name: "empty endpoint",
config: &config{
endpoint: "",
timeoutSeconds: 30,
maxRetries: 3,
backoffSeconds: 5,
maxBackoffSeconds: 30,
nWorkers: 5,
bufferSize: 10000,
},
wantErr: true,
errMsg: "endpoint is required",
},
{
name: "invalid URL",
config: &config{
endpoint: "://invalid-url",
timeoutSeconds: 30,
maxRetries: 3,
backoffSeconds: 5,
maxBackoffSeconds: 30,
nWorkers: 5,
bufferSize: 10000,
},
wantErr: true,
errMsg: "invalid webhook endpoint",
},
{
name: "timeout too large",
config: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 301,
maxRetries: 3,
backoffSeconds: 5,
maxBackoffSeconds: 30,
nWorkers: 5,
bufferSize: 10000,
},
wantErr: true,
errMsg: "timeout must be between",
},
{
name: "too many retries",
config: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 30,
maxRetries: 11,
backoffSeconds: 5,
maxBackoffSeconds: 30,
nWorkers: 5,
bufferSize: 10000,
},
wantErr: true,
errMsg: "max retries must be between",
},
{
name: "too many workers",
config: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 30,
maxRetries: 3,
backoffSeconds: 5,
maxBackoffSeconds: 30,
nWorkers: 101,
bufferSize: 10000,
},
wantErr: true,
errMsg: "workers must be between",
},
{
name: "buffer too large",
config: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 30,
maxRetries: 3,
backoffSeconds: 5,
maxBackoffSeconds: 30,
nWorkers: 5,
bufferSize: 1000001,
},
wantErr: true,
errMsg: "buffer size must be between",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.config.validate()
if (err != nil) != tt.wantErr {
t.Errorf("validate() error = %v, wantErr %v", err, tt.wantErr)
}
if err != nil && tt.errMsg != "" {
if err.Error() == "" || !strings.Contains(err.Error(), tt.errMsg) {
t.Errorf("validate() error message = %v, want to contain %v", err.Error(), tt.errMsg)
}
}
})
}
}
func TestWebhookMessageSerialization(t *testing.T) {
msg := &filer_pb.EventNotification{
OldEntry: nil,
NewEntry: &filer_pb.Entry{
Name: "test.txt",
IsDirectory: false,
},
}
webhookMsg := newWebhookMessage("/test/path", msg)
wmMsg, err := webhookMsg.toWaterMillMessage()
if err != nil {
t.Fatalf("Failed to convert to watermill message: %v", err)
}
// Unmarshal the protobuf payload directly
var eventNotification filer_pb.EventNotification
err = proto.Unmarshal(wmMsg.Payload, &eventNotification)
if err != nil {
t.Fatalf("Failed to unmarshal protobuf message: %v", err)
}
// Check metadata
if wmMsg.Metadata.Get("key") != "/test/path" {
t.Errorf("Expected key '/test/path', got %v", wmMsg.Metadata.Get("key"))
}
if wmMsg.Metadata.Get("event_type") != "create" {
t.Errorf("Expected event type 'create', got %v", wmMsg.Metadata.Get("event_type"))
}
if eventNotification.NewEntry.Name != "test.txt" {
t.Errorf("Expected file name 'test.txt', got %v", eventNotification.NewEntry.Name)
}
}
func TestQueueInitialize(t *testing.T) {
cfg := &config{
endpoint: "https://example.com/webhook",
authBearerToken: "test-token",
timeoutSeconds: 10,
maxRetries: 3,
backoffSeconds: 3,
maxBackoffSeconds: 60,
nWorkers: 1,
bufferSize: 100,
}
q := &Queue{}
err := q.initialize(cfg)
if err != nil {
t.Errorf("Initialize() error = %v", err)
}
defer func() {
if q.cancel != nil {
q.cancel()
}
time.Sleep(100 * time.Millisecond)
if q.router != nil {
q.router.Close()
}
}()
if q.router == nil {
t.Error("Expected router to be initialized")
}
if q.queueChannel == nil {
t.Error("Expected queueChannel to be initialized")
}
if q.client == nil {
t.Error("Expected client to be initialized")
}
if q.config == nil {
t.Error("Expected config to be initialized")
}
}
// TestQueueSendMessage test sending messages to the queue
func TestQueueSendMessage(t *testing.T) {
cfg := &config{
endpoint: "https://example.com/webhook",
authBearerToken: "test-token",
timeoutSeconds: 1,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
}
q := &Queue{}
err := q.initialize(cfg)
if err != nil {
t.Fatalf("Failed to initialize queue: %v", err)
}
defer func() {
if q.cancel != nil {
q.cancel()
}
time.Sleep(100 * time.Millisecond)
if q.router != nil {
q.router.Close()
}
}()
msg := &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{
Name: "test.txt",
},
}
err = q.SendMessage("/test/path", msg)
if err != nil {
t.Errorf("SendMessage() error = %v", err)
}
}
func TestQueueHandleWebhook(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
cfg := &config{
endpoint: server.URL,
authBearerToken: "test-token",
timeoutSeconds: 1,
maxRetries: 0,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
}
client, _ := newHTTPClient(cfg)
q := &Queue{
client: client,
}
message := newWebhookMessage("/test/path", &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{
Name: "test.txt",
},
})
wmMsg, err := message.toWaterMillMessage()
if err != nil {
t.Fatalf("Failed to create watermill message: %v", err)
}
err = q.handleWebhook(wmMsg)
if err != nil {
t.Errorf("handleWebhook() error = %v", err)
}
}
func TestQueueEndToEnd(t *testing.T) {
// Simplified test - just verify the queue can be created and message can be sent
// without needing full end-to-end processing
cfg := &config{
endpoint: "https://example.com/webhook",
authBearerToken: "test-token",
timeoutSeconds: 1,
maxRetries: 0,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
}
q := &Queue{}
err := q.initialize(cfg)
if err != nil {
t.Fatalf("Failed to initialize queue: %v", err)
}
defer func() {
if q.cancel != nil {
q.cancel()
}
time.Sleep(100 * time.Millisecond)
if q.router != nil {
q.router.Close()
}
}()
msg := &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{
Name: "test.txt",
},
}
err = q.SendMessage("/test/path", msg)
if err != nil {
t.Errorf("SendMessage() error = %v", err)
}
}
func TestQueueRetryMechanism(t *testing.T) {
cfg := &config{
endpoint: "https://example.com/webhook",
authBearerToken: "test-token",
timeoutSeconds: 1,
maxRetries: 3, // Test that this config is used
backoffSeconds: 2,
maxBackoffSeconds: 10,
nWorkers: 1,
bufferSize: 10,
}
q := &Queue{}
err := q.initialize(cfg)
if err != nil {
t.Fatalf("Failed to initialize queue: %v", err)
}
defer func() {
if q.cancel != nil {
q.cancel()
}
time.Sleep(100 * time.Millisecond)
if q.router != nil {
q.router.Close()
}
}()
// Verify that the queue is properly configured for retries
if q.config.maxRetries != 3 {
t.Errorf("Expected maxRetries=3, got %d", q.config.maxRetries)
}
if q.config.backoffSeconds != 2 {
t.Errorf("Expected backoffSeconds=2, got %d", q.config.backoffSeconds)
}
if q.config.maxBackoffSeconds != 10 {
t.Errorf("Expected maxBackoffSeconds=10, got %d", q.config.maxBackoffSeconds)
}
// Test that we can send a message (retry behavior is handled by Watermill middleware)
msg := &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "test.txt"},
}
err = q.SendMessage("/test/retry", msg)
if err != nil {
t.Errorf("SendMessage() error = %v", err)
}
}
func TestQueueSendMessageWithFilter(t *testing.T) {
tests := []struct {
name string
cfg *config
key string
notification *filer_pb.EventNotification
shouldPublish bool
}{
{
name: "allowed event type",
cfg: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 10,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
eventTypes: []string{"create"},
},
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: true,
},
{
name: "filtered event type",
cfg: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 10,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
eventTypes: []string{"update", "rename"},
},
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: false,
},
{
name: "allowed path prefix",
cfg: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 10,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
pathPrefixes: []string{"/data/"},
},
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: true,
},
{
name: "filtered path prefix",
cfg: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 10,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
pathPrefixes: []string{"/logs/"},
},
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: false,
},
{
name: "combined filters - both pass",
cfg: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 10,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
eventTypes: []string{"create", "delete"},
pathPrefixes: []string{"/data/", "/logs/"},
},
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: true,
},
{
name: "combined filters - event fails",
cfg: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 10,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
eventTypes: []string{"update", "delete"},
pathPrefixes: []string{"/data/", "/logs/"},
},
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: false,
},
{
name: "combined filters - path fails",
cfg: &config{
endpoint: "https://example.com/webhook",
timeoutSeconds: 10,
maxRetries: 1,
backoffSeconds: 1,
maxBackoffSeconds: 1,
nWorkers: 1,
bufferSize: 10,
eventTypes: []string{"create", "delete"},
pathPrefixes: []string{"/logs/"},
},
key: "/data/file.txt",
notification: &filer_pb.EventNotification{
NewEntry: &filer_pb.Entry{Name: "file.txt"},
},
shouldPublish: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
shouldPublish := newFilter(tt.cfg).shouldPublish(tt.key, tt.notification)
if shouldPublish != tt.shouldPublish {
t.Errorf("Expected shouldPublish=%v, got %v", tt.shouldPublish, shouldPublish)
}
})
}
}