diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index 57198c865..bb1052656 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -57,7 +57,7 @@ func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) { lc: lc, } go func() { - util.RetryForever("create lock:"+key, func() error { + util.RetryUntil("create lock:"+key, func() error { errorMessage, err := lock.doLock(lock_manager.MaxDuration) if err != nil { glog.Infof("create lock %s: %s", key, err) @@ -98,7 +98,7 @@ func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner st lockDuration = lc.maxLockDuration needRenewal = true } - util.RetryForever("create lock:"+key, func() error { + util.RetryUntil("create lock:"+key, func() error { errorMessage, err := lock.doLock(lockDuration) if err != nil { time.Sleep(time.Second) @@ -148,7 +148,7 @@ func (lc *LockClient) keepLock(lock *LiveLock) { select { case <-ticker: // renew the lock if lock.expireAtNs is still greater than now - util.RetryForever("keep lock:"+lock.key, func() error { + util.RetryUntil("keep lock:"+lock.key, func() error { lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond if lockDuration > lc.maxLockDuration { lockDuration = lc.maxLockDuration diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go index f7b1f3146..61a5d26a2 100644 --- a/weed/command/filer_remote_gateway.go +++ b/weed/command/filer_remote_gateway.go @@ -111,7 +111,7 @@ func runFilerRemoteGateway(cmd *Command, args []string) bool { // synchronize /buckets folder fmt.Printf("synchronize buckets in %s ...\n", remoteGatewayOptions.bucketsDir) - util.RetryForever("filer.remote.sync buckets", func() error { + util.RetryUntil("filer.remote.sync buckets", func() error { return remoteGatewayOptions.followBucketUpdatesAndUploadToRemote(filerSource) }, func(err error) bool { if err != nil { diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index 261e024a6..2d6133367 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -90,7 +90,7 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { if dir != "" { fmt.Printf("synchronize %s to remote storage...\n", dir) - util.RetryForever("filer.remote.sync "+dir, func() error { + util.RetryUntil("filer.remote.sync "+dir, func() error { return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir) }, func(err error) bool { if err != nil { diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go index d5d8150bc..9500bd9db 100644 --- a/weed/mount/meta_cache/meta_cache_subscribe.go +++ b/weed/mount/meta_cache/meta_cache_subscribe.go @@ -70,7 +70,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil StopTsNs: 0, EventErrorType: pb.FatalOnError, } - util.RetryForever("followMetaUpdates", func() error { + util.RetryUntil("followMetaUpdates", func() error { clientEpoch++ return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, processEventFn) }, func(err error) bool { diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 17e767472..108ec3df2 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -116,7 +116,7 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A return uploadErr } if uploadOption.RetryForever { - util.RetryForever("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) { + util.RetryUntil("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) { glog.V(0).Infof("upload content: %v", err) return true }) diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go index 674d55ad8..b54dad871 100644 --- a/weed/pb/filer_pb_tail.go +++ b/weed/pb/filer_pb_tail.go @@ -90,7 +90,7 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc case FatalOnError: glog.Fatalf("process %v: %v", resp, err) case RetryForeverOnError: - util.RetryForever("followMetaUpdates", func() error { + util.RetryUntil("followMetaUpdates", func() error { return processEventFn(resp) }, func(err error) bool { glog.Errorf("process %v: %v", resp, err) diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index 8006ff326..766408909 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -46,7 +46,7 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, p StopTsNs: 0, EventErrorType: pb.FatalOnError, } - util.RetryForever("followIamChanges", func() error { + util.RetryUntil("followIamChanges", func() error { clientEpoch++ return pb.WithFilerClientFollowMetadata(s3a, metadataFollowOption, processEventFn) }, func(err error) bool { diff --git a/weed/util/retry.go b/weed/util/retry.go index 997fa5d13..cdc020b7a 100644 --- a/weed/util/retry.go +++ b/weed/util/retry.go @@ -57,7 +57,8 @@ func MultiRetry(name string, errList []string, job func() error) (err error) { return err } -func RetryForever(name string, job func() error, onErrFn func(err error) (shouldContinue bool)) { +// RetryUntil retries until the job returns no error or onErrFn returns false +func RetryUntil(name string, job func() error, onErrFn func(err error) (shouldContinue bool)) { waitTime := time.Second for { err := job() @@ -74,6 +75,8 @@ func RetryForever(name string, job func() error, onErrFn func(err error) (should waitTime += waitTime / 2 } continue + } else { + break } } }