1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/test/kafka/internal/testutil/gateway.go
2025-09-18 00:18:10 -07:00

212 lines
5.8 KiB
Go

package testutil
import (
"context"
"fmt"
"net"
"os"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
)
// GatewayTestServer wraps the gateway server with common test utilities
type GatewayTestServer struct {
*gateway.Server
t *testing.T
}
// GatewayOptions contains configuration for test gateway
type GatewayOptions struct {
Listen string
Masters string
UseProduction bool
// Add more options as needed
}
// NewGatewayTestServer creates a new test gateway server with common setup
func NewGatewayTestServer(t *testing.T, opts GatewayOptions) *GatewayTestServer {
if opts.Listen == "" {
opts.Listen = "127.0.0.1:0" // Use random port by default
}
// Allow switching to production gateway if requested (requires masters)
var srv *gateway.Server
if opts.UseProduction {
if opts.Masters == "" {
// Fallback to env variable for convenience in CI
if v := os.Getenv("SEAWEEDFS_MASTERS"); v != "" {
opts.Masters = v
} else {
opts.Masters = "localhost:9333"
}
}
srv = gateway.NewServer(gateway.Options{
Listen: opts.Listen,
Masters: opts.Masters,
})
} else {
// For unit testing without real SeaweedMQ masters
srv = gateway.NewTestServerForUnitTests(gateway.Options{
Listen: opts.Listen,
})
}
return &GatewayTestServer{
Server: srv,
t: t,
}
}
// StartAndWait starts the gateway and waits for it to be ready
func (g *GatewayTestServer) StartAndWait() string {
g.t.Helper()
// Start server in goroutine
go func() {
if err := g.Start(); err != nil {
g.t.Errorf("Failed to start gateway: %v", err)
}
}()
// Wait for server to be ready
time.Sleep(100 * time.Millisecond)
host, port := g.GetListenerAddr()
addr := fmt.Sprintf("%s:%d", host, port)
g.t.Logf("Gateway running on %s", addr)
return addr
}
// AddTestTopic adds a topic for testing with default configuration
func (g *GatewayTestServer) AddTestTopic(name string) {
g.t.Helper()
g.GetHandler().AddTopicForTesting(name, 1)
g.t.Logf("Added test topic: %s", name)
}
// AddTestTopics adds multiple topics for testing
func (g *GatewayTestServer) AddTestTopics(names ...string) {
g.t.Helper()
for _, name := range names {
g.AddTestTopic(name)
}
}
// CleanupAndClose properly closes the gateway server
func (g *GatewayTestServer) CleanupAndClose() {
g.t.Helper()
if err := g.Close(); err != nil {
g.t.Errorf("Failed to close gateway: %v", err)
}
}
// SMQAvailabilityMode indicates whether SeaweedMQ is available for testing
type SMQAvailabilityMode int
const (
SMQUnavailable SMQAvailabilityMode = iota // Use mock handler only
SMQAvailable // SMQ is available, can use production mode
SMQRequired // SMQ is required, skip test if unavailable
)
// CheckSMQAvailability checks if SeaweedFS masters are available for testing
func CheckSMQAvailability() (bool, string) {
masters := os.Getenv("SEAWEEDFS_MASTERS")
if masters == "" {
return false, ""
}
// Test if at least one master is reachable
if masters != "" {
// Try to connect to the first master to verify availability
conn, err := net.DialTimeout("tcp", masters, 2*time.Second)
if err != nil {
return false, masters // Masters specified but unreachable
}
conn.Close()
return true, masters
}
return false, ""
}
// NewGatewayTestServerWithSMQ creates a gateway server that automatically uses SMQ if available
func NewGatewayTestServerWithSMQ(t *testing.T, mode SMQAvailabilityMode) *GatewayTestServer {
smqAvailable, masters := CheckSMQAvailability()
switch mode {
case SMQRequired:
if !smqAvailable {
if masters != "" {
t.Skipf("Skipping test: SEAWEEDFS_MASTERS=%s specified but unreachable", masters)
} else {
t.Skip("Skipping test: SEAWEEDFS_MASTERS required but not set")
}
}
t.Logf("Using SMQ-backed gateway with masters: %s", masters)
return newGatewayTestServerWithTimeout(t, GatewayOptions{
UseProduction: true,
Masters: masters,
}, 120*time.Second)
case SMQAvailable:
if smqAvailable {
t.Logf("SMQ available, using production gateway with masters: %s", masters)
return newGatewayTestServerWithTimeout(t, GatewayOptions{
UseProduction: true,
Masters: masters,
}, 120*time.Second)
} else {
t.Logf("SMQ not available, using mock gateway")
return NewGatewayTestServer(t, GatewayOptions{})
}
default: // SMQUnavailable
t.Logf("Using mock gateway (SMQ integration disabled)")
return NewGatewayTestServer(t, GatewayOptions{})
}
}
// newGatewayTestServerWithTimeout creates a gateway server with a timeout to prevent hanging
func newGatewayTestServerWithTimeout(t *testing.T, opts GatewayOptions, timeout time.Duration) *GatewayTestServer {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
done := make(chan *GatewayTestServer, 1)
errChan := make(chan error, 1)
go func() {
defer func() {
if r := recover(); r != nil {
errChan <- fmt.Errorf("panic creating gateway: %v", r)
}
}()
// Create the gateway in a goroutine so we can timeout if it hangs
t.Logf("Creating gateway with masters: %s (with %v timeout)", opts.Masters, timeout)
gateway := NewGatewayTestServer(t, opts)
t.Logf("Gateway created successfully")
done <- gateway
}()
select {
case gateway := <-done:
return gateway
case err := <-errChan:
t.Fatalf("Error creating gateway: %v", err)
case <-ctx.Done():
t.Fatalf("Timeout creating gateway after %v - likely SMQ broker discovery failed. Check if MQ brokers are running and accessible.", timeout)
}
return nil // This should never be reached
}
// IsSMQMode returns true if the gateway is using real SMQ backend
// This is determined by checking if we have the SEAWEEDFS_MASTERS environment variable
func (g *GatewayTestServer) IsSMQMode() bool {
available, _ := CheckSMQAvailability()
return available
}