diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go new file mode 100644 index 000000000..ea23daf5e --- /dev/null +++ b/weed/command/filer_remote_gateway.go @@ -0,0 +1,113 @@ +package command + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" + "os" + "time" +) + +type RemoteGatewayOptions struct { + filerAddress *string + grpcDialOption grpc.DialOption + readChunkFromFiler *bool + timeAgo *time.Duration + createBucketAt *string + createBucketRandomSuffix *bool + + mappings *remote_pb.RemoteStorageMapping + remoteConfs map[string]*remote_pb.RemoteConf + bucketsDir string +} + +var _ = filer_pb.FilerClient(&RemoteGatewayOptions{}) + +func (option *RemoteGatewayOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return fn(client) + }) +} +func (option *RemoteGatewayOptions) AdjustedUrl(location *filer_pb.Location) string { + return location.Url +} + +var ( + remoteGatewayOptions RemoteGatewayOptions +) + +func init() { + cmdFilerRemoteGateway.Run = runFilerRemoteGateway // break init cycle + remoteGatewayOptions.filerAddress = cmdFilerRemoteGateway.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster") + remoteGatewayOptions.createBucketAt = cmdFilerRemoteGateway.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in") + remoteGatewayOptions.createBucketRandomSuffix = cmdFilerRemoteGateway.Flag.Bool("createBucketWithRandomSuffix", true, "add randomized suffix to bucket name to avoid conflicts") + remoteGatewayOptions.readChunkFromFiler = cmdFilerRemoteGateway.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") + remoteGatewayOptions.timeAgo = cmdFilerRemoteGateway.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") +} + +var cmdFilerRemoteGateway = &Command{ + UsageLine: "filer.remote.gateway", + Short: "resumable continuously write back bucket creation, deletion, and other local updates to remote storage", + Long: `resumable continuously write back bucket creation, deletion, and other local updates to remote storage + + filer.remote.gateway listens on filer local buckets update events. + If any bucket is created, deleted, or updated, it will mirror the changes to remote object store. + + weed filer.remote.sync -createBucketAt=cloud1 + +`, +} + +func runFilerRemoteGateway(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + remoteGatewayOptions.grpcDialOption = grpcDialOption + + filerAddress := pb.ServerAddress(*remoteGatewayOptions.filerAddress) + + filerSource := &source.FilerSource{} + filerSource.DoInitialize( + filerAddress.ToHttpAddress(), + filerAddress.ToGrpcAddress(), + "/", // does not matter + *remoteGatewayOptions.readChunkFromFiler, + ) + + remoteGatewayOptions.bucketsDir = "/buckets" + // check buckets again + remoteGatewayOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { + resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return err + } + remoteGatewayOptions.bucketsDir = resp.DirBuckets + return nil + }) + + // read filer remote storage mount mappings + if detectErr := remoteGatewayOptions.collectRemoteStorageConf(); detectErr != nil { + fmt.Fprintf(os.Stderr, "read mount info: %v\n", detectErr) + return true + } + + // synchronize /buckets folder + fmt.Printf("synchronize buckets in %s ...\n", remoteGatewayOptions.bucketsDir) + util.RetryForever("filer.remote.sync buckets", func() error { + return remoteGatewayOptions.followBucketUpdatesAndUploadToRemote(filerSource) + }, func(err error) bool { + if err != nil { + glog.Errorf("synchronize %s: %v", remoteGatewayOptions.bucketsDir, err) + } + return true + }) + return true + +} diff --git a/weed/command/filer_remote_sync_buckets.go b/weed/command/filer_remote_gateway_buckets.go similarity index 94% rename from weed/command/filer_remote_sync_buckets.go rename to weed/command/filer_remote_gateway_buckets.go index 73c8de1a9..e16e4f731 100644 --- a/weed/command/filer_remote_sync_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -17,7 +17,7 @@ import ( "time" ) -func (option *RemoteSyncOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error { +func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error { // read filer remote storage mount mappings if detectErr := option.collectRemoteStorageConf(); detectErr != nil { @@ -35,13 +35,13 @@ func (option *RemoteSyncOptions) followBucketUpdatesAndUploadToRemote(filerSourc return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs) }) - lastOffsetTs := collectLastSyncOffset(option, option.bucketsDir) + lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo) return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) } -func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { +func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { handleCreateBucket := func(entry *filer_pb.Entry) error { if !entry.IsDirectory { @@ -307,7 +307,7 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source. return eachEntryFunc, nil } -func (option *RemoteSyncOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) { +func (option *RemoteGatewayOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) { bucket := util.FullPath(option.bucketsDir).Child(bucketName) var isMounted bool @@ -327,7 +327,7 @@ func (option *RemoteSyncOptions) findRemoteStorageClient(bucketName string) (cli return client, remoteStorageMountLocation, nil } -func (option *RemoteSyncOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) { +func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) { bucket, ok = extractBucketPath(option.bucketsDir, actualDir) if !ok { return "", nil, nil, false @@ -355,7 +355,7 @@ func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) { return util.FullPath(bucketsDir).Child(parts[0]), true } -func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) { +func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) { if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil { return err diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index 857fbb0eb..65cf8e91f 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -1,17 +1,14 @@ package command import ( - "context" "fmt" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "google.golang.org/grpc" - "os" "time" ) @@ -19,15 +16,9 @@ type RemoteSyncOptions struct { filerAddress *string grpcDialOption grpc.DialOption readChunkFromFiler *bool - debug *bool timeAgo *time.Duration dir *string - createBucketAt *string - createBucketRandomSuffix *bool - mappings *remote_pb.RemoteStorageMapping - remoteConfs map[string]*remote_pb.RemoteConf - bucketsDir string } var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) @@ -49,10 +40,7 @@ func init() { cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster") remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer") - remoteSyncOptions.createBucketAt = cmdFilerRemoteSynchronize.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in") - remoteSyncOptions.createBucketRandomSuffix = cmdFilerRemoteSynchronize.Flag.Bool("createBucketWithRandomSuffix", true, "add randomized suffix to bucket name to avoid conflicts") remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") - remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files") remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") } @@ -100,18 +88,7 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { *remoteSyncOptions.readChunkFromFiler, ) - remoteSyncOptions.bucketsDir = "/buckets" - // check buckets again - remoteSyncOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { - resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) - if err != nil { - return err - } - remoteSyncOptions.bucketsDir = resp.DirBuckets - return nil - }) - - if dir != "" && dir != remoteSyncOptions.bucketsDir { + if dir != "" { fmt.Printf("synchronize %s to remote storage...\n", dir) util.RetryForever("filer.remote.sync "+dir, func() error { return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir) @@ -124,22 +101,6 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { return true } - // read filer remote storage mount mappings - if detectErr := remoteSyncOptions.collectRemoteStorageConf(); detectErr != nil { - fmt.Fprintf(os.Stderr, "read mount info: %v\n", detectErr) - return true - } - - // synchronize /buckets folder - fmt.Printf("synchronize buckets in %s ...\n", remoteSyncOptions.bucketsDir) - util.RetryForever("filer.remote.sync buckets", func() error { - return remoteSyncOptions.followBucketUpdatesAndUploadToRemote(filerSource) - }, func(err error) bool { - if err != nil { - glog.Errorf("synchronize %s: %v", remoteSyncOptions.bucketsDir, err) - } - return true - }) return true } diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index 50f1e35cf..8ff933833 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -12,6 +12,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" + "google.golang.org/grpc" "os" "strings" "time" @@ -36,7 +37,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, lastTsNs) }) - lastOffsetTs := collectLastSyncOffset(option, mountedDir) + lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo) return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) @@ -159,19 +160,19 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, return eachEntryFunc, nil } -func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) time.Time { +func collectLastSyncOffset(filerClient filer_pb.FilerClient, grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, mountedDir string, timeAgo time.Duration) time.Time { // 1. specified by timeAgo // 2. last offset timestamp for this directory // 3. directory creation time var lastOffsetTs time.Time - if *option.timeAgo == 0 { - mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir)) + if timeAgo == 0 { + mountedDirEntry, err := filer_pb.GetEntry(filerClient, util.FullPath(mountedDir)) if err != nil { glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err) return time.Now() } - lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir) + lastOffsetTsNs, err := remote_storage.GetSyncOffset(grpcDialOption, filerAddress, mountedDir) if mountedDirEntry != nil { if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { lastOffsetTs = time.Unix(0, lastOffsetTsNs) @@ -183,7 +184,7 @@ func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) time.Ti lastOffsetTs = time.Now() } } else { - lastOffsetTs = time.Now().Add(-*option.timeAgo) + lastOffsetTs = time.Now().Add(-timeAgo) } return lastOffsetTs }