mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
- Add gojsonschema dependency for JSON Schema validation and parsing
- Implement JSONSchemaDecoder with validation and SMQ RecordValue conversion
- Support all JSON Schema types: object, array, string, number, integer, boolean
- Add format-specific type mapping (date-time, email, byte, etc.)
- Include schema inference from JSON Schema to SeaweedMQ RecordType
- Add round-trip encoding from RecordValue back to validated JSON
- Integrate JSON Schema support into Schema Manager with caching
- Comprehensive test coverage for validation, decoding, and type inference

This completes schema format support for Avro, Protobuf, and JSON Schema.
387 lines
10 KiB
Go
387 lines
10 KiB
Go
package schema
|
|
|
|
import (
	"encoding/json"
	"fmt"
	"sort"
	"strconv"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/xeipuuv/gojsonschema"
)
|
|
|
|
// JSONSchemaDecoder handles JSON Schema validation and conversion to SeaweedMQ format
|
|
type JSONSchemaDecoder struct {
|
|
schema *gojsonschema.Schema
|
|
schemaDoc map[string]interface{} // Parsed schema document for type inference
|
|
schemaJSON string // Original schema JSON
|
|
}
|
|
|
|
// NewJSONSchemaDecoder creates a new JSON Schema decoder from a schema string
|
|
func NewJSONSchemaDecoder(schemaJSON string) (*JSONSchemaDecoder, error) {
|
|
// Parse the schema JSON
|
|
var schemaDoc map[string]interface{}
|
|
if err := json.Unmarshal([]byte(schemaJSON), &schemaDoc); err != nil {
|
|
return nil, fmt.Errorf("failed to parse JSON schema: %w", err)
|
|
}
|
|
|
|
// Create JSON Schema validator
|
|
schemaLoader := gojsonschema.NewStringLoader(schemaJSON)
|
|
schema, err := gojsonschema.NewSchema(schemaLoader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create JSON schema validator: %w", err)
|
|
}
|
|
|
|
return &JSONSchemaDecoder{
|
|
schema: schema,
|
|
schemaDoc: schemaDoc,
|
|
schemaJSON: schemaJSON,
|
|
}, nil
|
|
}
|
|
|
|
// Decode decodes and validates JSON data against the schema, returning a Go map
|
|
func (jsd *JSONSchemaDecoder) Decode(data []byte) (map[string]interface{}, error) {
|
|
// Parse JSON data
|
|
var jsonData interface{}
|
|
if err := json.Unmarshal(data, &jsonData); err != nil {
|
|
return nil, fmt.Errorf("failed to parse JSON data: %w", err)
|
|
}
|
|
|
|
// Validate against schema
|
|
documentLoader := gojsonschema.NewGoLoader(jsonData)
|
|
result, err := jsd.schema.Validate(documentLoader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to validate JSON data: %w", err)
|
|
}
|
|
|
|
if !result.Valid() {
|
|
// Collect validation errors
|
|
var errorMsgs []string
|
|
for _, desc := range result.Errors() {
|
|
errorMsgs = append(errorMsgs, desc.String())
|
|
}
|
|
return nil, fmt.Errorf("JSON data validation failed: %v", errorMsgs)
|
|
}
|
|
|
|
// Convert to map[string]interface{} for consistency
|
|
switch v := jsonData.(type) {
|
|
case map[string]interface{}:
|
|
return v, nil
|
|
case []interface{}:
|
|
// Handle array at root level by wrapping in a map
|
|
return map[string]interface{}{"items": v}, nil
|
|
default:
|
|
// Handle primitive values at root level
|
|
return map[string]interface{}{"value": v}, nil
|
|
}
|
|
}
|
|
|
|
// DecodeToRecordValue decodes JSON data directly to SeaweedMQ RecordValue
|
|
func (jsd *JSONSchemaDecoder) DecodeToRecordValue(data []byte) (*schema_pb.RecordValue, error) {
|
|
jsonMap, err := jsd.Decode(data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return MapToRecordValue(jsonMap), nil
|
|
}
|
|
|
|
// InferRecordType infers a SeaweedMQ RecordType from the JSON Schema
|
|
func (jsd *JSONSchemaDecoder) InferRecordType() (*schema_pb.RecordType, error) {
|
|
return jsd.jsonSchemaToRecordType(jsd.schemaDoc), nil
|
|
}
|
|
|
|
// ValidateOnly validates JSON data against the schema without decoding
|
|
func (jsd *JSONSchemaDecoder) ValidateOnly(data []byte) error {
|
|
_, err := jsd.Decode(data)
|
|
return err
|
|
}
|
|
|
|
// jsonSchemaToRecordType converts a JSON Schema to SeaweedMQ RecordType
|
|
func (jsd *JSONSchemaDecoder) jsonSchemaToRecordType(schemaDoc map[string]interface{}) *schema_pb.RecordType {
|
|
schemaType, _ := schemaDoc["type"].(string)
|
|
|
|
if schemaType == "object" {
|
|
return jsd.objectSchemaToRecordType(schemaDoc)
|
|
}
|
|
|
|
// For non-object schemas, create a wrapper record
|
|
return &schema_pb.RecordType{
|
|
Fields: []*schema_pb.Field{
|
|
{
|
|
Name: "value",
|
|
FieldIndex: 0,
|
|
Type: jsd.jsonSchemaTypeToType(schemaDoc),
|
|
IsRequired: true,
|
|
IsRepeated: false,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
// objectSchemaToRecordType converts an object JSON Schema to RecordType
|
|
func (jsd *JSONSchemaDecoder) objectSchemaToRecordType(schemaDoc map[string]interface{}) *schema_pb.RecordType {
|
|
properties, _ := schemaDoc["properties"].(map[string]interface{})
|
|
required, _ := schemaDoc["required"].([]interface{})
|
|
|
|
// Create set of required fields for quick lookup
|
|
requiredFields := make(map[string]bool)
|
|
for _, req := range required {
|
|
if reqStr, ok := req.(string); ok {
|
|
requiredFields[reqStr] = true
|
|
}
|
|
}
|
|
|
|
fields := make([]*schema_pb.Field, 0, len(properties))
|
|
fieldIndex := int32(0)
|
|
|
|
for fieldName, fieldSchema := range properties {
|
|
fieldSchemaMap, ok := fieldSchema.(map[string]interface{})
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
field := &schema_pb.Field{
|
|
Name: fieldName,
|
|
FieldIndex: fieldIndex,
|
|
Type: jsd.jsonSchemaTypeToType(fieldSchemaMap),
|
|
IsRequired: requiredFields[fieldName],
|
|
IsRepeated: jsd.isArrayType(fieldSchemaMap),
|
|
}
|
|
|
|
fields = append(fields, field)
|
|
fieldIndex++
|
|
}
|
|
|
|
return &schema_pb.RecordType{
|
|
Fields: fields,
|
|
}
|
|
}
|
|
|
|
// jsonSchemaTypeToType converts a JSON Schema type to SeaweedMQ Type
|
|
func (jsd *JSONSchemaDecoder) jsonSchemaTypeToType(schemaDoc map[string]interface{}) *schema_pb.Type {
|
|
schemaType, _ := schemaDoc["type"].(string)
|
|
|
|
switch schemaType {
|
|
case "boolean":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_BOOL,
|
|
},
|
|
}
|
|
case "integer":
|
|
// Check for format hints
|
|
format, _ := schemaDoc["format"].(string)
|
|
switch format {
|
|
case "int32":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_INT32,
|
|
},
|
|
}
|
|
default:
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_INT64,
|
|
},
|
|
}
|
|
}
|
|
case "number":
|
|
// Check for format hints
|
|
format, _ := schemaDoc["format"].(string)
|
|
switch format {
|
|
case "float":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_FLOAT,
|
|
},
|
|
}
|
|
default:
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_DOUBLE,
|
|
},
|
|
}
|
|
}
|
|
case "string":
|
|
// Check for format hints
|
|
format, _ := schemaDoc["format"].(string)
|
|
switch format {
|
|
case "date-time":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_TIMESTAMP,
|
|
},
|
|
}
|
|
case "byte", "binary":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_BYTES,
|
|
},
|
|
}
|
|
default:
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_STRING,
|
|
},
|
|
}
|
|
}
|
|
case "array":
|
|
items, _ := schemaDoc["items"].(map[string]interface{})
|
|
elementType := jsd.jsonSchemaTypeToType(items)
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ListType{
|
|
ListType: &schema_pb.ListType{
|
|
ElementType: elementType,
|
|
},
|
|
},
|
|
}
|
|
case "object":
|
|
nestedRecordType := jsd.objectSchemaToRecordType(schemaDoc)
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_RecordType{
|
|
RecordType: nestedRecordType,
|
|
},
|
|
}
|
|
default:
|
|
// Handle union types (oneOf, anyOf, allOf)
|
|
if oneOf, exists := schemaDoc["oneOf"].([]interface{}); exists && len(oneOf) > 0 {
|
|
// For unions, use the first type as default
|
|
if firstType, ok := oneOf[0].(map[string]interface{}); ok {
|
|
return jsd.jsonSchemaTypeToType(firstType)
|
|
}
|
|
}
|
|
|
|
// Default to string for unknown types
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_STRING,
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
// isArrayType checks if a JSON Schema represents an array type
|
|
func (jsd *JSONSchemaDecoder) isArrayType(schemaDoc map[string]interface{}) bool {
|
|
schemaType, _ := schemaDoc["type"].(string)
|
|
return schemaType == "array"
|
|
}
|
|
|
|
// EncodeFromRecordValue encodes a RecordValue back to JSON format
|
|
func (jsd *JSONSchemaDecoder) EncodeFromRecordValue(recordValue *schema_pb.RecordValue) ([]byte, error) {
|
|
// Convert RecordValue back to Go map
|
|
goMap := recordValueToMap(recordValue)
|
|
|
|
// Encode to JSON
|
|
jsonData, err := json.Marshal(goMap)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to encode to JSON: %w", err)
|
|
}
|
|
|
|
// Validate the generated JSON against the schema
|
|
if err := jsd.ValidateOnly(jsonData); err != nil {
|
|
return nil, fmt.Errorf("generated JSON failed schema validation: %w", err)
|
|
}
|
|
|
|
return jsonData, nil
|
|
}
|
|
|
|
// GetSchemaInfo returns information about the JSON Schema
|
|
func (jsd *JSONSchemaDecoder) GetSchemaInfo() map[string]interface{} {
|
|
info := make(map[string]interface{})
|
|
|
|
if title, exists := jsd.schemaDoc["title"]; exists {
|
|
info["title"] = title
|
|
}
|
|
|
|
if description, exists := jsd.schemaDoc["description"]; exists {
|
|
info["description"] = description
|
|
}
|
|
|
|
if schemaVersion, exists := jsd.schemaDoc["$schema"]; exists {
|
|
info["schema_version"] = schemaVersion
|
|
}
|
|
|
|
if schemaType, exists := jsd.schemaDoc["type"]; exists {
|
|
info["type"] = schemaType
|
|
}
|
|
|
|
return info
|
|
}
|
|
|
|
// Enhanced JSON value conversion with better type handling
|
|
func (jsd *JSONSchemaDecoder) convertJSONValue(value interface{}, expectedType string) interface{} {
|
|
if value == nil {
|
|
return nil
|
|
}
|
|
|
|
switch expectedType {
|
|
case "integer":
|
|
switch v := value.(type) {
|
|
case float64:
|
|
return int64(v)
|
|
case string:
|
|
if i, err := strconv.ParseInt(v, 10, 64); err == nil {
|
|
return i
|
|
}
|
|
}
|
|
case "number":
|
|
switch v := value.(type) {
|
|
case string:
|
|
if f, err := strconv.ParseFloat(v, 64); err == nil {
|
|
return f
|
|
}
|
|
}
|
|
case "boolean":
|
|
switch v := value.(type) {
|
|
case string:
|
|
if b, err := strconv.ParseBool(v); err == nil {
|
|
return b
|
|
}
|
|
}
|
|
case "string":
|
|
// Handle date-time format conversion
|
|
if str, ok := value.(string); ok {
|
|
// Try to parse as RFC3339 timestamp
|
|
if t, err := time.Parse(time.RFC3339, str); err == nil {
|
|
return t
|
|
}
|
|
}
|
|
}
|
|
|
|
return value
|
|
}
|
|
|
|
// ValidateAndNormalize validates JSON data and normalizes types according to schema
|
|
func (jsd *JSONSchemaDecoder) ValidateAndNormalize(data []byte) ([]byte, error) {
|
|
// First decode normally
|
|
jsonMap, err := jsd.Decode(data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Normalize types based on schema
|
|
normalized := jsd.normalizeMapTypes(jsonMap, jsd.schemaDoc)
|
|
|
|
// Re-encode with normalized types
|
|
return json.Marshal(normalized)
|
|
}
|
|
|
|
// normalizeMapTypes normalizes map values according to JSON Schema types
|
|
func (jsd *JSONSchemaDecoder) normalizeMapTypes(data map[string]interface{}, schemaDoc map[string]interface{}) map[string]interface{} {
|
|
properties, _ := schemaDoc["properties"].(map[string]interface{})
|
|
result := make(map[string]interface{})
|
|
|
|
for key, value := range data {
|
|
if fieldSchema, exists := properties[key]; exists {
|
|
if fieldSchemaMap, ok := fieldSchema.(map[string]interface{}); ok {
|
|
fieldType, _ := fieldSchemaMap["type"].(string)
|
|
result[key] = jsd.convertJSONValue(value, fieldType)
|
|
continue
|
|
}
|
|
}
|
|
result[key] = value
|
|
}
|
|
|
|
return result
|
|
}
|