From 3bf0116de1525517c82854de15d8dc3a0b59817b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 21 Oct 2020 02:16:21 -0700 Subject: [PATCH] mount: less channel waiting --- weed/filesys/dirty_page.go | 25 +++++++++++++++++++----- weed/filesys/filehandle.go | 14 +++---------- weed/util/limiter.go | 40 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 16 deletions(-) create mode 100644 weed/util/limiter.go diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index a200050c4..9080b2aef 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -2,11 +2,17 @@ package filesys import ( "bytes" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "io" + "runtime" "sync" "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +var ( + concurrentWriterLimit = runtime.NumCPU() ) type ContinuousDirtyPages struct { @@ -15,17 +21,26 @@ type ContinuousDirtyPages struct { writeWaitGroup sync.WaitGroup chunkSaveErrChan chan error chunkSaveErrChanClosed bool + lastErr error lock sync.Mutex collection string replication string } func newDirtyPages(file *File) *ContinuousDirtyPages { - return &ContinuousDirtyPages{ + dirtyPages := &ContinuousDirtyPages{ intervals: &ContinuousIntervals{}, f: file, - chunkSaveErrChan: make(chan error, 8), + chunkSaveErrChan: make(chan error, concurrentWriterLimit), } + go func() { + for t := range dirtyPages.chunkSaveErrChan { + if t != nil { + dirtyPages.lastErr = t + } + } + }() + return dirtyPages } func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) { @@ -105,7 +120,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, chunk.Mtime = mtime pages.collection, pages.replication = collection, replication pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - pages.chunkSaveErrChan <- nil + glog.V(0).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size) }() } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 412d7e73f..e3163117c 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -208,25 +208,17 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { fh.dirtyPages.saveExistingPagesToStorage() - var err error - go func() { - for t := range fh.dirtyPages.chunkSaveErrChan { - if t != nil { - err = t - } - } - }() fh.dirtyPages.writeWaitGroup.Wait() - if err != nil { - return err + if fh.dirtyPages.lastErr != nil { + return fh.dirtyPages.lastErr } if !fh.f.dirtyMetadata { return nil } - err = fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { if fh.f.entry.Attributes != nil { fh.f.entry.Attributes.Mime = fh.contentType diff --git a/weed/util/limiter.go b/weed/util/limiter.go new file mode 100644 index 000000000..91499632c --- /dev/null +++ b/weed/util/limiter.go @@ -0,0 +1,40 @@ +package util + +// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go + +// LimitedConcurrentExecutor object +type LimitedConcurrentExecutor struct { + limit int + tokenChan chan int +} + +func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor { + + // allocate a limiter instance + c := &LimitedConcurrentExecutor{ + limit: limit, + tokenChan: make(chan int, limit), + } + + // allocate the tokenChan: + for i := 0; i < c.limit; i++ { + c.tokenChan <- i + } + + return c +} + +// Execute adds a function to the execution queue. +// if num of go routines allocated by this instance is < limit +// launch a new go routine to execute job +// else wait until a go routine becomes available +func (c *LimitedConcurrentExecutor) Execute(job func()) { + token := <-c.tokenChan + go func() { + defer func() { + c.tokenChan <- token + }() + // run the job + job() + }() +}