From 9fe24991d5b5be0cd3f56cbb65883c67c20fdfe6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 23 Sep 2018 00:40:36 -0700 Subject: [PATCH] refactoring --- weed/command/filer_replication.go | 13 +++++++++++++ weed/replication/replicator.go | 12 ++---------- .../sink/{ => filersink}/fetch_write.go | 2 +- .../replication/sink/{ => filersink}/filer_sink.go | 10 +--------- weed/replication/sink/replication_sink.go | 14 ++++++++++++++ 5 files changed, 31 insertions(+), 20 deletions(-) rename weed/replication/sink/{ => filersink}/fetch_write.go (99%) rename weed/replication/sink/{ => filersink}/filer_sink.go (94%) create mode 100644 weed/replication/sink/replication_sink.go diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 9f8f4442a..b19597245 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -5,6 +5,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/replication" "github.com/chrislusf/seaweedfs/weed/server" "github.com/spf13/viper" + "strings" ) func init() { @@ -44,6 +45,18 @@ func runFilerReplicate(cmd *Command, args []string) bool { } } + // avoid recursive replication + if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") { + sourceConfig, sinkConfig := config.Sub("source.filer"), config.Sub("sink.filer") + if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") { + fromDir := sourceConfig.GetString("directory") + toDir := sinkConfig.GetString("directory") + if strings.HasPrefix(toDir, fromDir) { + glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir) + } + } + } + replicator := replication.NewReplicator(config.Sub("source.filer"), config.Sub("sink.filer")) for { diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 3e4bccc10..5884bd35b 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -3,9 +3,9 @@ package replication import ( "strings" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" + "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -17,20 +17,12 @@ type Replicator struct { func NewReplicator(sourceConfig, sinkConfig util.Configuration) *Replicator { - sink := &sink.FilerSink{} + sink := &filersink.FilerSink{} sink.Initialize(sinkConfig) source := &source.FilerSource{} source.Initialize(sourceConfig) - if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") { - fromDir := sourceConfig.GetString("directory") - toDir := sinkConfig.GetString("directory") - if strings.HasPrefix(toDir, fromDir) { - glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir) - } - } - sink.SetSourceFiler(source) return &Replicator{ diff --git a/weed/replication/sink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go similarity index 99% rename from weed/replication/sink/fetch_write.go rename to weed/replication/sink/filersink/fetch_write.go index ef7c201c9..c14566723 100644 --- a/weed/replication/sink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -1,4 +1,4 @@ -package sink +package filersink import ( "context" diff --git a/weed/replication/sink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go similarity index 94% rename from weed/replication/sink/filer_sink.go rename to weed/replication/sink/filersink/filer_sink.go index f0a7e68d3..1cbf52864 100644 --- a/weed/replication/sink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -1,4 +1,4 @@ -package sink +package filersink import ( "context" @@ -11,14 +11,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -type ReplicationSink interface { - DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error - CreateEntry(key string, entry *filer_pb.Entry) error - UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error - GetSinkToDirectory() string - SetSourceFiler(s *source.FilerSource) -} - type FilerSink struct { filerSource *source.FilerSource grpcAddress string diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go new file mode 100644 index 000000000..bb4a8aa83 --- /dev/null +++ b/weed/replication/sink/replication_sink.go @@ -0,0 +1,14 @@ +package sink + +import ( + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/replication/source" +) + +type ReplicationSink interface { + DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error + CreateEntry(key string, entry *filer_pb.Entry) error + UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error + GetSinkToDirectory() string + SetSourceFiler(s *source.FilerSource) +}