1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-06-28 05:12:13 +02:00
seaweedfs/weed/wdclient/exclusive_locks/exclusive_locker.go
chrislu 9f9ef1340c use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call.
this is to ensure the long poll connections are properly closed.
2021-12-26 00:15:03 -08:00

132 lines
3.3 KiB
Go

package exclusive_locks
import (
"context"
"sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
// Timing parameters used when acquiring and renewing the admin lease.
//
// NOTE: the "Inteval" spelling is part of the exported names and is kept
// as-is so existing callers keep compiling.
const (
	// RenewInteval is how long the renewal goroutine started by RequestLock
	// sleeps after each successful renewal.
	RenewInteval = time.Second * 4
	// SafeRenewInteval is added to the last recorded lock timestamp by
	// GetToken; GetToken waits while that instant is already in the past.
	SafeRenewInteval = time.Second * 3
	// InitLockInteval is the pause between failed attempts to obtain the
	// initial lease in RequestLock.
	InitLockInteval = time.Second
)
// ExclusiveLocker holds the client-side state of a named admin lease obtained
// from the master through a wdclient.MasterClient.
type ExclusiveLocker struct {
	// token and lockTsNs are read and written with sync/atomic by the methods
	// of this type. They are kept as the first fields; sync/atomic documents
	// that 64-bit atomic operations need 64-bit alignment on 32-bit platforms.
	token    int64 // token returned by the most recent successful lease call; reset to 0 by ReleaseLock
	lockTsNs int64 // lock timestamp (ns) returned with the token; reset to 0 by ReleaseLock

	// isLocking is set by RequestLock once the lease is obtained and cleared
	// by ReleaseLock. It is a plain bool, accessed without synchronization.
	isLocking bool

	masterClient *wdclient.MasterClient // client used for every lease and release call
	lockName     string                 // lock name sent with every lease and release request
	message      string                 // optional text sent with renewal requests; see SetMessage
}
// NewExclusiveLocker returns an ExclusiveLocker for the lock called lockName
// that talks to the master through masterClient. No request is sent here; all
// other fields start at their zero values.
func NewExclusiveLocker(masterClient *wdclient.MasterClient, lockName string) *ExclusiveLocker {
	locker := new(ExclusiveLocker)
	locker.masterClient = masterClient
	locker.lockName = lockName
	return locker
}
// IsLocking reports the current value of the isLocking flag, which RequestLock
// sets to true after obtaining the lease and ReleaseLock sets to false.
// The flag is read without synchronization.
func (l *ExclusiveLocker) IsLocking() bool {
	locking := l.isLocking
	return locking
}
// GetToken returns the current lease token and its lock timestamp (ns).
//
// Before returning it polls every 100ms while the recorded lock timestamp plus
// SafeRenewInteval is already in the past, so a token is only handed out
// inside the safe period that follows a renewal (no immediate renewal is about
// to change it).
//
// NOTE: the loop only ends once lockTsNs is refreshed; with a zero or stale
// timestamp (e.g. after ReleaseLock) this call keeps waiting.
func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
	for {
		lastLock := time.Unix(0, atomic.LoadInt64(&l.lockTsNs))
		safeUntil := lastLock.Add(SafeRenewInteval)
		if !time.Now().After(safeUntil) {
			// still within the safe period of the latest renewal
			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	token = atomic.LoadInt64(&l.token)
	lockTsNs = atomic.LoadInt64(&l.lockTsNs)
	return token, lockTsNs
}
// RequestLock obtains the admin lease named l.lockName on behalf of clientName
// and keeps it alive in the background.
//
// It returns immediately when isLocking is already set. Otherwise it blocks,
// retrying LeaseAdminToken every InitLockInteval until a call succeeds, stores
// the returned token and timestamp, sets isLocking and starts a goroutine that
// renews the lease every RenewInteval (also sending the message set through
// SetMessage).
//
// The renewal goroutine stops when isLocking becomes false (see ReleaseLock)
// or when a renewal fails. On a failed renewal the flag is now cleared, so
// IsLocking no longer reports a lease that is not being renewed and a later
// RequestLock call re-acquires it instead of returning early.
func (l *ExclusiveLocker) RequestLock(clientName string) {
	if l.isLocking {
		return
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// retry to get the lease
	for {
		err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
			resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{
				PreviousToken:    atomic.LoadInt64(&l.token),
				PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
				LockName:         l.lockName,
				ClientName:       clientName,
			})
			if err == nil {
				atomic.StoreInt64(&l.token, resp.Token)
				atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
			}
			return err
		})
		if err == nil {
			break
		}
		println("lock:", err.Error())
		time.Sleep(InitLockInteval)
	}

	l.isLocking = true

	// start a goroutine to renew the lease
	go func() {
		ctx2, cancel2 := context.WithCancel(context.Background())
		defer cancel2()

		for l.isLocking {
			err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
				resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{
					PreviousToken:    atomic.LoadInt64(&l.token),
					PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
					LockName:         l.lockName,
					ClientName:       clientName,
					Message:          l.message,
				})
				if err == nil {
					atomic.StoreInt64(&l.token, resp.Token)
					atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
				}
				return err
			})
			if err != nil {
				glog.Errorf("failed to renew lock: %v", err)
				// Renewal stops here, so the lease is no longer being kept
				// alive. Clear the flag; leaving it set would make IsLocking
				// report a lost lease and make RequestLock a no-op.
				l.isLocking = false
				return
			}
			time.Sleep(RenewInteval)
		}
	}()
}
// ReleaseLock gives the lease back to the master and clears the local state.
//
// It first clears isLocking, which ends the renewal loop started by
// RequestLock the next time that loop checks the flag. It then sends
// ReleaseAdminToken with the current token and timestamp, and finally resets
// token and lockTsNs to 0.
//
// The release is best-effort: a failure is logged instead of being silently
// dropped, but the local state is cleared either way and nothing is returned
// to the caller.
func (l *ExclusiveLocker) ReleaseLock() {
	l.isLocking = false

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		if _, err := client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{
			PreviousToken:    atomic.LoadInt64(&l.token),
			PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
			LockName:         l.lockName,
		}); err != nil {
			glog.Errorf("failed to release lock %s: %v", l.lockName, err)
		}
		// Still report success to WithClient, exactly as before, so a failed
		// release does not change how the client wrapper reacts.
		return nil
	}); err != nil {
		glog.Errorf("failed to release lock %s: %v", l.lockName, err)
	}

	atomic.StoreInt64(&l.token, 0)
	atomic.StoreInt64(&l.lockTsNs, 0)
}
// SetMessage stores message on the locker. The renewal goroutine started by
// RequestLock sends the stored value as the Message field of each renewal
// request. The field is written without synchronization.
func (l *ExclusiveLocker) SetMessage(message string) {
	l.message = message
}