diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index ace958e82..5b0e17ea3 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -206,7 +206,11 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. entry := dir.wfs.cacheGet(fullFilePath) if dir.wfs.option.AsyncMetaDataCaching { - + cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT + } + entry = cachedEntry.ToProtoEntry() } if entry == nil { @@ -248,13 +252,8 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath()) - if dir.wfs.option.AsyncMetaDataCaching { - - } - cacheTtl := 5 * time.Minute - - readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", func(entry *filer_pb.Entry, isLast bool) { + processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) { fullpath := util.NewFullPath(dir.FullPath(), entry.Name) inode := fullpath.AsInode() if entry.IsDirectory { @@ -265,7 +264,21 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { ret = append(ret, dirent) } dir.wfs.cacheSet(fullpath, entry, cacheTtl) - }) + } + + if dir.wfs.option.AsyncMetaDataCaching { + listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit)) + if listErr != nil { + glog.Errorf("list meta cache: %v", listErr) + return nil, fuse.EIO + } + for _, cachedEntry := range listedEntries { + processEachEntryFn(cachedEntry.ToProtoEntry(), false) + } + return + } + + readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", processEachEntryFn) if readErr != nil { glog.V(0).Infof("list %s: %v", dir.FullPath(), err) return ret, fuse.EIO diff --git a/weed/filesys/meta_cache/cache_config.go b/weed/filesys/meta_cache/cache_config.go new file mode 100644 index 000000000..e6593ebde --- /dev/null +++ b/weed/filesys/meta_cache/cache_config.go @@ -0,0 +1,32 @@ +package meta_cache + +import "github.com/chrislusf/seaweedfs/weed/util" + +var ( + _ = util.Configuration(&cacheConfig{}) +) + +// implementing util.Configuraion +type cacheConfig struct { + dir string +} + +func (c cacheConfig) GetString(key string) string { + return c.dir +} + +func (c cacheConfig) GetBool(key string) bool { + panic("implement me") +} + +func (c cacheConfig) GetInt(key string) int { + panic("implement me") +} + +func (c cacheConfig) GetStringSlice(key string) []string { + panic("implement me") +} + +func (c cacheConfig) SetDefault(key string, value interface{}) { + panic("implement me") +} diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go new file mode 100644 index 000000000..4f047e824 --- /dev/null +++ b/weed/filesys/meta_cache/meta_cache.go @@ -0,0 +1,34 @@ +package meta_cache + +import ( + "os" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type MetaCache struct { + filer2.FilerStore +} + +func NewMetaCache(dbFolder string) *MetaCache { + return &MetaCache{ + FilerStore: OpenMetaStore(dbFolder), + } +} + +func OpenMetaStore(dbFolder string) filer2.FilerStore { + + os.MkdirAll(dbFolder, 0755) + + store := &leveldb.LevelDBStore{} + config := &cacheConfig{} + + if err := store.Initialize(config, ""); err != nil { + glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err) + } + + return store + +} diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 2013f3c03..b27ac440d 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "os" + "path" "strings" "sync" "time" @@ -12,6 +13,7 @@ import ( "github.com/karlseguin/ccache" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -67,6 +69,7 @@ type WFS struct { fsNodeCache *FsCache chunkCache *chunk_cache.ChunkCache + metaCache *meta_cache.MetaCache } type statsCache struct { filer_pb.StatisticsResponse @@ -90,6 +93,9 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.chunkCache.Shutdown() }) } + if wfs.option.AsyncMetaDataCaching { + wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta")) + } wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} wfs.fsNodeCache = newFsCache(wfs.root) diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go index 1768c1d6e..7e7b8c60b 100644 --- a/weed/filesys/xattr.go +++ b/weed/filesys/xattr.go @@ -1,6 +1,8 @@ package filesys import ( + "context" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -114,8 +116,13 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err err } // glog.V(3).Infof("read entry cache miss %s", fullpath) + // read from async meta cache if wfs.option.AsyncMetaDataCaching { - + cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT + } + return cachedEntry.ToProtoEntry(), nil } err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {