From ddd6bee970e5a09903b115d48f47ea729f0d6d3e Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Sat, 10 Sep 2022 15:29:17 -0700 Subject: [PATCH] ADHOC: Volume fsck use a time cutoff param (#3626) * ADHOC: cut off volumn fsck * more * fix typo * add test * modify name * fix comment * fix comments * nit * fix typo * Update weed/shell/command_volume_fsck.go Co-authored-by: root Co-authored-by: Chris Lu --- weed/shell/command_volume_fsck.go | 55 ++++++++++++++++++------- weed/storage/idx/binary_search.go | 29 +++++++++++++ weed/storage/idx_binary_search_test.go | 57 ++++++++++++++++++++++++++ 3 files changed, 126 insertions(+), 15 deletions(-) create mode 100644 weed/storage/idx/binary_search.go create mode 100644 weed/storage/idx_binary_search_test.go diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index e48e53d85..cae8e22d4 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -2,7 +2,9 @@ package shell import ( "bufio" + "bytes" "context" + "errors" "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -11,6 +13,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/idx" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/types" @@ -72,6 +75,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. c.forcePurging = fsckCommand.Bool("forcePurging", false, "delete missing data from volumes in one replica used together with applyPurging") purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, " delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! 
used together with findMissingChunksInFiler") tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files") + cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks") if err = fsckCommand.Parse(args); err != nil { return nil @@ -126,7 +130,8 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. delete(volumeIdToVInfo, volumeId) continue } - err = c.collectOneVolumeFileIds(tempFolder, dataNodeId, volumeId, vinfo, *verbose, writer) + cutoffFrom := time.Now().Add(-*cutoffTimeAgo).UnixNano() + err = c.collectOneVolumeFileIds(tempFolder, dataNodeId, volumeId, vinfo, *verbose, writer, uint64(cutoffFrom)) if err != nil { return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) } @@ -351,7 +356,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn return nil } -func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeId string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error { +func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeId string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer, cutoffFrom uint64) error { if verbose { fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server) @@ -377,13 +382,42 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeI return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err) } - err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, dataNodeId, volumeId)) + var buf bytes.Buffer + for { + resp, err := copyFileClient.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return err + } + buf.Write(resp.FileContent) + } + if vinfo.isReadOnly == false { + index, err := idx.FirstInvalidIndex(buf.Bytes(), 
func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) { + resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{ + VolumeId: volumeId, + NeedleId: uint64(key), + Offset: offset.ToActualOffset(), + Size: int32(size), + }) + if err != nil { + return false, fmt.Errorf("to read needle meta with id %d from volume %d with error %v", key, volumeId, err) + } + return resp.LastModified <= cutoffFrom, nil + }) + if err != nil { + fmt.Fprintf(writer, "Failed to search for last valid index on volume %d with error %v", volumeId, err) + } + buf.Truncate(index * types.NeedleMapEntrySize) + } + idxFilename := getVolumeFileIdFile(tempFolder, dataNodeId, volumeId) + err = writeToFile(buf.Bytes(), idxFilename) if err != nil { return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err) } return nil - }) } @@ -673,7 +707,7 @@ func getFilerFileIdFile(tempFolder string, vid uint32) string { return filepath.Join(tempFolder, fmt.Sprintf("%d.fid", vid)) } -func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error { +func writeToFile(bytes []byte, fileName string) error { flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC dst, err := os.OpenFile(fileName, flags, 0644) if err != nil { @@ -681,15 +715,6 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s } defer dst.Close() - for { - resp, receiveErr := client.Recv() - if receiveErr == io.EOF { - break - } - if receiveErr != nil { - return fmt.Errorf("receiving %s: %v", fileName, receiveErr) - } - dst.Write(resp.FileContent) - } + dst.Write(bytes) return nil } diff --git a/weed/storage/idx/binary_search.go b/weed/storage/idx/binary_search.go new file mode 100644 index 000000000..93bdfd7d8 --- /dev/null +++ b/weed/storage/idx/binary_search.go @@ -0,0 +1,29 @@ +package idx + +import ( + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +// FirstInvalidIndex finds the first 
index that fails the lessThanOrEqualToFn function's requirement. +func FirstInvalidIndex(bytes []byte, lessThanOrEqualToFn func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error)) (int, error) { + left, right := 0, len(bytes)/types.NeedleMapEntrySize-1 + index := right + 1 + for left <= right { + mid := left + (right-left)>>1 + loc := mid * types.NeedleMapEntrySize + key := types.BytesToNeedleId(bytes[loc : loc+types.NeedleIdSize]) + offset := types.BytesToOffset(bytes[loc+types.NeedleIdSize : loc+types.NeedleIdSize+types.OffsetSize]) + size := types.BytesToSize(bytes[loc+types.NeedleIdSize+types.OffsetSize : loc+types.NeedleIdSize+types.OffsetSize+types.SizeSize]) + res, err := lessThanOrEqualToFn(key, offset, size) + if err != nil { + return -1, err + } + if res { + left = mid + 1 + } else { + index = mid + right = mid - 1 + } + } + return index, nil +} diff --git a/weed/storage/idx_binary_search_test.go b/weed/storage/idx_binary_search_test.go new file mode 100644 index 000000000..48f48852e --- /dev/null +++ b/weed/storage/idx_binary_search_test.go @@ -0,0 +1,57 @@ +package storage + +import ( + "github.com/seaweedfs/seaweedfs/weed/storage/idx" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func TestFirstInvalidIndex(t *testing.T) { + dir := t.TempDir() + + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) + if err != nil { + t.Fatalf("volume creation: %v", err) + } + type WriteInfo struct { + offset int64 + size int32 + } + // write 30 needles with keys 1 through 30 + for i := 1; i <= 30; i++ { + n := newRandomNeedle(uint64(i)) + n.Flags = 0x08 + _, _, _, err := v.writeNeedle2(n, true, false) + if err != nil { + t.Fatalf("write needle %d: %v", i, err) + } + } + b, err := 
os.ReadFile(v.IndexFileName() + ".idx") + // base case every record is valid -> nothing is filtered + index, err := idx.FirstInvalidIndex(b, func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) { + return true, nil + }) + if err != nil { + t.Fatalf("failed to complete binary search %v", err) + } + assert.Equal(t, 30, index, "when every record is valid nothing should be filtered from binary search") + index, err = idx.FirstInvalidIndex(b, func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) { + return false, nil + }) + assert.Equal(t, 0, index, "when every record is invalid everything should be filtered from binary search") + index, err = idx.FirstInvalidIndex(b, func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) { + return key < 20, nil + }) + // needle key range from 1 to 30 so < 20 means 19 keys are valid and cutoff the bytes at 19 * 16 = 304 + assert.Equal(t, 19, index, "when the first 19 records are valid the first invalid index should be 19") + + index, err = idx.FirstInvalidIndex(b, func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) { + return key <= 1, nil + }) + // needle key range from 1 to 30 so <= 1 means 1 key is valid and cutoff the bytes at 1 * 16 = 16 + assert.Equal(t, 1, index, "when only the first record is valid the first invalid index should be 1")