package schema import ( "bytes" "encoding/json" "fmt" "io" "net/http" "sync" "time" ) // RegistryClient provides access to a Confluent Schema Registry type RegistryClient struct { baseURL string httpClient *http.Client // Caching schemaCache map[uint32]*CachedSchema // schema ID -> schema subjectCache map[string]*CachedSubject // subject -> latest version info cacheMu sync.RWMutex cacheTTL time.Duration } // CachedSchema represents a cached schema with metadata type CachedSchema struct { ID uint32 `json:"id"` Schema string `json:"schema"` Subject string `json:"subject"` Version int `json:"version"` Format Format `json:"-"` // Derived from schema content CachedAt time.Time `json:"-"` } // CachedSubject represents cached subject information type CachedSubject struct { Subject string `json:"subject"` LatestID uint32 `json:"id"` Version int `json:"version"` Schema string `json:"schema"` CachedAt time.Time `json:"-"` } // RegistryConfig holds configuration for the Schema Registry client type RegistryConfig struct { URL string Username string // Optional basic auth Password string // Optional basic auth Timeout time.Duration CacheTTL time.Duration MaxRetries int } // NewRegistryClient creates a new Schema Registry client func NewRegistryClient(config RegistryConfig) *RegistryClient { if config.Timeout == 0 { config.Timeout = 30 * time.Second } if config.CacheTTL == 0 { config.CacheTTL = 5 * time.Minute } httpClient := &http.Client{ Timeout: config.Timeout, } return &RegistryClient{ baseURL: config.URL, httpClient: httpClient, schemaCache: make(map[uint32]*CachedSchema), subjectCache: make(map[string]*CachedSubject), cacheTTL: config.CacheTTL, } } // GetSchemaByID retrieves a schema by its ID func (rc *RegistryClient) GetSchemaByID(schemaID uint32) (*CachedSchema, error) { // Check cache first rc.cacheMu.RLock() if cached, exists := rc.schemaCache[schemaID]; exists { if time.Since(cached.CachedAt) < rc.cacheTTL { rc.cacheMu.RUnlock() return cached, nil } } rc.cacheMu.RUnlock() // Fetch from registry url := fmt.Sprintf("%s/schemas/ids/%d", rc.baseURL, schemaID) resp, err := rc.httpClient.Get(url) if err != nil { return nil, fmt.Errorf("failed to fetch schema %d: %w", schemaID, err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("schema registry error %d: %s", resp.StatusCode, string(body)) } var schemaResp struct { Schema string `json:"schema"` Subject string `json:"subject"` Version int `json:"version"` } if err := json.NewDecoder(resp.Body).Decode(&schemaResp); err != nil { return nil, fmt.Errorf("failed to decode schema response: %w", err) } // Determine format from schema content format := rc.detectSchemaFormat(schemaResp.Schema) cached := &CachedSchema{ ID: schemaID, Schema: schemaResp.Schema, Subject: schemaResp.Subject, Version: schemaResp.Version, Format: format, CachedAt: time.Now(), } // Update cache rc.cacheMu.Lock() rc.schemaCache[schemaID] = cached rc.cacheMu.Unlock() return cached, nil } // GetLatestSchema retrieves the latest schema for a subject func (rc *RegistryClient) GetLatestSchema(subject string) (*CachedSubject, error) { // Check cache first rc.cacheMu.RLock() if cached, exists := rc.subjectCache[subject]; exists { if time.Since(cached.CachedAt) < rc.cacheTTL { rc.cacheMu.RUnlock() return cached, nil } } rc.cacheMu.RUnlock() // Fetch from registry url := fmt.Sprintf("%s/subjects/%s/versions/latest", rc.baseURL, subject) resp, err := rc.httpClient.Get(url) if err != nil { return nil, fmt.Errorf("failed to fetch latest schema for %s: %w", subject, err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("schema registry error %d: %s", resp.StatusCode, string(body)) } var schemaResp struct { ID uint32 `json:"id"` Schema string `json:"schema"` Subject string `json:"subject"` Version int `json:"version"` } if err := json.NewDecoder(resp.Body).Decode(&schemaResp); err != nil { return nil, fmt.Errorf("failed to decode schema response: %w", err) } cached := &CachedSubject{ Subject: subject, LatestID: schemaResp.ID, Version: schemaResp.Version, Schema: schemaResp.Schema, CachedAt: time.Now(), } // Update cache rc.cacheMu.Lock() rc.subjectCache[subject] = cached rc.cacheMu.Unlock() return cached, nil } // RegisterSchema registers a new schema for a subject func (rc *RegistryClient) RegisterSchema(subject, schema string) (uint32, error) { url := fmt.Sprintf("%s/subjects/%s/versions", rc.baseURL, subject) reqBody := map[string]string{ "schema": schema, } jsonData, err := json.Marshal(reqBody) if err != nil { return 0, fmt.Errorf("failed to marshal schema request: %w", err) } resp, err := rc.httpClient.Post(url, "application/json", bytes.NewBuffer(jsonData)) if err != nil { return 0, fmt.Errorf("failed to register schema: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return 0, fmt.Errorf("schema registry error %d: %s", resp.StatusCode, string(body)) } var regResp struct { ID uint32 `json:"id"` } if err := json.NewDecoder(resp.Body).Decode(®Resp); err != nil { return 0, fmt.Errorf("failed to decode registration response: %w", err) } // Invalidate caches for this subject rc.cacheMu.Lock() delete(rc.subjectCache, subject) // Note: we don't cache the new schema here since we don't have full metadata rc.cacheMu.Unlock() return regResp.ID, nil } // CheckCompatibility checks if a schema is compatible with the subject func (rc *RegistryClient) CheckCompatibility(subject, schema string) (bool, error) { url := fmt.Sprintf("%s/compatibility/subjects/%s/versions/latest", rc.baseURL, subject) reqBody := map[string]string{ "schema": schema, } jsonData, err := json.Marshal(reqBody) if err != nil { return false, fmt.Errorf("failed to marshal compatibility request: %w", err) } resp, err := rc.httpClient.Post(url, "application/json", bytes.NewBuffer(jsonData)) if err != nil { return false, fmt.Errorf("failed to check compatibility: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return false, fmt.Errorf("schema registry error %d: %s", resp.StatusCode, string(body)) } var compatResp struct { IsCompatible bool `json:"is_compatible"` } if err := json.NewDecoder(resp.Body).Decode(&compatResp); err != nil { return false, fmt.Errorf("failed to decode compatibility response: %w", err) } return compatResp.IsCompatible, nil } // ListSubjects returns all subjects in the registry func (rc *RegistryClient) ListSubjects() ([]string, error) { url := fmt.Sprintf("%s/subjects", rc.baseURL) resp, err := rc.httpClient.Get(url) if err != nil { return nil, fmt.Errorf("failed to list subjects: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("schema registry error %d: %s", resp.StatusCode, string(body)) } var subjects []string if err := json.NewDecoder(resp.Body).Decode(&subjects); err != nil { return nil, fmt.Errorf("failed to decode subjects response: %w", err) } return subjects, nil } // ClearCache clears all cached schemas and subjects func (rc *RegistryClient) ClearCache() { rc.cacheMu.Lock() defer rc.cacheMu.Unlock() rc.schemaCache = make(map[uint32]*CachedSchema) rc.subjectCache = make(map[string]*CachedSubject) } // GetCacheStats returns cache statistics func (rc *RegistryClient) GetCacheStats() (schemaCount, subjectCount int) { rc.cacheMu.RLock() defer rc.cacheMu.RUnlock() return len(rc.schemaCache), len(rc.subjectCache) } // detectSchemaFormat attempts to determine the schema format from content func (rc *RegistryClient) detectSchemaFormat(schema string) Format { // Try to parse as JSON first (Avro schemas are JSON) var jsonObj interface{} if err := json.Unmarshal([]byte(schema), &jsonObj); err == nil { // Check for Avro-specific fields if schemaMap, ok := jsonObj.(map[string]interface{}); ok { if schemaType, exists := schemaMap["type"]; exists { if typeStr, ok := schemaType.(string); ok { // Common Avro types avroTypes := []string{"record", "enum", "array", "map", "union", "fixed"} for _, avroType := range avroTypes { if typeStr == avroType { return FormatAvro } } // Common JSON Schema types (that are not Avro types) // Note: "string" is ambiguous - it could be Avro primitive or JSON Schema // We need to check other indicators first jsonSchemaTypes := []string{"object", "number", "integer", "boolean", "null"} for _, jsonSchemaType := range jsonSchemaTypes { if typeStr == jsonSchemaType { return FormatJSONSchema } } } } // Check for JSON Schema indicators if _, exists := schemaMap["$schema"]; exists { return FormatJSONSchema } // Check for JSON Schema properties field if _, exists := schemaMap["properties"]; exists { return FormatJSONSchema } } // Default JSON-based schema to Avro only if it doesn't look like JSON Schema return FormatAvro } // Check for Protobuf (typically not JSON) // Protobuf schemas in Schema Registry are usually stored as descriptors // For now, assume non-JSON schemas are Protobuf return FormatProtobuf } // HealthCheck verifies the registry is accessible func (rc *RegistryClient) HealthCheck() error { url := fmt.Sprintf("%s/subjects", rc.baseURL) resp, err := rc.httpClient.Get(url) if err != nil { return fmt.Errorf("schema registry health check failed: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("schema registry health check failed with status %d", resp.StatusCode) } return nil }