From 0fe286a6cac0dc2d7cb960645a4235c25516ea91 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 17 May 2019 17:33:49 -0700 Subject: [PATCH] filer: optimization for leveldb, add leveldb2 large filer db may see high CPU and disk usage due to background compaction --- .../compact_leveldb/compact_leveldb.go | 35 ++++ weed/command/scaffold.go | 6 + weed/filer2/leveldb/leveldb_store.go | 9 +- weed/filer2/leveldb2/leveldb2_store.go | 190 ++++++++++++++++++ weed/filer2/leveldb2/leveldb2_store_test.go | 88 ++++++++ 5 files changed, 327 insertions(+), 1 deletion(-) create mode 100644 unmaintained/compact_leveldb/compact_leveldb.go create mode 100644 weed/filer2/leveldb2/leveldb2_store.go create mode 100644 weed/filer2/leveldb2/leveldb2_store_test.go diff --git a/unmaintained/compact_leveldb/compact_leveldb.go b/unmaintained/compact_leveldb/compact_leveldb.go new file mode 100644 index 000000000..317356c3f --- /dev/null +++ b/unmaintained/compact_leveldb/compact_leveldb.go @@ -0,0 +1,35 @@ +package main + +import ( + "flag" + "log" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" +) + +var ( + dir = flag.String("dir", ".", "data directory to store leveldb files") +) + +func main() { + + flag.Parse() + + opts := &opt.Options{ + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 10, + OpenFilesCacheCapacity: -1, + } + + db, err := leveldb.OpenFile(*dir, opts) + if err != nil { + log.Fatal(err) + } + defer db.Close() + if err := db.CompactRange(util.Range{}); err != nil { + log.Fatal(err) + } +} diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index e56a1b5ba..a0908912c 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -63,6 +63,12 @@ enabled = false [leveldb] # local on disk, mostly for simple single-machine setup, fairly scalable +enabled = false +dir = "." # directory to store level db files + +[leveldb2] +# local on disk, mostly for simple single-machine setup, fairly scalable +# faster than previous leveldb, recommended. enabled = true dir = "." # directory to store level db files diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go index 5b3a63959..a55c58153 100644 --- a/weed/filer2/leveldb/leveldb_store.go +++ b/weed/filer2/leveldb/leveldb_store.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" weed_util "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" leveldb_util "github.com/syndtr/goleveldb/leveldb/util" ) @@ -39,7 +40,13 @@ func (store *LevelDBStore) initialize(dir string) (err error) { return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) } - if store.db, err = leveldb.OpenFile(dir, nil); err != nil { + opts := &opt.Options{ + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 4, + } + + if store.db, err = leveldb.OpenFile(dir, opts); err != nil { glog.Infof("filer store open dir %s: %v", dir, err) return } diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go new file mode 100644 index 000000000..c0e2cd877 --- /dev/null +++ b/weed/filer2/leveldb2/leveldb2_store.go @@ -0,0 +1,190 @@ +package leveldb + +import ( + "bytes" + "context" + "crypto/md5" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + weed_util "github.com/chrislusf/seaweedfs/weed/util" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" + leveldb_util "github.com/syndtr/goleveldb/leveldb/util" +) + +func init() { + filer2.Stores = append(filer2.Stores, &LevelDB2Store{}) +} + +// known theoretically 128 bit MD5 collision of 2 directories. +// (but really? please show some real examples) +type LevelDB2Store struct { + db *leveldb.DB +} + +func (store *LevelDB2Store) GetName() string { + return "leveldb2" +} + +func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration) (err error) { + dir := configuration.GetString("dir") + return store.initialize(dir) +} + +func (store *LevelDB2Store) initialize(dir string) (err error) { + glog.Infof("filer store leveldb2 dir: %s", dir) + if err := weed_util.TestFolderWritable(dir); err != nil { + return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) + } + + opts := &opt.Options{ + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 10, + } + + if store.db, err = leveldb.OpenFile(dir, opts); err != nil { + glog.Infof("filer store open dir %s: %v", dir, err) + return + } + return +} + +func (store *LevelDB2Store) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *LevelDB2Store) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *LevelDB2Store) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { + key := genKey(entry.DirAndName()) + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + err = store.db.Put(key, value, nil) + + if err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + // println("saved", entry.FullPath, "chunks", len(entry.Chunks)) + + return nil +} + +func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { + key := genKey(fullpath.DirAndName()) + + data, err := store.db.Get(key, nil) + + if err == leveldb.ErrNotFound { + return nil, filer2.ErrNotFound + } + if err != nil { + return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) + } + + entry = &filer2.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(data) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data)) + + return entry, nil +} + +func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { + key := genKey(fullpath.DirAndName()) + + err = store.db.Delete(key, nil) + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, + limit int) (entries []*filer2.Entry, err error) { + + directoryPrefix := genDirectoryKeyPrefix(fullpath, "") + + iter := store.db.NewIterator(&leveldb_util.Range{Start: genDirectoryKeyPrefix(fullpath, startFileName)}, nil) + for iter.Next() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + fileName := getNameFromKey(key) + if fileName == "" { + continue + } + if fileName == startFileName && !inclusive { + continue + } + limit-- + if limit < 0 { + break + } + entry := &filer2.Entry{ + FullPath: filer2.NewFullPath(string(fullpath), fileName), + } + if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + entries = append(entries, entry) + } + iter.Release() + + return entries, err +} + +func genKey(dirPath, fileName string) (key []byte) { + key = hashToBytes(dirPath) + key = append(key, []byte(fileName)...) + return key +} + +func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) { + keyPrefix = hashToBytes(string(fullpath)) + if len(startFileName) > 0 { + keyPrefix = append(keyPrefix, []byte(startFileName)...) + } + return keyPrefix +} + +func getNameFromKey(key []byte) string { + + return string(key[8:]) + +} + +func hashToBytes(dir string) []byte { + h := md5.New() + io.WriteString(h, dir) + + b := h.Sum(nil) + + return b +} diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go new file mode 100644 index 000000000..5d6e660de --- /dev/null +++ b/weed/filer2/leveldb2/leveldb2_store_test.go @@ -0,0 +1,88 @@ +package leveldb + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer2" + "io/ioutil" + "os" + "testing" +) + +func TestCreateAndFind(t *testing.T) { + filer := filer2.NewFiler(nil, nil) + dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") + defer os.RemoveAll(dir) + store := &LevelDB2Store{} + store.initialize(dir) + filer.SetStore(store) + filer.DisableDirectoryCache() + + fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg") + + ctx := context.Background() + + entry1 := &filer2.Entry{ + FullPath: fullpath, + Attr: filer2.Attr{ + Mode: 0440, + Uid: 1234, + Gid: 5678, + }, + } + + if err := filer.CreateEntry(ctx, entry1); err != nil { + t.Errorf("create entry %v: %v", entry1.FullPath, err) + return + } + + entry, err := filer.FindEntry(ctx, fullpath) + + if err != nil { + t.Errorf("find entry: %v", err) + return + } + + if entry.FullPath != entry1.FullPath { + t.Errorf("find wrong entry: %v", entry.FullPath) + return + } + + // checking one upper directory + entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100) + if len(entries) != 1 { + t.Errorf("list entries count: %v", len(entries)) + return + } + + // checking one upper directory + entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + if len(entries) != 1 { + t.Errorf("list entries count: %v", len(entries)) + return + } + +} + +func TestEmptyRoot(t *testing.T) { + filer := filer2.NewFiler(nil, nil) + dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") + defer os.RemoveAll(dir) + store := &LevelDB2Store{} + store.initialize(dir) + filer.SetStore(store) + filer.DisableDirectoryCache() + + ctx := context.Background() + + // checking one upper directory + entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + if err != nil { + t.Errorf("list entries: %v", err) + return + } + if len(entries) != 0 { + t.Errorf("list entries count: %v", len(entries)) + return + } + +}