mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
* feat: Phase 1 - Add SQL query engine foundation for MQ topics Implements core SQL infrastructure with metadata operations: New Components: - SQL parser integration using github.com/xwb1989/sqlparser - Query engine framework in weed/query/engine/ - Schema catalog mapping MQ topics to SQL tables - Interactive SQL CLI command 'weed sql' Supported Operations: - SHOW DATABASES (lists MQ namespaces) - SHOW TABLES (lists MQ topics) - SQL statement parsing and routing - Error handling and result formatting Key Design Decisions: - MQ namespaces ↔ SQL databases - MQ topics ↔ SQL tables - Parquet message storage ready for querying - Backward-compatible schema evolution support Testing: - Unit tests for core engine functionality - Command integration tests - Parse error handling validation Assumptions (documented in code): - All MQ messages stored in Parquet format - Schema evolution maintains backward compatibility - MySQL-compatible SQL syntax via sqlparser - Single-threaded usage per SQL session Next Phase: DDL operations (CREATE/ALTER/DROP TABLE) * feat: Phase 2 - Add DDL operations and real MQ broker integration Implements comprehensive DDL support for MQ topic management: New Components: - Real MQ broker connectivity via BrokerClient - CREATE TABLE → ConfigureTopic gRPC calls - DROP TABLE → DeleteTopic operations - DESCRIBE table → Schema introspection - SQL type mapping (SQL ↔ MQ schema types) Enhanced Features: - Live topic discovery from MQ broker - Fallback to cached/sample data when broker unavailable - MySQL-compatible DESCRIBE output - Schema validation and error handling - CREATE TABLE with column definitions Key Infrastructure: - broker_client.go: gRPC communication with MQ broker - sql_types.go: Bidirectional SQL/MQ type conversion - describe.go: Table schema introspection - Enhanced engine.go: Full DDL routing and execution Supported SQL Operations: ✅ SHOW DATABASES, SHOW TABLES (live + fallback) ✅ CREATE TABLE table_name (col1 INT, col2 VARCHAR(50), ...) ✅ DROP TABLE table_name ✅ DESCRIBE table_name / SHOW COLUMNS FROM table_name Known Limitations: - SQL parser issues with reserved keywords (e.g., 'timestamp') - Requires running MQ broker for full functionality - ALTER TABLE not yet implemented - DeleteTopic method needs broker-side implementation Architecture Decisions: - Broker discovery via filer lock mechanism (same as shell commands) - Graceful fallback when broker unavailable - ConfigureTopic for CREATE TABLE with 6 default partitions - Schema versioning ready for ALTER TABLE support Testing: - Unit tests updated with filer address parameter - Integration tests for DDL operations - Error handling for connection failures Next Phase: SELECT query execution with Parquet scanning * fix: Resolve High Priority TODOs - Real MQ Broker Integration ✅ COMPLETED HIGH PRIORITY TODOs: 🔧 **Real FilerClient Integration** (engine.go:131) - Implemented GetFilerClient() method in BrokerClient - Added filerClientImpl with full FilerClient interface compliance - Added AdjustedUrl() and GetDataCenter() methods - Real filerClient connection replaces nil fallback 🔧 **Partition Discovery via MQ Broker** (hybrid_message_scanner.go:116) - Added ListTopicPartitions() method using topic configuration - Implemented discoverTopicPartitions() in HybridMessageScanner - Reads actual partition count from BrokerPartitionAssignments - Generates proper partition ranges based on topic.PartitionCount 📋 **Technical Fixes:** - Fixed compilation errors with undefined variables - Proper error handling with filerClientErr variable - Corrected ConfigureTopicResponse field usage (BrokerPartitionAssignments vs PartitionCount) - Complete FilerClient interface implementation 🎯 **Impact:** - SQL engine now connects to real MQ broker infrastructure - Actual topic partition discovery instead of hardcoded defaults - Production-ready broker integration with graceful fallbacks - Maintains backward compatibility with sample data when broker unavailable ✅ All tests passing - High priority TODO resolution complete! Next: Schema-aware message parsing and time filter optimization. * feat: Time Filter Extraction - Complete Performance Optimization ✅ FOURTH HIGH PRIORITY TODO COMPLETED! ⏰ **Time Filter Extraction & Push-Down Optimization** (engine.go:198-199) - Replaced hardcoded StartTimeNs=0, StopTimeNs=0 with intelligent extraction - Added extractTimeFilters() with recursive WHERE clause analysis - Smart time column detection (\_timestamp_ns, created_at, timestamp, etc.) - Comprehensive time value parsing (nanoseconds, ISO dates, datetime formats) - Operator reversal handling (column op value vs value op column) 🧠 **Intelligent WHERE Clause Processing:** - AND expressions: Combine time bounds (intersection) ✅ - OR expressions: Skip extraction (safety) ✅ - Parentheses: Recursive unwrapping ✅ - Comparison operators: >, >=, <, <=, = ✅ - Multiple time formats: nanoseconds, RFC3339, date-only, datetime ✅ 🚀 **Performance Impact:** - Push-down filtering to hybrid scanner level - Reduced data scanning at source (live logs + Parquet files) - Time-based partition pruning potential - Significant performance gains for time-series queries 📊 **Comprehensive Testing (21 tests passing):** - ✅ Time filter extraction (6 test scenarios) - ✅ Time column recognition (case-insensitive) - ✅ Time value parsing (5 formats) - ✅ Full integration with SELECT queries - ✅ Backward compatibility maintained 💡 **Real-World Query Examples:** Before: Scans ALL data, filters in memory SELECT * FROM events WHERE \_timestamp_ns > 1672531200000000000; After: Scans ONLY relevant time range at source level → StartTimeNs=1672531200000000000, StopTimeNs=0 → Massive performance improvement for large datasets! 🎯 **Production Ready Features:** - Multiple time column formats supported - Graceful fallbacks for invalid dates - OR clause safety (avoids incorrect optimization) - Comprehensive error handling **ALL MEDIUM PRIORITY TODOs NOW READY FOR NEXT PHASEtest ./weed/query/engine/ -v* 🎉 * feat: Extended WHERE Operators - Complete Advanced Filtering ✅ **EXTENDED WHERE OPERATORS IMPLEMENTEDtest ./weed/query/engine/ -v | grep -E PASS * feat: Enhanced SQL CLI Experience ✅ COMPLETE ENHANCED CLI IMPLEMENTATION: 🚀 **Multiple Execution Modes:** - Interactive shell with enhanced prompts and context - Single query execution: --query 'SQL' --output format - Batch file processing: --file queries.sql --output csv - Database context switching: --database dbname 📊 **Multi-Format Output:** - Table format (ASCII) - default for interactive - JSON format - structured data for programmatic use - CSV format - spreadsheet-friendly output - Smart auto-detection based on execution mode ⚙️ **Enhanced Interactive Shell:** - Database context switching: USE database_name; - Output format switching: \format table|json|csv - Command history tracking (basic implementation) - Enhanced help with WHERE operator examples - Contextual prompts: seaweedfs:dbname> 🛠️ **Production Features:** - Comprehensive error handling (JSON + user-friendly) - Query execution timing and performance metrics - 30-second timeout protection with graceful handling - Real MQ integration with hybrid data scanning 📖 **Complete CLI Interface:** - Full flag support: --server, --interactive, --file, --output, --database, --query - Auto-detection of execution mode and output format - Structured help system with practical examples - Batch processing with multi-query file support 💡 **Advanced WHERE Integration:** All extended operators (<=, >=, !=, LIKE, IN) fully supported across all execution modes and output formats. 🎯 **Usage Examples:** - weed sql --interactive - weed sql --query 'SHOW DATABASES' --output json - weed sql --file queries.sql --output csv - weed sql --database analytics --interactive Enhanced CLI experience complete - production ready! 🚀 * Delete test_utils_test.go * fmt * integer conversion * show databases works * show tables works * Update describe.go * actual column types * Update .gitignore * scan topic messages * remove emoji * support aggregation functions * column name case insensitive, better auto column names * fmt * fix reading system fields * use parquet statistics for optimization * remove emoji * parquet file generate stats * scan all files * parquet file generation remember the sources also * fmt * sql * truncate topic * combine parquet results with live logs * explain * explain the execution plan * add tests * improve tests * skip * use mock for testing * add tests * refactor * fix after refactoring * detailed logs during explain. Fix bugs on reading live logs. * fix decoding data * save source buffer index start for log files * process buffer from brokers * filter out already flushed messages * dedup with buffer start index * explain with broker buffer * the parquet file should also remember the first buffer_start attribute from the sources * parquet file can query messages in broker memory, if log files do not exist * buffer start stored as 8 bytes * add jdbc * add postgres protocol * Revert "add jdbc" This reverts commita6e48b7690
. * hook up seaweed sql engine * setup integration test for postgres * rename to "weed db" * return fast on error * fix versioning * address comments * address some comments * column name can be on left or right in where conditions * avoid sample data * remove sample data * de-support alter table and drop table * address comments * read broker, logs, and parquet files * Update engine.go * address some comments * use schema instead of inferred result types * fix tests * fix todo * fix empty spaces and coercion * fmt * change to pg_query_go * fix tests * fix tests * fmt * fix: Enable CGO in Docker build for pg_query_go dependency The pg_query_go library requires CGO to be enabled as it wraps the libpg_query C library. Added gcc and musl-dev dependencies to the Docker build for proper compilation. * feat: Replace pg_query_go with lightweight SQL parser (no CGO required) - Remove github.com/pganalyze/pg_query_go/v6 dependency to avoid CGO requirement - Implement lightweight SQL parser for basic SELECT, SHOW, and DDL statements - Fix operator precedence in WHERE clause parsing (handle AND/OR before comparisons) - Support INTEGER, FLOAT, and STRING literals in WHERE conditions - All SQL engine tests passing with new parser - PostgreSQL integration tests can now build without CGO The lightweight parser handles the essential SQL features needed for the SeaweedFS query engine while maintaining compatibility and avoiding CGO dependencies that caused Docker build issues. * feat: Add Parquet logical types to mq_schema.proto Added support for Parquet logical types in SeaweedFS message queue schema: - TIMESTAMP: UTC timestamp in microseconds since epoch with timezone flag - DATE: Date as days since Unix epoch (1970-01-01) - DECIMAL: Arbitrary precision decimal with configurable precision/scale - TIME: Time of day in microseconds since midnight These types enable advanced analytics features: - Time-based filtering and window functions - Date arithmetic and year/month/day extraction - High-precision numeric calculations - Proper time zone handling for global deployments Regenerated protobuf Go code with new scalar types and value messages. * feat: Enable publishers to use Parquet logical types Enhanced MQ publishers to utilize the new logical types: - Updated convertToRecordValue() to use TimestampValue instead of string RFC3339 - Added DateValue support for birth_date field (days since epoch) - Added DecimalValue support for precise_amount field with configurable precision/scale - Enhanced UserEvent struct with PreciseAmount and BirthDate fields - Added convertToDecimal() helper using big.Rat for precise decimal conversion - Updated test data generator to produce varied birth dates (1970-2005) and precise amounts Publishers now generate structured data with proper logical types: - ✅ TIMESTAMP: Microsecond precision UTC timestamps - ✅ DATE: Birth dates as days since Unix epoch - ✅ DECIMAL: Precise amounts with 18-digit precision, 4-decimal scale Successfully tested with PostgreSQL integration - all topics created with logical type data. * feat: Add logical type support to SQL query engine Extended SQL engine to handle new Parquet logical types: - Added TimestampValue comparison support (microsecond precision) - Added DateValue comparison support (days since epoch) - Added DecimalValue comparison support with string conversion - Added TimeValue comparison support (microseconds since midnight) - Enhanced valuesEqual(), valueLessThan(), valueGreaterThan() functions - Added decimalToString() helper for precise decimal-to-string conversion - Imported math/big for arbitrary precision decimal handling The SQL engine can now: - ✅ Compare TIMESTAMP values for filtering (e.g., WHERE timestamp > 1672531200000000000) - ✅ Compare DATE values for date-based queries (e.g., WHERE birth_date >= 12345) - ✅ Compare DECIMAL values for precise financial calculations - ✅ Compare TIME values for time-of-day filtering Next: Add YEAR(), MONTH(), DAY() extraction functions for date analytics. * feat: Add window function foundation with timestamp support Added comprehensive foundation for SQL window functions with timestamp analytics: Core Window Function Types: - WindowSpec with PartitionBy and OrderBy support - WindowFunction struct for ROW_NUMBER, RANK, LAG, LEAD - OrderByClause for timestamp-based ordering - Extended SelectStatement to support WindowFunctions field Timestamp Analytics Functions: ✅ ApplyRowNumber() - ROW_NUMBER() OVER (ORDER BY timestamp) ✅ ExtractYear() - Extract year from TIMESTAMP logical type ✅ ExtractMonth() - Extract month from TIMESTAMP logical type ✅ ExtractDay() - Extract day from TIMESTAMP logical type ✅ FilterByYear() - Filter records by timestamp year Foundation for Advanced Window Functions: - LAG/LEAD for time-series access to previous/next values - RANK/DENSE_RANK for temporal ranking - FIRST_VALUE/LAST_VALUE for window boundaries - PARTITION BY support for grouped analytics This enables sophisticated time-series analytics like: - SELECT *, ROW_NUMBER() OVER (ORDER BY timestamp) FROM user_events WHERE EXTRACT(YEAR FROM timestamp) = 2024 - Trend analysis over time windows - Session analytics with LAG/LEAD functions - Time-based ranking and percentiles Ready for production time-series analytics with proper timestamp logical type support! 🚀 * fmt * fix * fix describe issue * fix tests, avoid panic * no more mysql * timeout client connections * Update SQL_FEATURE_PLAN.md * handling errors * remove sleep * fix splitting multiple SQLs * fixes * fmt * fix * Update weed/util/log_buffer/log_buffer.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update SQL_FEATURE_PLAN.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * code reuse * fix * fix * feat: Add basic arithmetic operators (+, -, *, /, %) with comprehensive tests - Implement EvaluateArithmeticExpression with support for all basic operators - Handle type conversions between int, float, string, and boolean - Add proper error handling for division/modulo by zero - Include 14 comprehensive test cases covering all edge cases - Support mixed type arithmetic (int + float, string numbers, etc.) All tests passing ✅ * feat: Add mathematical functions ROUND, CEIL, FLOOR, ABS with comprehensive tests - Implement ROUND with optional precision parameter - Add CEIL function for rounding up to nearest integer - Add FLOOR function for rounding down to nearest integer - Add ABS function for absolute values with type preservation - Support all numeric types (int32, int64, float32, double) - Comprehensive test suite with 20+ test cases covering: - Positive/negative numbers - Integer/float type preservation - Precision handling for ROUND - Null value error handling - Edge cases (zero, large numbers) All tests passing ✅ * feat: Add date/time functions CURRENT_DATE, CURRENT_TIMESTAMP, EXTRACT with comprehensive tests - Implement CURRENT_DATE returning YYYY-MM-DD format - Add CURRENT_TIMESTAMP returning TimestampValue with microseconds - Add CURRENT_TIME returning HH:MM:SS format - Add NOW() as alias for CURRENT_TIMESTAMP - Implement comprehensive EXTRACT function supporting: - YEAR, MONTH, DAY, HOUR, MINUTE, SECOND - QUARTER, WEEK, DOY (day of year), DOW (day of week) - EPOCH (Unix timestamp) - Support multiple input formats: - TimestampValue (microseconds) - String dates (multiple formats) - Unix timestamps (int64 seconds) - Comprehensive test suite with 15+ test cases covering: - All date/time constants - Extract from different value types - Error handling for invalid inputs - Timezone handling All tests passing ✅ * feat: Add DATE_TRUNC function with comprehensive tests - Implement comprehensive DATE_TRUNC function supporting: - Time precisions: microsecond, millisecond, second, minute, hour - Date precisions: day, week, month, quarter, year, decade, century, millennium - Support both singular and plural forms (e.g., 'minute' and 'minutes') - Enhanced date/time parsing with proper timezone handling: - Assume local timezone for non-timezone string formats - Support UTC formats with explicit timezone indicators - Consistent behavior between parsing and truncation - Comprehensive test suite with 11 test cases covering: - All supported precisions from microsecond to year - Multiple input types (TimestampValue, string dates) - Edge cases (null values, invalid precisions) - Timezone consistency validation All tests passing ✅ * feat: Add comprehensive string functions with extensive tests Implemented String Functions: - LENGTH: Get string length (supports all value types) - UPPER/LOWER: Case conversion - TRIM/LTRIM/RTRIM: Whitespace removal (space, tab, newline, carriage return) - SUBSTRING: Extract substring with optional length (SQL 1-based indexing) - CONCAT: Concatenate multiple values (supports mixed types, skips nulls) - REPLACE: Replace all occurrences of substring - POSITION: Find substring position (1-based, 0 if not found) - LEFT/RIGHT: Extract leftmost/rightmost characters - REVERSE: Reverse string with proper Unicode support Key Features: - Robust type conversion (string, int, float, bool, bytes) - Unicode-safe operations (proper rune handling in REVERSE) - SQL-compatible indexing (1-based for SUBSTRING, POSITION) - Comprehensive error handling with descriptive messages - Mixed-type support (e.g., CONCAT number with string) Helper Functions: - valueToString: Convert any schema_pb.Value to string - valueToInt64: Convert numeric values to int64 Comprehensive test suite with 25+ test cases covering: - All string functions with typical use cases - Type conversion scenarios (numbers, booleans) - Edge cases (empty strings, null values, Unicode) - Error conditions and boundary testing All tests passing ✅ * refactor: Split sql_functions.go into smaller, focused files **File Structure Before:** - sql_functions.go (850+ lines) - sql_functions_test.go (1,205+ lines) **File Structure After:** - function_helpers.go (105 lines) - shared utility functions - arithmetic_functions.go (205 lines) - arithmetic operators & math functions - datetime_functions.go (170 lines) - date/time functions & constants - string_functions.go (335 lines) - string manipulation functions - arithmetic_functions_test.go (560 lines) - tests for arithmetic & math - datetime_functions_test.go (370 lines) - tests for date/time functions - string_functions_test.go (270 lines) - tests for string functions **Benefits:** ✅ Better organization by functional domain ✅ Easier to find and maintain specific function types ✅ Smaller, more manageable file sizes ✅ Clear separation of concerns ✅ Improved code readability and navigation ✅ All tests passing - no functionality lost **Total:** 7 focused files (1,455 lines) vs 2 monolithic files (2,055+ lines) This refactoring improves maintainability while preserving all functionality. * fix: Improve test stability for date/time functions **Problem:** - CURRENT_TIMESTAMP test had timing race condition that could cause flaky failures - CURRENT_DATE test could fail if run exactly at midnight boundary - Tests were too strict about timing precision without accounting for system variations **Root Cause:** - Test captured before/after timestamps and expected function result to be exactly between them - No tolerance for clock precision differences, NTP adjustments, or system timing variations - Date boundary race condition around midnight transitions **Solution:** ✅ **CURRENT_TIMESTAMP test**: Added 100ms tolerance buffer to account for: - Clock precision differences between time.Now() calls - System timing variations and NTP corrections - Microsecond vs nanosecond precision differences ✅ **CURRENT_DATE test**: Enhanced to handle midnight boundary crossings: - Captures date before and after function call - Accepts either date value in case of midnight transition - Prevents false failures during overnight test runs **Testing:** - Verified with repeated test runs (5x iterations) - all pass consistently - Full test suite passes - no regressions introduced - Tests are now robust against timing edge cases **Impact:** 🚀 **Eliminated flaky test failures** while maintaining function correctness validation 🔧 **Production-ready testing** that works across different system environments ⚡ **CI/CD reliability** - tests won't fail due to timing variations * heap sort the data sources * int overflow * Update README.md * redirect GetUnflushedMessages to brokers hosting the topic partition * Update postgres-examples/README.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * clean up * support limit with offset * Update SQL_FEATURE_PLAN.md * limit with offset * ensure int conversion correctness * Update weed/query/engine/hybrid_message_scanner.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * avoid closing closed channel * support string concatenation || * int range * using consts; avoid test data in production binary * fix tests * Update SQL_FEATURE_PLAN.md * fix "use db" * address comments * fix comments * Update mocks_test.go * comment * improve docker build * normal if no partitions found * fix build docker * Update SQL_FEATURE_PLAN.md * upgrade to raft v1.1.4 resolving race in leader * raft 1.1.5 * Update SQL_FEATURE_PLAN.md * Revert "raft 1.1.5" This reverts commit5f3bdfadbf
. * Revert "upgrade to raft v1.1.4 resolving race in leader" This reverts commitfa620f0223
. * Fix data race in FUSE GetAttr operation - Add shared lock to GetAttr when accessing file handle entries - Prevents concurrent access between Write (ExclusiveLock) and GetAttr (SharedLock) - Fixes race on entry.Attributes.FileSize field during concurrent operations - Write operations already use ExclusiveLock, now GetAttr uses SharedLock for consistency Resolves race condition: Write at weedfs_file_write.go:62 vs Read at filechunks.go:28 * Update weed/mq/broker/broker_grpc_query.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * clean up * Update db.go * limit with offset * Update Makefile * fix id*2 * fix math * fix string function bugs and add tests * fix string concat * ensure empty spaces for literals * add ttl for catalog * fix time functions * unused code path * database qualifier * refactor * extract * recursive functions * add cockroachdb parser * postgres only * test SQLs * fix tests * fix count * * fix where clause * fix limit offset * fix count fast path * fix tests * func name * fix database qualifier * fix tests * Update engine.go * fix tests * fix jaeger https://github.com/advisories/GHSA-2w8w-qhg4-f78j * remove order by, group by, join * fix extract * prevent single quote in the string * skip control messages * skip control message when converting to parquet files * psql change database * remove old code * remove old parser code * rename file * use db * fix alias * add alias test * compare int64 * fix _timestamp_ns comparing * alias support * fix fast path count * rendering data sources tree * reading data sources * reading parquet logic types * convert logic types to parquet * go mod * fmt * skip decimal types * use UTC * add warning if broker fails * add user password file * support IN * support INTERVAL * _ts as timestamp column * _ts can compare with string * address comments * is null / is not null * go mod * clean up * restructure execution plan * remove extra double quotes * fix converting logical types to parquet * decimal * decimal support * do not skip decimal logical types * making row-building schema-aware and alignment-safe Emit parquet.NullValue() for missing fields to keep row shapes aligned. Always advance list level and safely handle nil list values. Add toParquetValueForType(...) to coerce values to match the declared Parquet type (e.g., STRING/BYTES via byte array; numeric/string conversions for INT32/INT64/DOUBLE/FLOAT/BOOL/TIMESTAMP/DATE/TIME). Keep nil-byte guards for ByteArray. * tests for growslice * do not batch * live logs in sources can be skipped in execution plan * go mod tidy * Update fuse-integration.yml * Update Makefile * fix deprecated * fix deprecated * remove deep-clean all rows * broker memory count * fix FieldIndex --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1668 lines
52 KiB
Go
1668 lines
52 KiB
Go
package engine
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/parquet-go/parquet-go"
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
|
|
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
// HybridMessageScanner scans from ALL data sources:
|
|
// Architecture:
|
|
// 1. Unflushed in-memory data from brokers (mq_pb.DataMessage format) - REAL-TIME
|
|
// 2. Recent/live messages in log files (filer_pb.LogEntry format) - FLUSHED
|
|
// 3. Older messages in Parquet files (schema_pb.RecordValue format) - ARCHIVED
|
|
// 4. Seamlessly merges data from all sources chronologically
|
|
// 5. Provides complete real-time view of all messages in a topic
|
|
type HybridMessageScanner struct {
|
|
filerClient filer_pb.FilerClient
|
|
brokerClient BrokerClientInterface // For querying unflushed data
|
|
topic topic.Topic
|
|
recordSchema *schema_pb.RecordType
|
|
parquetLevels *schema.ParquetLevels
|
|
engine *SQLEngine // Reference for system column formatting
|
|
}
|
|
|
|
// NewHybridMessageScanner creates a scanner that reads from all data sources
|
|
// This provides complete real-time message coverage including unflushed data
|
|
func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient BrokerClientInterface, namespace, topicName string, engine *SQLEngine) (*HybridMessageScanner, error) {
|
|
// Check if filerClient is available
|
|
if filerClient == nil {
|
|
return nil, fmt.Errorf("filerClient is required but not available")
|
|
}
|
|
|
|
// Create topic reference
|
|
t := topic.Topic{
|
|
Namespace: namespace,
|
|
Name: topicName,
|
|
}
|
|
|
|
// Get topic schema from broker client (works with both real and mock clients)
|
|
recordType, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get topic schema: %v", err)
|
|
}
|
|
if recordType == nil {
|
|
return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
|
|
}
|
|
|
|
// Create a copy of the recordType to avoid modifying the original
|
|
recordTypeCopy := &schema_pb.RecordType{
|
|
Fields: make([]*schema_pb.Field, len(recordType.Fields)),
|
|
}
|
|
copy(recordTypeCopy.Fields, recordType.Fields)
|
|
|
|
// Add system columns that MQ adds to all records
|
|
recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
|
|
WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
|
|
WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
|
|
RecordTypeEnd()
|
|
|
|
// Convert to Parquet levels for efficient reading
|
|
parquetLevels, err := schema.ToParquetLevels(recordType)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create Parquet levels: %v", err)
|
|
}
|
|
|
|
return &HybridMessageScanner{
|
|
filerClient: filerClient,
|
|
brokerClient: brokerClient,
|
|
topic: t,
|
|
recordSchema: recordType,
|
|
parquetLevels: parquetLevels,
|
|
engine: engine,
|
|
}, nil
|
|
}
|
|
|
|
// HybridScanOptions configure how the scanner reads from both live and archived data
|
|
type HybridScanOptions struct {
|
|
// Time range filtering (Unix nanoseconds)
|
|
StartTimeNs int64
|
|
StopTimeNs int64
|
|
|
|
// Column projection - if empty, select all columns
|
|
Columns []string
|
|
|
|
// Row limit - 0 means no limit
|
|
Limit int
|
|
|
|
// Row offset - 0 means no offset
|
|
Offset int
|
|
|
|
// Predicate for WHERE clause filtering
|
|
Predicate func(*schema_pb.RecordValue) bool
|
|
}
|
|
|
|
// HybridScanResult represents a message from either live logs or Parquet files
|
|
type HybridScanResult struct {
|
|
Values map[string]*schema_pb.Value // Column name -> value
|
|
Timestamp int64 // Message timestamp (_ts_ns)
|
|
Key []byte // Message key (_key)
|
|
Source string // "live_log" or "parquet_archive" or "in_memory_broker"
|
|
}
|
|
|
|
// HybridScanStats contains statistics about data sources scanned
|
|
type HybridScanStats struct {
|
|
BrokerBufferQueried bool
|
|
BrokerBufferMessages int
|
|
BufferStartIndex int64
|
|
PartitionsScanned int
|
|
LiveLogFilesScanned int // Number of live log files processed
|
|
}
|
|
|
|
// ParquetColumnStats holds statistics for a single column from parquet metadata
|
|
type ParquetColumnStats struct {
|
|
ColumnName string
|
|
MinValue *schema_pb.Value
|
|
MaxValue *schema_pb.Value
|
|
NullCount int64
|
|
RowCount int64
|
|
}
|
|
|
|
// ParquetFileStats holds aggregated statistics for a parquet file
|
|
type ParquetFileStats struct {
|
|
FileName string
|
|
RowCount int64
|
|
ColumnStats map[string]*ParquetColumnStats
|
|
}
|
|
|
|
// StreamingDataSource provides a streaming interface for reading scan results
|
|
type StreamingDataSource interface {
|
|
Next() (*HybridScanResult, error) // Returns next result or nil when done
|
|
HasMore() bool // Returns true if more data available
|
|
Close() error // Clean up resources
|
|
}
|
|
|
|
// StreamingMergeItem represents an item in the priority queue for streaming merge
|
|
type StreamingMergeItem struct {
|
|
Result *HybridScanResult
|
|
SourceID int
|
|
DataSource StreamingDataSource
|
|
}
|
|
|
|
// StreamingMergeHeap implements heap.Interface for merging sorted streams by timestamp
|
|
type StreamingMergeHeap []*StreamingMergeItem
|
|
|
|
func (h StreamingMergeHeap) Len() int { return len(h) }
|
|
|
|
func (h StreamingMergeHeap) Less(i, j int) bool {
|
|
// Sort by timestamp (ascending order)
|
|
return h[i].Result.Timestamp < h[j].Result.Timestamp
|
|
}
|
|
|
|
func (h StreamingMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h *StreamingMergeHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(*StreamingMergeItem))
|
|
}
|
|
|
|
func (h *StreamingMergeHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
item := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return item
|
|
}
|
|
|
|
// Scan reads messages from both live logs and archived Parquet files
|
|
// Uses SeaweedFS MQ's GenMergedReadFunc for seamless integration
|
|
// Assumptions:
|
|
// 1. Chronologically merges live and archived data
|
|
// 2. Applies filtering at the lowest level for efficiency
|
|
// 3. Handles schema evolution transparently
|
|
func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, error) {
|
|
results, _, err := hms.ScanWithStats(ctx, options)
|
|
return results, err
|
|
}
|
|
|
|
// ScanWithStats reads messages and returns scan statistics for execution plans
|
|
func (hms *HybridMessageScanner) ScanWithStats(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
|
|
var results []HybridScanResult
|
|
stats := &HybridScanStats{}
|
|
|
|
// Get all partitions for this topic via MQ broker discovery
|
|
partitions, err := hms.discoverTopicPartitions(ctx)
|
|
if err != nil {
|
|
return nil, stats, fmt.Errorf("failed to discover partitions for topic %s: %v", hms.topic.String(), err)
|
|
}
|
|
|
|
stats.PartitionsScanned = len(partitions)
|
|
|
|
for _, partition := range partitions {
|
|
partitionResults, partitionStats, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
|
|
if err != nil {
|
|
return nil, stats, fmt.Errorf("failed to scan partition %v: %v", partition, err)
|
|
}
|
|
|
|
results = append(results, partitionResults...)
|
|
|
|
// Aggregate broker buffer stats
|
|
if partitionStats != nil {
|
|
if partitionStats.BrokerBufferQueried {
|
|
stats.BrokerBufferQueried = true
|
|
}
|
|
stats.BrokerBufferMessages += partitionStats.BrokerBufferMessages
|
|
if partitionStats.BufferStartIndex > 0 && (stats.BufferStartIndex == 0 || partitionStats.BufferStartIndex < stats.BufferStartIndex) {
|
|
stats.BufferStartIndex = partitionStats.BufferStartIndex
|
|
}
|
|
}
|
|
|
|
// Apply global limit (without offset) across all partitions
|
|
// When OFFSET is used, collect more data to ensure we have enough after skipping
|
|
// Note: OFFSET will be applied at the end to avoid double-application
|
|
if options.Limit > 0 {
|
|
// Collect exact amount needed: LIMIT + OFFSET (no excessive doubling)
|
|
minRequired := options.Limit + options.Offset
|
|
// Small buffer only when needed to handle edge cases in distributed scanning
|
|
if options.Offset > 0 && minRequired < 10 {
|
|
minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling
|
|
}
|
|
if len(results) >= minRequired {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Apply final OFFSET and LIMIT processing (done once at the end)
|
|
// Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows
|
|
if options.Offset > 0 || options.Limit >= 0 {
|
|
// Handle LIMIT 0 special case first
|
|
if options.Limit == 0 {
|
|
return []HybridScanResult{}, stats, nil
|
|
}
|
|
|
|
// Apply OFFSET first
|
|
if options.Offset > 0 {
|
|
if options.Offset >= len(results) {
|
|
results = []HybridScanResult{}
|
|
} else {
|
|
results = results[options.Offset:]
|
|
}
|
|
}
|
|
|
|
// Apply LIMIT after OFFSET (only if limit > 0)
|
|
if options.Limit > 0 && len(results) > options.Limit {
|
|
results = results[:options.Limit]
|
|
}
|
|
}
|
|
|
|
return results, stats, nil
|
|
}
|
|
|
|
// scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication
|
|
func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
|
|
results, _, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
|
|
return results, err
|
|
}
|
|
|
|
// scanUnflushedDataWithStats queries brokers for unflushed data and returns statistics
|
|
func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
|
|
var results []HybridScanResult
|
|
stats := &HybridScanStats{}
|
|
|
|
// Skip if no broker client available
|
|
if hms.brokerClient == nil {
|
|
return results, stats, nil
|
|
}
|
|
|
|
// Mark that we attempted to query broker buffer
|
|
stats.BrokerBufferQueried = true
|
|
|
|
// Step 1: Get unflushed data from broker using buffer_start-based method
|
|
// This method uses buffer_start metadata to avoid double-counting with exact precision
|
|
unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs)
|
|
if err != nil {
|
|
// Log error but don't fail the query - continue with disk data only
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err)
|
|
}
|
|
// Reset queried flag on error
|
|
stats.BrokerBufferQueried = false
|
|
return results, stats, nil
|
|
}
|
|
|
|
// Capture stats for EXPLAIN
|
|
stats.BrokerBufferMessages = len(unflushedEntries)
|
|
|
|
// Debug logging for EXPLAIN mode
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Debug: Broker buffer queried - found %d unflushed messages\n", len(unflushedEntries))
|
|
if len(unflushedEntries) > 0 {
|
|
fmt.Printf("Debug: Using buffer_start deduplication for precise real-time data\n")
|
|
}
|
|
}
|
|
|
|
// Step 2: Process unflushed entries (already deduplicated by broker)
|
|
for _, logEntry := range unflushedEntries {
|
|
// Skip control entries without actual data
|
|
if hms.isControlEntry(logEntry) {
|
|
continue // Skip this entry
|
|
}
|
|
|
|
// Skip messages outside time range
|
|
if options.StartTimeNs > 0 && logEntry.TsNs < options.StartTimeNs {
|
|
continue
|
|
}
|
|
if options.StopTimeNs > 0 && logEntry.TsNs > options.StopTimeNs {
|
|
continue
|
|
}
|
|
|
|
// Convert LogEntry to RecordValue format (same as disk data)
|
|
recordValue, _, err := hms.convertLogEntryToRecordValue(logEntry)
|
|
if err != nil {
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Debug: Failed to convert unflushed log entry: %v\n", err)
|
|
}
|
|
continue // Skip malformed messages
|
|
}
|
|
|
|
// Apply predicate filter if provided
|
|
if options.Predicate != nil && !options.Predicate(recordValue) {
|
|
continue
|
|
}
|
|
|
|
// Extract system columns for result
|
|
timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
|
|
key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
|
|
|
|
// Apply column projection
|
|
values := make(map[string]*schema_pb.Value)
|
|
if len(options.Columns) == 0 {
|
|
// Select all columns (excluding system columns from user view)
|
|
for name, value := range recordValue.Fields {
|
|
if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
|
|
values[name] = value
|
|
}
|
|
}
|
|
} else {
|
|
// Select specified columns only
|
|
for _, columnName := range options.Columns {
|
|
if value, exists := recordValue.Fields[columnName]; exists {
|
|
values[columnName] = value
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create result with proper source tagging
|
|
result := HybridScanResult{
|
|
Values: values,
|
|
Timestamp: timestamp,
|
|
Key: key,
|
|
Source: "live_log", // Data from broker's unflushed messages
|
|
}
|
|
|
|
results = append(results, result)
|
|
|
|
// Apply limit (accounting for offset) - collect exact amount needed
|
|
if options.Limit > 0 {
|
|
// Collect exact amount needed: LIMIT + OFFSET (no excessive doubling)
|
|
minRequired := options.Limit + options.Offset
|
|
// Small buffer only when needed to handle edge cases in message streaming
|
|
if options.Offset > 0 && minRequired < 10 {
|
|
minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling
|
|
}
|
|
if len(results) >= minRequired {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results))
|
|
}
|
|
|
|
return results, stats, nil
|
|
}
|
|
|
|
// convertDataMessageToRecord converts mq_pb.DataMessage to schema_pb.RecordValue
|
|
func (hms *HybridMessageScanner) convertDataMessageToRecord(msg *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) {
|
|
// Parse the message data as RecordValue
|
|
recordValue := &schema_pb.RecordValue{}
|
|
if err := proto.Unmarshal(msg.Value, recordValue); err != nil {
|
|
return nil, "", fmt.Errorf("failed to unmarshal message data: %v", err)
|
|
}
|
|
|
|
// Add system columns
|
|
if recordValue.Fields == nil {
|
|
recordValue.Fields = make(map[string]*schema_pb.Value)
|
|
}
|
|
|
|
// Add timestamp
|
|
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int64Value{Int64Value: msg.TsNs},
|
|
}
|
|
|
|
return recordValue, string(msg.Key), nil
|
|
}
|
|
|
|
// discoverTopicPartitions discovers the actual partitions for this topic by scanning the filesystem
|
|
// This finds real partition directories like v2025-09-01-07-16-34/0000-0630/
|
|
func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) {
|
|
if hms.filerClient == nil {
|
|
return nil, fmt.Errorf("filerClient not available for partition discovery")
|
|
}
|
|
|
|
var allPartitions []topic.Partition
|
|
var err error
|
|
|
|
// Scan the topic directory for actual partition versions (timestamped directories)
|
|
// List all version directories in the topic directory
|
|
err = filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(hms.topic.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
|
|
if !versionEntry.IsDirectory {
|
|
return nil // Skip non-directories
|
|
}
|
|
|
|
// Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34")
|
|
versionTime, parseErr := topic.ParseTopicVersion(versionEntry.Name)
|
|
if parseErr != nil {
|
|
// Skip directories that don't match the version format
|
|
return nil
|
|
}
|
|
|
|
// Scan partition directories within this version
|
|
versionDir := fmt.Sprintf("%s/%s", hms.topic.Dir(), versionEntry.Name)
|
|
return filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
|
|
if !partitionEntry.IsDirectory {
|
|
return nil // Skip non-directories
|
|
}
|
|
|
|
// Parse partition boundary from directory name (e.g., "0000-0630")
|
|
rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name)
|
|
if rangeStart == rangeStop {
|
|
return nil // Skip invalid partition names
|
|
}
|
|
|
|
// Create partition object
|
|
partition := topic.Partition{
|
|
RangeStart: rangeStart,
|
|
RangeStop: rangeStop,
|
|
RingSize: topic.PartitionCount,
|
|
UnixTimeNs: versionTime.UnixNano(),
|
|
}
|
|
|
|
allPartitions = append(allPartitions, partition)
|
|
return nil
|
|
})
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to scan topic directory for partitions: %v", err)
|
|
}
|
|
|
|
// If no partitions found, return empty slice (valid for newly created or empty topics)
|
|
if len(allPartitions) == 0 {
|
|
fmt.Printf("No partitions found for topic %s - returning empty result set\n", hms.topic.String())
|
|
return []topic.Partition{}, nil
|
|
}
|
|
|
|
fmt.Printf("Discovered %d partitions for topic %s\n", len(allPartitions), hms.topic.String())
|
|
return allPartitions, nil
|
|
}
|
|
|
|
// scanPartitionHybrid scans a specific partition using the hybrid approach
|
|
// This is where the magic happens - seamlessly reading ALL data sources:
|
|
// 1. Unflushed in-memory data from brokers (REAL-TIME)
|
|
// 2. Live logs + Parquet files from disk (FLUSHED/ARCHIVED)
|
|
func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
|
|
results, _, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
|
|
return results, err
|
|
}
|
|
|
|
// scanPartitionHybridWithStats scans a specific partition using streaming merge for memory efficiency
|
|
// PERFORMANCE IMPROVEMENT: Uses heap-based streaming merge instead of collecting all data and sorting
|
|
// - Memory usage: O(k) where k = number of data sources, instead of O(n) where n = total records
|
|
// - Scalable: Can handle large topics without LIMIT clauses efficiently
|
|
// - Streaming: Processes data as it arrives rather than buffering everything
|
|
func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
|
|
stats := &HybridScanStats{}
|
|
|
|
// STEP 1: Scan unflushed in-memory data from brokers (REAL-TIME)
|
|
unflushedResults, unflushedStats, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
|
|
if err != nil {
|
|
// Don't fail the query if broker scanning fails, but provide clear warning to user
|
|
// This ensures users are aware that results may not include the most recent data
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Debug: Failed to scan unflushed data from broker: %v\n", err)
|
|
} else {
|
|
fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
|
|
fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
|
|
}
|
|
} else if unflushedStats != nil {
|
|
stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried
|
|
stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages
|
|
stats.BufferStartIndex = unflushedStats.BufferStartIndex
|
|
}
|
|
|
|
// Count live log files for statistics
|
|
liveLogCount, err := hms.countLiveLogFiles(partition)
|
|
if err != nil {
|
|
// Don't fail the query, just log warning
|
|
fmt.Printf("Warning: Failed to count live log files: %v\n", err)
|
|
liveLogCount = 0
|
|
}
|
|
stats.LiveLogFilesScanned = liveLogCount
|
|
|
|
// STEP 2: Create streaming data sources for memory-efficient merge
|
|
var dataSources []StreamingDataSource
|
|
|
|
// Add unflushed data source (if we have unflushed results)
|
|
if len(unflushedResults) > 0 {
|
|
// Sort unflushed results by timestamp before creating stream
|
|
if len(unflushedResults) > 1 {
|
|
hms.mergeSort(unflushedResults, 0, len(unflushedResults)-1)
|
|
}
|
|
dataSources = append(dataSources, NewSliceDataSource(unflushedResults))
|
|
}
|
|
|
|
// Add streaming flushed data source (live logs + Parquet files)
|
|
flushedDataSource := NewStreamingFlushedDataSource(hms, partition, options)
|
|
dataSources = append(dataSources, flushedDataSource)
|
|
|
|
// STEP 3: Use streaming merge for memory-efficient chronological ordering
|
|
var results []HybridScanResult
|
|
if len(dataSources) > 0 {
|
|
// Calculate how many rows we need to collect during scanning (before OFFSET/LIMIT)
|
|
// For LIMIT N OFFSET M, we need to collect at least N+M rows
|
|
scanLimit := options.Limit
|
|
if options.Limit > 0 && options.Offset > 0 {
|
|
scanLimit = options.Limit + options.Offset
|
|
}
|
|
|
|
mergedResults, err := hms.streamingMerge(dataSources, scanLimit)
|
|
if err != nil {
|
|
return nil, stats, fmt.Errorf("streaming merge failed: %v", err)
|
|
}
|
|
results = mergedResults
|
|
}
|
|
|
|
return results, stats, nil
|
|
}
|
|
|
|
// countLiveLogFiles counts the number of live log files in a partition for statistics
|
|
func (hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (int, error) {
|
|
partitionDir := topic.PartitionDir(hms.topic, partition)
|
|
|
|
var fileCount int
|
|
err := hms.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
// List all files in partition directory
|
|
request := &filer_pb.ListEntriesRequest{
|
|
Directory: partitionDir,
|
|
Prefix: "",
|
|
StartFromFileName: "",
|
|
InclusiveStartFrom: true,
|
|
Limit: 10000, // reasonable limit for counting
|
|
}
|
|
|
|
stream, err := client.ListEntries(context.Background(), request)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
resp, err := stream.Recv()
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Count files that are not .parquet files (live log files)
|
|
// Live log files typically have timestamps or are named like log files
|
|
fileName := resp.Entry.Name
|
|
if !strings.HasSuffix(fileName, ".parquet") &&
|
|
!strings.HasSuffix(fileName, ".offset") &&
|
|
len(resp.Entry.Chunks) > 0 { // Has actual content
|
|
fileCount++
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return fileCount, nil
|
|
}
|
|
|
|
// isControlEntry checks if a log entry is a control entry without actual data
|
|
// Based on MQ system analysis, control entries are:
|
|
// 1. DataMessages with populated Ctrl field (publisher close signals)
|
|
// 2. Entries with empty keys (as filtered by subscriber)
|
|
// 3. Entries with no data
|
|
func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool {
|
|
// Skip entries with no data
|
|
if len(logEntry.Data) == 0 {
|
|
return true
|
|
}
|
|
|
|
// Skip entries with empty keys (same logic as subscriber)
|
|
if len(logEntry.Key) == 0 {
|
|
return true
|
|
}
|
|
|
|
// Check if this is a DataMessage with control field populated
|
|
dataMessage := &mq_pb.DataMessage{}
|
|
if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil {
|
|
// If it has a control field, it's a control message
|
|
if dataMessage.Ctrl != nil {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue
|
|
// This handles both:
|
|
// 1. Live log entries (raw message format)
|
|
// 2. Parquet entries (already in schema_pb.RecordValue format)
|
|
func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
|
|
// Try to unmarshal as RecordValue first (Parquet format)
|
|
recordValue := &schema_pb.RecordValue{}
|
|
if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
|
|
// This is an archived message from Parquet files
|
|
// FIX: Add system columns from LogEntry to RecordValue
|
|
if recordValue.Fields == nil {
|
|
recordValue.Fields = make(map[string]*schema_pb.Value)
|
|
}
|
|
|
|
// Add system columns from LogEntry
|
|
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
|
|
}
|
|
recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
|
|
}
|
|
|
|
return recordValue, "parquet_archive", nil
|
|
}
|
|
|
|
// If not a RecordValue, this is raw live message data - parse with schema
|
|
return hms.parseRawMessageWithSchema(logEntry)
|
|
}
|
|
|
|
// parseRawMessageWithSchema parses raw live message data using the topic's schema
|
|
// This provides proper type conversion and field mapping instead of treating everything as strings
|
|
func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
|
|
recordValue := &schema_pb.RecordValue{
|
|
Fields: make(map[string]*schema_pb.Value),
|
|
}
|
|
|
|
// Add system columns (always present)
|
|
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
|
|
}
|
|
recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
|
|
}
|
|
|
|
// Parse message data based on schema
|
|
if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 {
|
|
// Fallback: No schema available, treat as single "data" field
|
|
recordValue.Fields["data"] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
|
|
}
|
|
return recordValue, "live_log", nil
|
|
}
|
|
|
|
// Attempt schema-aware parsing
|
|
// Strategy 1: Try JSON parsing first (most common for live messages)
|
|
if parsedRecord, err := hms.parseJSONMessage(logEntry.Data); err == nil {
|
|
// Successfully parsed as JSON, merge with system columns
|
|
for fieldName, fieldValue := range parsedRecord.Fields {
|
|
recordValue.Fields[fieldName] = fieldValue
|
|
}
|
|
return recordValue, "live_log", nil
|
|
}
|
|
|
|
// Strategy 2: Try protobuf parsing (binary messages)
|
|
if parsedRecord, err := hms.parseProtobufMessage(logEntry.Data); err == nil {
|
|
// Successfully parsed as protobuf, merge with system columns
|
|
for fieldName, fieldValue := range parsedRecord.Fields {
|
|
recordValue.Fields[fieldName] = fieldValue
|
|
}
|
|
return recordValue, "live_log", nil
|
|
}
|
|
|
|
// Strategy 3: Fallback to single field with raw data
|
|
// If schema has a single field, map the raw data to it with type conversion
|
|
if len(hms.recordSchema.Fields) == 1 {
|
|
field := hms.recordSchema.Fields[0]
|
|
convertedValue, err := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
|
|
if err == nil {
|
|
recordValue.Fields[field.Name] = convertedValue
|
|
return recordValue, "live_log", nil
|
|
}
|
|
}
|
|
|
|
// Final fallback: treat as string data field
|
|
recordValue.Fields["data"] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
|
|
}
|
|
|
|
return recordValue, "live_log", nil
|
|
}
|
|
|
|
// parseJSONMessage attempts to parse raw data as JSON and map to schema fields
|
|
func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) {
|
|
// Try to parse as JSON
|
|
var jsonData map[string]interface{}
|
|
if err := json.Unmarshal(data, &jsonData); err != nil {
|
|
return nil, fmt.Errorf("not valid JSON: %v", err)
|
|
}
|
|
|
|
recordValue := &schema_pb.RecordValue{
|
|
Fields: make(map[string]*schema_pb.Value),
|
|
}
|
|
|
|
// Map JSON fields to schema fields
|
|
for _, schemaField := range hms.recordSchema.Fields {
|
|
fieldName := schemaField.Name
|
|
if jsonValue, exists := jsonData[fieldName]; exists {
|
|
schemaValue, err := hms.convertJSONValueToSchemaValue(jsonValue, schemaField.Type)
|
|
if err != nil {
|
|
// Log conversion error but continue with other fields
|
|
continue
|
|
}
|
|
recordValue.Fields[fieldName] = schemaValue
|
|
}
|
|
}
|
|
|
|
return recordValue, nil
|
|
}
|
|
|
|
// parseProtobufMessage attempts to parse raw data as protobuf RecordValue
|
|
func (hms *HybridMessageScanner) parseProtobufMessage(data []byte) (*schema_pb.RecordValue, error) {
|
|
// This might be a raw protobuf message that didn't parse correctly the first time
|
|
// Try alternative protobuf unmarshaling approaches
|
|
recordValue := &schema_pb.RecordValue{}
|
|
|
|
// Strategy 1: Direct unmarshaling (might work if it's actually a RecordValue)
|
|
if err := proto.Unmarshal(data, recordValue); err == nil {
|
|
return recordValue, nil
|
|
}
|
|
|
|
// Strategy 2: Check if it's a different protobuf message type
|
|
// For now, return error as we need more specific knowledge of MQ message formats
|
|
return nil, fmt.Errorf("could not parse as protobuf RecordValue")
|
|
}
|
|
|
|
// convertRawDataToSchemaValue converts raw bytes to a specific schema type
|
|
func (hms *HybridMessageScanner) convertRawDataToSchemaValue(data []byte, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
|
|
dataStr := string(data)
|
|
|
|
switch fieldType.Kind.(type) {
|
|
case *schema_pb.Type_ScalarType:
|
|
scalarType := fieldType.GetScalarType()
|
|
switch scalarType {
|
|
case schema_pb.ScalarType_STRING:
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_StringValue{StringValue: dataStr},
|
|
}, nil
|
|
case schema_pb.ScalarType_INT32:
|
|
if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 32); err == nil {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int32Value{Int32Value: int32(val)},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_INT64:
|
|
if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 64); err == nil {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int64Value{Int64Value: val},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_FLOAT:
|
|
if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 32); err == nil {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_FloatValue{FloatValue: float32(val)},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_DOUBLE:
|
|
if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 64); err == nil {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_DoubleValue{DoubleValue: val},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_BOOL:
|
|
lowerStr := strings.ToLower(strings.TrimSpace(dataStr))
|
|
if lowerStr == "true" || lowerStr == "1" || lowerStr == "yes" {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BoolValue{BoolValue: true},
|
|
}, nil
|
|
} else if lowerStr == "false" || lowerStr == "0" || lowerStr == "no" {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BoolValue{BoolValue: false},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_BYTES:
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BytesValue{BytesValue: data},
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("unsupported type conversion for %v", fieldType)
|
|
}
|
|
|
|
// convertJSONValueToSchemaValue converts a JSON value to schema_pb.Value based on schema type
|
|
func (hms *HybridMessageScanner) convertJSONValueToSchemaValue(jsonValue interface{}, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
|
|
switch fieldType.Kind.(type) {
|
|
case *schema_pb.Type_ScalarType:
|
|
scalarType := fieldType.GetScalarType()
|
|
switch scalarType {
|
|
case schema_pb.ScalarType_STRING:
|
|
if str, ok := jsonValue.(string); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_StringValue{StringValue: str},
|
|
}, nil
|
|
}
|
|
// Convert other types to string
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", jsonValue)},
|
|
}, nil
|
|
case schema_pb.ScalarType_INT32:
|
|
if num, ok := jsonValue.(float64); ok { // JSON numbers are float64
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int32Value{Int32Value: int32(num)},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_INT64:
|
|
if num, ok := jsonValue.(float64); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int64Value{Int64Value: int64(num)},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_FLOAT:
|
|
if num, ok := jsonValue.(float64); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_FloatValue{FloatValue: float32(num)},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_DOUBLE:
|
|
if num, ok := jsonValue.(float64); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_DoubleValue{DoubleValue: num},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_BOOL:
|
|
if boolVal, ok := jsonValue.(bool); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BoolValue{BoolValue: boolVal},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_BYTES:
|
|
if str, ok := jsonValue.(string); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BytesValue{BytesValue: []byte(str)},
|
|
}, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("incompatible JSON value type %T for schema type %v", jsonValue, fieldType)
|
|
}
|
|
|
|
// ConvertToSQLResult converts HybridScanResults to SQL query results
|
|
func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult, columns []string) *QueryResult {
|
|
if len(results) == 0 {
|
|
return &QueryResult{
|
|
Columns: columns,
|
|
Rows: [][]sqltypes.Value{},
|
|
Database: hms.topic.Namespace,
|
|
Table: hms.topic.Name,
|
|
}
|
|
}
|
|
|
|
// Determine columns if not specified
|
|
if len(columns) == 0 {
|
|
columnSet := make(map[string]bool)
|
|
for _, result := range results {
|
|
for columnName := range result.Values {
|
|
columnSet[columnName] = true
|
|
}
|
|
}
|
|
|
|
columns = make([]string, 0, len(columnSet))
|
|
for columnName := range columnSet {
|
|
columns = append(columns, columnName)
|
|
}
|
|
}
|
|
|
|
// Convert to SQL rows
|
|
rows := make([][]sqltypes.Value, len(results))
|
|
for i, result := range results {
|
|
row := make([]sqltypes.Value, len(columns))
|
|
for j, columnName := range columns {
|
|
switch columnName {
|
|
case SW_COLUMN_NAME_SOURCE:
|
|
row[j] = sqltypes.NewVarChar(result.Source)
|
|
case SW_COLUMN_NAME_TIMESTAMP, SW_DISPLAY_NAME_TIMESTAMP:
|
|
// Format timestamp as proper timestamp type instead of raw nanoseconds
|
|
row[j] = hms.engine.formatTimestampColumn(result.Timestamp)
|
|
case SW_COLUMN_NAME_KEY:
|
|
row[j] = sqltypes.NewVarBinary(string(result.Key))
|
|
default:
|
|
if value, exists := result.Values[columnName]; exists {
|
|
row[j] = convertSchemaValueToSQL(value)
|
|
} else {
|
|
row[j] = sqltypes.NULL
|
|
}
|
|
}
|
|
}
|
|
rows[i] = row
|
|
}
|
|
|
|
return &QueryResult{
|
|
Columns: columns,
|
|
Rows: rows,
|
|
Database: hms.topic.Namespace,
|
|
Table: hms.topic.Name,
|
|
}
|
|
}
|
|
|
|
// ConvertToSQLResultWithMixedColumns handles SELECT *, specific_columns queries
|
|
// Combines auto-discovered columns (from *) with explicitly requested columns
|
|
func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []HybridScanResult, explicitColumns []string) *QueryResult {
|
|
if len(results) == 0 {
|
|
// For empty results, combine auto-discovered columns with explicit ones
|
|
columnSet := make(map[string]bool)
|
|
|
|
// Add explicit columns first
|
|
for _, col := range explicitColumns {
|
|
columnSet[col] = true
|
|
}
|
|
|
|
// Build final column list
|
|
columns := make([]string, 0, len(columnSet))
|
|
for col := range columnSet {
|
|
columns = append(columns, col)
|
|
}
|
|
|
|
return &QueryResult{
|
|
Columns: columns,
|
|
Rows: [][]sqltypes.Value{},
|
|
Database: hms.topic.Namespace,
|
|
Table: hms.topic.Name,
|
|
}
|
|
}
|
|
|
|
// Auto-discover columns from data (like SELECT *)
|
|
autoColumns := make(map[string]bool)
|
|
for _, result := range results {
|
|
for columnName := range result.Values {
|
|
autoColumns[columnName] = true
|
|
}
|
|
}
|
|
|
|
// Combine auto-discovered and explicit columns
|
|
columnSet := make(map[string]bool)
|
|
|
|
// Add auto-discovered columns first (regular data columns)
|
|
for col := range autoColumns {
|
|
columnSet[col] = true
|
|
}
|
|
|
|
// Add explicit columns (may include system columns like _source)
|
|
for _, col := range explicitColumns {
|
|
columnSet[col] = true
|
|
}
|
|
|
|
// Build final column list
|
|
columns := make([]string, 0, len(columnSet))
|
|
for col := range columnSet {
|
|
columns = append(columns, col)
|
|
}
|
|
|
|
// Convert to SQL rows
|
|
rows := make([][]sqltypes.Value, len(results))
|
|
for i, result := range results {
|
|
row := make([]sqltypes.Value, len(columns))
|
|
for j, columnName := range columns {
|
|
switch columnName {
|
|
case SW_COLUMN_NAME_TIMESTAMP:
|
|
row[j] = sqltypes.NewInt64(result.Timestamp)
|
|
case SW_COLUMN_NAME_KEY:
|
|
row[j] = sqltypes.NewVarBinary(string(result.Key))
|
|
case SW_COLUMN_NAME_SOURCE:
|
|
row[j] = sqltypes.NewVarChar(result.Source)
|
|
default:
|
|
// Regular data column
|
|
if value, exists := result.Values[columnName]; exists {
|
|
row[j] = convertSchemaValueToSQL(value)
|
|
} else {
|
|
row[j] = sqltypes.NULL
|
|
}
|
|
}
|
|
}
|
|
rows[i] = row
|
|
}
|
|
|
|
return &QueryResult{
|
|
Columns: columns,
|
|
Rows: rows,
|
|
Database: hms.topic.Namespace,
|
|
Table: hms.topic.Name,
|
|
}
|
|
}
|
|
|
|
// ReadParquetStatistics efficiently reads column statistics from parquet files
|
|
// without scanning the full file content - uses parquet's built-in metadata
|
|
func (h *HybridMessageScanner) ReadParquetStatistics(partitionPath string) ([]*ParquetFileStats, error) {
|
|
var fileStats []*ParquetFileStats
|
|
|
|
// Use the same chunk cache as the logstore package
|
|
chunkCache := chunk_cache.NewChunkCacheInMemory(256)
|
|
lookupFileIdFn := filer.LookupFn(h.filerClient)
|
|
|
|
err := filer_pb.ReadDirAllEntries(context.Background(), h.filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
|
|
// Only process parquet files
|
|
if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") {
|
|
return nil
|
|
}
|
|
|
|
// Extract statistics from this parquet file
|
|
stats, err := h.extractParquetFileStats(entry, lookupFileIdFn, chunkCache)
|
|
if err != nil {
|
|
// Log error but continue processing other files
|
|
fmt.Printf("Warning: failed to extract stats from %s: %v\n", entry.Name, err)
|
|
return nil
|
|
}
|
|
|
|
if stats != nil {
|
|
fileStats = append(fileStats, stats)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
return fileStats, err
|
|
}
|
|
|
|
// extractParquetFileStats extracts column statistics from a single parquet file
|
|
func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunkCache *chunk_cache.ChunkCacheInMemory) (*ParquetFileStats, error) {
|
|
// Create reader for the parquet file
|
|
fileSize := filer.FileSize(entry)
|
|
visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
|
|
chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
|
|
readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
|
|
readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))
|
|
|
|
// Create parquet reader - this only reads metadata, not data
|
|
parquetReader := parquet.NewReader(readerAt)
|
|
defer parquetReader.Close()
|
|
|
|
fileView := parquetReader.File()
|
|
|
|
fileStats := &ParquetFileStats{
|
|
FileName: entry.Name,
|
|
RowCount: fileView.NumRows(),
|
|
ColumnStats: make(map[string]*ParquetColumnStats),
|
|
}
|
|
|
|
// Get schema information
|
|
schema := fileView.Schema()
|
|
|
|
// Process each row group
|
|
rowGroups := fileView.RowGroups()
|
|
for _, rowGroup := range rowGroups {
|
|
columnChunks := rowGroup.ColumnChunks()
|
|
|
|
// Process each column chunk
|
|
for i, chunk := range columnChunks {
|
|
// Get column name from schema
|
|
columnName := h.getColumnNameFromSchema(schema, i)
|
|
if columnName == "" {
|
|
continue
|
|
}
|
|
|
|
// Try to get column statistics
|
|
columnIndex, err := chunk.ColumnIndex()
|
|
if err != nil {
|
|
// No column index available - skip this column
|
|
continue
|
|
}
|
|
|
|
// Extract min/max values from the first page (for simplicity)
|
|
// In a more sophisticated implementation, we could aggregate across all pages
|
|
numPages := columnIndex.NumPages()
|
|
if numPages == 0 {
|
|
continue
|
|
}
|
|
|
|
minParquetValue := columnIndex.MinValue(0)
|
|
maxParquetValue := columnIndex.MaxValue(numPages - 1)
|
|
nullCount := int64(0)
|
|
|
|
// Aggregate null counts across all pages
|
|
for pageIdx := 0; pageIdx < numPages; pageIdx++ {
|
|
nullCount += columnIndex.NullCount(pageIdx)
|
|
}
|
|
|
|
// Convert parquet values to schema_pb.Value
|
|
minValue, err := h.convertParquetValueToSchemaValue(minParquetValue)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
maxValue, err := h.convertParquetValueToSchemaValue(maxParquetValue)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// Store column statistics (aggregate across row groups if column already exists)
|
|
if existingStats, exists := fileStats.ColumnStats[columnName]; exists {
|
|
// Update existing statistics
|
|
if h.compareSchemaValues(minValue, existingStats.MinValue) < 0 {
|
|
existingStats.MinValue = minValue
|
|
}
|
|
if h.compareSchemaValues(maxValue, existingStats.MaxValue) > 0 {
|
|
existingStats.MaxValue = maxValue
|
|
}
|
|
existingStats.NullCount += nullCount
|
|
} else {
|
|
// Create new column statistics
|
|
fileStats.ColumnStats[columnName] = &ParquetColumnStats{
|
|
ColumnName: columnName,
|
|
MinValue: minValue,
|
|
MaxValue: maxValue,
|
|
NullCount: nullCount,
|
|
RowCount: rowGroup.NumRows(),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return fileStats, nil
|
|
}
|
|
|
|
// getColumnNameFromSchema extracts column name from parquet schema by index
|
|
func (h *HybridMessageScanner) getColumnNameFromSchema(schema *parquet.Schema, columnIndex int) string {
|
|
// Get the leaf columns in order
|
|
var columnNames []string
|
|
h.collectColumnNames(schema.Fields(), &columnNames)
|
|
|
|
if columnIndex >= 0 && columnIndex < len(columnNames) {
|
|
return columnNames[columnIndex]
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// collectColumnNames recursively collects leaf column names from schema
|
|
func (h *HybridMessageScanner) collectColumnNames(fields []parquet.Field, names *[]string) {
|
|
for _, field := range fields {
|
|
if len(field.Fields()) == 0 {
|
|
// This is a leaf field (no sub-fields)
|
|
*names = append(*names, field.Name())
|
|
} else {
|
|
// This is a group - recurse
|
|
h.collectColumnNames(field.Fields(), names)
|
|
}
|
|
}
|
|
}
|
|
|
|
// convertParquetValueToSchemaValue converts parquet.Value to schema_pb.Value
|
|
func (h *HybridMessageScanner) convertParquetValueToSchemaValue(pv parquet.Value) (*schema_pb.Value, error) {
|
|
switch pv.Kind() {
|
|
case parquet.Boolean:
|
|
return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: pv.Boolean()}}, nil
|
|
case parquet.Int32:
|
|
return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: pv.Int32()}}, nil
|
|
case parquet.Int64:
|
|
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: pv.Int64()}}, nil
|
|
case parquet.Float:
|
|
return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: pv.Float()}}, nil
|
|
case parquet.Double:
|
|
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: pv.Double()}}, nil
|
|
case parquet.ByteArray:
|
|
return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: pv.ByteArray()}}, nil
|
|
default:
|
|
return nil, fmt.Errorf("unsupported parquet value kind: %v", pv.Kind())
|
|
}
|
|
}
|
|
|
|
// compareSchemaValues compares two schema_pb.Value objects
|
|
func (h *HybridMessageScanner) compareSchemaValues(v1, v2 *schema_pb.Value) int {
|
|
if v1 == nil && v2 == nil {
|
|
return 0
|
|
}
|
|
if v1 == nil {
|
|
return -1
|
|
}
|
|
if v2 == nil {
|
|
return 1
|
|
}
|
|
|
|
// Extract raw values and compare
|
|
raw1 := h.extractRawValueFromSchema(v1)
|
|
raw2 := h.extractRawValueFromSchema(v2)
|
|
|
|
return h.compareRawValues(raw1, raw2)
|
|
}
|
|
|
|
// extractRawValueFromSchema extracts the raw value from schema_pb.Value
|
|
func (h *HybridMessageScanner) extractRawValueFromSchema(value *schema_pb.Value) interface{} {
|
|
switch v := value.Kind.(type) {
|
|
case *schema_pb.Value_BoolValue:
|
|
return v.BoolValue
|
|
case *schema_pb.Value_Int32Value:
|
|
return v.Int32Value
|
|
case *schema_pb.Value_Int64Value:
|
|
return v.Int64Value
|
|
case *schema_pb.Value_FloatValue:
|
|
return v.FloatValue
|
|
case *schema_pb.Value_DoubleValue:
|
|
return v.DoubleValue
|
|
case *schema_pb.Value_BytesValue:
|
|
return string(v.BytesValue) // Convert to string for comparison
|
|
case *schema_pb.Value_StringValue:
|
|
return v.StringValue
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// compareRawValues compares two raw values
|
|
func (h *HybridMessageScanner) compareRawValues(v1, v2 interface{}) int {
|
|
// Handle nil cases
|
|
if v1 == nil && v2 == nil {
|
|
return 0
|
|
}
|
|
if v1 == nil {
|
|
return -1
|
|
}
|
|
if v2 == nil {
|
|
return 1
|
|
}
|
|
|
|
// Compare based on type
|
|
switch val1 := v1.(type) {
|
|
case bool:
|
|
if val2, ok := v2.(bool); ok {
|
|
if val1 == val2 {
|
|
return 0
|
|
}
|
|
if val1 {
|
|
return 1
|
|
}
|
|
return -1
|
|
}
|
|
case int32:
|
|
if val2, ok := v2.(int32); ok {
|
|
if val1 < val2 {
|
|
return -1
|
|
} else if val1 > val2 {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
case int64:
|
|
if val2, ok := v2.(int64); ok {
|
|
if val1 < val2 {
|
|
return -1
|
|
} else if val1 > val2 {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
case float32:
|
|
if val2, ok := v2.(float32); ok {
|
|
if val1 < val2 {
|
|
return -1
|
|
} else if val1 > val2 {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
case float64:
|
|
if val2, ok := v2.(float64); ok {
|
|
if val1 < val2 {
|
|
return -1
|
|
} else if val1 > val2 {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
case string:
|
|
if val2, ok := v2.(string); ok {
|
|
if val1 < val2 {
|
|
return -1
|
|
} else if val1 > val2 {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
}
|
|
|
|
// Default: try string comparison
|
|
str1 := fmt.Sprintf("%v", v1)
|
|
str2 := fmt.Sprintf("%v", v2)
|
|
if str1 < str2 {
|
|
return -1
|
|
} else if str1 > str2 {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// streamingMerge merges multiple sorted data sources using a heap-based approach
|
|
// This provides memory-efficient merging without loading all data into memory
|
|
func (hms *HybridMessageScanner) streamingMerge(dataSources []StreamingDataSource, limit int) ([]HybridScanResult, error) {
|
|
if len(dataSources) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
var results []HybridScanResult
|
|
mergeHeap := &StreamingMergeHeap{}
|
|
heap.Init(mergeHeap)
|
|
|
|
// Initialize heap with first item from each data source
|
|
for i, source := range dataSources {
|
|
if source.HasMore() {
|
|
result, err := source.Next()
|
|
if err != nil {
|
|
// Close all sources and return error
|
|
for _, s := range dataSources {
|
|
s.Close()
|
|
}
|
|
return nil, fmt.Errorf("failed to read from data source %d: %v", i, err)
|
|
}
|
|
if result != nil {
|
|
heap.Push(mergeHeap, &StreamingMergeItem{
|
|
Result: result,
|
|
SourceID: i,
|
|
DataSource: source,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process results in chronological order
|
|
for mergeHeap.Len() > 0 {
|
|
// Get next chronologically ordered result
|
|
item := heap.Pop(mergeHeap).(*StreamingMergeItem)
|
|
results = append(results, *item.Result)
|
|
|
|
// Check limit
|
|
if limit > 0 && len(results) >= limit {
|
|
break
|
|
}
|
|
|
|
// Try to get next item from the same data source
|
|
if item.DataSource.HasMore() {
|
|
nextResult, err := item.DataSource.Next()
|
|
if err != nil {
|
|
// Log error but continue with other sources
|
|
fmt.Printf("Warning: Error reading next item from source %d: %v\n", item.SourceID, err)
|
|
} else if nextResult != nil {
|
|
heap.Push(mergeHeap, &StreamingMergeItem{
|
|
Result: nextResult,
|
|
SourceID: item.SourceID,
|
|
DataSource: item.DataSource,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close all data sources
|
|
for _, source := range dataSources {
|
|
source.Close()
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
// SliceDataSource wraps a pre-loaded slice of results as a StreamingDataSource
|
|
// This is used for unflushed data that is already loaded into memory
|
|
type SliceDataSource struct {
|
|
results []HybridScanResult
|
|
index int
|
|
}
|
|
|
|
func NewSliceDataSource(results []HybridScanResult) *SliceDataSource {
|
|
return &SliceDataSource{
|
|
results: results,
|
|
index: 0,
|
|
}
|
|
}
|
|
|
|
func (s *SliceDataSource) Next() (*HybridScanResult, error) {
|
|
if s.index >= len(s.results) {
|
|
return nil, nil
|
|
}
|
|
result := &s.results[s.index]
|
|
s.index++
|
|
return result, nil
|
|
}
|
|
|
|
func (s *SliceDataSource) HasMore() bool {
|
|
return s.index < len(s.results)
|
|
}
|
|
|
|
func (s *SliceDataSource) Close() error {
|
|
return nil // Nothing to clean up for slice-based source
|
|
}
|
|
|
|
// StreamingFlushedDataSource provides streaming access to flushed data
|
|
type StreamingFlushedDataSource struct {
|
|
hms *HybridMessageScanner
|
|
partition topic.Partition
|
|
options HybridScanOptions
|
|
mergedReadFn func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error)
|
|
resultChan chan *HybridScanResult
|
|
errorChan chan error
|
|
doneChan chan struct{}
|
|
started bool
|
|
finished bool
|
|
closed int32 // atomic flag to prevent double close
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func NewStreamingFlushedDataSource(hms *HybridMessageScanner, partition topic.Partition, options HybridScanOptions) *StreamingFlushedDataSource {
|
|
mergedReadFn := logstore.GenMergedReadFunc(hms.filerClient, hms.topic, partition)
|
|
|
|
return &StreamingFlushedDataSource{
|
|
hms: hms,
|
|
partition: partition,
|
|
options: options,
|
|
mergedReadFn: mergedReadFn,
|
|
resultChan: make(chan *HybridScanResult, 100), // Buffer for better performance
|
|
errorChan: make(chan error, 1),
|
|
doneChan: make(chan struct{}),
|
|
started: false,
|
|
finished: false,
|
|
}
|
|
}
|
|
|
|
func (s *StreamingFlushedDataSource) startStreaming() {
|
|
if s.started {
|
|
return
|
|
}
|
|
s.started = true
|
|
|
|
go func() {
|
|
defer func() {
|
|
// Use atomic flag to ensure channels are only closed once
|
|
if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
|
|
close(s.resultChan)
|
|
close(s.errorChan)
|
|
close(s.doneChan)
|
|
}
|
|
}()
|
|
|
|
// Set up time range for scanning
|
|
startTime := time.Unix(0, s.options.StartTimeNs)
|
|
if s.options.StartTimeNs == 0 {
|
|
startTime = time.Unix(0, 0)
|
|
}
|
|
|
|
stopTsNs := s.options.StopTimeNs
|
|
// For SQL queries, stopTsNs = 0 means "no stop time restriction"
|
|
// This is different from message queue consumers which want to stop at "now"
|
|
// We detect SQL context by checking if we have a predicate function
|
|
if stopTsNs == 0 && s.options.Predicate == nil {
|
|
// Only set to current time for non-SQL queries (message queue consumers)
|
|
stopTsNs = time.Now().UnixNano()
|
|
}
|
|
// If stopTsNs is still 0, it means this is a SQL query that wants unrestricted scanning
|
|
|
|
// Message processing function
|
|
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
|
|
// Skip control entries without actual data
|
|
if s.hms.isControlEntry(logEntry) {
|
|
return false, nil // Skip this entry
|
|
}
|
|
|
|
// Convert log entry to schema_pb.RecordValue for consistent processing
|
|
recordValue, source, convertErr := s.hms.convertLogEntryToRecordValue(logEntry)
|
|
if convertErr != nil {
|
|
return false, fmt.Errorf("failed to convert log entry: %v", convertErr)
|
|
}
|
|
|
|
// Apply predicate filtering (WHERE clause)
|
|
if s.options.Predicate != nil && !s.options.Predicate(recordValue) {
|
|
return false, nil // Skip this message
|
|
}
|
|
|
|
// Extract system columns
|
|
timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
|
|
key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
|
|
|
|
// Apply column projection
|
|
values := make(map[string]*schema_pb.Value)
|
|
if len(s.options.Columns) == 0 {
|
|
// Select all columns (excluding system columns from user view)
|
|
for name, value := range recordValue.Fields {
|
|
if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
|
|
values[name] = value
|
|
}
|
|
}
|
|
} else {
|
|
// Select specified columns only
|
|
for _, columnName := range s.options.Columns {
|
|
if value, exists := recordValue.Fields[columnName]; exists {
|
|
values[columnName] = value
|
|
}
|
|
}
|
|
}
|
|
|
|
result := &HybridScanResult{
|
|
Values: values,
|
|
Timestamp: timestamp,
|
|
Key: key,
|
|
Source: source,
|
|
}
|
|
|
|
// Check if already closed before trying to send
|
|
if atomic.LoadInt32(&s.closed) != 0 {
|
|
return true, nil // Stop processing if closed
|
|
}
|
|
|
|
// Send result to channel with proper handling of closed channels
|
|
select {
|
|
case s.resultChan <- result:
|
|
return false, nil
|
|
case <-s.doneChan:
|
|
return true, nil // Stop processing if closed
|
|
default:
|
|
// Check again if closed (in case it was closed between the atomic check and select)
|
|
if atomic.LoadInt32(&s.closed) != 0 {
|
|
return true, nil
|
|
}
|
|
// If not closed, try sending again with blocking select
|
|
select {
|
|
case s.resultChan <- result:
|
|
return false, nil
|
|
case <-s.doneChan:
|
|
return true, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Start scanning from the specified position
|
|
startPosition := log_buffer.MessagePosition{Time: startTime}
|
|
_, _, err := s.mergedReadFn(startPosition, stopTsNs, eachLogEntryFn)
|
|
|
|
if err != nil {
|
|
// Only try to send error if not already closed
|
|
if atomic.LoadInt32(&s.closed) == 0 {
|
|
select {
|
|
case s.errorChan <- fmt.Errorf("flushed data scan failed: %v", err):
|
|
case <-s.doneChan:
|
|
default:
|
|
// Channel might be full or closed, ignore
|
|
}
|
|
}
|
|
}
|
|
|
|
s.finished = true
|
|
}()
|
|
}
|
|
|
|
func (s *StreamingFlushedDataSource) Next() (*HybridScanResult, error) {
|
|
if !s.started {
|
|
s.startStreaming()
|
|
}
|
|
|
|
select {
|
|
case result, ok := <-s.resultChan:
|
|
if !ok {
|
|
return nil, nil // No more results
|
|
}
|
|
return result, nil
|
|
case err := <-s.errorChan:
|
|
return nil, err
|
|
case <-s.doneChan:
|
|
return nil, nil
|
|
}
|
|
}
|
|
|
|
func (s *StreamingFlushedDataSource) HasMore() bool {
|
|
if !s.started {
|
|
return true // Haven't started yet, so potentially has data
|
|
}
|
|
return !s.finished || len(s.resultChan) > 0
|
|
}
|
|
|
|
func (s *StreamingFlushedDataSource) Close() error {
|
|
// Use atomic flag to ensure channels are only closed once
|
|
if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
|
|
close(s.doneChan)
|
|
close(s.resultChan)
|
|
close(s.errorChan)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// mergeSort efficiently sorts HybridScanResult slice by timestamp using merge sort algorithm
|
|
func (hms *HybridMessageScanner) mergeSort(results []HybridScanResult, left, right int) {
|
|
if left < right {
|
|
mid := left + (right-left)/2
|
|
|
|
// Recursively sort both halves
|
|
hms.mergeSort(results, left, mid)
|
|
hms.mergeSort(results, mid+1, right)
|
|
|
|
// Merge the sorted halves
|
|
hms.merge(results, left, mid, right)
|
|
}
|
|
}
|
|
|
|
// merge combines two sorted subarrays into a single sorted array
|
|
func (hms *HybridMessageScanner) merge(results []HybridScanResult, left, mid, right int) {
|
|
// Create temporary arrays for the two subarrays
|
|
leftArray := make([]HybridScanResult, mid-left+1)
|
|
rightArray := make([]HybridScanResult, right-mid)
|
|
|
|
// Copy data to temporary arrays
|
|
copy(leftArray, results[left:mid+1])
|
|
copy(rightArray, results[mid+1:right+1])
|
|
|
|
// Merge the temporary arrays back into results[left..right]
|
|
i, j, k := 0, 0, left
|
|
|
|
for i < len(leftArray) && j < len(rightArray) {
|
|
if leftArray[i].Timestamp <= rightArray[j].Timestamp {
|
|
results[k] = leftArray[i]
|
|
i++
|
|
} else {
|
|
results[k] = rightArray[j]
|
|
j++
|
|
}
|
|
k++
|
|
}
|
|
|
|
// Copy remaining elements of leftArray, if any
|
|
for i < len(leftArray) {
|
|
results[k] = leftArray[i]
|
|
i++
|
|
k++
|
|
}
|
|
|
|
// Copy remaining elements of rightArray, if any
|
|
for j < len(rightArray) {
|
|
results[k] = rightArray[j]
|
|
j++
|
|
k++
|
|
}
|
|
}
|