diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 9d5b9d831..8eb68c098 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -251,16 +251,21 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti } return persistEventFn(resp) } + processor := NewMetadataProcessor(processEventFn, 128) var lastLogTsNs = time.Now().UnixNano() var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) - processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error { + processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { + processor.AddSyncJob(resp) + return nil + }, 3*time.Second, func(counter int64, lastTsNs int64) error { + // use processor.processedTsWatermark instead of the lastTsNs from the most recent job now := time.Now().UnixNano() - glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) + glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9)) lastLogTsNs = now // collect synchronous offset - statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(lastTsNs)) - return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs) + statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(processor.processedTsWatermark)) + return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, processor.processedTsWatermark) }) return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, clientEpoch, diff --git a/weed/command/filer_sync_jobs.go b/weed/command/filer_sync_jobs.go new file mode 100644 index 000000000..0d4d83adb --- /dev/null +++ b/weed/command/filer_sync_jobs.go @@ -0,0 +1,148 @@ +package command + +import ( + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "sync" +) + +type MetadataProcessFunc func(resp *filer_pb.SubscribeMetadataResponse) error + +type MetadataProcessor struct { + activeJobs map[int64]*filer_pb.SubscribeMetadataResponse + activeJobsLock sync.Mutex + activeJobsCond *sync.Cond + concurrencyLimit int + fn MetadataProcessFunc + processedTsWatermark int64 +} + +func NewMetadataProcessor(fn MetadataProcessFunc, concurrency int) *MetadataProcessor { + t := &MetadataProcessor{ + fn: fn, + activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse), + concurrencyLimit: concurrency, + } + t.activeJobsCond = sync.NewCond(&t.activeJobsLock) + return t +} + +func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) { + if filer_pb.IsEmpty(resp) { + return + } + + t.activeJobsLock.Lock() + defer t.activeJobsLock.Unlock() + + for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) { + t.activeJobsCond.Wait() + } + t.activeJobs[resp.TsNs] = resp + go func() { + + util.RetryForever("metadata processor", func() error { + return t.fn(resp) + }, func(err error) bool { + glog.Errorf("process %v: %v", resp, err) + return true + }) + + t.activeJobsLock.Lock() + defer t.activeJobsLock.Unlock() + + delete(t.activeJobs, resp.TsNs) + + // if is the oldest job, write down the watermark + isOldest := true + for t, _ := range t.activeJobs { + if resp.TsNs > t { + isOldest = false + break + } + } + if isOldest { + t.processedTsWatermark = resp.TsNs + } + t.activeJobsCond.Signal() + }() +} + +func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool { + for _, r := range t.activeJobs { + if shouldWaitFor(resp, r) { + return true + } + } + return false +} + +// a is one possible job to schedule +// b is one existing active job +func shouldWaitFor(a *filer_pb.SubscribeMetadataResponse, b *filer_pb.SubscribeMetadataResponse) bool { + aPath, aNewPath, aIsDirectory := extractPathsFromMetadata(a) + bPath, bNewPath, bIsDirectory := extractPathsFromMetadata(b) + + if pairShouldWaitFor(aPath, bPath, aIsDirectory, bIsDirectory) { + return true + } + if aNewPath != "" { + if pairShouldWaitFor(aNewPath, bPath, aIsDirectory, bIsDirectory) { + return true + } + } + if bNewPath != "" { + if pairShouldWaitFor(aPath, bNewPath, aIsDirectory, bIsDirectory) { + return true + } + } + if aNewPath != "" && bNewPath != "" { + if pairShouldWaitFor(aNewPath, bNewPath, aIsDirectory, bIsDirectory) { + return true + } + } + return false +} + +func pairShouldWaitFor(aPath, bPath util.FullPath, aIsDirectory, bIsDirectory bool) bool { + if bIsDirectory { + if aIsDirectory { + return aPath.IsUnder(bPath) || bPath.IsUnder(aPath) + } else { + return aPath.IsUnder(bPath) + } + } else { + if aIsDirectory { + return bPath.IsUnder(aPath) + } else { + return aPath == bPath + } + } +} + +func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (path, newPath util.FullPath, isDirectory bool) { + oldEntry := resp.EventNotification.OldEntry + newEntry := resp.EventNotification.NewEntry + // create + if filer_pb.IsCreate(resp) { + path = util.FullPath(resp.Directory).Child(newEntry.Name) + isDirectory = newEntry.IsDirectory + return + } + if filer_pb.IsDelete(resp) { + path = util.FullPath(resp.Directory).Child(oldEntry.Name) + isDirectory = oldEntry.IsDirectory + return + } + if filer_pb.IsUpdate(resp) { + path = util.FullPath(resp.Directory).Child(newEntry.Name) + isDirectory = newEntry.IsDirectory + return + } + // renaming + path = util.FullPath(resp.Directory).Child(oldEntry.Name) + isDirectory = oldEntry.IsDirectory + newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name) + return +} diff --git a/weed/util/fullpath.go b/weed/util/fullpath.go index 92580dc38..6ac2af14f 100644 --- a/weed/util/fullpath.go +++ b/weed/util/fullpath.go @@ -63,6 +63,14 @@ func Join(names ...string) string { func JoinPath(names ...string) FullPath { return FullPath(Join(names...)) } + +func (fp FullPath) IsUnder(other FullPath) bool { + if other == "/" { + return true + } + return strings.HasPrefix(string(fp), string(other)+"/") +} + func Split(separatedValues string, sep string) []string { if separatedValues == "" { return nil