1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-07-05 08:36:55 +02:00

Avoid race conditions with current filer address (#3474)

When multiple filer requests are in-flight and the current filer
disappears and a new one is selected by the first goroutine, then
there can be a lot of race conditions while retrieving the current
filer.
Therefore, load/save the current filer index atomically.
This commit is contained in:
Patrick Schmidt 2022-08-21 21:18:13 +02:00 committed by GitHub
parent f49a9297c2
commit 2ef6ab998c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 14 deletions

View file

@ -2,7 +2,16 @@ package mount
import (
"context"
"math/rand"
"os"
"path"
"path/filepath"
"sync/atomic"
"time"
"github.com/hanwen/go-fuse/v2/fuse"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/pb"
@ -13,12 +22,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
"math/rand"
"os"
"path"
"path/filepath"
"time"
"github.com/hanwen/go-fuse/v2/fs"
)
@ -26,7 +29,7 @@ import (
type Option struct {
MountDirectory string
FilerAddresses []pb.ServerAddress
filerIndex int
filerIndex int32
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
@ -86,7 +89,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
dhmap: NewDirectoryHandleToInode(),
}
wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))
wfs.option.setupUniqueCacheDirectory()
if option.CacheSizeMB > 0 {
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
@ -181,7 +184,8 @@ func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
}
func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
return wfs.option.FilerAddresses[wfs.option.filerIndex]
i := atomic.LoadInt32(&wfs.option.filerIndex)
return wfs.option.FilerAddresses[i]
}
func (option *Option) setupUniqueCacheDirectory() {

View file

@ -1,12 +1,14 @@
package mount
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
"sync/atomic"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
var _ = filer_pb.FilerClient(&WFS{})
@ -15,7 +17,7 @@ func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFile
return util.Retry("filer grpc", func() error {
i := wfs.option.filerIndex
i := atomic.LoadInt32(&wfs.option.filerIndex)
n := len(wfs.option.FilerAddresses)
for x := 0; x < n; x++ {
@ -28,12 +30,12 @@ func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFile
if err != nil {
glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err)
} else {
wfs.option.filerIndex = i
atomic.StoreInt32(&wfs.option.filerIndex, i)
return nil
}
i++
if i >= n {
if i >= int32(n) {
i = 0
}