1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-06-29 16:22:46 +02:00
seaweedfs/test/mq/integration/performance_test.go
2025-06-25 17:42:39 -07:00

666 lines
19 KiB
Go

package integration
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// PerformanceMetrics holds test metrics
type PerformanceMetrics struct {
MessagesPublished int64
MessagesConsumed int64
PublishLatencies []time.Duration
ConsumeLatencies []time.Duration
StartTime time.Time
EndTime time.Time
ErrorCount int64
mu sync.RWMutex
}
func (m *PerformanceMetrics) AddPublishLatency(d time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()
m.PublishLatencies = append(m.PublishLatencies, d)
}
func (m *PerformanceMetrics) AddConsumeLatency(d time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()
m.ConsumeLatencies = append(m.ConsumeLatencies, d)
}
func (m *PerformanceMetrics) GetThroughput() float64 {
duration := m.EndTime.Sub(m.StartTime).Seconds()
if duration == 0 {
return 0
}
return float64(atomic.LoadInt64(&m.MessagesPublished)) / duration
}
func (m *PerformanceMetrics) GetP95Latency(latencies []time.Duration) time.Duration {
if len(latencies) == 0 {
return 0
}
// Simple P95 calculation - in production use proper percentile library
index := int(float64(len(latencies)) * 0.95)
if index >= len(latencies) {
index = len(latencies) - 1
}
// Sort latencies (simplified)
for i := 0; i < len(latencies)-1; i++ {
for j := 0; j < len(latencies)-i-1; j++ {
if latencies[j] > latencies[j+1] {
latencies[j], latencies[j+1] = latencies[j+1], latencies[j]
}
}
}
return latencies[index]
}
// Enhanced performance metrics with connection error tracking
type EnhancedPerformanceMetrics struct {
MessagesPublished int64
MessagesConsumed int64
PublishLatencies []time.Duration
ConsumeLatencies []time.Duration
StartTime time.Time
EndTime time.Time
ErrorCount int64
ConnectionErrors int64
ApplicationErrors int64
RetryAttempts int64
mu sync.RWMutex
}
func (m *EnhancedPerformanceMetrics) AddPublishLatency(d time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()
m.PublishLatencies = append(m.PublishLatencies, d)
}
func (m *EnhancedPerformanceMetrics) AddConsumeLatency(d time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()
m.ConsumeLatencies = append(m.ConsumeLatencies, d)
}
func (m *EnhancedPerformanceMetrics) GetThroughput() float64 {
duration := m.EndTime.Sub(m.StartTime).Seconds()
if duration == 0 {
return 0
}
return float64(atomic.LoadInt64(&m.MessagesPublished)) / duration
}
func (m *EnhancedPerformanceMetrics) GetP95Latency(latencies []time.Duration) time.Duration {
if len(latencies) == 0 {
return 0
}
// Simple P95 calculation
index := int(float64(len(latencies)) * 0.95)
if index >= len(latencies) {
index = len(latencies) - 1
}
// Sort latencies (simplified bubble sort for small datasets)
sorted := make([]time.Duration, len(latencies))
copy(sorted, latencies)
for i := 0; i < len(sorted)-1; i++ {
for j := 0; j < len(sorted)-i-1; j++ {
if sorted[j] > sorted[j+1] {
sorted[j], sorted[j+1] = sorted[j+1], sorted[j]
}
}
}
return sorted[index]
}
// isConnectionError determines if an error is a connection-level error
func isConnectionError(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
// Check for gRPC status codes
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.Unavailable, codes.DeadlineExceeded, codes.Canceled, codes.Unknown:
return true
}
}
// Check for common connection error strings
connectionErrorPatterns := []string{
"EOF",
"error reading server preface",
"connection refused",
"connection reset",
"broken pipe",
"network is unreachable",
"no route to host",
"transport is closing",
"connection error",
"dial tcp",
"context deadline exceeded",
}
for _, pattern := range connectionErrorPatterns {
if containsString(errStr, pattern) {
return true
}
}
return false
}
func containsString(s, substr string) bool {
for i := 0; i <= len(s)-len(substr); i++ {
if s[i:i+len(substr)] == substr {
return true
}
}
return false
}
func TestPerformanceThroughput(t *testing.T) {
suite := NewIntegrationTestSuite(t)
require.NoError(t, suite.Setup())
topicName := "performance-throughput-test"
namespace := "perf-test"
metrics := &PerformanceMetrics{
StartTime: time.Now(),
}
// Test parameters
numMessages := 50000
numPublishers := 5
messageSize := 1024 // 1KB messages
// Create message payload
payload := make([]byte, messageSize)
for i := range payload {
payload[i] = byte(i % 256)
}
t.Logf("Starting throughput test: %d messages, %d publishers, %d bytes per message",
numMessages, numPublishers, messageSize)
// Start publishers
var publishWg sync.WaitGroup
messagesPerPublisher := numMessages / numPublishers
for i := 0; i < numPublishers; i++ {
publishWg.Add(1)
go func(publisherID int) {
defer publishWg.Done()
// Create publisher for this goroutine
pubConfig := &PublisherTestConfig{
Namespace: namespace,
TopicName: topicName,
PartitionCount: 4, // Multiple partitions for better throughput
PublisherName: fmt.Sprintf("perf-publisher-%d", publisherID),
RecordType: nil, // Use raw publish
}
publisher, err := suite.CreatePublisher(pubConfig)
if err != nil {
atomic.AddInt64(&metrics.ErrorCount, 1)
t.Errorf("Failed to create publisher %d: %v", publisherID, err)
return
}
for j := 0; j < messagesPerPublisher; j++ {
messageKey := fmt.Sprintf("publisher-%d-msg-%d", publisherID, j)
start := time.Now()
err := publisher.Publish([]byte(messageKey), payload)
latency := time.Since(start)
if err != nil {
atomic.AddInt64(&metrics.ErrorCount, 1)
continue
}
atomic.AddInt64(&metrics.MessagesPublished, 1)
metrics.AddPublishLatency(latency)
// Small delay to prevent overwhelming the system
if j%1000 == 0 {
time.Sleep(1 * time.Millisecond)
}
}
}(i)
}
// Wait for publishing to complete
publishWg.Wait()
metrics.EndTime = time.Now()
// Verify results
publishedCount := atomic.LoadInt64(&metrics.MessagesPublished)
errorCount := atomic.LoadInt64(&metrics.ErrorCount)
throughput := metrics.GetThroughput()
t.Logf("Performance Results:")
t.Logf(" Messages Published: %d", publishedCount)
t.Logf(" Errors: %d", errorCount)
t.Logf(" Throughput: %.2f messages/second", throughput)
t.Logf(" Duration: %v", metrics.EndTime.Sub(metrics.StartTime))
if len(metrics.PublishLatencies) > 0 {
p95Latency := metrics.GetP95Latency(metrics.PublishLatencies)
t.Logf(" P95 Publish Latency: %v", p95Latency)
// Performance assertions
assert.Less(t, p95Latency, 100*time.Millisecond, "P95 publish latency should be under 100ms")
}
// Throughput requirements
expectedMinThroughput := 5000.0 // 5K messages/sec minimum (relaxed from 10K for initial testing)
assert.Greater(t, throughput, expectedMinThroughput,
"Throughput should exceed %.0f messages/second", expectedMinThroughput)
// Error rate should be low
errorRate := float64(errorCount) / float64(publishedCount+errorCount)
assert.Less(t, errorRate, 0.05, "Error rate should be less than 5%")
}
func TestPerformanceLatency(t *testing.T) {
suite := NewIntegrationTestSuite(t)
require.NoError(t, suite.Setup())
topicName := "performance-latency-test"
namespace := "perf-test"
// Create publisher
pubConfig := &PublisherTestConfig{
Namespace: namespace,
TopicName: topicName,
PartitionCount: 1, // Single partition for latency testing
PublisherName: "latency-publisher",
RecordType: nil,
}
publisher, err := suite.CreatePublisher(pubConfig)
require.NoError(t, err)
// Start consumer first
subConfig := &SubscriberTestConfig{
Namespace: namespace,
TopicName: topicName,
ConsumerGroup: "latency-test-group",
ConsumerInstanceId: "latency-consumer-1",
MaxPartitionCount: 1,
SlidingWindowSize: 10,
OffsetType: schema_pb.OffsetType_RESET_TO_EARLIEST,
}
subscriber, err := suite.CreateSubscriber(subConfig)
require.NoError(t, err)
numMessages := 5000
collector := NewMessageCollector(numMessages)
// Set up message handler
subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
collector.AddMessage(TestMessage{
ID: string(m.Data.Key),
Content: m.Data.Value,
Timestamp: time.Unix(0, m.Data.TsNs),
Key: m.Data.Key,
})
})
// Start subscriber
go func() {
err := subscriber.Subscribe()
if err != nil {
t.Logf("Subscriber error: %v", err)
}
}()
// Wait for consumer to be ready
time.Sleep(2 * time.Second)
metrics := &PerformanceMetrics{
StartTime: time.Now(),
}
t.Logf("Starting latency test with %d messages", numMessages)
// Publish messages with controlled timing
for i := 0; i < numMessages; i++ {
messageKey := fmt.Sprintf("latency-msg-%d", i)
payload := fmt.Sprintf("test-payload-data-%d", i)
start := time.Now()
err := publisher.Publish([]byte(messageKey), []byte(payload))
publishLatency := time.Since(start)
require.NoError(t, err)
metrics.AddPublishLatency(publishLatency)
// Controlled rate for latency measurement
if i%100 == 0 {
time.Sleep(10 * time.Millisecond)
}
}
metrics.EndTime = time.Now()
// Wait for messages to be consumed
messages := collector.WaitForMessages(30 * time.Second)
// Analyze latency results
t.Logf("Latency Test Results:")
t.Logf(" Messages Published: %d", numMessages)
t.Logf(" Messages Consumed: %d", len(messages))
if len(metrics.PublishLatencies) > 0 {
p95PublishLatency := metrics.GetP95Latency(metrics.PublishLatencies)
t.Logf(" P95 Publish Latency: %v", p95PublishLatency)
// Latency assertions
assert.Less(t, p95PublishLatency, 50*time.Millisecond,
"P95 publish latency should be under 50ms")
}
// Verify message delivery
deliveryRate := float64(len(messages)) / float64(numMessages)
assert.Greater(t, deliveryRate, 0.80, "Should deliver at least 80% of messages")
}
func TestPerformanceConcurrentConsumers(t *testing.T) {
suite := NewIntegrationTestSuite(t)
require.NoError(t, suite.Setup())
topicName := "performance-concurrent-test"
namespace := "perf-test"
numPartitions := int32(4)
// Create publisher first
pubConfig := &PublisherTestConfig{
Namespace: namespace,
TopicName: topicName,
PartitionCount: numPartitions,
PublisherName: "concurrent-publisher",
RecordType: nil,
}
publisher, err := suite.CreatePublisher(pubConfig)
require.NoError(t, err)
// Test parameters
numConsumers := 8
numMessages := 20000
consumerGroup := "concurrent-perf-group"
t.Logf("Starting concurrent consumer test: %d consumers, %d messages, %d partitions",
numConsumers, numMessages, numPartitions)
// Start multiple consumers
var collectors []*MessageCollector
for i := 0; i < numConsumers; i++ {
collector := NewMessageCollector(numMessages / numConsumers) // Expected per consumer
collectors = append(collectors, collector)
subConfig := &SubscriberTestConfig{
Namespace: namespace,
TopicName: topicName,
ConsumerGroup: consumerGroup,
ConsumerInstanceId: fmt.Sprintf("consumer-%d", i),
MaxPartitionCount: numPartitions,
SlidingWindowSize: 10,
OffsetType: schema_pb.OffsetType_RESET_TO_EARLIEST,
}
subscriber, err := suite.CreateSubscriber(subConfig)
require.NoError(t, err)
// Set up message handler for this consumer
func(consumerID int, c *MessageCollector) {
subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
c.AddMessage(TestMessage{
ID: fmt.Sprintf("%d-%s", consumerID, string(m.Data.Key)),
Content: m.Data.Value,
Timestamp: time.Unix(0, m.Data.TsNs),
Key: m.Data.Key,
})
})
}(i, collector)
// Start subscriber
go func(consumerID int, s *SubscriberTestConfig) {
sub, _ := suite.CreateSubscriber(s)
err := sub.Subscribe()
if err != nil {
t.Logf("Consumer %d error: %v", consumerID, err)
}
}(i, subConfig)
}
// Wait for consumers to initialize
time.Sleep(3 * time.Second)
// Start publishing
startTime := time.Now()
var publishWg sync.WaitGroup
numPublishers := 3
messagesPerPublisher := numMessages / numPublishers
for i := 0; i < numPublishers; i++ {
publishWg.Add(1)
go func(publisherID int) {
defer publishWg.Done()
for j := 0; j < messagesPerPublisher; j++ {
messageKey := fmt.Sprintf("concurrent-msg-%d-%d", publisherID, j)
payload := fmt.Sprintf("publisher-%d-message-%d-data", publisherID, j)
err := publisher.Publish([]byte(messageKey), []byte(payload))
if err != nil {
t.Logf("Publish error: %v", err)
}
// Rate limiting
if j%1000 == 0 {
time.Sleep(2 * time.Millisecond)
}
}
}(i)
}
publishWg.Wait()
publishDuration := time.Since(startTime)
// Allow time for message consumption
time.Sleep(10 * time.Second)
// Analyze results
totalConsumed := int64(0)
for i, collector := range collectors {
messages := collector.GetMessages()
consumed := int64(len(messages))
totalConsumed += consumed
t.Logf("Consumer %d consumed %d messages", i, consumed)
}
publishThroughput := float64(numMessages) / publishDuration.Seconds()
consumeThroughput := float64(totalConsumed) / publishDuration.Seconds()
t.Logf("Concurrent Consumer Test Results:")
t.Logf(" Total Published: %d", numMessages)
t.Logf(" Total Consumed: %d", totalConsumed)
t.Logf(" Publish Throughput: %.2f msg/sec", publishThroughput)
t.Logf(" Consume Throughput: %.2f msg/sec", consumeThroughput)
t.Logf(" Test Duration: %v", publishDuration)
// Performance assertions (relaxed for initial testing)
deliveryRate := float64(totalConsumed) / float64(numMessages)
assert.Greater(t, deliveryRate, 0.70, "Should consume at least 70% of messages")
expectedMinThroughput := 2000.0 // 2K messages/sec minimum for concurrent consumption
assert.Greater(t, consumeThroughput, expectedMinThroughput,
"Consume throughput should exceed %.0f messages/second", expectedMinThroughput)
}
func TestPerformanceWithErrorHandling(t *testing.T) {
suite := NewIntegrationTestSuite(t)
require.NoError(t, suite.Setup())
topicName := "performance-error-handling-test"
namespace := "perf-test"
metrics := &EnhancedPerformanceMetrics{
StartTime: time.Now(),
}
// Test parameters
numMessages := 50000
numPublishers := 5
messageSize := 1024 // 1KB messages
// Create message payload
payload := make([]byte, messageSize)
for i := range payload {
payload[i] = byte(i % 256)
}
t.Logf("Starting performance test with enhanced error handling: %d messages, %d publishers, %d bytes per message",
numMessages, numPublishers, messageSize)
// Start publishers
var publishWg sync.WaitGroup
messagesPerPublisher := numMessages / numPublishers
for i := 0; i < numPublishers; i++ {
publishWg.Add(1)
go func(publisherID int) {
defer publishWg.Done()
// Create publisher for this goroutine
pubConfig := &PublisherTestConfig{
Namespace: namespace,
TopicName: topicName,
PartitionCount: 4, // Multiple partitions for better throughput
PublisherName: fmt.Sprintf("error-aware-publisher-%d", publisherID),
RecordType: nil, // Use raw publish
}
publisher, err := suite.CreatePublisher(pubConfig)
if err != nil {
atomic.AddInt64(&metrics.ErrorCount, 1)
t.Errorf("Failed to create publisher %d: %v", publisherID, err)
return
}
for j := 0; j < messagesPerPublisher; j++ {
messageKey := fmt.Sprintf("publisher-%d-msg-%d", publisherID, j)
start := time.Now()
err := publisher.Publish([]byte(messageKey), payload)
latency := time.Since(start)
if err != nil {
atomic.AddInt64(&metrics.ErrorCount, 1)
// Classify the error type
if isConnectionError(err) {
atomic.AddInt64(&metrics.ConnectionErrors, 1)
t.Logf("Connection error (publisher %d, msg %d): %v", publisherID, j, err)
} else {
atomic.AddInt64(&metrics.ApplicationErrors, 1)
t.Logf("Application error (publisher %d, msg %d): %v", publisherID, j, err)
}
continue
}
atomic.AddInt64(&metrics.MessagesPublished, 1)
metrics.AddPublishLatency(latency)
// Small delay to prevent overwhelming the system
if j%1000 == 0 {
time.Sleep(1 * time.Millisecond)
}
}
}(i)
}
// Wait for publishing to complete
publishWg.Wait()
metrics.EndTime = time.Now()
// Analyze results with enhanced error reporting
publishedCount := atomic.LoadInt64(&metrics.MessagesPublished)
totalErrors := atomic.LoadInt64(&metrics.ErrorCount)
connectionErrors := atomic.LoadInt64(&metrics.ConnectionErrors)
applicationErrors := atomic.LoadInt64(&metrics.ApplicationErrors)
throughput := metrics.GetThroughput()
t.Logf("Enhanced Performance Results:")
t.Logf(" Messages Successfully Published: %d", publishedCount)
t.Logf(" Total Errors: %d", totalErrors)
t.Logf(" Connection-Level Errors: %d", connectionErrors)
t.Logf(" Application-Level Errors: %d", applicationErrors)
t.Logf(" Throughput: %.2f messages/second", throughput)
t.Logf(" Duration: %v", metrics.EndTime.Sub(metrics.StartTime))
if len(metrics.PublishLatencies) > 0 {
p95Latency := metrics.GetP95Latency(metrics.PublishLatencies)
t.Logf(" P95 Publish Latency: %v", p95Latency)
// Performance assertions (adjusted for error handling overhead)
assert.Less(t, p95Latency, 100*time.Millisecond, "P95 publish latency should be under 100ms")
}
// Enhanced error analysis
totalAttempts := publishedCount + totalErrors
if totalAttempts > 0 {
successRate := float64(publishedCount) / float64(totalAttempts)
connectionErrorRate := float64(connectionErrors) / float64(totalAttempts)
applicationErrorRate := float64(applicationErrors) / float64(totalAttempts)
t.Logf("Error Analysis:")
t.Logf(" Success Rate: %.2f%%", successRate*100)
t.Logf(" Connection Error Rate: %.2f%%", connectionErrorRate*100)
t.Logf(" Application Error Rate: %.2f%%", applicationErrorRate*100)
// Assertions based on error types
assert.Greater(t, successRate, 0.80, "Success rate should be greater than 80%")
assert.Less(t, applicationErrorRate, 0.01, "Application error rate should be less than 1%")
// Connection errors are expected under high load but should be handled
if connectionErrors > 0 {
t.Logf("Note: %d connection errors detected - this indicates the test is successfully stressing the system", connectionErrors)
t.Logf("Recommendation: Implement retry logic for production applications to handle these connection errors")
}
}
// Throughput requirements (adjusted for error handling)
expectedMinThroughput := 5000.0 // 5K messages/sec minimum
assert.Greater(t, throughput, expectedMinThroughput,
"Throughput should exceed %.0f messages/second", expectedMinThroughput)
}