diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index bb7f4c87e..b409dbf61 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -2,11 +2,12 @@ package filer import ( "fmt" + "sync" + "time" + "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" "github.com/seaweedfs/seaweedfs/weed/util/mem" "github.com/seaweedfs/seaweedfs/weed/wdclient" - "sync" - "time" ) type ReaderCache struct { @@ -19,17 +20,17 @@ type ReaderCache struct { type SingleChunkCacher struct { sync.Mutex - cond *sync.Cond - parent *ReaderCache - chunkFileId string - data []byte - err error - cipherKey []byte - isGzipped bool - chunkSize int - shouldCache bool - wg sync.WaitGroup - completedTime time.Time + parent *ReaderCache + chunkFileId string + data []byte + err error + cipherKey []byte + isGzipped bool + chunkSize int + shouldCache bool + wg sync.WaitGroup + cacheStartedCh chan struct{} + completedTime time.Time } func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache { @@ -62,9 +63,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) { // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset) // cache this chunk if not yet cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false) - cacher.wg.Add(1) go cacher.startCaching() - cacher.wg.Wait() + <-cacher.cacheStartedCh rc.downloaders[chunkView.FileId] = cacher } @@ -87,6 +87,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt } } + // clean up old downloaders if len(rc.downloaders) >= rc.limit { oldestFid, oldestTime := "", time.Now() for fid, downloader := range rc.downloaders { @@ -106,9 +107,8 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt // glog.V(4).Infof("cache1 %s", fileId) cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache) - cacher.wg.Add(1) go cacher.startCaching() - cacher.wg.Wait() + <-cacher.cacheStartedCh rc.downloaders[fileId] = cacher return cacher.readChunkAt(buffer, offset) @@ -135,23 +135,24 @@ func (rc *ReaderCache) destroy() { } func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher { - t := &SingleChunkCacher{ - parent: parent, - chunkFileId: fileId, - cipherKey: cipherKey, - isGzipped: isGzipped, - chunkSize: chunkSize, - shouldCache: shouldCache, + return &SingleChunkCacher{ + parent: parent, + chunkFileId: fileId, + cipherKey: cipherKey, + isGzipped: isGzipped, + chunkSize: chunkSize, + shouldCache: shouldCache, + cacheStartedCh: make(chan struct{}), } - t.cond = sync.NewCond(t) - return t } func (s *SingleChunkCacher) startCaching() { + s.wg.Add(1) + defer s.wg.Done() s.Lock() defer s.Unlock() - s.wg.Done() // means this has been started + s.cacheStartedCh <- struct{}{} // means this has been started urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId) if err != nil { @@ -168,16 +169,17 @@ func (s *SingleChunkCacher) startCaching() { return } - s.completedTime = time.Now() if s.shouldCache { s.parent.chunkCache.SetChunk(s.chunkFileId, s.data) } - s.cond.Broadcast() + s.completedTime = time.Now() return } func (s *SingleChunkCacher) destroy() { + // wait for all reads to finish before destroying the data + s.wg.Wait() s.Lock() defer s.Unlock() @@ -185,16 +187,15 @@ func (s *SingleChunkCacher) destroy() { mem.Free(s.data) s.data = nil } + close(s.cacheStartedCh) } func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) { + s.wg.Add(1) + defer s.wg.Done() s.Lock() defer s.Unlock() - for s.completedTime.IsZero() { - s.cond.Wait() - } - if s.err != nil { return 0, s.err }