diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 2d12a5bf2..bd124908e 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -3,6 +3,7 @@ package log_buffer import ( "bytes" "sync" + "sync/atomic" "time" "google.golang.org/protobuf/proto" @@ -34,7 +35,7 @@ type LogBuffer struct { flushInterval time.Duration flushFn func(startTime, stopTime time.Time, buf []byte) notifyFn func() - isStopping bool + isStopping *atomic.Bool flushChan chan *dataToFlush lastTsNs int64 sync.RWMutex @@ -50,6 +51,7 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTi flushFn: flushFn, notifyFn: notifyFn, flushChan: make(chan *dataToFlush, 256), + isStopping: new(atomic.Bool), } go lb.loopFlush() go lb.loopInterval() @@ -119,20 +121,14 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) } func (m *LogBuffer) IsStopping() bool { - m.RLock() - defer m.RUnlock() - - return m.isStopping + return m.isStopping.Load() } func (m *LogBuffer) Shutdown() { - m.Lock() - defer m.Unlock() - - if m.isStopping { + isAlreadyStopped := m.isStopping.Swap(true) + if isAlreadyStopped { return } - m.isStopping = true toFlush := m.copyToFlush() m.flushChan <- toFlush close(m.flushChan)