mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-20 02:00:23 +02:00
- Changed waitForService to separate waitForHTTPService and waitForTCPService functions - Kafka Gateway health check now uses TCP dial against port 9093 instead of HTTP GET - HTTP GET against Kafka protocol port was causing setup failures with connection refused - Added proper timeout handling for TCP dial (2 seconds per attempt) This fixes the critical issue where test setup would fail when trying to verify Kafka Gateway availability using the wrong protocol.
171 lines
4.2 KiB
Go
171 lines
4.2 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
)
|
|
|
|
// Schema represents a schema registry schema
|
|
type Schema struct {
|
|
Subject string `json:"subject"`
|
|
Version int `json:"version"`
|
|
Schema string `json:"schema"`
|
|
}
|
|
|
|
// SchemaResponse represents the response from schema registry
|
|
type SchemaResponse struct {
|
|
ID int `json:"id"`
|
|
}
|
|
|
|
func main() {
|
|
log.Println("Setting up Kafka integration test environment...")
|
|
|
|
kafkaBootstrap := getEnv("KAFKA_BOOTSTRAP_SERVERS", "kafka:29092")
|
|
schemaRegistryURL := getEnv("SCHEMA_REGISTRY_URL", "http://schema-registry:8081")
|
|
kafkaGatewayURL := getEnv("KAFKA_GATEWAY_URL", "kafka-gateway:9093")
|
|
|
|
log.Printf("Kafka Bootstrap Servers: %s", kafkaBootstrap)
|
|
log.Printf("Schema Registry URL: %s", schemaRegistryURL)
|
|
log.Printf("Kafka Gateway URL: %s", kafkaGatewayURL)
|
|
|
|
// Wait for services to be ready
|
|
waitForHTTPService("Schema Registry", schemaRegistryURL+"/subjects")
|
|
waitForTCPService("Kafka Gateway", kafkaGatewayURL) // TCP connectivity check for Kafka protocol
|
|
|
|
// Register test schemas
|
|
if err := registerSchemas(schemaRegistryURL); err != nil {
|
|
log.Fatalf("Failed to register schemas: %v", err)
|
|
}
|
|
|
|
log.Println("Test environment setup completed successfully!")
|
|
}
|
|
|
|
func getEnv(key, defaultValue string) string {
|
|
if value := os.Getenv(key); value != "" {
|
|
return value
|
|
}
|
|
return defaultValue
|
|
}
|
|
|
|
func waitForHTTPService(name, url string) {
|
|
log.Printf("Waiting for %s to be ready...", name)
|
|
for i := 0; i < 60; i++ { // Wait up to 60 seconds
|
|
resp, err := http.Get(url)
|
|
if err == nil && resp.StatusCode < 400 {
|
|
resp.Body.Close()
|
|
log.Printf("%s is ready", name)
|
|
return
|
|
}
|
|
if resp != nil {
|
|
resp.Body.Close()
|
|
}
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
log.Fatalf("%s is not ready after 60 seconds", name)
|
|
}
|
|
|
|
func waitForTCPService(name, address string) {
|
|
log.Printf("Waiting for %s to be ready...", name)
|
|
for i := 0; i < 60; i++ { // Wait up to 60 seconds
|
|
conn, err := net.DialTimeout("tcp", address, 2*time.Second)
|
|
if err == nil {
|
|
conn.Close()
|
|
log.Printf("%s is ready", name)
|
|
return
|
|
}
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
log.Fatalf("%s is not ready after 60 seconds", name)
|
|
}
|
|
|
|
func registerSchemas(registryURL string) error {
|
|
schemas := []Schema{
|
|
{
|
|
Subject: "user-value",
|
|
Schema: `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": ["null", "string"], "default": null}
|
|
]
|
|
}`,
|
|
},
|
|
{
|
|
Subject: "user-event-value",
|
|
Schema: `{
|
|
"type": "record",
|
|
"name": "UserEvent",
|
|
"fields": [
|
|
{"name": "userId", "type": "int"},
|
|
{"name": "eventType", "type": "string"},
|
|
{"name": "timestamp", "type": "long"},
|
|
{"name": "data", "type": ["null", "string"], "default": null}
|
|
]
|
|
}`,
|
|
},
|
|
{
|
|
Subject: "log-entry-value",
|
|
Schema: `{
|
|
"type": "record",
|
|
"name": "LogEntry",
|
|
"fields": [
|
|
{"name": "level", "type": "string"},
|
|
{"name": "message", "type": "string"},
|
|
{"name": "timestamp", "type": "long"},
|
|
{"name": "service", "type": "string"},
|
|
{"name": "metadata", "type": {"type": "map", "values": "string"}}
|
|
]
|
|
}`,
|
|
},
|
|
}
|
|
|
|
for _, schema := range schemas {
|
|
if err := registerSchema(registryURL, schema); err != nil {
|
|
return fmt.Errorf("failed to register schema %s: %w", schema.Subject, err)
|
|
}
|
|
log.Printf("Registered schema: %s", schema.Subject)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func registerSchema(registryURL string, schema Schema) error {
|
|
url := fmt.Sprintf("%s/subjects/%s/versions", registryURL, schema.Subject)
|
|
|
|
payload := map[string]interface{}{
|
|
"schema": schema.Schema,
|
|
}
|
|
|
|
jsonData, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
resp, err := http.Post(url, "application/vnd.schemaregistry.v1+json", bytes.NewBuffer(jsonData))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode >= 400 {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
var response SchemaResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Printf("Schema %s registered with ID: %d", schema.Subject, response.ID)
|
|
return nil
|
|
}
|