diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go index 9c7a68b8e..0f7c3c176 100644 --- a/weed/filer2/stream.go +++ b/weed/filer2/stream.go @@ -2,8 +2,11 @@ package filer2 import ( "bytes" + "context" + "fmt" "io" "math" + "strings" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -51,18 +54,51 @@ type ChunkStreamReader struct { bufferOffset int64 bufferPos int chunkIndex int + lookupFileId func(fileId string) (targetUrl string, err error) } var _ = io.ReadSeeker(&ChunkStreamReader{}) -func NewChunkStreamReader(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { +func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) return &ChunkStreamReader{ - masterClient: masterClient, - chunkViews: chunkViews, - bufferOffset: -1, + chunkViews: chunkViews, + lookupFileId: func(fileId string) (targetUrl string, err error) { + return masterClient.LookupFileId(fileId) + }, + } +} + +func NewChunkStreamReaderFromClient(filerClient FilerClient, chunkViews []*ChunkView) *ChunkStreamReader { + + return &ChunkStreamReader{ + chunkViews: chunkViews, + lookupFileId: func(fileId string) (targetUrl string, err error) { + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + vid := fileIdToVolumeId(fileId) + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + VolumeIds: []string{vid}, + }) + if err != nil { + return err + } + + locations := resp.LocationsMap[vid] + if locations == nil || len(locations.Locations) == 0 { + glog.V(0).Infof("failed to locate %s", fileId) + return fmt.Errorf("failed to locate %s", fileId) + } + + volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url) + + targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) + + return nil + }) + return + }, } } @@ -72,6 +108,7 @@ func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { return 0, io.EOF } chunkView := c.chunkViews[c.chunkIndex] + println("fetch1") c.fetchChunkToBuffer(chunkView) c.chunkIndex++ } @@ -105,7 +142,7 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { for i, chunk := range c.chunkViews { if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { - if c.isBufferEmpty() || c.bufferOffset != offset { + if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { c.fetchChunkToBuffer(chunk) c.chunkIndex = i + 1 break @@ -119,7 +156,7 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { } func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { - urlString, err := c.masterClient.LookupFileId(chunkView.FileId) + urlString, err := c.lookupFileId(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err @@ -136,5 +173,15 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { c.bufferPos = 0 c.bufferOffset = chunkView.LogicOffset + // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) + return nil } + +func fileIdToVolumeId(fileId string) (volumeId string) { + parts := strings.Split(fileId, ",") + if len(parts) != 2 { + return fileId + } + return parts[0] +} diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 69d440a73..14b9cb208 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -2,6 +2,7 @@ package filesys import ( "context" + "io" "os" "sort" "time" @@ -32,6 +33,7 @@ type File struct { entry *filer_pb.Entry entryViewCache []filer2.VisibleInterval isOpen int + reader io.ReadSeeker } func (file *File) fullpath() filer2.FullPath { @@ -119,6 +121,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f } file.entry.Chunks = chunks file.entryViewCache = nil + file.reader = nil } file.entry.Attributes.FileSize = req.Size } @@ -245,6 +248,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { file.entryViewCache = newVisibles newVisibles = t } + file.reader = nil glog.V(3).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks)) @@ -254,6 +258,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { func (file *File) setEntry(entry *filer_pb.Entry) { file.entry = entry file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks) + file.reader = nil } func (file *File) saveEntry() error { diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 100c9eba0..bfdafd580 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -3,6 +3,8 @@ package filesys import ( "context" "fmt" + "io" + "math" "mime" "path" "time" @@ -85,17 +87,23 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { if fh.f.entryViewCache == nil { fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks) + fh.f.reader = nil + } + if fh.f.reader == nil { + chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt32) + fh.f.reader = filer2.NewChunkStreamReaderFromClient(fh.f.wfs, chunkViews) } - chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, offset, len(buff)) - - totalRead, err := filer2.ReadIntoBuffer(fh.f.wfs, fh.f.fullpath(), buff, chunkViews, offset) + fh.f.reader.Seek(offset, io.SeekStart) + totalRead, err := fh.f.reader.Read(buff) if err != nil { glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) } - return totalRead, err + // glog.V(0).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err) + + return int64(totalRead), err } // Write to the file handle diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 5322492dc..14414de65 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -94,7 +94,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, ext := filepath.Ext(filename) width, height, mode, shouldResize := shouldResizeImages(ext, r) if shouldResize { - chunkedFileReader := filer2.NewChunkStreamReader(fs.filer.MasterClient, entry.Chunks) + chunkedFileReader := filer2.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, entry.Chunks) rs, _, _ := images.Resized(ext, chunkedFileReader, width, height, mode) io.Copy(w, rs) return