From 07e0d13d2d13f53a6540e836189741186aaf060e Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 28 May 2018 05:39:12 -0700
Subject: [PATCH] filer support reading multiple chunks, with range support

---
 weed/filer2/filechunks.go                 |   2 +-
 weed/filer2/filechunks_test.go            |   2 +-
 weed/filesys/filehandle.go                |   2 +-
 weed/server/filer_server_handlers_read.go | 165 +++++++++++++++++++++-
 weed/util/http_util.go                    |  34 +++++
 5 files changed, 198 insertions(+), 7 deletions(-)

diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index c4c77d270..0ac7bb43b 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -58,7 +58,7 @@ type ChunkView struct {
 	LogicOffset int64
 }
 
-func ReadFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) {
+func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) {
 
 	visibles := nonOverlappingVisibleIntervals(chunks)
 
diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go
index 134bb9c19..c3f7e0504 100644
--- a/weed/filer2/filechunks_test.go
+++ b/weed/filer2/filechunks_test.go
@@ -267,7 +267,7 @@ func TestChunksReading(t *testing.T) {
 
 	for i, testcase := range testcases {
 		log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i)
-		chunks := ReadFromChunks(testcase.Chunks, testcase.Offset, testcase.Size)
+		chunks := ViewFromChunks(testcase.Chunks, testcase.Offset, testcase.Size)
 		for x, chunk := range chunks {
 			log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s", i, x, chunk.Offset, chunk.Size, chunk.FileId)
 
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 81aca42a4..d3c8ec796 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -50,7 +50,7 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus
 
 	buff := make([]byte, req.Size)
 
-	chunkViews := filer2.ReadFromChunks(fh.f.Chunks, req.Offset, req.Size)
+	chunkViews := filer2.ViewFromChunks(fh.f.Chunks, req.Offset, req.Size)
 
 	var vids []string
 	for _, chunkView := range chunkViews {
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index a982806e3..08430716c 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -10,6 +10,10 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/util"
+	"strconv"
+	"mime/multipart"
+	"mime"
+	"path"
 )
 
 func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
@@ -40,20 +44,37 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
 		return
 	}
 
-	// FIXME pick the right fid
+	w.Header().Set("Accept-Ranges", "bytes")
+	if r.Method == "HEAD" {
+		w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10))
+		return
+	}
+
+	if len(entry.Chunks) == 1 {
+		fs.handleSingleChunk(w, r, entry)
+		return
+	}
+
+	fs.handleMultipleChunks(w, r, entry)
+
+}
+
+func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) {
+
 	fileId := entry.Chunks[0].FileId
 
-	urlLocation, err := operation.LookupFileId(fs.getMasterNode(), fileId)
+	urlString, err := operation.LookupFileId(fs.getMasterNode(), fileId)
 	if err != nil {
-		glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error())
+		glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
 		w.WriteHeader(http.StatusNotFound)
 		return
 	}
-	urlString := urlLocation
+
 	if fs.redirectOnRead {
 		http.Redirect(w, r, urlString, http.StatusFound)
 		return
 	}
+
 	u, _ := url.Parse(urlString)
 	q := u.Query()
 	for key, values := range r.URL.Query() {
@@ -86,5 +107,141 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
 	}
 	w.WriteHeader(resp.StatusCode)
 	io.Copy(w, resp.Body)
+}
+
+func (fs *FilerServer) handleMultipleChunks(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) {
+
+	mimeType := ""
+	if ext := path.Ext(entry.Name()); ext != "" {
+		mimeType = mime.TypeByExtension(ext)
+	}
+	if mimeType != "" {
+		w.Header().Set("Content-Type", mimeType)
+	}
+
+	println("mime type:", mimeType)
+
+	totalSize := int64(filer2.TotalSize(entry.Chunks))
+
+	rangeReq := r.Header.Get("Range")
+
+	if rangeReq == "" {
+		w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
+		if err := fs.writeContent(w, entry, 0, int(totalSize)); err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		return
+	}
+
+	//the rest is dealing with partial content request
+	//mostly copy from src/pkg/net/http/fs.go
+	ranges, err := parseRange(rangeReq, totalSize)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
+		return
+	}
+	if sumRangesSize(ranges) > totalSize {
+		// The total number of bytes in all the ranges
+		// is larger than the size of the file by
+		// itself, so this is probably an attack, or a
+		// dumb client. Ignore the range request.
+		return
+	}
+	if len(ranges) == 0 {
+		return
+	}
+	if len(ranges) == 1 {
+		// RFC 2616, Section 14.16:
+		// "When an HTTP message includes the content of a single
+		// range (for example, a response to a request for a
+		// single range, or to a request for a set of ranges
+		// that overlap without any holes), this content is
+		// transmitted with a Content-Range header, and a
+		// Content-Length header showing the number of bytes
+		// actually transferred.
+		// ...
+		// A response to a request for a single range MUST NOT
+		// be sent using the multipart/byteranges media type."
+		ra := ranges[0]
+		w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
+		w.Header().Set("Content-Range", ra.contentRange(totalSize))
+		w.WriteHeader(http.StatusPartialContent)
+
+		err = fs.writeContent(w, entry, ra.start, int(ra.length))
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		return
+	}
+
+	// process multiple ranges
+	for _, ra := range ranges {
+		if ra.start > totalSize {
+			http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
+			return
+		}
+	}
+	sendSize := rangesMIMESize(ranges, mimeType, totalSize)
+	pr, pw := io.Pipe()
+	mw := multipart.NewWriter(pw)
+	w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
+	sendContent := pr
+	defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
+	go func() {
+		for _, ra := range ranges {
+			part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
+			if e != nil {
+				pw.CloseWithError(e)
+				return
+			}
+			if e = fs.writeContent(part, entry, ra.start, int(ra.length)); e != nil {
+				pw.CloseWithError(e)
+				return
+			}
+		}
+		mw.Close()
+		pw.Close()
+	}()
+	if w.Header().Get("Content-Encoding") == "" {
+		w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
+	}
+	w.WriteHeader(http.StatusPartialContent)
+	if _, err := io.CopyN(w, sendContent, sendSize); err != nil {
+		http.Error(w, "Internal Error", http.StatusInternalServerError)
+		return
+	}
+
+}
+
+func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int64, size int) error {
+
+	chunkViews := filer2.ViewFromChunks(entry.Chunks, offset, size)
+
+	fileId2Url := make(map[string]string)
+
+	for _, chunkView := range chunkViews {
+
+		urlString, err := operation.LookupFileId(fs.getMasterNode(), chunkView.FileId)
+		if err != nil {
+			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+			return err
+		}
+		fileId2Url[chunkView.FileId] = urlString
+	}
+
+	for _, chunkView := range chunkViews {
+		urlString := fileId2Url[chunkView.FileId]
+		_, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) {
+			w.Write(data)
+		})
+		if err != nil {
+			glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+			return err
+		}
+	}
+
+	return nil
 }
 
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 579abaac0..51bedcdfd 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -215,3 +215,37 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte) (n int64, e err
 	}
 
 }
+
+func ReadUrlAsStream(fileUrl string, offset int64, size int, fn func(data []byte)) (n int64, e error) {
+
+	req, _ := http.NewRequest("GET", fileUrl, nil)
+	req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
+
+	r, err := client.Do(req)
+	if err != nil {
+		return 0, err
+	}
+	defer r.Body.Close()
+	if r.StatusCode >= 400 {
+		return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
+	}
+
+	var m int
+	buf := make([]byte, 64*1024)
+
+	for {
+		m, err = r.Body.Read(buf)
+		if m == 0 {
+			return
+		}
+		fn(buf[:m])
+		n += int64(m)
+		if err == io.EOF {
+			return n, nil
+		}
+		if err != nil {
+			return n, err
+		}
+	}
+
+}
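
For reference, a minimal client-side sketch of how the new range support can be exercised end to end. The filer address and file path below are hypothetical placeholders, and the 206 Partial Content response noted in the comments assumes a file stored as multiple chunks, so that the request is served by handleMultipleChunks above:

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

func main() {
	// Hypothetical filer URL; adjust host, port, and path for a real deployment.
	url := "http://localhost:8888/some/dir/largefile.bin"

	// Request only the first 1024 bytes. With this patch the filer computes the
	// overlapping chunk views and streams just those chunks back.
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Range", "bytes=0-1023")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("status:", resp.Status)                              // expected: 206 Partial Content
	fmt.Println("content-range:", resp.Header.Get("Content-Range"))  // e.g. bytes 0-1023/<total size>
	fmt.Println("bytes received:", len(body))
}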
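
When a request names more than one range, the handler above streams a multipart/byteranges body through an io.Pipe. A hedged sketch of consuming such a response with the standard library (the URL and the concrete ranges are again hypothetical):

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"mime"
	"mime/multipart"
	"net/http"
)

func main() {
	// Hypothetical filer URL; adjust for a real deployment.
	req, err := http.NewRequest("GET", "http://localhost:8888/some/dir/largefile.bin", nil)
	if err != nil {
		log.Fatal(err)
	}
	// Two disjoint ranges should trigger a multipart/byteranges response.
	req.Header.Set("Range", "bytes=0-99,200-299")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
	if err != nil || mediaType != "multipart/byteranges" {
		log.Fatalf("expected multipart/byteranges, got %q (err: %v)", mediaType, err)
	}

	// Each part carries its own Content-Range header and body.
	mr := multipart.NewReader(resp.Body, params["boundary"])
	for {
		part, err := mr.NextPart()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		data, err := ioutil.ReadAll(part)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s: %d bytes\n", part.Header.Get("Content-Range"), len(data))
	}
}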