diff --git a/weed/util/compression.go b/weed/util/compression.go index 8699a8117..8158b9ba5 100644 --- a/weed/util/compression.go +++ b/weed/util/compression.go @@ -2,10 +2,7 @@ package util import ( "bytes" - "compress/flate" - "compress/gzip" "fmt" - "io/ioutil" "strings" "github.com/chrislusf/seaweedfs/weed/glog" @@ -42,17 +39,21 @@ func MaybeDecompressData(input []byte) []byte { } func GzipData(input []byte) ([]byte, error) { - buf := new(bytes.Buffer) - w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed) - if _, err := w.Write(input); err != nil { - glog.V(2).Infof("error gzip data: %v", err) + w := new(bytes.Buffer) + _, err := GzipStream(w, bytes.NewReader(input)) + if err != nil { return nil, err } - if err := w.Close(); err != nil { - glog.V(2).Infof("error closing gzipped data: %v", err) + return w.Bytes(), nil +} + +func ungzipData(input []byte) ([]byte, error) { + w := new(bytes.Buffer) + _, err := GunzipStream(w, bytes.NewReader(input)) + if err != nil { return nil, err } - return buf.Bytes(), nil + return w.Bytes(), nil } func DecompressData(input []byte) ([]byte, error) { @@ -67,17 +68,6 @@ func DecompressData(input []byte) ([]byte, error) { return input, UnsupportedCompression } -func ungzipData(input []byte) ([]byte, error) { - buf := bytes.NewBuffer(input) - r, _ := gzip.NewReader(buf) - defer r.Close() - output, err := ioutil.ReadAll(r) - if err != nil { - glog.V(2).Infof("error ungzip data: %v", err) - } - return output, err -} - func IsGzippedContent(data []byte) bool { if len(data) < 2 { return false diff --git a/weed/util/compression_stream.go b/weed/util/compression_stream.go new file mode 100644 index 000000000..390100c80 --- /dev/null +++ b/weed/util/compression_stream.go @@ -0,0 +1,53 @@ +package util + +import ( + "compress/gzip" + "fmt" + "io" + "sync" +) + +var ( + gzipReaderPool = sync.Pool{ + New: func() interface{} { + return new(gzip.Reader) + //return gzip.NewReader() + }, + } + + gzipWriterPool = sync.Pool{ + New: func() interface{} { + w, _ := gzip.NewWriterLevel(nil, gzip.BestSpeed) + return w + }, + } +) + +func GzipStream(w io.Writer, r io.Reader) (int64, error) { + gw, ok := gzipWriterPool.Get().(*gzip.Writer) + if !ok { + return 0, fmt.Errorf("gzip: new writer error") + } + gw.Reset(w) + defer func() { + gw.Close() + gzipWriterPool.Put(gw) + }() + return io.Copy(gw, r) +} + +func GunzipStream(w io.Writer, r io.Reader) (int64, error) { + gr, ok := gzipReaderPool.Get().(*gzip.Reader) + if !ok { + return 0, fmt.Errorf("gzip: new reader error") + } + + if err := gr.Reset(r); err != nil { + return 0, err + } + defer func() { + gr.Close() + gzipReaderPool.Put(gr) + }() + return io.Copy(w, gr) +}