1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-20 02:00:23 +02:00
seaweedfs/test/kafka/cmd/setup/main.go
chrislu 2809081bd4 fix: correct Kafka Gateway health check to use TCP dial instead of HTTP
- Changed waitForService to separate waitForHTTPService and waitForTCPService functions
- Kafka Gateway health check now uses TCP dial against port 9093 instead of HTTP GET
- HTTP GET against Kafka protocol port was causing setup failures with connection refused
- Added proper timeout handling for TCP dial (2 seconds per attempt)

This fixes the critical issue where test setup would fail when trying to verify
Kafka Gateway availability using the wrong protocol.
2025-09-16 00:22:46 -07:00

171 lines
4.2 KiB
Go

package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"time"
)
// Schema describes one schema that this setup tool registers with the
// schema registry.
type Schema struct {
	// Subject is the registry subject; registerSchema places it in the
	// request path.
	Subject string `json:"subject"`
	// Version is carried on the struct but is not set by the schema list
	// built in registerSchemas.
	Version int `json:"version"`
	// Schema is the schema definition text sent as the "schema" field of the
	// registration request.
	Schema string `json:"schema"`
}
// SchemaResponse is the JSON body decoded from the schema registry's reply
// to a schema registration request.
type SchemaResponse struct {
	// ID is read from the "id" field of the reply and logged by
	// registerSchema.
	ID int `json:"id"`
}
// main prepares the Kafka integration test environment.
//
// It reads the endpoints from KAFKA_BOOTSTRAP_SERVERS, SCHEMA_REGISTRY_URL and
// KAFKA_GATEWAY_URL (falling back to built-in defaults), logs them, waits for
// the schema registry (HTTP) and the Kafka gateway (TCP) to become reachable,
// and then registers the test schemas. A registration failure terminates the
// process through log.Fatalf.
func main() {
	log.Println("Setting up Kafka integration test environment...")

	var (
		// The bootstrap servers value is only logged here.
		bootstrapServers = getEnv("KAFKA_BOOTSTRAP_SERVERS", "kafka:29092")
		registryURL      = getEnv("SCHEMA_REGISTRY_URL", "http://schema-registry:8081")
		// Despite the variable name in the environment, the default is a
		// host:port pair and it is passed to a TCP dial, not an HTTP client.
		gatewayAddr = getEnv("KAFKA_GATEWAY_URL", "kafka-gateway:9093")
	)

	log.Printf("Kafka Bootstrap Servers: %s", bootstrapServers)
	log.Printf("Schema Registry URL: %s", registryURL)
	log.Printf("Kafka Gateway URL: %s", gatewayAddr)

	// Block until both dependencies answer: the registry over HTTP, the
	// gateway with a plain TCP connect because it speaks the Kafka protocol.
	subjectsURL := registryURL + "/subjects"
	waitForHTTPService("Schema Registry", subjectsURL)
	waitForTCPService("Kafka Gateway", gatewayAddr)

	// Register the schemas the tests rely on.
	err := registerSchemas(registryURL)
	if err != nil {
		log.Fatalf("Failed to register schemas: %v", err)
	}

	log.Println("Test environment setup completed successfully!")
}
// getEnv returns the value of the environment variable named key, or
// defaultValue when that variable is unset or set to the empty string.
func getEnv(key, defaultValue string) string {
	value := os.Getenv(key)
	if value == "" {
		return defaultValue
	}
	return value
}
// waitForHTTPService polls url with HTTP GET until it answers with a status
// code below 400, then returns. name is used only in log messages.
//
// Up to 60 attempts are made with a one second pause after each failed one.
// If none succeeds the process is terminated through log.Fatalf.
//
// Each attempt is bounded by a client timeout. http.Get would use
// http.DefaultClient, which has no timeout, so a peer that accepts the
// connection but never responds would stall the wait loop indefinitely.
// Because a slow attempt can take up to the timeout, the total wait can exceed
// the 60 seconds mentioned in the failure message.
func waitForHTTPService(name, url string) {
	log.Printf("Waiting for %s to be ready...", name)

	client := &http.Client{Timeout: 5 * time.Second}

	for i := 0; i < 60; i++ {
		resp, err := client.Get(url)
		if err == nil {
			status := resp.StatusCode
			// Drain before closing so the transport can reuse the connection
			// for the next poll; a read error here is irrelevant to readiness.
			_, _ = io.Copy(io.Discard, resp.Body)
			resp.Body.Close()
			if status < 400 {
				log.Printf("%s is ready", name)
				return
			}
		}
		time.Sleep(1 * time.Second)
	}

	log.Fatalf("%s is not ready after 60 seconds", name)
}
// waitForTCPService repeatedly tries to open a TCP connection to address and
// returns as soon as one attempt succeeds; the connection is closed right
// away. name is used only in log messages.
//
// Up to 60 attempts are made, each with a two second dial timeout and a one
// second pause after a failure. If every attempt fails the process is
// terminated through log.Fatalf.
func waitForTCPService(name, address string) {
	log.Printf("Waiting for %s to be ready...", name)

	const maxAttempts = 60
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		conn, err := net.DialTimeout("tcp", address, 2*time.Second)
		if err != nil {
			// Not accepting connections yet; pause before the next try.
			time.Sleep(1 * time.Second)
			continue
		}
		conn.Close()
		log.Printf("%s is ready", name)
		return
	}

	log.Fatalf("%s is not ready after 60 seconds", name)
}
// registerSchemas registers the fixed set of test schemas with the schema
// registry at registryURL, in order: "user-value", "user-event-value" and
// "log-entry-value".
//
// It stops at the first failure and returns that error wrapped with the
// subject name; it returns nil once every schema has been registered.
func registerSchemas(registryURL string) error {
	// Schema definition texts, one per subject.
	const userSchema = `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}`
	const userEventSchema = `{
"type": "record",
"name": "UserEvent",
"fields": [
{"name": "userId", "type": "int"},
{"name": "eventType", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "data", "type": ["null", "string"], "default": null}
]
}`
	const logEntrySchema = `{
"type": "record",
"name": "LogEntry",
"fields": [
{"name": "level", "type": "string"},
{"name": "message", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "service", "type": "string"},
{"name": "metadata", "type": {"type": "map", "values": "string"}}
]
}`

	testSchemas := []Schema{
		{Subject: "user-value", Schema: userSchema},
		{Subject: "user-event-value", Schema: userEventSchema},
		{Subject: "log-entry-value", Schema: logEntrySchema},
	}

	for i := range testSchemas {
		entry := testSchemas[i]
		err := registerSchema(registryURL, entry)
		if err != nil {
			return fmt.Errorf("failed to register schema %s: %w", entry.Subject, err)
		}
		log.Printf("Registered schema: %s", entry.Subject)
	}
	return nil
}
// registerSchema registers one schema with the schema registry by POSTing
// {"schema": <schema.Schema>} to <registryURL>/subjects/<schema.Subject>/versions
// with content type application/vnd.schemaregistry.v1+json.
//
// It returns nil after the response has been decoded and the returned schema
// ID logged. It returns an error when the request cannot be encoded or sent,
// when the registry answers with a status of 400 or above (the error text then
// carries the status code and response body), or when the response body is not
// valid JSON.
func registerSchema(registryURL string, schema Schema) error {
	endpoint := fmt.Sprintf("%s/subjects/%s/versions", registryURL, schema.Subject)

	payload, err := json.Marshal(map[string]string{"schema": schema.Schema})
	if err != nil {
		return fmt.Errorf("encoding request body: %w", err)
	}

	// http.Post goes through http.DefaultClient, which has no timeout; an
	// unresponsive registry would block setup forever. Bound the whole
	// exchange instead.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Post(endpoint, "application/vnd.schemaregistry.v1+json", bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("sending request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 400 {
		// Best effort: whatever part of the body could be read is still
		// useful in the error, so a read failure is deliberately ignored.
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
	}

	var response SchemaResponse
	if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
		return fmt.Errorf("decoding response: %w", err)
	}

	log.Printf("Schema %s registered with ID: %d", schema.Subject, response.ID)
	return nil
}