1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/schema/registry_client.go
2025-09-13 08:06:36 -07:00

358 lines
9.9 KiB
Go

package schema
import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"
)
// RegistryClient talks to a Confluent Schema Registry over HTTP and keeps
// in-memory caches of the schemas and subjects it has already fetched.
type RegistryClient struct {
	baseURL    string       // registry base URL; request paths are appended to it
	httpClient *http.Client // client used for every registry request

	// Cache state. cacheMu is held around every access to the two maps.
	schemaCache  map[uint32]*CachedSchema  // schema ID -> schema
	subjectCache map[string]*CachedSubject // subject -> latest version info
	cacheMu      sync.RWMutex
	cacheTTL     time.Duration // cached entries at least this old are refetched
}
// CachedSchema is a schema looked up by ID, together with the metadata the
// client keeps alongside it in its cache.
type CachedSchema struct {
	ID      uint32 `json:"id"`      // schema ID the entry was fetched under
	Schema  string `json:"schema"`  // schema text as returned by the registry
	Subject string `json:"subject"` // subject reported by the registry, if any
	Version int    `json:"version"` // version reported by the registry, if any

	// The remaining fields are excluded from JSON encoding and decoding.
	Format   Format    `json:"-"` // schema format determined when the entry was fetched
	CachedAt time.Time `json:"-"` // time the entry was created
}
// CachedSubject is the cached "latest version" record for a single subject.
type CachedSubject struct {
	Subject  string `json:"subject"` // subject name the entry was fetched for
	LatestID uint32 `json:"id"`      // schema ID of the latest version
	Version  int    `json:"version"` // version number of the latest version
	Schema   string `json:"schema"`  // schema text of the latest version

	// CachedAt is the time the entry was created; it is excluded from JSON.
	CachedAt time.Time `json:"-"`
}
// RegistryConfig collects the settings used to build a Schema Registry client.
type RegistryConfig struct {
	// URL is the base URL of the registry.
	URL string

	// Username and Password are optional basic auth credentials.
	Username, Password string

	// Timeout bounds each HTTP request and CacheTTL bounds how long cached
	// entries are reused. NewRegistryClient substitutes 30 seconds and
	// 5 minutes respectively when they are zero.
	Timeout, CacheTTL time.Duration

	// MaxRetries is the configured retry limit.
	MaxRetries int
}
// NewRegistryClient creates a new Schema Registry client from config.
//
// A zero Timeout defaults to 30 seconds and a zero CacheTTL defaults to
// 5 minutes. Trailing slashes are stripped from config.URL because every
// request path built by the client already starts with "/". When
// config.Username or config.Password is non-empty, HTTP basic auth credentials
// are attached to every request. config.MaxRetries is not consulted here.
func NewRegistryClient(config RegistryConfig) *RegistryClient {
	if config.Timeout == 0 {
		config.Timeout = 30 * time.Second
	}
	if config.CacheTTL == 0 {
		config.CacheTTL = 5 * time.Minute
	}

	httpClient := &http.Client{Timeout: config.Timeout}
	if config.Username != "" || config.Password != "" {
		// Credentials are injected at the transport level, so they are also
		// sent on any request the client issues while following a redirect.
		httpClient.Transport = &registryBasicAuthTransport{
			username: config.Username,
			password: config.Password,
			base:     http.DefaultTransport,
		}
	}

	return &RegistryClient{
		baseURL:      strings.TrimRight(config.URL, "/"),
		httpClient:   httpClient,
		schemaCache:  make(map[uint32]*CachedSchema),
		subjectCache: make(map[string]*CachedSubject),
		cacheTTL:     config.CacheTTL,
	}
}

// registryBasicAuthTransport is an http.RoundTripper that sets HTTP basic auth
// credentials on each request before handing it to base.
type registryBasicAuthTransport struct {
	username string
	password string
	base     http.RoundTripper
}

