From 7836f7574e65bd9b7a8b8f1beda97b21cd279da0 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Sun, 16 Oct 2022 08:38:46 +0500 Subject: [PATCH] [volume.fsck] hotfix apply purging and add option verifyNeedle #3860 (#3861) * fix apply purging and add verifyNeedle * common readSourceNeedleBlob * use consts --- weed/shell/command_volume_check_disk.go | 7 +++-- weed/shell/command_volume_fsck.go | 38 +++++++++++++++++++++---- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 075d29712..962566751 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -10,6 +10,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "golang.org/x/exp/slices" + "google.golang.org/grpc" "io" "math" ) @@ -166,7 +167,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m for _, needleValue := range missingNeedles { - needleBlob, err := c.readSourceNeedleBlob(pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue) + needleBlob, err := readSourceNeedleBlob(c.env.option.GrpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue) if err != nil { return hasChanges, err } @@ -190,9 +191,9 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m return } -func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) { +func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) { - err = operation.WithVolumeServerClient(false, sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{ VolumeId: volumeId, Offset: needleValue.Offset.ToActualOffset(), diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 70c6cd91c..1750ae94c 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -34,6 +34,11 @@ func init() { Commands = append(Commands, &commandVolumeFsck{}) } +const ( + readbufferSize = 16 + verifyProbeBlobSize = 16 +) + type commandVolumeFsck struct { env *CommandEnv writer io.Writer @@ -44,6 +49,7 @@ type commandVolumeFsck struct { verbose *bool forcePurging *bool findMissingChunksInFiler *bool + verifyNeedle *bool } func (c *commandVolumeFsck) Name() string { @@ -82,6 +88,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, " delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler") tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files") cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks") + c.verifyNeedle = fsckCommand.Bool("verifyNeedles", false, "try get head needle blob from volume server") if err = fsckCommand.Parse(args); err != nil { return nil @@ -219,7 +226,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m return nil }, func(outputChan chan interface{}) { - buffer := make([]byte, 16) + buffer := make([]byte, readbufferSize) for item := range outputChan { i := item.(*Item) if f, ok := files[i.vid]; ok { @@ -285,7 +292,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn if *c.verbose { for _, fid := range orphanFileIds { - fmt.Fprintf(c.writer, "%s\n", fid) + fmt.Fprintf(c.writer, "%s:%s\n", vinfo.collection, fid) } } isEcVolumeReplicas[volumeId] = vinfo.isEcVolume @@ -440,7 +447,7 @@ func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleI defer fp.Close() br := bufio.NewReader(fp) - buffer := make([]byte, 16) + buffer := make([]byte, readbufferSize) var readSize int var readErr error item := &Item{vid: volumeId} @@ -452,7 +459,7 @@ func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleI if readErr != nil { return readErr } - if readSize != 16 { + if readSize != readbufferSize { return fmt.Errorf("readSize mismatch") } item.fileKey = util.BytesToUint64(buffer[:8]) @@ -541,11 +548,27 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri return } + voluemAddr := pb.NewServerAddressWithGrpcPort(dataNodeId, 0) if err = c.readFilerFileIdFile(volumeId, func(nId types.NeedleId, itemPath util.FullPath) { + inUseCount++ + if *c.verifyNeedle { + if v, ok := db.Get(nId); ok && v.Size.IsValid() { + newSize := types.Size(verifyProbeBlobSize) + if v.Size > newSize { + v.Size = newSize + } + if _, err := readSourceNeedleBlob(c.env.option.GrpcDialOption, voluemAddr, volumeId, *v); err != nil { + fmt.Fprintf(c.writer, "failed to read file %s NeedleBlob %+v: %+v", itemPath, nId, err) + if *c.forcePurging { + return + } + } + } + } + if err = db.Delete(nId); err != nil && *c.verbose { fmt.Fprintf(c.writer, "failed to nm.delete %s(%+v): %+v", itemPath, nId, err) } - inUseCount++ }); err != nil { err = fmt.Errorf("failed to readFilerFileIdFile %+v", err) return @@ -553,7 +576,10 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri var orphanFileCount uint64 if err = db.AscendingVisit(func(n needle_map.NeedleValue) error { - orphanFileIds = append(orphanFileIds, fmt.Sprintf("%s:%d,%s00000000", collection, volumeId, n.Key.String())) + if !n.Size.IsValid() { + return nil + } + orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s00000000", volumeId, n.Key.String())) orphanFileCount++ orphanDataSize += uint64(n.Size) return nil