diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index cc43eba64..6b4a820e6 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -4,15 +4,15 @@ import ( "bytes" "crypto/md5" "fmt" - "golang.org/x/exp/slices" "hash" "io" "net/http" "strconv" "sync" - "sync/atomic" "time" + "golang.org/x/exp/slices" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -49,23 +49,16 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque var partReader = io.NopCloser(io.TeeReader(reader, md5Hash)) var wg sync.WaitGroup - var bytesBufferCounter int64 - bytesBufferLimitCond := sync.NewCond(new(sync.Mutex)) + var bytesBufferCounter int64 = 4 + bytesBufferLimitChan := make(chan struct{}, bytesBufferCounter) var fileChunksLock sync.Mutex var uploadErrLock sync.Mutex for { // need to throttle used byte buffer - bytesBufferLimitCond.L.Lock() - for atomic.LoadInt64(&bytesBufferCounter) >= 4 { - glog.V(4).Infof("waiting for byte buffer %d", atomic.LoadInt64(&bytesBufferCounter)) - bytesBufferLimitCond.Wait() - } - atomic.AddInt64(&bytesBufferCounter, 1) - bytesBufferLimitCond.L.Unlock() + bytesBufferLimitChan <- struct{}{} bytesBuffer := bufPool.Get().(*bytes.Buffer) - glog.V(4).Infof("received byte buffer %d", atomic.LoadInt64(&bytesBufferCounter)) limitedReader := io.LimitReader(partReader, int64(chunkSize)) @@ -76,8 +69,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque // data, err := io.ReadAll(limitedReader) if err != nil || dataSize == 0 { bufPool.Put(bytesBuffer) - atomic.AddInt64(&bytesBufferCounter, -1) - bytesBufferLimitCond.Signal() + <-bytesBufferLimitChan uploadErrLock.Lock() uploadErr = err uploadErrLock.Unlock() @@ -89,8 +81,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque smallContent = make([]byte, dataSize) bytesBuffer.Read(smallContent) bufPool.Put(bytesBuffer) - atomic.AddInt64(&bytesBufferCounter, -1) - bytesBufferLimitCond.Signal() + <-bytesBufferLimitChan stats.FilerRequestCounter.WithLabelValues(stats.ContentSaveToFiler).Inc() break } @@ -102,8 +93,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque go func(offset int64) { defer func() { bufPool.Put(bytesBuffer) - atomic.AddInt64(&bytesBufferCounter, -1) - bytesBufferLimitCond.Signal() + <-bytesBufferLimitChan wg.Done() }()