diff --git a/weed/filesys/dirty_pages_continuous.go b/weed/filesys/dirty_pages_continuous.go deleted file mode 100644 index 2692c2950..000000000 --- a/weed/filesys/dirty_pages_continuous.go +++ /dev/null @@ -1,138 +0,0 @@ -package filesys - -import ( - "bytes" - "fmt" - "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" - "io" - "sync" - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -type ContinuousDirtyPages struct { - intervals *page_writer.ContinuousIntervals - f *File - writeWaitGroup sync.WaitGroup - chunkAddLock sync.Mutex - lastErr error - collection string - replication string -} - -func newContinuousDirtyPages(file *File) *ContinuousDirtyPages { - dirtyPages := &ContinuousDirtyPages{ - intervals: &page_writer.ContinuousIntervals{}, - f: file, - } - return dirtyPages -} - -func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) { - - glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data))) - - if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) { - // this is more than what buffer can hold. - pages.flushAndSave(offset, data) - } - - pages.intervals.AddInterval(data, offset) - - if pages.intervals.TotalSize() >= pages.f.wfs.option.ChunkSizeLimit { - pages.saveExistingLargestPageToStorage() - } - - return -} - -func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) { - - // flush existing - pages.saveExistingPagesToStorage() - - // flush the new page - pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))) - - return -} - -func (pages *ContinuousDirtyPages) FlushData() error { - - pages.saveExistingPagesToStorage() - pages.writeWaitGroup.Wait() - if pages.lastErr != nil { - return fmt.Errorf("flush data: %v", pages.lastErr) - } - return nil -} - -func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() { - for pages.saveExistingLargestPageToStorage() { - } -} - -func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedData bool) { - - maxList := pages.intervals.RemoveLargestIntervalLinkedList() - if maxList == nil { - return false - } - - entry := pages.f.getEntry() - if entry == nil { - return false - } - - fileSize := int64(entry.Attributes.FileSize) - - chunkSize := min(maxList.Size(), fileSize-maxList.Offset()) - if chunkSize == 0 { - return false - } - - pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize) - - return true -} - -func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { - - mtime := time.Now().UnixNano() - pages.writeWaitGroup.Add(1) - writer := func() { - defer pages.writeWaitGroup.Done() - - reader = io.LimitReader(reader, size) - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) - if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) - pages.lastErr = err - return - } - chunk.Mtime = mtime - pages.collection, pages.replication = collection, replication - pages.chunkAddLock.Lock() - defer pages.chunkAddLock.Unlock() - pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size) - } - - if pages.f.wfs.concurrentWriters != nil { - pages.f.wfs.concurrentWriters.Execute(writer) - } else { - go writer() - } -} - -func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - return 
pages.intervals.ReadDataAt(data, startOffset) -} - -func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) { - return pages.collection, pages.replication -} -func (pages ContinuousDirtyPages) Destroy() { -} diff --git a/weed/filesys/dirty_pages_mem_chunk.go b/weed/filesys/dirty_pages_mem_chunk.go index 9313c4562..e6548d7be 100644 --- a/weed/filesys/dirty_pages_mem_chunk.go +++ b/weed/filesys/dirty_pages_mem_chunk.go @@ -21,6 +21,10 @@ type MemoryChunkPages struct { hasWrites bool } +var ( + _ = page_writer.DirtyPages(&MemoryChunkPages{}) +) + func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *MemoryChunkPages { dirtyPages := &MemoryChunkPages{ @@ -88,3 +92,10 @@ func (pages *MemoryChunkPages) saveChunkedFileIntevalToStorage(reader io.Reader, func (pages MemoryChunkPages) Destroy() { pages.uploadPipeline.Shutdown() } + +func (pages *MemoryChunkPages) LockForRead(startOffset, stopOffset int64) { + pages.uploadPipeline.LockForRead(startOffset, stopOffset) +} +func (pages *MemoryChunkPages) UnlockForRead(startOffset, stopOffset int64) { + pages.uploadPipeline.UnlockForRead(startOffset, stopOffset) +} diff --git a/weed/filesys/dirty_pages_stream.go b/weed/filesys/dirty_pages_stream.go deleted file mode 100644 index 586b73698..000000000 --- a/weed/filesys/dirty_pages_stream.go +++ /dev/null @@ -1,106 +0,0 @@ -package filesys - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "io" - "sync" - "time" -) - -type StreamDirtyPages struct { - f *File - writeWaitGroup sync.WaitGroup - pageAddLock sync.Mutex - chunkAddLock sync.Mutex - lastErr error - collection string - replication string - chunkedStream *page_writer.ChunkedStreamWriter -} - -func newStreamDirtyPages(file *File, chunkSize int64) *StreamDirtyPages { - - dirtyPages := &StreamDirtyPages{ - f: file, - chunkedStream: page_writer.NewChunkedStreamWriter(chunkSize), - } - - dirtyPages.chunkedStream.SetSaveToStorageFunction(dirtyPages.saveChunkedFileIntevalToStorage) - - return dirtyPages -} - -func (pages *StreamDirtyPages) AddPage(offset int64, data []byte) { - - pages.pageAddLock.Lock() - defer pages.pageAddLock.Unlock() - - glog.V(4).Infof("%v stream AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data))) - if _, err := pages.chunkedStream.WriteAt(data, offset); err != nil { - pages.lastErr = err - } - - return -} - -func (pages *StreamDirtyPages) FlushData() error { - pages.saveChunkedFileToStorage() - pages.writeWaitGroup.Wait() - if pages.lastErr != nil { - return fmt.Errorf("flush data: %v", pages.lastErr) - } - pages.chunkedStream.Reset() - return nil -} - -func (pages *StreamDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - return pages.chunkedStream.ReadDataAt(data, startOffset) -} - -func (pages *StreamDirtyPages) GetStorageOptions() (collection, replication string) { - return pages.collection, pages.replication -} - -func (pages *StreamDirtyPages) saveChunkedFileToStorage() { - - pages.chunkedStream.FlushAll() - -} - -func (pages *StreamDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { - - mtime := time.Now().UnixNano() - pages.writeWaitGroup.Add(1) - writer := func() { - defer pages.writeWaitGroup.Done() - defer cleanupFn() - - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) - if err != nil { - 
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) - pages.lastErr = err - return - } - chunk.Mtime = mtime - pages.collection, pages.replication = collection, replication - pages.chunkAddLock.Lock() - pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size) - pages.chunkAddLock.Unlock() - - } - - if pages.f.wfs.concurrentWriters != nil { - pages.f.wfs.concurrentWriters.Execute(writer) - } else { - go writer() - } - -} - -func (pages StreamDirtyPages) Destroy() { - pages.chunkedStream.Reset() -} diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go deleted file mode 100644 index e0c3a91de..000000000 --- a/weed/filesys/dirty_pages_temp_file.go +++ /dev/null @@ -1,106 +0,0 @@ -package filesys - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "io" - "os" - "sync" - "time" -) - -type TempFileDirtyPages struct { - f *File - writeWaitGroup sync.WaitGroup - pageAddLock sync.Mutex - chunkAddLock sync.Mutex - lastErr error - collection string - replication string - chunkedFile *page_writer.ChunkedFileWriter -} - -func newTempFileDirtyPages(file *File, chunkSize int64) *TempFileDirtyPages { - - tempFile := &TempFileDirtyPages{ - f: file, - chunkedFile: page_writer.NewChunkedFileWriter(file.wfs.option.getTempFilePageDir(), chunkSize), - } - - return tempFile -} - -func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) { - - pages.pageAddLock.Lock() - defer pages.pageAddLock.Unlock() - - glog.V(4).Infof("%v tempfile AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data))) - if _, err := pages.chunkedFile.WriteAt(data, offset); err != nil { - pages.lastErr = err - } - - return -} - -func (pages *TempFileDirtyPages) FlushData() error { - pages.saveChunkedFileToStorage() - pages.writeWaitGroup.Wait() - if pages.lastErr != nil { - return fmt.Errorf("flush data: %v", pages.lastErr) - } - pages.chunkedFile.Reset() - return nil -} - -func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - return pages.chunkedFile.ReadDataAt(data, startOffset) -} - -func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) { - return pages.collection, pages.replication -} - -func (pages *TempFileDirtyPages) saveChunkedFileToStorage() { - - pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex page_writer.LogicChunkIndex, interval *page_writer.ChunkWrittenInterval) { - reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval) - pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize+interval.StartOffset, interval.Size()) - }) - -} - -func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64) { - - mtime := time.Now().UnixNano() - pages.writeWaitGroup.Add(1) - writer := func() { - defer pages.writeWaitGroup.Done() - - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) - if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) - pages.lastErr = err - return - } - chunk.Mtime = mtime - pages.collection, pages.replication = collection, replication - pages.chunkAddLock.Lock() 
- defer pages.chunkAddLock.Unlock() - pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size) - } - - if pages.f.wfs.concurrentWriters != nil { - pages.f.wfs.concurrentWriters.Execute(writer) - } else { - go writer() - } - -} - -func (pages TempFileDirtyPages) Destroy() { - pages.chunkedFile.Reset() -} diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 8606ac0d2..d3b37a5b9 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -75,6 +75,8 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus buff = make([]byte, req.Size) } + fh.lockForRead(req.Offset, len(buff)) + defer fh.unlockForRead(req.Offset, len(buff)) totalRead, err := fh.readFromChunks(buff, req.Offset) if err == nil || err == io.EOF { maxStop := fh.readFromDirtyPages(buff, req.Offset) @@ -101,6 +103,13 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus return err } +func (fh *FileHandle) lockForRead(startOffset int64, size int) { + fh.dirtyPages.LockForRead(startOffset, startOffset+int64(size)) +} +func (fh *FileHandle) unlockForRead(startOffset int64, size int) { + fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size)) +} + func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) { maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset) return diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go index 90ef7d7c4..c6d08348d 100644 --- a/weed/filesys/page_writer.go +++ b/weed/filesys/page_writer.go @@ -13,7 +13,6 @@ type PageWriter struct { writerPattern *WriterPattern randomWriter page_writer.DirtyPages - streamWriter page_writer.DirtyPages } var ( @@ -44,22 +43,11 @@ func (pw *PageWriter) AddPage(offset int64, data []byte) { } func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) { - if chunkIndex > 0 { - if pw.writerPattern.IsStreamingMode() && pw.streamWriter != nil { - pw.streamWriter.AddPage(offset, data) - return - } - } pw.randomWriter.AddPage(offset, data) } func (pw *PageWriter) FlushData() error { pw.writerPattern.Reset() - if pw.streamWriter != nil { - if err := pw.streamWriter.FlushData(); err != nil { - return err - } - } return pw.randomWriter.FlushData() } @@ -70,12 +58,7 @@ func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) for i := chunkIndex; len(data) > 0; i++ { readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) - if pw.streamWriter != nil { - m1 := pw.streamWriter.ReadDirtyDataAt(data[:readSize], offset) - maxStop = max(maxStop, m1) - } - m2 := pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset) - maxStop = max(maxStop, m2) + maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset) offset += readSize data = data[readSize:] @@ -85,16 +68,18 @@ func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) } func (pw *PageWriter) GetStorageOptions() (collection, replication string) { - if pw.writerPattern.IsStreamingMode() && pw.streamWriter != nil { - return pw.streamWriter.GetStorageOptions() - } return pw.randomWriter.GetStorageOptions() } +func (pw *PageWriter) LockForRead(startOffset, stopOffset int64) { + pw.randomWriter.LockForRead(startOffset, stopOffset) +} + +func (pw *PageWriter) UnlockForRead(startOffset, stopOffset int64) { + pw.randomWriter.UnlockForRead(startOffset, stopOffset) +} + func (pw *PageWriter) Destroy() { - if pw.streamWriter != nil 
{ - pw.streamWriter.Destroy() - } pw.randomWriter.Destroy() } diff --git a/weed/filesys/page_writer/chunked_file_writer.go b/weed/filesys/page_writer/chunked_file_writer.go deleted file mode 100644 index b0e1c2844..000000000 --- a/weed/filesys/page_writer/chunked_file_writer.go +++ /dev/null @@ -1,160 +0,0 @@ -package page_writer - -import ( - "github.com/chrislusf/seaweedfs/weed/glog" - "io" - "os" - "sync" -) - -type LogicChunkIndex int -type ActualChunkIndex int - -// ChunkedFileWriter assumes the write requests will come in within chunks -type ChunkedFileWriter struct { - dir string - file *os.File - logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex - chunkUsages []*ChunkWrittenIntervalList - ChunkSize int64 - sync.Mutex -} - -var _ = io.WriterAt(&ChunkedFileWriter{}) - -func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter { - return &ChunkedFileWriter{ - dir: dir, - file: nil, - logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex), - ChunkSize: chunkSize, - } -} - -func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) { - cw.Lock() - defer cw.Unlock() - - if cw.file == nil { - cw.file, err = os.CreateTemp(cw.dir, "") - if err != nil { - glog.Errorf("create temp file: %v", err) - return - } - } - - actualOffset, chunkUsage := cw.toActualWriteOffset(off) - n, err = cw.file.WriteAt(p, actualOffset) - if err == nil { - startOffset := off % cw.ChunkSize - chunkUsage.MarkWritten(startOffset, startOffset+int64(n)) - } - return -} - -func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) { - cw.Lock() - defer cw.Unlock() - - if cw.file == nil { - return - } - - logicChunkIndex := off / cw.ChunkSize - actualChunkIndex, chunkUsage := cw.toActualReadOffset(off) - if chunkUsage != nil { - for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next { - logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.StartOffset) - logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset) - if logicStart < logicStop { - actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize - _, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart) - if err != nil { - glog.Errorf("reading temp file: %v", err) - break - } - maxStop = max(maxStop, logicStop) - } - } - } - return -} - -func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *ChunkWrittenIntervalList) { - logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize) - offsetRemainder := logicOffset % cw.ChunkSize - existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] - if found { - return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex] - } - cw.logicToActualChunkIndex[logicChunkIndex] = ActualChunkIndex(len(cw.chunkUsages)) - chunkUsage = newChunkWrittenIntervalList() - cw.chunkUsages = append(cw.chunkUsages, chunkUsage) - return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage -} - -func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex ActualChunkIndex, chunkUsage *ChunkWrittenIntervalList) { - logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize) - existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] - if found { - return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex] - } - return 0, nil -} - -func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file 
*os.File, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval)) { - for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex { - chunkUsage := cw.chunkUsages[actualChunkIndex] - for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next { - process(cw.file, logicChunkIndex, t) - } - } -} - -// Reset releases used resources -func (cw *ChunkedFileWriter) Reset() { - if cw.file != nil { - cw.file.Close() - os.Remove(cw.file.Name()) - cw.file = nil - } - cw.logicToActualChunkIndex = make(map[LogicChunkIndex]ActualChunkIndex) - cw.chunkUsages = cw.chunkUsages[:0] -} - -type FileIntervalReader struct { - f *os.File - startOffset int64 - stopOffset int64 - position int64 -} - -var _ = io.Reader(&FileIntervalReader{}) - -func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval) *FileIntervalReader { - actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] - if !found { - // this should never happen - return nil - } - return &FileIntervalReader{ - f: cw.file, - startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.StartOffset, - stopOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset, - position: 0, - } -} - -func (fr *FileIntervalReader) Read(p []byte) (n int, err error) { - readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position)) - n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position) - if err == nil || err == io.EOF { - fr.position += int64(n) - if fr.stopOffset-fr.startOffset-fr.position == 0 { - // return a tiny bit faster - err = io.EOF - return - } - } - return -} diff --git a/weed/filesys/page_writer/chunked_file_writer_test.go b/weed/filesys/page_writer/chunked_file_writer_test.go deleted file mode 100644 index 244ed62c3..000000000 --- a/weed/filesys/page_writer/chunked_file_writer_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package page_writer - -import ( - "github.com/stretchr/testify/assert" - "os" - "testing" -) - -func TestChunkedFileWriter_toActualOffset(t *testing.T) { - cw := NewChunkedFileWriter("", 16) - - writeToFile(cw, 50, 60) - writeToFile(cw, 60, 64) - - writeToFile(cw, 32, 40) - writeToFile(cw, 42, 48) - - writeToFile(cw, 48, 50) - - assert.Equal(t, 1, cw.chunkUsages[0].size(), "fully covered") - assert.Equal(t, 2, cw.chunkUsages[1].size(), "2 intervals") - -} - -func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) { - - _, chunkUsage := cw.toActualWriteOffset(startOffset) - - // skip doing actual writing - - innerOffset := startOffset % cw.ChunkSize - chunkUsage.MarkWritten(innerOffset, innerOffset+stopOffset-startOffset) - -} - -func TestWriteChunkedFile(t *testing.T) { - x := NewChunkedFileWriter(os.TempDir(), 20) - defer x.Reset() - y := NewChunkedFileWriter(os.TempDir(), 12) - defer y.Reset() - - batchSize := 4 - buf := make([]byte, batchSize) - for i := 0; i < 256; i++ { - for x := 0; x < batchSize; x++ { - buf[x] = byte(i) - } - x.WriteAt(buf, int64(i*batchSize)) - y.WriteAt(buf, int64((255-i)*batchSize)) - } - - a := make([]byte, 1) - b := make([]byte, 1) - for i := 0; i < 256*batchSize; i++ { - x.ReadDataAt(a, int64(i)) - y.ReadDataAt(b, int64(256*batchSize-1-i)) - assert.Equal(t, a[0], b[0], "same read") - } - -} diff --git a/weed/filesys/page_writer/chunked_stream_writer.go b/weed/filesys/page_writer/chunked_stream_writer.go index b4314e78f..2f869ddb8 100644 --- a/weed/filesys/page_writer/chunked_stream_writer.go +++ b/weed/filesys/page_writer/chunked_stream_writer.go @@ -1,119 +1,12 @@ 
package page_writer import ( - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/mem" "io" - "sync" - "sync/atomic" ) type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func()) -// ChunkedStreamWriter assumes the write requests will come in within chunks and in streaming mode -type ChunkedStreamWriter struct { - activeChunks map[LogicChunkIndex]*MemChunk - activeChunksLock sync.Mutex - ChunkSize int64 - saveToStorageFn SaveToStorageFunc - sync.Mutex -} - type MemChunk struct { buf []byte usage *ChunkWrittenIntervalList } - -var _ = io.WriterAt(&ChunkedStreamWriter{}) - -func NewChunkedStreamWriter(chunkSize int64) *ChunkedStreamWriter { - return &ChunkedStreamWriter{ - ChunkSize: chunkSize, - activeChunks: make(map[LogicChunkIndex]*MemChunk), - } -} - -func (cw *ChunkedStreamWriter) SetSaveToStorageFunction(saveToStorageFn SaveToStorageFunc) { - cw.saveToStorageFn = saveToStorageFn -} - -func (cw *ChunkedStreamWriter) WriteAt(p []byte, off int64) (n int, err error) { - cw.Lock() - defer cw.Unlock() - - logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize) - offsetRemainder := off % cw.ChunkSize - - memChunk, found := cw.activeChunks[logicChunkIndex] - if !found { - memChunk = &MemChunk{ - buf: mem.Allocate(int(cw.ChunkSize)), - usage: newChunkWrittenIntervalList(), - } - cw.activeChunks[logicChunkIndex] = memChunk - } - n = copy(memChunk.buf[offsetRemainder:], p) - memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n)) - if memChunk.usage.IsComplete(cw.ChunkSize) { - if cw.saveToStorageFn != nil { - cw.saveOneChunk(memChunk, logicChunkIndex) - delete(cw.activeChunks, logicChunkIndex) - } - } - - return -} - -func (cw *ChunkedStreamWriter) ReadDataAt(p []byte, off int64) (maxStop int64) { - cw.Lock() - defer cw.Unlock() - - logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize) - memChunkBaseOffset := int64(logicChunkIndex) * cw.ChunkSize - memChunk, found := cw.activeChunks[logicChunkIndex] - if !found { - return - } - - for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { - logicStart := max(off, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset) - logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) - if logicStart < logicStop { - copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) - maxStop = max(maxStop, logicStop) - } - } - return -} - -func (cw *ChunkedStreamWriter) FlushAll() { - cw.Lock() - defer cw.Unlock() - for logicChunkIndex, memChunk := range cw.activeChunks { - if cw.saveToStorageFn != nil { - cw.saveOneChunk(memChunk, logicChunkIndex) - delete(cw.activeChunks, logicChunkIndex) - } - } -} - -func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { - var referenceCounter = int32(memChunk.usage.size()) - for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { - reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset]) - cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() { - atomic.AddInt32(&referenceCounter, -1) - if atomic.LoadInt32(&referenceCounter) == 0 { - mem.Free(memChunk.buf) - } - }) - } -} - -// Reset releases used resources -func (cw *ChunkedStreamWriter) Reset() { - for t, memChunk := range cw.activeChunks { - mem.Free(memChunk.buf) - delete(cw.activeChunks, t) - } -} diff --git a/weed/filesys/page_writer/chunked_stream_writer_test.go 
b/weed/filesys/page_writer/chunked_stream_writer_test.go deleted file mode 100644 index 3c55a91ad..000000000 --- a/weed/filesys/page_writer/chunked_stream_writer_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package page_writer - -import ( - "github.com/stretchr/testify/assert" - "os" - "testing" -) - -func TestWriteChunkedStream(t *testing.T) { - x := NewChunkedStreamWriter(20) - defer x.Reset() - y := NewChunkedFileWriter(os.TempDir(), 12) - defer y.Reset() - - batchSize := 4 - buf := make([]byte, batchSize) - for i := 0; i < 256; i++ { - for x := 0; x < batchSize; x++ { - buf[x] = byte(i) - } - x.WriteAt(buf, int64(i*batchSize)) - y.WriteAt(buf, int64((255-i)*batchSize)) - } - - a := make([]byte, 1) - b := make([]byte, 1) - for i := 0; i < 256*batchSize; i++ { - x.ReadDataAt(a, int64(i)) - y.ReadDataAt(b, int64(256*batchSize-1-i)) - assert.Equal(t, a[0], b[0], "same read") - } - -} diff --git a/weed/filesys/page_writer/dirty_page_interval.go b/weed/filesys/page_writer/dirty_page_interval.go deleted file mode 100644 index 6d73b8cd7..000000000 --- a/weed/filesys/page_writer/dirty_page_interval.go +++ /dev/null @@ -1,222 +0,0 @@ -package page_writer - -import ( - "io" - - "github.com/chrislusf/seaweedfs/weed/util" -) - -type IntervalNode struct { - Data []byte - Offset int64 - Size int64 - Next *IntervalNode -} - -type IntervalLinkedList struct { - Head *IntervalNode - Tail *IntervalNode -} - -type ContinuousIntervals struct { - lists []*IntervalLinkedList -} - -func (list *IntervalLinkedList) Offset() int64 { - return list.Head.Offset -} -func (list *IntervalLinkedList) Size() int64 { - return list.Tail.Offset + list.Tail.Size - list.Head.Offset -} -func (list *IntervalLinkedList) addNodeToTail(node *IntervalNode) { - // glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size) - list.Tail.Next = node - list.Tail = node -} -func (list *IntervalLinkedList) addNodeToHead(node *IntervalNode) { - // glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size) - node.Next = list.Head - list.Head = node -} - -func (list *IntervalLinkedList) ReadData(buf []byte, start, stop int64) { - t := list.Head - for { - - nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size) - if nodeStart < nodeStop { - // glog.V(0).Infof("copying start=%d stop=%d t=[%d,%d) t.data=%d => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.Offset, t.Offset+t.Size, len(t.Data), len(buf), nodeStart, nodeStop) - copy(buf[nodeStart-start:], t.Data[nodeStart-t.Offset:nodeStop-t.Offset]) - } - - if t.Next == nil { - break - } - t = t.Next - } -} - -func (c *ContinuousIntervals) TotalSize() (total int64) { - for _, list := range c.lists { - total += list.Size() - } - return -} - -func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList { - var nodes []*IntervalNode - for t := list.Head; t != nil; t = t.Next { - nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size) - if nodeStart >= nodeStop { - // skip non overlapping IntervalNode - continue - } - nodes = append(nodes, &IntervalNode{ - Data: t.Data[nodeStart-t.Offset : nodeStop-t.Offset], - Offset: nodeStart, - Size: nodeStop - nodeStart, - Next: nil, - }) - } - for i := 1; i < len(nodes); i++ { - nodes[i-1].Next = nodes[i] - } - return &IntervalLinkedList{ - Head: 
nodes[0], - Tail: nodes[len(nodes)-1], - } -} - -func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) { - - interval := &IntervalNode{Data: data, Offset: offset, Size: int64(len(data))} - - // append to the tail and return - if len(c.lists) == 1 { - lastSpan := c.lists[0] - if lastSpan.Tail.Offset+lastSpan.Tail.Size == offset { - lastSpan.addNodeToTail(interval) - return - } - } - - var newLists []*IntervalLinkedList - for _, list := range c.lists { - // if list is to the left of new interval, add to the new list - if list.Tail.Offset+list.Tail.Size <= interval.Offset { - newLists = append(newLists, list) - } - // if list is to the right of new interval, add to the new list - if interval.Offset+interval.Size <= list.Head.Offset { - newLists = append(newLists, list) - } - // if new interval overwrite the right part of the list - if list.Head.Offset < interval.Offset && interval.Offset < list.Tail.Offset+list.Tail.Size { - // create a new list of the left part of existing list - newLists = append(newLists, subList(list, list.Offset(), interval.Offset)) - } - // if new interval overwrite the left part of the list - if list.Head.Offset < interval.Offset+interval.Size && interval.Offset+interval.Size < list.Tail.Offset+list.Tail.Size { - // create a new list of the right part of existing list - newLists = append(newLists, subList(list, interval.Offset+interval.Size, list.Tail.Offset+list.Tail.Size)) - } - // skip anything that is fully overwritten by the new interval - } - - c.lists = newLists - // add the new interval to the lists, connecting neighbor lists - var prevList, nextList *IntervalLinkedList - - for _, list := range c.lists { - if list.Head.Offset == interval.Offset+interval.Size { - nextList = list - break - } - } - - for _, list := range c.lists { - if list.Head.Offset+list.Size() == offset { - list.addNodeToTail(interval) - prevList = list - break - } - } - - if prevList != nil && nextList != nil { - // glog.V(4).Infof("connecting [%d,%d) + [%d,%d) => [%d,%d)", prevList.Head.Offset, prevList.Tail.Offset+prevList.Tail.Size, nextList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size, prevList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size) - prevList.Tail.Next = nextList.Head - prevList.Tail = nextList.Tail - c.removeList(nextList) - } else if nextList != nil { - // add to head was not done when checking - nextList.addNodeToHead(interval) - } - if prevList == nil && nextList == nil { - c.lists = append(c.lists, &IntervalLinkedList{ - Head: interval, - Tail: interval, - }) - } - - return -} - -func (c *ContinuousIntervals) RemoveLargestIntervalLinkedList() *IntervalLinkedList { - var maxSize int64 - maxIndex := -1 - for k, list := range c.lists { - if maxSize <= list.Size() { - maxSize = list.Size() - maxIndex = k - } - } - if maxSize <= 0 { - return nil - } - - t := c.lists[maxIndex] - c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...) - return t - -} - -func (c *ContinuousIntervals) removeList(target *IntervalLinkedList) { - index := -1 - for k, list := range c.lists { - if list.Offset() == target.Offset() { - index = k - } - } - if index < 0 { - return - } - - c.lists = append(c.lists[0:index], c.lists[index+1:]...) 
- -} - -func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) { - for _, list := range c.lists { - start := max(startOffset, list.Offset()) - stop := min(startOffset+int64(len(data)), list.Offset()+list.Size()) - if start < stop { - list.ReadData(data[start-startOffset:], start, stop) - maxStop = max(maxStop, stop) - } - } - return -} - -func (l *IntervalLinkedList) ToReader() io.Reader { - var readers []io.Reader - t := l.Head - readers = append(readers, util.NewBytesReader(t.Data)) - for t.Next != nil { - t = t.Next - readers = append(readers, util.NewBytesReader(t.Data)) - } - if len(readers) == 1 { - return readers[0] - } - return io.MultiReader(readers...) -} diff --git a/weed/filesys/page_writer/dirty_page_interval_test.go b/weed/filesys/page_writer/dirty_page_interval_test.go deleted file mode 100644 index 2a2a1df4d..000000000 --- a/weed/filesys/page_writer/dirty_page_interval_test.go +++ /dev/null @@ -1,113 +0,0 @@ -package page_writer - -import ( - "bytes" - "math/rand" - "testing" -) - -func TestContinuousIntervals_AddIntervalAppend(t *testing.T) { - - c := &ContinuousIntervals{} - - // 25, 25, 25 - c.AddInterval(getBytes(25, 3), 0) - // _, _, 23, 23, 23, 23 - c.AddInterval(getBytes(23, 4), 2) - - expectedData(t, c, 0, 25, 25, 23, 23, 23, 23) - -} - -func TestContinuousIntervals_AddIntervalInnerOverwrite(t *testing.T) { - - c := &ContinuousIntervals{} - - // 25, 25, 25, 25, 25 - c.AddInterval(getBytes(25, 5), 0) - // _, _, 23, 23 - c.AddInterval(getBytes(23, 2), 2) - - expectedData(t, c, 0, 25, 25, 23, 23, 25) - -} - -func TestContinuousIntervals_AddIntervalFullOverwrite(t *testing.T) { - - c := &ContinuousIntervals{} - - // 1, - c.AddInterval(getBytes(1, 1), 0) - // _, 2, - c.AddInterval(getBytes(2, 1), 1) - // _, _, 3, 3, 3 - c.AddInterval(getBytes(3, 3), 2) - // _, _, _, 4, 4, 4 - c.AddInterval(getBytes(4, 3), 3) - - expectedData(t, c, 0, 1, 2, 3, 4, 4, 4) - -} - -func TestContinuousIntervals_RealCase1(t *testing.T) { - - c := &ContinuousIntervals{} - - // 25, - c.AddInterval(getBytes(25, 1), 0) - // _, _, _, _, 23, 23 - c.AddInterval(getBytes(23, 2), 4) - // _, _, _, 24, 24, 24, 24 - c.AddInterval(getBytes(24, 4), 3) - - // _, 22, 22 - c.AddInterval(getBytes(22, 2), 1) - - expectedData(t, c, 0, 25, 22, 22, 24, 24, 24, 24) - -} - -func TestRandomWrites(t *testing.T) { - - c := &ContinuousIntervals{} - - data := make([]byte, 1024) - - for i := 0; i < 1024; i++ { - - start, stop := rand.Intn(len(data)), rand.Intn(len(data)) - if start > stop { - start, stop = stop, start - } - - rand.Read(data[start : stop+1]) - - c.AddInterval(data[start:stop+1], int64(start)) - - expectedData(t, c, 0, data...) 
- - } - -} - -func expectedData(t *testing.T, c *ContinuousIntervals, offset int, data ...byte) { - start, stop := int64(offset), int64(offset+len(data)) - for _, list := range c.lists { - nodeStart, nodeStop := max(start, list.Head.Offset), min(stop, list.Head.Offset+list.Size()) - if nodeStart < nodeStop { - buf := make([]byte, nodeStop-nodeStart) - list.ReadData(buf, nodeStart, nodeStop) - if bytes.Compare(buf, data[nodeStart-start:nodeStop-start]) != 0 { - t.Errorf("expected %v actual %v", data[nodeStart-start:nodeStop-start], buf) - } - } - } -} - -func getBytes(content byte, length int) []byte { - data := make([]byte, length) - for i := 0; i < length; i++ { - data[i] = content - } - return data -} diff --git a/weed/filesys/page_writer/dirty_pages.go b/weed/filesys/page_writer/dirty_pages.go index 955627d67..25b747fad 100644 --- a/weed/filesys/page_writer/dirty_pages.go +++ b/weed/filesys/page_writer/dirty_pages.go @@ -6,6 +6,8 @@ type DirtyPages interface { ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) GetStorageOptions() (collection, replication string) Destroy() + LockForRead(startOffset, stopOffset int64) + UnlockForRead(startOffset, stopOffset int64) } func max(x, y int64) int64 { diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go index 9f459c11e..13ee3caec 100644 --- a/weed/filesys/page_writer/upload_pipeline.go +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -7,19 +7,24 @@ import ( "github.com/chrislusf/seaweedfs/weed/util/mem" "sync" "sync/atomic" + "time" ) +type LogicChunkIndex int + type UploadPipeline struct { - writableChunks map[LogicChunkIndex]*MemChunk - writableChunksLock sync.Mutex - sealedChunks map[LogicChunkIndex]*SealedChunk - sealedChunksLock sync.Mutex - ChunkSize int64 - writers *util.LimitedConcurrentExecutor - activeWriterCond *sync.Cond - activeWriterCount int32 - saveToStorageFn SaveToStorageFunc - filepath util.FullPath + filepath util.FullPath + ChunkSize int64 + writers *util.LimitedConcurrentExecutor + writableChunks map[LogicChunkIndex]*MemChunk + writableChunksLock sync.Mutex + sealedChunks map[LogicChunkIndex]*SealedChunk + sealedChunksLock sync.Mutex + activeWriterCond *sync.Cond + activeWriterCount int32 + activeReadChunks map[LogicChunkIndex]int + activeReadChunksLock sync.Mutex + saveToStorageFn SaveToStorageFunc } type SealedChunk struct { @@ -44,6 +49,7 @@ func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentEx activeWriterCond: sync.NewCond(&sync.Mutex{}), saveToStorageFn: saveToStorageFn, filepath: filepath, + activeReadChunks: make(map[LogicChunkIndex]int), } } @@ -110,6 +116,51 @@ func (cw *UploadPipeline) FlushAll() { cw.waitForCurrentWritersToComplete() } +func (cw *UploadPipeline) LockForRead(startOffset, stopOffset int64) { + startLogicChunkIndex := LogicChunkIndex(startOffset / cw.ChunkSize) + stopLogicChunkIndex := LogicChunkIndex(stopOffset / cw.ChunkSize) + if stopOffset%cw.ChunkSize > 0 { + stopLogicChunkIndex += 1 + } + cw.activeReadChunksLock.Lock() + defer cw.activeReadChunksLock.Unlock() + for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { + if count, found := cw.activeReadChunks[i]; found { + cw.activeReadChunks[i] = count + 1 + } else { + cw.activeReadChunks[i] = 1 + } + } +} + +func (cw *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) { + startLogicChunkIndex := LogicChunkIndex(startOffset / cw.ChunkSize) + stopLogicChunkIndex := LogicChunkIndex(stopOffset / cw.ChunkSize) + if stopOffset%cw.ChunkSize > 
0 { + stopLogicChunkIndex += 1 + } + cw.activeReadChunksLock.Lock() + defer cw.activeReadChunksLock.Unlock() + for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { + if count, found := cw.activeReadChunks[i]; found { + if count == 1 { + delete(cw.activeReadChunks, i) + } else { + cw.activeReadChunks[i] = count - 1 + } + } + } +} + +func (cw *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool { + cw.activeReadChunksLock.Lock() + defer cw.activeReadChunksLock.Unlock() + if count, found := cw.activeReadChunks[logicChunkIndex]; found { + return count > 0 + } + return false +} + func (cw *UploadPipeline) waitForCurrentWritersToComplete() { cw.activeWriterCond.L.Lock() t := int32(100) @@ -152,12 +203,7 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic // first add to the file chunks cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex) - // then remove from sealed chunks - cw.sealedChunksLock.Lock() - defer cw.sealedChunksLock.Unlock() - delete(cw.sealedChunks, logicChunkIndex) - sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", cw.filepath, logicChunkIndex)) - + // notify waiting process atomic.AddInt32(&cw.activeWriterCount, -1) glog.V(4).Infof("%s activeWriterCount %d --> %d", cw.filepath, cw.activeWriterCount+1, cw.activeWriterCount) // Lock and Unlock are not required, @@ -166,6 +212,18 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic cw.activeWriterCond.L.Lock() cw.activeWriterCond.Broadcast() cw.activeWriterCond.L.Unlock() + + // wait for readers + for cw.IsLocked(logicChunkIndex) { + time.Sleep(59 * time.Millisecond) + } + + // then remove from sealed chunks + cw.sealedChunksLock.Lock() + defer cw.sealedChunksLock.Unlock() + delete(cw.sealedChunks, logicChunkIndex) + sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", cw.filepath, logicChunkIndex)) + }) } diff --git a/weed/filesys/page_writer/upload_pipeline_test.go b/weed/filesys/page_writer/upload_pipeline_test.go index 81191868f..d17948251 100644 --- a/weed/filesys/page_writer/upload_pipeline_test.go +++ b/weed/filesys/page_writer/upload_pipeline_test.go @@ -7,7 +7,7 @@ import ( func TestUploadPipeline(t *testing.T) { - uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil) + uploadPipeline := NewUploadPipeline("", nil, 2*1024*1024, nil) writeRange(uploadPipeline, 0, 131072) writeRange(uploadPipeline, 131072, 262144)
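
The heart of this change is the per-chunk read reference counting that UploadPipeline gains with LockForRead, UnlockForRead and IsLocked, surfaced through the page_writer.DirtyPages interface (the new `var _ = page_writer.DirtyPages(&MemoryChunkPages{})` assertion keeps MemoryChunkPages honest at compile time), forwarded by PageWriter to its single remaining randomWriter, and bracketed around every FUSE read in FileHandle.Read. The following is a minimal, self-contained sketch of that counting scheme, under stated assumptions: the type chunkReadLocker, its helper names, and the 2 MiB chunk size are purely illustrative and do not exist in the SeaweedFS code.

package main

import (
	"fmt"
	"sync"
)

// chunkReadLocker is an illustrative stand-in (not a SeaweedFS type) for the
// read-locking UploadPipeline gains in this change: readers pin every logical
// chunk they overlap, and an uploader checks the pin count before recycling.
type chunkReadLocker struct {
	chunkSize int64
	mu        sync.Mutex
	readers   map[int64]int // logical chunk index -> active reader count
}

func newChunkReadLocker(chunkSize int64) *chunkReadLocker {
	return &chunkReadLocker{chunkSize: chunkSize, readers: make(map[int64]int)}
}

// chunkRange converts a byte range into the half-open range of chunk indexes
// it touches; a partial trailing chunk is still being read, so it is included.
func (l *chunkReadLocker) chunkRange(startOffset, stopOffset int64) (start, stop int64) {
	start, stop = startOffset/l.chunkSize, stopOffset/l.chunkSize
	if stopOffset%l.chunkSize > 0 {
		stop++
	}
	return
}

// LockForRead pins every chunk overlapping [startOffset, stopOffset).
func (l *chunkReadLocker) LockForRead(startOffset, stopOffset int64) {
	start, stop := l.chunkRange(startOffset, stopOffset)
	l.mu.Lock()
	defer l.mu.Unlock()
	for i := start; i < stop; i++ {
		l.readers[i]++
	}
}

// UnlockForRead releases the pins taken by a matching LockForRead call.
func (l *chunkReadLocker) UnlockForRead(startOffset, stopOffset int64) {
	start, stop := l.chunkRange(startOffset, stopOffset)
	l.mu.Lock()
	defer l.mu.Unlock()
	for i := start; i < stop; i++ {
		if l.readers[i] <= 1 {
			delete(l.readers, i)
		} else {
			l.readers[i]--
		}
	}
}

// IsLocked reports whether any reader still pins the given chunk.
func (l *chunkReadLocker) IsLocked(chunkIndex int64) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.readers[chunkIndex] > 0
}

func main() {
	l := newChunkReadLocker(2 * 1024 * 1024)
	l.LockForRead(0, 3*1024*1024)   // pins chunks 0 and 1, like fh.lockForRead before a read
	fmt.Println(l.IsLocked(1))      // true: the pipeline must keep this chunk's buffer alive
	l.UnlockForRead(0, 3*1024*1024) // like the deferred fh.unlockForRead
	fmt.Println(l.IsLocked(1))      // false: the sealed chunk may now be freed
}

In the diff itself this bookkeeping is driven from FileHandle.Read, which now wraps readFromChunks and readFromDirtyPages in lockForRead/unlockForRead, so a concurrent flush cannot recycle the memory a read is still copying from.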
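
The other half of the race fix is the reordering inside moveToSealed: the uploader now broadcasts on activeWriterCond as soon as the chunk is uploaded, so FlushData callers stop waiting promptly, and only afterwards polls IsLocked before deleting the sealed chunk and calling FreeReference, so the buffer is never returned to the pool while a read still points into it. Below is a hedged, compilable sketch of that ordering only; readLocked, notify and free are illustrative stand-ins for UploadPipeline.IsLocked, the activeWriterCond broadcast and SealedChunk.FreeReference.

package sketch

import "time"

// Illustrative sketch; none of these names exist in the SeaweedFS code.

// readLocked abstracts the IsLocked check added to UploadPipeline.
type readLocked interface {
	IsLocked(chunkIndex int64) bool
}

// releaseWhenUnread mirrors the ordering moveToSealed now follows once an
// upload finishes: announce completion first, then hold the sealed buffer
// until no reader pins the chunk, and only then free it.
func releaseWhenUnread(l readLocked, chunkIndex int64, notify, free func()) {
	notify() // flush waiters may proceed: the chunk data is already durable
	for l.IsLocked(chunkIndex) { // an in-flight read still uses this memory
		time.Sleep(59 * time.Millisecond) // the diff polls on the same interval
	}
	free() // no readers remain; the sealed chunk buffer can be recycled
}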