From f7a45d448f5efe1bea7c8fc5ee61cb7d535995b5 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 19 Jun 2020 09:45:27 -0700 Subject: [PATCH] FUSE mount: lazy loading meta cache --- weed/filesys/dir.go | 9 ++++++-- weed/filesys/meta_cache/meta_cache.go | 25 ++++++++++++++++----- weed/filesys/meta_cache/meta_cache_init.go | 26 ++++++++++++++++++++++ weed/filesys/xattr.go | 2 ++ 4 files changed, 54 insertions(+), 8 deletions(-) diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 4e164726d..18d21cf7f 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -8,6 +8,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -204,8 +205,10 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String()) - fullFilePath := util.NewFullPath(dir.FullPath(), req.Name) + dirPath := dir.FullPath() + fullFilePath := util.NewFullPath(dirPath, req.Name) + meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, util.FullPath(dirPath)) cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) if cacheErr == filer_pb.ErrNotFound { return nil, fuse.ENOENT @@ -263,7 +266,9 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { return nil } - listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit)) + dirPath := util.FullPath(dir.FullPath()) + meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath) + listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int(dir.wfs.option.DirListCacheLimit)) if listErr != nil { glog.Errorf("list meta cache: %v", listErr) return nil, fuse.EIO diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go index 4c9090d42..3b04040a5 100644 --- a/weed/filesys/meta_cache/meta_cache.go +++ b/weed/filesys/meta_cache/meta_cache.go @@ -9,16 +9,19 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/bounded_tree" ) type MetaCache struct { actualStore filer2.FilerStore sync.RWMutex + visitedBoundary *bounded_tree.BoundedTree } func NewMetaCache(dbFolder string) *MetaCache { return &MetaCache{ - actualStore: openMetaStore(dbFolder), + actualStore: openMetaStore(dbFolder), + visitedBoundary: bounded_tree.NewBoundedTree(), } } @@ -49,14 +52,24 @@ func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPath, newEntry *filer2.Entry) error { mc.Lock() defer mc.Unlock() - if oldPath != "" { - if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil { - return err + + oldDir, _ := oldPath.DirAndName() + if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) { + if oldPath != "" { + if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil { + return err + } } + }else{ + // println("unknown old directory:", oldDir) } + if newEntry != nil { - if err := mc.actualStore.InsertEntry(ctx, newEntry); err != nil { - return err + newDir, _ := newEntry.DirAndName() + if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) { + if err := mc.actualStore.InsertEntry(ctx, newEntry); err != nil { + return err + } } } return nil diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go index 58bf6862e..1fbc3e532 100644 --- a/weed/filesys/meta_cache/meta_cache_init.go +++ b/weed/filesys/meta_cache/meta_cache_init.go @@ -2,6 +2,7 @@ package meta_cache import ( "context" + "fmt" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" @@ -10,6 +11,7 @@ import ( ) func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient, path string) error { + return nil glog.V(0).Infof("synchronizing meta data ...") filer_pb.TraverseBfs(client, util.FullPath(path), func(parentPath util.FullPath, pbEntry *filer_pb.Entry) { entry := filer2.FromPbEntry(string(parentPath), pbEntry) @@ -19,3 +21,27 @@ func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient, path string) erro }) return nil } + +func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) { + + mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) { + + glog.V(2).Infof("ReadDirAllEntries %s ...", path) + + err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error { + entry := filer2.FromPbEntry(string(dirPath), pbEntry) + if err := mc.InsertEntry(context.Background(), entry); err != nil { + glog.V(0).Infof("read %s: %v", entry.FullPath, err) + return err + } + if entry.IsDirectory() { + childDirectories = append(childDirectories, entry.Name()) + } + return nil + }) + if err != nil { + err = fmt.Errorf("list %s: %v", dirPath, err) + } + return + }) +} diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go index 940979457..870a72ebe 100644 --- a/weed/filesys/xattr.go +++ b/weed/filesys/xattr.go @@ -5,6 +5,7 @@ import ( "github.com/seaweedfs/fuse" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -113,6 +114,7 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err err // glog.V(3).Infof("read entry cache miss %s", fullpath) // read from async meta cache + meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir)) cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) if cacheErr == filer_pb.ErrNotFound { return nil, fuse.ENOENT