diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index a900275b9..39109071e 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -49,11 +49,6 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, if processedTsNs != 0 { lastReadTime = time.Unix(0, processedTsNs) - } else { - if readInMemoryLogErr == log_buffer.ResumeFromDiskError { - time.Sleep(1127 * time.Millisecond) - continue - } } glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) @@ -66,6 +61,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, }, eachLogEntryFn) if readInMemoryLogErr != nil { if readInMemoryLogErr == log_buffer.ResumeFromDiskError { + time.Sleep(1127 * time.Millisecond) continue } glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index a6d94670a..c7ddf2d9c 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -189,7 +189,11 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu defer m.RUnlock() if !m.lastFlushTime.IsZero() && m.lastFlushTime.After(lastReadTime) { - return nil, ResumeFromDiskError + if time.Now().Sub(m.lastFlushTime) < m.flushInterval * 2 { + diff := m.lastFlushTime.Sub(lastReadTime) + glog.V(4).Infof("lastFlush:%v lastRead:%v diff:%v", m.lastFlushTime, lastReadTime, diff) + return nil, ResumeFromDiskError + } } /*