1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-07-08 18:16:50 +02:00

fix deadlock for filer upload (#4527)

Signed-off-by: wang wusong <wangwusong@virtaitech.com>
Co-authored-by: wang wusong <wangwusong@virtaitech.com>
This commit is contained in:
wusong 2023-06-04 04:38:27 +08:00 committed by GitHub
parent e23f3d6eca
commit 5aec6da8a3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -4,15 +4,15 @@ import (
"bytes" "bytes"
"crypto/md5" "crypto/md5"
"fmt" "fmt"
"golang.org/x/exp/slices"
"hash" "hash"
"io" "io"
"net/http" "net/http"
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
"time" "time"
"golang.org/x/exp/slices"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -49,23 +49,16 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
var partReader = io.NopCloser(io.TeeReader(reader, md5Hash)) var partReader = io.NopCloser(io.TeeReader(reader, md5Hash))
var wg sync.WaitGroup var wg sync.WaitGroup
var bytesBufferCounter int64 var bytesBufferCounter int64 = 4
bytesBufferLimitCond := sync.NewCond(new(sync.Mutex)) bytesBufferLimitChan := make(chan struct{}, bytesBufferCounter)
var fileChunksLock sync.Mutex var fileChunksLock sync.Mutex
var uploadErrLock sync.Mutex var uploadErrLock sync.Mutex
for { for {
// need to throttle used byte buffer // need to throttle used byte buffer
bytesBufferLimitCond.L.Lock() bytesBufferLimitChan <- struct{}{}
for atomic.LoadInt64(&bytesBufferCounter) >= 4 {
glog.V(4).Infof("waiting for byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))
bytesBufferLimitCond.Wait()
}
atomic.AddInt64(&bytesBufferCounter, 1)
bytesBufferLimitCond.L.Unlock()
bytesBuffer := bufPool.Get().(*bytes.Buffer) bytesBuffer := bufPool.Get().(*bytes.Buffer)
glog.V(4).Infof("received byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))
limitedReader := io.LimitReader(partReader, int64(chunkSize)) limitedReader := io.LimitReader(partReader, int64(chunkSize))
@ -76,8 +69,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
// data, err := io.ReadAll(limitedReader) // data, err := io.ReadAll(limitedReader)
if err != nil || dataSize == 0 { if err != nil || dataSize == 0 {
bufPool.Put(bytesBuffer) bufPool.Put(bytesBuffer)
atomic.AddInt64(&bytesBufferCounter, -1) <-bytesBufferLimitChan
bytesBufferLimitCond.Signal()
uploadErrLock.Lock() uploadErrLock.Lock()
uploadErr = err uploadErr = err
uploadErrLock.Unlock() uploadErrLock.Unlock()
@ -89,8 +81,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
smallContent = make([]byte, dataSize) smallContent = make([]byte, dataSize)
bytesBuffer.Read(smallContent) bytesBuffer.Read(smallContent)
bufPool.Put(bytesBuffer) bufPool.Put(bytesBuffer)
atomic.AddInt64(&bytesBufferCounter, -1) <-bytesBufferLimitChan
bytesBufferLimitCond.Signal()
stats.FilerRequestCounter.WithLabelValues(stats.ContentSaveToFiler).Inc() stats.FilerRequestCounter.WithLabelValues(stats.ContentSaveToFiler).Inc()
break break
} }
@ -102,8 +93,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
go func(offset int64) { go func(offset int64) {
defer func() { defer func() {
bufPool.Put(bytesBuffer) bufPool.Put(bytesBuffer)
atomic.AddInt64(&bytesBufferCounter, -1) <-bytesBufferLimitChan
bytesBufferLimitCond.Signal()
wg.Done() wg.Done()
}() }()