Mirror of https://github.com/chrislusf/seaweedfs, synced 2025-09-19 01:30:23 +02:00
- Add PartitionOffsetManager for sequential offset assignment per partition
- Implement OffsetStorage interface with in-memory and SQL storage backends
- Add PartitionOffsetRegistry for managing multiple partition offset managers
- Implement offset recovery from checkpoints and storage scanning
- Add OffsetAssigner for high-level offset assignment operations
- Support both single and batch offset assignment with timestamps
- Add comprehensive tests covering:
  - Basic and batch offset assignment
  - Offset recovery from checkpoints and storage
  - Multi-partition offset management
  - Concurrent offset assignment safety
- All tests pass; offset assignment is thread-safe and recoverable
388 lines · 10 KiB · Go
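Before the test file itself, a minimal sketch of the storage contract the commit message describes. Every method name here is inferred from the calls the tests make (NewInMemoryOffsetStorage, AddRecord, checkpoint-based recovery) and is an assumption, not the package's confirmed API:

// Hypothetical sketch of the OffsetStorage contract described in the commit
// message. Method names are assumptions inferred from the tests below; the
// package's real interface may differ.
type OffsetStorage interface {
	// SaveCheckpoint persists the latest assigned offset (the recovery test
	// implies this happens every 100 assignments) so a restarted manager
	// can resume without replaying every record.
	SaveCheckpoint(partition *schema_pb.Partition, offset int64) error

	// LoadCheckpoint returns the most recent checkpoint for a partition.
	LoadCheckpoint(partition *schema_pb.Partition) (int64, error)

	// GetHighestOffset scans stored records for the highest offset; the
	// recovery test shows this must win over a stale checkpoint.
	GetHighestOffset(partition *schema_pb.Partition) (int64, error)
}

The test file follows.
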
package offset

import (
	"sync"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

func createTestPartition() *schema_pb.Partition {
	return &schema_pb.Partition{
		RingSize:   1024,
		RangeStart: 0,
		RangeStop:  31,
		UnixTimeNs: time.Now().UnixNano(),
	}
}

func TestPartitionOffsetManager_BasicAssignment(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	partition := createTestPartition()

	manager, err := NewPartitionOffsetManager(partition, storage)
	if err != nil {
		t.Fatalf("Failed to create offset manager: %v", err)
	}

	// Offsets must be assigned sequentially, starting at 0.
	for i := int64(0); i < 10; i++ {
		offset := manager.AssignOffset()
		if offset != i {
			t.Errorf("Expected offset %d, got %d", i, offset)
		}
	}

	// The high water mark is the next offset to be assigned.
	hwm := manager.GetHighWaterMark()
	if hwm != 10 {
		t.Errorf("Expected high water mark 10, got %d", hwm)
	}
}

func TestPartitionOffsetManager_BatchAssignment(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	partition := createTestPartition()

	manager, err := NewPartitionOffsetManager(partition, storage)
	if err != nil {
		t.Fatalf("Failed to create offset manager: %v", err)
	}

	// Assign a batch of 5 offsets: [0, 4].
	baseOffset, lastOffset := manager.AssignOffsets(5)
	if baseOffset != 0 {
		t.Errorf("Expected base offset 0, got %d", baseOffset)
	}
	if lastOffset != 4 {
		t.Errorf("Expected last offset 4, got %d", lastOffset)
	}

	// The next batch continues where the previous one ended: [5, 7].
	baseOffset, lastOffset = manager.AssignOffsets(3)
	if baseOffset != 5 {
		t.Errorf("Expected base offset 5, got %d", baseOffset)
	}
	if lastOffset != 7 {
		t.Errorf("Expected last offset 7, got %d", lastOffset)
	}

	// Eight offsets assigned in total, so the high water mark is 8.
	hwm := manager.GetHighWaterMark()
	if hwm != 8 {
		t.Errorf("Expected high water mark 8, got %d", hwm)
	}
}

func TestPartitionOffsetManager_Recovery(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	partition := createTestPartition()

	// Create a manager and assign some offsets.
	manager1, err := NewPartitionOffsetManager(partition, storage)
	if err != nil {
		t.Fatalf("Failed to create offset manager: %v", err)
	}

	// Assign more offsets than the checkpoint interval (100), recording each one.
	for i := 0; i < 150; i++ {
		offset := manager1.AssignOffset()
		storage.AddRecord(partition, offset)
	}

	// Wait for the asynchronous checkpoint to complete.
	time.Sleep(100 * time.Millisecond)

	// Create a new manager against the same storage (simulates a restart).
	manager2, err := NewPartitionOffsetManager(partition, storage)
	if err != nil {
		t.Fatalf("Failed to create offset manager after recovery: %v", err)
	}

	// The checkpoint records offset 100, but records exist up to 149, so
	// recovery must scan storage past the stale checkpoint and resume
	// assignment at 150 rather than at checkpoint + 1.
	nextOffset := manager2.AssignOffset()
	if nextOffset != 150 {
		t.Errorf("Expected next offset 150 after recovery, got %d", nextOffset)
	}
}

func TestPartitionOffsetManager_RecoveryFromStorage(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	partition := createTestPartition()

	// Simulate existing records in storage without any checkpoint.
	for i := int64(0); i < 50; i++ {
		storage.AddRecord(partition, i)
	}

	// The manager should recover the next offset by scanning storage.
	manager, err := NewPartitionOffsetManager(partition, storage)
	if err != nil {
		t.Fatalf("Failed to create offset manager: %v", err)
	}

	// Records 0-49 exist, so the next offset is 50.
	nextOffset := manager.AssignOffset()
	if nextOffset != 50 {
		t.Errorf("Expected next offset 50 after storage recovery, got %d", nextOffset)
	}
}

func TestPartitionOffsetRegistry_MultiplePartitions(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	registry := NewPartitionOffsetRegistry(storage)

	// Two partitions covering different ring ranges.
	partition1 := &schema_pb.Partition{
		RingSize:   1024,
		RangeStart: 0,
		RangeStop:  31,
		UnixTimeNs: time.Now().UnixNano(),
	}

	partition2 := &schema_pb.Partition{
		RingSize:   1024,
		RangeStart: 32,
		RangeStop:  63,
		UnixTimeNs: time.Now().UnixNano(),
	}

	// Each partition starts its own offset sequence at 0.
	offset1, err := registry.AssignOffset(partition1)
	if err != nil {
		t.Fatalf("Failed to assign offset to partition1: %v", err)
	}
	if offset1 != 0 {
		t.Errorf("Expected offset 0 for partition1, got %d", offset1)
	}

	offset2, err := registry.AssignOffset(partition2)
	if err != nil {
		t.Fatalf("Failed to assign offset to partition2: %v", err)
	}
	if offset2 != 0 {
		t.Errorf("Expected offset 0 for partition2, got %d", offset2)
	}

	// Assigning to partition1 again must not affect partition2's sequence.
	offset1_2, err := registry.AssignOffset(partition1)
	if err != nil {
		t.Fatalf("Failed to assign second offset to partition1: %v", err)
	}
	if offset1_2 != 1 {
		t.Errorf("Expected offset 1 for partition1, got %d", offset1_2)
	}

	// Partition2's sequence advances independently: its next offset is 1.
	offset2_2, err := registry.AssignOffset(partition2)
	if err != nil {
		t.Fatalf("Failed to assign second offset to partition2: %v", err)
	}
	if offset2_2 != 1 {
		t.Errorf("Expected offset 1 for partition2, got %d", offset2_2)
	}
}

func TestPartitionOffsetRegistry_BatchAssignment(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	registry := NewPartitionOffsetRegistry(storage)
	partition := createTestPartition()

	// Assign a batch of 10 offsets: [0, 9].
	baseOffset, lastOffset, err := registry.AssignOffsets(partition, 10)
	if err != nil {
		t.Fatalf("Failed to assign batch offsets: %v", err)
	}

	if baseOffset != 0 {
		t.Errorf("Expected base offset 0, got %d", baseOffset)
	}
	if lastOffset != 9 {
		t.Errorf("Expected last offset 9, got %d", lastOffset)
	}

	// The high water mark is one past the last assigned offset.
	hwm, err := registry.GetHighWaterMark(partition)
	if err != nil {
		t.Fatalf("Failed to get high water mark: %v", err)
	}
	if hwm != 10 {
		t.Errorf("Expected high water mark 10, got %d", hwm)
	}
}

func TestOffsetAssigner_SingleAssignment(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	assigner := NewOffsetAssigner(storage)
	partition := createTestPartition()

	// Assign a single offset.
	result := assigner.AssignSingleOffset(partition)
	if result.Error != nil {
		t.Fatalf("Failed to assign single offset: %v", result.Error)
	}

	if result.Assignment == nil {
		t.Fatal("Assignment result is nil")
	}

	if result.Assignment.Offset != 0 {
		t.Errorf("Expected offset 0, got %d", result.Assignment.Offset)
	}

	if result.Assignment.Partition != partition {
		t.Error("Partition mismatch in assignment")
	}

	if result.Assignment.Timestamp <= 0 {
		t.Error("Timestamp should be set")
	}
}

func TestOffsetAssigner_BatchAssignment(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	assigner := NewOffsetAssigner(storage)
	partition := createTestPartition()

	// Assign a batch of 5 offsets: [0, 4].
	result := assigner.AssignBatchOffsets(partition, 5)
	if result.Error != nil {
		t.Fatalf("Failed to assign batch offsets: %v", result.Error)
	}

	if result.Batch == nil {
		t.Fatal("Batch result is nil")
	}

	if result.Batch.BaseOffset != 0 {
		t.Errorf("Expected base offset 0, got %d", result.Batch.BaseOffset)
	}

	if result.Batch.LastOffset != 4 {
		t.Errorf("Expected last offset 4, got %d", result.Batch.LastOffset)
	}

	if result.Batch.Count != 5 {
		t.Errorf("Expected count 5, got %d", result.Batch.Count)
	}

	if result.Batch.Timestamp <= 0 {
		t.Error("Timestamp should be set")
	}
}

func TestOffsetAssigner_HighWaterMark(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	assigner := NewOffsetAssigner(storage)
	partition := createTestPartition()

	// Before any assignment the high water mark is 0.
	hwm, err := assigner.GetHighWaterMark(partition)
	if err != nil {
		t.Fatalf("Failed to get initial high water mark: %v", err)
	}
	if hwm != 0 {
		t.Errorf("Expected initial high water mark 0, got %d", hwm)
	}

	// Assign some offsets.
	assigner.AssignBatchOffsets(partition, 10)

	// The high water mark should advance to 10.
	hwm, err = assigner.GetHighWaterMark(partition)
	if err != nil {
		t.Fatalf("Failed to get high water mark after assignment: %v", err)
	}
	if hwm != 10 {
		t.Errorf("Expected high water mark 10, got %d", hwm)
	}
}

func TestPartitionKey(t *testing.T) {
	// partition1 and partition2 are field-for-field identical; partition3
	// covers a different ring range.
	partition1 := &schema_pb.Partition{
		RingSize:   1024,
		RangeStart: 0,
		RangeStop:  31,
		UnixTimeNs: 1234567890,
	}

	partition2 := &schema_pb.Partition{
		RingSize:   1024,
		RangeStart: 0,
		RangeStop:  31,
		UnixTimeNs: 1234567890,
	}

	partition3 := &schema_pb.Partition{
		RingSize:   1024,
		RangeStart: 32,
		RangeStop:  63,
		UnixTimeNs: 1234567890,
	}

	key1 := partitionKey(partition1)
	key2 := partitionKey(partition2)
	key3 := partitionKey(partition3)

	// Equal partitions must map to the same key.
	if key1 != key2 {
		t.Errorf("Same partitions should have same key: %s vs %s", key1, key2)
	}

	// Different partitions must map to different keys.
	if key1 == key3 {
		t.Errorf("Different partitions should have different keys: %s vs %s", key1, key3)
	}
}

func TestConcurrentOffsetAssignment(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	registry := NewPartitionOffsetRegistry(storage)
	partition := createTestPartition()

	const numGoroutines = 10
	const offsetsPerGoroutine = 100

	// Buffered to full capacity so sends never block.
	results := make(chan int64, numGoroutines*offsetsPerGoroutine)

	// Start concurrent offset assignments. The WaitGroup lets us close the
	// channel once all goroutines finish, so a failed assignment cannot
	// leave the collection loop blocked forever.
	var wg sync.WaitGroup
	for i := 0; i < numGoroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < offsetsPerGoroutine; j++ {
				offset, err := registry.AssignOffset(partition)
				if err != nil {
					t.Errorf("Failed to assign offset: %v", err)
					return
				}
				results <- offset
			}
		}()
	}
	wg.Wait()
	close(results)

	// Collect all results and check for duplicates.
	offsets := make(map[int64]bool)
	for offset := range results {
		if offsets[offset] {
			t.Errorf("Duplicate offset assigned: %d", offset)
		}
		offsets[offset] = true
	}

	// Verify we got all expected offsets.
	expectedCount := numGoroutines * offsetsPerGoroutine
	if len(offsets) != expectedCount {
		t.Errorf("Expected %d unique offsets, got %d", expectedCount, len(offsets))
	}

	// Verify every offset falls in the expected range [0, expectedCount).
	for offset := range offsets {
		if offset < 0 || offset >= int64(expectedCount) {
			t.Errorf("Offset %d is out of expected range [0, %d)", offset, expectedCount)
		}
	}
}
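Since TestConcurrentOffsetAssignment drives 10 goroutines against a single partition, the race detector adds real coverage here; assuming a standard module checkout, running go test -race . from the package directory exercises all of the tests above with data-race checking enabled.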