1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-07-04 08:06:53 +02:00

fix the bug of volume never be vacuumed

This commit is contained in:
zhangsong 2019-12-01 23:29:41 +08:00 committed by zhangsong
parent 0da7b894cc
commit e83c36e26f

View file

@ -13,8 +13,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
) )
func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool { func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
ch := make(chan bool, locationlist.Length()) locationlist *VolumeLocationList, garbageThreshold float64) (*VolumeLocationList, bool) {
ch := make(chan int, locationlist.Length())
errCount := int32(0)
for index, dn := range locationlist.list { for index, dn := range locationlist.list {
go func(index int, url string, vid needle.VolumeId) { go func(index int, url string, vid needle.VolumeId) {
err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
@ -22,11 +24,15 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi
VolumeId: uint32(vid), VolumeId: uint32(vid),
}) })
if err != nil { if err != nil {
ch <- false atomic.AddInt32(&errCount, 1)
ch <- -1
return err return err
} }
isNeeded := resp.GarbageRatio > garbageThreshold if resp.GarbageRatio >= garbageThreshold {
ch <- isNeeded ch <- index
} else {
ch <- -1
}
return nil return nil
}) })
if err != nil { if err != nil {
@ -34,18 +40,21 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi
} }
}(index, dn.Url(), vid) }(index, dn.Url(), vid)
} }
isCheckSuccess := true vacuumLocationList := NewVolumeLocationList()
for range locationlist.list { for range locationlist.list {
select { select {
case canVacuum := <-ch: case index := <-ch:
isCheckSuccess = isCheckSuccess && canVacuum if index != -1 {
vacuumLocationList.list = append(vacuumLocationList.list, locationlist.list[index])
}
case <-time.After(30 * time.Minute): case <-time.After(30 * time.Minute):
return false return vacuumLocationList, false
} }
} }
return isCheckSuccess return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0
} }
func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool { func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
locationlist *VolumeLocationList, preallocate int64) bool {
vl.accessLock.Lock() vl.accessLock.Lock()
vl.removeFromWritable(vid) vl.removeFromWritable(vid)
vl.accessLock.Unlock() vl.accessLock.Unlock()
@ -163,11 +172,12 @@ func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeL
} }
glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid) glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
if batchVacuumVolumeCheck(grpcDialOption, volumeLayout, vid, locationList, garbageThreshold) { if vacuumLocationList, needVacuum := batchVacuumVolumeCheck(
if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, locationList, preallocate) { grpcDialOption, volumeLayout, vid, locationList, garbageThreshold); needVacuum {
batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, locationList) if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) {
batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList)
} else { } else {
batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, locationList) batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList)
} }
} }
} }