1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/offset/integration_test.go
chrislu 82fb366968 Phase 3: Implement offset-based subscription and SMQ integration
- Add OffsetSubscriber for managing offset-based subscriptions
- Implement OffsetSubscription with seeking, lag tracking, and range operations
- Add OffsetSeeker for offset validation and range utilities
- Create SMQOffsetIntegration for bridging offset management with SMQ broker
- Support all OffsetType variants: EXACT_OFFSET, RESET_TO_OFFSET, RESET_TO_EARLIEST, RESET_TO_LATEST
- Implement subscription lifecycle: create, seek, advance, close
- Add comprehensive offset validation and error handling
- Support batch record publishing and subscription
- Add offset metrics and partition information APIs
- Include extensive test coverage for all subscription scenarios:
  - Basic subscription creation and record consumption
  - Offset seeking and range operations
  - Subscription lag tracking and end-of-stream detection
  - Empty partition handling and error conditions
  - Integration with offset assignment and high water marks
- All 40+ tests pass, providing robust offset-based messaging foundation
2025-09-12 00:30:19 -07:00

537 lines
15 KiB
Go

package offset
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
func TestSMQOffsetIntegration_PublishRecord(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish a single record
response, err := integration.PublishRecord(
partition,
[]byte("test-key"),
&schema_pb.RecordValue{},
)
if err != nil {
t.Fatalf("Failed to publish record: %v", err)
}
if response.Error != "" {
t.Errorf("Expected no error, got: %s", response.Error)
}
if response.BaseOffset != 0 {
t.Errorf("Expected base offset 0, got %d", response.BaseOffset)
}
if response.LastOffset != 0 {
t.Errorf("Expected last offset 0, got %d", response.LastOffset)
}
}
func TestSMQOffsetIntegration_PublishRecordBatch(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Create batch of records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
}
// Publish batch
response, err := integration.PublishRecordBatch(partition, records)
if err != nil {
t.Fatalf("Failed to publish record batch: %v", err)
}
if response.Error != "" {
t.Errorf("Expected no error, got: %s", response.Error)
}
if response.BaseOffset != 0 {
t.Errorf("Expected base offset 0, got %d", response.BaseOffset)
}
if response.LastOffset != 2 {
t.Errorf("Expected last offset 2, got %d", response.LastOffset)
}
// Verify high water mark
hwm, err := integration.GetHighWaterMark(partition)
if err != nil {
t.Fatalf("Failed to get high water mark: %v", err)
}
if hwm != 3 {
t.Errorf("Expected high water mark 3, got %d", hwm)
}
}
func TestSMQOffsetIntegration_EmptyBatch(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish empty batch
response, err := integration.PublishRecordBatch(partition, []PublishRecordRequest{})
if err != nil {
t.Fatalf("Failed to publish empty batch: %v", err)
}
if response.Error == "" {
t.Error("Expected error for empty batch")
}
}
// TestSMQOffsetIntegration_CreateSubscription publishes two records, creates a
// RESET_TO_EARLIEST subscription named "test-sub" and checks the returned
// subscription's ID and StartOffset.
func TestSMQOffsetIntegration_CreateSubscription(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	integration := NewSMQOffsetIntegration(storage)
	partition := createTestPartition()

	// Publish some records first. A failed publish is fatal: the assertions
	// below only make sense when the setup actually succeeded. Failures may
	// surface either as an error or in the response's Error field.
	records := []PublishRecordRequest{
		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
	}
	published, err := integration.PublishRecordBatch(partition, records)
	if err != nil {
		t.Fatalf("Failed to publish record batch: %v", err)
	}
	if published.Error != "" {
		t.Fatalf("Failed to publish record batch: %s", published.Error)
	}

	// Create subscription
	sub, err := integration.CreateSubscription(
		"test-sub",
		partition,
		schema_pb.OffsetType_RESET_TO_EARLIEST,
		0,
	)
	if err != nil {
		t.Fatalf("Failed to create subscription: %v", err)
	}

	if sub.ID != "test-sub" {
		t.Errorf("Expected subscription ID 'test-sub', got %s", sub.ID)
	}
	if sub.StartOffset != 0 {
		t.Errorf("Expected start offset 0, got %d", sub.StartOffset)
	}
}
// TestSMQOffsetIntegration_SubscribeRecords publishes three records, reads two
// of them through a RESET_TO_EARLIEST subscription, and checks the offsets of
// the returned records and the subscription's CurrentOffset afterwards.
func TestSMQOffsetIntegration_SubscribeRecords(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	integration := NewSMQOffsetIntegration(storage)
	partition := createTestPartition()

	// Publish some records. Setup failures abort the test immediately; they
	// may surface either as an error or in the response's Error field.
	records := []PublishRecordRequest{
		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
	}
	published, err := integration.PublishRecordBatch(partition, records)
	if err != nil {
		t.Fatalf("Failed to publish record batch: %v", err)
	}
	if published.Error != "" {
		t.Fatalf("Failed to publish record batch: %s", published.Error)
	}

	// Create subscription
	sub, err := integration.CreateSubscription(
		"test-sub",
		partition,
		schema_pb.OffsetType_RESET_TO_EARLIEST,
		0,
	)
	if err != nil {
		t.Fatalf("Failed to create subscription: %v", err)
	}

	// Subscribe to records
	responses, err := integration.SubscribeRecords(sub, 2)
	if err != nil {
		t.Fatalf("Failed to subscribe to records: %v", err)
	}

	// Fatal rather than Errorf: the element accesses below would panic with
	// an index-out-of-range if fewer than two responses came back.
	if len(responses) != 2 {
		t.Fatalf("Expected 2 responses, got %d", len(responses))
	}

	// Check offset progression
	if responses[0].Offset != 0 {
		t.Errorf("Expected first record offset 0, got %d", responses[0].Offset)
	}
	if responses[1].Offset != 1 {
		t.Errorf("Expected second record offset 1, got %d", responses[1].Offset)
	}

	// Check subscription advancement
	if sub.CurrentOffset != 2 {
		t.Errorf("Expected subscription current offset 2, got %d", sub.CurrentOffset)
	}
}
func TestSMQOffsetIntegration_SubscribeEmptyPartition(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Create subscription on empty partition
sub, err := integration.CreateSubscription(
"empty-sub",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Subscribe to records (should return empty)
responses, err := integration.SubscribeRecords(sub, 10)
if err != nil {
t.Fatalf("Failed to subscribe to empty partition: %v", err)
}
if len(responses) != 0 {
t.Errorf("Expected 0 responses from empty partition, got %d", len(responses))
}
}
// TestSMQOffsetIntegration_SeekSubscription publishes five records, seeks a
// RESET_TO_EARLIEST subscription to offset 3, and checks that CurrentOffset
// moved and that the next read starts at offset 3.
func TestSMQOffsetIntegration_SeekSubscription(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	integration := NewSMQOffsetIntegration(storage)
	partition := createTestPartition()

	// Publish records. Setup failures abort the test immediately; they may
	// surface either as an error or in the response's Error field.
	records := []PublishRecordRequest{
		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key4"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key5"), Value: &schema_pb.RecordValue{}},
	}
	published, err := integration.PublishRecordBatch(partition, records)
	if err != nil {
		t.Fatalf("Failed to publish record batch: %v", err)
	}
	if published.Error != "" {
		t.Fatalf("Failed to publish record batch: %s", published.Error)
	}

	// Create subscription
	sub, err := integration.CreateSubscription(
		"seek-sub",
		partition,
		schema_pb.OffsetType_RESET_TO_EARLIEST,
		0,
	)
	if err != nil {
		t.Fatalf("Failed to create subscription: %v", err)
	}

	// Seek to offset 3
	err = integration.SeekSubscription("seek-sub", 3)
	if err != nil {
		t.Fatalf("Failed to seek subscription: %v", err)
	}
	if sub.CurrentOffset != 3 {
		t.Errorf("Expected current offset 3 after seek, got %d", sub.CurrentOffset)
	}

	// Subscribe from new position
	responses, err := integration.SubscribeRecords(sub, 2)
	if err != nil {
		t.Fatalf("Failed to subscribe after seek: %v", err)
	}

	// Fatal rather than Errorf: responses[0] below would panic on an empty
	// result.
	if len(responses) != 2 {
		t.Fatalf("Expected 2 responses after seek, got %d", len(responses))
	}
	if responses[0].Offset != 3 {
		t.Errorf("Expected first record offset 3 after seek, got %d", responses[0].Offset)
	}
}
// TestSMQOffsetIntegration_GetSubscriptionLag publishes three records, creates
// an EXACT_OFFSET subscription at offset 1, and checks the reported lag before
// (expected 2) and after (expected 1) reading one record.
func TestSMQOffsetIntegration_GetSubscriptionLag(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	integration := NewSMQOffsetIntegration(storage)
	partition := createTestPartition()

	// Publish records. Setup failures abort the test immediately; they may
	// surface either as an error or in the response's Error field.
	records := []PublishRecordRequest{
		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
	}
	published, err := integration.PublishRecordBatch(partition, records)
	if err != nil {
		t.Fatalf("Failed to publish record batch: %v", err)
	}
	if published.Error != "" {
		t.Fatalf("Failed to publish record batch: %s", published.Error)
	}

	// Create subscription at offset 1
	sub, err := integration.CreateSubscription(
		"lag-sub",
		partition,
		schema_pb.OffsetType_EXACT_OFFSET,
		1,
	)
	if err != nil {
		t.Fatalf("Failed to create subscription: %v", err)
	}

	// Get lag
	lag, err := integration.GetSubscriptionLag("lag-sub")
	if err != nil {
		t.Fatalf("Failed to get subscription lag: %v", err)
	}
	expectedLag := int64(3 - 1) // hwm - current
	if lag != expectedLag {
		t.Errorf("Expected lag %d, got %d", expectedLag, lag)
	}

	// Advance subscription and check lag again. A failed read is reported
	// directly instead of showing up later as an unexplained lag mismatch.
	if _, err := integration.SubscribeRecords(sub, 1); err != nil {
		t.Fatalf("Failed to advance subscription: %v", err)
	}
	lag, err = integration.GetSubscriptionLag("lag-sub")
	if err != nil {
		t.Fatalf("Failed to get lag after advance: %v", err)
	}
	expectedLag = int64(3 - 2) // hwm - current
	if lag != expectedLag {
		t.Errorf("Expected lag %d after advance, got %d", expectedLag, lag)
	}
}
func TestSMQOffsetIntegration_CloseSubscription(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Create subscription
_, err := integration.CreateSubscription(
"close-sub",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Close subscription
err = integration.CloseSubscription("close-sub")
if err != nil {
t.Fatalf("Failed to close subscription: %v", err)
}
// Try to get lag (should fail)
_, err = integration.GetSubscriptionLag("close-sub")
if err == nil {
t.Error("Expected error when getting lag for closed subscription")
}
}
// TestSMQOffsetIntegration_ValidateOffsetRange publishes three records and
// checks that the range 0..2 validates while the range 0..5 is rejected.
func TestSMQOffsetIntegration_ValidateOffsetRange(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	integration := NewSMQOffsetIntegration(storage)
	partition := createTestPartition()

	// Publish some records. Setup failures abort the test immediately; they
	// may surface either as an error or in the response's Error field.
	records := []PublishRecordRequest{
		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
	}
	published, err := integration.PublishRecordBatch(partition, records)
	if err != nil {
		t.Fatalf("Failed to publish record batch: %v", err)
	}
	if published.Error != "" {
		t.Fatalf("Failed to publish record batch: %s", published.Error)
	}

	// Test valid range
	err = integration.ValidateOffsetRange(partition, 0, 2)
	if err != nil {
		t.Errorf("Valid range should not return error: %v", err)
	}

	// Test invalid range (beyond hwm)
	err = integration.ValidateOffsetRange(partition, 0, 5)
	if err == nil {
		t.Error("Expected error for range beyond high water mark")
	}
}
// TestSMQOffsetIntegration_GetAvailableOffsetRange checks the available offset
// range of a partition before publishing (Count 0) and after publishing two
// records (StartOffset 0, EndOffset 1, Count 2).
func TestSMQOffsetIntegration_GetAvailableOffsetRange(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	integration := NewSMQOffsetIntegration(storage)
	partition := createTestPartition()

	// Test empty partition
	offsetRange, err := integration.GetAvailableOffsetRange(partition)
	if err != nil {
		t.Fatalf("Failed to get available range for empty partition: %v", err)
	}
	if offsetRange.Count != 0 {
		t.Errorf("Expected empty range for empty partition, got count %d", offsetRange.Count)
	}

	// Publish records. Setup failures abort the test immediately; they may
	// surface either as an error or in the response's Error field.
	records := []PublishRecordRequest{
		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
	}
	published, err := integration.PublishRecordBatch(partition, records)
	if err != nil {
		t.Fatalf("Failed to publish record batch: %v", err)
	}
	if published.Error != "" {
		t.Fatalf("Failed to publish record batch: %s", published.Error)
	}

	// Test with data
	offsetRange, err = integration.GetAvailableOffsetRange(partition)
	if err != nil {
		t.Fatalf("Failed to get available range: %v", err)
	}
	if offsetRange.StartOffset != 0 {
		t.Errorf("Expected start offset 0, got %d", offsetRange.StartOffset)
	}
	if offsetRange.EndOffset != 1 {
		t.Errorf("Expected end offset 1, got %d", offsetRange.EndOffset)
	}
	if offsetRange.Count != 2 {
		t.Errorf("Expected count 2, got %d", offsetRange.Count)
	}
}
// TestSMQOffsetIntegration_GetOffsetMetrics checks the integration's metrics
// on a fresh instance (0 offsets, 0 subscriptions) and again after publishing
// two records and creating two subscriptions on one partition.
func TestSMQOffsetIntegration_GetOffsetMetrics(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	integration := NewSMQOffsetIntegration(storage)
	partition := createTestPartition()

	// Initial metrics
	metrics := integration.GetOffsetMetrics()
	if metrics.TotalOffsets != 0 {
		t.Errorf("Expected 0 total offsets initially, got %d", metrics.TotalOffsets)
	}
	if metrics.ActiveSubscriptions != 0 {
		t.Errorf("Expected 0 active subscriptions initially, got %d", metrics.ActiveSubscriptions)
	}

	// Publish records. Setup failures abort the test immediately; they may
	// surface either as an error or in the response's Error field.
	records := []PublishRecordRequest{
		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
	}
	published, err := integration.PublishRecordBatch(partition, records)
	if err != nil {
		t.Fatalf("Failed to publish record batch: %v", err)
	}
	if published.Error != "" {
		t.Fatalf("Failed to publish record batch: %s", published.Error)
	}

	// Create subscriptions; a failed creation would make the subscription
	// count below meaningless, so it is fatal.
	for _, id := range []string{"sub1", "sub2"} {
		if _, err := integration.CreateSubscription(id, partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0); err != nil {
			t.Fatalf("Failed to create subscription %s: %v", id, err)
		}
	}

	// Check updated metrics
	metrics = integration.GetOffsetMetrics()
	if metrics.TotalOffsets != 2 {
		t.Errorf("Expected 2 total offsets, got %d", metrics.TotalOffsets)
	}
	if metrics.ActiveSubscriptions != 2 {
		t.Errorf("Expected 2 active subscriptions, got %d", metrics.ActiveSubscriptions)
	}
	if metrics.PartitionCount != 1 {
		t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount)
	}
}
// TestSMQOffsetIntegration_GetOffsetInfo checks that offset 0 is reported as
// not existing before anything is published and as existing (with Offset 0)
// after a single record has been published.
func TestSMQOffsetIntegration_GetOffsetInfo(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	integration := NewSMQOffsetIntegration(storage)
	partition := createTestPartition()

	// Test non-existent offset
	info, err := integration.GetOffsetInfo(partition, 0)
	if err != nil {
		t.Fatalf("Failed to get offset info: %v", err)
	}
	if info.Exists {
		t.Error("Offset should not exist in empty partition")
	}

	// Publish record. Setup failures abort the test immediately; they may
	// surface either as an error or in the response's Error field.
	published, err := integration.PublishRecord(partition, []byte("key1"), &schema_pb.RecordValue{})
	if err != nil {
		t.Fatalf("Failed to publish record: %v", err)
	}
	if published.Error != "" {
		t.Fatalf("Failed to publish record: %s", published.Error)
	}

	// Test existing offset
	info, err = integration.GetOffsetInfo(partition, 0)
	if err != nil {
		t.Fatalf("Failed to get offset info for existing offset: %v", err)
	}
	if !info.Exists {
		t.Error("Offset should exist after publishing")
	}
	if info.Offset != 0 {
		t.Errorf("Expected offset 0, got %d", info.Offset)
	}
}
// TestSMQOffsetIntegration_GetPartitionOffsetInfo checks the partition-level
// offset summary on an empty partition (earliest 0, latest -1, high water mark
// 0, no records) and again after publishing three records and creating one
// subscription (earliest 0, latest 2, high water mark 3, 3 records, 1 active
// subscription).
func TestSMQOffsetIntegration_GetPartitionOffsetInfo(t *testing.T) {
	storage := NewInMemoryOffsetStorage()
	integration := NewSMQOffsetIntegration(storage)
	partition := createTestPartition()

	// Test empty partition
	info, err := integration.GetPartitionOffsetInfo(partition)
	if err != nil {
		t.Fatalf("Failed to get partition offset info: %v", err)
	}
	if info.EarliestOffset != 0 {
		t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
	}
	if info.LatestOffset != -1 {
		t.Errorf("Expected latest offset -1 for empty partition, got %d", info.LatestOffset)
	}
	if info.HighWaterMark != 0 {
		t.Errorf("Expected high water mark 0, got %d", info.HighWaterMark)
	}
	if info.RecordCount != 0 {
		t.Errorf("Expected record count 0, got %d", info.RecordCount)
	}

	// Publish records. Setup failures abort the test immediately; they may
	// surface either as an error or in the response's Error field.
	records := []PublishRecordRequest{
		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
		{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
	}
	published, err := integration.PublishRecordBatch(partition, records)
	if err != nil {
		t.Fatalf("Failed to publish record batch: %v", err)
	}
	if published.Error != "" {
		t.Fatalf("Failed to publish record batch: %s", published.Error)
	}

	// Create subscription; the active-subscription count below depends on it.
	if _, err := integration.CreateSubscription("test-sub", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0); err != nil {
		t.Fatalf("Failed to create subscription: %v", err)
	}

	// Test with data
	info, err = integration.GetPartitionOffsetInfo(partition)
	if err != nil {
		t.Fatalf("Failed to get partition offset info with data: %v", err)
	}
	if info.EarliestOffset != 0 {
		t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
	}
	if info.LatestOffset != 2 {
		t.Errorf("Expected latest offset 2, got %d", info.LatestOffset)
	}
	if info.HighWaterMark != 3 {
		t.Errorf("Expected high water mark 3, got %d", info.HighWaterMark)
	}
	if info.RecordCount != 3 {
		t.Errorf("Expected record count 3, got %d", info.RecordCount)
	}
	if info.ActiveSubscriptions != 1 {
		t.Errorf("Expected 1 active subscription, got %d", info.ActiveSubscriptions)
	}
}