From bfe5d910c6fed2017298f8986f42075c7b55e33e Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 16 Jan 2023 22:43:02 -0800 Subject: [PATCH] use one readerCache for the whole file --- weed/filer/filechunk_group.go | 8 +++++--- weed/filer/filechunk_section.go | 2 +- weed/filer/reader_at.go | 5 ++--- weed/filer/reader_at_test.go | 10 +++++----- weed/filer/reader_cache.go | 2 +- weed/mount/page_writer/upload_pipeline.go | 5 ++++- weed/server/webdav_server.go | 9 ++++++--- 7 files changed, 24 insertions(+), 17 deletions(-) diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go index de469f310..c89527710 100644 --- a/weed/filer/filechunk_group.go +++ b/weed/filer/filechunk_group.go @@ -12,13 +12,15 @@ type ChunkGroup struct { chunkCache chunk_cache.ChunkCache sections map[SectionIndex]*FileChunkSection sectionsLock sync.RWMutex + readerCache *ReaderCache } func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) { group := &ChunkGroup{ - lookupFn: lookupFn, - chunkCache: chunkCache, - sections: make(map[SectionIndex]*FileChunkSection), + lookupFn: lookupFn, + chunkCache: chunkCache, + sections: make(map[SectionIndex]*FileChunkSection), + readerCache: NewReaderCache(32, chunkCache, lookupFn), } err := group.SetChunks(chunks) diff --git a/weed/filer/filechunk_section.go b/weed/filer/filechunk_section.go index a80013f22..11d6e631a 100644 --- a/weed/filer/filechunk_section.go +++ b/weed/filer/filechunk_section.go @@ -74,7 +74,7 @@ func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64) } if section.reader == nil { - section.reader = NewChunkReaderAtFromClient(group.lookupFn, section.chunkViews, group.chunkCache, min(int64(section.sectionIndex+1)*SectionSize, fileSize)) + section.reader = NewChunkReaderAtFromClient(group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize)) } section.reader.fileSize = fileSize } diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 27e8f79a6..3f32f2a3c 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -10,7 +10,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" - "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" "github.com/seaweedfs/seaweedfs/weed/wdclient" ) @@ -88,12 +87,12 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp } } -func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews *IntervalList[*ChunkView], chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt { +func NewChunkReaderAtFromClient(readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64) *ChunkReadAt { return &ChunkReadAt{ chunkViews: chunkViews, fileSize: fileSize, - readerCache: newReaderCache(32, chunkCache, lookupFn), + readerCache: readerCache, readerPattern: NewReaderPattern(), } } diff --git a/weed/filer/reader_at_test.go b/weed/filer/reader_at_test.go index f61d68a6d..8bc383184 100644 --- a/weed/filer/reader_at_test.go +++ b/weed/filer/reader_at_test.go @@ -68,7 +68,7 @@ func TestReaderAt(t *testing.T) { readerAt := &ChunkReadAt{ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), fileSize: 10, - readerCache: newReaderCache(3, &mockChunkCache{}, nil), + readerCache: NewReaderCache(3, &mockChunkCache{}, nil), readerPattern: NewReaderPattern(), } @@ -115,7 +115,7 @@ func TestReaderAt0(t *testing.T) { readerAt := &ChunkReadAt{ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), fileSize: 10, - readerCache: newReaderCache(3, &mockChunkCache{}, nil), + readerCache: NewReaderCache(3, &mockChunkCache{}, nil), readerPattern: NewReaderPattern(), } @@ -141,7 +141,7 @@ func TestReaderAt1(t *testing.T) { readerAt := &ChunkReadAt{ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), fileSize: 20, - readerCache: newReaderCache(3, &mockChunkCache{}, nil), + readerCache: NewReaderCache(3, &mockChunkCache{}, nil), readerPattern: NewReaderPattern(), } @@ -174,7 +174,7 @@ func TestReaderAtGappedChunksDoNotLeak(t *testing.T) { readerAt := &ChunkReadAt{ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), fileSize: 9, - readerCache: newReaderCache(3, &mockChunkCache{}, nil), + readerCache: NewReaderCache(3, &mockChunkCache{}, nil), readerPattern: NewReaderPattern(), } @@ -186,7 +186,7 @@ func TestReaderAtSparseFileDoesNotLeak(t *testing.T) { readerAt := &ChunkReadAt{ chunkViews: ViewFromVisibleIntervals(NewIntervalList[*VisibleInterval](), 0, math.MaxInt64), fileSize: 3, - readerCache: newReaderCache(3, &mockChunkCache{}, nil), + readerCache: NewReaderCache(3, &mockChunkCache{}, nil), readerPattern: NewReaderPattern(), } diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index 1918cacf8..27d40a78b 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -34,7 +34,7 @@ type SingleChunkCacher struct { completedTimeNew int64 } -func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache { +func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache { return &ReaderCache{ limit: limit, chunkCache: chunkCache, diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index e1aa43fe2..267ea3a4c 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -92,14 +92,17 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsN } */ up.moveToSealed(up.writableChunks[candidateChunkIndex], candidateChunkIndex) - // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, oldestTs) + // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) } + // fmt.Printf("isSequential:%v len(up.writableChunks):%v memChunkCounter:%v", isSequential, len(up.writableChunks), memChunkCounter) if isSequential && len(up.writableChunks) < up.writableChunkLimit && atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) { pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) + // fmt.Printf(" create mem chunk %d\n", logicChunkIndex) } else { pageChunk = up.swapFile.NewSwapFileChunk(logicChunkIndex) + // fmt.Printf(" create file chunk %d\n", logicChunkIndex) } up.writableChunks[logicChunkIndex] = pageChunk } diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 79416d519..2991a39f1 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -83,6 +83,7 @@ type WebDavFileSystem struct { secret security.SigningKey grpcDialOption grpc.DialOption chunkCache *chunk_cache.TieredChunkCache + readerCache *filer.ReaderCache signature int32 } @@ -119,11 +120,13 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { os.MkdirAll(cacheDir, os.FileMode(0755)) chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024) - return &WebDavFileSystem{ + t := &WebDavFileSystem{ option: option, chunkCache: chunkCache, signature: util.RandomInt32(), - }, nil + } + t.readerCache = filer.NewReaderCache(32, chunkCache, filer.LookupFn(t)) + return t, nil } var _ = filer_pb.FilerClient(&WebDavFileSystem{}) @@ -527,7 +530,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { } if f.reader == nil { chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize) - f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize) + f.reader = filer.NewChunkReaderAtFromClient(f.fs.readerCache, chunkViews, fileSize) } readSize, err = f.reader.ReadAt(p, f.off)