1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/protocol/api_versions_test.go
chrislu 2d8b7576ed Fix API version tests after adding DescribeGroups and ListGroups APIs
- Update expected API count from 14 to 16 in all test files
- Add expected version ranges for DescribeGroups (15) and ListGroups (16) APIs
- All protocol tests now pass with the new API additions from Phase 1
2025-09-15 21:03:05 -07:00

313 lines
9 KiB
Go

package protocol
import (
"encoding/binary"
"fmt"
"testing"
)
func TestApiVersions_AdvertisedVersionsMatch(t *testing.T) {
handler := NewTestHandler()
defer handler.Close()
response, err := handler.handleApiVersions(12345, 0)
if err != nil {
t.Fatalf("handleApiVersions failed: %v", err)
}
if len(response) < 10 {
t.Fatalf("Response too short: %d bytes", len(response))
}
// Check correlation ID
correlationID := binary.BigEndian.Uint32(response[0:4])
if correlationID != 12345 {
t.Errorf("Expected correlation ID 12345, got %d", correlationID)
}
// Check error code
errorCode := binary.BigEndian.Uint16(response[4:6])
if errorCode != 0 {
t.Errorf("Expected error code 0, got %d", errorCode)
}
// Check number of API keys
numAPIKeys := binary.BigEndian.Uint32(response[6:10])
expectedAPIKeys := uint32(16)
if numAPIKeys != expectedAPIKeys {
t.Errorf("Expected %d API keys, got %d", expectedAPIKeys, numAPIKeys)
}
// Parse and verify specific API versions
offset := 10
apiVersionMap := make(map[uint16][2]uint16) // apiKey -> [minVersion, maxVersion]
for i := uint32(0); i < numAPIKeys && offset+6 <= len(response); i++ {
apiKey := binary.BigEndian.Uint16(response[offset : offset+2])
minVersion := binary.BigEndian.Uint16(response[offset+2 : offset+4])
maxVersion := binary.BigEndian.Uint16(response[offset+4 : offset+6])
offset += 6
apiVersionMap[apiKey] = [2]uint16{minVersion, maxVersion}
}
// Verify critical corrected versions
expectedVersions := map[uint16][2]uint16{
9: {0, 5}, // OffsetFetch: should now be v0-v5
19: {0, 5}, // CreateTopics: should now be v0-v5
3: {0, 7}, // Metadata: should be v0-v7
18: {0, 3}, // ApiVersions: should be v0-v3
15: {0, 5}, // DescribeGroups: v0-v5
16: {0, 4}, // ListGroups: v0-v4
}
for apiKey, expected := range expectedVersions {
if actual, exists := apiVersionMap[apiKey]; exists {
if actual[0] != expected[0] || actual[1] != expected[1] {
t.Errorf("API %d version mismatch: expected v%d-v%d, got v%d-v%d",
apiKey, expected[0], expected[1], actual[0], actual[1])
}
} else {
t.Errorf("API %d not found in response", apiKey)
}
}
}
func TestValidateAPIVersion_UpdatedVersions(t *testing.T) {
handler := NewTestHandler()
defer handler.Close()
testCases := []struct {
name string
apiKey uint16
version uint16
shouldErr bool
}{
// OffsetFetch - should now support up to v5
{"OffsetFetch v2", 9, 2, false},
{"OffsetFetch v3", 9, 3, false}, // Was rejected before, should work now
{"OffsetFetch v4", 9, 4, false}, // Was rejected before, should work now
{"OffsetFetch v5", 9, 5, false}, // Was rejected before, should work now
{"OffsetFetch v6", 9, 6, true}, // Should still be rejected
// CreateTopics - should now support up to v5
{"CreateTopics v4", 19, 4, false},
{"CreateTopics v5", 19, 5, false}, // Was rejected before, should work now
{"CreateTopics v6", 19, 6, true}, // Should be rejected
// Metadata - should still support up to v7
{"Metadata v7", 3, 7, false},
{"Metadata v8", 3, 8, true},
// ApiVersions - should still support up to v3
{"ApiVersions v3", 18, 3, false},
{"ApiVersions v4", 18, 4, true},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := handler.validateAPIVersion(tc.apiKey, tc.version)
if tc.shouldErr && err == nil {
t.Errorf("Expected error for API %d version %d, but got none", tc.apiKey, tc.version)
}
if !tc.shouldErr && err != nil {
t.Errorf("Unexpected error for API %d version %d: %v", tc.apiKey, tc.version, err)
}
})
}
}
func TestOffsetFetch_HigherVersionSupport(t *testing.T) {
handler := NewTestHandler()
defer handler.Close()
// Test that OffsetFetch v3, v4, v5 are now accepted
versions := []uint16{3, 4, 5}
for _, version := range versions {
t.Run(fmt.Sprintf("OffsetFetch_v%d", version), func(t *testing.T) {
// Create a basic OffsetFetch request
requestBody := make([]byte, 0, 64)
// Group ID (string: "test-group")
groupID := "test-group"
requestBody = append(requestBody, 0, byte(len(groupID))) // length
requestBody = append(requestBody, []byte(groupID)...) // group ID
// Topics array (1 topic)
requestBody = append(requestBody, 0, 0, 0, 1) // topics count
// Topic: "test-topic"
topicName := "test-topic"
requestBody = append(requestBody, 0, byte(len(topicName))) // topic name length
requestBody = append(requestBody, []byte(topicName)...) // topic name
// Partitions array (1 partition)
requestBody = append(requestBody, 0, 0, 0, 1) // partitions count
requestBody = append(requestBody, 0, 0, 0, 0) // partition 0
// Call handler with the higher version
response, err := handler.handleOffsetFetch(12345, version, requestBody)
if err != nil {
t.Fatalf("OffsetFetch v%d failed: %v", version, err)
}
if len(response) < 8 {
t.Fatalf("OffsetFetch v%d response too short: %d bytes", version, len(response))
}
// Check correlation ID
correlationID := binary.BigEndian.Uint32(response[0:4])
if correlationID != 12345 {
t.Errorf("OffsetFetch v%d: expected correlation ID 12345, got %d", version, correlationID)
}
})
}
}
func TestCreateTopics_V5Support(t *testing.T) {
handler := NewTestHandler()
defer handler.Close()
// Test that CreateTopics v5 is now accepted and works
requestBody := make([]byte, 0, 128)
// Build v5 request (compact format)
// Topics array (compact: 1 topic = 2)
requestBody = append(requestBody, 0x02)
// Topic: "v5-test-topic"
topicName := "v5-test-topic"
requestBody = append(requestBody, byte(len(topicName)+1)) // Compact string length
requestBody = append(requestBody, []byte(topicName)...) // Topic name
// num_partitions = 2
requestBody = append(requestBody, 0x00, 0x00, 0x00, 0x02)
// replication_factor = 1
requestBody = append(requestBody, 0x00, 0x01)
// configs array (compact: empty = 0)
requestBody = append(requestBody, 0x00)
// tagged fields (empty)
requestBody = append(requestBody, 0x00)
// timeout_ms = 5000
requestBody = append(requestBody, 0x00, 0x00, 0x13, 0x88)
// validate_only = false
requestBody = append(requestBody, 0x00)
// tagged fields at end
requestBody = append(requestBody, 0x00)
// Call handler with v5
response, err := handler.handleCreateTopics(12346, 5, requestBody)
if err != nil {
t.Fatalf("CreateTopics v5 failed: %v", err)
}
if len(response) < 8 {
t.Fatalf("CreateTopics v5 response too short: %d bytes", len(response))
}
// Check correlation ID
correlationID := binary.BigEndian.Uint32(response[0:4])
if correlationID != 12346 {
t.Errorf("CreateTopics v5: expected correlation ID 12346, got %d", correlationID)
}
// Verify topic was created
if !handler.seaweedMQHandler.TopicExists("v5-test-topic") {
t.Error("CreateTopics v5: topic was not created")
}
}
// Benchmark to ensure version validation is efficient
func BenchmarkValidateAPIVersion(b *testing.B) {
handler := NewTestHandler()
defer handler.Close()
// Test common API versions
testCases := []struct {
apiKey uint16
version uint16
}{
{9, 3}, // OffsetFetch v3
{9, 5}, // OffsetFetch v5
{19, 5}, // CreateTopics v5
{3, 7}, // Metadata v7
{18, 3}, // ApiVersions v3
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
tc := testCases[i%len(testCases)]
_ = handler.validateAPIVersion(tc.apiKey, tc.version)
}
}
func TestApiVersions_ResponseFormat(t *testing.T) {
handler := NewTestHandler()
defer handler.Close()
response, err := handler.handleApiVersions(99999, 0)
if err != nil {
t.Fatalf("handleApiVersions failed: %v", err)
}
// Verify the response can be parsed correctly
if len(response) < 10 {
t.Fatalf("Response too short for basic parsing")
}
offset := 0
// Correlation ID (4 bytes)
correlationID := binary.BigEndian.Uint32(response[offset : offset+4])
if correlationID != 99999 {
t.Errorf("Wrong correlation ID: expected 99999, got %d", correlationID)
}
offset += 4
// Error code (2 bytes)
errorCode := binary.BigEndian.Uint16(response[offset : offset+2])
if errorCode != 0 {
t.Errorf("Non-zero error code: %d", errorCode)
}
offset += 2
// Number of API keys (4 bytes)
numAPIKeys := binary.BigEndian.Uint32(response[offset : offset+4])
if numAPIKeys != 16 {
t.Errorf("Wrong number of API keys: expected 16, got %d", numAPIKeys)
}
offset += 4
// Verify each API key entry format (apiKey + minVer + maxVer)
for i := uint32(0); i < numAPIKeys && offset+6 <= len(response); i++ {
apiKey := binary.BigEndian.Uint16(response[offset : offset+2])
minVersion := binary.BigEndian.Uint16(response[offset+2 : offset+4])
maxVersion := binary.BigEndian.Uint16(response[offset+4 : offset+6])
// Verify minVersion <= maxVersion
if minVersion > maxVersion {
t.Errorf("API %d: invalid version range %d-%d", apiKey, minVersion, maxVersion)
}
// Verify minVersion is typically 0
if minVersion != 0 {
t.Errorf("API %d: unexpected min version %d (expected 0)", apiKey, minVersion)
}
offset += 6
}
if offset != len(response) {
t.Errorf("Response parsing mismatch: expected %d bytes, parsed %d", len(response), offset)
}
}