From 2ef6ab998c68f385be3076f41af703ac83bbf5b2 Mon Sep 17 00:00:00 2001 From: Patrick Schmidt Date: Sun, 21 Aug 2022 21:18:13 +0200 Subject: [PATCH] Avoid race conditions with current filer address (#3474) When multiple filer requests are in-flight and the current filer disappears and a new one is selected by the first goroutine, then there can be a lot of race conditions while retrieving the current filer. Therefore, load/save the current filer index atomically. --- weed/mount/weedfs.go | 22 +++++++++++++--------- weed/mount/wfs_filer_client.go | 12 +++++++----- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index aaaa2877d..d7d5695da 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -2,7 +2,16 @@ package mount import ( "context" + "math/rand" + "os" + "path" + "path/filepath" + "sync/atomic" + "time" + "github.com/hanwen/go-fuse/v2/fuse" + "google.golang.org/grpc" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -13,12 +22,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" "github.com/seaweedfs/seaweedfs/weed/util/grace" "github.com/seaweedfs/seaweedfs/weed/wdclient" - "google.golang.org/grpc" - "math/rand" - "os" - "path" - "path/filepath" - "time" "github.com/hanwen/go-fuse/v2/fs" ) @@ -26,7 +29,7 @@ import ( type Option struct { MountDirectory string FilerAddresses []pb.ServerAddress - filerIndex int + filerIndex int32 GrpcDialOption grpc.DialOption FilerMountRootPath string Collection string @@ -86,7 +89,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { dhmap: NewDirectoryHandleToInode(), } - wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses)) + wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses))) wfs.option.setupUniqueCacheDirectory() if option.CacheSizeMB > 0 { wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024) @@ -181,7 +184,8 @@ func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { } func (wfs *WFS) getCurrentFiler() pb.ServerAddress { - return wfs.option.FilerAddresses[wfs.option.filerIndex] + i := atomic.LoadInt32(&wfs.option.filerIndex) + return wfs.option.FilerAddresses[i] } func (option *Option) setupUniqueCacheDirectory() { diff --git a/weed/mount/wfs_filer_client.go b/weed/mount/wfs_filer_client.go index 947369d21..020970df7 100644 --- a/weed/mount/wfs_filer_client.go +++ b/weed/mount/wfs_filer_client.go @@ -1,12 +1,14 @@ package mount import ( - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/util" + "sync/atomic" + "google.golang.org/grpc" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" ) var _ = filer_pb.FilerClient(&WFS{}) @@ -15,7 +17,7 @@ func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFile return util.Retry("filer grpc", func() error { - i := wfs.option.filerIndex + i := atomic.LoadInt32(&wfs.option.filerIndex) n := len(wfs.option.FilerAddresses) for x := 0; x < n; x++ { @@ -28,12 +30,12 @@ func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFile if err != nil { glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err) } else { - wfs.option.filerIndex = i + atomic.StoreInt32(&wfs.option.filerIndex, i) return nil } i++ - if i >= n { + if i >= int32(n) { i = 0 }