From 170b63d6f89284b804b153343380537fd09e0025 Mon Sep 17 00:00:00 2001
From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>
Date: Thu, 7 Mar 2024 21:35:51 +0500
Subject: [PATCH] [filer.backup] add param uploader_part_size for S3sink (#5352)

* fix: install cronie

* chore: refactor configure S3Sink

* chore: refactor config

* add filer-backup compose file

* fix: X-Amz-Meta-Mtime and resolve with comments

* fix: attr mtime

* fix: MaxUploadParts is reduced to the maximum allowable

* fix: env and force set max MaxUploadParts

* fix: env WEED_SINK_S3_UPLOADER_PART_SIZE_MB
---
 docker/Makefile                               |   3 +
 docker/compose/local-filer-backup-compose.yml |  54 ++++++++
 weed/command/filer_backup.go                  |   3 +-
 weed/replication/sink/s3sink/s3_sink.go       | 121 ++++++++++++------
 weed/s3api/s3_constants/header.go             |   1 +
 5 files changed, 140 insertions(+), 42 deletions(-)
 create mode 100644 docker/compose/local-filer-backup-compose.yml

diff --git a/docker/Makefile b/docker/Makefile
index 89a896f77..6d6c88190 100644
--- a/docker/Makefile
+++ b/docker/Makefile
@@ -81,6 +81,9 @@ cluster: build
 2mount: build
 	docker compose -f compose/local-sync-mount-compose.yml -p seaweedfs up
 
+filer_backup: build
+	docker compose -f compose/local-filer-backup-compose.yml -p seaweedfs up
+
 hashicorp_raft: build
 	docker compose -f compose/local-hashicorp-raft-compose.yml -p seaweedfs up
 
diff --git a/docker/compose/local-filer-backup-compose.yml b/docker/compose/local-filer-backup-compose.yml
new file mode 100644
index 000000000..3e4baf5fa
--- /dev/null
+++ b/docker/compose/local-filer-backup-compose.yml
@@ -0,0 +1,54 @@
+version: '3.9'
+
+services:
+  server-left:
+    image: chrislusf/seaweedfs:local
+    command: "-v=0 server -ip=server-left -filer -filer.maxMB 5 -s3 -s3.config=/etc/seaweedfs/s3.json -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1"
+    volumes:
+      - ./s3.json:/etc/seaweedfs/s3.json
+    healthcheck:
+      test: [ "CMD", "curl", "--fail", "-I", "http://localhost:9333/cluster/healthz" ]
+      interval: 3s
+      start_period: 15s
+      timeout: 30s
+  server-right:
+    image: chrislusf/seaweedfs:local
+    command: "-v=0 server -ip=server-right -filer -filer.maxMB 64 -s3 -s3.config=/etc/seaweedfs/s3.json -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1"
+    volumes:
+      - ./s3.json:/etc/seaweedfs/s3.json
+    healthcheck:
+      test: [ "CMD", "curl", "--fail", "-I", "http://localhost:9333/cluster/healthz" ]
+      interval: 3s
+      start_period: 15s
+      timeout: 30s
+  filer-backup:
+    image: chrislusf/seaweedfs:local
+    command: "-v=0 filer.backup -debug -doDeleteFiles=False -filer server-left:8888"
+    volumes:
+      - ./replication.toml:/etc/seaweedfs/replication.toml
+    environment:
+      WEED_SINK_LOCAL_INCREMENTAL_ENABLED: "false"
+      WEED_SINK_S3_ENABLED: "true"
+      WEED_SINK_S3_BUCKET: "backup"
+      WEED_SINK_S3_ENDPOINT: "http://server-right:8333"
+      WEED_SINK_S3_DIRECTORY: "/"
+      WEED_SINK_S3_AWS_ACCESS_KEY_ID: "some_access_key1"
+      WEED_SINK_S3_AWS_SECRET_ACCESS_KEY: "some_secret_key1"
+      WEED_SINK_S3_S3_DISABLE_CONTENT_MD5_VALIDATION: "false"
+      WEED_SINK_S3_UPLOADER_PART_SIZE_MB: "5"
+      WEED_SINK_S3_KEEP_PART_SIZE: "false"
+    depends_on:
+      server-left:
+        condition: service_healthy
+      server-right:
+        condition: service_healthy
+  minio-warp:
+    image: minio/warp
+    command: 'mixed --duration 5s --obj.size=6mb --md5 --objects 10 --concurrent 2'
+    restart: on-failure
+    environment:
+      WARP_HOST: "server-left:8333"
+      WARP_ACCESS_KEY: "some_access_key1"
+      WARP_SECRET_KEY: "some_secret_key1"
+    depends_on:
+      - filer-backup
\ No newline at end of file
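A quick way to exercise the new target, as a minimal sketch assuming it is run from the repository root and that the docker Makefile's build target produces the chrislusf/seaweedfs:local image referenced by the compose file:

    # Build the local image and bring up the backup test stack:
    # two SeaweedFS servers, the filer.backup process, and a minio/warp load generator.
    make -C docker filer_backup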
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 691b1c0b5..4aeab60f2 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -85,8 +85,7 @@ const (
 func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32, clientEpoch int32) error {
 
 	// find data sink
-	config := util.GetViper()
-	dataSink := findSink(config)
+	dataSink := findSink(util.GetViper())
 	if dataSink == nil {
 		return fmt.Errorf("no data sink configured in replication.toml")
 	}
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index a032b58e8..276ea30d6 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -8,6 +8,8 @@ import (
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/aws/aws-sdk-go/service/s3/s3iface"
 	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+	"strconv"
 	"strings"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -19,14 +21,20 @@ import (
 )
 
 type S3Sink struct {
-	conn          s3iface.S3API
-	region        string
-	bucket        string
-	dir           string
-	endpoint      string
-	acl           string
-	filerSource   *source.FilerSource
-	isIncremental bool
+	conn                          s3iface.S3API
+	filerSource                   *source.FilerSource
+	isIncremental                 bool
+	keepPartSize                  bool
+	s3DisableContentMD5Validation bool
+	s3ForcePathStyle              bool
+	uploaderConcurrency           int
+	uploaderMaxUploadParts        int
+	uploaderPartSizeMb            int
+	region                        string
+	bucket                        string
+	dir                           string
+	endpoint                      string
+	acl                           string
 }
 
 func init() {
@@ -46,21 +54,49 @@ func (s3sink *S3Sink) IsIncremental() bool {
 }
 
 func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
-	glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
-	glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
-	glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
-	glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
-	glog.V(0).Infof("sink.s3.acl: %v", configuration.GetString(prefix+"acl"))
-	glog.V(0).Infof("sink.s3.is_incremental: %v", configuration.GetString(prefix+"is_incremental"))
+	configuration.SetDefault(prefix+"region", "us-east-2")
+	configuration.SetDefault(prefix+"directory", "/")
+	configuration.SetDefault(prefix+"keep_part_size", true)
+	configuration.SetDefault(prefix+"uploader_max_upload_parts", 1000)
+	configuration.SetDefault(prefix+"uploader_part_size_mb", 8)
+	configuration.SetDefault(prefix+"uploader_concurrency", 8)
+	configuration.SetDefault(prefix+"s3_disable_content_md5_validation", true)
+	configuration.SetDefault(prefix+"s3_force_path_style", true)
+	s3sink.region = configuration.GetString(prefix + "region")
+	s3sink.bucket = configuration.GetString(prefix + "bucket")
+	s3sink.dir = configuration.GetString(prefix + "directory")
+	s3sink.endpoint = configuration.GetString(prefix + "endpoint")
+	s3sink.acl = configuration.GetString(prefix + "acl")
 	s3sink.isIncremental = configuration.GetBool(prefix + "is_incremental")
+	s3sink.keepPartSize = configuration.GetBool(prefix + "keep_part_size")
+	s3sink.s3DisableContentMD5Validation = configuration.GetBool(prefix + "s3_disable_content_md5_validation")
+	s3sink.s3ForcePathStyle = configuration.GetBool(prefix + "s3_force_path_style")
+	s3sink.uploaderMaxUploadParts = configuration.GetInt(prefix + "uploader_max_upload_parts")
+	s3sink.uploaderPartSizeMb = configuration.GetInt(prefix + "uploader_part_size_mb")
"uploader_part_size") + s3sink.uploaderConcurrency = configuration.GetInt(prefix + "uploader_concurrency") + + glog.V(0).Infof("sink.s3.region: %v", s3sink.region) + glog.V(0).Infof("sink.s3.bucket: %v", s3sink.bucket) + glog.V(0).Infof("sink.s3.directory: %v", s3sink.dir) + glog.V(0).Infof("sink.s3.endpoint: %v", s3sink.endpoint) + glog.V(0).Infof("sink.s3.acl: %v", s3sink.acl) + glog.V(0).Infof("sink.s3.is_incremental: %v", s3sink.isIncremental) + glog.V(0).Infof("sink.s3.s3_disable_content_md5_validation: %v", s3sink.s3DisableContentMD5Validation) + glog.V(0).Infof("sink.s3.s3_force_path_style: %v", s3sink.s3ForcePathStyle) + glog.V(0).Infof("sink.s3.keep_part_size: %v", s3sink.keepPartSize) + if s3sink.uploaderMaxUploadParts > s3manager.MaxUploadParts { + s3sink.uploaderMaxUploadParts = s3manager.MaxUploadParts + glog.Warningf("uploader_max_upload_parts is greater than the maximum number of parts allowed when uploading multiple parts to Amazon S3") + glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v => %v", s3sink.uploaderMaxUploadParts, s3manager.MaxUploadParts) + } else { + glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v", s3sink.uploaderMaxUploadParts) + } + glog.V(0).Infof("sink.s3.uploader_part_size_mb: %v", s3sink.uploaderPartSizeMb) + glog.V(0).Infof("sink.s3.uploader_concurrency: %v", s3sink.uploaderConcurrency) + return s3sink.initialize( configuration.GetString(prefix+"aws_access_key_id"), configuration.GetString(prefix+"aws_secret_access_key"), - configuration.GetString(prefix+"region"), - configuration.GetString(prefix+"bucket"), - configuration.GetString(prefix+"directory"), - configuration.GetString(prefix+"endpoint"), - configuration.GetString(prefix+"acl"), ) } @@ -68,18 +104,12 @@ func (s3sink *S3Sink) SetSourceFiler(s *source.FilerSource) { s3sink.filerSource = s } -func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir, endpoint, acl string) error { - s3sink.region = region - s3sink.bucket = bucket - s3sink.dir = dir - s3sink.endpoint = endpoint - s3sink.acl = acl - +func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey string) error { config := &aws.Config{ Region: aws.String(s3sink.region), Endpoint: aws.String(s3sink.endpoint), - S3ForcePathStyle: aws.Bool(true), - S3DisableContentMD5Validation: aws.Bool(true), + S3DisableContentMD5Validation: aws.Bool(s3sink.s3DisableContentMD5Validation), + S3ForcePathStyle: aws.Bool(s3sink.s3ForcePathStyle), } if awsAccessKeyId != "" && awsSecretAccessKey != "" { config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "") @@ -128,19 +158,26 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures reader := filer.NewFileReader(s3sink.filerSource, entry) - fileSize := int64(filer.FileSize(entry)) - - partSize := int64(8 * 1024 * 1024) // The minimum/default allowed part size is 5MB - for partSize*1000 < fileSize { - partSize *= 4 - } - // Create an uploader with the session and custom options uploader := s3manager.NewUploaderWithClient(s3sink.conn, func(u *s3manager.Uploader) { - u.PartSize = partSize - u.Concurrency = 8 + u.PartSize = int64(s3sink.uploaderPartSizeMb * 1024 * 1024) + u.Concurrency = s3sink.uploaderConcurrency + u.MaxUploadParts = s3sink.uploaderMaxUploadParts }) + if s3sink.keepPartSize { + switch chunkCount := len(entry.Chunks); { + case chunkCount > 1: + if firstChunkSize := int64(entry.Chunks[0].Size); firstChunkSize > s3manager.MinUploadPartSize { + uploader.PartSize = firstChunkSize + } + 
+		default:
+			uploader.PartSize = 0
+		}
+	}
+	if _, ok := entry.Extended[s3_constants.AmzUserMetaMtime]; !ok {
+		entry.Extended[s3_constants.AmzUserMetaMtime] = []byte(strconv.FormatInt(entry.Attributes.Mtime, 10))
+	}
 	// process tagging
 	tags := ""
 	if true {
@@ -153,14 +190,18 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
 	}
 
 	// Upload the file to S3.
-	_, err = uploader.Upload(&s3manager.UploadInput{
+	uploadInput := s3manager.UploadInput{
 		Bucket:  aws.String(s3sink.bucket),
 		Key:     aws.String(key),
 		Body:    reader,
 		Tagging: aws.String(tags),
-	})
+	}
+	if len(entry.Attributes.Md5) > 0 {
+		uploadInput.ContentMD5 = aws.String(fmt.Sprintf("%x", entry.Attributes.Md5))
+	}
+	_, err = uploader.Upload(&uploadInput)
 
-	return
+	return err
 }
 
diff --git a/weed/s3api/s3_constants/header.go b/weed/s3api/s3_constants/header.go
index 5037f4691..30a878ccb 100644
--- a/weed/s3api/s3_constants/header.go
+++ b/weed/s3api/s3_constants/header.go
@@ -30,6 +30,7 @@ const (
 	// S3 user-defined metadata
 	AmzUserMetaPrefix    = "X-Amz-Meta-"
 	AmzUserMetaDirective = "X-Amz-Metadata-Directive"
+	AmzUserMetaMtime     = "X-Amz-Meta-Mtime"
 
 	// S3 object tagging
 	AmzObjectTagging = "X-Amz-Tagging"
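For reference, the keys read in Initialize above map onto the [sink.s3] section of replication.toml, or onto the matching WEED_SINK_S3_* environment variables used in the compose file. A minimal configuration sketch, assuming the defaults introduced by this patch and reusing the placeholder endpoint and credentials from the compose file:

    [sink.s3]
    enabled = true
    aws_access_key_id = "some_access_key1"     # leave empty to use the default AWS credential chain
    aws_secret_access_key = "some_secret_key1"
    region = "us-east-2"
    bucket = "backup"
    directory = "/"
    endpoint = "http://server-right:8333"
    is_incremental = false
    keep_part_size = false                     # when true, reuse the first source chunk size as the multipart part size
    uploader_part_size_mb = 5                  # new in this patch; S3 requires parts of at least 5 MB
    uploader_max_upload_parts = 1000           # clamped to s3manager.MaxUploadParts if set higher
    uploader_concurrency = 8
    s3_disable_content_md5_validation = false
    s3_force_path_style = true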