From 3eafec4b29b9a5090eb6dd5f0ddee6ee1f713792 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 3 Jan 2020 00:37:24 -0800 Subject: [PATCH] volume: add option to limit file size --- weed/command/server.go | 1 + weed/command/volume.go | 11 ++++++--- weed/server/common.go | 2 +- weed/server/volume_server.go | 9 ++++++-- weed/server/volume_server_handlers_write.go | 2 +- weed/storage/needle/needle.go | 18 +++++++++------ weed/storage/needle/needle_parse_multipart.go | 23 ++++++++++++++----- 7 files changed, 46 insertions(+), 20 deletions(-) diff --git a/weed/command/server.go b/weed/command/server.go index 87f404ed3..6aa68b6d2 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -89,6 +89,7 @@ func init() { serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.") serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") + serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") s3Options.filerBucketsPath = cmdServer.Flag.String("s3.filer.dir.buckets", "/buckets", "folder on filer to store all buckets") diff --git a/weed/command/volume.go b/weed/command/volume.go index 3e8341ef8..b0f46bbf3 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -10,17 +10,19 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util/httpdown" "github.com/spf13/viper" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util/httpdown" + + "google.golang.org/grpc/reflection" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc/reflection" ) var ( @@ -47,6 +49,7 @@ type VolumeServerOptions struct { cpuProfile *string memProfile *string compactionMBPerSecond *int + fileSizeLimitMB *int } func init() { @@ -67,6 +70,7 @@ func init() { v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file") v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") + v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") } var cmdVolume = &Command{ @@ -158,6 +162,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v v.whiteList, *v.fixJpgOrientation, *v.readRedirect, *v.compactionMBPerSecond, + *v.fileSizeLimitMB, ) // starting grpc server diff --git a/weed/server/common.go b/weed/server/common.go index 888ddec49..6828e9dc5 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -98,7 +98,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("parsing upload file...") - fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := needle.ParseUpload(r) + fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := needle.ParseUpload(r, 256*1024*1024) if pe != nil { writeJsonError(w, r, http.StatusBadRequest, pe) return diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 6cf654738..a406b36cc 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -4,13 +4,15 @@ import ( "fmt" "net/http" - "github.com/chrislusf/seaweedfs/weed/stats" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/stats" + + "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" - "github.com/spf13/viper" ) type VolumeServer struct { @@ -29,6 +31,7 @@ type VolumeServer struct { compactionBytePerSecond int64 MetricsAddress string MetricsIntervalSec int + fileSizeLimitBytes int64 } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, @@ -41,6 +44,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, fixJpgOrientation bool, readRedirect bool, compactionMBPerSecond int, + fileSizeLimitMB int, ) *VolumeServer { v := viper.GetViper() @@ -62,6 +66,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, ReadRedirect: readRedirect, grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"), compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, + fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, } vs.SeedMasterNodes = masterNodes vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 05e21612b..cd35255e5 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -43,7 +43,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation) + needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes) if ne != nil { writeJsonError(w, r, http.StatusBadRequest, ne) return diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go index 2f03ba87b..494cc138e 100644 --- a/weed/storage/needle/needle.go +++ b/weed/storage/needle/needle.go @@ -3,13 +3,13 @@ package needle import ( "encoding/json" "fmt" + "io" + "io/ioutil" "net/http" "strconv" "strings" "time" - "io/ioutil" - "github.com/chrislusf/seaweedfs/weed/images" . "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -50,7 +50,7 @@ func (n *Needle) String() (str string) { return } -func ParseUpload(r *http.Request) ( +func ParseUpload(r *http.Request, sizeLimit int64) ( fileName string, data []byte, mimeType string, pairMap map[string]string, isGzipped bool, originalDataSize int, modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) { pairMap = make(map[string]string) @@ -61,13 +61,17 @@ func ParseUpload(r *http.Request) ( } if r.Method == "POST" { - fileName, data, mimeType, isGzipped, originalDataSize, isChunkedFile, e = parseMultipart(r) + fileName, data, mimeType, isGzipped, originalDataSize, isChunkedFile, e = parseMultipart(r, sizeLimit) } else { isGzipped = false mimeType = r.Header.Get("Content-Type") fileName = "" - data, e = ioutil.ReadAll(r.Body) + data, e = ioutil.ReadAll(io.LimitReader(r.Body, sizeLimit+1)) originalDataSize = len(data) + if e == io.EOF || int64(originalDataSize) == sizeLimit+1 { + io.Copy(ioutil.Discard, r.Body) + } + r.Body.Close() } if e != nil { return @@ -78,11 +82,11 @@ func ParseUpload(r *http.Request) ( return } -func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool) (n *Needle, originalSize int, e error) { +func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, e error) { var pairMap map[string]string fname, mimeType, isGzipped, isChunkedFile := "", "", false, false n = new(Needle) - fname, n.Data, mimeType, pairMap, isGzipped, originalSize, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r) + fname, n.Data, mimeType, pairMap, isGzipped, originalSize, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r, sizeLimit) if e != nil { return } diff --git a/weed/storage/needle/needle_parse_multipart.go b/weed/storage/needle/needle_parse_multipart.go index 8be1a1da4..f0a239f9f 100644 --- a/weed/storage/needle/needle_parse_multipart.go +++ b/weed/storage/needle/needle_parse_multipart.go @@ -1,9 +1,7 @@ package needle import ( - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" - + "fmt" "io" "io/ioutil" "mime" @@ -11,9 +9,12 @@ import ( "path" "strconv" "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" ) -func parseMultipart(r *http.Request) ( +func parseMultipart(r *http.Request, sizeLimit int64) ( fileName string, data []byte, mimeType string, isGzipped bool, originalDataSize int, isChunkedFile bool, e error) { defer func() { if e != nil && r.Body != nil { @@ -41,11 +42,17 @@ func parseMultipart(r *http.Request) ( fileName = path.Base(fileName) } - data, e = ioutil.ReadAll(part) + println("reading part", sizeLimit) + + data, e = ioutil.ReadAll(io.LimitReader(part, sizeLimit+1)) if e != nil { glog.V(0).Infoln("Reading Content [ERROR]", e) return } + if len(data) == int(sizeLimit)+1 { + e = fmt.Errorf("file over the limited %d bytes", sizeLimit) + return + } //if the filename is empty string, do a search on the other multi-part items for fileName == "" { @@ -58,12 +65,16 @@ func parseMultipart(r *http.Request) ( //found the first multi-part has filename if fName != "" { - data2, fe2 := ioutil.ReadAll(part2) + data2, fe2 := ioutil.ReadAll(io.LimitReader(part2, sizeLimit+1)) if fe2 != nil { glog.V(0).Infoln("Reading Content [ERROR]", fe2) e = fe2 return } + if len(data) == int(sizeLimit)+1 { + e = fmt.Errorf("file over the limited %d bytes", sizeLimit) + return + } //update data = data2