diff --git a/weed/command/filer.go b/weed/command/filer.go index 888dc2d03..5c1e653cb 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -59,6 +59,7 @@ type FilerOptions struct { debugPort *int localSocket *string showUIDirectoryDelete *bool + downloadMaxMBps *int } func init() { @@ -87,6 +88,7 @@ func init() { f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging") f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-.sock") f.showUIDirectoryDelete = cmdFiler.Flag.Bool("ui.deleteDir", true, "enable filer UI show delete directory button") + f.downloadMaxMBps = cmdFiler.Flag.Int("downloadMaxMBps", 0, "download max speed for each download request, in MB per second") // start s3 on filer filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") @@ -239,6 +241,7 @@ func (fo *FilerOptions) startFiler() { SaveToFilerLimit: int64(*fo.saveToFilerLimit), ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024, ShowUIDirectoryDelete: *fo.showUIDirectoryDelete, + DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024, }) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) diff --git a/weed/command/server.go b/weed/command/server.go index d4c7fb64e..1f5aa5727 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -115,6 +115,7 @@ func init() { filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size") filerOptions.localSocket = cmdServer.Flag.String("filer.localSocket", "", "default to /tmp/seaweedfs-filer-.sock") filerOptions.showUIDirectoryDelete = cmdServer.Flag.Bool("filer.ui.deleteDir", true, "enable filer UI show delete directory button") + filerOptions.downloadMaxMBps = cmdServer.Flag.Int("filer.downloadMaxMBps", 0, "download max speed for each download request, in MB per second") serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port") diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 9ac74d0a0..b4ec58478 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -68,6 +68,10 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R } func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { + return StreamContentWithThrottler(masterClient, writer, chunks, offset, size, 0) +} + +func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) error { glog.V(4).Infof("start to stream content for chunks: %+v", chunks) chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size) @@ -95,6 +99,7 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ fileId2Url[chunkView.FileId] = urlStrings } + downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs) remaining := size for _, chunkView := range chunkViews { if offset < chunkView.LogicOffset { @@ -118,6 +123,7 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ return fmt.Errorf("read chunk: %v", err) } stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc() + downloadThrottler.MaybeSlowdown(int64(chunkView.Size)) } if remaining > 0 { glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining) diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 1e220d5db..a243b07f9 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -68,6 +68,7 @@ type FilerOption struct { SaveToFilerLimit int64 ConcurrentUploadLimit int64 ShowUIDirectoryDelete bool + DownloadMaxBytesPs int64 } type FilerServer struct { diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 04017894b..fcd08a79e 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -238,7 +238,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } - err = filer.StreamContent(fs.filer.MasterClient, writer, chunks, offset, size) + err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs) if err != nil { stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadStream).Inc() glog.Errorf("failed to stream content %s: %v", r.URL, err)