From 8dfac6a4cf8d2d21a3a69528136422e7f7a75d32 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 4 Nov 2018 11:58:59 -0800 Subject: [PATCH] working b2 sink --- weed/command/filer_replication.go | 2 ++ weed/command/scaffold.go | 11 +++++----- weed/replication/replicator.go | 4 +++- weed/replication/sink/azuresink/azure_sink.go | 13 +++++++++++ weed/replication/sink/b2sink/b2_sink.go | 22 ++++++++++++++++--- weed/replication/sink/s3sink/s3_sink.go | 13 +++++++++++ 6 files changed, 55 insertions(+), 10 deletions(-) diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 419ae3174..9639ac98a 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -115,6 +115,8 @@ func runFilerReplicate(cmd *Command, args []string) bool { } if err = replicator.Replicate(key, m); err != nil { glog.Errorf("replicate %s: %+v", key, err) + } else { + glog.V(4).Infof("replicated %s", key) } } diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 4d836d9bc..56fc50516 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -209,7 +209,7 @@ aws_access_key_id = "" # if empty, loads from the shared credentials fil aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials). region = "us-east-2" bucket = "your_bucket_name" # an existing bucket -directory = "" # destination directory (do not prefix or suffix with "/") +directory = "/" # destination directory [sink.google_cloud_storage] # read credentials doc at https://cloud.google.com/docs/authentication/getting-started @@ -224,15 +224,14 @@ enabled = false account_name = "" account_key = "" container = "mycontainer" # an existing container -directory = "" # destination directory (do not prefix or suffix with "/") +directory = "/" # destination directory [sink.backblaze] -# experimental, let me know if it works enabled = false -account_id = "" -account_key = "" +b2_account_id = "" +b2_master_application_key = "" bucket = "mybucket" # an existing bucket -directory = "" # destination directory (do not prefix or suffix with "/") +directory = "/" # destination directory ` ) diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 208e0894a..ac8235fd5 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -34,7 +34,9 @@ func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification) glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir) return nil } - key = filepath.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):]) + newKey := filepath.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):]) + glog.V(3).Infof("replicate %s => %s", key, newKey) + key = newKey if message.OldEntry != nil && message.NewEntry == nil { glog.V(4).Infof("deleting %v", key) return r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks) diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index 6adbfd75b..7acf37fa5 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "net/url" + "strings" "github.com/Azure/azure-storage-blob-go/azblob" "github.com/chrislusf/seaweedfs/weed/filer2" @@ -71,6 +72,8 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { + key = cleanKey(key) + if isDirectory { key = key + "/" } @@ -88,6 +91,8 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error { + key = cleanKey(key) + if entry.IsDirectory { return nil } @@ -132,6 +137,14 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error { } func (g *AzureSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { + key = cleanKey(key) // TODO improve efficiency return false, nil } + +func cleanKey(key string) string { + if strings.HasPrefix(key, "/") { + key = key[1:] + } + return key +} diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index ce0e9eb3c..17f5e39b2 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -2,6 +2,8 @@ package B2Sink import ( "context" + "strings" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" @@ -31,8 +33,8 @@ func (g *B2Sink) GetSinkToDirectory() string { func (g *B2Sink) Initialize(configuration util.Configuration) error { return g.initialize( - configuration.GetString("account_id"), - configuration.GetString("account_key"), + configuration.GetString("b2_account_id"), + configuration.GetString("b2_master_application_key"), configuration.GetString("bucket"), configuration.GetString("directory"), ) @@ -46,7 +48,7 @@ func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error { ctx := context.Background() client, err := b2.NewClient(ctx, accountId, accountKey) if err != nil { - return nil + return err } g.client = client @@ -58,6 +60,8 @@ func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error { func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { + key = cleanKey(key) + if isDirectory { key = key + "/" } @@ -77,6 +81,8 @@ func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { + key = cleanKey(key) + if entry.IsDirectory { return nil } @@ -123,6 +129,16 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { } func (g *B2Sink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { + + key = cleanKey(key) + // TODO improve efficiency return false, nil } + +func cleanKey(key string) string { + if strings.HasPrefix(key, "/") { + key = key[1:] + } + return key +} diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index 3d497d795..0a4e78318 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -2,6 +2,7 @@ package S3Sink import ( "fmt" + "strings" "sync" "github.com/aws/aws-sdk-go/aws" @@ -77,6 +78,8 @@ func (s3sink *S3Sink) initialize(awsAccessKeyId, aswSecretAccessKey, region, buc func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { + key = cleanKey(key) + if isDirectory { key = key + "/" } @@ -87,6 +90,8 @@ func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks b func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { + key = cleanKey(key) + if entry.IsDirectory { return nil } @@ -125,6 +130,14 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { } func (s3sink *S3Sink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { + key = cleanKey(key) // TODO improve efficiency return false, nil } + +func cleanKey(key string) string { + if strings.HasPrefix(key, "/") { + key = key[1:] + } + return key +}