mirror of
https://github.com/chrislusf/seaweedfs
synced 2024-07-01 06:40:45 +02:00
fix: add doDeleteFile option for filer backup
This commit is contained in:
parent
17710e1ecb
commit
2b229e98ce
|
@ -20,6 +20,7 @@ type FilerBackupOptions struct {
|
||||||
excludeFileName *string
|
excludeFileName *string
|
||||||
debug *bool
|
debug *bool
|
||||||
proxyByFiler *bool
|
proxyByFiler *bool
|
||||||
|
doDeleteFiles *bool
|
||||||
timeAgo *time.Duration
|
timeAgo *time.Duration
|
||||||
retentionDays *int
|
retentionDays *int
|
||||||
}
|
}
|
||||||
|
@ -35,10 +36,10 @@ func init() {
|
||||||
filerBackupOptions.excludePaths = cmdFilerBackup.Flag.String("filerExcludePaths", "", "exclude directories to sync on filer")
|
filerBackupOptions.excludePaths = cmdFilerBackup.Flag.String("filerExcludePaths", "", "exclude directories to sync on filer")
|
||||||
filerBackupOptions.excludeFileName = cmdFilerBackup.Flag.String("filerExcludeFileName", "", "exclude file names that match the regexp to sync on filer")
|
filerBackupOptions.excludeFileName = cmdFilerBackup.Flag.String("filerExcludeFileName", "", "exclude file names that match the regexp to sync on filer")
|
||||||
filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
|
filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
|
||||||
|
filerBackupOptions.doDeleteFiles = cmdFilerBackup.Flag.Bool("doDeleteFiles", false, "delete files on the destination")
|
||||||
filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
|
filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
|
||||||
filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
|
filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
|
||||||
filerBackupOptions.retentionDays = cmdFilerBackup.Flag.Int("retentionDays", 0, "incremental backup retention days")
|
filerBackupOptions.retentionDays = cmdFilerBackup.Flag.Int("retentionDays", 0, "incremental backup retention days")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var cmdFilerBackup = &Command{
|
var cmdFilerBackup = &Command{
|
||||||
|
@ -129,7 +130,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
|
||||||
*backupOption.proxyByFiler)
|
*backupOption.proxyByFiler)
|
||||||
dataSink.SetSourceFiler(filerSource)
|
dataSink.SetSourceFiler(filerSource)
|
||||||
|
|
||||||
processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, debug)
|
processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
|
||||||
|
|
||||||
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
|
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
|
||||||
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
|
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
|
||||||
|
|
|
@ -251,7 +251,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
|
||||||
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
|
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
|
||||||
filerSink.SetSourceFiler(filerSource)
|
filerSink.SetSourceFiler(filerSource)
|
||||||
|
|
||||||
persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, debug)
|
persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, true, debug)
|
||||||
|
|
||||||
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||||
message := resp.EventNotification
|
message := resp.EventNotification
|
||||||
|
@ -369,7 +369,7 @@ func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, reExcludeFileName *regexp.Regexp, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
|
func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, reExcludeFileName *regexp.Regexp, dataSink sink.ReplicationSink, doDeleteFiles bool, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||||
// process function
|
// process function
|
||||||
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||||
message := resp.EventNotification
|
message := resp.EventNotification
|
||||||
|
@ -397,17 +397,20 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
|
||||||
if reExcludeFileName != nil && reExcludeFileName.MatchString(message.NewEntry.Name) {
|
if reExcludeFileName != nil && reExcludeFileName.MatchString(message.NewEntry.Name) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if dataSink.IsIncremental() {
|
||||||
|
doDeleteFiles = false
|
||||||
|
}
|
||||||
// handle deletions
|
// handle deletions
|
||||||
if filer_pb.IsDelete(resp) {
|
if filer_pb.IsDelete(resp) {
|
||||||
|
if doDeleteFiles {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
|
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
|
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
|
||||||
if !dataSink.IsIncremental() {
|
|
||||||
return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
|
return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle new entries
|
// handle new entries
|
||||||
if filer_pb.IsCreate(resp) {
|
if filer_pb.IsCreate(resp) {
|
||||||
|
@ -432,7 +435,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
|
||||||
// old key is in the watched directory
|
// old key is in the watched directory
|
||||||
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
|
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
|
||||||
// new key is also in the watched directory
|
// new key is also in the watched directory
|
||||||
if !dataSink.IsIncremental() {
|
if doDeleteFiles {
|
||||||
oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
|
oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
|
||||||
message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
|
message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
|
||||||
foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
|
foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
|
||||||
|
@ -455,7 +458,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// new key is outside of the watched directory
|
// new key is outside of the watched directory
|
||||||
if !dataSink.IsIncremental() {
|
if doDeleteFiles {
|
||||||
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
|
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
|
||||||
return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
|
return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue