From 4c97ff3717dc642fd2cad311a79df9ba266669cb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 31 Oct 2018 01:11:19 -0700 Subject: [PATCH] support AWS SQS as file change notification message queue --- weed/command/filer_replication.go | 9 +- weed/command/scaffold.go | 15 +++ weed/filer2/filer_notify_test.go | 51 ++++++++ weed/notification/aws_sqs/aws_sqs_pub.go | 91 ++++++++++++++ weed/replication/sink/s3sink/s3_sink.go | 4 + weed/replication/sub/notification_aws_sqs.go | 111 ++++++++++++++++++ .../{ => sub}/notification_kafka.go | 2 +- weed/replication/{ => sub}/notifications.go | 2 +- weed/server/filer_server.go | 1 + 9 files changed, 282 insertions(+), 4 deletions(-) create mode 100644 weed/filer2/filer_notify_test.go create mode 100644 weed/notification/aws_sqs/aws_sqs_pub.go create mode 100644 weed/replication/sub/notification_aws_sqs.go rename weed/replication/{ => sub}/notification_kafka.go (99%) rename weed/replication/{ => sub}/notifications.go (95%) diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 05076143a..3ae4f1e2f 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -13,6 +13,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" "github.com/chrislusf/seaweedfs/weed/server" "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/replication/sub" ) func init() { @@ -37,9 +38,9 @@ func runFilerReplicate(cmd *Command, args []string) bool { weed_server.LoadConfiguration("replication", true) config := viper.GetViper() - var notificationInput replication.NotificationInput + var notificationInput sub.NotificationInput - for _, input := range replication.NotificationInputs { + for _, input := range sub.NotificationInputs { if config.GetBool("notification." + input.GetName() + ".enabled") { viperSub := config.Sub("notification." + input.GetName()) if err := input.Initialize(viperSub); err != nil { @@ -99,6 +100,10 @@ func runFilerReplicate(cmd *Command, args []string) bool { glog.Errorf("receive %s: %+v", key, err) continue } + if key == "" { + // long poll received no messages + continue + } if m.OldEntry != nil && m.NewEntry == nil { glog.V(1).Infof("delete: %s", key) } else if m.OldEntry == nil && m.NewEntry != nil { diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 95ddbd57c..cc6e5d6ef 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -146,6 +146,14 @@ hosts = [ ] topic = "seaweedfs_filer" +[notification.aws_sqs] +# experimental, let me know if it works +enabled = false +aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials). +aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials). +region = "us-east-2" +sqs_queue_name = "my_filer_queue" # an existing queue name + ` REPLICATION_TOML_EXAMPLE = ` # A sample TOML config file for replicating SeaweedFS filer @@ -169,6 +177,13 @@ topic = "seaweedfs_filer1_to_filer2" offsetFile = "./last.offset" offsetSaveIntervalSeconds = 10 +[notification.aws_sqs] +enabled = false +aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials). +aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials). +region = "us-east-2" +sqs_queue_name = "my_filer_queue" # an existing queue name + [sink.filer] enabled = false grpcAddress = "localhost:18888" diff --git a/weed/filer2/filer_notify_test.go b/weed/filer2/filer_notify_test.go new file mode 100644 index 000000000..ab54cd1a2 --- /dev/null +++ b/weed/filer2/filer_notify_test.go @@ -0,0 +1,51 @@ +package filer2 + +import ( + "testing" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/golang/protobuf/proto" +) + +func TestProtoMarshalText(t *testing.T) { + + oldEntry := &Entry{ + FullPath: FullPath("/this/path/to"), + Attr: Attr{ + Mtime: time.Now(), + Mode: 0644, + Uid: 1, + Mime: "text/json", + TtlSec: 25, + }, + Chunks: []*filer_pb.FileChunk{ + &filer_pb.FileChunk{ + FileId: "234,2423423422", + Offset: 234234, + Size: 234, + Mtime: 12312423, + ETag: "2342342354", + SourceFileId: "23234,2342342342", + }, + }, + } + + notification := &filer_pb.EventNotification{ + OldEntry: toProtoEntry(oldEntry), + NewEntry: toProtoEntry(nil), + DeleteChunks: true, + } + + text := proto.MarshalTextString(notification) + + notification2 := &filer_pb.EventNotification{} + proto.UnmarshalText(text, notification2) + + if notification2.OldEntry.Chunks[0].SourceFileId != notification.OldEntry.Chunks[0].SourceFileId { + t.Fatalf("marshal/unmarshal error: %s", text) + } + + println(text) + +} diff --git a/weed/notification/aws_sqs/aws_sqs_pub.go b/weed/notification/aws_sqs/aws_sqs_pub.go new file mode 100644 index 000000000..ab8d28006 --- /dev/null +++ b/weed/notification/aws_sqs/aws_sqs_pub.go @@ -0,0 +1,91 @@ +package aws_sqs + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/notification" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "fmt" + "github.com/aws/aws-sdk-go/aws/awserr" +) + +func init() { + notification.MessageQueues = append(notification.MessageQueues, &AwsSqsPub{}) +} + +type AwsSqsPub struct { + svc *sqs.SQS + queueUrl string +} + +func (k *AwsSqsPub) GetName() string { + return "aws_sqs" +} + +func (k *AwsSqsPub) Initialize(configuration util.Configuration) (err error) { + glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString("region")) + glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) + return k.initialize( + configuration.GetString("aws_access_key_id"), + configuration.GetString("aws_secret_access_key"), + configuration.GetString("region"), + configuration.GetString("sqs_queue_name"), + ) +} + +func (k *AwsSqsPub) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) { + + config := &aws.Config{ + Region: aws.String(region), + } + if awsAccessKeyId != "" && aswSecretAccessKey != "" { + config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "") + } + + sess, err := session.NewSession(config) + if err != nil { + return fmt.Errorf("create aws session: %v", err) + } + k.svc = sqs.New(sess) + + result, err := k.svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(queueName), + }) + if err != nil { + if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist { + return fmt.Errorf("unable to find queue %s", queueName) + } + return fmt.Errorf("get queue %s url: %v", queueName, err) + } + + k.queueUrl = *result.QueueUrl + + return nil +} + +func (k *AwsSqsPub) SendMessage(key string, message proto.Message) (err error) { + + text := proto.MarshalTextString(message) + + _, err = k.svc.SendMessage(&sqs.SendMessageInput{ + DelaySeconds: aws.Int64(10), + MessageAttributes: map[string]*sqs.MessageAttributeValue{ + "key": &sqs.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(key), + }, + }, + MessageBody: aws.String(text), + QueueUrl: &k.queueUrl, + }) + + if err != nil { + return fmt.Errorf("send message to sqs %s: %v", k.queueUrl, err) + } + + return nil +} diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index b9caa839b..50146a57d 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -14,6 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/glog" ) type S3Sink struct { @@ -37,6 +38,9 @@ func (s3sink *S3Sink) GetSinkToDirectory() string { } func (s3sink *S3Sink) Initialize(configuration util.Configuration) error { + glog.V(0).Infof("sink.s3.region: %v", configuration.GetString("region")) + glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString("bucket")) + glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString("directory")) return s3sink.initialize( configuration.GetString("aws_access_key_id"), configuration.GetString("aws_secret_access_key"), diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go new file mode 100644 index 000000000..fe1732e88 --- /dev/null +++ b/weed/replication/sub/notification_aws_sqs.go @@ -0,0 +1,111 @@ +package sub + +import ( + "fmt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go/aws/awserr" +) + +func init() { + NotificationInputs = append(NotificationInputs, &AwsSqsInput{}) +} + +type AwsSqsInput struct { + svc *sqs.SQS + queueUrl string +} + +func (k *AwsSqsInput) GetName() string { + return "aws_sqs" +} + +func (k *AwsSqsInput) Initialize(configuration util.Configuration) error { + glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString("region")) + glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) + return k.initialize( + configuration.GetString("aws_access_key_id"), + configuration.GetString("aws_secret_access_key"), + configuration.GetString("region"), + configuration.GetString("sqs_queue_name"), + ) +} + +func (k *AwsSqsInput) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) { + + config := &aws.Config{ + Region: aws.String(region), + } + if awsAccessKeyId != "" && aswSecretAccessKey != "" { + config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "") + } + + sess, err := session.NewSession(config) + if err != nil { + return fmt.Errorf("create aws session: %v", err) + } + k.svc = sqs.New(sess) + + result, err := k.svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(queueName), + }) + if err != nil { + if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist { + return fmt.Errorf("unable to find queue %s", queueName) + } + return fmt.Errorf("get queue %s url: %v", queueName, err) + } + + k.queueUrl = *result.QueueUrl + + return nil +} + +func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { + + // receive message + result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{ + AttributeNames: []*string{ + aws.String(sqs.MessageSystemAttributeNameSentTimestamp), + }, + MessageAttributeNames: []*string{ + aws.String(sqs.QueueAttributeNameAll), + }, + QueueUrl: &k.queueUrl, + MaxNumberOfMessages: aws.Int64(1), + VisibilityTimeout: aws.Int64(20), // 20 seconds + WaitTimeSeconds: aws.Int64(20), + }) + if err != nil { + err = fmt.Errorf("receive message from sqs %s: %v", k.queueUrl, err) + return + } + if len(result.Messages) == 0 { + return + } + + // process the message + key = *result.Messages[0].Attributes["key"] + text := *result.Messages[0].Body + message = &filer_pb.EventNotification{} + err = proto.UnmarshalText(text, message) + + // delete the message + _, err = k.svc.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: &k.queueUrl, + ReceiptHandle: result.Messages[0].ReceiptHandle, + }) + + if err != nil { + glog.V(1).Infof("delete message from sqs %s: %v", k.queueUrl, err) + } + + return +} diff --git a/weed/replication/notification_kafka.go b/weed/replication/sub/notification_kafka.go similarity index 99% rename from weed/replication/notification_kafka.go rename to weed/replication/sub/notification_kafka.go index 3bf917376..1a86a8307 100644 --- a/weed/replication/notification_kafka.go +++ b/weed/replication/sub/notification_kafka.go @@ -1,4 +1,4 @@ -package replication +package sub import ( "encoding/json" diff --git a/weed/replication/notifications.go b/weed/replication/sub/notifications.go similarity index 95% rename from weed/replication/notifications.go rename to weed/replication/sub/notifications.go index 6ae95d36b..66fbef824 100644 --- a/weed/replication/notifications.go +++ b/weed/replication/sub/notifications.go @@ -1,4 +1,4 @@ -package replication +package sub import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 2aabb9932..65fa26987 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -13,6 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/kafka" + _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" _ "github.com/chrislusf/seaweedfs/weed/notification/log" "github.com/chrislusf/seaweedfs/weed/security" "github.com/spf13/viper"