1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-05-20 02:10:20 +02:00
seaweedfs/weed/shell/command_remote_unmount.go
chrislu 9f9ef1340c use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call.
this is to ensure the long poll connections are properly closed.
2021-12-26 00:15:03 -08:00

121 lines
3.2 KiB
Go

package shell
import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
"time"
)
func init() {
Commands = append(Commands, &commandRemoteUnmount{})
}
// commandRemoteUnmount is the shell command registered as "remote.unmount".
// It carries no state of its own.
type commandRemoteUnmount struct{}
// Name returns the name under which this command is invoked.
func (c *commandRemoteUnmount) Name() string {
	const commandName = "remote.unmount"
	return commandName
}
// Help returns the usage text for the command, including a worked example
// that configures a remote storage, mounts a bucket, and unmounts it again.
func (c *commandRemoteUnmount) Help() string {
	const usage = `unmount remote storage
# assume a remote storage is configured to name "s3_1"
remote.configure -name=s3_1 -type=s3 -s3.access_key=xxx -s3.secret_key=yyy
# mount and pull one bucket
remote.mount -dir=/xxx -remote=s3_1/bucket
# unmount the mounted directory and remove its cache
remote.unmount -dir=/xxx
`
	return usage
}
// Do runs the remote.unmount command.
//
// args are the command-line arguments; the only flag is -dir, a directory in
// the filer. When -dir is empty, the current mount mappings are printed to
// writer via jsonPrintln and nothing is changed. Otherwise the command:
//
//  1. verifies that dir is present in the mount mappings,
//  2. deletes the mount mapping for dir,
//  3. purges the data under dir (see purgeMountedData),
//  4. sets the remote sync offset for dir to the current time.
//
// The steps are not transactional: a failure in a later step leaves the
// earlier steps applied. Returned errors wrap the underlying cause, so callers
// can inspect them with errors.Is / errors.As.
func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
	remoteUnmountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	dir := remoteUnmountCommand.String("dir", "", "a directory in filer")

	// With flag.ContinueOnError the flag package has already printed the
	// parse error and usage, so the command just stops here without
	// propagating the error.
	// NOTE(review): swallowing the error looks deliberate — confirm.
	if err = remoteUnmountCommand.Parse(args); err != nil {
		return nil
	}

	mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
	if listErr != nil {
		return listErr
	}

	// No directory given: just show what is currently mounted.
	if *dir == "" {
		return jsonPrintln(writer, mappings)
	}

	if _, found := mappings.Mappings[*dir]; !found {
		return fmt.Errorf("directory %s is not mounted", *dir)
	}

	// remove the mount configuration from filer
	fmt.Fprintf(writer, "deleting mount for %s ...\n", *dir)
	if err = filer.DeleteMountMapping(commandEnv, *dir); err != nil {
		return fmt.Errorf("delete mount mapping: %w", err)
	}

	// purge mounted data
	fmt.Fprintf(writer, "purge %s ...\n", *dir)
	if err = c.purgeMountedData(commandEnv, *dir); err != nil {
		return fmt.Errorf("purge mounted data: %w", err)
	}

	// reset remote sync offset in case the folder is mounted again
	if err = remote_storage.SetSyncOffset(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, *dir, time.Now().UnixNano()); err != nil {
		return fmt.Errorf("reset remote.sync offset for %s: %w", *dir, err)
	}

	return nil
}
// purgeMountedData deletes dir from the filer and then re-creates it as a
// directory carrying the previous entry's Attributes and Extended fields, with
// Crtime and Mtime set to the current time.
//
// The existing entry is looked up and validated before anything is deleted, so
// a lookup that yields no entry or no attributes fails without touching the
// data. It returns an error if the lookup, the delete, or the mkdir fails;
// the underlying cause is wrapped.
func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir string) error {
	return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		parent, name := util.FullPath(dir).DirAndName()

		// Look up the existing directory entry so its metadata can be carried
		// over to the re-created directory.
		lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
			Directory: parent,
			Name:      name,
		})
		if lookupErr != nil {
			return fmt.Errorf("lookup %s: %w", dir, lookupErr)
		}

		// The mkdir callback below dereferences the old entry and its
		// attributes. Check them now, before the delete, so a malformed
		// response cannot cause a panic after the data is already gone.
		oldEntry := lookupResp.Entry
		if oldEntry == nil || oldEntry.Attributes == nil {
			return fmt.Errorf("lookup %s: entry has no attributes", dir)
		}

		// NOTE(review): the boolean arguments are positional; presumably
		// delete-data, recursive, ignore-recursive-error, from-other-cluster —
		// confirm against filer_pb.DoRemove.
		if deleteErr := filer_pb.DoRemove(client, parent, name, true, true, true, false, nil); deleteErr != nil {
			return fmt.Errorf("delete %s: %w", dir, deleteErr)
		}

		// Use one timestamp so Crtime and Mtime are always equal.
		now := time.Now().Unix()
		mkdirErr := filer_pb.DoMkdir(client, parent, name, func(entry *filer_pb.Entry) {
			entry.Attributes = oldEntry.Attributes
			entry.Extended = oldEntry.Extended
			entry.Attributes.Crtime = now
			entry.Attributes.Mtime = now
		})
		if mkdirErr != nil {
			return fmt.Errorf("mkdir %s: %w", dir, mkdirErr)
		}

		return nil
	})
}