SeaweedFS PostgreSQL Protocol Examples
This directory contains examples demonstrating how to connect to SeaweedFS using the PostgreSQL wire protocol.
Starting the PostgreSQL Server
# Start with trust authentication (no password required)
weed postgres -port=5432 -master=localhost:9333
# Start with password authentication
weed postgres -port=5432 -auth=password -users="admin:secret;readonly:view123"
# Start with MD5 authentication (more secure)
weed postgres -port=5432 -auth=md5 -users="user1:pass1;user2:pass2"
# Start with TLS encryption
weed postgres -port=5432 -tls-cert=server.crt -tls-key=server.key
# Allow connections from any host
weed postgres -host=0.0.0.0 -port=5432
Client Connections
psql Command Line
# Basic connection (trust auth)
psql -h localhost -p 5432 -U seaweedfs -d default
# With password
PGPASSWORD=secret psql -h localhost -p 5432 -U admin -d default
# Connection string format
psql "postgresql://admin:secret@localhost:5432/default"
# Connection string with parameters
psql "host=localhost port=5432 dbname=default user=admin password=secret"
Programming Languages
Python (psycopg2)
import psycopg2

# Connect to SeaweedFS
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    user="seaweedfs",
    database="default"
)

# Execute queries
cursor = conn.cursor()
cursor.execute("SELECT * FROM my_topic LIMIT 10")
for row in cursor.fetchall():
    print(row)

cursor.close()
conn.close()
Java JDBC
import java.sql.*;

public class SeaweedFSExample {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:postgresql://localhost:5432/default";
        Connection conn = DriverManager.getConnection(url, "seaweedfs", "");

        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery("SELECT * FROM my_topic LIMIT 10");

        while (rs.next()) {
            System.out.println("ID: " + rs.getLong("id"));
            System.out.println("Message: " + rs.getString("message"));
        }

        rs.close();
        stmt.close();
        conn.close();
    }
}
Go (lib/pq)
package main

import (
    "database/sql"
    "fmt"

    _ "github.com/lib/pq"
)

func main() {
    db, err := sql.Open("postgres",
        "host=localhost port=5432 user=seaweedfs dbname=default sslmode=disable")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    rows, err := db.Query("SELECT * FROM my_topic LIMIT 10")
    if err != nil {
        panic(err)
    }
    defer rows.Close()

    for rows.Next() {
        var id int64
        var message string
        if err := rows.Scan(&id, &message); err != nil {
            panic(err)
        }
        fmt.Printf("ID: %d, Message: %s\n", id, message)
    }
    // Check for errors encountered during iteration
    if err := rows.Err(); err != nil {
        panic(err)
    }
}
Node.js (pg)
const { Client } = require('pg');

const client = new Client({
  host: 'localhost',
  port: 5432,
  user: 'seaweedfs',
  database: 'default',
});

async function query() {
  await client.connect();
  const result = await client.query('SELECT * FROM my_topic LIMIT 10');
  console.log(result.rows);
  await client.end();
}

query().catch(console.error);
SQL Operations
Basic Queries
-- List databases
SHOW DATABASES;
-- List tables (topics)
SHOW TABLES;
-- Describe table structure
DESCRIBE my_topic;
-- or use the shorthand: DESC my_topic;
-- Basic select
SELECT * FROM my_topic;
-- With WHERE clause
SELECT id, message FROM my_topic WHERE id > 1000;
-- With LIMIT
SELECT * FROM my_topic LIMIT 100;
Aggregations
-- Count records
SELECT COUNT(*) FROM my_topic;
-- Multiple aggregations
SELECT
COUNT(*) as total_messages,
MIN(id) as min_id,
MAX(id) as max_id,
AVG(amount) as avg_amount
FROM my_topic;
-- Aggregations with WHERE
SELECT COUNT(*) FROM my_topic WHERE status = 'active';
System Columns
-- Access system columns
SELECT
id,
message,
_timestamp_ns as timestamp,
_key as partition_key,
_source as data_source
FROM my_topic;
-- Filter by timestamp
SELECT * FROM my_topic
WHERE _timestamp_ns > 1640995200000000000
LIMIT 10;
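Since _timestamp_ns holds nanoseconds since the Unix epoch, clients usually convert it before display. A minimal Python sketch, reusing the psycopg2 cursor from the example above:

from datetime import datetime, timezone

cursor.execute("SELECT id, _timestamp_ns FROM my_topic LIMIT 5")
for row_id, ts_ns in cursor.fetchall():
    # _timestamp_ns is nanoseconds since the Unix epoch; convert to seconds
    ts = datetime.fromtimestamp(ts_ns / 1e9, tz=timezone.utc)
    print(row_id, ts.isoformat())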
PostgreSQL System Queries
-- Version information
SELECT version();
-- Current database
SELECT current_database();
-- Current user
SELECT current_user;
-- Server settings
SELECT current_setting('server_version');
SELECT current_setting('server_encoding');
psql Meta-Commands
-- List tables
\d
\dt
-- List databases
\l
-- Describe a specific table
\d my_topic
-- List tables matching a name or pattern
\dt my_topic
-- List schemas
\dn
-- Help
\h
\?
-- Quit
\q
Database Tools Integration
DBeaver
- Create New Connection → PostgreSQL
- Settings:
- Host: localhost
- Port: 5432
- Database: default
- Username: seaweedfs (or configured user)
- Password: (if using password auth)
pgAdmin
- Add New Server
- Connection tab:
- Host: localhost
- Port: 5432
- Username: seaweedfs
- Database: default
DataGrip
- New Data Source → PostgreSQL
- Configure:
- Host: localhost
- Port: 5432
- User: seaweedfs
- Database: default
Grafana
- Add Data Source → PostgreSQL
- Configuration:
- Host: localhost:5432
- Database: default
- User: seaweedfs
- SSL Mode: disable
BI Tools
Tableau
- Connect to Data → PostgreSQL
- Server: localhost
- Port: 5432
- Database: default
- Username: seaweedfs
Power BI
- Get Data → Database → PostgreSQL
- Server: localhost
- Database: default
- Username: seaweedfs
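Because the server speaks the standard PostgreSQL wire protocol, generic drivers and ORMs can connect the same way these tools do. A minimal sketch with SQLAlchemy (an assumption, not covered elsewhere in this document; requires the sqlalchemy and psycopg2 packages):

from sqlalchemy import create_engine, text

# Standard PostgreSQL URL; SeaweedFS answers on the same wire protocol
engine = create_engine("postgresql+psycopg2://seaweedfs@localhost:5432/default")

with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM my_topic"))
    print(result.scalar())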
Connection Pooling
Java (HikariCP)
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://localhost:5432/default");
config.setUsername("seaweedfs");
config.setMaximumPoolSize(10);

HikariDataSource dataSource = new HikariDataSource(config);
Python (connection pooling)
from psycopg2 import pool

connection_pool = pool.SimpleConnectionPool(
    1, 20,
    host="localhost",
    port=5432,
    user="seaweedfs",
    database="default"
)

conn = connection_pool.getconn()
try:
    # Use the connection here
    pass
finally:
    connection_pool.putconn(conn)
Security Best Practices
Use TLS Encryption
# Generate self-signed certificate for testing
openssl req -x509 -newkey rsa:4096 -keyout server.key -out server.crt -days 365 -nodes
# Start with TLS
weed postgres -tls-cert=server.crt -tls-key=server.key
Use MD5 Authentication
# More secure than password auth
weed postgres -auth=md5 -users="admin:secret123;readonly:view456"
Limit Connections
# Limit concurrent connections
weed postgres -max-connections=50 -idle-timeout=30m
Troubleshooting
Connection Issues
# Test connectivity
telnet localhost 5432
# Check if server is running
ps aux | grep "weed postgres"
# Check logs for errors
tail -f /var/log/seaweedfs/postgres.log
Common Errors
"Connection refused"
- Ensure PostgreSQL server is running
- Check host/port configuration
- Verify firewall settings
"Authentication failed"
- Check username/password
- Verify auth method configuration
- Ensure user is configured in server
"Database does not exist"
- Use correct database name (default: 'default')
- Check available databases:
SHOW DATABASES;
"Permission denied"
- Check user permissions
- Verify authentication method
- Use correct credentials
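For scripted health checks, a direct connection attempt surfaces all of these errors in one place. A minimal psycopg2 sketch, with connection details as configured above:

import psycopg2

try:
    conn = psycopg2.connect(host="localhost", port=5432,
                            user="seaweedfs", database="default",
                            connect_timeout=5)
    conn.close()
    print("connection OK")
except psycopg2.OperationalError as e:
    # Covers "connection refused", authentication failures, unknown database
    print("connection failed:", e)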
Performance Tips
- Use LIMIT clauses for large result sets
- Filter with WHERE clauses to reduce data transfer
- Use connection pooling for multi-threaded applications
- Close resources properly (connections, statements, result sets)
- Use prepared statements for repeated queries (see the sketch below)
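With psycopg2 this takes the form of parameterized queries: the driver substitutes values safely and the query text stays constant across executions. A minimal sketch, reusing the cursor from the Python example above:

# Parameterized query: values are passed separately from the SQL text
query = "SELECT id, message FROM my_topic WHERE id > %s LIMIT %s"

for threshold in (100, 1000, 10000):
    cursor.execute(query, (threshold, 10))
    print(threshold, cursor.fetchall())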
Monitoring
Connection Statistics
-- Current connections (if supported)
SELECT COUNT(*) FROM pg_stat_activity;
-- Server version
SELECT version();
-- Current settings
SELECT name, setting FROM pg_settings WHERE name LIKE '%connection%';
Query Performance
-- Use EXPLAIN for query plans (if supported)
EXPLAIN SELECT * FROM my_topic WHERE id > 1000;
This PostgreSQL protocol support makes SeaweedFS accessible to the entire PostgreSQL ecosystem, enabling seamless integration with existing tools, applications, and workflows.