1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-07-05 00:26:51 +02:00

master: ensure only one exclusive vacuum process

fix https://github.com/chrislusf/seaweedfs/issues/1011
This commit is contained in:
Chris Lu 2019-07-21 21:49:10 -07:00
parent f3b99cbfe5
commit 79762385bd
2 changed files with 13 additions and 0 deletions

View file

@ -16,6 +16,7 @@ import (
) )
type Topology struct { type Topology struct {
vacuumLockCounter int64
NodeImpl NodeImpl
collectionMap *util.ConcurrentReadMap collectionMap *util.ConcurrentReadMap
@ -33,6 +34,7 @@ type Topology struct {
Configuration *Configuration Configuration *Configuration
RaftServer raft.Server RaftServer raft.Server
} }
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology { func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology {

View file

@ -2,6 +2,7 @@ package topology
import ( import (
"context" "context"
"sync/atomic"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
@ -121,6 +122,16 @@ func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout,
} }
func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) int { func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) int {
// if there is vacuum going on, return immediately
swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
if !swapped {
return 0
}
defer atomic.StoreInt64(&t.vacuumLockCounter, 0)
// now only one vacuum process going on
glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold) glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold)
for _, col := range t.collectionMap.Items() { for _, col := range t.collectionMap.Items() {
c := col.(*Collection) c := col.(*Collection)