From 6a4546d2c088b5d71f8b2200b758a82f1aad08c6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 22 Feb 2021 01:30:07 -0800 Subject: [PATCH] shell: add volume.tier.move --- weed/shell/command_volume_fix_replication.go | 8 +-- weed/shell/command_volume_tier_move.go | 59 +++++++++++++++++++- 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index bf20d3574..538351fd0 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -162,10 +162,10 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) foundNewLocation := false hasSkippedCollection := false - keepDataNodesSorted(allLocations, replica.info.DiskType) + keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) + fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) for _, dst := range allLocations { // check whether data nodes satisfy the constraints - fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { // check collection name pattern if *c.collectionPattern != "" { @@ -216,8 +216,8 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm return nil } -func keepDataNodesSorted(dataNodes []location, diskType string) { - fn := capacityByFreeVolumeCount(types.ToDiskType(diskType)) +func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) { + fn := capacityByFreeVolumeCount(diskType) sort.Slice(dataNodes, func(i, j int) bool { return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode) }) diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index 42c710db3..18385667d 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -23,7 +23,7 @@ func (c *commandVolumeTierMove) Name() string { } func (c *commandVolumeTierMove) Help() string { - return ` change a volume from one disk type to another + return `change a volume from one disk type to another volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collection=""] [-fullPercent=95] [-quietFor=1h] @@ -45,6 +45,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period") source := tierCommand.String("fromDiskType", "", "the source disk type") target := tierCommand.String("toDiskType", "", "the target disk type") + applyChange := tierCommand.Bool("force", false, "actually apply the changes") if err = tierCommand.Parse(args); err != nil { return nil } @@ -62,13 +63,67 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer return err } - // apply to all volumes in the collection + // collect all volumes that should change volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collection, *fullPercentage, *quietPeriod) if err != nil { return err } fmt.Printf("tier move volumes: %v\n", volumeIds) + _, allLocations := collectVolumeReplicaLocations(topologyInfo) + for _, vid := range volumeIds { + if err = doVolumeTierMove(commandEnv, writer, *collection, vid, toDiskType, allLocations, *applyChange); err != nil { + fmt.Printf("tier move volume %d: %v\n", vid, err) + } + } + + return nil +} + +func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) { + // find volume location + locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) + if !found { + return fmt.Errorf("volume %d not found", vid) + } + + // find one server with the most empty volume slots with target disk type + hasFoundTarget := false + keepDataNodesSorted(allLocations, toDiskType) + fn := capacityByFreeVolumeCount(toDiskType) + for _, dst := range allLocations { + if fn(dst.dataNode) > 0 { + // ask the volume server to replicate the volume + sourceVolumeServer := "" + for _, loc := range locations { + if loc.Url != dst.dataNode.Id { + sourceVolumeServer = loc.Url + } + } + if sourceVolumeServer == "" { + continue + } + fmt.Fprintf(writer, "moving volume %d %s from %s to dataNode %s with disk type ...\n", vid, sourceVolumeServer, dst.dataNode.Id, toDiskType.String()) + hasFoundTarget = true + + if !applyChanges { + break + } + + // mark all replicas as read only + err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations) + if err != nil { + return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) + } + return LiveMoveVolume(commandEnv.option.GrpcDialOption, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.String()) + + } + } + + if !hasFoundTarget { + fmt.Fprintf(writer, "can not find disk type %s for volume %d\n", toDiskType.String(), vid) + } + return nil }