diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index 7e80991f4..20d529239 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -117,7 +117,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs lastReadTime = time.Unix(0, processedTsNs) } - lastReadTime, err = lock.logBuffer.LoopProcessLogData("broker", lastReadTime, func() bool { + lastReadTime, _, err = lock.logBuffer.LoopProcessLogData("broker", lastReadTime, 0, func() bool { lock.Mutex.Lock() lock.cond.Wait() lock.Mutex.Unlock() diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 3688ae047..0540400a3 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -59,7 +59,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool { + lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersCond.Wait() fs.filer.MetaAggregator.ListenersLock.Unlock() @@ -74,6 +74,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, break } } + if isDone { + return nil + } time.Sleep(1127 * time.Millisecond) } @@ -127,7 +130,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool { + lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait() fs.listenersLock.Unlock() @@ -142,6 +145,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq break } } + if isDone { + return nil + } } return readInMemoryLogErr diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index 7dcfe5f52..915b93bf4 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -27,7 +27,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { } receivedmessageCount := 0 - lb.LoopProcessLogData("test", startTime, func() bool { + lb.LoopProcessLogData("test", startTime, 0, func() bool { // stop if no more messages return false }, func(logEntry *filer_pb.LogEntry) error { diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 5a8e47070..36854cb74 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -17,10 +17,11 @@ var ( ResumeFromDiskError = fmt.Errorf("resumeFromDisk") ) -func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime time.Time, waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) { +func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime time.Time, stopTsNs int64, + waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, isDone bool, err error) { // loop through all messages var bytesBuf *bytes.Buffer - lastReadTime = startTreadTime + lastReadTime = startReadTime defer func() { if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) @@ -35,7 +36,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime) if err == ResumeFromDiskError { time.Sleep(1127 * time.Millisecond) - return lastReadTime, ResumeFromDiskError + return lastReadTime, isDone, ResumeFromDiskError } // glog.V(4).Infof("%s ReadFromBuffer by %v", readerName, lastReadTime) if bytesBuf == nil { @@ -50,7 +51,6 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadTime, len(buf)) batchSize := 0 - var startReadTime time.Time for pos := 0; pos+4 < len(buf); { @@ -68,10 +68,11 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime pos += 4 + int(size) continue } - lastReadTime = time.Unix(0, logEntry.TsNs) - if startReadTime.IsZero() { - startReadTime = lastReadTime + if stopTsNs != 0 && logEntry.TsNs > stopTsNs { + isDone = true + return } + lastReadTime = time.Unix(0, logEntry.TsNs) if err = eachLogDataFn(logEntry); err != nil { return