1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-05-04 18:40:44 +02:00
seaweedfs/weed/shell/command_volume_delete_empty.go
柏杰 0b0fb9b9e4
avoid data race read volume.IsEmpty (#4574)
* avoid data race read volume.IsEmpty

-   avoid phantom read isEmpty for onlyEmpty
-   use `v.DataBackend.GetStat()` in v.dataFileAccessLock scope

* add Destroy(onlyEmpty: true) test

* add Destroy(onlyEmpty: false) test

* remove unused `IsEmpty()`

* change literal `8` to `SuperBlockSize`
2023-06-14 14:39:58 -07:00

79 lines
2.2 KiB
Go

package shell
import (
"flag"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"io"
"log"
"time"
)
func init() {
Commands = append(Commands, &commandVolumeDeleteEmpty{})
}
type commandVolumeDeleteEmpty struct {
}
func (c *commandVolumeDeleteEmpty) Name() string {
return "volume.deleteEmpty"
}
func (c *commandVolumeDeleteEmpty) Help() string {
return `delete empty volumes from all volume servers
volume.deleteEmpty -quietFor=24h -force
This command deletes all empty volumes from one volume server.
`
}
func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volDeleteCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
quietPeriod := volDeleteCommand.Duration("quietFor", 24*time.Hour, "select empty volumes with no recent writes, avoid newly created ones")
applyBalancing := volDeleteCommand.Bool("force", false, "apply to delete empty volumes")
if err = volDeleteCommand.Parse(args); err != nil {
return nil
}
infoAboutSimulationMode(writer, *applyBalancing, "-force")
if err = commandEnv.confirmIsLocked(args); err != nil {
return
}
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
return err
}
quietSeconds := int64(*quietPeriod / time.Second)
nowUnixSeconds := time.Now().Unix()
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
if *applyBalancing {
log.Printf("deleting empty volume %d from %s", v.Id, dn.Id)
if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id),
pb.NewServerAddressFromDataNode(dn), true); deleteErr != nil {
err = deleteErr
}
continue
} else {
log.Printf("empty volume %d from %s", v.Id, dn.Id)
}
}
}
}
})
return
}