1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-06-29 16:22:46 +02:00
seaweedfs/telemetry/test/integration.go
2025-06-28 20:03:06 -07:00

311 lines
7.9 KiB
Go

package main
import (
"context"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/seaweedfs/seaweedfs/telemetry/proto"
"github.com/seaweedfs/seaweedfs/weed/telemetry"
protobuf "google.golang.org/protobuf/proto"
)
const (
serverPort = "18080" // Use different port to avoid conflicts
serverURL = "http://localhost:" + serverPort
)
func main() {
fmt.Println("🧪 Starting SeaweedFS Telemetry Integration Test")
// Start telemetry server
fmt.Println("📡 Starting telemetry server...")
serverCmd, err := startTelemetryServer()
if err != nil {
log.Fatalf("❌ Failed to start telemetry server: %v", err)
}
defer stopServer(serverCmd)
// Wait for server to start
if !waitForServer(serverURL+"/health", 15*time.Second) {
log.Fatal("❌ Telemetry server failed to start")
}
fmt.Println("✅ Telemetry server started successfully")
// Test protobuf marshaling first
fmt.Println("🔧 Testing protobuf marshaling...")
if err := testProtobufMarshaling(); err != nil {
log.Fatalf("❌ Protobuf marshaling test failed: %v", err)
}
fmt.Println("✅ Protobuf marshaling test passed")
// Test protobuf client
fmt.Println("🔄 Testing protobuf telemetry client...")
if err := testTelemetryClient(); err != nil {
log.Fatalf("❌ Telemetry client test failed: %v", err)
}
fmt.Println("✅ Telemetry client test passed")
// Test server metrics endpoint
fmt.Println("📊 Testing Prometheus metrics endpoint...")
if err := testMetricsEndpoint(); err != nil {
log.Fatalf("❌ Metrics endpoint test failed: %v", err)
}
fmt.Println("✅ Metrics endpoint test passed")
// Test stats API
fmt.Println("📈 Testing stats API...")
if err := testStatsAPI(); err != nil {
log.Fatalf("❌ Stats API test failed: %v", err)
}
fmt.Println("✅ Stats API test passed")
// Test instances API
fmt.Println("📋 Testing instances API...")
if err := testInstancesAPI(); err != nil {
log.Fatalf("❌ Instances API test failed: %v", err)
}
fmt.Println("✅ Instances API test passed")
fmt.Println("🎉 All telemetry integration tests passed!")
}
func startTelemetryServer() (*exec.Cmd, error) {
// Get the directory where this test is running
testDir, err := os.Getwd()
if err != nil {
return nil, fmt.Errorf("failed to get working directory: %v", err)
}
// Navigate to the server directory (from main seaweedfs directory)
serverDir := filepath.Join(testDir, "telemetry", "server")
cmd := exec.Command("go", "run", ".",
"-port="+serverPort,
"-dashboard=false",
"-cleanup=1m",
"-max-age=1h")
cmd.Dir = serverDir
// Create log files for server output
logFile, err := os.Create("telemetry-server-test.log")
if err != nil {
return nil, fmt.Errorf("failed to create log file: %v", err)
}
cmd.Stdout = logFile
cmd.Stderr = logFile
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start server: %v", err)
}
return cmd, nil
}
func stopServer(cmd *exec.Cmd) {
if cmd != nil && cmd.Process != nil {
cmd.Process.Signal(syscall.SIGTERM)
cmd.Wait()
// Clean up log file
os.Remove("telemetry-server-test.log")
}
}
func waitForServer(url string, timeout time.Duration) bool {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
fmt.Printf("⏳ Waiting for server at %s...\n", url)
for {
select {
case <-ctx.Done():
return false
default:
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
return true
}
}
time.Sleep(500 * time.Millisecond)
}
}
}
func testProtobufMarshaling() error {
// Test protobuf marshaling/unmarshaling
testData := &proto.TelemetryData{
ClusterId: "test-cluster-12345",
Version: "test-3.45",
Os: "linux/amd64",
VolumeServerCount: 2,
TotalDiskBytes: 1000000,
TotalVolumeCount: 10,
FilerCount: 1,
BrokerCount: 1,
Timestamp: time.Now().Unix(),
}
// Marshal
data, err := protobuf.Marshal(testData)
if err != nil {
return fmt.Errorf("failed to marshal protobuf: %v", err)
}
fmt.Printf(" Protobuf size: %d bytes\n", len(data))
// Unmarshal
testData2 := &proto.TelemetryData{}
if err := protobuf.Unmarshal(data, testData2); err != nil {
return fmt.Errorf("failed to unmarshal protobuf: %v", err)
}
// Verify data
if testData2.ClusterId != testData.ClusterId {
return fmt.Errorf("protobuf data mismatch: expected %s, got %s",
testData.ClusterId, testData2.ClusterId)
}
if testData2.VolumeServerCount != testData.VolumeServerCount {
return fmt.Errorf("volume server count mismatch: expected %d, got %d",
testData.VolumeServerCount, testData2.VolumeServerCount)
}
return nil
}
func testTelemetryClient() error {
// Create telemetry client
client := telemetry.NewClient(serverURL+"/api/collect", true)
// Create test data using protobuf format
testData := &proto.TelemetryData{
Version: "test-3.45",
Os: "linux/amd64",
VolumeServerCount: 3,
TotalDiskBytes: 1073741824, // 1GB
TotalVolumeCount: 50,
FilerCount: 2,
BrokerCount: 1,
Timestamp: time.Now().Unix(),
}
// Send telemetry data
if err := client.SendTelemetry(testData); err != nil {
return fmt.Errorf("failed to send telemetry: %v", err)
}
fmt.Printf(" Sent telemetry for cluster: %s\n", client.GetInstanceID())
// Wait a bit for processing
time.Sleep(2 * time.Second)
return nil
}
func testMetricsEndpoint() error {
resp, err := http.Get(serverURL + "/metrics")
if err != nil {
return fmt.Errorf("failed to get metrics: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("metrics endpoint returned status %d", resp.StatusCode)
}
// Read response and check for expected metrics
content, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read metrics response: %v", err)
}
contentStr := string(content)
expectedMetrics := []string{
"seaweedfs_telemetry_total_clusters",
"seaweedfs_telemetry_active_clusters",
"seaweedfs_telemetry_reports_received_total",
"seaweedfs_telemetry_volume_servers",
"seaweedfs_telemetry_disk_bytes",
"seaweedfs_telemetry_volume_count",
"seaweedfs_telemetry_filer_count",
"seaweedfs_telemetry_broker_count",
}
for _, metric := range expectedMetrics {
if !strings.Contains(contentStr, metric) {
return fmt.Errorf("missing expected metric: %s", metric)
}
}
// Check that we have at least one report received
if !strings.Contains(contentStr, "seaweedfs_telemetry_reports_received_total 1") {
fmt.Printf(" Warning: Expected at least 1 report received, metrics content:\n%s\n", contentStr)
}
fmt.Printf(" Found %d expected metrics\n", len(expectedMetrics))
return nil
}
func testStatsAPI() error {
resp, err := http.Get(serverURL + "/api/stats")
if err != nil {
return fmt.Errorf("failed to get stats: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("stats API returned status %d", resp.StatusCode)
}
// Read and verify JSON response
content, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read stats response: %v", err)
}
contentStr := string(content)
if !strings.Contains(contentStr, "total_instances") {
return fmt.Errorf("stats response missing total_instances field")
}
fmt.Printf(" Stats response: %s\n", contentStr)
return nil
}
func testInstancesAPI() error {
resp, err := http.Get(serverURL + "/api/instances?limit=10")
if err != nil {
return fmt.Errorf("failed to get instances: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("instances API returned status %d", resp.StatusCode)
}
// Read response
content, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read instances response: %v", err)
}
fmt.Printf(" Instances response length: %d bytes\n", len(content))
return nil
}