From 9274557552e3f3f4b6f7071c47ba8d7aa5c6f64a Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 23:23:49 -0800 Subject: [PATCH] keep dirty pages based on temp file --- weed/filesys/dirty_pages_temp_file.go | 112 ++++++++++++ weed/filesys/page_writer.go | 1 + .../page_writer/chunked_file_writer.go | 159 ++++++++++++++++++ .../page_writer/chunked_file_writer_test.go | 60 +++++++ 4 files changed, 332 insertions(+) create mode 100644 weed/filesys/dirty_pages_temp_file.go create mode 100644 weed/filesys/page_writer/chunked_file_writer.go create mode 100644 weed/filesys/page_writer/chunked_file_writer_test.go diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go new file mode 100644 index 000000000..12f5b5159 --- /dev/null +++ b/weed/filesys/dirty_pages_temp_file.go @@ -0,0 +1,112 @@ +package filesys + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "io" + "os" + "sync" + "time" +) + +type TempFileDirtyPages struct { + f *File + writeWaitGroup sync.WaitGroup + pageAddLock sync.Mutex + chunkAddLock sync.Mutex + lastErr error + collection string + replication string + chunkedFile *page_writer.ChunkedFileWriter +} + +func newTempFileDirtyPages(file *File, chunkSize int64) *TempFileDirtyPages { + + tempFile := &TempFileDirtyPages{ + f: file, + chunkedFile: page_writer.NewChunkedFileWriter(file.wfs.option.getTempFilePageDir(), chunkSize), + } + + return tempFile +} + +func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) { + + pages.pageAddLock.Lock() + defer pages.pageAddLock.Unlock() + + glog.V(4).Infof("%v tempfile AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data))) + if _, err := pages.chunkedFile.WriteAt(data, offset); err != nil { + pages.lastErr = err + } + + return +} + +func (pages *TempFileDirtyPages) FlushData() error { + pages.saveChunkedFileToStorage() + pages.writeWaitGroup.Wait() + if pages.lastErr != nil { + return fmt.Errorf("flush data: %v", pages.lastErr) + } + pages.chunkedFile.Reset() + return nil +} + +func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { + return pages.chunkedFile.ReadDataAt(data, startOffset) +} + +func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) { + return pages.collection, pages.replication +} + +func (pages *TempFileDirtyPages) saveChunkedFileToStorage() { + + pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex page_writer.LogicChunkIndex, interval *page_writer.ChunkWrittenInterval) { + reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval) + pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize+interval.StartOffset, interval.Size()) + }) + +} + +func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64) { + + mtime := time.Now().UnixNano() + pages.writeWaitGroup.Add(1) + writer := func() { + defer pages.writeWaitGroup.Done() + + chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) + if err != nil { + glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) + pages.lastErr = err + return + } + chunk.Mtime = mtime + pages.collection, pages.replication = collection, replication + pages.chunkAddLock.Lock() + defer pages.chunkAddLock.Unlock() + pages.f.addChunks([]*filer_pb.FileChunk{chunk}) + glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size) + } + + if pages.f.wfs.concurrentWriters != nil { + pages.f.wfs.concurrentWriters.Execute(writer) + } else { + go writer() + } + +} + +func (pages TempFileDirtyPages) Destroy() { + pages.chunkedFile.Reset() +} + +func (pages *TempFileDirtyPages) LockForRead(startOffset, stopOffset int64) { +} + +func (pages *TempFileDirtyPages) UnlockForRead(startOffset, stopOffset int64) { +} diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go index c6d08348d..b8e884f58 100644 --- a/weed/filesys/page_writer.go +++ b/weed/filesys/page_writer.go @@ -25,6 +25,7 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter { chunkSize: chunkSize, writerPattern: NewWriterPattern(chunkSize), randomWriter: newMemoryChunkPages(fh, chunkSize), + // randomWriter: newTempFileDirtyPages(fh.f, chunkSize), } return pw } diff --git a/weed/filesys/page_writer/chunked_file_writer.go b/weed/filesys/page_writer/chunked_file_writer.go new file mode 100644 index 000000000..4dad46c69 --- /dev/null +++ b/weed/filesys/page_writer/chunked_file_writer.go @@ -0,0 +1,159 @@ +package page_writer + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "io" + "os" + "sync" +) + +type ActualChunkIndex int + +// ChunkedFileWriter assumes the write requests will come in within chunks +type ChunkedFileWriter struct { + dir string + file *os.File + logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex + chunkUsages []*ChunkWrittenIntervalList + ChunkSize int64 + sync.Mutex +} + +var _ = io.WriterAt(&ChunkedFileWriter{}) + +func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter { + return &ChunkedFileWriter{ + dir: dir, + file: nil, + logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex), + ChunkSize: chunkSize, + } +} + +func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) { + cw.Lock() + defer cw.Unlock() + + if cw.file == nil { + cw.file, err = os.CreateTemp(cw.dir, "") + if err != nil { + glog.Errorf("create temp file: %v", err) + return + } + } + + actualOffset, chunkUsage := cw.toActualWriteOffset(off) + n, err = cw.file.WriteAt(p, actualOffset) + if err == nil { + startOffset := off % cw.ChunkSize + chunkUsage.MarkWritten(startOffset, startOffset+int64(n)) + } + return +} + +func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) { + cw.Lock() + defer cw.Unlock() + + if cw.file == nil { + return + } + + logicChunkIndex := off / cw.ChunkSize + actualChunkIndex, chunkUsage := cw.toActualReadOffset(off) + if chunkUsage != nil { + for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next { + logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.StartOffset) + logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset) + if logicStart < logicStop { + actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize + _, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart) + if err != nil { + glog.Errorf("reading temp file: %v", err) + break + } + maxStop = max(maxStop, logicStop) + } + } + } + return +} + +func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *ChunkWrittenIntervalList) { + logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize) + offsetRemainder := logicOffset % cw.ChunkSize + existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] + if found { + return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex] + } + cw.logicToActualChunkIndex[logicChunkIndex] = ActualChunkIndex(len(cw.chunkUsages)) + chunkUsage = newChunkWrittenIntervalList() + cw.chunkUsages = append(cw.chunkUsages, chunkUsage) + return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage +} + +func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex ActualChunkIndex, chunkUsage *ChunkWrittenIntervalList) { + logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize) + existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] + if found { + return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex] + } + return 0, nil +} + +func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval)) { + for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex { + chunkUsage := cw.chunkUsages[actualChunkIndex] + for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next { + process(cw.file, logicChunkIndex, t) + } + } +} + +// Reset releases used resources +func (cw *ChunkedFileWriter) Reset() { + if cw.file != nil { + cw.file.Close() + os.Remove(cw.file.Name()) + cw.file = nil + } + cw.logicToActualChunkIndex = make(map[LogicChunkIndex]ActualChunkIndex) + cw.chunkUsages = cw.chunkUsages[:0] +} + +type FileIntervalReader struct { + f *os.File + startOffset int64 + stopOffset int64 + position int64 +} + +var _ = io.Reader(&FileIntervalReader{}) + +func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval) *FileIntervalReader { + actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] + if !found { + // this should never happen + return nil + } + return &FileIntervalReader{ + f: cw.file, + startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.StartOffset, + stopOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset, + position: 0, + } +} + +func (fr *FileIntervalReader) Read(p []byte) (n int, err error) { + readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position)) + n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position) + if err == nil || err == io.EOF { + fr.position += int64(n) + if fr.stopOffset-fr.startOffset-fr.position == 0 { + // return a tiny bit faster + err = io.EOF + return + } + } + return +} diff --git a/weed/filesys/page_writer/chunked_file_writer_test.go b/weed/filesys/page_writer/chunked_file_writer_test.go new file mode 100644 index 000000000..244ed62c3 --- /dev/null +++ b/weed/filesys/page_writer/chunked_file_writer_test.go @@ -0,0 +1,60 @@ +package page_writer + +import ( + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func TestChunkedFileWriter_toActualOffset(t *testing.T) { + cw := NewChunkedFileWriter("", 16) + + writeToFile(cw, 50, 60) + writeToFile(cw, 60, 64) + + writeToFile(cw, 32, 40) + writeToFile(cw, 42, 48) + + writeToFile(cw, 48, 50) + + assert.Equal(t, 1, cw.chunkUsages[0].size(), "fully covered") + assert.Equal(t, 2, cw.chunkUsages[1].size(), "2 intervals") + +} + +func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) { + + _, chunkUsage := cw.toActualWriteOffset(startOffset) + + // skip doing actual writing + + innerOffset := startOffset % cw.ChunkSize + chunkUsage.MarkWritten(innerOffset, innerOffset+stopOffset-startOffset) + +} + +func TestWriteChunkedFile(t *testing.T) { + x := NewChunkedFileWriter(os.TempDir(), 20) + defer x.Reset() + y := NewChunkedFileWriter(os.TempDir(), 12) + defer y.Reset() + + batchSize := 4 + buf := make([]byte, batchSize) + for i := 0; i < 256; i++ { + for x := 0; x < batchSize; x++ { + buf[x] = byte(i) + } + x.WriteAt(buf, int64(i*batchSize)) + y.WriteAt(buf, int64((255-i)*batchSize)) + } + + a := make([]byte, 1) + b := make([]byte, 1) + for i := 0; i < 256*batchSize; i++ { + x.ReadDataAt(a, int64(i)) + y.ReadDataAt(b, int64(256*batchSize-1-i)) + assert.Equal(t, a[0], b[0], "same read") + } + +}