diff --git a/weed/topology/replication_health_checker.go b/weed/topology/replication_health_checker.go deleted file mode 100644 index 947e7d45c..000000000 --- a/weed/topology/replication_health_checker.go +++ /dev/null @@ -1,297 +0,0 @@ -package topology - -import ( - "context" - "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" - "google.golang.org/grpc" - "sort" - "strings" - "sync" -) - -/** - check the replication health - */ -func (t *Topology) RepairUnhealthyReplicationInLayout(grpcDialOption grpc.DialOption, layout *VolumeLayout, eVid storage.VolumeId) error { - ctx := context.Background() - locations, exist := layout.vid2location[eVid] - if !exist { - retErr := fmt.Errorf("the volume:%v has no locations", eVid) - glog.V(0).Infof(retErr.Error()) - return retErr - } - - //glog.V(5).Infof("volume:%v, locations:%v", eVid, locations.list) - fileStat, err := getReplicationInfo(grpcDialOption, ctx, eVid, locations) - if err != nil { - glog.Errorf("get replication status failed, %v", err) - return err - } - - if isSameVolumeReplications(fileStat, layout.volumeSizeLimit) { - glog.V(0).Infof("the volume:%v has %d same replication, need not repair", eVid, len(fileStat)) - return nil - } - - // compact all the replications of volume - { - glog.V(4).Infof("begin compact all the replications of volume:%v", eVid) - allUrls := make([]string, 0, len(fileStat)) - for _, fs := range fileStat { - allUrls = append(allUrls, fs.url) - } - - if tryBatchCompactVolume(ctx, grpcDialOption, eVid, allUrls) == false { - err := fmt.Errorf("compact all the replications of volume:%v", eVid) - glog.Error(err.Error()) - return err - } - glog.V(4).Infof("success compact all the replications of volume:%v", eVid) - } - - // get replication status again - fileStat, err = getReplicationInfo(grpcDialOption, ctx, eVid, locations) - if err != nil { - return err - } - - okUrls, errUrls := filterErrorReplication(fileStat) - if len(errUrls) == 0 { - return nil // they are the same - } - - if len(okUrls) == 0 { - return fmt.Errorf("no correct volume replications, that's impossible") - } - - glog.V(4).Infof("need repair replication : %v", errUrls) - if len(locations.list) <= 0 { - return fmt.Errorf("that's impossible, the locatins of volume:%v is empty", eVid) - } - for _, url := range errUrls { - vInfo := locations.list[0].volumes[eVid] - err = syncReplication(grpcDialOption, okUrls[0], url, vInfo) - if nil != err { - glog.Error(err) - return err - } - } - return nil -} - -type FileStatus struct { - url string - fileStat *volume_server_pb.ReadVolumeFileStatusResponse -} - -func getReplicationInfo(grpcDialOption grpc.DialOption, ctx context.Context, vid storage.VolumeId, locs *VolumeLocationList) (fs []FileStatus, err error) { - type ResponsePair struct { - url string - status *volume_server_pb.ReadVolumeFileStatusResponse - err error - } - - var wg sync.WaitGroup - resultChan := make(chan ResponsePair, len(locs.list)) - wg.Add(len(locs.list)) - getFileStatFunc := func(url string, volumeId storage.VolumeId) { - defer wg.Done() - glog.V(4).Infof("volumeId:%v, location:%v", volumeId, url) - err := operation.WithVolumeServerClient(url, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - req := &volume_server_pb.ReadVolumeFileStatusRequest{ - VolumeId: uint32(volumeId), - } - respTmp, err := client.ReadVolumeFileStatus(ctx, req) - resultChan <- ResponsePair{ - url: url, - status: respTmp, - err: err, - } - return nil - }) - if nil != err { - glog.Error(err) - } - } - for _, node := range locs.list { - go getFileStatFunc(node.Url(), vid) - } - - go func() { // close channel - wg.Wait() - close(resultChan) - }() - - var errs []string - for result := range resultChan { - if result.err == nil { - fs = append(fs, FileStatus{ - url: result.url, - fileStat: result.status, - }) - continue - } - tmp := fmt.Sprintf("url : %s, error : %v", result.url, result.err) - errs = append(errs, tmp) - } - - if len(fs) == len(locs.list) { - return fs, nil - } - err = fmt.Errorf("get volume[%v] replication status failed, err : %s", vid, strings.Join(errs, "; ")) - return nil, err -} - -/** - : - the file count is the total count of the volume received from user clients -todo: this policy is not perfected or not rigorous, need fix - */ -func filterErrorReplication(fileStat []FileStatus) (okUrls, errUrls []string) { - sort.Slice(fileStat, func(i, j int) bool { - return fileStat[i].fileStat.FileCount > fileStat[j].fileStat.FileCount - }) - if fileStat[0].fileStat.FileCount != fileStat[len(fileStat)-1].fileStat.FileCount { - okFileCounter := fileStat[0].fileStat.FileCount - for _, v := range fileStat { - if okFileCounter == v.fileStat.FileCount { - okUrls = append(okUrls, v.url) - } else { - errUrls = append(errUrls, v.url) - } - } - return - } - return -} - -// execute the compact transaction -func compactVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid storage.VolumeId) bool { - glog.V(0).Infoln("Start vacuuming", vid, "on", volumeUrl) - err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, err := volumeServerClient.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{ - VolumeId: uint32(vid), - }) - return err - }) - if err != nil { - glog.Errorf("Error when vacuuming %d on %s: %v", vid, volumeUrl, err) - return false - } - glog.V(0).Infof("Complete vacuuming volume:%v on %s", vid, volumeUrl) - return true -} - -// commit the compact transaction when compactVolume() return true -func commitCompactedVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid storage.VolumeId) bool { - err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, err := volumeServerClient.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{ - VolumeId: uint32(vid), - }) - return err - }) - if err != nil { - glog.Errorf("Error when committing vacuum %d on %s: %v", vid, volumeUrl, err) - return false - } - glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, volumeUrl) - return true -} - -// rollback the compact transaction when compactVolume return false -func cleanupCompactedVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid storage.VolumeId) bool { - glog.V(0).Infoln("Start cleaning up", vid, "on", volumeUrl) - err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, err := volumeServerClient.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{ - VolumeId: uint32(vid), - }) - return err - }) - if err != nil { - glog.Errorf("Error when cleaning up vacuum %d on %s: %v", vid, volumeUrl, err) - return false - } - glog.V(0).Infof("Complete cleaning up vacuum %d on %s", vid, volumeUrl) - return false -} - -func tryCompactVolume(ctx context.Context, grpcDialOption grpc.DialOption, vid storage.VolumeId, volumeUrl string) bool { - if compactVolume(ctx, grpcDialOption, volumeUrl, vid) == false { - return cleanupCompactedVolume(ctx, grpcDialOption, volumeUrl, vid) - } - return commitCompactedVolume(ctx, grpcDialOption, volumeUrl, vid) -} - -func tryBatchCompactVolume(ctx context.Context, grpcDialOption grpc.DialOption, - vid storage.VolumeId, urls []string) bool { - resultChan := make(chan error) - var wg sync.WaitGroup - wg.Add(len(urls)) - for _, url := range urls { - go func(volumeUrl string) { - defer wg.Done() - if tryCompactVolume(ctx, grpcDialOption, vid, volumeUrl) == false { - resultChan <- fmt.Errorf("url:%s", volumeUrl) - } - }(url) - } - - go func() { - wg.Wait() - close(resultChan) - }() - - var errs []string - for result := range resultChan { - if result != nil { - errs = append(errs, result.Error()) - } - } - if len(errs) > 0 { - glog.Errorf("consist volume:%v compact reversion failed, %s", vid, strings.Join(errs, "; ")) - return false - } - return true -} - -func isSameVolumeReplications(fileStat []FileStatus, volumeSizeLimit uint64) bool { - fileSizeSet := make(map[uint64]bool) - fileCountSet := make(map[uint64]bool) - lastModifiedSet := make(map[uint64]bool) - var oneFileSize uint64 = 0 - for _, v := range fileStat { - fileCountSet[v.fileStat.FileCount] = true - lastModifiedSet[v.fileStat.DatFileTimestamp] = true - fileSizeSet[v.fileStat.DatFileSize] = true - oneFileSize = v.fileStat.DatFileSize - } - - if (len(lastModifiedSet) == 1) && (len(fileCountSet) == 1) && - (len(fileSizeSet) == 1) && (oneFileSize >= volumeSizeLimit) { - return true - } - return false -} - -func syncReplication(grpcDialOption grpc.DialOption, srcUrl, destUrl string, vinfo storage.VolumeInfo) error { - ctx := context.Background() - err := operation.WithVolumeServerClient(destUrl, grpcDialOption, - func(client volume_server_pb.VolumeServerClient) error { - if _, err := client.ReplicateVolume(ctx, &volume_server_pb.ReplicateVolumeRequest{ - VolumeId: uint32(vinfo.Id), - Collection: vinfo.Collection, - Replication: vinfo.ReplicaPlacement.String(), - Ttl: vinfo.Ttl.String(), - SourceDataNode: srcUrl, - }); err != nil { - glog.Errorf("sync replication failed, %v", err) - return err - } - return nil - }) - return err -}