1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-05-09 13:00:45 +02:00

Compare commits

...

4 commits

Author SHA1 Message Date
Konstantin Lebedev 7d27d5cb97
Merge 8c1fb28641 into abf01a0eb7 2024-04-26 12:35:50 +05:00
M@ abf01a0eb7
Fixes unlocked read from logBuffer.LastTsNs that is racey. (#5536) 2024-04-25 15:46:12 -07:00
Konstantin Lebedev 8c1fb28641 s3 DeleteBucketLifecycle 2024-04-20 01:59:36 +05:00
Konstantin Lebedev f5dc251b1e s3 PutBucketLifecycle
https://github.com/seaweedfs/seaweedfs/issues/4533
2024-04-20 01:24:18 +05:00
5 changed files with 143 additions and 12 deletions

View file

@ -194,4 +194,6 @@ jobs:
s3tests_boto3/functional/test_s3.py::test_ranged_request_skip_leading_bytes_response_code \
s3tests_boto3/functional/test_s3.py::test_ranged_request_return_trailing_bytes_response_code \
s3tests_boto3/functional/test_s3.py::test_copy_object_ifmatch_good \
s3tests_boto3/functional/test_s3.py::test_copy_object_ifnonematch_failed
s3tests_boto3/functional/test_s3.py::test_copy_object_ifnonematch_failed \
s3tests_boto3/functional/test_s3.py::test_lifecycle_set \
s3tests_boto3/functional/test_s3.py::test_lifecycle_set_filter

1
go.mod
View file

@ -149,6 +149,7 @@ require (
github.com/hanwen/go-fuse/v2 v2.5.0
github.com/hashicorp/raft v1.6.1
github.com/hashicorp/raft-boltdb/v2 v2.3.0
github.com/minio/minio-go/v7 v7.0.66
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/rabbitmq/amqp091-go v1.9.0
github.com/rclone/rclone v1.66.0

2
go.sum
View file

@ -662,6 +662,8 @@ github.com/mattn/go-sqlite3 v2.0.1+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/minio-go/v7 v7.0.66 h1:bnTOXOHjOqv/gcMuiVbN9o2ngRItvqE774dG9nq0Dzw=
github.com/minio/minio-go/v7 v7.0.66/go.mod h1:DHAgmyQEGdW3Cif0UooKOyrT3Vxs82zNdV6tkKhRtbs=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=

View file

@ -1,15 +1,18 @@
package s3api
import (
"bytes"
"context"
"encoding/xml"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil"
"github.com/minio/minio-go/v7/pkg/lifecycle"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
"github.com/seaweedfs/seaweedfs/weed/util"
"math"
"net/http"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
@ -325,19 +328,23 @@ func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWr
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchLifecycleConfiguration)
return
}
response := Lifecycle{}
for prefix, internalTtl := range ttls {
response := lifecycle.Configuration{}
for locationPrefix, internalTtl := range ttls {
ttl, _ := needle.ReadTTL(internalTtl)
days := int(ttl.Minutes() / 60 / 24)
if days == 0 {
continue
}
response.Rules = append(response.Rules, Rule{
Status: Enabled, Filter: Filter{
Prefix: Prefix{string: prefix, set: true},
set: true,
},
Expiration: Expiration{Days: days, set: true},
prefix, found := strings.CutPrefix(locationPrefix, fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket))
if !found {
continue
}
response.Rules = append(response.Rules, lifecycle.Rule{
ID: prefix,
Status: "Enabled",
Prefix: prefix,
RuleFilter: lifecycle.Filter{Prefix: prefix},
Expiration: lifecycle.Expiration{Days: lifecycle.ExpirationDays(days)},
})
}
writeSuccessResponseXML(w, r, response)
@ -346,17 +353,128 @@ func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWr
// PutBucketLifecycleConfigurationHandler Put Bucket Lifecycle configuration
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html
func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWriter, r *http.Request) {
// collect parameters
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("PutBucketLifecycleConfigurationHandler %s", bucket)
s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
lifecycleConfig := &lifecycle.Configuration{}
if err := xmlDecoder(r.Body, lifecycleConfig, r.ContentLength); err != nil {
glog.Warningf("PutBucketLifecycleConfigurationHandler xml decode: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
return
}
fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil)
if err != nil {
glog.Errorf("PutBucketLifecycleConfigurationHandler read filer config: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
collectionTtls := fc.GetCollectionTtls(bucket)
changed := false
for _, rule := range lifecycleConfig.Rules {
if !strings.EqualFold(rule.Status, "Enabled") {
continue
}
if rule.Expiration.Days == 0 {
continue
}
var rulePrefix string
switch {
case len(rule.RuleFilter.Prefix) > 0:
rulePrefix = rule.RuleFilter.Prefix
case len(rule.Prefix) > 0:
rulePrefix = rule.Prefix
}
if len(rulePrefix) == 0 {
continue
}
locConf := &filer_pb.FilerConf_PathConf{
LocationPrefix: fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, rulePrefix),
Collection: bucket,
Ttl: fmt.Sprintf("%dd", rule.Expiration.Days),
}
if ttl, ok := collectionTtls[locConf.LocationPrefix]; ok && ttl == locConf.Ttl {
continue
}
if err := fc.AddLocationConf(locConf); err != nil {
glog.Errorf("PutBucketLifecycleConfigurationHandler add location config: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
changed = true
}
if changed {
var buf bytes.Buffer
if err := fc.ToText(&buf); err != nil {
glog.Errorf("PutBucketLifecycleConfigurationHandler save config to text: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
}
if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf.Bytes())
}); err != nil {
glog.Errorf("PutBucketLifecycleConfigurationHandler save config inside filer: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
}
writeSuccessResponseEmpty(w, r)
}
// DeleteBucketLifecycleHandler Delete Bucket Lifecycle
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketLifecycle.html
func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) {
// collect parameters
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("DeleteBucketLifecycleHandler %s", bucket)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil)
if err != nil {
glog.Errorf("DeleteBucketLifecycleHandler read filer config: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
collectionTtls := fc.GetCollectionTtls(bucket)
changed := false
for prefix, ttl := range collectionTtls {
bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
if strings.HasPrefix(prefix, bucketPrefix) && strings.HasSuffix(ttl, "d") {
fc.DeleteLocationConf(prefix)
changed = true
}
}
if changed {
var buf bytes.Buffer
if err := fc.ToText(&buf); err != nil {
glog.Errorf("DeleteBucketLifecycleHandler save config to text: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
}
if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf.Bytes())
}); err != nil {
glog.Errorf("DeleteBucketLifecycleHandler save config inside filer: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
}
s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
}
// GetBucketLocationHandler Get bucket location

View file

@ -66,9 +66,17 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
isDone = true
return
}
logBuffer.RLock()
lastTsNs := logBuffer.LastTsNs
for lastTsNs == logBuffer.LastTsNs {
logBuffer.RUnlock()
loopTsNs := lastTsNs // make a copy
for lastTsNs == loopTsNs {
if waitForDataFn() {
// Update loopTsNs and loop again
logBuffer.RLock()
loopTsNs = logBuffer.LastTsNs
logBuffer.RUnlock()
continue
} else {
isDone = true