diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go index 07f3fd9c1..e2204c191 100644 --- a/weed/shell/command_fs_verify.go +++ b/weed/shell/command_fs_verify.go @@ -12,9 +12,11 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/util" + "golang.org/x/exp/slices" "io" "math" "strings" + "sync" "time" ) @@ -24,10 +26,14 @@ func init() { type commandFsVerify struct { env *CommandEnv + volumeServers []pb.ServerAddress volumeIds map[uint32][]pb.ServerAddress verbose *bool + concurrency *int modifyTimeAgoAtSec int64 writer io.Writer + waitChan map[string]chan struct{} + waitChanLock sync.RWMutex } func (c *commandFsVerify) Name() string { @@ -45,10 +51,10 @@ func (c *commandFsVerify) Help() string { func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { c.env = commandEnv c.writer = writer - fsVerifyCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) c.verbose = fsVerifyCommand.Bool("v", false, "print out each processed files") modifyTimeAgo := fsVerifyCommand.Duration("modifyTimeAgo", 0, "only include files after this modify time to verify") + c.concurrency = fsVerifyCommand.Int("concurrency", 0, "number of parallel verification per volume server") if err = fsVerifyCommand.Parse(args); err != nil { return err @@ -60,13 +66,23 @@ func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Wr } c.modifyTimeAgoAtSec = int64(modifyTimeAgo.Seconds()) + c.volumeIds = make(map[uint32][]pb.ServerAddress) + c.waitChan = make(map[string]chan struct{}) + c.volumeServers = []pb.ServerAddress{} if err := c.collectVolumeIds(); err != nil { return parseErr } - fCount, eConut, terr := c.verifyTraverseBfs(path) + if *c.concurrency > 0 { + for _, volumeServer := range c.volumeServers { + volumeServerStr := string(volumeServer) + c.waitChan[volumeServerStr] = make(chan struct{}, *c.concurrency) + defer close(c.waitChan[volumeServerStr]) + } + } + fCount, eConut, terr := c.verifyTraverseBfs(path) if terr == nil { fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eConut) } @@ -76,7 +92,6 @@ func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Wr } func (c *commandFsVerify) collectVolumeIds() error { - c.volumeIds = make(map[uint32][]pb.ServerAddress) topologyInfo, _, err := collectTopologyInfo(c.env, 0) if err != nil { return err @@ -84,15 +99,19 @@ func (c *commandFsVerify) collectVolumeIds() error { eachDataNode(topologyInfo, func(dc string, rack RackId, nodeInfo *master_pb.DataNodeInfo) { for _, diskInfo := range nodeInfo.DiskInfos { for _, vi := range diskInfo.VolumeInfos { - c.volumeIds[vi.Id] = append(c.volumeIds[vi.Id], pb.NewServerAddressFromDataNode(nodeInfo)) + volumeServer := pb.NewServerAddressFromDataNode(nodeInfo) + c.volumeIds[vi.Id] = append(c.volumeIds[vi.Id], volumeServer) + if !slices.Contains(c.volumeServers, volumeServer) { + c.volumeServers = append(c.volumeServers, volumeServer) + } } } }) return nil } -func (c *commandFsVerify) verifyEntry(fileId *filer_pb.FileId, volumeServer *pb.ServerAddress) error { - err := operation.WithVolumeServerClient(false, *volumeServer, c.env.option.GrpcDialOption, +func (c *commandFsVerify) verifyEntry(volumeServer pb.ServerAddress, fileId *filer_pb.FileId) error { + err := operation.WithVolumeServerClient(false, volumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, err := client.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{ @@ -104,9 +123,6 @@ func (c *commandFsVerify) verifyEntry(fileId *filer_pb.FileId, volumeServer *pb. if err != nil && !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) { return err } - if *c.verbose { - fmt.Fprintf(c.writer, ".") - } return nil } @@ -140,27 +156,50 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCo func(outputChan chan interface{}) { for itemEntry := range outputChan { i := itemEntry.(*ItemEntry) - fileMsg := fmt.Sprintf("file:%s needle status ", i.path) - if *c.verbose { - fmt.Fprintf(c.writer, fileMsg) - fileMsg = "" - } + itemPath := string(i.path) + fileMsg := fmt.Sprintf("file:%s", itemPath) + errItem := make(map[string]error) + errItemLock := sync.RWMutex{} for _, chunk := range i.chunks { if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok { for _, volumeServer := range volumeIds { - if err = c.verifyEntry(chunk.Fid, &volumeServer); err != nil { - fmt.Fprintf(c.writer, "%sfailed verify %d:%d: %+v\n", - fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err) - break + if *c.concurrency == 0 { + if err = c.verifyEntry(volumeServer, chunk.Fid); err != nil { + fmt.Fprintf(c.writer, "%s failed verify needle %d:%d: %+v\n", + fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err) + } + continue } + c.waitChanLock.RLock() + waitChan, ok := c.waitChan[string(volumeServer)] + c.waitChanLock.RUnlock() + if !ok { + fmt.Fprintf(c.writer, "%s failed to get channel for %s chunk: %d:%d: %+v\n", + string(volumeServer), fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err) + continue + } + waitChan <- struct{}{} + go func(fId *filer_pb.FileId, path string, volumeServer pb.ServerAddress, msg string) { + if err = c.verifyEntry(volumeServer, fId); err != nil { + errItemLock.Lock() + errItem[path] = err + fmt.Fprintf(c.writer, "%s failed verify needle %d:%d: %+v\n", + msg, fId.VolumeId, fId.FileKey, err) + errItemLock.Unlock() + } + <-waitChan + }(chunk.Fid, itemPath, volumeServer, fileMsg) } } else { err = fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId) - fmt.Fprintf(c.writer, "%sfailed verify chunk %d:%d: %+v\n", + fmt.Fprintf(c.writer, "%s %d:%d: %+v\n", fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err) break } } + errItemLock.RLock() + err, _ = errItem[itemPath] + errItemLock.RUnlock() if err != nil { errCount++ @@ -168,7 +207,7 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCo } if *c.verbose { - fmt.Fprintf(c.writer, " verifed\n") + fmt.Fprintf(c.writer, "%s needles:%d verifed\n", fileMsg, len(i.chunks)) } fileCount++ }