1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-06-29 13:50:10 +02:00

feat(filer.sync): add fromTsMs. Extract signature from doSubscribeFilerMetaChanges

This commit is contained in:
zhihao.qu 2022-06-09 10:53:19 +08:00
parent 4a5135961f
commit cd5cca36a4

View file

@ -15,6 +15,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"google.golang.org/grpc"
"os"
"strings"
"time"
)
@ -35,6 +36,8 @@ type SyncOptions struct {
bDiskType *string
aDebug *bool
bDebug *bool
aFromTsMs *int64
bFromTsMs *int64
aProxyByFiler *bool
bProxyByFiler *bool
clientId int32
@ -65,6 +68,8 @@ func init() {
syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
syncOptions.clientId = util.RandomInt32()
@ -97,10 +102,32 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
filerA := pb.ServerAddress(*syncOptions.filerA)
filerB := pb.ServerAddress(*syncOptions.filerB)
// read a filer signature
aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA)
if aFilerErr != nil {
glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr)
return true
}
// read b filer signature
bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB)
if bFilerErr != nil {
glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr)
return true
}
go func() {
// a->b
// set synchronization start timestamp to offset
initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs)
if initOffsetError != nil {
glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
os.Exit(2)
}
for {
err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
*syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug)
*syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType,
*syncOptions.bDebug, aFilerSignature, bFilerSignature)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
time.Sleep(1747 * time.Millisecond)
@ -109,10 +136,18 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
}()
if !*syncOptions.isActivePassive {
// b->a
// set synchronization start timestamp to offset
initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs)
if initOffsetError != nil {
glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
os.Exit(2)
}
go func() {
for {
err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
*syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug)
*syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType,
*syncOptions.aDebug, bFilerSignature, aFilerSignature)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
time.Sleep(2147 * time.Millisecond)
@ -126,19 +161,24 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
return true
}
func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error {
// initOffsetFromTsMs Initialize offset
func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64) error {
if fromTsMs <= 0 {
return nil
}
// convert to nanosecond
fromTsNs := fromTsMs * 1000_000
// If not successful, exit the program.
setOffsetErr := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, fromTsNs)
if setOffsetErr != nil {
return setOffsetErr
}
glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB)
return nil
}
// read source filer signature
sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
if sourceErr != nil {
return sourceErr
}
// read target filer signature
targetFilerSignature, targetErr := replication.ReadFilerSignature(grpcDialOption, targetFiler)
if targetErr != nil {
return targetErr
}
func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {
// if first time, start from now
// if has previously synced, resume from that point of time