diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 32008271b..b6a64b30d 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -101,6 +101,15 @@ func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string,
 	return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0)
 }
 
+func fetchChunkRange(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64, size int) ([]byte, error) {
+	urlStrings, err := lookupFileIdFn(fileId)
+	if err != nil {
+		glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
+		return nil, err
+	}
+	return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, false, offset, size)
+}
+
 func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
 
 	var err error
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 458cf88be..27a4f29ec 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -26,6 +26,7 @@ type ChunkReadAt struct {
 	chunkCache      chunk_cache.ChunkCache
 	lastChunkFileId string
 	lastChunkData   []byte
+	readerPattern   *ReaderPattern
 }
 
 var _ = io.ReaderAt(&ChunkReadAt{})
@@ -88,10 +89,11 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
 
 func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
 	return &ChunkReadAt{
-		chunkViews:   chunkViews,
-		lookupFileId: lookupFn,
-		chunkCache:   chunkCache,
-		fileSize:     fileSize,
+		chunkViews:    chunkViews,
+		lookupFileId:  lookupFn,
+		chunkCache:    chunkCache,
+		fileSize:      fileSize,
+		readerPattern: NewReaderPattern(),
 	}
 }
 
@@ -106,6 +108,8 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
 	c.readerLock.Lock()
 	defer c.readerLock.Unlock()
 
+	c.readerPattern.MonitorReadAt(offset, len(p))
+
 	// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
 	return c.doReadAt(p, offset)
 }
@@ -171,6 +175,10 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
 
 func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *ChunkView, offset, length uint64) ([]byte, error) {
 
+	if c.readerPattern.IsRandomMode() {
+		return c.doFetchRangeChunkData(chunkView, offset, length)
+	}
+
 	chunkSlice := c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length)
 	if len(chunkSlice) > 0 {
 		return chunkSlice, nil
@@ -243,3 +251,15 @@ func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error)
 
 	return data, err
 }
+
+func (c *ChunkReadAt) doFetchRangeChunkData(chunkView *ChunkView, offset, length uint64) ([]byte, error) {
+
+	glog.V(4).Infof("+ doFetchRangeChunkData %s", chunkView.FileId)
+
+	data, err := fetchChunkRange(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(length))
+
+	glog.V(4).Infof("- doFetchRangeChunkData %s", chunkView.FileId)
+
+	return data, err
+
+}
diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go
new file mode 100644
index 000000000..2bf18d141
--- /dev/null
+++ b/weed/filer/reader_pattern.go
@@ -0,0 +1,31 @@
+package filer
+
+type ReaderPattern struct {
+	isStreaming    bool
+	lastReadOffset int64
+}
+
+// For streaming read: only cache the first chunk
+// For random read: only fetch the requested range, instead of the whole chunk
+
+func NewReaderPattern() *ReaderPattern {
+	return &ReaderPattern{
+		isStreaming:    true,
+		lastReadOffset: 0,
+	}
+}
+
+func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
+	if rp.lastReadOffset > offset {
+		rp.isStreaming = false
+	}
+	rp.lastReadOffset = offset
+}
+
+func (rp *ReaderPattern) IsStreamingMode() bool {
+	return rp.isStreaming
+}
+
+func (rp *ReaderPattern) IsRandomMode() bool {
+	return !rp.isStreaming
+}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 34affddb9..4f7b7f37e 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -62,10 +62,11 @@ var _ = fs.HandleReleaser(&FileHandle{})
 
 func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
 
-	glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))
 	fh.Lock()
 	defer fh.Unlock()
 
+	glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))
+
 	if req.Size <= 0 {
 		return nil
 	}
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 38cc10d62..5ce2278bf 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -29,8 +29,6 @@ var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
 
 func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
 
-	glog.V(9).Info(r.Method + " " + r.URL.Path + " " + r.Header.Get("Range"))
-
 	stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
 	start := time.Now()
 	defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
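
Note on the heuristic in the new reader_pattern.go: a ChunkReadAt starts in streaming mode and flips to random mode the first time a read lands at a lower offset than the previous one; once flipped it never switches back, and readChunkSlice then routes every read through doFetchRangeChunkData, bypassing the chunk cache. A minimal, self-contained sketch of that state machine (the ReaderPattern methods are copied from the new file above; the main driver is illustration only):

package main

import "fmt"

// ReaderPattern is copied from the new weed/filer/reader_pattern.go above.
type ReaderPattern struct {
	isStreaming    bool
	lastReadOffset int64
}

func NewReaderPattern() *ReaderPattern {
	return &ReaderPattern{isStreaming: true, lastReadOffset: 0}
}

// MonitorReadAt flips to random mode on the first backward seek; size is unused.
func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
	if rp.lastReadOffset > offset {
		rp.isStreaming = false
	}
	rp.lastReadOffset = offset
}

func (rp *ReaderPattern) IsRandomMode() bool { return !rp.isStreaming }

// Illustration-only driver: forward reads keep streaming mode,
// a single backward seek switches the reader to random mode for good.
func main() {
	rp := NewReaderPattern()
	rp.MonitorReadAt(0, 4096)
	rp.MonitorReadAt(4096, 4096)
	fmt.Println(rp.IsRandomMode()) // false: forward reads look like streaming
	rp.MonitorReadAt(1024, 4096)
	fmt.Println(rp.IsRandomMode()) // true: backward seek, never flips back
}

The heuristic is one-way and offset-only: a single re-read of an earlier range disables chunk caching for the rest of the handle's lifetime, and the size argument is currently ignored.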
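The companion change in filechunk_manifest.go is what makes random mode cheap: fetchChunkRange calls retriedFetchChunkData with isFullChunk=false and the caller's offset/size, so only the requested range is read from a replica instead of the whole chunk, matching the comment in reader_pattern.go. The diff does not show retriedFetchChunkData itself; purely as a hedged mental model (not SeaweedFS's actual implementation), the retry-over-replicas pattern it is named for looks roughly like the sketch below, with fetchOne standing in for a hypothetical single ranged read and the replica URLs made up for the example:

package main

import (
	"errors"
	"fmt"
	"time"
)

// fetchWithRetry is an illustrative sketch only (NOT retriedFetchChunkData):
// try each replica URL in turn, and make a second pass after a short pause
// before giving up. fetchOne is a hypothetical single-request helper.
func fetchWithRetry(urlStrings []string, offset int64, size int,
	fetchOne func(url string, offset int64, size int) ([]byte, error)) ([]byte, error) {

	var lastErr error
	for attempt := 0; attempt < 2; attempt++ {
		if attempt > 0 {
			time.Sleep(500 * time.Millisecond) // brief pause before the second pass
		}
		for _, u := range urlStrings {
			data, err := fetchOne(u, offset, size)
			if err == nil {
				return data, nil
			}
			lastErr = err
		}
	}
	if lastErr == nil {
		lastErr = errors.New("no replica URLs")
	}
	return nil, lastErr
}

func main() {
	// Made-up replica URLs, only to exercise the sketch.
	urls := []string{"http://replica-a:8080/3,0123456789", "http://replica-b:8080/3,0123456789"}
	data, err := fetchWithRetry(urls, 1024, 4096, func(url string, offset int64, size int) ([]byte, error) {
		if url == urls[0] {
			return nil, errors.New("replica-a unreachable") // simulate one bad replica
		}
		return make([]byte, size), nil // pretend replica-b served the requested range
	})
	fmt.Println(len(data), err) // 4096 <nil>
}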