From 290b5e2cd0a71f97b2335bec21eb5201cadaa905 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 17 Nov 2020 17:20:21 -0800 Subject: [PATCH] directly delete file chunks keeping current async deletions for now --- weed/filer/filer_delete_entry.go | 2 +- weed/filer/filer_deletion.go | 44 ++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index 69219fbfa..4415d45d9 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -47,7 +47,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR } if shouldDeleteChunks && !isCollection { - go f.DeleteChunks(chunks) + f.DirectDeleteChunks(chunks) } // A case not handled: // what if the chunk is in a different collection? diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go index 126d162ec..9eee38277 100644 --- a/weed/filer/filer_deletion.go +++ b/weed/filer/filer_deletion.go @@ -68,6 +68,50 @@ func (f *Filer) loopProcessingDeletion() { } } +func (f *Filer) doDeleteFileIds(fileIds []string) { + + lookupFunc := LookupByMasterClientFn(f.MasterClient) + DeletionBatchSize := 100000 // roughly 20 bytes cost per file id. + + for len(fileIds) > 0 { + var toDeleteFileIds []string + if len(fileIds) > DeletionBatchSize { + toDeleteFileIds = fileIds[:DeletionBatchSize] + fileIds = fileIds[DeletionBatchSize:] + } else { + toDeleteFileIds = fileIds + fileIds = fileIds[:0] + } + deletionCount := len(toDeleteFileIds) + _, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc) + if err != nil { + if !strings.Contains(err.Error(), "already deleted") { + glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err) + } + } + } +} + +func (f *Filer) DirectDeleteChunks(chunks []*filer_pb.FileChunk) { + var fildIdsToDelete []string + for _, chunk := range chunks { + if !chunk.IsChunkManifest { + fildIdsToDelete = append(fildIdsToDelete, chunk.GetFileIdString()) + continue + } + dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk) + if manifestResolveErr != nil { + glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) + } + for _, dChunk := range dataChunks { + fildIdsToDelete = append(fildIdsToDelete, dChunk.GetFileIdString()) + } + fildIdsToDelete = append(fildIdsToDelete, chunk.GetFileIdString()) + } + + f.doDeleteFileIds(fildIdsToDelete) +} + func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { for _, chunk := range chunks { if !chunk.IsChunkManifest {