mirror of
https://github.com/chrislusf/seaweedfs
synced 2024-07-06 17:17:25 +02:00
move to util.RetryWaitTime
This commit is contained in:
parent
ef908e166b
commit
8750cac090
|
@ -5,8 +5,6 @@ package command
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
|
|
||||||
"os"
|
"os"
|
||||||
"os/user"
|
"os/user"
|
||||||
"path"
|
"path"
|
||||||
|
@ -15,6 +13,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
|
||||||
|
|
||||||
"github.com/seaweedfs/fuse"
|
"github.com/seaweedfs/fuse"
|
||||||
"github.com/seaweedfs/fuse/fs"
|
"github.com/seaweedfs/fuse/fs"
|
||||||
|
|
||||||
|
@ -33,7 +33,7 @@ func runMount(cmd *Command, args []string) bool {
|
||||||
if *mountReadRetryTime < time.Second {
|
if *mountReadRetryTime < time.Second {
|
||||||
*mountReadRetryTime = time.Second
|
*mountReadRetryTime = time.Second
|
||||||
}
|
}
|
||||||
filer.ReadWaitTime = *mountReadRetryTime
|
util.RetryWaitTime = *mountReadRetryTime
|
||||||
|
|
||||||
umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64)
|
umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64)
|
||||||
if umaskErr != nil {
|
if umaskErr != nil {
|
||||||
|
|
|
@ -99,7 +99,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
|
||||||
var buffer bytes.Buffer
|
var buffer bytes.Buffer
|
||||||
var shouldRetry bool
|
var shouldRetry bool
|
||||||
|
|
||||||
for waitTime := time.Second; waitTime < ReadWaitTime; waitTime += waitTime / 2 {
|
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
|
||||||
for _, urlString := range urlStrings {
|
for _, urlString := range urlStrings {
|
||||||
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
|
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
|
||||||
buffer.Write(data)
|
buffer.Write(data)
|
||||||
|
|
|
@ -3,20 +3,16 @@ package filer
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"math/rand"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
|
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
|
||||||
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
||||||
"github.com/golang/groupcache/singleflight"
|
"github.com/golang/groupcache/singleflight"
|
||||||
"io"
|
|
||||||
"math/rand"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
ReadWaitTime = 6 * time.Second
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ChunkReadAt struct {
|
type ChunkReadAt struct {
|
||||||
|
@ -47,7 +43,7 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
|
||||||
vicCacheLock.RUnlock()
|
vicCacheLock.RUnlock()
|
||||||
|
|
||||||
if !found {
|
if !found {
|
||||||
util.Retry("lookup volume "+vid, ReadWaitTime, func() error {
|
util.Retry("lookup volume "+vid, func() error {
|
||||||
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
|
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
|
||||||
VolumeIds: []string{vid},
|
VolumeIds: []string{vid},
|
||||||
|
|
|
@ -16,7 +16,7 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
|
||||||
|
|
||||||
glog.V(4).Infof("ReadDirAllEntries %s ...", path)
|
glog.V(4).Infof("ReadDirAllEntries %s ...", path)
|
||||||
|
|
||||||
util.Retry("ReadDirAllEntries", filer.ReadWaitTime, func() error {
|
util.Retry("ReadDirAllEntries", func() error {
|
||||||
err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
|
err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
|
||||||
entry := filer.FromPbEntry(string(dirPath), pbEntry)
|
entry := filer.FromPbEntry(string(dirPath), pbEntry)
|
||||||
if err := mc.doInsertEntry(context.Background(), entry); err != nil {
|
if err := mc.doInsertEntry(context.Background(), entry); err != nil {
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package filesys
|
package filesys
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
@ -13,7 +12,7 @@ var _ = filer_pb.FilerClient(&WFS{})
|
||||||
|
|
||||||
func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
|
func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
|
||||||
|
|
||||||
err := util.Retry("filer grpc", filer.ReadWaitTime, func() error {
|
err := util.Retry("filer grpc", func() error {
|
||||||
return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
|
return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
|
||||||
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
||||||
return fn(client)
|
return fn(client)
|
||||||
|
|
|
@ -7,10 +7,12 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Retry(name string, waitTimeLimit time.Duration, job func() error) (err error) {
|
var RetryWaitTime = 6 * time.Second
|
||||||
|
|
||||||
|
func Retry(name string, job func() error) (err error) {
|
||||||
waitTime := time.Second
|
waitTime := time.Second
|
||||||
hasErr := false
|
hasErr := false
|
||||||
for waitTime < waitTimeLimit {
|
for waitTime < RetryWaitTime {
|
||||||
err = job()
|
err = job()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if hasErr {
|
if hasErr {
|
||||||
|
|
|
@ -151,7 +151,7 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MasterClient) WithClient(fn func(client master_pb.SeaweedClient) error) error {
|
func (mc *MasterClient) WithClient(fn func(client master_pb.SeaweedClient) error) error {
|
||||||
return util.Retry("master grpc", 6*time.Second, func() error {
|
return util.Retry("master grpc", func() error {
|
||||||
for mc.currentMaster == "" {
|
for mc.currentMaster == "" {
|
||||||
time.Sleep(3 * time.Second)
|
time.Sleep(3 * time.Second)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue