1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/offset/sql_storage_test.go
chrislu 6aba7e6620 Phase 5: Implement SQL storage backend for offset persistence
- Design comprehensive SQL schema for offset storage with future _index column support
- Implement SQLOffsetStorage with full database operations:
  - Partition offset checkpoints with UPSERT functionality
  - Detailed offset mappings with range queries and statistics
  - Database migration system with version tracking
  - Performance optimizations with proper indexing
- Add database migration manager with automatic schema updates
- Create comprehensive test suite with 11 test cases covering:
  - Schema initialization and table creation
  - Checkpoint save/load operations with error handling
  - Offset mapping storage and retrieval with sorting
  - Range queries and highest offset detection
  - Partition statistics with NULL value handling
  - Cleanup operations for old data retention
  - Concurrent access safety and database vacuum
- Extend BrokerOffsetManager with SQL storage integration:
  - NewBrokerOffsetManagerWithSQL for database-backed storage
  - Configurable storage backends (in-memory fallback, SQL preferred)
  - Database connection management and error handling
- Add SQLite driver dependency and configure for optimal performance
- Support for future database types (PostgreSQL, MySQL) with abstraction layer

Key TODOs and Assumptions:
- TODO: Add _index as computed column when database supports it
- TODO: Implement database backup and restore functionality
- TODO: Add configuration for database path and connection parameters
- ASSUMPTION: Using SQLite for now, extensible to other databases
- ASSUMPTION: WAL mode and performance pragmas for production use
- ASSUMPTION: Migration system handles schema evolution gracefully

All 11 SQL storage tests pass, providing robust persistent offset management.
2025-09-12 00:53:49 -07:00

516 lines
13 KiB
Go

