diff --git a/weed/topology/topology.go b/weed/topology/topology.go index aa01190c9..7cbf80b4d 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -16,6 +16,7 @@ import ( ) type Topology struct { + vacuumLockCounter int64 NodeImpl collectionMap *util.ConcurrentReadMap @@ -33,6 +34,7 @@ type Topology struct { Configuration *Configuration RaftServer raft.Server + } func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology { diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 438ca8de9..37a6a30b9 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -2,6 +2,7 @@ package topology import ( "context" + "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -121,6 +122,16 @@ func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, } func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) int { + + // if there is vacuum going on, return immediately + swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1) + if !swapped { + return 0 + } + defer atomic.StoreInt64(&t.vacuumLockCounter, 0) + + // now only one vacuum process going on + glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold) for _, col := range t.collectionMap.Items() { c := col.(*Collection)