1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-07-23 20:12:46 +02:00
seaweedfs/weed/shell/command_s3_clean_uploads.go
dependabot[bot] 216c52e377
chore(deps): bump gocloud.dev from 0.40.0 to 0.41.0 (#6679)
* chore(deps): bump gocloud.dev from 0.40.0 to 0.41.0

Bumps [gocloud.dev](https://github.com/google/go-cloud) from 0.40.0 to 0.41.0.
- [Release notes](https://github.com/google/go-cloud/releases)
- [Commits](https://github.com/google/go-cloud/compare/v0.40.0...v0.41.0)

---
updated-dependencies:
- dependency-name: gocloud.dev
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* fix error

* fix printing errors

* Update go.mod

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: chrislu <chris.lu@gmail.com>
2025-03-31 21:42:54 -07:00

105 lines
3.1 KiB
Go

package shell
import (
"flag"
"fmt"
"io"
"math"
"time"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func init() {
Commands = append(Commands, &commandS3CleanUploads{})
}
type commandS3CleanUploads struct{}
func (c *commandS3CleanUploads) Name() string {
return "s3.clean.uploads"
}
func (c *commandS3CleanUploads) Help() string {
return `clean up stale multipart uploads
Example:
s3.clean.uploads -timeAgo 1.5h
`
}
func (c *commandS3CleanUploads) HasTag(CommandTag) bool {
return false
}
func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
uploadedTimeAgo := bucketCommand.Duration("timeAgo", 24*time.Hour, "created time before now. \"1.5h\" or \"2h45m\". Valid time units are \"m\", \"h\"")
if err = bucketCommand.Parse(args); err != nil {
return nil
}
signingKey := util.GetViper().GetString("jwt.filer_signing.key")
var filerBucketsPath string
filerBucketsPath, err = readFilerBucketsPath(commandEnv)
if err != nil {
return fmt.Errorf("read buckets: %v", err)
}
var buckets []string
err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
buckets = append(buckets, entry.Name)
return nil
}, "", false, math.MaxUint32)
if err != nil {
return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err)
}
for _, bucket := range buckets {
if err := c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo, signingKey); err != nil {
fmt.Fprintf(writer, "failed cleanup uploads for bucket %s: %v", bucket, err)
}
}
return err
}
func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration, signingKey string) error {
uploadsDir := filerBucketsPath + "/" + bucket + "/" + s3_constants.MultipartUploadsFolder
var staleUploads []string
now := time.Now()
err := filer_pb.List(commandEnv, uploadsDir, "", func(entry *filer_pb.Entry, isLast bool) error {
ctime := time.Unix(entry.Attributes.Crtime, 0)
if ctime.Add(timeAgo).Before(now) {
staleUploads = append(staleUploads, entry.Name)
}
return nil
}, "", false, math.MaxUint32)
if err != nil {
return fmt.Errorf("list uploads under %v: %v", uploadsDir, err)
}
var encodedJwt security.EncodedJwt
if signingKey != "" {
encodedJwt = security.GenJwtForFilerServer(security.SigningKey(signingKey), 15*60)
}
for _, staleUpload := range staleUploads {
deleteUrl := fmt.Sprintf("http://%s%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerAddress.ToHttpAddress(), uploadsDir, staleUpload)
fmt.Fprintf(writer, "purge %s\n", deleteUrl)
err = util_http.Delete(deleteUrl, string(encodedJwt))
if err != nil && err.Error() != "" {
return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err)
}
}
return nil
}