mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
249 lines
7.4 KiB
Go
249 lines
7.4 KiB
Go
package protocol
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"testing"
|
|
|
|
"github.com/linkedin/goavro/v2"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// TestProduceHandler_SchemaIntegration tests the Produce handler with schema integration
|
|
func TestProduceHandler_SchemaIntegration(t *testing.T) {
|
|
// Create mock schema registry
|
|
registry := createProduceTestRegistry(t)
|
|
defer registry.Close()
|
|
|
|
// Create handler with schema management
|
|
handler := NewTestHandler()
|
|
defer handler.Close()
|
|
|
|
// Enable schema management
|
|
err := handler.EnableSchemaManagement(schema.ManagerConfig{
|
|
RegistryURL: registry.URL,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// For this test, don't enable broker integration to avoid connection issues
|
|
// We're testing schema processing, not broker connectivity
|
|
|
|
t.Run("Schematized Message Processing", func(t *testing.T) {
|
|
schemaID := int32(1)
|
|
schemaJSON := `{
|
|
"type": "record",
|
|
"name": "TestMessage",
|
|
"fields": [
|
|
{"name": "id", "type": "string"},
|
|
{"name": "message", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
// Register schema
|
|
registerProduceTestSchema(t, registry, schemaID, schemaJSON)
|
|
|
|
// Create test data
|
|
testData := map[string]interface{}{
|
|
"id": "test-123",
|
|
"message": "Hello Schema World",
|
|
}
|
|
|
|
// Encode with Avro
|
|
codec, err := goavro.NewCodec(schemaJSON)
|
|
require.NoError(t, err)
|
|
avroBinary, err := codec.BinaryFromNative(nil, testData)
|
|
require.NoError(t, err)
|
|
|
|
// Create Confluent envelope
|
|
envelope := createProduceTestEnvelope(schemaID, avroBinary)
|
|
|
|
// Test schema processing (without broker integration)
|
|
err = handler.processSchematizedMessage("test-topic", 0, envelope)
|
|
require.NoError(t, err)
|
|
|
|
// Verify handler state (schema enabled but no broker integration for this test)
|
|
assert.True(t, handler.IsSchemaEnabled())
|
|
assert.False(t, handler.IsBrokerIntegrationEnabled())
|
|
})
|
|
|
|
t.Run("Non-Schematized Message Processing", func(t *testing.T) {
|
|
// Test with raw message
|
|
rawMessage := []byte("This is not schematized")
|
|
|
|
// Should not fail, just skip schema processing
|
|
err := handler.processSchematizedMessage("test-topic", 0, rawMessage)
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("Schema Validation", func(t *testing.T) {
|
|
schemaID := int32(2)
|
|
schemaJSON := `{
|
|
"type": "record",
|
|
"name": "ValidationTest",
|
|
"fields": [
|
|
{"name": "value", "type": "int"}
|
|
]
|
|
}`
|
|
|
|
registerProduceTestSchema(t, registry, schemaID, schemaJSON)
|
|
|
|
// Create valid test data
|
|
testData := map[string]interface{}{
|
|
"value": int32(42),
|
|
}
|
|
|
|
codec, err := goavro.NewCodec(schemaJSON)
|
|
require.NoError(t, err)
|
|
avroBinary, err := codec.BinaryFromNative(nil, testData)
|
|
require.NoError(t, err)
|
|
|
|
envelope := createProduceTestEnvelope(schemaID, avroBinary)
|
|
|
|
// Test schema compatibility validation
|
|
err = handler.validateSchemaCompatibility("validation-topic", envelope)
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("Error Handling", func(t *testing.T) {
|
|
// Test with invalid schema ID
|
|
invalidEnvelope := createProduceTestEnvelope(999, []byte("invalid"))
|
|
|
|
err := handler.processSchematizedMessage("error-topic", 0, invalidEnvelope)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "schema decoding failed")
|
|
})
|
|
}
|
|
|
|
// TestProduceHandler_BrokerIntegration tests broker integration functionality
|
|
func TestProduceHandler_BrokerIntegration(t *testing.T) {
|
|
registry := createProduceTestRegistry(t)
|
|
defer registry.Close()
|
|
|
|
handler := NewTestHandler()
|
|
defer handler.Close()
|
|
|
|
t.Run("Enable Broker Integration", func(t *testing.T) {
|
|
// Should fail without schema management
|
|
err := handler.EnableBrokerIntegration([]string{"localhost:17777"})
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "schema management must be enabled")
|
|
|
|
// Enable schema management first
|
|
err = handler.EnableSchemaManagement(schema.ManagerConfig{
|
|
RegistryURL: registry.URL,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// Now broker integration should work (but may fail in tests due to missing broker)
|
|
err = handler.EnableBrokerIntegration([]string{"localhost:17777"})
|
|
require.NoError(t, err)
|
|
|
|
assert.True(t, handler.IsBrokerIntegrationEnabled())
|
|
})
|
|
|
|
t.Run("Disable Schema Management", func(t *testing.T) {
|
|
// Enable both
|
|
err := handler.EnableSchemaManagement(schema.ManagerConfig{
|
|
RegistryURL: registry.URL,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
err = handler.EnableBrokerIntegration([]string{"localhost:17777"})
|
|
require.NoError(t, err)
|
|
|
|
// Disable should clean up both
|
|
handler.DisableSchemaManagement()
|
|
|
|
assert.False(t, handler.IsSchemaEnabled())
|
|
assert.False(t, handler.IsBrokerIntegrationEnabled())
|
|
})
|
|
}
|
|
|
|
// TestProduceHandler_MessageExtraction tests message extraction from record sets
|
|
func TestProduceHandler_MessageExtraction(t *testing.T) {
|
|
handler := NewTestHandler()
|
|
defer handler.Close()
|
|
|
|
t.Run("Extract Messages From Record Set", func(t *testing.T) {
|
|
// Create a mock record set (arbitrary data)
|
|
recordSet := []byte("mock-record-set-data-with-sufficient-length-for-testing")
|
|
|
|
messages, err := handler.extractMessagesFromRecordSet(recordSet)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, 1, len(messages))
|
|
assert.Equal(t, recordSet, messages[0])
|
|
})
|
|
|
|
t.Run("Extract Messages Error Handling", func(t *testing.T) {
|
|
// Too short record set
|
|
shortRecordSet := []byte("short")
|
|
|
|
_, err := handler.extractMessagesFromRecordSet(shortRecordSet)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "record set too small")
|
|
})
|
|
}
|
|
|
|
// Helper functions for produce schema tests
|
|
|
|
// createProduceTestRegistry starts an in-process HTTP server that stands in for
// a schema registry in these tests. Schemas live in memory for the lifetime of
// the server; the caller is responsible for calling Close on the result.
//
// Routes:
//   - /subjects: 200 with an empty JSON array.
//   - /schemas/ids/<id>: 200 with {"schema": <text>} when <id> has been
//     registered, otherwise 404 with error_code 40403.
//   - POST /register-schema: test-only endpoint accepting
//     {"schema_id": <int>, "schema": <text>}; 200 on success, 400 when the
//     body cannot be decoded.
//   - anything else: 404.
func createProduceTestRegistry(t *testing.T) *httptest.Server {
	t.Helper()

	// Registered schema text keyed by schema ID.
	// NOTE(review): the map is not guarded by a lock, so this presumably relies
	// on the tests issuing registry requests one at a time — confirm before
	// using the server from parallel tests.
	schemas := make(map[int32]string)

	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/subjects" {
			w.WriteHeader(http.StatusOK)
			_, _ = w.Write([]byte("[]")) // nothing useful to do if the client went away
			return
		}

		var schemaID int32
		n, scanErr := fmt.Sscanf(r.URL.Path, "/schemas/ids/%d", &schemaID)

		switch {
		case n == 1 && scanErr == nil:
			text, found := schemas[schemaID]
			if !found {
				w.WriteHeader(http.StatusNotFound)
				_, _ = w.Write([]byte(`{"error_code": 40403, "message": "Schema not found"}`))
				return
			}

			// Encode with encoding/json rather than fmt's %q: %q produces Go
			// string-literal escapes (e.g. \a, \x7f) that are not valid JSON.
			payload, marshalErr := json.Marshal(map[string]string{"schema": text})
			if marshalErr != nil {
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusOK)
			_, _ = w.Write(payload)

		case r.Method == http.MethodPost && r.URL.Path == "/register-schema":
			var req struct {
				SchemaID int32  `json:"schema_id"`
				Schema   string `json:"schema"`
			}
			if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
				w.WriteHeader(http.StatusBadRequest)
				return
			}
			schemas[req.SchemaID] = req.Schema
			w.WriteHeader(http.StatusOK)
			_, _ = w.Write([]byte(`{"success": true}`))

		default:
			w.WriteHeader(http.StatusNotFound)
		}
	}))
}
|
|
|
|
func registerProduceTestSchema(t *testing.T, registry *httptest.Server, schemaID int32, schema string) {
|
|
reqBody := fmt.Sprintf(`{"schema_id": %d, "schema": %q}`, schemaID, schema)
|
|
resp, err := http.Post(registry.URL+"/register-schema", "application/json", bytes.NewReader([]byte(reqBody)))
|
|
require.NoError(t, err)
|
|
defer resp.Body.Close()
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
}
|
|
|
|
// createProduceTestEnvelope wraps data in a 5-byte header: a zero magic byte
// followed by schemaID as a big-endian uint32. The returned slice is freshly
// allocated and holds the header followed by a copy of data.
func createProduceTestEnvelope(schemaID int32, data []byte) []byte {
	const headerLen = 5 // 1 magic byte + 4 bytes of schema ID

	// Reserve room for the payload up front so the append below does not
	// reallocate.
	framed := make([]byte, headerLen, headerLen+len(data))
	framed[0] = 0x00 // Magic byte
	binary.BigEndian.PutUint32(framed[1:headerLen], uint32(schemaID))

	return append(framed, data...)
}
|