diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index d8929f88f..4c3caec7e 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -16,7 +16,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/wdclient" ) -const PaginationSize = 1024 * 256 +const ( + LogFlushInterval = time.Minute + PaginationSize = 1024 * 256 +) var ( OS_UID = uint32(os.Getuid()) @@ -47,7 +50,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, GrpcDialOption: grpcDialOption, Signature: util.RandomInt32(), } - f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn) + f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection f.metaLogReplication = replication diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index 5e6d625e0..19a7e70f0 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -68,7 +68,7 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica return } - f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data) + f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs) } @@ -119,7 +119,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func( if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { chunkedFileReader.Close() if err == io.EOF { - break + continue } return lastTsNs, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err) } diff --git a/weed/filer2/meta_aggregator.go b/weed/filer2/meta_aggregator.go index 00fcf27d1..f2792bd26 100644 --- a/weed/filer2/meta_aggregator.go +++ b/weed/filer2/meta_aggregator.go @@ -25,13 +25,15 @@ type MetaAggregator struct { ListenersCond *sync.Cond } +// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk. +// The old data comes from what each LocalMetadata persisted on disk. func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator { t := &MetaAggregator{ filers: filers, grpcDialOption: grpcDialOption, } t.ListenersCond = sync.NewCond(&t.ListenersLock) - t.MetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, nil, func() { + t.MetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, nil, func() { t.ListenersCond.Broadcast() }) return t @@ -48,7 +50,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse) lastPersistTime := time.Now() changesSinceLastPersist := 0 - lastTsNs := int64(0) + lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano() MaxChangeLimit := 100 @@ -88,7 +90,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin } dir := event.Directory // println("received meta change", dir, "size", len(data)) - ma.MetaLogBuffer.AddToBuffer([]byte(dir), data) + ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs) if maybeReplicateMetadataChange != nil { maybeReplicateMetadataChange(event) } diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go index dc11061af..154bf8a44 100644 --- a/weed/messaging/broker/broker_grpc_server_publish.go +++ b/weed/messaging/broker/broker_grpc_server_publish.go @@ -85,7 +85,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis continue } - tl.logBuffer.AddToBuffer(in.Data.Key, data) + tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs) if in.Data.IsClose { // println("server received closing") diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index d066014d1..e4310b5c5 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -53,7 +53,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime return lb } -func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { +func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { m.Lock() defer func() { @@ -64,16 +64,21 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { }() // need to put the timestamp inside the lock - ts := time.Now() - tsNs := ts.UnixNano() - if m.lastTsNs >= tsNs { - // this is unlikely to happen, but just in case - tsNs = m.lastTsNs + 1 - ts = time.Unix(0, tsNs) + var ts time.Time + if eventTsNs == 0 { + ts = time.Now() + eventTsNs = ts.UnixNano() + } else { + ts = time.Unix(0, eventTsNs) } - m.lastTsNs = tsNs + if m.lastTsNs >= eventTsNs { + // this is unlikely to happen, but just in case + eventTsNs = m.lastTsNs + 1 + ts = time.Unix(0, eventTsNs) + } + m.lastTsNs = eventTsNs logEntry := &filer_pb.LogEntry{ - TsNs: tsNs, + TsNs: eventTsNs, PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, } diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index f9ccc95c2..3d77afb18 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -23,7 +23,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { var buf = make([]byte, messageSize) for i := 0; i < messageCount; i++ { rand.Read(buf) - lb.AddToBuffer(nil, buf) + lb.AddToBuffer(nil, buf, 0) } receivedmessageCount := 0