mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
- Add OffsetSubscriber for managing offset-based subscriptions - Implement OffsetSubscription with seeking, lag tracking, and range operations - Add OffsetSeeker for offset validation and range utilities - Create SMQOffsetIntegration for bridging offset management with SMQ broker - Support all OffsetType variants: EXACT_OFFSET, RESET_TO_OFFSET, RESET_TO_EARLIEST, RESET_TO_LATEST - Implement subscription lifecycle: create, seek, advance, close - Add comprehensive offset validation and error handling - Support batch record publishing and subscription - Add offset metrics and partition information APIs - Include extensive test coverage for all subscription scenarios: - Basic subscription creation and record consumption - Offset seeking and range operations - Subscription lag tracking and end-of-stream detection - Empty partition handling and error conditions - Integration with offset assignment and high water marks - All 40+ tests pass, providing robust offset-based messaging foundation
537 lines
15 KiB
Go
537 lines
15 KiB
Go
package offset
|
|
|
|
import (
|
|
"testing"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
)
|
|
|
|
func TestSMQOffsetIntegration_PublishRecord(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorage()
|
|
integration := NewSMQOffsetIntegration(storage)
|
|
partition := createTestPartition()
|
|
|
|
// Publish a single record
|
|
response, err := integration.PublishRecord(
|
|
partition,
|
|
[]byte("test-key"),
|
|
&schema_pb.RecordValue{},
|
|
)
|
|
|
|
if err != nil {
|
|
t.Fatalf("Failed to publish record: %v", err)
|
|
}
|
|
|
|
if response.Error != "" {
|
|
t.Errorf("Expected no error, got: %s", response.Error)
|
|
}
|
|
|
|
if response.BaseOffset != 0 {
|
|
t.Errorf("Expected base offset 0, got %d", response.BaseOffset)
|
|
}
|
|
|
|
if response.LastOffset != 0 {
|
|
t.Errorf("Expected last offset 0, got %d", response.LastOffset)
|
|
}
|
|
}
|
|
|
|
func TestSMQOffsetIntegration_PublishRecordBatch(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorage()
|
|
integration := NewSMQOffsetIntegration(storage)
|
|
partition := createTestPartition()
|
|
|
|
// Create batch of records
|
|
records := []PublishRecordRequest{
|
|
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
|
|
}
|
|
|
|
// Publish batch
|
|
response, err := integration.PublishRecordBatch(partition, records)
|
|
if err != nil {
|
|
t.Fatalf("Failed to publish record batch: %v", err)
|
|
}
|
|
|
|
if response.Error != "" {
|
|
t.Errorf("Expected no error, got: %s", response.Error)
|
|
}
|
|
|
|
if response.BaseOffset != 0 {
|
|
t.Errorf("Expected base offset 0, got %d", response.BaseOffset)
|
|
}
|
|
|
|
if response.LastOffset != 2 {
|
|
t.Errorf("Expected last offset 2, got %d", response.LastOffset)
|
|
}
|
|
|
|
// Verify high water mark
|
|
hwm, err := integration.GetHighWaterMark(partition)
|
|
if err != nil {
|
|
t.Fatalf("Failed to get high water mark: %v", err)
|
|
}
|
|
|
|
if hwm != 3 {
|
|
t.Errorf("Expected high water mark 3, got %d", hwm)
|
|
}
|
|
}
|
|
|
|
func TestSMQOffsetIntegration_EmptyBatch(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorage()
|
|
integration := NewSMQOffsetIntegration(storage)
|
|
partition := createTestPartition()
|
|
|
|
// Publish empty batch
|
|
response, err := integration.PublishRecordBatch(partition, []PublishRecordRequest{})
|
|
if err != nil {
|
|
t.Fatalf("Failed to publish empty batch: %v", err)
|
|
}
|
|
|
|
if response.Error == "" {
|
|
t.Error("Expected error for empty batch")
|
|
}
|
|
}
|
|
|
|
func TestSMQOffsetIntegration_CreateSubscription(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorage()
|
|
integration := NewSMQOffsetIntegration(storage)
|
|
partition := createTestPartition()
|
|
|
|
// Publish some records first
|
|
records := []PublishRecordRequest{
|
|
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
|
|
}
|
|
integration.PublishRecordBatch(partition, records)
|
|
|
|
// Create subscription
|
|
sub, err := integration.CreateSubscription(
|
|
"test-sub",
|
|
partition,
|
|
schema_pb.OffsetType_RESET_TO_EARLIEST,
|
|
0,
|
|
)
|
|
|
|
if err != nil {
|
|
t.Fatalf("Failed to create subscription: %v", err)
|
|
}
|
|
|
|
if sub.ID != "test-sub" {
|
|
t.Errorf("Expected subscription ID 'test-sub', got %s", sub.ID)
|
|
}
|
|
|
|
if sub.StartOffset != 0 {
|
|
t.Errorf("Expected start offset 0, got %d", sub.StartOffset)
|
|
}
|
|
}
|
|
|
|
func TestSMQOffsetIntegration_SubscribeRecords(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorage()
|
|
integration := NewSMQOffsetIntegration(storage)
|
|
partition := createTestPartition()
|
|
|
|
// Publish some records
|
|
records := []PublishRecordRequest{
|
|
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
|
|
}
|
|
integration.PublishRecordBatch(partition, records)
|
|
|
|
// Create subscription
|
|
sub, err := integration.CreateSubscription(
|
|
"test-sub",
|
|
partition,
|
|
schema_pb.OffsetType_RESET_TO_EARLIEST,
|
|
0,
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create subscription: %v", err)
|
|
}
|
|
|
|
// Subscribe to records
|
|
responses, err := integration.SubscribeRecords(sub, 2)
|
|
if err != nil {
|
|
t.Fatalf("Failed to subscribe to records: %v", err)
|
|
}
|
|
|
|
if len(responses) != 2 {
|
|
t.Errorf("Expected 2 responses, got %d", len(responses))
|
|
}
|
|
|
|
// Check offset progression
|
|
if responses[0].Offset != 0 {
|
|
t.Errorf("Expected first record offset 0, got %d", responses[0].Offset)
|
|
}
|
|
|
|
if responses[1].Offset != 1 {
|
|
t.Errorf("Expected second record offset 1, got %d", responses[1].Offset)
|
|
}
|
|
|
|
// Check subscription advancement
|
|
if sub.CurrentOffset != 2 {
|
|
t.Errorf("Expected subscription current offset 2, got %d", sub.CurrentOffset)
|
|
}
|
|
}
|
|
|
|
func TestSMQOffsetIntegration_SubscribeEmptyPartition(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorage()
|
|
integration := NewSMQOffsetIntegration(storage)
|
|
partition := createTestPartition()
|
|
|
|
// Create subscription on empty partition
|
|
sub, err := integration.CreateSubscription(
|
|
"empty-sub",
|
|
partition,
|
|
schema_pb.OffsetType_RESET_TO_EARLIEST,
|
|
0,
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create subscription: %v", err)
|
|
}
|
|
|
|
// Subscribe to records (should return empty)
|
|
responses, err := integration.SubscribeRecords(sub, 10)
|
|
if err != nil {
|
|
t.Fatalf("Failed to subscribe to empty partition: %v", err)
|
|
}
|
|
|
|
if len(responses) != 0 {
|
|
t.Errorf("Expected 0 responses from empty partition, got %d", len(responses))
|
|
}
|
|
}
|
|
|
|
func TestSMQOffsetIntegration_SeekSubscription(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorage()
|
|
integration := NewSMQOffsetIntegration(storage)
|
|
partition := createTestPartition()
|
|
|
|
// Publish records
|
|
records := []PublishRecordRequest{
|
|
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key4"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key5"), Value: &schema_pb.RecordValue{}},
|
|
}
|
|
integration.PublishRecordBatch(partition, records)
|
|
|
|
// Create subscription
|
|
sub, err := integration.CreateSubscription(
|
|
"seek-sub",
|
|
partition,
|
|
schema_pb.OffsetType_RESET_TO_EARLIEST,
|
|
0,
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create subscription: %v", err)
|
|
}
|
|
|
|
// Seek to offset 3
|
|
err = integration.SeekSubscription("seek-sub", 3)
|
|
if err != nil {
|
|
t.Fatalf("Failed to seek subscription: %v", err)
|
|
}
|
|
|
|
if sub.CurrentOffset != 3 {
|
|
t.Errorf("Expected current offset 3 after seek, got %d", sub.CurrentOffset)
|
|
}
|
|
|
|
// Subscribe from new position
|
|
responses, err := integration.SubscribeRecords(sub, 2)
|
|
if err != nil {
|
|
t.Fatalf("Failed to subscribe after seek: %v", err)
|
|
}
|
|
|
|
if len(responses) != 2 {
|
|
t.Errorf("Expected 2 responses after seek, got %d", len(responses))
|
|
}
|
|
|
|
if responses[0].Offset != 3 {
|
|
t.Errorf("Expected first record offset 3 after seek, got %d", responses[0].Offset)
|
|
}
|
|
}
|
|
|
|
func TestSMQOffsetIntegration_GetSubscriptionLag(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorage()
|
|
integration := NewSMQOffsetIntegration(storage)
|
|
partition := createTestPartition()
|
|
|
|
// Publish records
|
|
records := []PublishRecordRequest{
|
|
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
|
|
}
|
|
integration.PublishRecordBatch(partition, records)
|
|
|
|
// Create subscription at offset 1
|
|
sub, err := integration.CreateSubscription(
|
|
"lag-sub",
|
|
partition,
|
|
schema_pb.OffsetType_EXACT_OFFSET,
|
|
1,
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create subscription: %v", err)
|
|
}
|
|
|
|
// Get lag
|
|
lag, err := integration.GetSubscriptionLag("lag-sub")
|
|
if err != nil {
|
|
t.Fatalf("Failed to get subscription lag: %v", err)
|
|
}
|
|
|
|
expectedLag := int64(3 - 1) // hwm - current
|
|
if lag != expectedLag {
|
|
t.Errorf("Expected lag %d, got %d", expectedLag, lag)
|
|
}
|
|
|
|
// Advance subscription and check lag again
|
|
integration.SubscribeRecords(sub, 1)
|
|
|
|
lag, err = integration.GetSubscriptionLag("lag-sub")
|
|
if err != nil {
|
|
t.Fatalf("Failed to get lag after advance: %v", err)
|
|
}
|
|
|
|
expectedLag = int64(3 - 2) // hwm - current
|
|
if lag != expectedLag {
|
|
t.Errorf("Expected lag %d after advance, got %d", expectedLag, lag)
|
|
}
|
|
}
|
|
|
|
func TestSMQOffsetIntegration_CloseSubscription(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorage()
|
|
integration := NewSMQOffsetIntegration(storage)
|
|
partition := createTestPartition()
|
|
|
|
// Create subscription
|
|
_, err := integration.CreateSubscription(
|
|
"close-sub",
|
|
partition,
|
|
schema_pb.OffsetType_RESET_TO_EARLIEST,
|
|
0,
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create subscription: %v", err)
|
|
}
|
|
|
|
// Close subscription
|
|
err = integration.CloseSubscription("close-sub")
|
|
if err != nil {
|
|
t.Fatalf("Failed to close subscription: %v", err)
|
|
}
|
|
|
|
// Try to get lag (should fail)
|
|
_, err = integration.GetSubscriptionLag("close-sub")
|
|
if err == nil {
|
|
t.Error("Expected error when getting lag for closed subscription")
|
|
}
|
|
}
|
|
|
|
func TestSMQOffsetIntegration_ValidateOffsetRange(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorage()
|
|
integration := NewSMQOffsetIntegration(storage)
|
|
partition := createTestPartition()
|
|
|
|
// Publish some records
|
|
records := []PublishRecordRequest{
|
|
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
|
|
}
|
|
integration.PublishRecordBatch(partition, records)
|
|
|
|
// Test valid range
|
|
err := integration.ValidateOffsetRange(partition, 0, 2)
|
|
if err != nil {
|
|
t.Errorf("Valid range should not return error: %v", err)
|
|
}
|
|
|
|
// Test invalid range (beyond hwm)
|
|
err = integration.ValidateOffsetRange(partition, 0, 5)
|
|
if err == nil {
|
|
t.Error("Expected error for range beyond high water mark")
|
|
}
|
|
}
|
|
|
|
func TestSMQOffsetIntegration_GetAvailableOffsetRange(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorage()
|
|
integration := NewSMQOffsetIntegration(storage)
|
|
partition := createTestPartition()
|
|
|
|
// Test empty partition
|
|
offsetRange, err := integration.GetAvailableOffsetRange(partition)
|
|
if err != nil {
|
|
t.Fatalf("Failed to get available range for empty partition: %v", err)
|
|
}
|
|
|
|
if offsetRange.Count != 0 {
|
|
t.Errorf("Expected empty range for empty partition, got count %d", offsetRange.Count)
|
|
}
|
|
|
|
// Publish records
|
|
records := []PublishRecordRequest{
|
|
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
|
|
}
|
|
integration.PublishRecordBatch(partition, records)
|
|
|
|
// Test with data
|
|
offsetRange, err = integration.GetAvailableOffsetRange(partition)
|
|
if err != nil {
|
|
t.Fatalf("Failed to get available range: %v", err)
|
|
}
|
|
|
|
if offsetRange.StartOffset != 0 {
|
|
t.Errorf("Expected start offset 0, got %d", offsetRange.StartOffset)
|
|
}
|
|
|
|
if offsetRange.EndOffset != 1 {
|
|
t.Errorf("Expected end offset 1, got %d", offsetRange.EndOffset)
|
|
}
|
|
|
|
if offsetRange.Count != 2 {
|
|
t.Errorf("Expected count 2, got %d", offsetRange.Count)
|
|
}
|
|
}
|
|
|
|
func TestSMQOffsetIntegration_GetOffsetMetrics(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorage()
|
|
integration := NewSMQOffsetIntegration(storage)
|
|
partition := createTestPartition()
|
|
|
|
// Initial metrics
|
|
metrics := integration.GetOffsetMetrics()
|
|
if metrics.TotalOffsets != 0 {
|
|
t.Errorf("Expected 0 total offsets initially, got %d", metrics.TotalOffsets)
|
|
}
|
|
|
|
if metrics.ActiveSubscriptions != 0 {
|
|
t.Errorf("Expected 0 active subscriptions initially, got %d", metrics.ActiveSubscriptions)
|
|
}
|
|
|
|
// Publish records
|
|
records := []PublishRecordRequest{
|
|
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
|
|
}
|
|
integration.PublishRecordBatch(partition, records)
|
|
|
|
// Create subscriptions
|
|
integration.CreateSubscription("sub1", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
|
|
integration.CreateSubscription("sub2", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
|
|
|
|
// Check updated metrics
|
|
metrics = integration.GetOffsetMetrics()
|
|
if metrics.TotalOffsets != 2 {
|
|
t.Errorf("Expected 2 total offsets, got %d", metrics.TotalOffsets)
|
|
}
|
|
|
|
if metrics.ActiveSubscriptions != 2 {
|
|
t.Errorf("Expected 2 active subscriptions, got %d", metrics.ActiveSubscriptions)
|
|
}
|
|
|
|
if metrics.PartitionCount != 1 {
|
|
t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount)
|
|
}
|
|
}
|
|
|
|
func TestSMQOffsetIntegration_GetOffsetInfo(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorage()
|
|
integration := NewSMQOffsetIntegration(storage)
|
|
partition := createTestPartition()
|
|
|
|
// Test non-existent offset
|
|
info, err := integration.GetOffsetInfo(partition, 0)
|
|
if err != nil {
|
|
t.Fatalf("Failed to get offset info: %v", err)
|
|
}
|
|
|
|
if info.Exists {
|
|
t.Error("Offset should not exist in empty partition")
|
|
}
|
|
|
|
// Publish record
|
|
integration.PublishRecord(partition, []byte("key1"), &schema_pb.RecordValue{})
|
|
|
|
// Test existing offset
|
|
info, err = integration.GetOffsetInfo(partition, 0)
|
|
if err != nil {
|
|
t.Fatalf("Failed to get offset info for existing offset: %v", err)
|
|
}
|
|
|
|
if !info.Exists {
|
|
t.Error("Offset should exist after publishing")
|
|
}
|
|
|
|
if info.Offset != 0 {
|
|
t.Errorf("Expected offset 0, got %d", info.Offset)
|
|
}
|
|
}
|
|
|
|
func TestSMQOffsetIntegration_GetPartitionOffsetInfo(t *testing.T) {
|
|
storage := NewInMemoryOffsetStorage()
|
|
integration := NewSMQOffsetIntegration(storage)
|
|
partition := createTestPartition()
|
|
|
|
// Test empty partition
|
|
info, err := integration.GetPartitionOffsetInfo(partition)
|
|
if err != nil {
|
|
t.Fatalf("Failed to get partition offset info: %v", err)
|
|
}
|
|
|
|
if info.EarliestOffset != 0 {
|
|
t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
|
|
}
|
|
|
|
if info.LatestOffset != -1 {
|
|
t.Errorf("Expected latest offset -1 for empty partition, got %d", info.LatestOffset)
|
|
}
|
|
|
|
if info.HighWaterMark != 0 {
|
|
t.Errorf("Expected high water mark 0, got %d", info.HighWaterMark)
|
|
}
|
|
|
|
if info.RecordCount != 0 {
|
|
t.Errorf("Expected record count 0, got %d", info.RecordCount)
|
|
}
|
|
|
|
// Publish records
|
|
records := []PublishRecordRequest{
|
|
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
|
|
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
|
|
}
|
|
integration.PublishRecordBatch(partition, records)
|
|
|
|
// Create subscription
|
|
integration.CreateSubscription("test-sub", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
|
|
|
|
// Test with data
|
|
info, err = integration.GetPartitionOffsetInfo(partition)
|
|
if err != nil {
|
|
t.Fatalf("Failed to get partition offset info with data: %v", err)
|
|
}
|
|
|
|
if info.EarliestOffset != 0 {
|
|
t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
|
|
}
|
|
|
|
if info.LatestOffset != 2 {
|
|
t.Errorf("Expected latest offset 2, got %d", info.LatestOffset)
|
|
}
|
|
|
|
if info.HighWaterMark != 3 {
|
|
t.Errorf("Expected high water mark 3, got %d", info.HighWaterMark)
|
|
}
|
|
|
|
if info.RecordCount != 3 {
|
|
t.Errorf("Expected record count 3, got %d", info.RecordCount)
|
|
}
|
|
|
|
if info.ActiveSubscriptions != 1 {
|
|
t.Errorf("Expected 1 active subscription, got %d", info.ActiveSubscriptions)
|
|
}
|
|
}
|