1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-07-05 00:26:51 +02:00

paginate when filer deleting and FUSE mount renaming

This commit is contained in:
Chris Lu 2018-12-16 23:20:08 -08:00
parent 39bf274a83
commit 3ac54792e1
4 changed files with 66 additions and 25 deletions

View file

@ -195,13 +195,24 @@ func (f *Filer) DeleteEntryMetaAndData(p FullPath, isRecursive bool, shouldDelet
if isRecursive { if isRecursive {
limit = math.MaxInt32 limit = math.MaxInt32
} }
entries, err := f.ListDirectoryEntries(p, "", false, limit) lastFileName := ""
includeLastFile := false
for limit > 0 {
entries, err := f.ListDirectoryEntries(p, lastFileName, includeLastFile, 1024)
if err != nil { if err != nil {
return fmt.Errorf("list folder %s: %v", p, err) return fmt.Errorf("list folder %s: %v", p, err)
} }
if len(entries) == 0 {
break
} else {
if isRecursive { if isRecursive {
for _, sub := range entries { for _, sub := range entries {
lastFileName = sub.Name()
f.DeleteEntryMetaAndData(sub.FullPath, isRecursive, shouldDeleteChunks) f.DeleteEntryMetaAndData(sub.FullPath, isRecursive, shouldDeleteChunks)
limit--
if limit <= 0 {
break
}
} }
} else { } else {
if len(entries) > 0 { if len(entries) > 0 {
@ -209,6 +220,11 @@ func (f *Filer) DeleteEntryMetaAndData(p FullPath, isRecursive bool, shouldDelet
} }
} }
f.cacheDelDirectory(string(p)) f.cacheDelDirectory(string(p))
if len(entries) < 1024 {
break
}
}
}
} }
if shouldDeleteChunks { if shouldDeleteChunks {

View file

@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"path/filepath" "path/filepath"
"math"
) )
func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error { func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
@ -40,22 +41,38 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
func moveEntry(ctx context.Context, client filer_pb.SeaweedFilerClient, oldParent string, entry *filer_pb.Entry, newParent, newName string) error { func moveEntry(ctx context.Context, client filer_pb.SeaweedFilerClient, oldParent string, entry *filer_pb.Entry, newParent, newName string) error {
if entry.IsDirectory { if entry.IsDirectory {
currentDirPath := filepath.Join(oldParent, entry.Name) currentDirPath := filepath.Join(oldParent, entry.Name)
lastFileName := ""
includeLastFile := false
limit := math.MaxInt32
for limit > 0 {
request := &filer_pb.ListEntriesRequest{ request := &filer_pb.ListEntriesRequest{
Directory: currentDirPath, Directory: currentDirPath,
StartFromFileName: lastFileName,
InclusiveStartFrom: includeLastFile,
Limit: 1024,
} }
glog.V(4).Infof("read directory: %v", request) glog.V(4).Infof("read directory: %v", request)
resp, err := client.ListEntries(ctx, request) resp, err := client.ListEntries(ctx, request)
if err != nil { if err != nil {
glog.V(0).Infof("list %s: %v", oldParent, err) glog.V(0).Infof("list %s: %v", oldParent, err)
return fuse.EIO return fuse.EIO
} }
if len(resp.Entries) == 0 {
break
}
for _, item := range resp.Entries { for _, item := range resp.Entries {
lastFileName = item.Name
err := moveEntry(ctx, client, currentDirPath, item, filepath.Join(newParent, newName), item.Name) err := moveEntry(ctx, client, currentDirPath, item, filepath.Join(newParent, newName), item.Name)
if err != nil { if err != nil {
return err return err
} }
limit--
}
if len(resp.Entries) < 1024 {
break
}
} }
} }

View file

@ -12,6 +12,7 @@ import (
"net/http" "net/http"
"strings" "strings"
"sync" "sync"
"time"
) )
type FileHandle struct { type FileHandle struct {
@ -207,6 +208,9 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
fh.f.entry.Attributes.Mime = fh.contentType fh.f.entry.Attributes.Mime = fh.contentType
fh.f.entry.Attributes.Uid = req.Uid fh.f.entry.Attributes.Uid = req.Uid
fh.f.entry.Attributes.Gid = req.Gid fh.f.entry.Attributes.Gid = req.Gid
fh.f.entry.Attributes.Mtime = time.Now().Unix()
fh.f.entry.Attributes.Crtime = time.Now().Unix()
fh.f.entry.Attributes.FileMode = uint32(0770)
} }
request := &filer_pb.CreateEntryRequest{ request := &filer_pb.CreateEntryRequest{

View file

@ -45,7 +45,7 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie
lastFileName := req.StartFromFileName lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom includeLastFile := req.InclusiveStartFrom
for limit > 0 { for limit > 0 {
entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(req.Directory), lastFileName, includeLastFile, limit) entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(req.Directory), lastFileName, includeLastFile, 1024)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -74,6 +74,10 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie
limit-- limit--
} }
if len(resp.Entries) < 1024 {
break
}
} }
return resp, nil return resp, nil