diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index c0850f682..c24ac0384 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -103,7 +103,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb. BrokerPartitionAssignment: assignment, stopChan: make(chan bool, 1), generation: generation, - inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024, true), + inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024), } job.wg.Add(1) go func(job *EachPartitionPublishJob) { diff --git a/weed/util/buffered_queue/buffered_queue.go b/weed/util/buffered_queue/buffered_queue.go index cc33e35c0..6f5f79eb5 100644 --- a/weed/util/buffered_queue/buffered_queue.go +++ b/weed/util/buffered_queue/buffered_queue.go @@ -23,13 +23,12 @@ type BufferedQueue[T any] struct { count int // Total number of items in the queue mutex sync.Mutex nodeCounter int - waitOnRead bool waitCond *sync.Cond isClosed bool } // NewBufferedQueue creates a new buffered queue with the specified chunk size -func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] { +func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] { // Create an empty chunk to initialize head and tail chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0} bq := &BufferedQueue[T]{ @@ -39,7 +38,6 @@ func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] { last: chunk, count: 0, mutex: sync.Mutex{}, - waitOnRead: waitOnRead, } bq.waitCond = sync.NewCond(&bq.mutex) return bq @@ -77,7 +75,7 @@ func (q *BufferedQueue[T]) Enqueue(job T) error { q.tail.items[q.tail.tailIndex] = job q.tail.tailIndex++ q.count++ - if q.waitOnRead { + if q.count == 1 { q.waitCond.Signal() } @@ -89,19 +87,12 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) { q.mutex.Lock() defer q.mutex.Unlock() - if q.waitOnRead { - for q.count <= 0 && !q.isClosed { - q.waitCond.Wait() - } - if q.isClosed { - var a T - return a, false - } - } else { - if q.count == 0 { - var a T - return a, false - } + for q.count <= 0 && !q.isClosed { + q.waitCond.Wait() + } + if q.count <= 0 && q.isClosed { + var a T + return a, false } job := q.head.items[q.head.headIndex] diff --git a/weed/util/buffered_queue/buffered_queue_test.go b/weed/util/buffered_queue/buffered_queue_test.go index 0dc96a083..97c9f25a7 100644 --- a/weed/util/buffered_queue/buffered_queue_test.go +++ b/weed/util/buffered_queue/buffered_queue_test.go @@ -1,6 +1,9 @@ package buffered_queue -import "testing" +import ( + "sync" + "testing" +) func TestJobQueue(t *testing.T) { type Job[T any] struct { @@ -9,7 +12,7 @@ func TestJobQueue(t *testing.T) { Data T } - queue := NewBufferedQueue[Job[string]](2, false) // Chunk size of 5 + queue := NewBufferedQueue[Job[string]](2) // Chunk size of 5 queue.Enqueue(Job[string]{ID: 1, Action: "task1", Data: "hello"}) queue.Enqueue(Job[string]{ID: 2, Action: "task2", Data: "world"}) @@ -77,6 +80,35 @@ func TestJobQueue(t *testing.T) { } +func TestJobQueueClose(t *testing.T) { + type Job[T any] struct { + ID int + Action string + Data T + } + + queue := NewBufferedQueue[Job[string]](2) + queue.Enqueue(Job[string]{ID: 1, Action: "task1", Data: "hello"}) + queue.Enqueue(Job[string]{ID: 2, Action: "task2", Data: "world"}) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for data, ok := queue.Dequeue(); ok; data, ok = queue.Dequeue() { + println("dequeued", data.ID) + } + }() + + for i := 0; i < 5; i++ { + queue.Enqueue(Job[string]{ID: i + 3, Action: "task", Data: "data"}) + } + + queue.CloseInput() + wg.Wait() + +} + func BenchmarkBufferedQueue(b *testing.B) { type Job[T any] struct { ID int @@ -84,7 +116,7 @@ func BenchmarkBufferedQueue(b *testing.B) { Data T } - queue := NewBufferedQueue[Job[string]](1024, true) + queue := NewBufferedQueue[Job[string]](1024) for i := 0; i < b.N; i++ { queue.Enqueue(Job[string]{ID: i, Action: "task", Data: "data"})