1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-07-05 16:47:04 +02:00

use a short-lived lock

This commit is contained in:
chrislu 2024-02-01 23:01:44 -08:00
parent 0aed16a9c4
commit d30150dde1
3 changed files with 25 additions and 38 deletions

View file

@ -40,9 +40,19 @@ type LiveLock struct {
lc *LockClient lc *LockClient
} }
// NewLock creates a lock with a very long duration // NewShortLivedLock creates a lock with a 5-second duration
func (lc *LockClient) NewLock(key string, owner string) (lock *LiveLock) { func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLock) {
return lc.doNewLock(key, lock_manager.MaxDuration, owner) lock = &LiveLock{
key: key,
filer: lc.seedFiler,
cancelCh: make(chan struct{}),
expireAtNs: time.Now().Add(5*time.Second).UnixNano(),
grpcDialOption: lc.grpcDialOption,
owner: owner,
lc: lc,
}
lock.retryUntilLocked(5*time.Second)
return
} }
// StartLock starts a goroutine to lock the key and returns immediately. // StartLock starts a goroutine to lock the key and returns immediately.
@ -57,40 +67,15 @@ func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) {
lc: lc, lc: lc,
} }
go func() { go func() {
lock.CreateLock(lock_manager.MaxDuration) lock.retryUntilLocked(lock_manager.MaxDuration)
lc.keepLock(lock) lc.keepLock(lock)
}() }()
return return
} }
func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner string) (lock *LiveLock) { func (lock *LiveLock) retryUntilLocked(lockDuration time.Duration) {
lock = &LiveLock{
key: key,
filer: lc.seedFiler,
cancelCh: make(chan struct{}),
expireAtNs: time.Now().Add(lockDuration).UnixNano(),
grpcDialOption: lc.grpcDialOption,
owner: owner,
lc: lc,
}
var needRenewal bool
if lockDuration > lc.maxLockDuration {
lockDuration = lc.maxLockDuration
needRenewal = true
}
lock.CreateLock(lockDuration)
if needRenewal {
go lc.keepLock(lock)
}
return
}
func (lock *LiveLock) CreateLock(lockDuration time.Duration) {
util.RetryUntil("create lock:"+lock.key, func() error { util.RetryUntil("create lock:"+lock.key, func() error {
return lock.DoLock(lockDuration) return lock.AttemptToLock(lockDuration)
}, func(err error) (shouldContinue bool) { }, func(err error) (shouldContinue bool) {
if err != nil { if err != nil {
glog.Warningf("create lock %s: %s", lock.key, err) glog.Warningf("create lock %s: %s", lock.key, err)
@ -99,7 +84,7 @@ func (lock *LiveLock) CreateLock(lockDuration time.Duration) {
}) })
} }
func (lock *LiveLock) DoLock(lockDuration time.Duration) error { func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error {
errorMessage, err := lock.doLock(lockDuration) errorMessage, err := lock.doLock(lockDuration)
if err != nil { if err != nil {
time.Sleep(time.Second) time.Sleep(time.Second)
@ -117,11 +102,13 @@ func (lock *LiveLock) IsLocked() bool {
return lock!=nil && lock.isLocked return lock!=nil && lock.isLocked
} }
func (lock *LiveLock) StopLock() error { func (lock *LiveLock) StopShortLivedLock() error {
close(lock.cancelCh)
if !lock.isLocked { if !lock.isLocked {
return nil return nil
} }
defer func() {
lock.isLocked = false
}()
return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{ _, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{
Name: lock.key, Name: lock.key,

View file

@ -92,8 +92,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
glog.V(0).Infof("BrokerConnectToBalancer: %v", err) glog.V(0).Infof("BrokerConnectToBalancer: %v", err)
} }
time.Sleep(time.Second) time.Sleep(time.Second)
if err := mqBroker.lockAsBalancer.DoLock(lock_manager.MaxDuration); err != nil { if err := mqBroker.lockAsBalancer.AttemptToLock(lock_manager.MaxDuration); err != nil {
glog.V(0).Infof("DoLock: %v", err) glog.V(0).Infof("AttemptToLock: %v", err)
} }
} }
}() }()

View file

@ -245,8 +245,8 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
fullpath := util.NewFullPath(req.Directory, req.EntryName) fullpath := util.NewFullPath(req.Directory, req.EntryName)
lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host) lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host)
lock := lockClient.NewLock(string(fullpath), string(fs.option.Host)) lock := lockClient.NewShortLivedLock(string(fullpath), string(fs.option.Host))
defer lock.StopLock() defer lock.StopShortLivedLock()
var offset int64 = 0 var offset int64 = 0
entry, err := fs.filer.FindEntry(ctx, fullpath) entry, err := fs.filer.FindEntry(ctx, fullpath)