Mirror of https://github.com/chrislusf/seaweedfs (synced 2025-09-19 01:30:23 +02:00)
Phase E2: Integrate Protobuf descriptor parser with decoder

- Update NewProtobufDecoder to use ProtobufDescriptorParser
- Add findFirstMessageName helper for automatic message detection
- Fix ParseBinaryDescriptor to return schema even on resolution failure
- Add comprehensive tests for protobuf decoder integration
- Improve error handling and caching behavior

This enables proper binary descriptor parsing in the protobuf decoder, completing the integration between descriptor parsing and decoding.

Phase E3: Complete Protobuf message descriptor resolution

- Implement full protobuf descriptor resolution using protoreflect API
- Add buildFileDescriptor and findMessageInFileDescriptor methods
- Support nested message resolution with findNestedMessageDescriptor
- Add proper mutex protection for thread-safe cache access
- Update all test data to use proper field cardinality labels
- Update test expectations to handle successful descriptor resolution
- Enable full protobuf decoder creation from binary descriptors

Phase E (Protobuf Support) is now complete:
✅ E1: Binary descriptor parsing
✅ E2: Decoder integration
✅ E3: Full message descriptor resolution

Protobuf messages can now be fully parsed and decoded.

Phase F: Implement Kafka record batch compression support

- Add comprehensive compression module supporting gzip/snappy/lz4/zstd
- Implement RecordBatchParser with full compression and CRC validation
- Support compression codec extraction from record batch attributes
- Add compression/decompression for all major Kafka codecs
- Integrate compression support into Produce and Fetch handlers
- Add extensive unit tests for all compression codecs
- Support round-trip compression/decompression with proper error handling
- Add performance benchmarks for compression operations

Key features:
✅ Gzip compression (ratio: 0.02)
✅ Snappy compression (ratio: 0.06, fastest)
✅ LZ4 compression (ratio: 0.02)
✅ Zstd compression (ratio: 0.01, best compression)
✅ CRC32 validation for record batch integrity
✅ Proper Kafka record batch format v2 parsing
✅ Backward compatibility with uncompressed records

Phase F (Compression Handling) is now complete.

Phase G: Implement advanced schema compatibility checking and migration

- Add comprehensive SchemaEvolutionChecker with full compatibility rules
- Support BACKWARD, FORWARD, FULL, and NONE compatibility levels
- Implement Avro schema compatibility checking with field analysis
- Add JSON Schema compatibility validation
- Support Protobuf compatibility checking (simplified implementation)
- Add type promotion rules (int->long, float->double, string<->bytes)
- Integrate schema evolution into Manager with validation methods
- Add schema evolution suggestions and migration guidance
- Support schema compatibility validation before evolution
- Add comprehensive unit tests for all compatibility scenarios

Key features:
✅ BACKWARD compatibility: New schema can read old data
✅ FORWARD compatibility: Old schema can read new data
✅ FULL compatibility: Both backward and forward compatible
✅ Type promotion support for safe schema evolution
✅ Field addition/removal validation with default value checks
✅ Schema evolution suggestions for incompatible changes
✅ Integration with schema registry for validation workflows

Phase G (Schema Evolution) is now complete.

fmt
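As a rough usage sketch of the compression support added in Phase F, the snippet below round-trips a sample payload through each codec using the Compress and Decompress helpers from the file shown below, and recomputes the compression ratio for that payload. The import path is an assumption, and the ratios quoted in the commit message depend on the test data used there, so treat this as an illustration rather than a reproduction of those numbers.

package main

import (
	"bytes"
	"fmt"

	// NOTE: assumed import path; adjust to wherever this compression package lives in the repo.
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression"
)

func main() {
	payload := bytes.Repeat([]byte("kafka record batch payload "), 100)
	for _, codec := range []compression.CompressionCodec{
		compression.Gzip, compression.Snappy, compression.Lz4, compression.Zstd,
	} {
		compressed, err := compression.Compress(codec, payload)
		if err != nil {
			panic(err)
		}
		restored, err := compression.Decompress(codec, compressed)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%-6s ratio=%.2f roundtrip_ok=%v\n",
			codec, float64(len(compressed))/float64(len(payload)), bytes.Equal(restored, payload))
	}
}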
203 lines · 5 KiB · Go
package compression

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"

	"github.com/golang/snappy"
	"github.com/klauspost/compress/zstd"
	"github.com/pierrec/lz4/v4"
)

// nopCloser wraps an io.Reader to provide a no-op Close method
type nopCloser struct {
	io.Reader
}

func (nopCloser) Close() error { return nil }

// CompressionCodec represents the compression codec used in Kafka record batches
type CompressionCodec int8

const (
	None   CompressionCodec = 0
	Gzip   CompressionCodec = 1
	Snappy CompressionCodec = 2
	Lz4    CompressionCodec = 3
	Zstd   CompressionCodec = 4
)

// String returns the string representation of the compression codec
func (c CompressionCodec) String() string {
	switch c {
	case None:
		return "none"
	case Gzip:
		return "gzip"
	case Snappy:
		return "snappy"
	case Lz4:
		return "lz4"
	case Zstd:
		return "zstd"
	default:
		return fmt.Sprintf("unknown(%d)", c)
	}
}

// IsValid returns true if the compression codec is valid
func (c CompressionCodec) IsValid() bool {
	return c >= None && c <= Zstd
}

// ExtractCompressionCodec extracts the compression codec from record batch attributes
func ExtractCompressionCodec(attributes int16) CompressionCodec {
	return CompressionCodec(attributes & 0x07) // Lower 3 bits
}

// SetCompressionCodec sets the compression codec in record batch attributes
func SetCompressionCodec(attributes int16, codec CompressionCodec) int16 {
	return (attributes &^ 0x07) | int16(codec)
}
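
// Illustrative example (not in the original file): a hedged sketch of the
// attributes bit layout. The codec occupies only the lower 3 bits, so other
// attribute bits (for example bit 3, the timestamp-type flag in Kafka's
// record batch format v2) are preserved by SetCompressionCodec.
func exampleCodecBits() {
	attrs := int16(0x08)                     // a non-codec attribute bit already set
	attrs = SetCompressionCodec(attrs, Zstd) // writes the codec into the lower 3 bits
	fmt.Println(ExtractCompressionCodec(attrs)) // "zstd"
	fmt.Println(attrs&0x08 != 0)                // true: the other bit is untouched
}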

// Compress compresses data using the specified codec
func Compress(codec CompressionCodec, data []byte) ([]byte, error) {
	if codec == None {
		return data, nil
	}

	var buf bytes.Buffer
	var writer io.WriteCloser
	var err error

	switch codec {
	case Gzip:
		writer = gzip.NewWriter(&buf)
	case Snappy:
		// Snappy doesn't have a streaming writer, so we compress directly
		compressed := snappy.Encode(nil, data)
		if compressed == nil {
			compressed = []byte{}
		}
		return compressed, nil
	case Lz4:
		writer = lz4.NewWriter(&buf)
	case Zstd:
		writer, err = zstd.NewWriter(&buf)
		if err != nil {
			return nil, fmt.Errorf("failed to create zstd writer: %w", err)
		}
	default:
		return nil, fmt.Errorf("unsupported compression codec: %s", codec)
	}

	if _, err := writer.Write(data); err != nil {
		writer.Close()
		return nil, fmt.Errorf("failed to write compressed data: %w", err)
	}

	if err := writer.Close(); err != nil {
		return nil, fmt.Errorf("failed to close compressor: %w", err)
	}

	return buf.Bytes(), nil
}

// Decompress decompresses data using the specified codec
func Decompress(codec CompressionCodec, data []byte) ([]byte, error) {
	if codec == None {
		return data, nil
	}

	var reader io.ReadCloser
	var err error

	buf := bytes.NewReader(data)

	switch codec {
	case Gzip:
		reader, err = gzip.NewReader(buf)
		if err != nil {
			return nil, fmt.Errorf("failed to create gzip reader: %w", err)
		}
	case Snappy:
		// Snappy doesn't have a streaming reader, so we decompress directly
		decompressed, err := snappy.Decode(nil, data)
		if err != nil {
			return nil, fmt.Errorf("failed to decompress snappy data: %w", err)
		}
		if decompressed == nil {
			decompressed = []byte{}
		}
		return decompressed, nil
	case Lz4:
		lz4Reader := lz4.NewReader(buf)
		// lz4.Reader doesn't implement Close, so we wrap it
		reader = &nopCloser{Reader: lz4Reader}
	case Zstd:
		// zstd.Decoder's Close method has no error return, so it doesn't satisfy
		// io.ReadCloser; handle the zstd path inline instead of falling through.
		zstdReader, err := zstd.NewReader(buf)
		if err != nil {
			return nil, fmt.Errorf("failed to create zstd reader: %w", err)
		}
		defer zstdReader.Close()

		var result bytes.Buffer
		if _, err := io.Copy(&result, zstdReader); err != nil {
			return nil, fmt.Errorf("failed to decompress zstd data: %w", err)
		}
		decompressed := result.Bytes()
		if decompressed == nil {
			decompressed = []byte{}
		}
		return decompressed, nil
	default:
		return nil, fmt.Errorf("unsupported compression codec: %s", codec)
	}

	defer reader.Close()

	var result bytes.Buffer
	if _, err := io.Copy(&result, reader); err != nil {
		return nil, fmt.Errorf("failed to decompress data: %w", err)
	}

	decompressed := result.Bytes()
	if decompressed == nil {
		decompressed = []byte{}
	}
	return decompressed, nil
}

// CompressRecordBatch compresses the records portion of a Kafka record batch
// This function compresses only the records data, not the entire batch header
func CompressRecordBatch(codec CompressionCodec, recordsData []byte) ([]byte, int16, error) {
	if codec == None {
		return recordsData, 0, nil
	}

	compressed, err := Compress(codec, recordsData)
	if err != nil {
		return nil, 0, fmt.Errorf("failed to compress record batch: %w", err)
	}

	attributes := int16(codec)
	return compressed, attributes, nil
}

// DecompressRecordBatch decompresses the records portion of a Kafka record batch
func DecompressRecordBatch(attributes int16, compressedData []byte) ([]byte, error) {
	codec := ExtractCompressionCodec(attributes)

	if codec == None {
		return compressedData, nil
	}

	decompressed, err := Decompress(codec, compressedData)
	if err != nil {
		return nil, fmt.Errorf("failed to decompress record batch: %w", err)
	}

	return decompressed, nil
}
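
// Illustrative example (not in the original file): a minimal sketch of the
// intended record-batch-level round trip. CompressRecordBatch returns an
// attributes value carrying the codec, and DecompressRecordBatch reads the
// codec back out of those attributes before decompressing.
func exampleRecordBatchRoundTrip() error {
	records := bytes.Repeat([]byte("record "), 64) // stand-in for encoded records
	compressed, attributes, err := CompressRecordBatch(Gzip, records)
	if err != nil {
		return err
	}
	restored, err := DecompressRecordBatch(attributes, compressed)
	if err != nil {
		return err
	}
	if !bytes.Equal(restored, records) {
		return fmt.Errorf("round trip mismatch: got %d bytes, want %d", len(restored), len(records))
	}
	return nil
}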