// RoundTrip sends a clone of req carrying the Authorization header. The
// caller's request is left unmodified, as the http.RoundTripper contract
// requires.
func (t *registryBasicAuthTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	authed := req.Clone(req.Context())
	authed.SetBasicAuth(t.username, t.password)
	return t.base.RoundTrip(authed)
}
// GetSchemaByID retrieves a schema by its ID.
//
// A cached entry younger than the cache TTL is returned without contacting the
// registry. Otherwise the schema is fetched from /schemas/ids/{id}, stored in
// the cache and returned. The returned pointer is the cached value itself.
//
// The format is taken from the registry's "schemaType" field when it is one of
// "AVRO", "PROTOBUF" or "JSON"; when the field is absent or unrecognised the
// format is inferred from the schema text.
//
// An error is returned when the request fails, the registry answers with a
// status other than 200, or the response body cannot be decoded.
func (rc *RegistryClient) GetSchemaByID(schemaID uint32) (*CachedSchema, error) {
	// Check cache first.
	rc.cacheMu.RLock()
	if cached, exists := rc.schemaCache[schemaID]; exists {
		if time.Since(cached.CachedAt) < rc.cacheTTL {
			rc.cacheMu.RUnlock()
			return cached, nil
		}
	}
	rc.cacheMu.RUnlock()

	// Fetch from registry. The lock is not held during the HTTP call.
	endpoint := fmt.Sprintf("%s/schemas/ids/%d", rc.baseURL, schemaID)
	resp, err := rc.httpClient.Get(endpoint)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch schema %d: %w", schemaID, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the error message.
		body, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("schema registry error %d: %s", resp.StatusCode, string(body))
	}

	var schemaResp struct {
		Schema     string `json:"schema"`
		SchemaType string `json:"schemaType"`
		Subject    string `json:"subject"`
		Version    int    `json:"version"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&schemaResp); err != nil {
		return nil, fmt.Errorf("failed to decode schema response: %w", err)
	}

	// Prefer the type declared by the registry; fall back to content sniffing
	// when the registry did not report one.
	var format Format
	switch schemaResp.SchemaType {
	case "AVRO":
		format = FormatAvro
	case "PROTOBUF":
		format = FormatProtobuf
	case "JSON":
		format = FormatJSONSchema
	default:
		format = rc.detectSchemaFormat(schemaResp.Schema)
	}

	cached := &CachedSchema{
		ID:       schemaID,
		Schema:   schemaResp.Schema,
		Subject:  schemaResp.Subject,
		Version:  schemaResp.Version,
		Format:   format,
		CachedAt: time.Now(),
	}

	// Update cache.
	rc.cacheMu.Lock()
	rc.schemaCache[schemaID] = cached
	rc.cacheMu.Unlock()

	return cached, nil
}
// GetLatestSchema retrieves the latest schema version registered for subject.
//
// A cached entry younger than the cache TTL is returned without contacting the
// registry. Otherwise the record is fetched from
// /subjects/{subject}/versions/latest, stored in the cache and returned. The
// returned pointer is the cached value itself.
//
// The subject is path-escaped before it is placed in the URL, so subjects
// containing characters such as "/", "?", "#" or spaces address the intended
// resource.
//
// An error is returned when the request fails, the registry answers with a
// status other than 200, or the response body cannot be decoded.
func (rc *RegistryClient) GetLatestSchema(subject string) (*CachedSubject, error) {
	// Check cache first.
	rc.cacheMu.RLock()
	if cached, exists := rc.subjectCache[subject]; exists {
		if time.Since(cached.CachedAt) < rc.cacheTTL {
			rc.cacheMu.RUnlock()
			return cached, nil
		}
	}
	rc.cacheMu.RUnlock()

	// Fetch from registry. The lock is not held during the HTTP call.
	endpoint := fmt.Sprintf("%s/subjects/%s/versions/latest", rc.baseURL, url.PathEscape(subject))
	resp, err := rc.httpClient.Get(endpoint)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch latest schema for %s: %w", subject, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the error message.
		body, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("schema registry error %d: %s", resp.StatusCode, string(body))
	}

	var schemaResp struct {
		ID      uint32 `json:"id"`
		Schema  string `json:"schema"`
		Subject string `json:"subject"`
		Version int    `json:"version"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&schemaResp); err != nil {
		return nil, fmt.Errorf("failed to decode schema response: %w", err)
	}

	// The entry is keyed and labelled with the caller's subject, not the one
	// echoed back by the registry.
	cached := &CachedSubject{
		Subject:  subject,
		LatestID: schemaResp.ID,
		Version:  schemaResp.Version,
		Schema:   schemaResp.Schema,
		CachedAt: time.Now(),
	}

	// Update cache.
	rc.cacheMu.Lock()
	rc.subjectCache[subject] = cached
	rc.cacheMu.Unlock()

	return cached, nil
}
// RegisterSchema registers schema as a new version under subject and returns
// the schema ID assigned by the registry.
//
// The request is a JSON POST to /subjects/{subject}/versions whose body has a
// single "schema" field. The subject is path-escaped before it is placed in
// the URL. On success the cached latest-version entry for subject is dropped
// so that the next GetLatestSchema call refetches it.
//
// An error (and ID 0) is returned when the request cannot be built or sent,
// the registry answers with a status other than 200, or the response body
// cannot be decoded.
func (rc *RegistryClient) RegisterSchema(subject, schema string) (uint32, error) {
	endpoint := fmt.Sprintf("%s/subjects/%s/versions", rc.baseURL, url.PathEscape(subject))

	reqBody := map[string]string{
		"schema": schema,
	}
	jsonData, err := json.Marshal(reqBody)
	if err != nil {
		return 0, fmt.Errorf("failed to marshal schema request: %w", err)
	}

	resp, err := rc.httpClient.Post(endpoint, "application/json", bytes.NewReader(jsonData))
	if err != nil {
		return 0, fmt.Errorf("failed to register schema: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the error message.
		body, _ := io.ReadAll(resp.Body)
		return 0, fmt.Errorf("schema registry error %d: %s", resp.StatusCode, string(body))
	}

	var regResp struct {
		ID uint32 `json:"id"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&regResp); err != nil {
		return 0, fmt.Errorf("failed to decode registration response: %w", err)
	}

	// Invalidate the subject's cached "latest" entry. The new schema is not
	// cached here because the response does not carry its full metadata.
	rc.cacheMu.Lock()
	delete(rc.subjectCache, subject)
	rc.cacheMu.Unlock()

	return regResp.ID, nil
}
// CheckCompatibility asks the registry whether schema is compatible with the
// latest version registered under subject.
//
// The request is a JSON POST to
// /compatibility/subjects/{subject}/versions/latest whose body has a single
// "schema" field. The subject is path-escaped before it is placed in the URL.
// The returned bool is the "is_compatible" field of the response.
//
// An error (and false) is returned when the request cannot be built or sent,
// the registry answers with a status other than 200, or the response body
// cannot be decoded.
func (rc *RegistryClient) CheckCompatibility(subject, schema string) (bool, error) {
	endpoint := fmt.Sprintf("%s/compatibility/subjects/%s/versions/latest", rc.baseURL, url.PathEscape(subject))

	reqBody := map[string]string{
		"schema": schema,
	}
	jsonData, err := json.Marshal(reqBody)
	if err != nil {
		return false, fmt.Errorf("failed to marshal compatibility request: %w", err)
	}

	resp, err := rc.httpClient.Post(endpoint, "application/json", bytes.NewReader(jsonData))
	if err != nil {
		return false, fmt.Errorf("failed to check compatibility: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the error message.
		body, _ := io.ReadAll(resp.Body)
		return false, fmt.Errorf("schema registry error %d: %s", resp.StatusCode, string(body))
	}

	var compatResp struct {
		IsCompatible bool `json:"is_compatible"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&compatResp); err != nil {
		return false, fmt.Errorf("failed to decode compatibility response: %w", err)
	}

	return compatResp.IsCompatible, nil
}
// ListSubjects fetches the names of all subjects known to the registry.
//
// It issues a GET to /subjects and decodes the JSON array in the response. An
// error is returned when the request fails, the status is not 200, or the body
// cannot be decoded.
func (rc *RegistryClient) ListSubjects() ([]string, error) {
	resp, err := rc.httpClient.Get(rc.baseURL + "/subjects")
	if err != nil {
		return nil, fmt.Errorf("failed to list subjects: %w", err)
	}
	defer resp.Body.Close()

	if status := resp.StatusCode; status != http.StatusOK {
		// Best effort: the body is only used to enrich the error message.
		payload, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("schema registry error %d: %s", status, string(payload))
	}

	var names []string
	decodeErr := json.NewDecoder(resp.Body).Decode(&names)
	if decodeErr != nil {
		return nil, fmt.Errorf("failed to decode subjects response: %w", decodeErr)
	}
	return names, nil
}
// ClearCache discards every cached schema and subject by swapping in fresh,
// empty maps while holding the cache write lock.
func (rc *RegistryClient) ClearCache() {
	emptySchemas := make(map[uint32]*CachedSchema)
	emptySubjects := make(map[string]*CachedSubject)

	rc.cacheMu.Lock()
	rc.schemaCache, rc.subjectCache = emptySchemas, emptySubjects
	rc.cacheMu.Unlock()
}
// GetCacheStats reports how many entries the schema cache and the subject
// cache currently hold. Both counts are read under the cache read lock.
func (rc *RegistryClient) GetCacheStats() (schemaCount, subjectCount int) {
	rc.cacheMu.RLock()
	schemaCount = len(rc.schemaCache)
	subjectCount = len(rc.subjectCache)
	rc.cacheMu.RUnlock()
	return schemaCount, subjectCount
}
// detectSchemaFormat attempts to determine the schema format from content.
//
// The rules are applied in this order:
//  1. Text that is not valid JSON is assumed to be Protobuf.
//  2. JSON that is not an object (for example an array or a bare string) is
//     treated as Avro.
//  3. An object with a "$schema" key is JSON Schema. This is checked before
//     "type" because type names such as "array" occur in both Avro and JSON
//     Schema, whereas "$schema" is a JSON Schema indicator.
//  4. A string "type" of record, enum, array, map, union or fixed is Avro;
//     object, number, integer, boolean or null is JSON Schema.
//  5. An object with a "properties" key is JSON Schema.
//  6. Anything else defaults to Avro.
func (rc *RegistryClient) detectSchemaFormat(schema string) Format {
	var parsed interface{}
	if err := json.Unmarshal([]byte(schema), &parsed); err != nil {
		// For now, assume non-JSON schemas are Protobuf.
		return FormatProtobuf
	}

	fields, isObject := parsed.(map[string]interface{})
	if !isObject {
		return FormatAvro
	}

	if _, ok := fields["$schema"]; ok {
		return FormatJSONSchema
	}

	// A missing or non-string "type" fails the assertion and falls through.
	if typeName, ok := fields["type"].(string); ok {
		switch typeName {
		case "record", "enum", "array", "map", "union", "fixed":
			return FormatAvro
		case "object", "number", "integer", "boolean", "null":
			// "string" is deliberately absent: it is ambiguous between an
			// Avro primitive and a JSON Schema type.
			return FormatJSONSchema
		}
	}

	if _, ok := fields["properties"]; ok {
		return FormatJSONSchema
	}

	// Default JSON-based schemas to Avro when nothing marks them as JSON Schema.
	return FormatAvro
}
// HealthCheck probes the registry by issuing a GET to /subjects. It returns nil
// when the registry answers with status 200 and an error describing the
// transport failure or unexpected status otherwise.
func (rc *RegistryClient) HealthCheck() error {
	resp, err := rc.httpClient.Get(rc.baseURL + "/subjects")
	if err != nil {
		return fmt.Errorf("schema registry health check failed: %w", err)
	}
	defer resp.Body.Close()

	if status := resp.StatusCode; status != http.StatusOK {
		return fmt.Errorf("schema registry health check failed with status %d", status)
	}
	return nil
}