From ecef844dfc5bab7a28b34bbe1472d1c9585c7e41 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 5 Jun 2022 11:54:04 -0700 Subject: [PATCH] stream read large files --- weed/server/volume_server_handlers_read.go | 53 ++++++++++++-- weed/storage/needle/needle_read_page.go | 32 +++++---- weed/storage/needle/needle_read_test.go | 2 +- weed/storage/store.go | 15 +++- weed/storage/volume_read.go | 80 ++++++++++++++++++---- 5 files changed, 151 insertions(+), 31 deletions(-) diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index b9213a15d..eb5b2be5a 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -127,6 +127,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) var count int var needleSize types.Size + readOption.AttemptMetaOnly, readOption.MustMetaOnly = shouldAttemptStreamWrite(hasVolume, ext, r) onReadSizeFn := func(size types.Size) { needleSize = size atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(needleSize)) @@ -218,13 +219,33 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } - rs := conditionallyResizeImages(bytes.NewReader(n.Data), ext, r) - - if e := writeResponseContent(filename, mtype, rs, w, r); e != nil { - glog.V(2).Infoln("response write error:", e) + if !readOption.IsMetaOnly { + rs := conditionallyResizeImages(bytes.NewReader(n.Data), ext, r) + if e := writeResponseContent(filename, mtype, rs, w, r); e != nil { + glog.V(2).Infoln("response write error:", e) + } + } else { + vs.streamWriteResponseContent(filename, mtype, volumeId, n, w, r, readOption) } } +func shouldAttemptStreamWrite(hasLocalVolume bool, ext string, r *http.Request) (shouldAttempt bool, mustMetaOnly bool) { + if !hasLocalVolume { + return false, false + } + if len(ext) > 0 { + ext = strings.ToLower(ext) + } + if r.Method == "HEAD" { + return true, true + } + _, _, _, shouldResize := shouldResizeImages(ext, r) + if shouldResize { + return false, false + } + return true, false +} + func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, ext string, w http.ResponseWriter, r *http.Request) (processed bool) { if !n.IsChunkedManifest() || r.URL.Query().Get("cm") == "false" { return false @@ -318,3 +339,27 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re }) return nil } + +func (vs *VolumeServer) streamWriteResponseContent(filename string, mimeType string, volumeId needle.VolumeId, n *needle.Needle, w http.ResponseWriter, r *http.Request, readOption *storage.ReadOption) { + totalSize := int64(n.DataSize) + if mimeType == "" { + if ext := filepath.Ext(filename); ext != "" { + mimeType = mime.TypeByExtension(ext) + } + } + if mimeType != "" { + w.Header().Set("Content-Type", mimeType) + } + w.Header().Set("Accept-Ranges", "bytes") + adjustPassthroughHeaders(w, r, filename) + + if r.Method == "HEAD" { + w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) + return + } + + processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error { + return vs.store.ReadVolumeNeedleDataInto(volumeId, n, readOption, writer, offset, size) + }) + +} diff --git a/weed/storage/needle/needle_read_page.go b/weed/storage/needle/needle_read_page.go index 300b415c9..1fe40f847 100644 --- a/weed/storage/needle/needle_read_page.go +++ b/weed/storage/needle/needle_read_page.go @@ -10,26 +10,28 @@ import ( ) // ReadNeedleDataInto uses a needle without n.Data to read the content into an io.Writer -func (n *Needle) ReadNeedleDataInto(r backend.BackendStorageFile, offset int64, buf []byte, writer io.Writer, expectedChecksumValue uint32) (err error) { +func (n *Needle) ReadNeedleDataInto(r backend.BackendStorageFile, volumeOffset int64, buf []byte, writer io.Writer, needleOffset int64, size int64, expectedChecksumValue uint32) (err error) { crc := CRC(0) - for x := 0; ; x += len(buf) { - count, err := n.ReadNeedleData(r, offset, buf, int64(x)) - if err != nil { - if err == io.EOF { - break - } - return fmt.Errorf("ReadNeedleData: %v", err) - } + for x := needleOffset; x < needleOffset+size; x += int64(len(buf)) { + count, err := n.ReadNeedleData(r, volumeOffset, buf, x) if count > 0 { crc = crc.Update(buf[0:count]) if _, err = writer.Write(buf[0:count]); err != nil { return fmt.Errorf("ReadNeedleData write: %v", err) } - } else { + } + if err != nil { + if err == io.EOF { + err = nil + break + } + return fmt.Errorf("ReadNeedleData: %v", err) + } + if count <= 0 { break } } - if expectedChecksumValue != crc.Value() { + if needleOffset == 0 && size == int64(n.DataSize) && expectedChecksumValue != crc.Value() { return fmt.Errorf("ReadNeedleData checksum %v expected %v", crc.Value(), expectedChecksumValue) } return nil @@ -65,14 +67,18 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size return 0, err } n.ParseNeedleHeader(bytes) + if n.Size != size { + if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) { + return 0, ErrorSizeMismatch + } + } + n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize]) startOffset := offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize) dataSize := GetActualSize(size, version) stopOffset := offset + dataSize metaSize := stopOffset - startOffset - fmt.Printf("offset %d dataSize %d\n", offset, dataSize) - fmt.Printf("read needle meta [%d,%d) size %d\n", startOffset, stopOffset, metaSize) metaSlice := make([]byte, int(metaSize)) count, err = r.ReadAt(metaSlice, startOffset) diff --git a/weed/storage/needle/needle_read_test.go b/weed/storage/needle/needle_read_test.go index 688df0d53..9ffd8836e 100644 --- a/weed/storage/needle/needle_read_test.go +++ b/weed/storage/needle/needle_read_test.go @@ -74,7 +74,7 @@ func TestPageRead(t *testing.T) { fmt.Printf("Checksum value %d\n", checksumValue) buf := make([]byte, 1024) - if err = n.ReadNeedleDataInto(datBackend, offset, buf, io.Discard, checksumValue); err != nil { + if err = n.ReadNeedleDataInto(datBackend, offset, buf, io.Discard, 0, int64(n.DataSize), checksumValue); err != nil { t.Fatalf("ReadNeedleDataInto: %v", err) } diff --git a/weed/storage/store.go b/weed/storage/store.go index 7f1d35f33..81e69aaa2 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "io" "path/filepath" "strings" "sync/atomic" @@ -26,7 +27,13 @@ const ( ) type ReadOption struct { - ReadDeleted bool + ReadDeleted bool + AttemptMetaOnly bool + MustMetaOnly bool + IsMetaOnly bool // read status + ChecksumValue uint32 // read status + VolumeRevision uint16 + IsOutOfRange bool // whether need to read over MaxPossibleVolumeSize } /* @@ -375,6 +382,12 @@ func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption } return 0, fmt.Errorf("volume %d not found", i) } +func (s *Store) ReadVolumeNeedleDataInto(i needle.VolumeId, n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) error { + if v := s.findVolume(i); v != nil { + return v.readNeedleDataInto(n, readOption, writer, offset, size) + } + return fmt.Errorf("volume %d not found", i) +} func (s *Store) GetVolume(i needle.VolumeId) *Volume { return s.findVolume(i) } diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go index 9751b56ae..ced3fcb8f 100644 --- a/weed/storage/volume_read.go +++ b/weed/storage/volume_read.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/util/mem" "io" "time" @@ -12,8 +13,10 @@ import ( . "github.com/chrislusf/seaweedfs/weed/storage/types" ) +const PagedReadLimit = 1024 * 1024 + // read fills in Needle content by looking up n.Id from NeedleMapper -func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (int, error) { +func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (count int, err error) { v.dataFileAccessLock.RLock() defer v.dataFileAccessLock.RUnlock() @@ -36,31 +39,84 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSize if onReadSizeFn != nil { onReadSizeFn(readSize) } - err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) - if err == needle.ErrorSizeMismatch && OffsetSize == 4 { - err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) + if readOption != nil && readOption.AttemptMetaOnly && readSize > PagedReadLimit { + readOption.VolumeRevision = v.SuperBlock.CompactionRevision + readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) + if err == needle.ErrorSizeMismatch && OffsetSize == 4 { + readOption.IsOutOfRange = true + readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) + } + if err != nil { + return 0, err + } + if !n.IsCompressed() && !n.IsChunkedManifest() { + readOption.IsMetaOnly = true + } } - v.checkReadWriteError(err) - if err != nil { - return 0, err + if readOption == nil || !readOption.IsMetaOnly { + err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) + if err == needle.ErrorSizeMismatch && OffsetSize == 4 { + err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) + } + v.checkReadWriteError(err) + if err != nil { + return 0, err + } } - bytesRead := len(n.Data) + count = int(n.DataSize) if !n.HasTtl() { - return bytesRead, nil + return } ttlMinutes := n.Ttl.Minutes() if ttlMinutes == 0 { - return bytesRead, nil + return } if !n.HasLastModifiedDate() { - return bytesRead, nil + return } if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) { - return bytesRead, nil + return } return -1, ErrorNotFound } +// read fills in Needle content by looking up n.Id from NeedleMapper +func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) (err error) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + + nv, ok := v.nm.Get(n.Id) + if !ok || nv.Offset.IsZero() { + return ErrorNotFound + } + readSize := nv.Size + if readSize.IsDeleted() { + if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize { + glog.V(3).Infof("reading deleted %s", n.String()) + readSize = -readSize + } else { + return ErrorDeleted + } + } + if readSize == 0 { + return nil + } + + if readOption.VolumeRevision != v.SuperBlock.CompactionRevision { + // the volume is compacted + readOption.IsOutOfRange = false + readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) + } + buf := mem.Allocate(1024 * 1024) + defer mem.Free(buf) + actualOffset := nv.Offset.ToActualOffset() + if readOption.IsOutOfRange { + actualOffset += int64(MaxPossibleVolumeSize) + } + + return n.ReadNeedleDataInto(v.DataBackend, actualOffset, buf, writer, offset, size, readOption.ChecksumValue) +} + // read fills in Needle content by looking up n.Id from NeedleMapper func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) { v.dataFileAccessLock.RLock()