package offset
import (
"database/sql"
"os"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
_ "github.com/mattn/go-sqlite3" // SQLite driver
)
// createTestDB opens a "sqlite3" database backed by a freshly created
// temporary file and returns the handle.
//
// The handle is closed and the file is removed automatically when the calling
// test (and its subtests) finish. Any setup failure aborts the calling test
// via t.Fatalf.
func createTestDB(t *testing.T) *sql.DB {
	// Report setup failures at the caller's line rather than inside this helper.
	t.Helper()

	// Reserve a unique path. The database is opened by name below, so the
	// handle returned by CreateTemp is only needed long enough to close it.
	tmpFile, err := os.CreateTemp("", "offset_test_*.db")
	if err != nil {
		t.Fatalf("Failed to create temp database file: %v", err)
	}
	dbPath := tmpFile.Name()

	// Registered before anything else can fail so the file never leaks.
	// Cleanups run last-in-first-out, so this also runs after the database
	// handle registered below has been closed.
	t.Cleanup(func() {
		// Best-effort: a leftover temp file must not fail the test.
		_ = os.Remove(dbPath)
	})

	if err := tmpFile.Close(); err != nil {
		t.Fatalf("Failed to close temp database file: %v", err)
	}

	db, err := sql.Open("sqlite3", dbPath)
	if err != nil {
		t.Fatalf("Failed to open database: %v", err)
	}
	t.Cleanup(func() {
		// Best-effort: tests may already have closed the handle themselves.
		_ = db.Close()
	})

	return db
}
// createTestPartitionForSQL returns a partition fixture with RingSize 1024,
// RangeStart 0, RangeStop 31, and UnixTimeNs set to the current wall-clock
// time in nanoseconds.
func createTestPartitionForSQL() *schema_pb.Partition {
	fixture := new(schema_pb.Partition)
	fixture.RingSize = 1024
	fixture.RangeStart = 0
	fixture.RangeStop = 31
	fixture.UnixTimeNs = time.Now().UnixNano()
	return fixture
}
// TestSQLOffsetStorage_InitializeSchema checks that, once the storage has been
// constructed, both expected tables are listed in sqlite_master.
func TestSQLOffsetStorage_InitializeSchema(t *testing.T) {
	db := createTestDB(t)

	storage, err := NewSQLOffsetStorage(db)
	if err != nil {
		t.Fatalf("Failed to create SQL storage: %v", err)
	}
	defer storage.Close()

	const tableCountQuery = "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?"

	// Each table must appear exactly once in the schema catalog.
	for _, name := range [...]string{"partition_offset_checkpoints", "offset_mappings"} {
		var matches int
		if err := db.QueryRow(tableCountQuery, name).Scan(&matches); err != nil {
			t.Fatalf("Failed to check table %s: %v", name, err)
		}
		if matches != 1 {
			t.Errorf("Table %s was not created", name)
		}
	}
}
func TestSQLOffsetStorage_SaveLoadCheckpoint(t *testing.T) {
db := createTestDB(t)
storage, err := NewSQLOffsetStorage(db)
if err != nil {
t.Fatalf("Failed to create SQL storage: %v", err)
}
defer storage.Close()
partition := createTestPartitionForSQL()
// Test saving checkpoint
err = storage.SaveCheckpoint(partition, 100)
if err != nil {
t.Fatalf("Failed to save checkpoint: %v", err)
}
// Test loading checkpoint
checkpoint, err := storage.LoadCheckpoint(partition)
if err != nil {
t.Fatalf("Failed to load checkpoint: %v", err)
}
if checkpoint != 100 {
t.Errorf("Expected checkpoint 100, got %d", checkpoint)
}
// Test updating checkpoint
err = storage.SaveCheckpoint(partition, 200)
if err != nil {
t.Fatalf("Failed to update checkpoint: %v", err)
}
checkpoint, err = storage.LoadCheckpoint(partition)
if err != nil {
t.Fatalf("Failed to load updated checkpoint: %v", err)
}
if checkpoint != 200 {
t.Errorf("Expected updated checkpoint 200, got %d", checkpoint)
}
}
func TestSQLOffsetStorage_LoadCheckpointNotFound(t *testing.T) {
db := createTestDB(t)
storage, err := NewSQLOffsetStorage(db)
if err != nil {
t.Fatalf("Failed to create SQL storage: %v", err)
}
defer storage.Close()
partition := createTestPartitionForSQL()
// Test loading non-existent checkpoint
_, err = storage.LoadCheckpoint(partition)
if err == nil {
t.Error("Expected error for non-existent checkpoint")
}
}
// TestSQLOffsetStorage_SaveLoadOffsetMappings saves three offset mappings for
// one partition and verifies LoadOffsetMappings returns them in ascending
// offset order with their timestamps and sizes intact.
func TestSQLOffsetStorage_SaveLoadOffsetMappings(t *testing.T) {
	db := createTestDB(t)
	storage, err := NewSQLOffsetStorage(db)
	if err != nil {
		t.Fatalf("Failed to create SQL storage: %v", err)
	}
	defer storage.Close()

	partition := createTestPartitionForSQL()
	// Named "key" so the package-level partitionKey function is not shadowed.
	key := partitionKey(partition)

	// Fixtures are listed in ascending offset order, which is also the order
	// the loaded entries are compared against below.
	mappings := []struct {
		offset    int64
		timestamp int64
		size      int32
	}{
		{0, 1000, 100},
		{1, 2000, 150},
		{2, 3000, 200},
	}

	for _, mapping := range mappings {
		if err := storage.SaveOffsetMapping(key, mapping.offset, mapping.timestamp, mapping.size); err != nil {
			t.Fatalf("Failed to save offset mapping: %v", err)
		}
	}

	entries, err := storage.LoadOffsetMappings(key)
	if err != nil {
		t.Fatalf("Failed to load offset mappings: %v", err)
	}

	// Fatal rather than Errorf: the loop below indexes mappings by entry
	// position and would panic if more entries came back than were saved.
	if len(entries) != len(mappings) {
		t.Fatalf("Expected %d entries, got %d", len(mappings), len(entries))
	}

	// Verify entries are sorted by offset and carry the saved values.
	for i, entry := range entries {
		expected := mappings[i]
		if entry.KafkaOffset != expected.offset {
			t.Errorf("Entry %d: expected offset %d, got %d", i, expected.offset, entry.KafkaOffset)
		}
		if entry.SMQTimestamp != expected.timestamp {
			t.Errorf("Entry %d: expected timestamp %d, got %d", i, expected.timestamp, entry.SMQTimestamp)
		}
		if entry.MessageSize != expected.size {
			t.Errorf("Entry %d: expected size %d, got %d", i, expected.size, entry.MessageSize)
		}
	}
}
func TestSQLOffsetStorage_GetHighestOffset(t *testing.T) {
db := createTestDB(t)
storage, err := NewSQLOffsetStorage(db)
if err != nil {
t.Fatalf("Failed to create SQL storage: %v", err)
}
defer storage.Close()
partition := createTestPartitionForSQL()
partitionKey := partitionKey(partition)
// Test empty partition
_, err = storage.GetHighestOffset(partition)
if err == nil {
t.Error("Expected error for empty partition")
}
// Add some offset mappings
offsets := []int64{5, 1, 3, 2, 4}
for _, offset := range offsets {
err := storage.SaveOffsetMapping(partitionKey, offset, offset*1000, 100)
if err != nil {
t.Fatalf("Failed to save offset mapping: %v", err)
}
}
// Get highest offset
highest, err := storage.GetHighestOffset(partition)
if err != nil {
t.Fatalf("Failed to get highest offset: %v", err)
}
if highest != 5 {
t.Errorf("Expected highest offset 5, got %d", highest)
}
}
func TestSQLOffsetStorage_GetOffsetMappingsByRange(t *testing.T) {
db := createTestDB(t)
storage, err := NewSQLOffsetStorage(db)
if err != nil {
t.Fatalf("Failed to create SQL storage: %v", err)
}
defer storage.Close()
partition := createTestPartitionForSQL()
partitionKey := partitionKey(partition)
// Add offset mappings
for i := int64(0); i < 10; i++ {
err := storage.SaveOffsetMapping(partitionKey, i, i*1000, 100)
if err != nil {
t.Fatalf("Failed to save offset mapping: %v", err)
}
}
// Get range of offsets
entries, err := storage.GetOffsetMappingsByRange(partitionKey, 3, 7)
if err != nil {
t.Fatalf("Failed to get offset range: %v", err)
}
expectedCount := 5 // offsets 3, 4, 5, 6, 7
if len(entries) != expectedCount {
t.Errorf("Expected %d entries, got %d", expectedCount, len(entries))
}
// Verify range
for i, entry := range entries {
expectedOffset := int64(3 + i)
if entry.KafkaOffset != expectedOffset {
t.Errorf("Entry %d: expected offset %d, got %d", i, expectedOffset, entry.KafkaOffset)
}
}
}
func TestSQLOffsetStorage_GetPartitionStats(t *testing.T) {
db := createTestDB(t)
storage, err := NewSQLOffsetStorage(db)
if err != nil {
t.Fatalf("Failed to create SQL storage: %v", err)
}
defer storage.Close()
partition := createTestPartitionForSQL()
partitionKey := partitionKey(partition)
// Test empty partition stats
stats, err := storage.GetPartitionStats(partitionKey)
if err != nil {
t.Fatalf("Failed to get empty partition stats: %v", err)
}
if stats.RecordCount != 0 {
t.Errorf("Expected record count 0, got %d", stats.RecordCount)
}
if stats.EarliestOffset != -1 {
t.Errorf("Expected earliest offset -1, got %d", stats.EarliestOffset)
}
// Add some data
sizes := []int32{100, 150, 200}
for i, size := range sizes {
err := storage.SaveOffsetMapping(partitionKey, int64(i), int64(i*1000), size)
if err != nil {
t.Fatalf("Failed to save offset mapping: %v", err)
}
}
// Get stats with data
stats, err = storage.GetPartitionStats(partitionKey)
if err != nil {
t.Fatalf("Failed to get partition stats: %v", err)
}
if stats.RecordCount != 3 {
t.Errorf("Expected record count 3, got %d", stats.RecordCount)
}
if stats.EarliestOffset != 0 {
t.Errorf("Expected earliest offset 0, got %d", stats.EarliestOffset)
}
if stats.LatestOffset != 2 {
t.Errorf("Expected latest offset 2, got %d", stats.LatestOffset)
}
if stats.HighWaterMark != 3 {
t.Errorf("Expected high water mark 3, got %d", stats.HighWaterMark)
}
expectedTotalSize := int64(100 + 150 + 200)
if stats.TotalSize != expectedTotalSize {
t.Errorf("Expected total size %d, got %d", expectedTotalSize, stats.TotalSize)
}
}
// TestSQLOffsetStorage_GetAllPartitions expects no partitions from an empty
// database, then stores one mapping under each of two partition keys and
// expects both keys to be listed.
func TestSQLOffsetStorage_GetAllPartitions(t *testing.T) {
	db := createTestDB(t)
	storage, err := NewSQLOffsetStorage(db)
	if err != nil {
		t.Fatalf("Failed to create SQL storage: %v", err)
	}
	defer storage.Close()

	// Test empty database
	partitions, err := storage.GetAllPartitions()
	if err != nil {
		t.Fatalf("Failed to get all partitions: %v", err)
	}
	if len(partitions) != 0 {
		t.Errorf("Expected 0 partitions, got %d", len(partitions))
	}

	// Two partitions with different ranges.
	partition1 := createTestPartitionForSQL()
	partition2 := &schema_pb.Partition{
		RingSize:   1024,
		RangeStart: 32,
		RangeStop:  63,
		UnixTimeNs: time.Now().UnixNano(),
	}
	partitionKey1 := partitionKey(partition1)
	partitionKey2 := partitionKey(partition2)

	// Fail fast on write errors; otherwise a failed insert would only show up
	// later as a misleading partition-count mismatch.
	if err := storage.SaveOffsetMapping(partitionKey1, 0, 1000, 100); err != nil {
		t.Fatalf("Failed to save offset mapping for %s: %v", partitionKey1, err)
	}
	if err := storage.SaveOffsetMapping(partitionKey2, 0, 2000, 150); err != nil {
		t.Fatalf("Failed to save offset mapping for %s: %v", partitionKey2, err)
	}

	// Get all partitions
	partitions, err = storage.GetAllPartitions()
	if err != nil {
		t.Fatalf("Failed to get all partitions: %v", err)
	}
	if len(partitions) != 2 {
		t.Errorf("Expected 2 partitions, got %d", len(partitions))
	}

	// Verify both keys are present, independent of the order returned.
	partitionMap := make(map[string]bool, len(partitions))
	for _, p := range partitions {
		partitionMap[p] = true
	}
	for _, want := range []string{partitionKey1, partitionKey2} {
		if !partitionMap[want] {
			t.Errorf("Partition key %s not found", want)
		}
	}
}
// TestSQLOffsetStorage_CleanupOldMappings inserts one mapping stamped 24 hours
// in the past and one stamped now, runs CleanupOldMappings with a 12-hour
// cutoff, and expects only the recent mapping (offset 1) to remain.
func TestSQLOffsetStorage_CleanupOldMappings(t *testing.T) {
	db := createTestDB(t)
	storage, err := NewSQLOffsetStorage(db)
	if err != nil {
		t.Fatalf("Failed to create SQL storage: %v", err)
	}
	defer storage.Close()

	partition := createTestPartitionForSQL()
	// Named "key" so the package-level partitionKey function is not shadowed.
	key := partitionKey(partition)

	now := time.Now().UnixNano()

	// Insert the old row directly so created_at can be set explicitly to a
	// value in the past.
	oldTime := now - (24 * time.Hour).Nanoseconds() // 24 hours ago
	_, err = db.Exec(`
		INSERT INTO offset_mappings
		(partition_key, kafka_offset, smq_timestamp, message_size, created_at)
		VALUES (?, ?, ?, ?, ?)
	`, key, 0, oldTime, 100, oldTime)
	if err != nil {
		t.Fatalf("Failed to insert old mapping: %v", err)
	}

	// Add recent mapping through the regular API; a failed write here would
	// invalidate every assertion below, so stop immediately.
	if err := storage.SaveOffsetMapping(key, 1, now, 150); err != nil {
		t.Fatalf("Failed to save recent mapping: %v", err)
	}

	// Verify both mappings exist
	entries, err := storage.LoadOffsetMappings(key)
	if err != nil {
		t.Fatalf("Failed to load mappings: %v", err)
	}
	if len(entries) != 2 {
		t.Errorf("Expected 2 mappings before cleanup, got %d", len(entries))
	}

	// Cleanup old mappings (older than 12 hours)
	cutoffTime := now - (12 * time.Hour).Nanoseconds()
	if err := storage.CleanupOldMappings(cutoffTime); err != nil {
		t.Fatalf("Failed to cleanup old mappings: %v", err)
	}

	// Verify only recent mapping remains
	entries, err = storage.LoadOffsetMappings(key)
	if err != nil {
		t.Fatalf("Failed to load mappings after cleanup: %v", err)
	}
	// Fatal rather than Errorf: entries[0] below would panic on an empty
	// result if cleanup removed everything.
	if len(entries) != 1 {
		t.Fatalf("Expected 1 mapping after cleanup, got %d", len(entries))
	}
	if entries[0].KafkaOffset != 1 {
		t.Errorf("Expected remaining mapping offset 1, got %d", entries[0].KafkaOffset)
	}
}
// TestSQLOffsetStorage_Vacuum expects Vacuum to succeed both on a freshly
// initialized database and after one offset mapping has been stored.
func TestSQLOffsetStorage_Vacuum(t *testing.T) {
	db := createTestDB(t)
	storage, err := NewSQLOffsetStorage(db)
	if err != nil {
		t.Fatalf("Failed to create SQL storage: %v", err)
	}
	defer storage.Close()

	// Vacuum should not fail on empty database
	if err := storage.Vacuum(); err != nil {
		t.Fatalf("Failed to vacuum database: %v", err)
	}

	// Add some data and vacuum again. The write is checked so the second
	// vacuum is guaranteed to run against a non-empty table.
	partition := createTestPartitionForSQL()
	key := partitionKey(partition)
	if err := storage.SaveOffsetMapping(key, 0, 1000, 100); err != nil {
		t.Fatalf("Failed to save offset mapping: %v", err)
	}

	if err := storage.Vacuum(); err != nil {
		t.Fatalf("Failed to vacuum database with data: %v", err)
	}
}
func TestSQLOffsetStorage_ConcurrentAccess(t *testing.T) {
db := createTestDB(t)
storage, err := NewSQLOffsetStorage(db)
if err != nil {
t.Fatalf("Failed to create SQL storage: %v", err)
}
defer storage.Close()
partition := createTestPartitionForSQL()
partitionKey := partitionKey(partition)
// Test concurrent writes
const numGoroutines = 10
const offsetsPerGoroutine = 10
done := make(chan bool, numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func(goroutineID int) {
defer func() { done <- true }()
for j := 0; j < offsetsPerGoroutine; j++ {
offset := int64(goroutineID*offsetsPerGoroutine + j)
err := storage.SaveOffsetMapping(partitionKey, offset, offset*1000, 100)
if err != nil {
t.Errorf("Failed to save offset mapping %d: %v", offset, err)
return
}
}
}(i)
}
// Wait for all goroutines to complete
for i := 0; i < numGoroutines; i++ {
<-done
}
// Verify all mappings were saved
entries, err := storage.LoadOffsetMappings(partitionKey)
if err != nil {
t.Fatalf("Failed to load mappings: %v", err)
}
expectedCount := numGoroutines * offsetsPerGoroutine
if len(entries) != expectedCount {
t.Errorf("Expected %d mappings, got %d", expectedCount, len(entries))
}
}