seaweedfs/weed/mq/kafka/compression/compression.go
chrislu deb315a8a9 persist kafka offset
Phase E2: Integrate Protobuf descriptor parser with decoder

- Update NewProtobufDecoder to use ProtobufDescriptorParser
- Add findFirstMessageName helper for automatic message detection
- Fix ParseBinaryDescriptor to return schema even on resolution failure
- Add comprehensive tests for protobuf decoder integration
- Improve error handling and caching behavior

This enables proper binary descriptor parsing in the protobuf decoder,
completing the integration between descriptor parsing and decoding.

Phase E3: Complete Protobuf message descriptor resolution

- Implement full protobuf descriptor resolution using protoreflect API
- Add buildFileDescriptor and findMessageInFileDescriptor methods
- Support nested message resolution with findNestedMessageDescriptor
- Add proper mutex protection for thread-safe cache access
- Update all test data to use proper field cardinality labels
- Update test expectations to handle successful descriptor resolution
- Enable full protobuf decoder creation from binary descriptors

Phase E (Protobuf Support) is now complete:
- E1: Binary descriptor parsing
- E2: Decoder integration
- E3: Full message descriptor resolution

Protobuf messages can now be fully parsed and decoded.
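
The helpers named above (buildFileDescriptor, findMessageInFileDescriptor) are not shown on this page, so the following is only a hedged sketch of what descriptor resolution through the protoreflect API can look like. The function names, the use of the global registry as import resolver, and the dynamicpb decoding step are assumptions for illustration, not the SeaweedFS implementation:

package schema

import (
	"fmt"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/reflect/protoregistry"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// resolveMessageDescriptor builds a FileDescriptor from a serialized
// FileDescriptorProto and looks up a top-level message by name.
// Hypothetical helper, named only for this sketch.
func resolveMessageDescriptor(binaryDescriptor []byte, messageName string) (protoreflect.MessageDescriptor, error) {
	fdProto := &descriptorpb.FileDescriptorProto{}
	if err := proto.Unmarshal(binaryDescriptor, fdProto); err != nil {
		return nil, fmt.Errorf("parse FileDescriptorProto: %w", err)
	}
	// Resolve imports against the global registry; a real implementation
	// would carry its own resolver and cache resolved descriptors.
	fd, err := protodesc.NewFile(fdProto, protoregistry.GlobalFiles)
	if err != nil {
		return nil, fmt.Errorf("build file descriptor: %w", err)
	}
	md := fd.Messages().ByName(protoreflect.Name(messageName))
	if md == nil {
		return nil, fmt.Errorf("message %q not found in %s", messageName, fd.Path())
	}
	return md, nil
}

// decodeWithDescriptor decodes a payload into a dynamic message using the
// resolved descriptor.
func decodeWithDescriptor(md protoreflect.MessageDescriptor, payload []byte) (*dynamicpb.Message, error) {
	msg := dynamicpb.NewMessage(md)
	if err := proto.Unmarshal(payload, msg); err != nil {
		return nil, fmt.Errorf("decode payload: %w", err)
	}
	return msg, nil
}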

Phase F: Implement Kafka record batch compression support

- Add comprehensive compression module supporting gzip/snappy/lz4/zstd
- Implement RecordBatchParser with full compression and CRC validation
- Support compression codec extraction from record batch attributes
- Add compression/decompression for all major Kafka codecs
- Integrate compression support into Produce and Fetch handlers
- Add extensive unit tests for all compression codecs
- Support round-trip compression/decompression with proper error handling
- Add performance benchmarks for compression operations

Key features:
- Gzip compression (ratio: 0.02)
- Snappy compression (ratio: 0.06, fastest)
- LZ4 compression (ratio: 0.02)
- Zstd compression (ratio: 0.01, best compression)
- CRC32 validation for record batch integrity (see the sketch after this list)
- Proper Kafka record batch format v2 parsing
- Backward compatibility with uncompressed records

Phase F (Compression Handling) is now complete.
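
The RecordBatchParser itself is not on this page, so here is only a hedged sketch of the CRC check listed above, based on the Kafka record batch v2 header layout (the CRC-32C stored at bytes 17..21 covers the batch from the attributes field onward). The function name is hypothetical, not the actual parser API:

package kafkabatch

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// Record batch v2 header layout before the CRC field:
// baseOffset(8) + batchLength(4) + partitionLeaderEpoch(4) + magic(1) = 17 bytes,
// followed by a 4-byte CRC-32C covering everything from the attributes field on.
const (
	crcOffset = 17
	crcEnd    = 21
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// validateBatchCRC compares the stored CRC-32C against one computed over the
// batch body (hypothetical helper for illustration).
func validateBatchCRC(batch []byte) error {
	if len(batch) < crcEnd {
		return fmt.Errorf("record batch too short: %d bytes", len(batch))
	}
	stored := binary.BigEndian.Uint32(batch[crcOffset:crcEnd])
	computed := crc32.Checksum(batch[crcEnd:], castagnoli)
	if stored != computed {
		return fmt.Errorf("crc mismatch: stored=0x%08x computed=0x%08x", stored, computed)
	}
	return nil
}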

Phase G: Implement advanced schema compatibility checking and migration

- Add comprehensive SchemaEvolutionChecker with full compatibility rules
- Support BACKWARD, FORWARD, FULL, and NONE compatibility levels
- Implement Avro schema compatibility checking with field analysis
- Add JSON Schema compatibility validation
- Support Protobuf compatibility checking (simplified implementation)
- Add type promotion rules (int->long, float->double, string<->bytes)
- Integrate schema evolution into Manager with validation methods
- Add schema evolution suggestions and migration guidance
- Support schema compatibility validation before evolution
- Add comprehensive unit tests for all compatibility scenarios

Key features:
- BACKWARD compatibility: new schema can read old data
- FORWARD compatibility: old schema can read new data
- FULL compatibility: both backward and forward compatible
- Type promotion support for safe schema evolution (sketched below)
- Field addition/removal validation with default value checks
- Schema evolution suggestions for incompatible changes
- Integration with schema registry for validation workflows

Phase G (Schema Evolution) is now complete.
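
The SchemaEvolutionChecker lives in another file, so the following is only an illustrative sketch of how the promotion rules named above (int->long, float->double, string<->bytes) might be tabulated; the names and shape are hypothetical, not the real checker API:

package schemaevolution

// promotions encodes only the promotions listed in the commit message:
// int->long, float->double, and string<->bytes.
var promotions = map[string][]string{
	"int":    {"long"},
	"float":  {"double"},
	"string": {"bytes"},
	"bytes":  {"string"},
}

// isPromotable reports whether data written as oldType can safely be read
// as newType under the rules above (hypothetical helper).
func isPromotable(oldType, newType string) bool {
	if oldType == newType {
		return true
	}
	for _, t := range promotions[oldType] {
		if t == newType {
			return true
		}
	}
	return false
}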

fmt
2025-09-11 19:53:00 -07:00


package compression

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"

	"github.com/golang/snappy"
	"github.com/klauspost/compress/zstd"
	"github.com/pierrec/lz4/v4"
)
// nopCloser wraps an io.Reader to provide a no-op Close method
type nopCloser struct {
	io.Reader
}

func (nopCloser) Close() error { return nil }

// CompressionCodec represents the compression codec used in Kafka record batches
type CompressionCodec int8

const (
	None   CompressionCodec = 0
	Gzip   CompressionCodec = 1
	Snappy CompressionCodec = 2
	Lz4    CompressionCodec = 3
	Zstd   CompressionCodec = 4
)
// String returns the string representation of the compression codec
func (c CompressionCodec) String() string {
	switch c {
	case None:
		return "none"
	case Gzip:
		return "gzip"
	case Snappy:
		return "snappy"
	case Lz4:
		return "lz4"
	case Zstd:
		return "zstd"
	default:
		return fmt.Sprintf("unknown(%d)", c)
	}
}

// IsValid returns true if the compression codec is valid
func (c CompressionCodec) IsValid() bool {
	return c >= None && c <= Zstd
}

// ExtractCompressionCodec extracts the compression codec from record batch attributes
func ExtractCompressionCodec(attributes int16) CompressionCodec {
	return CompressionCodec(attributes & 0x07) // Lower 3 bits
}

// SetCompressionCodec sets the compression codec in record batch attributes
func SetCompressionCodec(attributes int16, codec CompressionCodec) int16 {
	return (attributes &^ 0x07) | int16(codec)
}
// Compress compresses data using the specified codec
func Compress(codec CompressionCodec, data []byte) ([]byte, error) {
	if codec == None {
		return data, nil
	}

	var buf bytes.Buffer
	var writer io.WriteCloser
	var err error

	switch codec {
	case Gzip:
		writer = gzip.NewWriter(&buf)
	case Snappy:
		// Snappy doesn't have a streaming writer, so we compress directly
		compressed := snappy.Encode(nil, data)
		if compressed == nil {
			compressed = []byte{}
		}
		return compressed, nil
	case Lz4:
		writer = lz4.NewWriter(&buf)
	case Zstd:
		writer, err = zstd.NewWriter(&buf)
		if err != nil {
			return nil, fmt.Errorf("failed to create zstd writer: %w", err)
		}
	default:
		return nil, fmt.Errorf("unsupported compression codec: %s", codec)
	}

	if _, err := writer.Write(data); err != nil {
		writer.Close()
		return nil, fmt.Errorf("failed to write compressed data: %w", err)
	}
	if err := writer.Close(); err != nil {
		return nil, fmt.Errorf("failed to close compressor: %w", err)
	}
	return buf.Bytes(), nil
}
// Decompress decompresses data using the specified codec
func Decompress(codec CompressionCodec, data []byte) ([]byte, error) {
	if codec == None {
		return data, nil
	}

	var reader io.ReadCloser
	var err error
	buf := bytes.NewReader(data)

	switch codec {
	case Gzip:
		reader, err = gzip.NewReader(buf)
		if err != nil {
			return nil, fmt.Errorf("failed to create gzip reader: %w", err)
		}
	case Snappy:
		// Snappy doesn't have a streaming reader, so we decompress directly
		decompressed, err := snappy.Decode(nil, data)
		if err != nil {
			return nil, fmt.Errorf("failed to decompress snappy data: %w", err)
		}
		if decompressed == nil {
			decompressed = []byte{}
		}
		return decompressed, nil
	case Lz4:
		lz4Reader := lz4.NewReader(buf)
		// lz4.Reader doesn't implement Close, so we wrap it
		reader = &nopCloser{Reader: lz4Reader}
	case Zstd:
		zstdReader, err := zstd.NewReader(buf)
		if err != nil {
			return nil, fmt.Errorf("failed to create zstd reader: %w", err)
		}
		defer zstdReader.Close()
		var result bytes.Buffer
		if _, err := io.Copy(&result, zstdReader); err != nil {
			return nil, fmt.Errorf("failed to decompress zstd data: %w", err)
		}
		decompressed := result.Bytes()
		if decompressed == nil {
			decompressed = []byte{}
		}
		return decompressed, nil
	default:
		return nil, fmt.Errorf("unsupported compression codec: %s", codec)
	}

	defer reader.Close()
	var result bytes.Buffer
	if _, err := io.Copy(&result, reader); err != nil {
		return nil, fmt.Errorf("failed to decompress data: %w", err)
	}
	decompressed := result.Bytes()
	if decompressed == nil {
		decompressed = []byte{}
	}
	return decompressed, nil
}
// CompressRecordBatch compresses the records portion of a Kafka record batch.
// This function compresses only the records data, not the entire batch header.
func CompressRecordBatch(codec CompressionCodec, recordsData []byte) ([]byte, int16, error) {
	if codec == None {
		return recordsData, 0, nil
	}

	compressed, err := Compress(codec, recordsData)
	if err != nil {
		return nil, 0, fmt.Errorf("failed to compress record batch: %w", err)
	}

	attributes := int16(codec)
	return compressed, attributes, nil
}

// DecompressRecordBatch decompresses the records portion of a Kafka record batch
func DecompressRecordBatch(attributes int16, compressedData []byte) ([]byte, error) {
	codec := ExtractCompressionCodec(attributes)
	if codec == None {
		return compressedData, nil
	}

	decompressed, err := Decompress(codec, compressedData)
	if err != nil {
		return nil, fmt.Errorf("failed to decompress record batch: %w", err)
	}
	return decompressed, nil
}
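
To close, a minimal usage sketch of the package above; the import path is assumed from the file location and may not match the actual module path:

package main

import (
	"bytes"
	"fmt"
	"log"

	// Import path assumed from the file location shown above.
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression"
)

func main() {
	records := []byte("records section of a Kafka record batch")

	// Compress the records and get the attribute bits to place in the batch header.
	compressed, attributes, err := compression.CompressRecordBatch(compression.Zstd, records)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("codec=%s attributes=0x%04x compressed=%d bytes\n",
		compression.ExtractCompressionCodec(attributes), attributes, len(compressed))

	// On the fetch path, the codec is recovered from the same attribute bits.
	decompressed, err := compression.DecompressRecordBatch(attributes, compressed)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("round-trip ok:", bytes.Equal(records, decompressed))
}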