From 083d8e9ecede84ea359962136c28ebda0ba1323b Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 24 Dec 2021 22:38:22 -0800 Subject: [PATCH] add stream writer this should improve streaming write performance, which is common in many cases, e.g., copying large files. This is additional to improved random read write operations: https://github.com/chrislusf/seaweedfs/wiki/FUSE-Mount/_compare/3e69d193805c79802f4f8f6cc63269b7a9a911f3...19084d87918f297cac15e2471c19306176e0771f --- weed/filesys/dirty_pages_stream.go | 107 ++++++++++++++++ weed/filesys/dirty_pages_temp_file.go | 6 +- weed/filesys/filehandle.go | 16 ++- weed/filesys/page_writer.go | 32 +++-- .../page_writer/chunk_interval_list.go | 29 +++-- .../page_writer/chunked_file_writer.go | 11 +- .../page_writer/chunked_file_writer_test.go | 4 +- .../page_writer/chunked_stream_writer.go | 119 ++++++++++++++++++ .../page_writer/chunked_stream_writer_test.go | 33 +++++ weed/filesys/page_writer_pattern.go | 9 +- 10 files changed, 331 insertions(+), 35 deletions(-) create mode 100644 weed/filesys/dirty_pages_stream.go create mode 100644 weed/filesys/page_writer/chunked_stream_writer.go create mode 100644 weed/filesys/page_writer/chunked_stream_writer_test.go diff --git a/weed/filesys/dirty_pages_stream.go b/weed/filesys/dirty_pages_stream.go new file mode 100644 index 000000000..2d57ee0bc --- /dev/null +++ b/weed/filesys/dirty_pages_stream.go @@ -0,0 +1,107 @@ +package filesys + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "io" + "sync" + "time" +) + +type StreamDirtyPages struct { + f *File + writeWaitGroup sync.WaitGroup + pageAddLock sync.Mutex + chunkAddLock sync.Mutex + lastErr error + collection string + replication string + chunkedStream *page_writer.ChunkedStreamWriter +} + +func newStreamDirtyPages(file *File, chunkSize int64) *StreamDirtyPages { + + dirtyPages := &StreamDirtyPages{ + f: file, + chunkedStream: page_writer.NewChunkedStreamWriter(chunkSize), + } + + dirtyPages.chunkedStream.SetSaveToStorageFunction(dirtyPages.saveChunkedFileIntevalToStorage) + + return dirtyPages +} + +func (pages *StreamDirtyPages) AddPage(offset int64, data []byte) { + + pages.pageAddLock.Lock() + defer pages.pageAddLock.Unlock() + + glog.V(4).Infof("%v stream AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data))) + if _, err := pages.chunkedStream.WriteAt(data, offset); err != nil { + pages.lastErr = err + } + + return +} + +func (pages *StreamDirtyPages) FlushData() error { + pages.saveChunkedFileToStorage() + pages.writeWaitGroup.Wait() + if pages.lastErr != nil { + return fmt.Errorf("flush data: %v", pages.lastErr) + } + pages.chunkedStream.Reset() + return nil +} + +func (pages *StreamDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { + return pages.chunkedStream.ReadDataAt(data, startOffset) +} + +func (pages *StreamDirtyPages) GetStorageOptions() (collection, replication string) { + return pages.collection, pages.replication +} + +func (pages *StreamDirtyPages) saveChunkedFileToStorage() { + + pages.chunkedStream.FlushAll() + +} + +func (pages *StreamDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { + + mtime := time.Now().UnixNano() + pages.writeWaitGroup.Add(1) + writer := func() { + defer pages.writeWaitGroup.Done() + defer cleanupFn() + + chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) + if err != nil { + glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) + pages.lastErr = err + return + } + chunk.Mtime = mtime + pages.collection, pages.replication = collection, replication + pages.chunkAddLock.Lock() + pages.f.addChunks([]*filer_pb.FileChunk{chunk}) + glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size) + pages.chunkAddLock.Unlock() + + cleanupFn() + } + + if pages.f.wfs.concurrentWriters != nil { + pages.f.wfs.concurrentWriters.Execute(writer) + } else { + go writer() + } + +} + +func (pages StreamDirtyPages) Destroy() { + pages.chunkedStream.Reset() +} diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go index a207eeb38..e0c3a91de 100644 --- a/weed/filesys/dirty_pages_temp_file.go +++ b/weed/filesys/dirty_pages_temp_file.go @@ -37,6 +37,7 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) { pages.pageAddLock.Lock() defer pages.pageAddLock.Unlock() + glog.V(4).Infof("%v tempfile AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data))) if _, err := pages.chunkedFile.WriteAt(data, offset); err != nil { pages.lastErr = err } @@ -50,6 +51,7 @@ func (pages *TempFileDirtyPages) FlushData() error { if pages.lastErr != nil { return fmt.Errorf("flush data: %v", pages.lastErr) } + pages.chunkedFile.Reset() return nil } @@ -65,7 +67,7 @@ func (pages *TempFileDirtyPages) saveChunkedFileToStorage() { pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex page_writer.LogicChunkIndex, interval *page_writer.ChunkWrittenInterval) { reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval) - pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize, interval.Size()) + pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize+interval.StartOffset, interval.Size()) }) } @@ -100,5 +102,5 @@ func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reade } func (pages TempFileDirtyPages) Destroy() { - pages.chunkedFile.Destroy() + pages.chunkedFile.Reset() } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index a551e6e10..e25437fd3 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -27,6 +27,7 @@ type FileHandle struct { contentType string handle uint64 sync.Mutex + sync.WaitGroup f *File RequestId fuse.RequestID // unique ID for request @@ -41,7 +42,7 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle { fh := &FileHandle{ f: file, // dirtyPages: newContinuousDirtyPages(file, writeOnly), - dirtyPages: newPageWriter(file, 2*1024*1024), + dirtyPages: newPageWriter(file, file.wfs.option.CacheSizeMB*1024*1024), Uid: uid, Gid: gid, } @@ -63,6 +64,9 @@ var _ = fs.HandleReleaser(&FileHandle{}) func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { + fh.Add(1) + defer fh.Done() + fh.Lock() defer fh.Unlock() @@ -170,6 +174,9 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { // Write to the file handle func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { + fh.Add(1) + defer fh.Done() + fh.Lock() defer fh.Unlock() @@ -209,8 +216,7 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err glog.V(4).Infof("Release %v fh %d open=%d", fh.f.fullpath(), fh.handle, fh.f.isOpen) - fh.Lock() - defer fh.Unlock() + fh.Wait() fh.f.wfs.handlesLock.Lock() fh.f.isOpen-- @@ -243,6 +249,9 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return nil } + fh.Add(1) + defer fh.Done() + fh.Lock() defer fh.Unlock() @@ -251,7 +260,6 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return err } - glog.V(4).Infof("Flush %v fh %d success", fh.f.fullpath(), fh.handle) return nil } diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go index 560ae052d..bdcbc0fbc 100644 --- a/weed/filesys/page_writer.go +++ b/weed/filesys/page_writer.go @@ -24,19 +24,21 @@ func newPageWriter(file *File, chunkSize int64) *PageWriter { pw := &PageWriter{ f: file, chunkSize: chunkSize, + writerPattern: NewWriterPattern(chunkSize), randomWriter: newTempFileDirtyPages(file, chunkSize), - streamWriter: newContinuousDirtyPages(file), - writerPattern: NewWriterPattern(file.Name, chunkSize), + streamWriter: newStreamDirtyPages(file, chunkSize), + //streamWriter: newContinuousDirtyPages(file), + //streamWriter: nil, } return pw } func (pw *PageWriter) AddPage(offset int64, data []byte) { - glog.V(4).Infof("AddPage %v [%d, %d) streaming:%v", pw.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) - pw.writerPattern.MonitorWriteAt(offset, len(data)) + glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) + chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) @@ -48,7 +50,7 @@ func (pw *PageWriter) AddPage(offset int64, data []byte) { func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) { if chunkIndex > 0 { - if pw.writerPattern.IsStreamingMode() { + if pw.writerPattern.IsStreamingMode() && pw.streamWriter != nil { pw.streamWriter.AddPage(offset, data) return } @@ -57,8 +59,11 @@ func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) { } func (pw *PageWriter) FlushData() error { - if err := pw.streamWriter.FlushData(); err != nil { - return err + pw.writerPattern.Reset() + if pw.streamWriter != nil { + if err := pw.streamWriter.FlushData(); err != nil { + return err + } } return pw.randomWriter.FlushData() } @@ -70,10 +75,12 @@ func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) for i := chunkIndex; len(data) > 0; i++ { readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) - m1 := pw.streamWriter.ReadDirtyDataAt(data[:readSize], offset) + if pw.streamWriter != nil { + m1 := pw.streamWriter.ReadDirtyDataAt(data[:readSize], offset) + maxStop = max(maxStop, m1) + } m2 := pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset) - - maxStop = max(maxStop, max(m1, m2)) + maxStop = max(maxStop, m2) offset += readSize data = data[readSize:] @@ -83,13 +90,16 @@ func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) } func (pw *PageWriter) GetStorageOptions() (collection, replication string) { - if pw.writerPattern.IsStreamingMode() { + if pw.writerPattern.IsStreamingMode() && pw.streamWriter != nil { return pw.streamWriter.GetStorageOptions() } return pw.randomWriter.GetStorageOptions() } func (pw *PageWriter) Destroy() { + if pw.streamWriter != nil { + pw.streamWriter.Destroy() + } pw.randomWriter.Destroy() } diff --git a/weed/filesys/page_writer/chunk_interval_list.go b/weed/filesys/page_writer/chunk_interval_list.go index 9c518192f..dca9a1740 100644 --- a/weed/filesys/page_writer/chunk_interval_list.go +++ b/weed/filesys/page_writer/chunk_interval_list.go @@ -4,14 +4,18 @@ import "math" // ChunkWrittenInterval mark one written interval within one page chunk type ChunkWrittenInterval struct { - startOffset int64 + StartOffset int64 stopOffset int64 prev *ChunkWrittenInterval next *ChunkWrittenInterval } func (interval *ChunkWrittenInterval) Size() int64 { - return interval.stopOffset - interval.startOffset + return interval.stopOffset - interval.StartOffset +} + +func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool { + return interval.stopOffset-interval.StartOffset == chunkSize } // ChunkWrittenIntervalList mark written intervals within one page chunk @@ -23,11 +27,11 @@ type ChunkWrittenIntervalList struct { func newChunkWrittenIntervalList() *ChunkWrittenIntervalList { list := &ChunkWrittenIntervalList{ head: &ChunkWrittenInterval{ - startOffset: -1, + StartOffset: -1, stopOffset: -1, }, tail: &ChunkWrittenInterval{ - startOffset: math.MaxInt64, + StartOffset: math.MaxInt64, stopOffset: math.MaxInt64, }, } @@ -38,35 +42,40 @@ func newChunkWrittenIntervalList() *ChunkWrittenIntervalList { func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) { interval := &ChunkWrittenInterval{ - startOffset: startOffset, + StartOffset: startOffset, stopOffset: stopOffset, } list.addInterval(interval) } + +func (list *ChunkWrittenIntervalList) IsComplete(chunkSize int64) bool { + return list.size() == 1 && list.head.next.isComplete(chunkSize) +} + func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) { p := list.head - for ; p.next != nil && p.next.startOffset <= interval.startOffset; p = p.next { + for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next { } q := list.tail for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev { } - if interval.startOffset <= p.stopOffset && q.startOffset <= interval.stopOffset { + if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset { // merge p and q together p.stopOffset = q.stopOffset unlinkNodesBetween(p, q.next) return } - if interval.startOffset <= p.stopOffset { + if interval.StartOffset <= p.stopOffset { // merge new interval into p p.stopOffset = interval.stopOffset unlinkNodesBetween(p, q) return } - if q.startOffset <= interval.stopOffset { + if q.StartOffset <= interval.stopOffset { // merge new interval into q - q.startOffset = interval.startOffset + q.StartOffset = interval.StartOffset unlinkNodesBetween(p, q) return } diff --git a/weed/filesys/page_writer/chunked_file_writer.go b/weed/filesys/page_writer/chunked_file_writer.go index 14c034900..b0e1c2844 100644 --- a/weed/filesys/page_writer/chunked_file_writer.go +++ b/weed/filesys/page_writer/chunked_file_writer.go @@ -64,7 +64,7 @@ func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) { actualChunkIndex, chunkUsage := cw.toActualReadOffset(off) if chunkUsage != nil { for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next { - logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.startOffset) + logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.StartOffset) logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset) if logicStart < logicStop { actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize @@ -110,11 +110,16 @@ func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, log } } } -func (cw *ChunkedFileWriter) Destroy() { + +// Reset releases used resources +func (cw *ChunkedFileWriter) Reset() { if cw.file != nil { cw.file.Close() os.Remove(cw.file.Name()) + cw.file = nil } + cw.logicToActualChunkIndex = make(map[LogicChunkIndex]ActualChunkIndex) + cw.chunkUsages = cw.chunkUsages[:0] } type FileIntervalReader struct { @@ -134,7 +139,7 @@ func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex LogicChunkInde } return &FileIntervalReader{ f: cw.file, - startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.startOffset, + startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.StartOffset, stopOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset, position: 0, } diff --git a/weed/filesys/page_writer/chunked_file_writer_test.go b/weed/filesys/page_writer/chunked_file_writer_test.go index 1c72c77d4..244ed62c3 100644 --- a/weed/filesys/page_writer/chunked_file_writer_test.go +++ b/weed/filesys/page_writer/chunked_file_writer_test.go @@ -35,9 +35,9 @@ func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) { func TestWriteChunkedFile(t *testing.T) { x := NewChunkedFileWriter(os.TempDir(), 20) - defer x.Destroy() + defer x.Reset() y := NewChunkedFileWriter(os.TempDir(), 12) - defer y.Destroy() + defer y.Reset() batchSize := 4 buf := make([]byte, batchSize) diff --git a/weed/filesys/page_writer/chunked_stream_writer.go b/weed/filesys/page_writer/chunked_stream_writer.go new file mode 100644 index 000000000..b4314e78f --- /dev/null +++ b/weed/filesys/page_writer/chunked_stream_writer.go @@ -0,0 +1,119 @@ +package page_writer + +import ( + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/mem" + "io" + "sync" + "sync/atomic" +) + +type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func()) + +// ChunkedStreamWriter assumes the write requests will come in within chunks and in streaming mode +type ChunkedStreamWriter struct { + activeChunks map[LogicChunkIndex]*MemChunk + activeChunksLock sync.Mutex + ChunkSize int64 + saveToStorageFn SaveToStorageFunc + sync.Mutex +} + +type MemChunk struct { + buf []byte + usage *ChunkWrittenIntervalList +} + +var _ = io.WriterAt(&ChunkedStreamWriter{}) + +func NewChunkedStreamWriter(chunkSize int64) *ChunkedStreamWriter { + return &ChunkedStreamWriter{ + ChunkSize: chunkSize, + activeChunks: make(map[LogicChunkIndex]*MemChunk), + } +} + +func (cw *ChunkedStreamWriter) SetSaveToStorageFunction(saveToStorageFn SaveToStorageFunc) { + cw.saveToStorageFn = saveToStorageFn +} + +func (cw *ChunkedStreamWriter) WriteAt(p []byte, off int64) (n int, err error) { + cw.Lock() + defer cw.Unlock() + + logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize) + offsetRemainder := off % cw.ChunkSize + + memChunk, found := cw.activeChunks[logicChunkIndex] + if !found { + memChunk = &MemChunk{ + buf: mem.Allocate(int(cw.ChunkSize)), + usage: newChunkWrittenIntervalList(), + } + cw.activeChunks[logicChunkIndex] = memChunk + } + n = copy(memChunk.buf[offsetRemainder:], p) + memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n)) + if memChunk.usage.IsComplete(cw.ChunkSize) { + if cw.saveToStorageFn != nil { + cw.saveOneChunk(memChunk, logicChunkIndex) + delete(cw.activeChunks, logicChunkIndex) + } + } + + return +} + +func (cw *ChunkedStreamWriter) ReadDataAt(p []byte, off int64) (maxStop int64) { + cw.Lock() + defer cw.Unlock() + + logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize) + memChunkBaseOffset := int64(logicChunkIndex) * cw.ChunkSize + memChunk, found := cw.activeChunks[logicChunkIndex] + if !found { + return + } + + for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { + logicStart := max(off, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset) + logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) + if logicStart < logicStop { + copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) + maxStop = max(maxStop, logicStop) + } + } + return +} + +func (cw *ChunkedStreamWriter) FlushAll() { + cw.Lock() + defer cw.Unlock() + for logicChunkIndex, memChunk := range cw.activeChunks { + if cw.saveToStorageFn != nil { + cw.saveOneChunk(memChunk, logicChunkIndex) + delete(cw.activeChunks, logicChunkIndex) + } + } +} + +func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { + var referenceCounter = int32(memChunk.usage.size()) + for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { + reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset]) + cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() { + atomic.AddInt32(&referenceCounter, -1) + if atomic.LoadInt32(&referenceCounter) == 0 { + mem.Free(memChunk.buf) + } + }) + } +} + +// Reset releases used resources +func (cw *ChunkedStreamWriter) Reset() { + for t, memChunk := range cw.activeChunks { + mem.Free(memChunk.buf) + delete(cw.activeChunks, t) + } +} diff --git a/weed/filesys/page_writer/chunked_stream_writer_test.go b/weed/filesys/page_writer/chunked_stream_writer_test.go new file mode 100644 index 000000000..3c55a91ad --- /dev/null +++ b/weed/filesys/page_writer/chunked_stream_writer_test.go @@ -0,0 +1,33 @@ +package page_writer + +import ( + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func TestWriteChunkedStream(t *testing.T) { + x := NewChunkedStreamWriter(20) + defer x.Reset() + y := NewChunkedFileWriter(os.TempDir(), 12) + defer y.Reset() + + batchSize := 4 + buf := make([]byte, batchSize) + for i := 0; i < 256; i++ { + for x := 0; x < batchSize; x++ { + buf[x] = byte(i) + } + x.WriteAt(buf, int64(i*batchSize)) + y.WriteAt(buf, int64((255-i)*batchSize)) + } + + a := make([]byte, 1) + b := make([]byte, 1) + for i := 0; i < 256*batchSize; i++ { + x.ReadDataAt(a, int64(i)) + y.ReadDataAt(b, int64(256*batchSize-1-i)) + assert.Equal(t, a[0], b[0], "same read") + } + +} diff --git a/weed/filesys/page_writer_pattern.go b/weed/filesys/page_writer_pattern.go index 44b69cda7..51c63d472 100644 --- a/weed/filesys/page_writer_pattern.go +++ b/weed/filesys/page_writer_pattern.go @@ -4,19 +4,17 @@ type WriterPattern struct { isStreaming bool lastWriteOffset int64 chunkSize int64 - fileName string } // For streaming write: only cache the first chunk // For random write: fall back to temp file approach // writes can only change from streaming mode to non-streaming mode -func NewWriterPattern(fileName string, chunkSize int64) *WriterPattern { +func NewWriterPattern(chunkSize int64) *WriterPattern { return &WriterPattern{ isStreaming: true, lastWriteOffset: -1, chunkSize: chunkSize, - fileName: fileName, } } @@ -39,3 +37,8 @@ func (rp *WriterPattern) IsStreamingMode() bool { func (rp *WriterPattern) IsRandomMode() bool { return !rp.isStreaming } + +func (rp *WriterPattern) Reset() { + rp.isStreaming = true + rp.lastWriteOffset = -1 +}