diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 4a1d2e056..d94e7ded8 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -3,8 +3,8 @@ package shell import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "io" - "math/rand" "sort" "github.com/chrislusf/seaweedfs/weed/operation" @@ -27,11 +27,13 @@ func (c *commandVolumeFixReplication) Name() string { func (c *commandVolumeFixReplication) Help() string { return `add replicas to volumes that are missing replicas - This command finds all under-replicated volumes, and finds volume servers with free slots. + This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop. + + This command also finds all under-replicated volumes, and finds volume servers with free slots. If the free slots satisfy the replication requirement, the volume content is copied over and mounted. volume.fix.replication -n # do not take action - volume.fix.replication # actually copying the volume files and mount the volume + volume.fix.replication # actually deleting or copying the volume files and mount the volume Note: * each time this will only add back one replica for one volume id. If there are multiple replicas @@ -69,12 +71,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { loc := newLocation(dc, string(rack), dn) for _, v := range dn.VolumeInfos { - if v.ReplicaPlacement > 0 { - volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ - location: &loc, - info: v, - }) - } + volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ + location: &loc, + info: v, + }) } allLocations = append(allLocations, loc) }) @@ -82,7 +82,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, // find all under replicated volumes var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32 for vid, replicas := range volumeReplicas { - replica := replicas[rand.Intn(len(replicas))] + replica := replicas[0] replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) if replicaPlacement.GetCopyCount() > len(replicas) { underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) @@ -92,6 +92,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } } + if len(overReplicatedVolumeIds) > 0 { + return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations) + } + if len(underReplicatedVolumeIds) == 0 { return fmt.Errorf("no under replicated volumes") } @@ -107,10 +111,31 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } +func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { + for _, vid := range overReplicatedVolumeIds { + replicas := volumeReplicas[vid] + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement)) + + replica := pickOneReplicaToDelete(replicas, replicaPlacement) + + fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id) + + if !takeAction { + break + } + + if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil { + return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err) + } + + } + return nil +} + func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { for _, vid := range underReplicatedVolumeIds { replicas := volumeReplicas[vid] - replica := replicas[rand.Intn(len(replicas))] + replica := pickOneReplicaToCopyFrom(replicas) replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) foundNewLocation := false for _, dst := range allLocations { @@ -191,20 +216,13 @@ func keepDataNodesSorted(dataNodes []location) { */ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool { - existingDataNodes := make(map[string]int) - for _, replica := range replicas { - existingDataNodes[replica.location.String()] += 1 - } - sameDataNodeCount := existingDataNodes[possibleLocation.String()] - // avoid duplicated volume on the same data node - if sameDataNodeCount > 0 { + existingDataCenters, _, existingDataNodes := countReplicas(replicas) + + if _, found := existingDataNodes[possibleLocation.String()]; found { + // avoid duplicated volume on the same data node return false } - existingDataCenters := make(map[string]int) - for _, replica := range replicas { - existingDataCenters[replica.location.DataCenter()] += 1 - } primaryDataCenters, _ := findTopKeys(existingDataCenters) // ensure data center count is within limit @@ -225,20 +243,20 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, rep } // now this is one of the primary dcs - existingRacks := make(map[string]int) + primaryDcRacks := make(map[string]int) for _, replica := range replicas { if replica.location.DataCenter() != possibleLocation.DataCenter() { continue } - existingRacks[replica.location.Rack()] += 1 + primaryDcRacks[replica.location.Rack()] += 1 } - primaryRacks, _ := findTopKeys(existingRacks) - sameRackCount := existingRacks[possibleLocation.Rack()] + primaryRacks, _ := findTopKeys(primaryDcRacks) + sameRackCount := primaryDcRacks[possibleLocation.Rack()] // ensure rack count is within limit - if _, found := existingRacks[possibleLocation.Rack()]; !found { + if _, found := primaryDcRacks[possibleLocation.Rack()]; !found { // different from existing racks - if len(existingRacks) < replicaPlacement.DiffRackCount+1 { + if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 { // lack on different racks return true } else { @@ -317,3 +335,43 @@ func (l location) Rack() string { func (l location) DataCenter() string { return l.dc } + +func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica { + mostRecent := replicas[0] + for _, replica := range replicas { + if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond { + mostRecent = replica + } + } + return mostRecent +} + +func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) { + diffDc = make(map[string]int) + diffRack = make(map[string]int) + diffNode = make(map[string]int) + for _, replica := range replicas { + diffDc[replica.location.DataCenter()] += 1 + diffRack[replica.location.Rack()] += 1 + diffNode[replica.location.String()] += 1 + } + return +} + +func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica { + + allSame := true + oldest := replicas[0] + for _, replica := range replicas { + if replica.info.ModifiedAtSecond < oldest.info.ModifiedAtSecond { + oldest = replica + allSame = false + } + } + if !allSame { + return oldest + } + + // TODO what if all the replicas have the same timestamp? + return oldest +}