mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
212 lines
5.8 KiB
Go
212 lines
5.8 KiB
Go
package testutil
|
|
|
|
import (
	"context"
	"fmt"
	"net"
	"os"
	"strings"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
)
|
|
|
|
// GatewayTestServer wraps the gateway server with common test utilities
|
|
type GatewayTestServer struct {
|
|
*gateway.Server
|
|
t *testing.T
|
|
}
|
|
|
|
// GatewayOptions carries the settings used to build a test gateway.
type GatewayOptions struct {
	// Listen is the address the gateway listens on.
	Listen string

	// Masters is the SeaweedFS masters setting for the production gateway.
	Masters string

	// UseProduction requests the production gateway rather than the
	// unit-test one.
	UseProduction bool

	// Add more options as needed
}
|
|
|
|
// NewGatewayTestServer creates a new test gateway server with common setup
|
|
func NewGatewayTestServer(t *testing.T, opts GatewayOptions) *GatewayTestServer {
|
|
if opts.Listen == "" {
|
|
opts.Listen = "127.0.0.1:0" // Use random port by default
|
|
}
|
|
|
|
// Allow switching to production gateway if requested (requires masters)
|
|
var srv *gateway.Server
|
|
if opts.UseProduction {
|
|
if opts.Masters == "" {
|
|
// Fallback to env variable for convenience in CI
|
|
if v := os.Getenv("SEAWEEDFS_MASTERS"); v != "" {
|
|
opts.Masters = v
|
|
} else {
|
|
opts.Masters = "localhost:9333"
|
|
}
|
|
}
|
|
srv = gateway.NewServer(gateway.Options{
|
|
Listen: opts.Listen,
|
|
Masters: opts.Masters,
|
|
})
|
|
} else {
|
|
// For unit testing without real SeaweedMQ masters
|
|
srv = gateway.NewTestServerForUnitTests(gateway.Options{
|
|
Listen: opts.Listen,
|
|
})
|
|
}
|
|
|
|
return &GatewayTestServer{
|
|
Server: srv,
|
|
t: t,
|
|
}
|
|
}
|
|
|
|
// StartAndWait starts the gateway and waits for it to be ready
|
|
func (g *GatewayTestServer) StartAndWait() string {
|
|
g.t.Helper()
|
|
|
|
// Start server in goroutine
|
|
go func() {
|
|
if err := g.Start(); err != nil {
|
|
g.t.Errorf("Failed to start gateway: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Wait for server to be ready
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
host, port := g.GetListenerAddr()
|
|
addr := fmt.Sprintf("%s:%d", host, port)
|
|
g.t.Logf("Gateway running on %s", addr)
|
|
|
|
return addr
|
|
}
|
|
|
|
// AddTestTopic adds a topic for testing with default configuration
|
|
func (g *GatewayTestServer) AddTestTopic(name string) {
|
|
g.t.Helper()
|
|
g.GetHandler().AddTopicForTesting(name, 1)
|
|
g.t.Logf("Added test topic: %s", name)
|
|
}
|
|
|
|
// AddTestTopics adds multiple topics for testing
|
|
func (g *GatewayTestServer) AddTestTopics(names ...string) {
|
|
g.t.Helper()
|
|
for _, name := range names {
|
|
g.AddTestTopic(name)
|
|
}
|
|
}
|
|
|
|
// CleanupAndClose properly closes the gateway server
|
|
func (g *GatewayTestServer) CleanupAndClose() {
|
|
g.t.Helper()
|
|
if err := g.Close(); err != nil {
|
|
g.t.Errorf("Failed to close gateway: %v", err)
|
|
}
|
|
}
|
|
|
|
// SMQAvailabilityMode tells the test helpers how a test relates to a
// SeaweedMQ backend.
type SMQAvailabilityMode int

// Modes accepted by NewGatewayTestServerWithSMQ.
const (
	// SMQUnavailable uses the mock handler only.
	SMQUnavailable SMQAvailabilityMode = iota

	// SMQAvailable uses production mode when SMQ is available.
	SMQAvailable

	// SMQRequired needs SMQ; the test is skipped when it is unavailable.
	SMQRequired
)
|
|
|
|
// CheckSMQAvailability reports whether a SeaweedFS master named by the
// SEAWEEDFS_MASTERS environment variable accepts TCP connections.
//
// The variable may hold a single address or a comma-separated list. Entries
// are trimmed of surrounding whitespace and dialed in order, each with a
// 2-second timeout; the first entry that connects makes the result true.
//
// It returns (false, "") when the variable is unset or empty, (false, value)
// when it is set but no entry is reachable, and (true, value) otherwise. The
// returned string is always the unmodified value of the variable.
func CheckSMQAvailability() (bool, string) {
	masters := os.Getenv("SEAWEEDFS_MASTERS")
	if masters == "" {
		return false, ""
	}

	// Test if at least one master is reachable
	for _, addr := range strings.Split(masters, ",") {
		addr = strings.TrimSpace(addr)
		if addr == "" {
			continue // tolerate stray separators such as a trailing comma
		}

		conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
		if err != nil {
			continue // this entry is unreachable; try the next one
		}

		// Only reachability matters here; the probe connection is discarded,
		// so an error from closing it carries no information.
		_ = conn.Close()
		return true, masters
	}

	return false, masters // Masters specified but unreachable
}
|
|
|
|
// NewGatewayTestServerWithSMQ creates a gateway server that automatically uses SMQ if available
|
|
func NewGatewayTestServerWithSMQ(t *testing.T, mode SMQAvailabilityMode) *GatewayTestServer {
|
|
smqAvailable, masters := CheckSMQAvailability()
|
|
|
|
switch mode {
|
|
case SMQRequired:
|
|
if !smqAvailable {
|
|
if masters != "" {
|
|
t.Skipf("Skipping test: SEAWEEDFS_MASTERS=%s specified but unreachable", masters)
|
|
} else {
|
|
t.Skip("Skipping test: SEAWEEDFS_MASTERS required but not set")
|
|
}
|
|
}
|
|
t.Logf("Using SMQ-backed gateway with masters: %s", masters)
|
|
return newGatewayTestServerWithTimeout(t, GatewayOptions{
|
|
UseProduction: true,
|
|
Masters: masters,
|
|
}, 120*time.Second)
|
|
|
|
case SMQAvailable:
|
|
if smqAvailable {
|
|
t.Logf("SMQ available, using production gateway with masters: %s", masters)
|
|
return newGatewayTestServerWithTimeout(t, GatewayOptions{
|
|
UseProduction: true,
|
|
Masters: masters,
|
|
}, 120*time.Second)
|
|
} else {
|
|
t.Logf("SMQ not available, using mock gateway")
|
|
return NewGatewayTestServer(t, GatewayOptions{})
|
|
}
|
|
|
|
default: // SMQUnavailable
|
|
t.Logf("Using mock gateway (SMQ integration disabled)")
|
|
return NewGatewayTestServer(t, GatewayOptions{})
|
|
}
|
|
}
|
|
|
|
// newGatewayTestServerWithTimeout creates a gateway server with a timeout to prevent hanging
|
|
func newGatewayTestServerWithTimeout(t *testing.T, opts GatewayOptions, timeout time.Duration) *GatewayTestServer {
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
|
|
done := make(chan *GatewayTestServer, 1)
|
|
errChan := make(chan error, 1)
|
|
|
|
go func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
errChan <- fmt.Errorf("panic creating gateway: %v", r)
|
|
}
|
|
}()
|
|
|
|
// Create the gateway in a goroutine so we can timeout if it hangs
|
|
t.Logf("Creating gateway with masters: %s (with %v timeout)", opts.Masters, timeout)
|
|
gateway := NewGatewayTestServer(t, opts)
|
|
t.Logf("Gateway created successfully")
|
|
done <- gateway
|
|
}()
|
|
|
|
select {
|
|
case gateway := <-done:
|
|
return gateway
|
|
case err := <-errChan:
|
|
t.Fatalf("Error creating gateway: %v", err)
|
|
case <-ctx.Done():
|
|
t.Fatalf("Timeout creating gateway after %v - likely SMQ broker discovery failed. Check if MQ brokers are running and accessible.", timeout)
|
|
}
|
|
|
|
return nil // This should never be reached
|
|
}
|
|
|
|
// IsSMQMode returns true if the gateway is using real SMQ backend
|
|
// This is determined by checking if we have the SEAWEEDFS_MASTERS environment variable
|
|
func (g *GatewayTestServer) IsSMQMode() bool {
|
|
available, _ := CheckSMQAvailability()
|
|
return available
|
|
}
|