diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index 87a58fd86..4107886e8 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -3,6 +3,7 @@ package filer
 import (
 	"context"
 	"fmt"
+	"github.com/seaweedfs/seaweedfs/weed/glog"
 
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -28,12 +29,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
 	isDeleteCollection := f.isBucket(entry)
 	if entry.IsDirectory() {
 		// delete the folder children, not including the folder itself
-		err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isDeleteCollection, isFromOtherCluster, signatures, func(chunks []*filer_pb.FileChunk) error {
-			if shouldDeleteChunks && !isDeleteCollection {
-				f.DirectDeleteChunks(chunks)
-			}
-			return nil
-		}, func(hardLinkIds []HardLinkId) error {
+		err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isDeleteCollection, isFromOtherCluster, signatures, func(hardLinkIds []HardLinkId) error {
 			// A case not handled:
 			// what if the chunk is in a different collection?
 			if shouldDeleteChunks {
@@ -47,16 +43,16 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
 		}
 	}
 
-	if shouldDeleteChunks && !isDeleteCollection {
-		f.DirectDeleteChunks(entry.GetChunks())
-	}
-
 	// delete the file or folder
 	err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks, isFromOtherCluster, signatures)
 	if err != nil {
 		return fmt.Errorf("delete file %s: %v", p, err)
 	}
 
+	if shouldDeleteChunks && !isDeleteCollection {
+		f.DirectDeleteChunks(entry.GetChunks())
+	}
+
 	if isDeleteCollection {
 		collectionName := entry.Name()
 		f.doDeleteCollection(collectionName)
@@ -65,8 +61,10 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
 	return nil
 }
 
-func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isDeletingBucket, isFromOtherCluster bool, signatures []int32, onChunksFn OnChunksFunc, onHardLinkIdsFn OnHardLinkIdsFunc) (err error) {
+func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isDeletingBucket, isFromOtherCluster bool, signatures []int32, onHardLinkIdsFn OnHardLinkIdsFunc) (err error) {
 
+	//collect all the chunks of this layer and delete them together at the end
+	var chunksToDelete []*filer_pb.FileChunk
 	lastFileName := ""
 	includeLastFile := false
 	if !isDeletingBucket || !f.Store.CanDropWholeBucket() {
@@ -86,14 +84,16 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
 			lastFileName = sub.Name()
 			if sub.IsDirectory() {
 				subIsDeletingBucket := f.isBucket(sub)
-				err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, subIsDeletingBucket, false, nil, onChunksFn, onHardLinkIdsFn)
+				err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, subIsDeletingBucket, false, nil, onHardLinkIdsFn)
 			} else {
 				f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
 				if len(sub.HardLinkId) != 0 {
 					// hard link chunk data are deleted separately
 					err = onHardLinkIdsFn([]HardLinkId{sub.HardLinkId})
 				} else {
-					err = onChunksFn(sub.GetChunks())
+					if shouldDeleteChunks {
+						chunksToDelete = append(chunksToDelete, sub.GetChunks()...)
+					}
 				}
 			}
 			if err != nil && !ignoreRecursiveError {
@@ -114,6 +114,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
 	}
 
 	f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
+	f.DeleteChunks(chunksToDelete)
 
 	return nil
 }
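For context, a minimal sketch (not part of the patch) of the ordering this change enforces: the entry metadata is deleted first, and the referenced chunk data is only deleted after that succeeds, with chunks gathered per directory layer and removed in one batch. The helper names deleteMeta and deleteChunkBatch below are hypothetical stand-ins for f.doDeleteEntryMetaAndData and f.DeleteChunks.

package main

import "fmt"

// chunk stands in for *filer_pb.FileChunk.
type chunk struct{ fileID string }

// deleteMeta is a hypothetical stand-in for f.doDeleteEntryMetaAndData.
func deleteMeta(path string) error {
	fmt.Println("deleted metadata for", path)
	return nil
}

// deleteChunkBatch is a hypothetical stand-in for f.DeleteChunks,
// removing the collected chunks in a single batch.
func deleteChunkBatch(chunks []chunk) {
	for _, c := range chunks {
		fmt.Println("deleted chunk", c.fileID)
	}
}

// deleteEntry mirrors the ordering introduced by the patch:
// metadata first, chunk data only after the metadata deletion succeeded.
func deleteEntry(path string, chunks []chunk) error {
	if err := deleteMeta(path); err != nil {
		// metadata deletion failed: leave the chunks untouched,
		// so the entry still points at intact data
		return err
	}
	deleteChunkBatch(chunks)
	return nil
}

func main() {
	_ = deleteEntry("/buckets/b1/file.txt", []chunk{{fileID: "3,0123456789ab"}})
}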