mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
Phase E2: Integrate Protobuf descriptor parser with decoder - Update NewProtobufDecoder to use ProtobufDescriptorParser - Add findFirstMessageName helper for automatic message detection - Fix ParseBinaryDescriptor to return schema even on resolution failure - Add comprehensive tests for protobuf decoder integration - Improve error handling and caching behavior This enables proper binary descriptor parsing in the protobuf decoder, completing the integration between descriptor parsing and decoding. Phase E3: Complete Protobuf message descriptor resolution - Implement full protobuf descriptor resolution using protoreflect API - Add buildFileDescriptor and findMessageInFileDescriptor methods - Support nested message resolution with findNestedMessageDescriptor - Add proper mutex protection for thread-safe cache access - Update all test data to use proper field cardinality labels - Update test expectations to handle successful descriptor resolution - Enable full protobuf decoder creation from binary descriptors Phase E (Protobuf Support) is now complete: ✅ E1: Binary descriptor parsing ✅ E2: Decoder integration ✅ E3: Full message descriptor resolution Protobuf messages can now be fully parsed and decoded Phase F: Implement Kafka record batch compression support - Add comprehensive compression module supporting gzip/snappy/lz4/zstd - Implement RecordBatchParser with full compression and CRC validation - Support compression codec extraction from record batch attributes - Add compression/decompression for all major Kafka codecs - Integrate compression support into Produce and Fetch handlers - Add extensive unit tests for all compression codecs - Support round-trip compression/decompression with proper error handling - Add performance benchmarks for compression operations Key features: ✅ Gzip compression (ratio: 0.02) ✅ Snappy compression (ratio: 0.06, fastest) ✅ LZ4 compression (ratio: 0.02) ✅ Zstd compression (ratio: 0.01, best compression) ✅ CRC32 validation for record batch integrity ✅ Proper Kafka record batch format v2 parsing ✅ Backward compatibility with uncompressed records Phase F (Compression Handling) is now complete. Phase G: Implement advanced schema compatibility checking and migration - Add comprehensive SchemaEvolutionChecker with full compatibility rules - Support BACKWARD, FORWARD, FULL, and NONE compatibility levels - Implement Avro schema compatibility checking with field analysis - Add JSON Schema compatibility validation - Support Protobuf compatibility checking (simplified implementation) - Add type promotion rules (int->long, float->double, string<->bytes) - Integrate schema evolution into Manager with validation methods - Add schema evolution suggestions and migration guidance - Support schema compatibility validation before evolution - Add comprehensive unit tests for all compatibility scenarios Key features: ✅ BACKWARD compatibility: New schema can read old data ✅ FORWARD compatibility: Old schema can read new data ✅ FULL compatibility: Both backward and forward compatible ✅ Type promotion support for safe schema evolution ✅ Field addition/removal validation with default value checks ✅ Schema evolution suggestions for incompatible changes ✅ Integration with schema registry for validation workflows Phase G (Schema Evolution) is now complete. fmt
344 lines
8.5 KiB
Go
344 lines
8.5 KiB
Go
package schema
|
|
|
|
import (
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// TestManager_SchemaEvolution tests schema evolution integration in the manager
|
|
func TestManager_SchemaEvolution(t *testing.T) {
|
|
// Create a manager without registry (for testing evolution logic only)
|
|
manager := &Manager{
|
|
evolutionChecker: NewSchemaEvolutionChecker(),
|
|
}
|
|
|
|
t.Run("Compatible Avro evolution", func(t *testing.T) {
|
|
oldSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string", "default": ""}
|
|
]
|
|
}`
|
|
|
|
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
|
|
require.NoError(t, err)
|
|
assert.True(t, result.Compatible)
|
|
assert.Empty(t, result.Issues)
|
|
})
|
|
|
|
t.Run("Incompatible Avro evolution", func(t *testing.T) {
|
|
oldSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
|
|
require.NoError(t, err)
|
|
assert.False(t, result.Compatible)
|
|
assert.NotEmpty(t, result.Issues)
|
|
assert.Contains(t, result.Issues[0], "Field 'email' was removed")
|
|
})
|
|
|
|
t.Run("Schema evolution suggestions", func(t *testing.T) {
|
|
oldSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
suggestions, err := manager.SuggestSchemaEvolution(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
|
|
require.NoError(t, err)
|
|
assert.NotEmpty(t, suggestions)
|
|
|
|
// Should suggest adding default values
|
|
found := false
|
|
for _, suggestion := range suggestions {
|
|
if strings.Contains(suggestion, "default") {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
assert.True(t, found, "Should suggest adding default values, got: %v", suggestions)
|
|
})
|
|
|
|
t.Run("JSON Schema evolution", func(t *testing.T) {
|
|
oldSchema := `{
|
|
"type": "object",
|
|
"properties": {
|
|
"id": {"type": "integer"},
|
|
"name": {"type": "string"}
|
|
},
|
|
"required": ["id", "name"]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "object",
|
|
"properties": {
|
|
"id": {"type": "integer"},
|
|
"name": {"type": "string"},
|
|
"email": {"type": "string"}
|
|
},
|
|
"required": ["id", "name"]
|
|
}`
|
|
|
|
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatJSONSchema, CompatibilityBackward)
|
|
require.NoError(t, err)
|
|
assert.True(t, result.Compatible)
|
|
})
|
|
|
|
t.Run("Full compatibility check", func(t *testing.T) {
|
|
oldSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string", "default": ""}
|
|
]
|
|
}`
|
|
|
|
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityFull)
|
|
require.NoError(t, err)
|
|
assert.True(t, result.Compatible)
|
|
})
|
|
|
|
t.Run("Type promotion compatibility", func(t *testing.T) {
|
|
oldSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "score", "type": "int"}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "score", "type": "long"}
|
|
]
|
|
}`
|
|
|
|
result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
|
|
require.NoError(t, err)
|
|
assert.True(t, result.Compatible)
|
|
})
|
|
}
|
|
|
|
// TestManager_CompatibilityLevels tests compatibility level management
|
|
func TestManager_CompatibilityLevels(t *testing.T) {
|
|
manager := &Manager{
|
|
evolutionChecker: NewSchemaEvolutionChecker(),
|
|
}
|
|
|
|
t.Run("Get default compatibility level", func(t *testing.T) {
|
|
level := manager.GetCompatibilityLevel("test-subject")
|
|
assert.Equal(t, CompatibilityBackward, level)
|
|
})
|
|
|
|
t.Run("Set compatibility level", func(t *testing.T) {
|
|
err := manager.SetCompatibilityLevel("test-subject", CompatibilityFull)
|
|
assert.NoError(t, err)
|
|
})
|
|
}
|
|
|
|
// TestManager_CanEvolveSchema tests the CanEvolveSchema method
|
|
func TestManager_CanEvolveSchema(t *testing.T) {
|
|
manager := &Manager{
|
|
evolutionChecker: NewSchemaEvolutionChecker(),
|
|
}
|
|
|
|
t.Run("Compatible evolution", func(t *testing.T) {
|
|
currentSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string", "default": ""}
|
|
]
|
|
}`
|
|
|
|
result, err := manager.CanEvolveSchema("test-subject", currentSchema, newSchema, FormatAvro)
|
|
require.NoError(t, err)
|
|
assert.True(t, result.Compatible)
|
|
})
|
|
|
|
t.Run("Incompatible evolution", func(t *testing.T) {
|
|
currentSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
result, err := manager.CanEvolveSchema("test-subject", currentSchema, newSchema, FormatAvro)
|
|
require.NoError(t, err)
|
|
assert.False(t, result.Compatible)
|
|
assert.Contains(t, result.Issues[0], "Field 'email' was removed")
|
|
})
|
|
}
|
|
|
|
// TestManager_SchemaEvolutionWorkflow tests a complete schema evolution workflow
|
|
func TestManager_SchemaEvolutionWorkflow(t *testing.T) {
|
|
manager := &Manager{
|
|
evolutionChecker: NewSchemaEvolutionChecker(),
|
|
}
|
|
|
|
t.Run("Complete evolution workflow", func(t *testing.T) {
|
|
// Step 1: Define initial schema
|
|
initialSchema := `{
|
|
"type": "record",
|
|
"name": "UserEvent",
|
|
"fields": [
|
|
{"name": "userId", "type": "int"},
|
|
{"name": "action", "type": "string"}
|
|
]
|
|
}`
|
|
|
|
// Step 2: Propose schema evolution (compatible)
|
|
evolvedSchema := `{
|
|
"type": "record",
|
|
"name": "UserEvent",
|
|
"fields": [
|
|
{"name": "userId", "type": "int"},
|
|
{"name": "action", "type": "string"},
|
|
{"name": "timestamp", "type": "long", "default": 0}
|
|
]
|
|
}`
|
|
|
|
// Check compatibility explicitly
|
|
result, err := manager.CanEvolveSchema("user-events", initialSchema, evolvedSchema, FormatAvro)
|
|
require.NoError(t, err)
|
|
assert.True(t, result.Compatible)
|
|
|
|
// Step 3: Try incompatible evolution
|
|
incompatibleSchema := `{
|
|
"type": "record",
|
|
"name": "UserEvent",
|
|
"fields": [
|
|
{"name": "userId", "type": "int"}
|
|
]
|
|
}`
|
|
|
|
result, err = manager.CanEvolveSchema("user-events", initialSchema, incompatibleSchema, FormatAvro)
|
|
require.NoError(t, err)
|
|
assert.False(t, result.Compatible)
|
|
assert.Contains(t, result.Issues[0], "Field 'action' was removed")
|
|
|
|
// Step 4: Get suggestions for incompatible evolution
|
|
suggestions, err := manager.SuggestSchemaEvolution(initialSchema, incompatibleSchema, FormatAvro, CompatibilityBackward)
|
|
require.NoError(t, err)
|
|
assert.NotEmpty(t, suggestions)
|
|
})
|
|
}
|
|
|
|
// BenchmarkSchemaEvolution benchmarks schema evolution operations
|
|
func BenchmarkSchemaEvolution(b *testing.B) {
|
|
manager := &Manager{
|
|
evolutionChecker: NewSchemaEvolutionChecker(),
|
|
}
|
|
|
|
oldSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string", "default": ""}
|
|
]
|
|
}`
|
|
|
|
newSchema := `{
|
|
"type": "record",
|
|
"name": "User",
|
|
"fields": [
|
|
{"name": "id", "type": "int"},
|
|
{"name": "name", "type": "string"},
|
|
{"name": "email", "type": "string", "default": ""},
|
|
{"name": "age", "type": "int", "default": 0}
|
|
]
|
|
}`
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
_, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
|
|
if err != nil {
|
|
b.Fatal(err)
|
|
}
|
|
}
|
|
}
|