1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/schema/registry_client_test.go
chrislu c688bd1806 Phase 2: Add Schema Registry HTTP client with caching
- Implement RegistryClient with full REST API support
- Add LRU caching for schemas and subjects with configurable TTL
- Support schema registration, compatibility checking, and listing
- Include automatic format detection (Avro/Protobuf/JSON Schema)
- Add health check and cache management functionality
- Comprehensive test coverage with mock HTTP server

This provides the foundation for schema resolution and validation.
2025-09-11 11:25:09 -07:00

362 lines
9.6 KiB
Go

package schema
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
)
func TestNewRegistryClient(t *testing.T) {
config := RegistryConfig{
URL: "http://localhost:8081",
}
client := NewRegistryClient(config)
if client.baseURL != config.URL {
t.Errorf("Expected baseURL %s, got %s", config.URL, client.baseURL)
}
if client.cacheTTL != 5*time.Minute {
t.Errorf("Expected default cacheTTL 5m, got %v", client.cacheTTL)
}
if client.httpClient.Timeout != 30*time.Second {
t.Errorf("Expected default timeout 30s, got %v", client.httpClient.Timeout)
}
}
func TestRegistryClient_GetSchemaByID(t *testing.T) {
// Mock server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/schemas/ids/1" {
response := map[string]interface{}{
"schema": `{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}`,
"subject": "user-value",
"version": 1,
}
json.NewEncoder(w).Encode(response)
} else if r.URL.Path == "/schemas/ids/999" {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte(`{"error_code":40403,"message":"Schema not found"}`))
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()
config := RegistryConfig{
URL: server.URL,
CacheTTL: 1 * time.Minute,
}
client := NewRegistryClient(config)
t.Run("successful fetch", func(t *testing.T) {
schema, err := client.GetSchemaByID(1)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
if schema.ID != 1 {
t.Errorf("Expected schema ID 1, got %d", schema.ID)
}
if schema.Subject != "user-value" {
t.Errorf("Expected subject 'user-value', got %s", schema.Subject)
}
if schema.Format != FormatAvro {
t.Errorf("Expected Avro format, got %v", schema.Format)
}
})
t.Run("schema not found", func(t *testing.T) {
_, err := client.GetSchemaByID(999)
if err == nil {
t.Fatal("Expected error for non-existent schema")
}
})
t.Run("cache hit", func(t *testing.T) {
// First call should cache the result
schema1, err := client.GetSchemaByID(1)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
// Second call should hit cache (same timestamp)
schema2, err := client.GetSchemaByID(1)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
if schema1.CachedAt != schema2.CachedAt {
t.Error("Expected cache hit with same timestamp")
}
})
}
func TestRegistryClient_GetLatestSchema(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/subjects/user-value/versions/latest" {
response := map[string]interface{}{
"id": uint32(1),
"schema": `{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}`,
"subject": "user-value",
"version": 1,
}
json.NewEncoder(w).Encode(response)
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()
config := RegistryConfig{URL: server.URL}
client := NewRegistryClient(config)
schema, err := client.GetLatestSchema("user-value")
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
if schema.LatestID != 1 {
t.Errorf("Expected schema ID 1, got %d", schema.LatestID)
}
if schema.Subject != "user-value" {
t.Errorf("Expected subject 'user-value', got %s", schema.Subject)
}
}
func TestRegistryClient_RegisterSchema(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" && r.URL.Path == "/subjects/test-value/versions" {
response := map[string]interface{}{
"id": uint32(123),
}
json.NewEncoder(w).Encode(response)
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()
config := RegistryConfig{URL: server.URL}
client := NewRegistryClient(config)
schemaStr := `{"type":"record","name":"Test","fields":[{"name":"id","type":"int"}]}`
id, err := client.RegisterSchema("test-value", schemaStr)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
if id != 123 {
t.Errorf("Expected schema ID 123, got %d", id)
}
}
func TestRegistryClient_CheckCompatibility(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" && r.URL.Path == "/compatibility/subjects/test-value/versions/latest" {
response := map[string]interface{}{
"is_compatible": true,
}
json.NewEncoder(w).Encode(response)
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()
config := RegistryConfig{URL: server.URL}
client := NewRegistryClient(config)
schemaStr := `{"type":"record","name":"Test","fields":[{"name":"id","type":"int"}]}`
compatible, err := client.CheckCompatibility("test-value", schemaStr)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
if !compatible {
t.Error("Expected schema to be compatible")
}
}
func TestRegistryClient_ListSubjects(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/subjects" {
subjects := []string{"user-value", "order-value", "product-key"}
json.NewEncoder(w).Encode(subjects)
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()
config := RegistryConfig{URL: server.URL}
client := NewRegistryClient(config)
subjects, err := client.ListSubjects()
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
expectedSubjects := []string{"user-value", "order-value", "product-key"}
if len(subjects) != len(expectedSubjects) {
t.Errorf("Expected %d subjects, got %d", len(expectedSubjects), len(subjects))
}
for i, expected := range expectedSubjects {
if subjects[i] != expected {
t.Errorf("Expected subject %s, got %s", expected, subjects[i])
}
}
}
func TestRegistryClient_DetectSchemaFormat(t *testing.T) {
config := RegistryConfig{URL: "http://localhost:8081"}
client := NewRegistryClient(config)
tests := []struct {
name string
schema string
expected Format
}{
{
name: "Avro record schema",
schema: `{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}`,
expected: FormatAvro,
},
{
name: "Avro enum schema",
schema: `{"type":"enum","name":"Color","symbols":["RED","GREEN","BLUE"]}`,
expected: FormatAvro,
},
{
name: "JSON Schema",
schema: `{"$schema":"http://json-schema.org/draft-07/schema#","type":"object"}`,
expected: FormatJSONSchema,
},
{
name: "Protobuf (non-JSON)",
schema: "syntax = \"proto3\"; message User { int32 id = 1; }",
expected: FormatProtobuf,
},
{
name: "Simple Avro primitive",
schema: `{"type":"string"}`,
expected: FormatAvro,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
format := client.detectSchemaFormat(tt.schema)
if format != tt.expected {
t.Errorf("Expected format %v, got %v", tt.expected, format)
}
})
}
}
func TestRegistryClient_CacheManagement(t *testing.T) {
config := RegistryConfig{
URL: "http://localhost:8081",
CacheTTL: 100 * time.Millisecond, // Short TTL for testing
}
client := NewRegistryClient(config)
// Add some cache entries manually
client.schemaCache[1] = &CachedSchema{
ID: 1,
Schema: "test",
CachedAt: time.Now(),
}
client.subjectCache["test"] = &CachedSubject{
Subject: "test",
CachedAt: time.Now(),
}
// Check cache stats
schemaCount, subjectCount := client.GetCacheStats()
if schemaCount != 1 || subjectCount != 1 {
t.Errorf("Expected 1 schema and 1 subject in cache, got %d and %d", schemaCount, subjectCount)
}
// Clear cache
client.ClearCache()
schemaCount, subjectCount = client.GetCacheStats()
if schemaCount != 0 || subjectCount != 0 {
t.Errorf("Expected empty cache after clear, got %d schemas and %d subjects", schemaCount, subjectCount)
}
}
func TestRegistryClient_HealthCheck(t *testing.T) {
t.Run("healthy registry", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/subjects" {
json.NewEncoder(w).Encode([]string{})
}
}))
defer server.Close()
config := RegistryConfig{URL: server.URL}
client := NewRegistryClient(config)
err := client.HealthCheck()
if err != nil {
t.Errorf("Expected healthy registry, got error: %v", err)
}
})
t.Run("unhealthy registry", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer server.Close()
config := RegistryConfig{URL: server.URL}
client := NewRegistryClient(config)
err := client.HealthCheck()
if err == nil {
t.Error("Expected error for unhealthy registry")
}
})
}
// Benchmark tests
func BenchmarkRegistryClient_GetSchemaByID(b *testing.B) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
response := map[string]interface{}{
"schema": `{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}`,
"subject": "user-value",
"version": 1,
}
json.NewEncoder(w).Encode(response)
}))
defer server.Close()
config := RegistryConfig{URL: server.URL}
client := NewRegistryClient(config)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = client.GetSchemaByID(1)
}
}
func BenchmarkRegistryClient_DetectSchemaFormat(b *testing.B) {
config := RegistryConfig{URL: "http://localhost:8081"}
client := NewRegistryClient(config)
avroSchema := `{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}`
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = client.detectSchemaFormat(avroSchema)
}
}