From 97236389e8a7e72680c4508cdc1db3746dfd8c76 Mon Sep 17 00:00:00 2001 From: Seyed Mahdi Sadegh Shobeiri <36403983+SmsS4@users.noreply.github.com> Date: Sat, 23 Dec 2023 23:47:30 +0330 Subject: [PATCH] Add modifyTimeAgo to volume.fsck (#5133) * Add modifyTimeAgo to volume.fsck * Fix AppendAtNs --- weed/shell/command_volume_fsck.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 7a3932aa1..8916e90bd 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -89,6 +89,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, " delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler") tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files") cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks") + modifyTimeAgo := fsckCommand.Duration("modifyTimeAgo", 0, "only include entries after this modify time to check orphan chunks") c.verifyNeedle = fsckCommand.Bool("verifyNeedles", false, "check needles status from volume server") if err = fsckCommand.Parse(args); err != nil { @@ -137,6 +138,10 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. } collectCutoffFromAtNs := time.Now().Add(-*cutoffTimeAgo).UnixNano() + var collectModifyFromAtNs int64 = 0 + if modifyTimeAgo.Seconds() != 0 { + collectModifyFromAtNs = time.Now().Add(-*modifyTimeAgo).UnixNano() + } // collect each volume file ids for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { for volumeId, vinfo := range volumeIdToVInfo { @@ -150,7 +155,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. delete(volumeIdToVInfo, volumeId) continue } - err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectCutoffFromAtNs)) + err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)) if err != nil { return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) } @@ -163,7 +168,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. if *c.findMissingChunksInFiler { // collect all filer file ids and paths - if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, *purgeAbsent, collectCutoffFromAtNs); err != nil { + if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, *purgeAbsent, collectModifyFromAtNs, collectCutoffFromAtNs); err != nil { return fmt.Errorf("collectFilerFileIdAndPaths: %v", err) } for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { @@ -174,7 +179,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. } } else { // collect all filer file ids - if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, false, 0); err != nil { + if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, false, 0, 0); err != nil { return fmt.Errorf("failed to collect file ids from filer: %v", err) } // volume file ids subtract filer file ids @@ -186,7 +191,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. return nil } -func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, purgeAbsent bool, cutoffFromAtNs int64) error { +func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, purgeAbsent bool, collectModifyFromAtNs int64, cutoffFromAtNs int64) error { if *c.verbose { fmt.Fprintf(c.writer, "checking each file from filer path %s...\n", c.getCollectFilerFilePath()) } @@ -224,6 +229,9 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m if cutoffFromAtNs != 0 && chunk.ModifiedTsNs > cutoffFromAtNs { continue } + if collectModifyFromAtNs != 0 && chunk.ModifiedTsNs < collectModifyFromAtNs { + continue + } outputChan <- &Item{ vid: chunk.Fid.VolumeId, fileKey: chunk.Fid.FileKey, @@ -371,7 +379,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn return nil } -func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo, cutoffFrom uint64) error { +func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo, modifyFrom uint64, cutoffFrom uint64) error { if *c.verbose { fmt.Fprintf(c.writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server) @@ -420,7 +428,10 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId if err != nil { return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err) } - return resp.AppendAtNs <= cutoffFrom, nil + if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (resp.AppendAtNs <= cutoffFrom) { + return true, nil + } + return false, nil }) if err != nil { fmt.Fprintf(c.writer, "Failed to search for last valid index on volume %d with error %v\n", volumeId, err)