diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 2df342d1f..821bcfb9f 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -295,6 +295,66 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { + if !req.Dir { + return dir.removeOneFile(ctx, req) + } + + return dir.removeFolder(ctx, req) + +} + +func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) error { + + var entry *filer_pb.Entry + err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir.Path, + Name: req.Name, + } + + glog.V(4).Infof("lookup to-be-removed entry: %v", request) + resp, err := client.LookupDirectoryEntry(ctx, request) + if err != nil { + // glog.V(0).Infof("lookup %s/%s: %v", dir.Path, name, err) + return fuse.ENOENT + } + + entry = resp.Entry + + return nil + }) + + if err != nil { + return err + } + + dir.wfs.asyncDeleteFileChunks(entry.Chunks) + + return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.DeleteEntryRequest{ + Directory: dir.Path, + Name: req.Name, + IsDeleteData: false, + } + + glog.V(3).Infof("remove file: %v", request) + _, err := client.DeleteEntry(ctx, request) + if err != nil { + glog.V(3).Infof("remove file %s/%s: %v", dir.Path, req.Name, err) + return fuse.ENOENT + } + + dir.wfs.listDirectoryEntriesCache.Delete(path.Join(dir.Path, req.Name)) + + return nil + }) + +} + +func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error { + return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 8e1d6fadd..8909da52a 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/seaweedfs/fuse" @@ -159,7 +160,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f for _, chunk := range chunks { fh.f.entry.Chunks = append(fh.f.entry.Chunks, chunk) fh.f.entryViewCache = nil - glog.V(1).Infof("uploaded %s/%s to %s [%d,%d)", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) + glog.V(4).Infof("uploaded %s/%s to %s [%d,%d)", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) fh.dirtyMetadata = true } @@ -179,8 +180,6 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err return nil } -// Flush - experimenting with uploading at flush, this slows operations down till it has been -// completely flushed func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { // fflush works at fh level // send the data to the OS @@ -216,10 +215,16 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { Entry: fh.f.entry, } - glog.V(1).Infof("%s/%s set chunks: %v", fh.f.dir.Path, fh.f.Name, len(fh.f.entry.Chunks)) - for i, chunk := range fh.f.entry.Chunks { - glog.V(1).Infof("%s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) - } + //glog.V(1).Infof("%s/%s set chunks: %v", fh.f.dir.Path, fh.f.Name, len(fh.f.entry.Chunks)) + //for i, chunk := range fh.f.entry.Chunks { + // glog.V(4).Infof("%s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) + //} + + chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks) + fh.f.entry.Chunks = chunks + fh.f.entryViewCache = nil + fh.f.wfs.asyncDeleteFileChunks(garbages) + if _, err := client.CreateEntry(ctx, request); err != nil { return fmt.Errorf("update fh: %v", err) } @@ -228,6 +233,48 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { }) } +func deleteFileIds(ctx context.Context, client filer_pb.SeaweedFilerClient, fileIds []string) error { + + var vids []string + for _, fileId := range fileIds { + vids = append(vids, volumeId(fileId)) + } + + lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { + + m := make(map[string]operation.LookupResult) + + glog.V(4).Infof("remove file lookup volume id locations: %v", vids) + resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + VolumeIds: vids, + }) + if err != nil { + return m, err + } + + for _, vid := range vids { + lr := operation.LookupResult{ + VolumeId: vid, + Locations: nil, + } + locations := resp.LocationsMap[vid] + for _, loc := range locations.Locations { + lr.Locations = append(lr.Locations, operation.Location{ + Url: loc.Url, + PublicUrl: loc.PublicUrl, + }) + } + m[vid] = lr + } + + return m, err + } + + _, err := operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc) + + return err +} + func volumeId(fileId string) string { lastCommaIndex := strings.LastIndex(fileId, ",") if lastCommaIndex > 0 { diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 0978d72f7..858846217 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -8,12 +8,12 @@ import ( "sync" "time" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/karlseguin/ccache" + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" "google.golang.org/grpc" ) @@ -46,6 +46,8 @@ type WFS struct { pathToHandleLock sync.Mutex bufPool sync.Pool + fileIdsDeletionChan chan []string + stats statsCache } type statsCache struct { @@ -54,7 +56,7 @@ type statsCache struct { } func NewSeaweedFileSystem(option *Option) *WFS { - return &WFS{ + wfs := &WFS{ option: option, listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(int64(option.DirListingLimit) + 200).ItemsToPrune(100)), pathToHandleIndex: make(map[string]int), @@ -63,7 +65,12 @@ func NewSeaweedFileSystem(option *Option) *WFS { return make([]byte, option.ChunkSizeLimit) }, }, + fileIdsDeletionChan: make(chan []string, 32), } + + go wfs.loopProcessingDeletion() + + return wfs } func (wfs *WFS) Root() (fs.Node, error) { diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go new file mode 100644 index 000000000..c2fcb4f7a --- /dev/null +++ b/weed/filesys/wfs_deletion.go @@ -0,0 +1,46 @@ +package filesys + +import ( + "context" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func (wfs *WFS) loopProcessingDeletion() { + + ticker := time.NewTicker(2 * time.Second) + + wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + var fileIds []string + for { + select { + case fids := <-wfs.fileIdsDeletionChan: + fileIds = append(fileIds, fids...) + if len(fileIds) >= 256 { + glog.V(1).Infof("deleting fileIds len=%d", len(fileIds)) + deleteFileIds(context.Background(), client, fileIds) + fileIds = fileIds[:0] + } + case <-ticker.C: + if len(fileIds) > 0 { + glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds)) + deleteFileIds(context.Background(), client, fileIds) + fileIds = fileIds[:0] + } + } + } + }) + +} + +func (wfs *WFS) asyncDeleteFileChunks(chunks []*filer_pb.FileChunk) { + if len(chunks) > 0 { + var fileIds []string + for _, chunk := range chunks { + fileIds = append(fileIds, chunk.FileId) + } + wfs.fileIdsDeletionChan <- fileIds + } +}