From 5dc1362bdcdfdda355fa265313eea0f623733bfd Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 27 Jan 2024 16:12:49 -0800 Subject: [PATCH] close the input --- weed/util/buffered_queue/buffered_queue.go | 24 ++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/weed/util/buffered_queue/buffered_queue.go b/weed/util/buffered_queue/buffered_queue.go index fedfcee51..cc33e35c0 100644 --- a/weed/util/buffered_queue/buffered_queue.go +++ b/weed/util/buffered_queue/buffered_queue.go @@ -1,6 +1,7 @@ package buffered_queue import ( + "fmt" "sync" ) @@ -24,6 +25,7 @@ type BufferedQueue[T any] struct { nodeCounter int waitOnRead bool waitCond *sync.Cond + isClosed bool } // NewBufferedQueue creates a new buffered queue with the specified chunk size @@ -44,7 +46,12 @@ func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] { } // Enqueue adds a job to the queue -func (q *BufferedQueue[T]) Enqueue(job T) { +func (q *BufferedQueue[T]) Enqueue(job T) error { + + if q.isClosed { + return fmt.Errorf("queue is closed") + } + q.mutex.Lock() defer q.mutex.Unlock() @@ -73,6 +80,8 @@ func (q *BufferedQueue[T]) Enqueue(job T) { if q.waitOnRead { q.waitCond.Signal() } + + return nil } // Dequeue removes and returns a job from the queue @@ -81,9 +90,13 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) { defer q.mutex.Unlock() if q.waitOnRead { - for q.count <= 0 { + for q.count <= 0 && !q.isClosed { q.waitCond.Wait() } + if q.isClosed { + var a T + return a, false + } } else { if q.count == 0 { var a T @@ -124,3 +137,10 @@ func (q *BufferedQueue[T]) Size() int { func (q *BufferedQueue[T]) IsEmpty() bool { return q.Size() == 0 } + +func (q *BufferedQueue[T]) CloseInput() { + q.mutex.Lock() + defer q.mutex.Unlock() + q.isClosed = true + q.waitCond.Broadcast() +}