From 16e3f2d5289ea27f7bcb4f71d3685393529ccd07 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 20 Aug 2023 12:46:15 -0700 Subject: [PATCH] fix log buffer test --- weed/util/log_buffer/log_buffer_test.go | 48 +++++++++++++++++-------- 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index 2cc1c89b4..7aad0afab 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -1,8 +1,10 @@ package log_buffer import ( + "crypto/rand" "fmt" - "math/rand" + "io" + "sync" "testing" "time" @@ -10,33 +12,49 @@ import ( ) func TestNewLogBufferFirstBuffer(t *testing.T) { - lb := NewLogBuffer("test", time.Minute, func(startTime, stopTime time.Time, buf []byte) { - + flushInterval := time.Second + lb := NewLogBuffer("test", flushInterval, func(startTime, stopTime time.Time, buf []byte) { + fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf)) }, func() { - }) startTime := time.Now() messageSize := 1024 messageCount := 5000 + + receivedmessageCount := 0 + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool { + // stop if no more messages + return true + }, func(logEntry *filer_pb.LogEntry) error { + receivedmessageCount++ + if receivedmessageCount >= messageCount { + println("processed all messages") + return io.EOF + } + return nil + }) + + fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedmessageCount) + fmt.Printf("lastProcessedTime %v isDone %v err: %v\n", lastProcessedTime, isDone, err) + if err != nil && err != io.EOF { + t.Errorf("unexpected error %v", err) + } + }() + var buf = make([]byte, messageSize) for i := 0; i < messageCount; i++ { rand.Read(buf) lb.AddToBuffer(nil, buf, 0) } - - receivedmessageCount := 0 - lb.LoopProcessLogData("test", startTime, 0, func() bool { - // stop if no more messages - return false - }, func(logEntry *filer_pb.LogEntry) error { - receivedmessageCount++ - return nil - }) + wg.Wait() if receivedmessageCount != messageCount { - fmt.Printf("sent %d received %d\n", messageCount, receivedmessageCount) + t.Errorf("expect %d messages, but got %d", messageCount, receivedmessageCount) } - }