From 7ab389e7eca179516cdb19ad8b890e53053f2af3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 19 Jul 2021 23:07:22 -0700 Subject: [PATCH] optimization: improve random range query for large files --- weed/filer/filechunk_manifest.go | 9 +++++++-- weed/filer/filechunks.go | 12 ++++++------ weed/filer/filechunks_test.go | 4 ++-- weed/filesys/filehandle.go | 2 +- weed/replication/sink/filersink/filer_sink.go | 5 +++-- weed/server/webdav_server.go | 2 +- weed/shell/command_volume_fsck.go | 4 ++-- 7 files changed, 22 insertions(+), 16 deletions(-) diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index c709dc819..2fe131dd0 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -39,9 +39,14 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa return } -func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) { +func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) { // TODO maybe parallel this for _, chunk := range chunks { + + if max(chunk.Offset, startOffset) >= min(chunk.Offset+int64(chunk.Size), stopOffset) { + continue + } + if !chunk.IsChunkManifest { dataChunks = append(dataChunks, chunk) continue @@ -54,7 +59,7 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun manifestChunks = append(manifestChunks, chunk) // recursive - dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks) + dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset) if subErr != nil { return chunks, nil, subErr } diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go index 346eb3cfb..0dc03f6e2 100644 --- a/weed/filer/filechunks.go +++ b/weed/filer/filechunks.go @@ -53,7 +53,7 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) { func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { - visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks) + visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, 0, math.MaxInt64) fileIds := make(map[string]bool) for _, interval := range visibles { @@ -72,11 +72,11 @@ func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) { - aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as) + aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as, 0, math.MaxInt64) if aErr != nil { return nil, aErr } - bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs) + bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs, 0, math.MaxInt64) if bErr != nil { return nil, bErr } @@ -117,7 +117,7 @@ func (cv *ChunkView) IsFullChunk() bool { func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) { - visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks) + visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, offset, offset+size) return ViewFromVisibleIntervals(visibles, offset, size) @@ -221,9 +221,9 @@ func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (n // NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory // If the file chunk content is a chunk manifest -func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) { +func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles []VisibleInterval, err error) { - chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks) + chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset) sort.Slice(chunks, func(i, j int) bool { if chunks[i].Mtime == chunks[j].Mtime { diff --git a/weed/filer/filechunks_test.go b/weed/filer/filechunks_test.go index 699e7e298..b0ea20848 100644 --- a/weed/filer/filechunks_test.go +++ b/weed/filer/filechunks_test.go @@ -90,7 +90,7 @@ func TestRandomFileChunksCompact(t *testing.T) { } } - visibles, _ := NonOverlappingVisibleIntervals(nil, chunks) + visibles, _ := NonOverlappingVisibleIntervals(nil, chunks, 0, math.MaxInt64) for _, v := range visibles { for x := v.start; x < v.stop; x++ { @@ -227,7 +227,7 @@ func TestIntervalMerging(t *testing.T) { for i, testcase := range testcases { log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i) - intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks) + intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks, 0, math.MaxInt64) for x, interval := range intervals { log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s", i, x, interval.start, interval.stop, interval.fileId) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index f95051f65..9acede330 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -130,7 +130,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { var chunkResolveErr error if fh.entryViewCache == nil { - fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks) + fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks, 0, math.MaxInt64) if chunkResolveErr != nil { return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) } diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 608103469..3898f2c58 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/wdclient" + "math" "google.golang.org/grpc" @@ -228,11 +229,11 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent } func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) { - aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks) + aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks, 0, math.MaxInt64) if aErr != nil { return nil, nil, aErr } - bData, bMeta, bErr := filer.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks) + bData, bMeta, bErr := filer.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks, 0, math.MaxInt64) if bErr != nil { return nil, nil, bErr } diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index c6550a36f..68c1f3233 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -532,7 +532,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { return 0, io.EOF } if f.entryViewCache == nil { - f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks) + f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, math.MaxInt64) f.reader = nil } if f.reader == nil { diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 2ced0f571..bd3be4d89 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -164,7 +164,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint if verbose && entry.Entry.IsDirectory { fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name)) } - dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks) + dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64) if resolveErr != nil { return nil } @@ -311,7 +311,7 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer files[i.vid].Write(buffer) } }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { - dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks) + dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64) if resolveErr != nil { return nil }