1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/query/engine/engine_test.go
Chris Lu a7fdc0d137
Message Queue: Add sql querying (#7185)
* feat: Phase 1 - Add SQL query engine foundation for MQ topics

Implements core SQL infrastructure with metadata operations:

New Components:
- SQL parser integration using github.com/xwb1989/sqlparser
- Query engine framework in weed/query/engine/
- Schema catalog mapping MQ topics to SQL tables
- Interactive SQL CLI command 'weed sql'

Supported Operations:
- SHOW DATABASES (lists MQ namespaces)
- SHOW TABLES (lists MQ topics)
- SQL statement parsing and routing
- Error handling and result formatting

Key Design Decisions:
- MQ namespaces ↔ SQL databases
- MQ topics ↔ SQL tables
- Parquet message storage ready for querying
- Backward-compatible schema evolution support

Testing:
- Unit tests for core engine functionality
- Command integration tests
- Parse error handling validation

Assumptions (documented in code):
- All MQ messages stored in Parquet format
- Schema evolution maintains backward compatibility
- MySQL-compatible SQL syntax via sqlparser
- Single-threaded usage per SQL session

Next Phase: DDL operations (CREATE/ALTER/DROP TABLE)

* feat: Phase 2 - Add DDL operations and real MQ broker integration

Implements comprehensive DDL support for MQ topic management:

New Components:
- Real MQ broker connectivity via BrokerClient
- CREATE TABLE → ConfigureTopic gRPC calls
- DROP TABLE → DeleteTopic operations
- DESCRIBE table → Schema introspection
- SQL type mapping (SQL ↔ MQ schema types)

Enhanced Features:
- Live topic discovery from MQ broker
- Fallback to cached/sample data when broker unavailable
- MySQL-compatible DESCRIBE output
- Schema validation and error handling
- CREATE TABLE with column definitions

Key Infrastructure:
- broker_client.go: gRPC communication with MQ broker
- sql_types.go: Bidirectional SQL/MQ type conversion
- describe.go: Table schema introspection
- Enhanced engine.go: Full DDL routing and execution

Supported SQL Operations:
 SHOW DATABASES, SHOW TABLES (live + fallback)
 CREATE TABLE table_name (col1 INT, col2 VARCHAR(50), ...)
 DROP TABLE table_name
 DESCRIBE table_name / SHOW COLUMNS FROM table_name

Known Limitations:
- SQL parser issues with reserved keywords (e.g., 'timestamp')
- Requires running MQ broker for full functionality
- ALTER TABLE not yet implemented
- DeleteTopic method needs broker-side implementation

Architecture Decisions:
- Broker discovery via filer lock mechanism (same as shell commands)
- Graceful fallback when broker unavailable
- ConfigureTopic for CREATE TABLE with 6 default partitions
- Schema versioning ready for ALTER TABLE support

Testing:
- Unit tests updated with filer address parameter
- Integration tests for DDL operations
- Error handling for connection failures

Next Phase: SELECT query execution with Parquet scanning

* fix: Resolve High Priority TODOs - Real MQ Broker Integration

 COMPLETED HIGH PRIORITY TODOs:

🔧 **Real FilerClient Integration** (engine.go:131)
- Implemented GetFilerClient() method in BrokerClient
- Added filerClientImpl with full FilerClient interface compliance
- Added AdjustedUrl() and GetDataCenter() methods
- Real filerClient connection replaces nil fallback

🔧 **Partition Discovery via MQ Broker** (hybrid_message_scanner.go:116)
- Added ListTopicPartitions() method using topic configuration
- Implemented discoverTopicPartitions() in HybridMessageScanner
- Reads actual partition count from BrokerPartitionAssignments
- Generates proper partition ranges based on topic.PartitionCount

📋 **Technical Fixes:**
- Fixed compilation errors with undefined variables
- Proper error handling with filerClientErr variable
- Corrected ConfigureTopicResponse field usage (BrokerPartitionAssignments vs PartitionCount)
- Complete FilerClient interface implementation

🎯 **Impact:**
- SQL engine now connects to real MQ broker infrastructure
- Actual topic partition discovery instead of hardcoded defaults
- Production-ready broker integration with graceful fallbacks
- Maintains backward compatibility with sample data when broker unavailable

 All tests passing - High priority TODO resolution complete!

Next: Schema-aware message parsing and time filter optimization.

* feat: Time Filter Extraction - Complete Performance Optimization

 FOURTH HIGH PRIORITY TODO COMPLETED!

 **Time Filter Extraction & Push-Down Optimization** (engine.go:198-199)
- Replaced hardcoded StartTimeNs=0, StopTimeNs=0 with intelligent extraction
- Added extractTimeFilters() with recursive WHERE clause analysis
- Smart time column detection (\_timestamp_ns, created_at, timestamp, etc.)
- Comprehensive time value parsing (nanoseconds, ISO dates, datetime formats)
- Operator reversal handling (column op value vs value op column)

🧠 **Intelligent WHERE Clause Processing:**
- AND expressions: Combine time bounds (intersection) 
- OR expressions: Skip extraction (safety) 
- Parentheses: Recursive unwrapping 
- Comparison operators: >, >=, <, <=, = 
- Multiple time formats: nanoseconds, RFC3339, date-only, datetime 

🚀 **Performance Impact:**
- Push-down filtering to hybrid scanner level
- Reduced data scanning at source (live logs + Parquet files)
- Time-based partition pruning potential
- Significant performance gains for time-series queries

📊 **Comprehensive Testing (21 tests passing):**
-  Time filter extraction (6 test scenarios)
-  Time column recognition (case-insensitive)
-  Time value parsing (5 formats)
-  Full integration with SELECT queries
-  Backward compatibility maintained

💡 **Real-World Query Examples:**
Before: Scans ALL data, filters in memory
  SELECT * FROM events WHERE \_timestamp_ns > 1672531200000000000;

After: Scans ONLY relevant time range at source level
  → StartTimeNs=1672531200000000000, StopTimeNs=0
  → Massive performance improvement for large datasets!

🎯 **Production Ready Features:**
- Multiple time column formats supported
- Graceful fallbacks for invalid dates
- OR clause safety (avoids incorrect optimization)
- Comprehensive error handling

**ALL MEDIUM PRIORITY TODOs NOW READY FOR NEXT PHASEtest ./weed/query/engine/ -v* 🎉

* feat: Extended WHERE Operators - Complete Advanced Filtering

 **EXTENDED WHERE OPERATORS IMPLEMENTEDtest ./weed/query/engine/ -v | grep -E PASS

* feat: Enhanced SQL CLI Experience

 COMPLETE ENHANCED CLI IMPLEMENTATION:

🚀 **Multiple Execution Modes:**
- Interactive shell with enhanced prompts and context
- Single query execution: --query 'SQL' --output format
- Batch file processing: --file queries.sql --output csv
- Database context switching: --database dbname

📊 **Multi-Format Output:**
- Table format (ASCII) - default for interactive
- JSON format - structured data for programmatic use
- CSV format - spreadsheet-friendly output
- Smart auto-detection based on execution mode

⚙️ **Enhanced Interactive Shell:**
- Database context switching: USE database_name;
- Output format switching: \format table|json|csv
- Command history tracking (basic implementation)
- Enhanced help with WHERE operator examples
- Contextual prompts: seaweedfs:dbname>

🛠️ **Production Features:**
- Comprehensive error handling (JSON + user-friendly)
- Query execution timing and performance metrics
- 30-second timeout protection with graceful handling
- Real MQ integration with hybrid data scanning

📖 **Complete CLI Interface:**
- Full flag support: --server, --interactive, --file, --output, --database, --query
- Auto-detection of execution mode and output format
- Structured help system with practical examples
- Batch processing with multi-query file support

💡 **Advanced WHERE Integration:**
All extended operators (<=, >=, !=, LIKE, IN) fully supported
across all execution modes and output formats.

🎯 **Usage Examples:**
- weed sql --interactive
- weed sql --query 'SHOW DATABASES' --output json
- weed sql --file queries.sql --output csv
- weed sql --database analytics --interactive

Enhanced CLI experience complete - production ready! 🚀

* Delete test_utils_test.go

* fmt

* integer conversion

* show databases works

* show tables works

* Update describe.go

* actual column types

* Update .gitignore

* scan topic messages

* remove emoji

* support aggregation functions

* column name case insensitive, better auto column names

* fmt

* fix reading system fields

* use parquet statistics for optimization

* remove emoji

* parquet file generate stats

* scan all files

* parquet file generation remember the sources also

* fmt

* sql

* truncate topic

* combine parquet results with live logs

* explain

* explain the execution plan

* add tests

* improve tests

* skip

* use mock for testing

* add tests

* refactor

* fix after refactoring

* detailed logs during explain. Fix bugs on reading live logs.

* fix decoding data

* save source buffer index start for log files

* process buffer from brokers

* filter out already flushed messages

* dedup with buffer start index

* explain with broker buffer

* the parquet file should also remember the first buffer_start attribute from the sources

* parquet file can query messages in broker memory, if log files do not exist

* buffer start stored as 8 bytes

* add jdbc

* add postgres protocol

* Revert "add jdbc"

This reverts commit a6e48b7690.

* hook up seaweed sql engine

* setup integration test for postgres

* rename to "weed db"

* return fast on error

* fix versioning

* address comments

* address some comments

* column name can be on left or right in where conditions

* avoid sample data

* remove sample data

* de-support alter table and drop table

* address comments

* read broker, logs, and parquet files

* Update engine.go

* address some comments

* use schema instead of inferred result types

* fix tests

* fix todo

* fix empty spaces and coercion

* fmt

* change to pg_query_go

* fix tests

* fix tests

* fmt

* fix: Enable CGO in Docker build for pg_query_go dependency

The pg_query_go library requires CGO to be enabled as it wraps the libpg_query C library.
Added gcc and musl-dev dependencies to the Docker build for proper compilation.

* feat: Replace pg_query_go with lightweight SQL parser (no CGO required)

- Remove github.com/pganalyze/pg_query_go/v6 dependency to avoid CGO requirement
- Implement lightweight SQL parser for basic SELECT, SHOW, and DDL statements
- Fix operator precedence in WHERE clause parsing (handle AND/OR before comparisons)
- Support INTEGER, FLOAT, and STRING literals in WHERE conditions
- All SQL engine tests passing with new parser
- PostgreSQL integration tests can now build without CGO

The lightweight parser handles the essential SQL features needed for the
SeaweedFS query engine while maintaining compatibility and avoiding CGO
dependencies that caused Docker build issues.

* feat: Add Parquet logical types to mq_schema.proto

Added support for Parquet logical types in SeaweedFS message queue schema:
- TIMESTAMP: UTC timestamp in microseconds since epoch with timezone flag
- DATE: Date as days since Unix epoch (1970-01-01)
- DECIMAL: Arbitrary precision decimal with configurable precision/scale
- TIME: Time of day in microseconds since midnight

These types enable advanced analytics features:
- Time-based filtering and window functions
- Date arithmetic and year/month/day extraction
- High-precision numeric calculations
- Proper time zone handling for global deployments

Regenerated protobuf Go code with new scalar types and value messages.

* feat: Enable publishers to use Parquet logical types

Enhanced MQ publishers to utilize the new logical types:
- Updated convertToRecordValue() to use TimestampValue instead of string RFC3339
- Added DateValue support for birth_date field (days since epoch)
- Added DecimalValue support for precise_amount field with configurable precision/scale
- Enhanced UserEvent struct with PreciseAmount and BirthDate fields
- Added convertToDecimal() helper using big.Rat for precise decimal conversion
- Updated test data generator to produce varied birth dates (1970-2005) and precise amounts

Publishers now generate structured data with proper logical types:
-  TIMESTAMP: Microsecond precision UTC timestamps
-  DATE: Birth dates as days since Unix epoch
-  DECIMAL: Precise amounts with 18-digit precision, 4-decimal scale

Successfully tested with PostgreSQL integration - all topics created with logical type data.

* feat: Add logical type support to SQL query engine

Extended SQL engine to handle new Parquet logical types:
- Added TimestampValue comparison support (microsecond precision)
- Added DateValue comparison support (days since epoch)
- Added DecimalValue comparison support with string conversion
- Added TimeValue comparison support (microseconds since midnight)
- Enhanced valuesEqual(), valueLessThan(), valueGreaterThan() functions
- Added decimalToString() helper for precise decimal-to-string conversion
- Imported math/big for arbitrary precision decimal handling

The SQL engine can now:
-  Compare TIMESTAMP values for filtering (e.g., WHERE timestamp > 1672531200000000000)
-  Compare DATE values for date-based queries (e.g., WHERE birth_date >= 12345)
-  Compare DECIMAL values for precise financial calculations
-  Compare TIME values for time-of-day filtering

Next: Add YEAR(), MONTH(), DAY() extraction functions for date analytics.

* feat: Add window function foundation with timestamp support

Added comprehensive foundation for SQL window functions with timestamp analytics:

Core Window Function Types:
- WindowSpec with PartitionBy and OrderBy support
- WindowFunction struct for ROW_NUMBER, RANK, LAG, LEAD
- OrderByClause for timestamp-based ordering
- Extended SelectStatement to support WindowFunctions field

Timestamp Analytics Functions:
 ApplyRowNumber() - ROW_NUMBER() OVER (ORDER BY timestamp)
 ExtractYear() - Extract year from TIMESTAMP logical type
 ExtractMonth() - Extract month from TIMESTAMP logical type
 ExtractDay() - Extract day from TIMESTAMP logical type
 FilterByYear() - Filter records by timestamp year

Foundation for Advanced Window Functions:
- LAG/LEAD for time-series access to previous/next values
- RANK/DENSE_RANK for temporal ranking
- FIRST_VALUE/LAST_VALUE for window boundaries
- PARTITION BY support for grouped analytics

This enables sophisticated time-series analytics like:
- SELECT *, ROW_NUMBER() OVER (ORDER BY timestamp) FROM user_events WHERE EXTRACT(YEAR FROM timestamp) = 2024
- Trend analysis over time windows
- Session analytics with LAG/LEAD functions
- Time-based ranking and percentiles

Ready for production time-series analytics with proper timestamp logical type support! 🚀

* fmt

* fix

* fix describe issue

* fix tests, avoid panic

* no more mysql

* timeout client connections

* Update SQL_FEATURE_PLAN.md

* handling errors

* remove sleep

* fix splitting multiple SQLs

* fixes

* fmt

* fix

* Update weed/util/log_buffer/log_buffer.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update SQL_FEATURE_PLAN.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* code reuse

* fix

* fix

* feat: Add basic arithmetic operators (+, -, *, /, %) with comprehensive tests

- Implement EvaluateArithmeticExpression with support for all basic operators
- Handle type conversions between int, float, string, and boolean
- Add proper error handling for division/modulo by zero
- Include 14 comprehensive test cases covering all edge cases
- Support mixed type arithmetic (int + float, string numbers, etc.)

All tests passing 

* feat: Add mathematical functions ROUND, CEIL, FLOOR, ABS with comprehensive tests

- Implement ROUND with optional precision parameter
- Add CEIL function for rounding up to nearest integer
- Add FLOOR function for rounding down to nearest integer
- Add ABS function for absolute values with type preservation
- Support all numeric types (int32, int64, float32, double)
- Comprehensive test suite with 20+ test cases covering:
  - Positive/negative numbers
  - Integer/float type preservation
  - Precision handling for ROUND
  - Null value error handling
  - Edge cases (zero, large numbers)

All tests passing 

* feat: Add date/time functions CURRENT_DATE, CURRENT_TIMESTAMP, EXTRACT with comprehensive tests

- Implement CURRENT_DATE returning YYYY-MM-DD format
- Add CURRENT_TIMESTAMP returning TimestampValue with microseconds
- Add CURRENT_TIME returning HH:MM:SS format
- Add NOW() as alias for CURRENT_TIMESTAMP
- Implement comprehensive EXTRACT function supporting:
  - YEAR, MONTH, DAY, HOUR, MINUTE, SECOND
  - QUARTER, WEEK, DOY (day of year), DOW (day of week)
  - EPOCH (Unix timestamp)
- Support multiple input formats:
  - TimestampValue (microseconds)
  - String dates (multiple formats)
  - Unix timestamps (int64 seconds)
- Comprehensive test suite with 15+ test cases covering:
  - All date/time constants
  - Extract from different value types
  - Error handling for invalid inputs
  - Timezone handling

All tests passing 

* feat: Add DATE_TRUNC function with comprehensive tests

- Implement comprehensive DATE_TRUNC function supporting:
  - Time precisions: microsecond, millisecond, second, minute, hour
  - Date precisions: day, week, month, quarter, year, decade, century, millennium
  - Support both singular and plural forms (e.g., 'minute' and 'minutes')
- Enhanced date/time parsing with proper timezone handling:
  - Assume local timezone for non-timezone string formats
  - Support UTC formats with explicit timezone indicators
  - Consistent behavior between parsing and truncation
- Comprehensive test suite with 11 test cases covering:
  - All supported precisions from microsecond to year
  - Multiple input types (TimestampValue, string dates)
  - Edge cases (null values, invalid precisions)
  - Timezone consistency validation

All tests passing 

* feat: Add comprehensive string functions with extensive tests

Implemented String Functions:
- LENGTH: Get string length (supports all value types)
- UPPER/LOWER: Case conversion
- TRIM/LTRIM/RTRIM: Whitespace removal (space, tab, newline, carriage return)
- SUBSTRING: Extract substring with optional length (SQL 1-based indexing)
- CONCAT: Concatenate multiple values (supports mixed types, skips nulls)
- REPLACE: Replace all occurrences of substring
- POSITION: Find substring position (1-based, 0 if not found)
- LEFT/RIGHT: Extract leftmost/rightmost characters
- REVERSE: Reverse string with proper Unicode support

Key Features:
- Robust type conversion (string, int, float, bool, bytes)
- Unicode-safe operations (proper rune handling in REVERSE)
- SQL-compatible indexing (1-based for SUBSTRING, POSITION)
- Comprehensive error handling with descriptive messages
- Mixed-type support (e.g., CONCAT number with string)

Helper Functions:
- valueToString: Convert any schema_pb.Value to string
- valueToInt64: Convert numeric values to int64

Comprehensive test suite with 25+ test cases covering:
- All string functions with typical use cases
- Type conversion scenarios (numbers, booleans)
- Edge cases (empty strings, null values, Unicode)
- Error conditions and boundary testing

All tests passing 

* refactor: Split sql_functions.go into smaller, focused files

**File Structure Before:**
- sql_functions.go (850+ lines)
- sql_functions_test.go (1,205+ lines)

**File Structure After:**
- function_helpers.go (105 lines) - shared utility functions
- arithmetic_functions.go (205 lines) - arithmetic operators & math functions
- datetime_functions.go (170 lines) - date/time functions & constants
- string_functions.go (335 lines) - string manipulation functions
- arithmetic_functions_test.go (560 lines) - tests for arithmetic & math
- datetime_functions_test.go (370 lines) - tests for date/time functions
- string_functions_test.go (270 lines) - tests for string functions

**Benefits:**
 Better organization by functional domain
 Easier to find and maintain specific function types
 Smaller, more manageable file sizes
 Clear separation of concerns
 Improved code readability and navigation
 All tests passing - no functionality lost

**Total:** 7 focused files (1,455 lines) vs 2 monolithic files (2,055+ lines)

This refactoring improves maintainability while preserving all functionality.

* fix: Improve test stability for date/time functions

**Problem:**
- CURRENT_TIMESTAMP test had timing race condition that could cause flaky failures
- CURRENT_DATE test could fail if run exactly at midnight boundary
- Tests were too strict about timing precision without accounting for system variations

**Root Cause:**
- Test captured before/after timestamps and expected function result to be exactly between them
- No tolerance for clock precision differences, NTP adjustments, or system timing variations
- Date boundary race condition around midnight transitions

**Solution:**
 **CURRENT_TIMESTAMP test**: Added 100ms tolerance buffer to account for:
  - Clock precision differences between time.Now() calls
  - System timing variations and NTP corrections
  - Microsecond vs nanosecond precision differences

 **CURRENT_DATE test**: Enhanced to handle midnight boundary crossings:
  - Captures date before and after function call
  - Accepts either date value in case of midnight transition
  - Prevents false failures during overnight test runs

**Testing:**
- Verified with repeated test runs (5x iterations) - all pass consistently
- Full test suite passes - no regressions introduced
- Tests are now robust against timing edge cases

**Impact:**
🚀 **Eliminated flaky test failures** while maintaining function correctness validation
🔧 **Production-ready testing** that works across different system environments
 **CI/CD reliability** - tests won't fail due to timing variations

* heap sort the data sources

* int overflow

* Update README.md

* redirect GetUnflushedMessages to brokers hosting the topic partition

* Update postgres-examples/README.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* clean up

* support limit with offset

* Update SQL_FEATURE_PLAN.md

* limit with offset

* ensure int conversion correctness

* Update weed/query/engine/hybrid_message_scanner.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* avoid closing closed channel

* support string concatenation ||

* int range

* using consts; avoid test data in production binary

* fix tests

* Update SQL_FEATURE_PLAN.md

* fix "use db"

* address comments

* fix comments

* Update mocks_test.go

* comment

* improve docker build

* normal if no partitions found

* fix build docker

* Update SQL_FEATURE_PLAN.md

* upgrade to raft v1.1.4 resolving race in leader

* raft 1.1.5

* Update SQL_FEATURE_PLAN.md

* Revert "raft 1.1.5"

This reverts commit 5f3bdfadbf.

* Revert "upgrade to raft v1.1.4 resolving race in leader"

This reverts commit fa620f0223.

* Fix data race in FUSE GetAttr operation

- Add shared lock to GetAttr when accessing file handle entries
- Prevents concurrent access between Write (ExclusiveLock) and GetAttr (SharedLock)
- Fixes race on entry.Attributes.FileSize field during concurrent operations
- Write operations already use ExclusiveLock, now GetAttr uses SharedLock for consistency

Resolves race condition:
Write at weedfs_file_write.go:62 vs Read at filechunks.go:28

* Update weed/mq/broker/broker_grpc_query.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* clean up

* Update db.go

* limit with offset

* Update Makefile

* fix id*2

* fix math

* fix string function bugs and add tests

* fix string concat

* ensure empty spaces for literals

* add ttl for catalog

* fix time functions

* unused code path

* database qualifier

* refactor

* extract

* recursive functions

* add cockroachdb parser

* postgres only

* test SQLs

* fix tests

* fix count *

* fix where clause

* fix limit offset

* fix  count fast path

* fix tests

* func name

* fix database qualifier

* fix tests

* Update engine.go

* fix tests

* fix jaeger

https://github.com/advisories/GHSA-2w8w-qhg4-f78j

* remove order by, group by, join

* fix extract

* prevent single quote in the string

* skip control messages

* skip control message when converting to parquet files

* psql change database

* remove old code

* remove old parser code

* rename file

* use db

* fix alias

* add alias test

* compare int64

* fix _timestamp_ns comparing

* alias support

* fix fast path count

* rendering data sources tree

* reading data sources

* reading parquet logic types

* convert logic types to parquet

* go mod

* fmt

* skip decimal types

* use UTC

* add warning if broker fails

* add user password file

* support IN

* support INTERVAL

* _ts as timestamp column

* _ts can compare with string

* address comments

* is null / is not null

* go mod

* clean up

* restructure execution plan

* remove extra double quotes

* fix converting logical types to parquet

* decimal

* decimal support

* do not skip decimal logical types

* making row-building schema-aware and alignment-safe

Emit parquet.NullValue() for missing fields to keep row shapes aligned.
Always advance list level and safely handle nil list values.
Add toParquetValueForType(...) to coerce values to match the declared Parquet type (e.g., STRING/BYTES via byte array; numeric/string conversions for INT32/INT64/DOUBLE/FLOAT/BOOL/TIMESTAMP/DATE/TIME).
Keep nil-byte guards for ByteArray.

* tests for growslice

* do not batch

* live logs in sources can be skipped in execution plan

* go mod tidy

* Update fuse-integration.yml

* Update Makefile

* fix deprecated

* fix deprecated

* remove deep-clean all rows

* broker memory count

* fix FieldIndex

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-09-09 01:01:03 -07:00

1392 lines
43 KiB
Go

package engine
import (
"context"
"encoding/binary"
"errors"
"testing"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/protobuf/proto"
)
// Mock implementations for testing
type MockHybridMessageScanner struct {
mock.Mock
topic topic.Topic
}
func (m *MockHybridMessageScanner) ReadParquetStatistics(partitionPath string) ([]*ParquetFileStats, error) {
args := m.Called(partitionPath)
return args.Get(0).([]*ParquetFileStats), args.Error(1)
}
type MockSQLEngine struct {
*SQLEngine
mockPartitions map[string][]string
mockParquetSourceFiles map[string]map[string]bool
mockLiveLogRowCounts map[string]int64
mockColumnStats map[string]map[string]*ParquetColumnStats
}
func NewMockSQLEngine() *MockSQLEngine {
return &MockSQLEngine{
SQLEngine: &SQLEngine{
catalog: &SchemaCatalog{
databases: make(map[string]*DatabaseInfo),
currentDatabase: "test",
},
},
mockPartitions: make(map[string][]string),
mockParquetSourceFiles: make(map[string]map[string]bool),
mockLiveLogRowCounts: make(map[string]int64),
mockColumnStats: make(map[string]map[string]*ParquetColumnStats),
}
}
func (m *MockSQLEngine) discoverTopicPartitions(namespace, topicName string) ([]string, error) {
key := namespace + "." + topicName
if partitions, exists := m.mockPartitions[key]; exists {
return partitions, nil
}
return []string{"partition-1", "partition-2"}, nil
}
func (m *MockSQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map[string]bool {
if len(fileStats) == 0 {
return make(map[string]bool)
}
return map[string]bool{"converted-log-1": true}
}
func (m *MockSQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, partition string, parquetSources map[string]bool) (int64, error) {
if count, exists := m.mockLiveLogRowCounts[partition]; exists {
return count, nil
}
return 25, nil
}
func (m *MockSQLEngine) computeLiveLogMinMax(partition, column string, parquetSources map[string]bool) (interface{}, interface{}, error) {
switch column {
case "id":
return int64(1), int64(50), nil
case "value":
return 10.5, 99.9, nil
default:
return nil, nil, nil
}
}
func (m *MockSQLEngine) getSystemColumnGlobalMin(column string, allFileStats map[string][]*ParquetFileStats) interface{} {
return int64(1000000000)
}
func (m *MockSQLEngine) getSystemColumnGlobalMax(column string, allFileStats map[string][]*ParquetFileStats) interface{} {
return int64(2000000000)
}
func createMockColumnStats(column string, minVal, maxVal interface{}) *ParquetColumnStats {
return &ParquetColumnStats{
ColumnName: column,
MinValue: convertToSchemaValue(minVal),
MaxValue: convertToSchemaValue(maxVal),
NullCount: 0,
}
}
func convertToSchemaValue(val interface{}) *schema_pb.Value {
switch v := val.(type) {
case int64:
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v}}
case float64:
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}}
case string:
return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}}
}
return nil
}
// Test FastPathOptimizer
func TestFastPathOptimizer_DetermineStrategy(t *testing.T) {
engine := NewMockSQLEngine()
optimizer := NewFastPathOptimizer(engine.SQLEngine)
tests := []struct {
name string
aggregations []AggregationSpec
expected AggregationStrategy
}{
{
name: "Supported aggregations",
aggregations: []AggregationSpec{
{Function: FuncCOUNT, Column: "*"},
{Function: FuncMAX, Column: "id"},
{Function: FuncMIN, Column: "value"},
},
expected: AggregationStrategy{
CanUseFastPath: true,
Reason: "all_aggregations_supported",
UnsupportedSpecs: []AggregationSpec{},
},
},
{
name: "Unsupported aggregation",
aggregations: []AggregationSpec{
{Function: FuncCOUNT, Column: "*"},
{Function: FuncAVG, Column: "value"}, // Not supported
},
expected: AggregationStrategy{
CanUseFastPath: false,
Reason: "unsupported_aggregation_functions",
},
},
{
name: "Empty aggregations",
aggregations: []AggregationSpec{},
expected: AggregationStrategy{
CanUseFastPath: true,
Reason: "all_aggregations_supported",
UnsupportedSpecs: []AggregationSpec{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
strategy := optimizer.DetermineStrategy(tt.aggregations)
assert.Equal(t, tt.expected.CanUseFastPath, strategy.CanUseFastPath)
assert.Equal(t, tt.expected.Reason, strategy.Reason)
if !tt.expected.CanUseFastPath {
assert.NotEmpty(t, strategy.UnsupportedSpecs)
}
})
}
}
// Test AggregationComputer
func TestAggregationComputer_ComputeFastPathAggregations(t *testing.T) {
engine := NewMockSQLEngine()
computer := NewAggregationComputer(engine.SQLEngine)
dataSources := &TopicDataSources{
ParquetFiles: map[string][]*ParquetFileStats{
"/topics/test/topic1/partition-1": {
{
RowCount: 30,
ColumnStats: map[string]*ParquetColumnStats{
"id": createMockColumnStats("id", int64(10), int64(40)),
},
},
},
},
ParquetRowCount: 30,
LiveLogRowCount: 25,
PartitionsCount: 1,
}
partitions := []string{"/topics/test/topic1/partition-1"}
tests := []struct {
name string
aggregations []AggregationSpec
validate func(t *testing.T, results []AggregationResult)
}{
{
name: "COUNT aggregation",
aggregations: []AggregationSpec{
{Function: FuncCOUNT, Column: "*"},
},
validate: func(t *testing.T, results []AggregationResult) {
assert.Len(t, results, 1)
assert.Equal(t, int64(55), results[0].Count) // 30 + 25
},
},
{
name: "MAX aggregation",
aggregations: []AggregationSpec{
{Function: FuncMAX, Column: "id"},
},
validate: func(t *testing.T, results []AggregationResult) {
assert.Len(t, results, 1)
// Should be max of parquet stats (40) - mock doesn't combine with live log
assert.Equal(t, int64(40), results[0].Max)
},
},
{
name: "MIN aggregation",
aggregations: []AggregationSpec{
{Function: FuncMIN, Column: "id"},
},
validate: func(t *testing.T, results []AggregationResult) {
assert.Len(t, results, 1)
// Should be min of parquet stats (10) - mock doesn't combine with live log
assert.Equal(t, int64(10), results[0].Min)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
results, err := computer.ComputeFastPathAggregations(ctx, tt.aggregations, dataSources, partitions)
assert.NoError(t, err)
tt.validate(t, results)
})
}
}
// Test case-insensitive column lookup and null handling for MIN/MAX aggregations
func TestAggregationComputer_MinMaxEdgeCases(t *testing.T) {
engine := NewMockSQLEngine()
computer := NewAggregationComputer(engine.SQLEngine)
tests := []struct {
name string
dataSources *TopicDataSources
aggregations []AggregationSpec
validate func(t *testing.T, results []AggregationResult, err error)
}{
{
name: "Case insensitive column lookup",
dataSources: &TopicDataSources{
ParquetFiles: map[string][]*ParquetFileStats{
"/topics/test/partition-1": {
{
RowCount: 50,
ColumnStats: map[string]*ParquetColumnStats{
"ID": createMockColumnStats("ID", int64(5), int64(95)), // Uppercase column name
},
},
},
},
ParquetRowCount: 50,
LiveLogRowCount: 0,
PartitionsCount: 1,
},
aggregations: []AggregationSpec{
{Function: FuncMIN, Column: "id"}, // lowercase column name
{Function: FuncMAX, Column: "id"},
},
validate: func(t *testing.T, results []AggregationResult, err error) {
assert.NoError(t, err)
assert.Len(t, results, 2)
assert.Equal(t, int64(5), results[0].Min, "MIN should work with case-insensitive lookup")
assert.Equal(t, int64(95), results[1].Max, "MAX should work with case-insensitive lookup")
},
},
{
name: "Null column stats handling",
dataSources: &TopicDataSources{
ParquetFiles: map[string][]*ParquetFileStats{
"/topics/test/partition-1": {
{
RowCount: 50,
ColumnStats: map[string]*ParquetColumnStats{
"id": {
ColumnName: "id",
MinValue: nil, // Null min value
MaxValue: nil, // Null max value
NullCount: 50,
RowCount: 50,
},
},
},
},
},
ParquetRowCount: 50,
LiveLogRowCount: 0,
PartitionsCount: 1,
},
aggregations: []AggregationSpec{
{Function: FuncMIN, Column: "id"},
{Function: FuncMAX, Column: "id"},
},
validate: func(t *testing.T, results []AggregationResult, err error) {
assert.NoError(t, err)
assert.Len(t, results, 2)
// When stats are null, should fall back to system column or return nil
// This tests that we don't crash on null stats
},
},
{
name: "Mixed data types - string column",
dataSources: &TopicDataSources{
ParquetFiles: map[string][]*ParquetFileStats{
"/topics/test/partition-1": {
{
RowCount: 30,
ColumnStats: map[string]*ParquetColumnStats{
"name": createMockColumnStats("name", "Alice", "Zoe"),
},
},
},
},
ParquetRowCount: 30,
LiveLogRowCount: 0,
PartitionsCount: 1,
},
aggregations: []AggregationSpec{
{Function: FuncMIN, Column: "name"},
{Function: FuncMAX, Column: "name"},
},
validate: func(t *testing.T, results []AggregationResult, err error) {
assert.NoError(t, err)
assert.Len(t, results, 2)
assert.Equal(t, "Alice", results[0].Min)
assert.Equal(t, "Zoe", results[1].Max)
},
},
{
name: "Mixed data types - float column",
dataSources: &TopicDataSources{
ParquetFiles: map[string][]*ParquetFileStats{
"/topics/test/partition-1": {
{
RowCount: 25,
ColumnStats: map[string]*ParquetColumnStats{
"price": createMockColumnStats("price", float64(19.99), float64(299.50)),
},
},
},
},
ParquetRowCount: 25,
LiveLogRowCount: 0,
PartitionsCount: 1,
},
aggregations: []AggregationSpec{
{Function: FuncMIN, Column: "price"},
{Function: FuncMAX, Column: "price"},
},
validate: func(t *testing.T, results []AggregationResult, err error) {
assert.NoError(t, err)
assert.Len(t, results, 2)
assert.Equal(t, float64(19.99), results[0].Min)
assert.Equal(t, float64(299.50), results[1].Max)
},
},
{
name: "Column not found in parquet stats",
dataSources: &TopicDataSources{
ParquetFiles: map[string][]*ParquetFileStats{
"/topics/test/partition-1": {
{
RowCount: 20,
ColumnStats: map[string]*ParquetColumnStats{
"id": createMockColumnStats("id", int64(1), int64(100)),
// Note: "nonexistent_column" is not in stats
},
},
},
},
ParquetRowCount: 20,
LiveLogRowCount: 10, // Has live logs to fall back to
PartitionsCount: 1,
},
aggregations: []AggregationSpec{
{Function: FuncMIN, Column: "nonexistent_column"},
{Function: FuncMAX, Column: "nonexistent_column"},
},
validate: func(t *testing.T, results []AggregationResult, err error) {
assert.NoError(t, err)
assert.Len(t, results, 2)
// Should fall back to live log processing or return nil
// The key is that it shouldn't crash
},
},
{
name: "Multiple parquet files with different ranges",
dataSources: &TopicDataSources{
ParquetFiles: map[string][]*ParquetFileStats{
"/topics/test/partition-1": {
{
RowCount: 30,
ColumnStats: map[string]*ParquetColumnStats{
"score": createMockColumnStats("score", int64(10), int64(50)),
},
},
{
RowCount: 40,
ColumnStats: map[string]*ParquetColumnStats{
"score": createMockColumnStats("score", int64(5), int64(75)), // Lower min, higher max
},
},
},
},
ParquetRowCount: 70,
LiveLogRowCount: 0,
PartitionsCount: 1,
},
aggregations: []AggregationSpec{
{Function: FuncMIN, Column: "score"},
{Function: FuncMAX, Column: "score"},
},
validate: func(t *testing.T, results []AggregationResult, err error) {
assert.NoError(t, err)
assert.Len(t, results, 2)
assert.Equal(t, int64(5), results[0].Min, "Should find global minimum across all files")
assert.Equal(t, int64(75), results[1].Max, "Should find global maximum across all files")
},
},
}
partitions := []string{"/topics/test/partition-1"}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
results, err := computer.ComputeFastPathAggregations(ctx, tt.aggregations, tt.dataSources, partitions)
tt.validate(t, results, err)
})
}
}
// Test the specific bug where MIN/MAX was returning empty values
func TestAggregationComputer_MinMaxEmptyValuesBugFix(t *testing.T) {
engine := NewMockSQLEngine()
computer := NewAggregationComputer(engine.SQLEngine)
// This test specifically addresses the bug where MIN/MAX returned empty
// due to improper null checking and extraction logic
dataSources := &TopicDataSources{
ParquetFiles: map[string][]*ParquetFileStats{
"/topics/test/test-topic/partition1": {
{
RowCount: 100,
ColumnStats: map[string]*ParquetColumnStats{
"id": {
ColumnName: "id",
MinValue: &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: 0}}, // Min should be 0
MaxValue: &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: 99}}, // Max should be 99
NullCount: 0,
RowCount: 100,
},
},
},
},
},
ParquetRowCount: 100,
LiveLogRowCount: 0, // No live logs, pure parquet stats
PartitionsCount: 1,
}
partitions := []string{"/topics/test/test-topic/partition1"}
tests := []struct {
name string
aggregSpec AggregationSpec
expected interface{}
}{
{
name: "MIN should return 0 not empty",
aggregSpec: AggregationSpec{Function: FuncMIN, Column: "id"},
expected: int32(0), // Should extract the actual minimum value
},
{
name: "MAX should return 99 not empty",
aggregSpec: AggregationSpec{Function: FuncMAX, Column: "id"},
expected: int32(99), // Should extract the actual maximum value
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
results, err := computer.ComputeFastPathAggregations(ctx, []AggregationSpec{tt.aggregSpec}, dataSources, partitions)
assert.NoError(t, err)
assert.Len(t, results, 1)
// Verify the result is not nil/empty
if tt.aggregSpec.Function == FuncMIN {
assert.NotNil(t, results[0].Min, "MIN result should not be nil")
assert.Equal(t, tt.expected, results[0].Min)
} else if tt.aggregSpec.Function == FuncMAX {
assert.NotNil(t, results[0].Max, "MAX result should not be nil")
assert.Equal(t, tt.expected, results[0].Max)
}
})
}
}
// Test the formatAggregationResult function with MIN/MAX edge cases
func TestSQLEngine_FormatAggregationResult_MinMax(t *testing.T) {
engine := NewTestSQLEngine()
tests := []struct {
name string
spec AggregationSpec
result AggregationResult
expected string
}{
{
name: "MIN with zero value should not be empty",
spec: AggregationSpec{Function: FuncMIN, Column: "id"},
result: AggregationResult{Min: int32(0)},
expected: "0",
},
{
name: "MAX with large value",
spec: AggregationSpec{Function: FuncMAX, Column: "id"},
result: AggregationResult{Max: int32(99)},
expected: "99",
},
{
name: "MIN with negative value",
spec: AggregationSpec{Function: FuncMIN, Column: "score"},
result: AggregationResult{Min: int64(-50)},
expected: "-50",
},
{
name: "MAX with float value",
spec: AggregationSpec{Function: FuncMAX, Column: "price"},
result: AggregationResult{Max: float64(299.99)},
expected: "299.99",
},
{
name: "MIN with string value",
spec: AggregationSpec{Function: FuncMIN, Column: "name"},
result: AggregationResult{Min: "Alice"},
expected: "Alice",
},
{
name: "MIN with nil should return NULL",
spec: AggregationSpec{Function: FuncMIN, Column: "missing"},
result: AggregationResult{Min: nil},
expected: "", // NULL values display as empty
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sqlValue := engine.formatAggregationResult(tt.spec, tt.result)
assert.Equal(t, tt.expected, sqlValue.String())
})
}
}
// Test the direct formatAggregationResult scenario that was originally broken
func TestSQLEngine_MinMaxBugFixIntegration(t *testing.T) {
// This test focuses on the core bug fix without the complexity of table discovery
// It directly tests the scenario where MIN/MAX returned empty due to the bug
engine := NewTestSQLEngine()
// Test the direct formatting path that was failing
tests := []struct {
name string
aggregSpec AggregationSpec
aggResult AggregationResult
expectedEmpty bool
expectedValue string
}{
{
name: "MIN with zero should not be empty (the original bug)",
aggregSpec: AggregationSpec{Function: FuncMIN, Column: "id", Alias: "MIN(id)"},
aggResult: AggregationResult{Min: int32(0)}, // This was returning empty before fix
expectedEmpty: false,
expectedValue: "0",
},
{
name: "MAX with valid value should not be empty",
aggregSpec: AggregationSpec{Function: FuncMAX, Column: "id", Alias: "MAX(id)"},
aggResult: AggregationResult{Max: int32(99)},
expectedEmpty: false,
expectedValue: "99",
},
{
name: "MIN with negative value should work",
aggregSpec: AggregationSpec{Function: FuncMIN, Column: "score", Alias: "MIN(score)"},
aggResult: AggregationResult{Min: int64(-10)},
expectedEmpty: false,
expectedValue: "-10",
},
{
name: "MIN with nil should be empty (expected behavior)",
aggregSpec: AggregationSpec{Function: FuncMIN, Column: "missing", Alias: "MIN(missing)"},
aggResult: AggregationResult{Min: nil},
expectedEmpty: true,
expectedValue: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Test the formatAggregationResult function directly
sqlValue := engine.formatAggregationResult(tt.aggregSpec, tt.aggResult)
result := sqlValue.String()
if tt.expectedEmpty {
assert.Empty(t, result, "Result should be empty for nil values")
} else {
assert.NotEmpty(t, result, "Result should not be empty")
assert.Equal(t, tt.expectedValue, result)
}
})
}
}
// Test the tryFastParquetAggregation method specifically for the bug
func TestSQLEngine_FastParquetAggregationBugFix(t *testing.T) {
// This test verifies that the fast path aggregation logic works correctly
// and doesn't return nil/empty values when it should return actual data
engine := NewMockSQLEngine()
computer := NewAggregationComputer(engine.SQLEngine)
// Create realistic data sources that mimic the user's scenario
dataSources := &TopicDataSources{
ParquetFiles: map[string][]*ParquetFileStats{
"/topics/test/test-topic/v2025-09-01-22-54-02/0000-0630": {
{
RowCount: 100,
ColumnStats: map[string]*ParquetColumnStats{
"id": {
ColumnName: "id",
MinValue: &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: 0}},
MaxValue: &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: 99}},
NullCount: 0,
RowCount: 100,
},
},
},
},
},
ParquetRowCount: 100,
LiveLogRowCount: 0, // Pure parquet scenario
PartitionsCount: 1,
}
partitions := []string{"/topics/test/test-topic/v2025-09-01-22-54-02/0000-0630"}
tests := []struct {
name string
aggregations []AggregationSpec
validateResults func(t *testing.T, results []AggregationResult)
}{
{
name: "Single MIN aggregation should return value not nil",
aggregations: []AggregationSpec{
{Function: FuncMIN, Column: "id", Alias: "MIN(id)"},
},
validateResults: func(t *testing.T, results []AggregationResult) {
assert.Len(t, results, 1)
assert.NotNil(t, results[0].Min, "MIN result should not be nil")
assert.Equal(t, int32(0), results[0].Min, "MIN should return the correct minimum value")
},
},
{
name: "Single MAX aggregation should return value not nil",
aggregations: []AggregationSpec{
{Function: FuncMAX, Column: "id", Alias: "MAX(id)"},
},
validateResults: func(t *testing.T, results []AggregationResult) {
assert.Len(t, results, 1)
assert.NotNil(t, results[0].Max, "MAX result should not be nil")
assert.Equal(t, int32(99), results[0].Max, "MAX should return the correct maximum value")
},
},
{
name: "Combined MIN/MAX should both return values",
aggregations: []AggregationSpec{
{Function: FuncMIN, Column: "id", Alias: "MIN(id)"},
{Function: FuncMAX, Column: "id", Alias: "MAX(id)"},
},
validateResults: func(t *testing.T, results []AggregationResult) {
assert.Len(t, results, 2)
assert.NotNil(t, results[0].Min, "MIN result should not be nil")
assert.NotNil(t, results[1].Max, "MAX result should not be nil")
assert.Equal(t, int32(0), results[0].Min)
assert.Equal(t, int32(99), results[1].Max)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
results, err := computer.ComputeFastPathAggregations(ctx, tt.aggregations, dataSources, partitions)
assert.NoError(t, err, "ComputeFastPathAggregations should not error")
tt.validateResults(t, results)
})
}
}
// Test ExecutionPlanBuilder
func TestExecutionPlanBuilder_BuildAggregationPlan(t *testing.T) {
engine := NewMockSQLEngine()
builder := NewExecutionPlanBuilder(engine.SQLEngine)
// Parse a simple SELECT statement using the native parser
stmt, err := ParseSQL("SELECT COUNT(*) FROM test_topic")
assert.NoError(t, err)
selectStmt := stmt.(*SelectStatement)
aggregations := []AggregationSpec{
{Function: FuncCOUNT, Column: "*"},
}
strategy := AggregationStrategy{
CanUseFastPath: true,
Reason: "all_aggregations_supported",
}
dataSources := &TopicDataSources{
ParquetRowCount: 100,
LiveLogRowCount: 50,
PartitionsCount: 3,
ParquetFiles: map[string][]*ParquetFileStats{
"partition-1": {{RowCount: 50}},
"partition-2": {{RowCount: 50}},
},
}
plan := builder.BuildAggregationPlan(selectStmt, aggregations, strategy, dataSources)
assert.Equal(t, "SELECT", plan.QueryType)
assert.Equal(t, "hybrid_fast_path", plan.ExecutionStrategy)
assert.Contains(t, plan.DataSources, "parquet_stats")
assert.Contains(t, plan.DataSources, "live_logs")
assert.Equal(t, 3, plan.PartitionsScanned)
assert.Equal(t, 2, plan.ParquetFilesScanned)
assert.Contains(t, plan.OptimizationsUsed, "parquet_statistics")
assert.Equal(t, []string{"COUNT(*)"}, plan.Aggregations)
assert.Equal(t, int64(50), plan.TotalRowsProcessed) // Only live logs scanned
}
// Test Error Types
func TestErrorTypes(t *testing.T) {
t.Run("AggregationError", func(t *testing.T) {
err := AggregationError{
Operation: "MAX",
Column: "id",
Cause: errors.New("column not found"),
}
expected := "aggregation error in MAX(id): column not found"
assert.Equal(t, expected, err.Error())
})
t.Run("DataSourceError", func(t *testing.T) {
err := DataSourceError{
Source: "partition_discovery:test.topic1",
Cause: errors.New("network timeout"),
}
expected := "data source error in partition_discovery:test.topic1: network timeout"
assert.Equal(t, expected, err.Error())
})
t.Run("OptimizationError", func(t *testing.T) {
err := OptimizationError{
Strategy: "fast_path_aggregation",
Reason: "unsupported function: AVG",
}
expected := "optimization failed for fast_path_aggregation: unsupported function: AVG"
assert.Equal(t, expected, err.Error())
})
}
// Integration Tests
func TestIntegration_FastPathOptimization(t *testing.T) {
engine := NewMockSQLEngine()
// Setup components
optimizer := NewFastPathOptimizer(engine.SQLEngine)
computer := NewAggregationComputer(engine.SQLEngine)
// Mock data setup
aggregations := []AggregationSpec{
{Function: FuncCOUNT, Column: "*"},
{Function: FuncMAX, Column: "id"},
}
// Step 1: Determine strategy
strategy := optimizer.DetermineStrategy(aggregations)
assert.True(t, strategy.CanUseFastPath)
// Step 2: Mock data sources
dataSources := &TopicDataSources{
ParquetFiles: map[string][]*ParquetFileStats{
"/topics/test/topic1/partition-1": {{
RowCount: 75,
ColumnStats: map[string]*ParquetColumnStats{
"id": createMockColumnStats("id", int64(1), int64(100)),
},
}},
},
ParquetRowCount: 75,
LiveLogRowCount: 25,
PartitionsCount: 1,
}
partitions := []string{"/topics/test/topic1/partition-1"}
// Step 3: Compute aggregations
ctx := context.Background()
results, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
assert.NoError(t, err)
assert.Len(t, results, 2)
assert.Equal(t, int64(100), results[0].Count) // 75 + 25
assert.Equal(t, int64(100), results[1].Max) // From parquet stats mock
}
func TestIntegration_FallbackToFullScan(t *testing.T) {
engine := NewMockSQLEngine()
optimizer := NewFastPathOptimizer(engine.SQLEngine)
// Unsupported aggregations
aggregations := []AggregationSpec{
{Function: "AVG", Column: "value"}, // Not supported
}
// Step 1: Strategy should reject fast path
strategy := optimizer.DetermineStrategy(aggregations)
assert.False(t, strategy.CanUseFastPath)
assert.Equal(t, "unsupported_aggregation_functions", strategy.Reason)
assert.NotEmpty(t, strategy.UnsupportedSpecs)
}
// Benchmark Tests
func BenchmarkFastPathOptimizer_DetermineStrategy(b *testing.B) {
engine := NewMockSQLEngine()
optimizer := NewFastPathOptimizer(engine.SQLEngine)
aggregations := []AggregationSpec{
{Function: FuncCOUNT, Column: "*"},
{Function: FuncMAX, Column: "id"},
{Function: "MIN", Column: "value"},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
strategy := optimizer.DetermineStrategy(aggregations)
_ = strategy.CanUseFastPath
}
}
func BenchmarkAggregationComputer_ComputeFastPathAggregations(b *testing.B) {
engine := NewMockSQLEngine()
computer := NewAggregationComputer(engine.SQLEngine)
dataSources := &TopicDataSources{
ParquetFiles: map[string][]*ParquetFileStats{
"partition-1": {{
RowCount: 1000,
ColumnStats: map[string]*ParquetColumnStats{
"id": createMockColumnStats("id", int64(1), int64(1000)),
},
}},
},
ParquetRowCount: 1000,
LiveLogRowCount: 100,
}
aggregations := []AggregationSpec{
{Function: FuncCOUNT, Column: "*"},
{Function: FuncMAX, Column: "id"},
}
partitions := []string{"partition-1"}
ctx := context.Background()
b.ResetTimer()
for i := 0; i < b.N; i++ {
results, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
if err != nil {
b.Fatal(err)
}
_ = results
}
}
// Tests for convertLogEntryToRecordValue - Protocol Buffer parsing bug fix
func TestSQLEngine_ConvertLogEntryToRecordValue_ValidProtobuf(t *testing.T) {
engine := NewTestSQLEngine()
// Create a valid RecordValue protobuf with user data
originalRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
"id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 42}},
"name": {Kind: &schema_pb.Value_StringValue{StringValue: "test-user"}},
"score": {Kind: &schema_pb.Value_DoubleValue{DoubleValue: 95.5}},
},
}
// Serialize the protobuf (this is what MQ actually stores)
protobufData, err := proto.Marshal(originalRecord)
assert.NoError(t, err)
// Create a LogEntry with the serialized data
logEntry := &filer_pb.LogEntry{
TsNs: 1609459200000000000, // 2021-01-01 00:00:00 UTC
PartitionKeyHash: 123,
Data: protobufData, // Protocol buffer data (not JSON!)
Key: []byte("test-key-001"),
}
// Test the conversion
result, source, err := engine.convertLogEntryToRecordValue(logEntry)
// Verify no error
assert.NoError(t, err)
assert.Equal(t, "live_log", source)
assert.NotNil(t, result)
assert.NotNil(t, result.Fields)
// Verify system columns are added correctly
assert.Contains(t, result.Fields, SW_COLUMN_NAME_TIMESTAMP)
assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY)
assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value())
assert.Equal(t, []byte("test-key-001"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue())
// Verify user data is preserved
assert.Contains(t, result.Fields, "id")
assert.Contains(t, result.Fields, "name")
assert.Contains(t, result.Fields, "score")
assert.Equal(t, int32(42), result.Fields["id"].GetInt32Value())
assert.Equal(t, "test-user", result.Fields["name"].GetStringValue())
assert.Equal(t, 95.5, result.Fields["score"].GetDoubleValue())
}
func TestSQLEngine_ConvertLogEntryToRecordValue_InvalidProtobuf(t *testing.T) {
engine := NewTestSQLEngine()
// Create LogEntry with invalid protobuf data (this would cause the original JSON parsing bug)
logEntry := &filer_pb.LogEntry{
TsNs: 1609459200000000000,
PartitionKeyHash: 123,
Data: []byte{0x17, 0x00, 0xFF, 0xFE}, // Invalid protobuf data (starts with \x17 like in the original error)
Key: []byte("test-key"),
}
// Test the conversion
result, source, err := engine.convertLogEntryToRecordValue(logEntry)
// Should return error for invalid protobuf
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to unmarshal log entry protobuf")
assert.Nil(t, result)
assert.Empty(t, source)
}
func TestSQLEngine_ConvertLogEntryToRecordValue_EmptyProtobuf(t *testing.T) {
engine := NewTestSQLEngine()
// Create a minimal valid RecordValue (empty fields)
emptyRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{},
}
protobufData, err := proto.Marshal(emptyRecord)
assert.NoError(t, err)
logEntry := &filer_pb.LogEntry{
TsNs: 1609459200000000000,
PartitionKeyHash: 456,
Data: protobufData,
Key: []byte("empty-key"),
}
// Test the conversion
result, source, err := engine.convertLogEntryToRecordValue(logEntry)
// Should succeed and add system columns
assert.NoError(t, err)
assert.Equal(t, "live_log", source)
assert.NotNil(t, result)
assert.NotNil(t, result.Fields)
// Should have system columns
assert.Contains(t, result.Fields, SW_COLUMN_NAME_TIMESTAMP)
assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY)
assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value())
assert.Equal(t, []byte("empty-key"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue())
// Should have no user fields
userFieldCount := 0
for fieldName := range result.Fields {
if fieldName != SW_COLUMN_NAME_TIMESTAMP && fieldName != SW_COLUMN_NAME_KEY {
userFieldCount++
}
}
assert.Equal(t, 0, userFieldCount)
}
func TestSQLEngine_ConvertLogEntryToRecordValue_NilFieldsMap(t *testing.T) {
engine := NewTestSQLEngine()
// Create RecordValue with nil Fields map (edge case)
recordWithNilFields := &schema_pb.RecordValue{
Fields: nil, // This should be handled gracefully
}
protobufData, err := proto.Marshal(recordWithNilFields)
assert.NoError(t, err)
logEntry := &filer_pb.LogEntry{
TsNs: 1609459200000000000,
PartitionKeyHash: 789,
Data: protobufData,
Key: []byte("nil-fields-key"),
}
// Test the conversion
result, source, err := engine.convertLogEntryToRecordValue(logEntry)
// Should succeed and create Fields map
assert.NoError(t, err)
assert.Equal(t, "live_log", source)
assert.NotNil(t, result)
assert.NotNil(t, result.Fields) // Should be created by the function
// Should have system columns
assert.Contains(t, result.Fields, SW_COLUMN_NAME_TIMESTAMP)
assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY)
assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value())
assert.Equal(t, []byte("nil-fields-key"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue())
}
func TestSQLEngine_ConvertLogEntryToRecordValue_SystemColumnOverride(t *testing.T) {
engine := NewTestSQLEngine()
// Create RecordValue that already has system column names (should be overridden)
recordWithSystemCols := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
"user_field": {Kind: &schema_pb.Value_StringValue{StringValue: "user-data"}},
SW_COLUMN_NAME_TIMESTAMP: {Kind: &schema_pb.Value_Int64Value{Int64Value: 999999999}}, // Should be overridden
SW_COLUMN_NAME_KEY: {Kind: &schema_pb.Value_StringValue{StringValue: "old-key"}}, // Should be overridden
},
}
protobufData, err := proto.Marshal(recordWithSystemCols)
assert.NoError(t, err)
logEntry := &filer_pb.LogEntry{
TsNs: 1609459200000000000,
PartitionKeyHash: 100,
Data: protobufData,
Key: []byte("actual-key"),
}
// Test the conversion
result, source, err := engine.convertLogEntryToRecordValue(logEntry)
// Should succeed
assert.NoError(t, err)
assert.Equal(t, "live_log", source)
assert.NotNil(t, result)
// System columns should use LogEntry values, not protobuf values
assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value())
assert.Equal(t, []byte("actual-key"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue())
// User field should be preserved
assert.Contains(t, result.Fields, "user_field")
assert.Equal(t, "user-data", result.Fields["user_field"].GetStringValue())
}
func TestSQLEngine_ConvertLogEntryToRecordValue_ComplexDataTypes(t *testing.T) {
engine := NewTestSQLEngine()
// Test with various data types
complexRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
"int32_field": {Kind: &schema_pb.Value_Int32Value{Int32Value: -42}},
"int64_field": {Kind: &schema_pb.Value_Int64Value{Int64Value: 9223372036854775807}},
"float_field": {Kind: &schema_pb.Value_FloatValue{FloatValue: 3.14159}},
"double_field": {Kind: &schema_pb.Value_DoubleValue{DoubleValue: 2.718281828}},
"bool_field": {Kind: &schema_pb.Value_BoolValue{BoolValue: true}},
"string_field": {Kind: &schema_pb.Value_StringValue{StringValue: "test string with unicode 🎉"}},
"bytes_field": {Kind: &schema_pb.Value_BytesValue{BytesValue: []byte{0x01, 0x02, 0x03}}},
},
}
protobufData, err := proto.Marshal(complexRecord)
assert.NoError(t, err)
logEntry := &filer_pb.LogEntry{
TsNs: 1609459200000000000,
PartitionKeyHash: 200,
Data: protobufData,
Key: []byte("complex-key"),
}
// Test the conversion
result, source, err := engine.convertLogEntryToRecordValue(logEntry)
// Should succeed
assert.NoError(t, err)
assert.Equal(t, "live_log", source)
assert.NotNil(t, result)
// Verify all data types are preserved
assert.Equal(t, int32(-42), result.Fields["int32_field"].GetInt32Value())
assert.Equal(t, int64(9223372036854775807), result.Fields["int64_field"].GetInt64Value())
assert.Equal(t, float32(3.14159), result.Fields["float_field"].GetFloatValue())
assert.Equal(t, 2.718281828, result.Fields["double_field"].GetDoubleValue())
assert.Equal(t, true, result.Fields["bool_field"].GetBoolValue())
assert.Equal(t, "test string with unicode 🎉", result.Fields["string_field"].GetStringValue())
assert.Equal(t, []byte{0x01, 0x02, 0x03}, result.Fields["bytes_field"].GetBytesValue())
// System columns should still be present
assert.Contains(t, result.Fields, SW_COLUMN_NAME_TIMESTAMP)
assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY)
}
// Tests for log buffer deduplication functionality
func TestSQLEngine_GetLogBufferStartFromFile_BinaryFormat(t *testing.T) {
engine := NewTestSQLEngine()
// Create sample buffer start (binary format)
bufferStartBytes := make([]byte, 8)
binary.BigEndian.PutUint64(bufferStartBytes, uint64(1609459100000000001))
// Create file entry with buffer start + some chunks
entry := &filer_pb.Entry{
Name: "test-log-file",
Extended: map[string][]byte{
"buffer_start": bufferStartBytes,
},
Chunks: []*filer_pb.FileChunk{
{FileId: "chunk1", Offset: 0, Size: 1000},
{FileId: "chunk2", Offset: 1000, Size: 1000},
{FileId: "chunk3", Offset: 2000, Size: 1000},
},
}
// Test extraction
result, err := engine.getLogBufferStartFromFile(entry)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, int64(1609459100000000001), result.StartIndex)
// Test extraction works correctly with the binary format
}
func TestSQLEngine_GetLogBufferStartFromFile_NoMetadata(t *testing.T) {
engine := NewTestSQLEngine()
// Create file entry without buffer start
entry := &filer_pb.Entry{
Name: "test-log-file",
Extended: nil,
}
// Test extraction
result, err := engine.getLogBufferStartFromFile(entry)
assert.NoError(t, err)
assert.Nil(t, result)
}
func TestSQLEngine_GetLogBufferStartFromFile_InvalidData(t *testing.T) {
engine := NewTestSQLEngine()
// Create file entry with invalid buffer start (wrong size)
entry := &filer_pb.Entry{
Name: "test-log-file",
Extended: map[string][]byte{
"buffer_start": []byte("invalid-binary"),
},
}
// Test extraction
result, err := engine.getLogBufferStartFromFile(entry)
assert.Error(t, err)
assert.Contains(t, err.Error(), "invalid buffer_start format: expected 8 bytes")
assert.Nil(t, result)
}
func TestSQLEngine_BuildLogBufferDeduplicationMap_NoBrokerClient(t *testing.T) {
engine := NewTestSQLEngine()
engine.catalog.brokerClient = nil // Simulate no broker client
ctx := context.Background()
result, err := engine.buildLogBufferDeduplicationMap(ctx, "/topics/test/test-topic")
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Empty(t, result)
}
func TestSQLEngine_LogBufferDeduplication_ServerRestartScenario(t *testing.T) {
// Simulate scenario: Buffer indexes are now initialized with process start time
// This tests that buffer start indexes are globally unique across server restarts
// Before server restart: Process 1 buffer start (3 chunks)
beforeRestartStart := LogBufferStart{
StartIndex: 1609459100000000000, // Process 1 start time
}
// After server restart: Process 2 buffer start (3 chunks)
afterRestartStart := LogBufferStart{
StartIndex: 1609459300000000000, // Process 2 start time (DIFFERENT)
}
// Simulate 3 chunks for each file
chunkCount := int64(3)
// Calculate end indexes for range comparison
beforeEnd := beforeRestartStart.StartIndex + chunkCount - 1 // [start, start+2]
afterStart := afterRestartStart.StartIndex // [start, start+2]
// Test range overlap detection (should NOT overlap)
overlaps := beforeRestartStart.StartIndex <= (afterStart+chunkCount-1) && beforeEnd >= afterStart
assert.False(t, overlaps, "Buffer ranges after restart should not overlap")
// Verify the start indexes are globally unique
assert.NotEqual(t, beforeRestartStart.StartIndex, afterRestartStart.StartIndex, "Start indexes should be different")
assert.Less(t, beforeEnd, afterStart, "Ranges should be completely separate")
// Expected values:
// Before restart: [1609459100000000000, 1609459100000000002]
// After restart: [1609459300000000000, 1609459300000000002]
expectedBeforeEnd := int64(1609459100000000002)
expectedAfterStart := int64(1609459300000000000)
assert.Equal(t, expectedBeforeEnd, beforeEnd)
assert.Equal(t, expectedAfterStart, afterStart)
// This demonstrates that buffer start indexes initialized with process start time
// prevent false positive duplicates across server restarts
}
func TestBrokerClient_BinaryBufferStartFormat(t *testing.T) {
// Test scenario: getBufferStartFromEntry should only support binary format
// This tests the standardized binary format for buffer_start metadata
realBrokerClient := &BrokerClient{}
// Test binary format (used by both log files and Parquet files)
binaryEntry := &filer_pb.Entry{
Name: "2025-01-07-14-30-45",
IsDirectory: false,
Extended: map[string][]byte{
"buffer_start": func() []byte {
// Binary format: 8-byte BigEndian
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(2000001))
return buf
}(),
},
}
bufferStart := realBrokerClient.getBufferStartFromEntry(binaryEntry)
assert.NotNil(t, bufferStart)
assert.Equal(t, int64(2000001), bufferStart.StartIndex, "Should parse binary buffer_start metadata")
// Test Parquet file (same binary format)
parquetEntry := &filer_pb.Entry{
Name: "2025-01-07-14-30.parquet",
IsDirectory: false,
Extended: map[string][]byte{
"buffer_start": func() []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(1500001))
return buf
}(),
},
}
bufferStart = realBrokerClient.getBufferStartFromEntry(parquetEntry)
assert.NotNil(t, bufferStart)
assert.Equal(t, int64(1500001), bufferStart.StartIndex, "Should parse binary buffer_start from Parquet file")
// Test missing metadata
emptyEntry := &filer_pb.Entry{
Name: "no-metadata",
IsDirectory: false,
Extended: nil,
}
bufferStart = realBrokerClient.getBufferStartFromEntry(emptyEntry)
assert.Nil(t, bufferStart, "Should return nil for entry without buffer_start metadata")
// Test invalid format (wrong size)
invalidEntry := &filer_pb.Entry{
Name: "invalid-metadata",
IsDirectory: false,
Extended: map[string][]byte{
"buffer_start": []byte("invalid"),
},
}
bufferStart = realBrokerClient.getBufferStartFromEntry(invalidEntry)
assert.Nil(t, bufferStart, "Should return nil for invalid buffer_start metadata")
}
// TestGetSQLValAlias tests the getSQLValAlias function, particularly for SQL injection prevention
func TestGetSQLValAlias(t *testing.T) {
engine := &SQLEngine{}
tests := []struct {
name string
sqlVal *SQLVal
expected string
desc string
}{
{
name: "simple string",
sqlVal: &SQLVal{
Type: StrVal,
Val: []byte("hello"),
},
expected: "'hello'",
desc: "Simple string should be wrapped in single quotes",
},
{
name: "string with single quote",
sqlVal: &SQLVal{
Type: StrVal,
Val: []byte("don't"),
},
expected: "'don''t'",
desc: "String with single quote should have the quote escaped by doubling it",
},
{
name: "string with multiple single quotes",
sqlVal: &SQLVal{
Type: StrVal,
Val: []byte("'malicious'; DROP TABLE users; --"),
},
expected: "'''malicious''; DROP TABLE users; --'",
desc: "String with SQL injection attempt should have all single quotes properly escaped",
},
{
name: "empty string",
sqlVal: &SQLVal{
Type: StrVal,
Val: []byte(""),
},
expected: "''",
desc: "Empty string should result in empty quoted string",
},
{
name: "integer value",
sqlVal: &SQLVal{
Type: IntVal,
Val: []byte("123"),
},
expected: "123",
desc: "Integer value should not be quoted",
},
{
name: "float value",
sqlVal: &SQLVal{
Type: FloatVal,
Val: []byte("123.45"),
},
expected: "123.45",
desc: "Float value should not be quoted",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := engine.getSQLValAlias(tt.sqlVal)
assert.Equal(t, tt.expected, result, tt.desc)
})
}
}