1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/protocol/metrics_test.go
chrislu 721d1a46d8 Phase 1 Complete: Client compatibility and metrics
- Add comprehensive client compatibility tests for Sarama and kafka-go
- Test multiple Kafka client library versions (2.6.0, 2.8.0, 3.0.0, 3.4.0)
- Test API version negotiation, cross-client compatibility, consumer groups
- Implement complete metrics system with request/error/latency tracking
- Add connection metrics and concurrent-safe atomic operations
- Integrate metrics into main protocol handler with request timing
- Add comprehensive test coverage for all metrics functionality

Phase 1 COMPLETE: Data plane robustness, admin visibility, client compatibility, and observability all implemented with full test coverage.
2025-09-15 20:45:06 -07:00

259 lines
6.8 KiB
Go

package protocol
import (
"testing"
"time"
)
func TestMetrics_BasicOperations(t *testing.T) {
metrics := NewMetrics()
// Test recording requests
metrics.RecordRequest(18, 10*time.Millisecond) // ApiVersions
metrics.RecordRequest(18, 20*time.Millisecond)
metrics.RecordRequest(3, 5*time.Millisecond) // Metadata
// Test recording errors
metrics.RecordError(18, 30*time.Millisecond)
// Test recording connections
metrics.RecordConnection()
metrics.RecordConnection()
metrics.RecordDisconnection()
// Get API metrics
apiVersionsMetrics := metrics.GetAPIMetrics(18)
if apiVersionsMetrics.RequestCount != 3 {
t.Errorf("Expected 3 requests for ApiVersions, got %d", apiVersionsMetrics.RequestCount)
}
if apiVersionsMetrics.ErrorCount != 1 {
t.Errorf("Expected 1 error for ApiVersions, got %d", apiVersionsMetrics.ErrorCount)
}
// Expected average latency: (10.0 + 20.0 + 30.0) / 3.0 = 20ms
if apiVersionsMetrics.AvgLatencyMs < 19.0 || apiVersionsMetrics.AvgLatencyMs > 21.0 {
t.Errorf("Expected average latency around 20ms, got %.2f", apiVersionsMetrics.AvgLatencyMs)
}
metadataMetrics := metrics.GetAPIMetrics(3)
if metadataMetrics.RequestCount != 1 {
t.Errorf("Expected 1 request for Metadata, got %d", metadataMetrics.RequestCount)
}
if metadataMetrics.ErrorCount != 0 {
t.Errorf("Expected 0 errors for Metadata, got %d", metadataMetrics.ErrorCount)
}
// Test connection metrics
connMetrics := metrics.GetConnectionMetrics()
if connMetrics.ActiveConnections != 1 {
t.Errorf("Expected 1 active connection, got %d", connMetrics.ActiveConnections)
}
if connMetrics.TotalConnections != 2 {
t.Errorf("Expected 2 total connections, got %d", connMetrics.TotalConnections)
}
}
func TestMetrics_Snapshot(t *testing.T) {
metrics := NewMetrics()
// Record some test data
metrics.RecordRequest(18, 15*time.Millisecond)
metrics.RecordRequest(3, 25*time.Millisecond)
metrics.RecordError(1, 50*time.Millisecond)
metrics.RecordConnection()
snapshot := metrics.GetSnapshot()
// Verify snapshot structure
if len(snapshot.APIs) == 0 {
t.Error("Expected APIs in snapshot")
}
if snapshot.Connections.TotalConnections != 1 {
t.Errorf("Expected 1 total connection in snapshot, got %d", snapshot.Connections.TotalConnections)
}
if snapshot.Timestamp.IsZero() {
t.Error("Expected non-zero timestamp in snapshot")
}
// Find ApiVersions in snapshot
var apiVersionsFound bool
for _, api := range snapshot.APIs {
if api.APIKey == 18 {
apiVersionsFound = true
if api.APIName != "ApiVersions" {
t.Errorf("Expected API name 'ApiVersions', got '%s'", api.APIName)
}
if api.RequestCount != 1 {
t.Errorf("Expected 1 request, got %d", api.RequestCount)
}
break
}
}
if !apiVersionsFound {
t.Error("ApiVersions not found in snapshot")
}
}
func TestMetrics_ConcurrentAccess(t *testing.T) {
metrics := NewMetrics()
// Test concurrent access
done := make(chan bool, 10)
for i := 0; i < 10; i++ {
go func() {
for j := 0; j < 100; j++ {
metrics.RecordRequest(18, time.Millisecond)
metrics.RecordConnection()
metrics.RecordDisconnection()
}
done <- true
}()
}
// Wait for all goroutines to complete
for i := 0; i < 10; i++ {
<-done
}
apiMetrics := metrics.GetAPIMetrics(18)
if apiMetrics.RequestCount != 1000 {
t.Errorf("Expected 1000 requests, got %d", apiMetrics.RequestCount)
}
connMetrics := metrics.GetConnectionMetrics()
if connMetrics.ActiveConnections != 0 {
t.Errorf("Expected 0 active connections, got %d", connMetrics.ActiveConnections)
}
if connMetrics.TotalConnections != 1000 {
t.Errorf("Expected 1000 total connections, got %d", connMetrics.TotalConnections)
}
}
func TestMetrics_Reset(t *testing.T) {
metrics := NewMetrics()
// Record some data
metrics.RecordRequest(18, 10*time.Millisecond)
metrics.RecordError(3, 20*time.Millisecond)
metrics.RecordConnection()
// Verify data exists
apiMetrics := metrics.GetAPIMetrics(18)
if apiMetrics.RequestCount == 0 {
t.Error("Expected non-zero request count before reset")
}
// Reset metrics
metrics.Reset()
// Verify data is cleared
apiMetrics = metrics.GetAPIMetrics(18)
if apiMetrics.RequestCount != 0 {
t.Errorf("Expected 0 requests after reset, got %d", apiMetrics.RequestCount)
}
connMetrics := metrics.GetConnectionMetrics()
if connMetrics.ActiveConnections != 0 {
t.Errorf("Expected 0 active connections after reset, got %d", connMetrics.ActiveConnections)
}
if connMetrics.TotalConnections != 0 {
t.Errorf("Expected 0 total connections after reset, got %d", connMetrics.TotalConnections)
}
}
func TestMetrics_GlobalInstance(t *testing.T) {
// Reset global metrics for clean test
globalMetrics.Reset()
// Test global convenience functions
RecordRequestMetrics(18, 5*time.Millisecond)
RecordErrorMetrics(3, 10*time.Millisecond)
RecordConnectionMetrics()
RecordDisconnectionMetrics()
// Verify through global instance
global := GetGlobalMetrics()
apiMetrics := global.GetAPIMetrics(18)
if apiMetrics.RequestCount != 1 {
t.Errorf("Expected 1 request in global metrics, got %d", apiMetrics.RequestCount)
}
errorMetrics := global.GetAPIMetrics(3)
if errorMetrics.ErrorCount != 1 {
t.Errorf("Expected 1 error in global metrics, got %d", errorMetrics.ErrorCount)
}
connMetrics := global.GetConnectionMetrics()
if connMetrics.ActiveConnections != 0 {
t.Errorf("Expected 0 active connections, got %d", connMetrics.ActiveConnections)
}
if connMetrics.TotalConnections != 1 {
t.Errorf("Expected 1 total connection, got %d", connMetrics.TotalConnections)
}
}
func TestMetrics_LatencyCalculation(t *testing.T) {
metrics := NewMetrics()
// Test various latencies
latencies := []time.Duration{
1 * time.Millisecond,
5 * time.Millisecond,
10 * time.Millisecond,
20 * time.Millisecond,
}
for _, latency := range latencies {
metrics.RecordRequest(18, latency)
}
apiMetrics := metrics.GetAPIMetrics(18)
// Expected average: (1.0 + 5.0 + 10.0 + 20.0) / 4.0 = 9ms
if apiMetrics.AvgLatencyMs < 8.5 || apiMetrics.AvgLatencyMs > 9.5 {
t.Errorf("Expected average latency around 9ms, got %.2f", apiMetrics.AvgLatencyMs)
}
if apiMetrics.RequestCount != 4 {
t.Errorf("Expected 4 requests, got %d", apiMetrics.RequestCount)
}
}
func TestMetrics_APINames(t *testing.T) {
metrics := NewMetrics()
// Test various API keys
testCases := []struct {
apiKey uint16
expected string
}{
{18, "ApiVersions"},
{3, "Metadata"},
{0, "Produce"},
{1, "Fetch"},
{15, "DescribeGroups"},
{16, "ListGroups"},
{999, "Unknown"},
}
for _, tc := range testCases {
metrics.RecordRequest(tc.apiKey, time.Millisecond)
apiMetrics := metrics.GetAPIMetrics(tc.apiKey)
if apiMetrics.APIName != tc.expected {
t.Errorf("Expected API name '%s' for key %d, got '%s'",
tc.expected, tc.apiKey, apiMetrics.APIName)
}
}
}