diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index 4bf9b16fa..13268b944 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -156,7 +156,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go index 9fd1fffb3..13d14b2b0 100644 --- a/weed/filer/arangodb/arangodb_store.go +++ b/weed/filer/arangodb/arangodb_store.go @@ -157,7 +157,7 @@ func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } model := &Model{ @@ -196,7 +196,7 @@ func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } model := &Model{ diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go index fb61b0771..d8c094a45 100644 --- a/weed/filer/cassandra/cassandra_store.go +++ b/weed/filer/cassandra/cassandra_store.go @@ -100,7 +100,7 @@ func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go index 2a5dfc926..4146a3899 100644 --- a/weed/filer/etcd/etcd_store.go +++ b/weed/filer/etcd/etcd_store.go @@ -82,7 +82,7 @@ func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer.Entry) (er return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = weed_util.MaybeGzipData(meta) } diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index a092ee456..260945b33 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -7,6 +7,8 @@ import ( "io" ) +const CountEntryChunksForGzip = 50 + var ( ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing") ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing") diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go index e0d878ca7..c5d6eb48c 100644 --- a/weed/filer/hbase/hbase_store.go +++ b/weed/filer/hbase/hbase_store.go @@ -75,7 +75,7 @@ func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) er if err != nil { return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = util.MaybeGzipData(value) } diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go index 73d757e62..6abb37f99 100644 --- a/weed/filer/leveldb/leveldb_store.go +++ b/weed/filer/leveldb/leveldb_store.go @@ -86,7 +86,7 @@ func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = weed_util.MaybeGzipData(value) } diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go index 966686ed9..d68493bd7 100644 --- a/weed/filer/leveldb2/leveldb2_store.go +++ b/weed/filer/leveldb2/leveldb2_store.go @@ -88,7 +88,7 @@ func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = weed_util.MaybeGzipData(value) } diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go index e448f0093..d21515bd4 100644 --- a/weed/filer/leveldb3/leveldb3_store.go +++ b/weed/filer/leveldb3/leveldb3_store.go @@ -177,7 +177,7 @@ func (store *LevelDB3Store) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = weed_util.MaybeGzipData(value) } diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go index c12354ad6..83686bfe7 100644 --- a/weed/filer/mongodb/mongodb_store.go +++ b/weed/filer/mongodb/mongodb_store.go @@ -107,7 +107,7 @@ func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go index 0cdf58d7f..89684647b 100644 --- a/weed/filer/redis/universal_redis_store.go +++ b/weed/filer/redis/universal_redis_store.go @@ -40,7 +40,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = util.MaybeGzipData(value) } diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index deccf8922..7a34092a0 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -52,7 +52,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = util.MaybeGzipData(value) } diff --git a/weed/filer/redis3/universal_redis_store.go b/weed/filer/redis3/universal_redis_store.go index f04ee493d..10a87e2a4 100644 --- a/weed/filer/redis3/universal_redis_store.go +++ b/weed/filer/redis3/universal_redis_store.go @@ -40,7 +40,7 @@ func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = util.MaybeGzipData(value) } diff --git a/weed/filer/redis_lua/universal_redis_store.go b/weed/filer/redis_lua/universal_redis_store.go index 9674ac03f..0ab0f2f24 100644 --- a/weed/filer/redis_lua/universal_redis_store.go +++ b/weed/filer/redis_lua/universal_redis_store.go @@ -53,7 +53,7 @@ func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *fil return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.Chunks) > 50 { + if len(entry.Chunks) > filer.CountEntryChunksForGzip { value = util.MaybeGzipData(value) } diff --git a/weed/filer/ydb/readme.md b/weed/filer/ydb/readme.md index 90d7a18e9..2221e13b5 100644 --- a/weed/filer/ydb/readme.md +++ b/weed/filer/ydb/readme.md @@ -8,14 +8,9 @@ options: ``` [ydb] enabled=true -db_name="seaweedfs" -servers=["http://localhost:8529"] -#basic auth -user="root" -pass="test" - -# tls settings -insecure_skip_verify=true +prefix="seaweedfs" +useBucketPrefix=true +coonectionUrl=grpcs://ydb-ru.yandex.net:2135/?database=/ru/home/username/db ``` get ydb types diff --git a/weed/filer/ydb/ydb_queries.go b/weed/filer/ydb/ydb_queries.go index 57b282a7a..fdfc8bcb1 100644 --- a/weed/filer/ydb/ydb_queries.go +++ b/weed/filer/ydb/ydb_queries.go @@ -67,6 +67,6 @@ const ( SELECT name, meta FROM file_meta - WHERE dir_hash == $dir_hash AND directory == $directory and name %v $start_name and name LIKE '$prefix%' + WHERE dir_hash == $dir_hash AND directory == $directory and name %s $start_name and name LIKE '$prefix%%' ORDER BY name ASC LIMIT $limit;` ) diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go index aedc11ec5..d82fc4da7 100644 --- a/weed/filer/ydb/ydb_store.go +++ b/weed/filer/ydb/ydb_store.go @@ -15,6 +15,10 @@ import ( "time" ) +const ( + defaultConnectionTimeOut = 10 +) + var ( roTX = table.TxControl( table.BeginTx(table.WithOnlineReadOnly()), @@ -29,8 +33,6 @@ var ( type YdbStore struct { SupportBucketTable bool DB *connect.Connection - connParams connect.ConnectParams - connCtx context.Context dirBuckets string tablePathPrefix string } @@ -44,16 +46,27 @@ func (store *YdbStore) GetName() string { } func (store *YdbStore) Initialize(configuration util.Configuration, prefix string) (err error) { - return store.initialize(configuration.GetString(prefix + "coonectionUrl")) + return store.initialize( + configuration.GetString("filer.options.buckets_folder"), + configuration.GetString(prefix+"coonectionUrl"), + configuration.GetString(prefix+"tablePathPrefix"), + configuration.GetBool(prefix+"useBucketPrefix"), + configuration.GetInt(prefix+"connectionTimeOut"), + ) } -func (store *YdbStore) initialize(sqlUrl string) (err error) { - store.SupportBucketTable = false +func (store *YdbStore) initialize(dirBuckets string, sqlUrl string, tablePathPrefix string, useBucketPrefix bool, connectionTimeOut int) (err error) { + store.dirBuckets = dirBuckets + store.tablePathPrefix = tablePathPrefix + store.SupportBucketTable = useBucketPrefix + if connectionTimeOut == 0 { + connectionTimeOut = defaultConnectionTimeOut + } var cancel context.CancelFunc - store.connCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + connCtx, cancel := context.WithTimeout(context.Background(), time.Duration(connectionTimeOut)*time.Second) defer cancel() - store.connParams = connect.MustConnectionString(sqlUrl) - store.DB, err = connect.New(store.connCtx, store.connParams) + connParams := connect.MustConnectionString(sqlUrl) + store.DB, err = connect.New(connCtx, connParams) if err != nil { store.DB.Close() store.DB = nil @@ -61,7 +74,7 @@ func (store *YdbStore) initialize(sqlUrl string) (err error) { } defer store.DB.Close() - if err = store.DB.EnsurePathExists(store.connCtx, store.connParams.Database()); err != nil { + if err = store.DB.EnsurePathExists(connCtx, connParams.Database()); err != nil { return fmt.Errorf("connect to %s error:%v", sqlUrl, err) } return nil @@ -73,6 +86,11 @@ func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Ent if err != nil { return fmt.Errorf("encode %s: %s", entry.FullPath, err) } + + if len(entry.Chunks) > filer.CountEntryChunksForGzip { + meta = util.MaybeGzipData(meta) + } + fileMeta := FileMeta{util.HashStringToLong(dir), name, dir, meta} return table.Retry(ctx, store.DB.Table().Pool(), table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) { @@ -114,7 +132,7 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e } defer res.Close() - for res.NextSet() { + for res.NextResultSet(ctx) { for res.NextRow() { res.SeekItem("meta") entry.FullPath = fullpath @@ -251,17 +269,21 @@ func (store *YdbStore) Shutdown() { } func (store *YdbStore) getPrefix(dir string) string { + if !store.SupportBucketTable { + return store.tablePathPrefix + } + prefixBuckets := store.dirBuckets + "/" if strings.HasPrefix(dir, prefixBuckets) { // detect bucket bucketAndDir := dir[len(prefixBuckets):] if t := strings.Index(bucketAndDir, "/"); t > 0 { - return bucketAndDir[:t] + return path.Join(bucketAndDir[:t], store.tablePathPrefix) } } - return "" + return store.tablePathPrefix } func (store *YdbStore) withPragma(prefix, query string) string { - return `PRAGMA TablePathPrefix("` + path.Join(store.tablePathPrefix, prefix) + `");` + query + return `PRAGMA TablePathPrefix("` + prefix + `");` + query }