package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
	"github.com/seaweedfs/seaweedfs/weed/remote_storage"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
func init() {
	Commands = append(Commands, &commandRemoteCache{})
}

type commandRemoteCache struct {
}

func (c *commandRemoteCache) Name() string {
	return "remote.cache"
}
func (c *commandRemoteCache) Help() string {
	return `comprehensive synchronization and caching between local and remote storage

	# assume a remote storage is configured with the name "cloud1"
	remote.configure -name=cloud1 -type=s3 -s3.access_key=xxx -s3.secret_key=yyy
	# mount and pull one bucket
	remote.mount -dir=/xxx -remote=cloud1/bucket

	# comprehensive sync and cache: update metadata, cache content, and remove deleted files
	remote.cache -dir=/xxx                          # sync metadata, cache content, and remove deleted files (default)
	remote.cache -dir=/xxx -cacheContent=false      # sync metadata and cleanup only, no caching
	remote.cache -dir=/xxx -deleteLocalExtra=false  # skip removal of local files missing from remote
	remote.cache -dir=/xxx -concurrent=32           # use custom concurrency
	remote.cache -dir=/xxx -include=*.pdf           # only sync PDF files
	remote.cache -dir=/xxx -exclude=*.tmp           # exclude temporary files
	remote.cache -dir=/xxx -dryRun=true             # show what would be done without making changes

	This command will:
	1. Synchronize metadata from remote storage
	2. Cache file content from remote by default
	3. Remove local files that no longer exist on remote by default (use -deleteLocalExtra=false to disable)

	This command is designed to run regularly, so you can add it to a cronjob.
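	# for example (assuming the weed shell reads commands from standard input):
	#   echo "remote.cache -dir=/xxx" | weed shell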
`
}

func (c *commandRemoteCache) HasTag(CommandTag) bool {
	return false
}
func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	remoteCacheCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

	dir := remoteCacheCommand.String("dir", "", "a directory in filer")
	cache := remoteCacheCommand.Bool("cacheContent", true, "cache file content from remote")
	deleteLocalExtra := remoteCacheCommand.Bool("deleteLocalExtra", true, "delete local files that no longer exist on remote")
	concurrency := remoteCacheCommand.Int("concurrent", 16, "concurrent file operations")
	dryRun := remoteCacheCommand.Bool("dryRun", false, "show what would be done without making changes")
	fileFilter := newFileFilter(remoteCacheCommand)

	if err = remoteCacheCommand.Parse(args); err != nil {
		return nil
	}

	if *dir == "" {
		return fmt.Errorf("need to specify -dir option")
	}

	mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
	if detectErr != nil {
		jsonPrintln(writer, mappings)
		return detectErr
	}

	// perform comprehensive sync
	return c.doComprehensiveSync(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf, *cache, *deleteLocalExtra, *concurrency, *dryRun, fileFilter)
}
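// doComprehensiveSync implements the steps listed in Help(): traverse the
// remote storage, optionally collect the locally mounted entries, diff the
// two sets into delete/update/cache work lists, and then execute those
// actions against the filer.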
func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToSync util.FullPath, remoteConf *remote_pb.RemoteConf, shouldCache bool, deleteLocalExtra bool, concurrency int, dryRun bool, fileFilter *FileFilter) error {

	// visit remote storage
	remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
	if err != nil {
		return err
	}

	remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToSync)
	// Step 1: Collect all remote files
	remoteFiles := make(map[string]*filer_pb.RemoteEntry)
	err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
		localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
		fullPath := string(localDir.Child(name))
		remoteFiles[fullPath] = remoteEntry
		return nil
	})
	if err != nil {
		return fmt.Errorf("failed to traverse remote storage: %w", err)
	}

	fmt.Fprintf(writer, "Found %d files/directories in remote storage\n", len(remoteFiles))
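	// NOTE: remoteFiles is keyed by the mapped local full path, so the remote
	// and local sets can be compared directly in Step 3 below.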
	// Step 2: Collect all local files (only if we need to delete local extra files)
	localFiles := make(map[string]*filer_pb.Entry)
	if deleteLocalExtra {
		err = recursivelyTraverseDirectory(commandEnv, dirToSync, func(dir util.FullPath, entry *filer_pb.Entry) bool {
			if entry.RemoteEntry != nil { // only consider files that are part of the remote mount
				fullPath := string(dir.Child(entry.Name))
				localFiles[fullPath] = entry
			}
			return true
		})
		if err != nil {
			return fmt.Errorf("failed to traverse local directory: %w", err)
		}
		fmt.Fprintf(writer, "Found %d files/directories in local storage\n", len(localFiles))
	} else {
		fmt.Fprintf(writer, "Skipping local file collection (deleteLocalExtra=false)\n")
	}
	// Step 3: Determine actions needed
	var filesToDelete []string
	var filesToUpdate []string
	var filesToCache []string

	// Find files to delete (exist locally but not remotely) - only if deleteLocalExtra is enabled
	if deleteLocalExtra {
		for localPath := range localFiles {
			if _, exists := remoteFiles[localPath]; !exists {
				filesToDelete = append(filesToDelete, localPath)
			}
		}
	}
	// Find files to update/cache (exist remotely)
	for remotePath, remoteEntry := range remoteFiles {
		if deleteLocalExtra {
			// When deleteLocalExtra is enabled, we have localFiles to compare with
			if localEntry, exists := localFiles[remotePath]; exists {
				// File exists locally, check if it needs updating
				if localEntry.RemoteEntry == nil ||
					localEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag ||
					localEntry.RemoteEntry.RemoteMtime < remoteEntry.RemoteMtime {
					filesToUpdate = append(filesToUpdate, remotePath)
				}
				// Check if it needs caching
				if shouldCache && shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
					filesToCache = append(filesToCache, remotePath)
				}
			} else {
				// File doesn't exist locally, needs to be created
				filesToUpdate = append(filesToUpdate, remotePath)
			}
		} else {
			// When deleteLocalExtra is disabled, we check each file individually.
			// All remote files are candidates for update/creation.
			filesToUpdate = append(filesToUpdate, remotePath)

			// For caching, we need to check if the local file exists and needs caching
			if shouldCache {
				// We need to look up the local file to check if it needs caching
				localDir, name := util.FullPath(remotePath).DirAndName()
				err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
					lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
						Directory: localDir,
						Name:      name,
					})
					if lookupErr == nil {
						localEntry := lookupResp.Entry
						if shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
							filesToCache = append(filesToCache, remotePath)
						}
					}
					return nil // Don't propagate lookup errors here
				})
				if err != nil {
					// Log the error but continue
					fmt.Fprintf(writer, "Warning: failed to lookup local file %s for caching check: %v\n", remotePath, err)
				}
			}
		}
	}
fmt.Fprintf(writer, "Actions needed: %d files to delete, %d files to update, %d files to cache\n",
|
|
len(filesToDelete), len(filesToUpdate), len(filesToCache))
|
|
|
|
if dryRun {
|
|
fmt.Fprintf(writer, "DRY RUN - showing what would be done:\n")
|
|
for _, path := range filesToDelete {
|
|
fmt.Fprintf(writer, "DELETE: %s\n", path)
|
|
}
|
|
for _, path := range filesToUpdate {
|
|
fmt.Fprintf(writer, "UPDATE: %s\n", path)
|
|
}
|
|
for _, path := range filesToCache {
|
|
fmt.Fprintf(writer, "CACHE: %s\n", path)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
	// Step 4: Execute actions
	return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		ctx := context.Background()

		// Delete files that no longer exist on remote (only if deleteLocalExtra is enabled)
		if deleteLocalExtra {
			for _, pathToDelete := range filesToDelete {
				fmt.Fprintf(writer, "Deleting %s... ", pathToDelete)

				dir, name := util.FullPath(pathToDelete).DirAndName()
				_, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
					Directory:            dir,
					Name:                 name,
					IgnoreRecursiveError: false,
					IsDeleteData:         true,
					IsRecursive:          false,
					IsFromOtherCluster:   false,
				})
				if err != nil {
					fmt.Fprintf(writer, "failed: %v\n", err)
					return err
				}
				fmt.Fprintf(writer, "done\n")
			}
		}
		// Update metadata for files that exist on remote
		for _, pathToUpdate := range filesToUpdate {
			remoteEntry := remoteFiles[pathToUpdate]
			localDir, name := util.FullPath(pathToUpdate).DirAndName()

			fmt.Fprintf(writer, "Updating metadata for %s... ", pathToUpdate)

			// Check if the file exists locally
			lookupResp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
				Directory: localDir,
				Name:      name,
			})
			if lookupErr != nil && lookupErr != filer_pb.ErrNotFound {
				fmt.Fprintf(writer, "failed to lookup: %v\n", lookupErr)
				continue
			}

			// heuristic: remote listings report directories with zero size and zero mtime
			isDirectory := remoteEntry.RemoteSize == 0 && remoteEntry.RemoteMtime == 0
			if lookupErr == filer_pb.ErrNotFound {
				// Create a new entry
				_, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
					Directory: localDir,
					Entry: &filer_pb.Entry{
						Name:        name,
						IsDirectory: isDirectory,
						Attributes: &filer_pb.FuseAttributes{
							FileSize: uint64(remoteEntry.RemoteSize),
							Mtime:    remoteEntry.RemoteMtime,
							FileMode: uint32(0644),
						},
						RemoteEntry: remoteEntry,
					},
				})
				if createErr != nil {
					fmt.Fprintf(writer, "failed to create: %v\n", createErr)
					continue
				}
			} else {
				// Update the existing entry
				existingEntry := lookupResp.Entry
				if existingEntry.RemoteEntry == nil {
					// This is a local-only file; skip it to avoid overwriting local data
					fmt.Fprintf(writer, "skipped (local file)\n")
					continue
				}

				// Point the local entry at the new remote version and drop any
				// previously cached content so it will be re-fetched on demand.
				existingEntry.RemoteEntry = remoteEntry
				existingEntry.Attributes.FileSize = uint64(remoteEntry.RemoteSize)
				existingEntry.Attributes.Mtime = remoteEntry.RemoteMtime
				existingEntry.Attributes.Md5 = nil
				existingEntry.Chunks = nil
				existingEntry.Content = nil

				_, updateErr := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
					Directory: localDir,
					Entry:     existingEntry,
				})
				if updateErr != nil {
					fmt.Fprintf(writer, "failed to update: %v\n", updateErr)
					continue
				}
			}
			fmt.Fprintf(writer, "done\n")
		}
		// Cache file content if requested
		if shouldCache && len(filesToCache) > 0 {
			fmt.Fprintf(writer, "Caching file content...\n")

			var wg sync.WaitGroup
			limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
			var executionErrLock sync.Mutex
			var executionErr error

			for _, pathToCache := range filesToCache {
				wg.Add(1)
				pathToCacheCopy := pathToCache // capture for the closure
				limitedConcurrentExecutor.Execute(func() {
					defer wg.Done()

					// Get the local entry (either from the localFiles map or by lookup)
					var localEntry *filer_pb.Entry
					if deleteLocalExtra {
						localEntry = localFiles[pathToCacheCopy]
						if localEntry == nil {
							fmt.Fprintf(writer, "Warning: skipping cache for %s (local entry not found)\n", pathToCacheCopy)
							return
						}
					} else {
						// Look up the local entry since we don't have it in localFiles
						localDir, name := util.FullPath(pathToCacheCopy).DirAndName()
						lookupErr := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
							lookupResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
								Directory: localDir,
								Name:      name,
							})
							if err == nil {
								localEntry = lookupResp.Entry
							}
							return err
						})
						if lookupErr != nil {
							fmt.Fprintf(writer, "Warning: failed to lookup local entry for caching %s: %v\n", pathToCacheCopy, lookupErr)
							return
						}
					}

					dir, _ := util.FullPath(pathToCacheCopy).DirAndName()
					remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, util.FullPath(pathToCacheCopy))

					fmt.Fprintf(writer, "Caching %s... ", pathToCacheCopy)

					if err := filer.CacheRemoteObjectToLocalCluster(commandEnv, remoteConf, remoteLocation, util.FullPath(dir), localEntry); err != nil {
						fmt.Fprintf(writer, "failed: %v\n", err)
						// record only the first error; the mutex prevents a data
						// race between concurrent workers
						executionErrLock.Lock()
						if executionErr == nil {
							executionErr = err
						}
						executionErrLock.Unlock()
						return
					}
					fmt.Fprintf(writer, "done\n")
				})
			}

			wg.Wait()
			if executionErr != nil {
				return executionErr
			}
		}

		return nil
	})
}
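// recursivelyTraverseDirectory walks the filer directory tree depth-first,
// calling visitEntry for every entry; returning false from visitEntry skips
// descending into that directory.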
func recursivelyTraverseDirectory(filerClient filer_pb.FilerClient, dirPath util.FullPath, visitEntry func(dir util.FullPath, entry *filer_pb.Entry) bool) (err error) {

	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, dirPath, "", func(entry *filer_pb.Entry, isLast bool) error {
		if entry.IsDirectory {
			if !visitEntry(dirPath, entry) {
				return nil
			}
			subDir := dirPath.Child(entry.Name)
			if err := recursivelyTraverseDirectory(filerClient, subDir, visitEntry); err != nil {
				return err
			}
		} else {
			if !visitEntry(dirPath, entry) {
				return nil
			}
		}
		return nil
	})
	return
}
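// shouldCacheToLocal reports whether a mounted entry is a non-empty remote
// file whose content has not yet been pulled to the local cluster.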
func shouldCacheToLocal(entry *filer_pb.Entry) bool {
	if entry.IsDirectory {
		return false
	}
	if entry.RemoteEntry == nil {
		return false
	}
	if entry.RemoteEntry.LastLocalSyncTsNs == 0 && entry.RemoteEntry.RemoteSize > 0 {
		return true
	}
	return false
}
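// mayHaveCachedToLocal reports whether a mounted entry may already have its
// content cached locally, i.e. it has been synced from remote at least once.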
func mayHaveCachedToLocal(entry *filer_pb.Entry) bool {
	if entry.IsDirectory {
		return false
	}
	if entry.RemoteEntry == nil {
		return false // should not uncache an entry that is not in remote
	}
	if entry.RemoteEntry.LastLocalSyncTsNs > 0 {
		return true
	}
	return false
}