diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index 368bec973..48b5795c2 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -67,6 +67,10 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent return fmt.Errorf("encode %s: %s", entry.FullPath, err) } + if len(entry.Chunks) > 50 { + meta = util.MaybeGzipData(meta) + } + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta) if err != nil { if !strings.Contains(strings.ToLower(err.Error()), "duplicate") { @@ -126,7 +130,7 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full entry := &filer.Entry{ FullPath: fullpath, } - if err := entry.DecodeAttributesAndChunks(data); err != nil { + if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -188,7 +192,7 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, entry := &filer.Entry{ FullPath: util.NewFullPath(string(fullpath), name), } - if err = entry.DecodeAttributesAndChunks(data); err != nil { + if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err) return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) } diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go index fd161b1f1..250db629a 100644 --- a/weed/filer/cassandra/cassandra_store.go +++ b/weed/filer/cassandra/cassandra_store.go @@ -60,6 +60,10 @@ func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry return fmt.Errorf("encode %s: %s", entry.FullPath, err) } + if len(entry.Chunks) > 50 { + meta = util.MaybeGzipData(meta) + } + if err := store.session.Query( "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ", dir, name, meta, entry.TtlSec).Exec(); err != nil { @@ -93,7 +97,7 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa entry = &filer.Entry{ FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks(data) + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -144,7 +148,7 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath entry := &filer.Entry{ FullPath: util.NewFullPath(string(fullpath), name), } - if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil { + if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil { err = decodeErr glog.V(0).Infof("list %s : %v", entry.FullPath, err) break diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go index 36db4ac01..634fba1eb 100644 --- a/weed/filer/etcd/etcd_store.go +++ b/weed/filer/etcd/etcd_store.go @@ -76,12 +76,16 @@ func (store *EtcdStore) RollbackTransaction(ctx context.Context) error { func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { key := genKey(entry.DirAndName()) - value, err := entry.EncodeAttributesAndChunks() + meta, err := entry.EncodeAttributesAndChunks() if err != nil { return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if _, err := store.client.Put(ctx, string(key), string(value)); err != nil { + if len(entry.Chunks) > 50 { + meta = weed_util.MaybeGzipData(meta) + } + + if _, err := store.client.Put(ctx, string(key), string(meta)); err != nil { return fmt.Errorf("persisting %s : %v", entry.FullPath, err) } @@ -107,7 +111,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPa entry = &filer.Entry{ FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value) + err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(resp.Kvs[0].Value)) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -163,7 +167,7 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_ entry := &filer.Entry{ FullPath: weed_util.NewFullPath(string(fullpath), fileName), } - if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil { + if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(kv.Value)); decodeErr != nil { err = decodeErr glog.V(0).Infof("list %s : %v", entry.FullPath, err) break diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go index eccb760a2..4b8dd5ea9 100644 --- a/weed/filer/leveldb/leveldb_store.go +++ b/weed/filer/leveldb/leveldb_store.go @@ -78,6 +78,10 @@ func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } + if len(entry.Chunks) > 50 { + value = weed_util.MaybeGzipData(value) + } + err = store.db.Put(key, value, nil) if err != nil { @@ -109,7 +113,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.Ful entry = &filer.Entry{ FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks(data) + err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData((data))) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -187,7 +191,7 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath we entry := &filer.Entry{ FullPath: weed_util.NewFullPath(string(fullpath), fileName), } - if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { + if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil { err = decodeErr glog.V(0).Infof("list %s : %v", entry.FullPath, err) break diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go index 7a2bdac2e..2ad0dd648 100644 --- a/weed/filer/leveldb2/leveldb2_store.go +++ b/weed/filer/leveldb2/leveldb2_store.go @@ -85,6 +85,10 @@ func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } + if len(entry.Chunks) > 50 { + value = weed_util.MaybeGzipData(value) + } + err = store.dbs[partitionId].Put(key, value, nil) if err != nil { @@ -117,7 +121,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.Fu entry = &filer.Entry{ FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks(data) + err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data)) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -199,8 +203,7 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath w } // println("list", entry.FullPath, "chunks", len(entry.Chunks)) - - if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { + if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil { err = decodeErr glog.V(0).Infof("list %s : %v", entry.FullPath, err) break diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go index 1fc67931a..b7e855165 100644 --- a/weed/filer/mongodb/mongodb_store.go +++ b/weed/filer/mongodb/mongodb_store.go @@ -101,6 +101,10 @@ func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encode %s: %s", entry.FullPath, err) } + if len(entry.Chunks) > 50 { + meta = util.MaybeGzipData(meta) + } + c := store.connect.Database(store.database).Collection(store.collectionName) _, err = c.InsertOne(ctx, Model{ @@ -140,7 +144,7 @@ func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks(data.Meta) + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta)) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -197,7 +201,7 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath ut entry := &filer.Entry{ FullPath: util.NewFullPath(string(fullpath), data.Name), } - if decodeErr := entry.DecodeAttributesAndChunks(data.Meta); decodeErr != nil { + if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta)); decodeErr != nil { err = decodeErr glog.V(0).Infof("list %s : %v", entry.FullPath, err) break diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go index cc8819019..0de9924a3 100644 --- a/weed/filer/redis/universal_redis_store.go +++ b/weed/filer/redis/universal_redis_store.go @@ -40,6 +40,10 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } + if len(entry.Chunks) > 50 { + value = util.MaybeGzipData(value) + } + _, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result() if err != nil { @@ -76,7 +80,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.F entry = &filer.Entry{ FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks([]byte(data)) + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index 9e06ff68f..c213b39a8 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -38,6 +38,10 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } + if len(entry.Chunks) > 50 { + value = util.MaybeGzipData(value) + } + if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil { return fmt.Errorf("persisting %s : %v", entry.FullPath, err) } @@ -71,7 +75,7 @@ func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util. entry = &filer.Entry{ FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks([]byte(data)) + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index 94ce02596..a097a3a4e 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -70,8 +70,10 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W fmt.Fprintf(writer, "%s\n", text) - bytes, err := proto.Marshal(respLookupEntry.Entry) - fmt.Fprintf(writer, "chunks %d meta size: %d\n", len(respLookupEntry.Entry.Chunks), len(bytes)) + bytes, _ := proto.Marshal(respLookupEntry.Entry) + gzippedBytes, _ := util.GzipData(bytes) + zstdBytes, _ := util.ZstdData(bytes) + fmt.Fprintf(writer, "chunks %d meta size: %d gzip:%d zstd:%d\n", len(respLookupEntry.Entry.Chunks), len(bytes), len(gzippedBytes), len(zstdBytes)) return nil diff --git a/weed/util/compression.go b/weed/util/compression.go index 2881a7bfd..736f76a5e 100644 --- a/weed/util/compression.go +++ b/weed/util/compression.go @@ -12,15 +12,44 @@ import ( "github.com/klauspost/compress/zstd" ) +var( + UnsupportedCompression = fmt.Errorf("unsupported compression") +) + +func MaybeGzipData(input []byte) ([]byte) { + if IsGzippedContent(input) { + return input + } + gzipped, err := GzipData(input) + if err != nil { + return input + } + if len(gzipped) * 10 > len(input) * 9 { + return input + } + return gzipped +} + +func MaybeDecompressData(input []byte) ([]byte) { + uncompressed, err := DecompressData(input) + if err != nil { + if err != UnsupportedCompression { + glog.Errorf("decompressed data: %v", err) + } + return input + } + return uncompressed +} + func GzipData(input []byte) ([]byte, error) { buf := new(bytes.Buffer) w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed) if _, err := w.Write(input); err != nil { - glog.V(2).Infoln("error compressing data:", err) + glog.V(2).Infof("error gzip data: %v", err) return nil, err } if err := w.Close(); err != nil { - glog.V(2).Infoln("error closing compressed data:", err) + glog.V(2).Infof("error closing gzipped data: %v", err) return nil, err } return buf.Bytes(), nil @@ -39,7 +68,7 @@ func DecompressData(input []byte) ([]byte, error) { if IsZstdContent(input) { return unzstdData(input) } - return input, fmt.Errorf("unsupported compression") + return input, UnsupportedCompression } func ungzipData(input []byte) ([]byte, error) { @@ -48,7 +77,7 @@ func ungzipData(input []byte) ([]byte, error) { defer r.Close() output, err := ioutil.ReadAll(r) if err != nil { - glog.V(2).Infoln("error uncompressing data:", err) + glog.V(2).Infof("error ungzip data: %v", err) } return output, err }