1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/protocol/metrics.go
chrislu 721d1a46d8 Phase 1 Complete: Client compatibility and metrics
- Add comprehensive client compatibility tests for Sarama and kafka-go
- Test multiple Kafka client library versions (2.6.0, 2.8.0, 3.0.0, 3.4.0)
- Test API version negotiation, cross-client compatibility, consumer groups
- Implement complete metrics system with request/error/latency tracking
- Add connection metrics and concurrent-safe atomic operations
- Integrate metrics into main protocol handler with request timing
- Add comprehensive test coverage for all metrics functionality

Phase 1 COMPLETE: Data plane robustness, admin visibility, client compatibility, and observability all implemented with full test coverage.
2025-09-15 20:45:06 -07:00

233 lines
6.6 KiB
Go

package protocol
import (
"sync"
"sync/atomic"
"time"
)
// Metrics tracks basic request/error/latency statistics for Kafka protocol operations
type Metrics struct {
// Request counters by API key
requestCounts map[uint16]*int64
errorCounts map[uint16]*int64
// Latency tracking
latencySum map[uint16]*int64 // Total latency in microseconds
latencyCount map[uint16]*int64 // Number of requests for average calculation
// Connection metrics
activeConnections int64
totalConnections int64
// Mutex for map operations
mu sync.RWMutex
// Start time for uptime calculation
startTime time.Time
}
// APIMetrics represents metrics for a specific API
type APIMetrics struct {
APIKey uint16 `json:"api_key"`
APIName string `json:"api_name"`
RequestCount int64 `json:"request_count"`
ErrorCount int64 `json:"error_count"`
AvgLatencyMs float64 `json:"avg_latency_ms"`
}
// ConnectionMetrics represents connection-related metrics
type ConnectionMetrics struct {
ActiveConnections int64 `json:"active_connections"`
TotalConnections int64 `json:"total_connections"`
UptimeSeconds int64 `json:"uptime_seconds"`
StartTime time.Time `json:"start_time"`
}
// MetricsSnapshot represents a complete metrics snapshot
type MetricsSnapshot struct {
APIs []APIMetrics `json:"apis"`
Connections ConnectionMetrics `json:"connections"`
Timestamp time.Time `json:"timestamp"`
}
// NewMetrics creates a new metrics tracker
func NewMetrics() *Metrics {
return &Metrics{
requestCounts: make(map[uint16]*int64),
errorCounts: make(map[uint16]*int64),
latencySum: make(map[uint16]*int64),
latencyCount: make(map[uint16]*int64),
startTime: time.Now(),
}
}
// RecordRequest records a successful request with latency
func (m *Metrics) RecordRequest(apiKey uint16, latency time.Duration) {
m.ensureCounters(apiKey)
atomic.AddInt64(m.requestCounts[apiKey], 1)
atomic.AddInt64(m.latencySum[apiKey], latency.Microseconds())
atomic.AddInt64(m.latencyCount[apiKey], 1)
}
// RecordError records an error for a specific API
func (m *Metrics) RecordError(apiKey uint16, latency time.Duration) {
m.ensureCounters(apiKey)
atomic.AddInt64(m.requestCounts[apiKey], 1)
atomic.AddInt64(m.errorCounts[apiKey], 1)
atomic.AddInt64(m.latencySum[apiKey], latency.Microseconds())
atomic.AddInt64(m.latencyCount[apiKey], 1)
}
// RecordConnection records a new connection
func (m *Metrics) RecordConnection() {
atomic.AddInt64(&m.activeConnections, 1)
atomic.AddInt64(&m.totalConnections, 1)
}
// RecordDisconnection records a connection closure
func (m *Metrics) RecordDisconnection() {
atomic.AddInt64(&m.activeConnections, -1)
}
// GetSnapshot returns a complete metrics snapshot
func (m *Metrics) GetSnapshot() MetricsSnapshot {
m.mu.RLock()
defer m.mu.RUnlock()
apis := make([]APIMetrics, 0, len(m.requestCounts))
for apiKey, requestCount := range m.requestCounts {
requests := atomic.LoadInt64(requestCount)
errors := atomic.LoadInt64(m.errorCounts[apiKey])
latencySum := atomic.LoadInt64(m.latencySum[apiKey])
latencyCount := atomic.LoadInt64(m.latencyCount[apiKey])
var avgLatencyMs float64
if latencyCount > 0 {
avgLatencyMs = float64(latencySum) / float64(latencyCount) / 1000.0 // Convert to milliseconds
}
apis = append(apis, APIMetrics{
APIKey: apiKey,
APIName: getAPIName(apiKey),
RequestCount: requests,
ErrorCount: errors,
AvgLatencyMs: avgLatencyMs,
})
}
return MetricsSnapshot{
APIs: apis,
Connections: ConnectionMetrics{
ActiveConnections: atomic.LoadInt64(&m.activeConnections),
TotalConnections: atomic.LoadInt64(&m.totalConnections),
UptimeSeconds: int64(time.Since(m.startTime).Seconds()),
StartTime: m.startTime,
},
Timestamp: time.Now(),
}
}
// GetAPIMetrics returns metrics for a specific API
func (m *Metrics) GetAPIMetrics(apiKey uint16) APIMetrics {
m.ensureCounters(apiKey)
requests := atomic.LoadInt64(m.requestCounts[apiKey])
errors := atomic.LoadInt64(m.errorCounts[apiKey])
latencySum := atomic.LoadInt64(m.latencySum[apiKey])
latencyCount := atomic.LoadInt64(m.latencyCount[apiKey])
var avgLatencyMs float64
if latencyCount > 0 {
avgLatencyMs = float64(latencySum) / float64(latencyCount) / 1000.0
}
return APIMetrics{
APIKey: apiKey,
APIName: getAPIName(apiKey),
RequestCount: requests,
ErrorCount: errors,
AvgLatencyMs: avgLatencyMs,
}
}
// GetConnectionMetrics returns connection-related metrics
func (m *Metrics) GetConnectionMetrics() ConnectionMetrics {
return ConnectionMetrics{
ActiveConnections: atomic.LoadInt64(&m.activeConnections),
TotalConnections: atomic.LoadInt64(&m.totalConnections),
UptimeSeconds: int64(time.Since(m.startTime).Seconds()),
StartTime: m.startTime,
}
}
// Reset resets all metrics (useful for testing)
func (m *Metrics) Reset() {
m.mu.Lock()
defer m.mu.Unlock()
for apiKey := range m.requestCounts {
atomic.StoreInt64(m.requestCounts[apiKey], 0)
atomic.StoreInt64(m.errorCounts[apiKey], 0)
atomic.StoreInt64(m.latencySum[apiKey], 0)
atomic.StoreInt64(m.latencyCount[apiKey], 0)
}
atomic.StoreInt64(&m.activeConnections, 0)
atomic.StoreInt64(&m.totalConnections, 0)
m.startTime = time.Now()
}
// ensureCounters ensures that counters exist for the given API key
func (m *Metrics) ensureCounters(apiKey uint16) {
m.mu.RLock()
if _, exists := m.requestCounts[apiKey]; exists {
m.mu.RUnlock()
return
}
m.mu.RUnlock()
m.mu.Lock()
defer m.mu.Unlock()
// Double-check after acquiring write lock
if _, exists := m.requestCounts[apiKey]; exists {
return
}
m.requestCounts[apiKey] = new(int64)
m.errorCounts[apiKey] = new(int64)
m.latencySum[apiKey] = new(int64)
m.latencyCount[apiKey] = new(int64)
}
// Global metrics instance
var globalMetrics = NewMetrics()
// GetGlobalMetrics returns the global metrics instance
func GetGlobalMetrics() *Metrics {
return globalMetrics
}
// RecordRequestMetrics is a convenience function to record request metrics globally
func RecordRequestMetrics(apiKey uint16, latency time.Duration) {
globalMetrics.RecordRequest(apiKey, latency)
}
// RecordErrorMetrics is a convenience function to record error metrics globally
func RecordErrorMetrics(apiKey uint16, latency time.Duration) {
globalMetrics.RecordError(apiKey, latency)
}
// RecordConnectionMetrics is a convenience function to record connection metrics globally
func RecordConnectionMetrics() {
globalMetrics.RecordConnection()
}
// RecordDisconnectionMetrics is a convenience function to record disconnection metrics globally
func RecordDisconnectionMetrics() {
globalMetrics.RecordDisconnection()
}