mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
CreateTopics Protocol Compliance completed: ## Implementation - Implement handleCreateTopicsV0V1() with proper v0/v1 request parsing - Support regular array/string format (not compact) for v0/v1 - Parse topic name, partitions, replication factor, assignments, configs - Handle timeout_ms and validate_only fields correctly - Maintain existing v2+ compact format support - Wire to SeaweedMQ handler for actual topic creation ## Key Features - Full v0-v5 CreateTopics API version support - Proper error handling (TOPIC_ALREADY_EXISTS, INVALID_PARTITIONS, etc.) - Partition count validation and enforcement - Compatible with existing SeaweedMQ topic management ## Tests - Comprehensive unit tests for v0/v1/v2+ parsing - Error condition testing (duplicate topics, invalid partitions) - Multi-topic creation support - Integration tests across all API versions - Performance benchmarks for CreateTopics operations ## Verification - All protocol tests pass (v0-v5 CreateTopics) - E2E Sarama tests continue to work - Real topics created with specified partition counts - Proper error responses for edge cases Ready for Phase 3: ApiVersions matrix accuracy
469 lines
13 KiB
Go
469 lines
13 KiB
Go
package protocol
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"testing"
|
|
)
|
|
|
|
func TestCreateTopicsV0_BasicParsing(t *testing.T) {
|
|
handler := NewTestHandler()
|
|
defer handler.Close()
|
|
|
|
// Build a CreateTopics v0 request
|
|
request := make([]byte, 0, 256)
|
|
|
|
// Topics array count (1 topic)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x01)
|
|
|
|
// Topic 1: "test-topic"
|
|
topicName := "test-topic"
|
|
request = append(request, 0x00, byte(len(topicName))) // Topic name length
|
|
request = append(request, []byte(topicName)...) // Topic name
|
|
|
|
// num_partitions = 3
|
|
request = append(request, 0x00, 0x00, 0x00, 0x03)
|
|
|
|
// replication_factor = 1
|
|
request = append(request, 0x00, 0x01)
|
|
|
|
// assignments array (empty)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x00)
|
|
|
|
// configs array (empty)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x00)
|
|
|
|
// timeout_ms = 5000
|
|
request = append(request, 0x00, 0x00, 0x13, 0x88)
|
|
|
|
// Call handler
|
|
response, err := handler.handleCreateTopicsV0V1(12345, request)
|
|
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
|
|
if len(response) < 10 {
|
|
t.Fatalf("Response too short: %d bytes", len(response))
|
|
}
|
|
|
|
// Check correlation ID
|
|
correlationID := binary.BigEndian.Uint32(response[0:4])
|
|
if correlationID != 12345 {
|
|
t.Errorf("Expected correlation ID 12345, got %d", correlationID)
|
|
}
|
|
|
|
// Check topics array count
|
|
topicsCount := binary.BigEndian.Uint32(response[4:8])
|
|
if topicsCount != 1 {
|
|
t.Errorf("Expected 1 topic in response, got %d", topicsCount)
|
|
}
|
|
|
|
// Verify topic was actually created
|
|
if !handler.seaweedMQHandler.TopicExists("test-topic") {
|
|
t.Error("Topic 'test-topic' was not created")
|
|
}
|
|
}
|
|
|
|
func TestCreateTopicsV0_TopicAlreadyExists(t *testing.T) {
|
|
handler := NewTestHandler()
|
|
defer handler.Close()
|
|
|
|
// Pre-create the topic
|
|
err := handler.seaweedMQHandler.CreateTopic("existing-topic", 2)
|
|
if err != nil {
|
|
t.Fatalf("Failed to pre-create topic: %v", err)
|
|
}
|
|
|
|
// Build request for the same topic
|
|
request := make([]byte, 0, 256)
|
|
|
|
// Topics array count (1 topic)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x01)
|
|
|
|
// Topic 1: "existing-topic"
|
|
topicName := "existing-topic"
|
|
request = append(request, 0x00, byte(len(topicName))) // Topic name length
|
|
request = append(request, []byte(topicName)...) // Topic name
|
|
|
|
// num_partitions = 1
|
|
request = append(request, 0x00, 0x00, 0x00, 0x01)
|
|
|
|
// replication_factor = 1
|
|
request = append(request, 0x00, 0x01)
|
|
|
|
// assignments array (empty)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x00)
|
|
|
|
// configs array (empty)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x00)
|
|
|
|
// timeout_ms = 5000
|
|
request = append(request, 0x00, 0x00, 0x13, 0x88)
|
|
|
|
// Call handler
|
|
response, err := handler.handleCreateTopicsV0V1(12346, request)
|
|
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
|
|
// Parse response to check error code
|
|
if len(response) < 12 {
|
|
t.Fatalf("Response too short for error code check: %d bytes", len(response))
|
|
}
|
|
|
|
// Skip correlation ID (4 bytes) + topics count (4 bytes) + topic name length (2 bytes) + topic name
|
|
offset := 4 + 4 + 2 + len(topicName)
|
|
if len(response) >= offset+2 {
|
|
errorCode := binary.BigEndian.Uint16(response[offset : offset+2])
|
|
if errorCode != 36 { // TOPIC_ALREADY_EXISTS
|
|
t.Errorf("Expected error code 36 (TOPIC_ALREADY_EXISTS), got %d", errorCode)
|
|
}
|
|
} else {
|
|
t.Error("Response too short to contain error code")
|
|
}
|
|
}
|
|
|
|
func TestCreateTopicsV0_InvalidPartitions(t *testing.T) {
|
|
handler := NewTestHandler()
|
|
defer handler.Close()
|
|
|
|
// Build request with invalid partition count (0)
|
|
request := make([]byte, 0, 256)
|
|
|
|
// Topics array count (1 topic)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x01)
|
|
|
|
// Topic 1: "invalid-topic"
|
|
topicName := "invalid-topic"
|
|
request = append(request, 0x00, byte(len(topicName))) // Topic name length
|
|
request = append(request, []byte(topicName)...) // Topic name
|
|
|
|
// num_partitions = 0 (invalid)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x00)
|
|
|
|
// replication_factor = 1
|
|
request = append(request, 0x00, 0x01)
|
|
|
|
// assignments array (empty)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x00)
|
|
|
|
// configs array (empty)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x00)
|
|
|
|
// timeout_ms = 5000
|
|
request = append(request, 0x00, 0x00, 0x13, 0x88)
|
|
|
|
// Call handler
|
|
response, err := handler.handleCreateTopicsV0V1(12347, request)
|
|
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
|
|
// Parse response to check error code
|
|
if len(response) < 12 {
|
|
t.Fatalf("Response too short for error code check: %d bytes", len(response))
|
|
}
|
|
|
|
// Skip correlation ID (4 bytes) + topics count (4 bytes) + topic name length (2 bytes) + topic name
|
|
offset := 4 + 4 + 2 + len(topicName)
|
|
if len(response) >= offset+2 {
|
|
errorCode := binary.BigEndian.Uint16(response[offset : offset+2])
|
|
if errorCode != 37 { // INVALID_PARTITIONS
|
|
t.Errorf("Expected error code 37 (INVALID_PARTITIONS), got %d", errorCode)
|
|
}
|
|
} else {
|
|
t.Error("Response too short to contain error code")
|
|
}
|
|
|
|
// Verify topic was not created
|
|
if handler.seaweedMQHandler.TopicExists("invalid-topic") {
|
|
t.Error("Topic with invalid partitions should not have been created")
|
|
}
|
|
}
|
|
|
|
func TestCreateTopicsV2Plus_CompactFormat(t *testing.T) {
|
|
handler := NewTestHandler()
|
|
defer handler.Close()
|
|
|
|
// Build a CreateTopics v2 request (compact format)
|
|
request := make([]byte, 0, 256)
|
|
|
|
// Topics array count (compact: count + 1, so 1 topic = 2)
|
|
request = append(request, 0x02)
|
|
|
|
// Topic 1: "compact-topic"
|
|
topicName := "compact-topic"
|
|
request = append(request, byte(len(topicName)+1)) // Compact string length
|
|
request = append(request, []byte(topicName)...) // Topic name
|
|
|
|
// num_partitions = 2
|
|
request = append(request, 0x00, 0x00, 0x00, 0x02)
|
|
|
|
// replication_factor = 1
|
|
request = append(request, 0x00, 0x01)
|
|
|
|
// configs array (compact: empty = 0)
|
|
request = append(request, 0x00)
|
|
|
|
// tagged fields (empty)
|
|
request = append(request, 0x00)
|
|
|
|
// timeout_ms = 10000
|
|
request = append(request, 0x00, 0x00, 0x27, 0x10)
|
|
|
|
// validate_only = false
|
|
request = append(request, 0x00)
|
|
|
|
// tagged fields at end
|
|
request = append(request, 0x00)
|
|
|
|
// Call handler
|
|
response, err := handler.handleCreateTopicsV2Plus(12348, 2, request)
|
|
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
|
|
if len(response) < 10 {
|
|
t.Fatalf("Response too short: %d bytes", len(response))
|
|
}
|
|
|
|
// Check correlation ID
|
|
correlationID := binary.BigEndian.Uint32(response[0:4])
|
|
if correlationID != 12348 {
|
|
t.Errorf("Expected correlation ID 12348, got %d", correlationID)
|
|
}
|
|
|
|
// Verify topic was created
|
|
if !handler.seaweedMQHandler.TopicExists("compact-topic") {
|
|
t.Error("Topic 'compact-topic' was not created")
|
|
}
|
|
}
|
|
|
|
func TestCreateTopicsV2Plus_MultipleTopics(t *testing.T) {
|
|
handler := NewTestHandler()
|
|
defer handler.Close()
|
|
|
|
// Build a CreateTopics v2 request with 2 topics
|
|
request := make([]byte, 0, 512)
|
|
|
|
// Topics array count (compact: 2 topics = 3)
|
|
request = append(request, 0x03)
|
|
|
|
// Topic 1: "topic-one"
|
|
topicName1 := "topic-one"
|
|
request = append(request, byte(len(topicName1)+1)) // Compact string length
|
|
request = append(request, []byte(topicName1)...) // Topic name
|
|
|
|
// num_partitions = 1
|
|
request = append(request, 0x00, 0x00, 0x00, 0x01)
|
|
|
|
// replication_factor = 1
|
|
request = append(request, 0x00, 0x01)
|
|
|
|
// configs array (compact: empty = 0)
|
|
request = append(request, 0x00)
|
|
|
|
// tagged fields (empty)
|
|
request = append(request, 0x00)
|
|
|
|
// Topic 2: "topic-two"
|
|
topicName2 := "topic-two"
|
|
request = append(request, byte(len(topicName2)+1)) // Compact string length
|
|
request = append(request, []byte(topicName2)...) // Topic name
|
|
|
|
// num_partitions = 3
|
|
request = append(request, 0x00, 0x00, 0x00, 0x03)
|
|
|
|
// replication_factor = 1
|
|
request = append(request, 0x00, 0x01)
|
|
|
|
// configs array (compact: empty = 0)
|
|
request = append(request, 0x00)
|
|
|
|
// tagged fields (empty)
|
|
request = append(request, 0x00)
|
|
|
|
// timeout_ms = 5000
|
|
request = append(request, 0x00, 0x00, 0x13, 0x88)
|
|
|
|
// validate_only = false
|
|
request = append(request, 0x00)
|
|
|
|
// tagged fields at end
|
|
request = append(request, 0x00)
|
|
|
|
// Call handler
|
|
response, err := handler.handleCreateTopicsV2Plus(12349, 2, request)
|
|
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
}
|
|
|
|
if len(response) < 4 {
|
|
t.Fatalf("Response too short: %d bytes", len(response))
|
|
}
|
|
|
|
// Verify both topics were created
|
|
if !handler.seaweedMQHandler.TopicExists("topic-one") {
|
|
t.Error("Topic 'topic-one' was not created")
|
|
}
|
|
|
|
if !handler.seaweedMQHandler.TopicExists("topic-two") {
|
|
t.Error("Topic 'topic-two' was not created")
|
|
}
|
|
}
|
|
|
|
// Integration test with actual Kafka-like workflow
|
|
func TestCreateTopics_Integration(t *testing.T) {
|
|
handler := NewTestHandler()
|
|
defer handler.Close()
|
|
|
|
// Test version routing
|
|
testCases := []struct {
|
|
name string
|
|
version uint16
|
|
topicName string
|
|
partitions int32
|
|
}{
|
|
{"Version0", 0, "integration-v0-topic", 2},
|
|
{"Version1", 1, "integration-v1-topic", 3},
|
|
{"Version2", 2, "integration-v2-topic", 1},
|
|
{"Version3", 3, "integration-v3-topic", 4},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
var request []byte
|
|
|
|
if tc.version <= 1 {
|
|
// Build v0/v1 format request
|
|
request = make([]byte, 0, 256)
|
|
|
|
// Topics array count (1 topic)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x01)
|
|
|
|
// Topic name
|
|
request = append(request, 0x00, byte(len(tc.topicName)))
|
|
request = append(request, []byte(tc.topicName)...)
|
|
|
|
// num_partitions
|
|
partitionBytes := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(partitionBytes, uint32(tc.partitions))
|
|
request = append(request, partitionBytes...)
|
|
|
|
// replication_factor = 1
|
|
request = append(request, 0x00, 0x01)
|
|
|
|
// assignments array (empty)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x00)
|
|
|
|
// configs array (empty)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x00)
|
|
|
|
// timeout_ms = 5000
|
|
request = append(request, 0x00, 0x00, 0x13, 0x88)
|
|
} else {
|
|
// Build v2+ format request (compact)
|
|
request = make([]byte, 0, 256)
|
|
|
|
// Topics array count (compact: 1 topic = 2)
|
|
request = append(request, 0x02)
|
|
|
|
// Topic name (compact string)
|
|
request = append(request, byte(len(tc.topicName)+1))
|
|
request = append(request, []byte(tc.topicName)...)
|
|
|
|
// num_partitions
|
|
partitionBytes := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(partitionBytes, uint32(tc.partitions))
|
|
request = append(request, partitionBytes...)
|
|
|
|
// replication_factor = 1
|
|
request = append(request, 0x00, 0x01)
|
|
|
|
// configs array (compact: empty = 0)
|
|
request = append(request, 0x00)
|
|
|
|
// tagged fields (empty)
|
|
request = append(request, 0x00)
|
|
|
|
// timeout_ms = 5000
|
|
request = append(request, 0x00, 0x00, 0x13, 0x88)
|
|
|
|
// validate_only = false
|
|
request = append(request, 0x00)
|
|
|
|
// tagged fields at end
|
|
request = append(request, 0x00)
|
|
}
|
|
|
|
// Call the main handler (which routes to version-specific handlers)
|
|
response, err := handler.handleCreateTopics(uint32(1000+tc.version), tc.version, request)
|
|
|
|
if err != nil {
|
|
t.Fatalf("CreateTopics v%d failed: %v", tc.version, err)
|
|
}
|
|
|
|
if len(response) == 0 {
|
|
t.Fatalf("CreateTopics v%d returned empty response", tc.version)
|
|
}
|
|
|
|
// Verify topic was created with correct partition count
|
|
if !handler.seaweedMQHandler.TopicExists(tc.topicName) {
|
|
t.Errorf("Topic '%s' was not created in v%d", tc.topicName, tc.version)
|
|
}
|
|
|
|
// Check partition count (create ledgers on-demand to verify partition setup)
|
|
for partitionID := int32(0); partitionID < tc.partitions; partitionID++ {
|
|
ledger := handler.seaweedMQHandler.GetOrCreateLedger(tc.topicName, partitionID)
|
|
if ledger == nil {
|
|
t.Errorf("Failed to get/create ledger for topic '%s' partition %d", tc.topicName, partitionID)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// Benchmark CreateTopics performance
|
|
func BenchmarkCreateTopicsV0(b *testing.B) {
|
|
handler := NewTestHandler()
|
|
defer handler.Close()
|
|
|
|
// Pre-build request
|
|
request := make([]byte, 0, 256)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x01) // 1 topic
|
|
|
|
topicName := "benchmark-topic"
|
|
request = append(request, 0x00, byte(len(topicName)))
|
|
request = append(request, []byte(topicName)...)
|
|
request = append(request, 0x00, 0x00, 0x00, 0x01) // 1 partition
|
|
request = append(request, 0x00, 0x01) // replication factor 1
|
|
request = append(request, 0x00, 0x00, 0x00, 0x00) // empty assignments
|
|
request = append(request, 0x00, 0x00, 0x00, 0x00) // empty configs
|
|
request = append(request, 0x00, 0x00, 0x13, 0x88) // timeout 5000ms
|
|
|
|
b.ResetTimer()
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
// Create unique topic names to avoid "already exists" errors
|
|
uniqueRequest := make([]byte, len(request))
|
|
copy(uniqueRequest, request)
|
|
|
|
// Modify topic name to make it unique
|
|
topicSuffix := []byte(fmt.Sprintf("-%d", i))
|
|
uniqueRequest = append(uniqueRequest[:10+len(topicName)], topicSuffix...)
|
|
uniqueRequest = append(uniqueRequest, request[10+len(topicName):]...)
|
|
|
|
// Update topic name length
|
|
uniqueRequest[8] = byte(len(topicName) + len(topicSuffix))
|
|
|
|
_, err := handler.handleCreateTopicsV0V1(uint32(i), uniqueRequest)
|
|
if err != nil {
|
|
b.Fatalf("CreateTopics failed on iteration %d: %v", i, err)
|
|
}
|
|
}
|
|
}
|