diff --git a/weed/command/backup.go b/weed/command/backup.go index eb2b5ba4a..615be80cf 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -119,7 +119,7 @@ func runBackup(cmd *Command, args []string) bool { } if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { - if err = v.Compact2(30 * 1024 * 1024 * 1024); err != nil { + if err = v.Compact2(30*1024*1024*1024, 0); err != nil { fmt.Printf("Compact Volume before synchronizing %v\n", err) return true } diff --git a/weed/command/compact.go b/weed/command/compact.go index 85313b749..4e28aa725 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -50,7 +50,7 @@ func runCompact(cmd *Command, args []string) bool { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } else { - if err = v.Compact2(preallocate); err != nil { + if err = v.Compact2(preallocate, 0); err != nil { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go index e94d9b516..38159496e 100644 --- a/weed/storage/store_vacuum.go +++ b/weed/storage/store_vacuum.go @@ -16,8 +16,7 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) { } func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error { if v := s.findVolume(vid); v != nil { - return v.Compact2(preallocate) // compactionBytePerSecond - // return v.Compact(preallocate, compactionBytePerSecond) + return v.Compact2(preallocate, compactionBytePerSecond) } return fmt.Errorf("volume id %d is not found during compact", vid) } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 5d0d63877..669d5dd6c 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -57,7 +57,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error } // compact a volume based on deletions in .idx files -func (v *Volume) Compact2(preallocate int64) error { +func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) error { if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory return nil @@ -73,7 +73,7 @@ func (v *Volume) Compact2(preallocate int64) error { v.lastCompactIndexOffset = v.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision glog.V(3).Infof("creating copies for volume %d ...", v.Id) - return copyDataBasedOnIndexFile(filePath+".dat", filePath+".idx", filePath+".cpd", filePath+".cpx", v.SuperBlock, v.Version(), preallocate) + return copyDataBasedOnIndexFile(filePath+".dat", filePath+".idx", filePath+".cpd", filePath+".cpx", v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond) } func (v *Volume) CommitCompact() error { @@ -366,7 +366,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca return } -func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64) (err error) { +func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64, compactionBytePerSecond int64) (err error) { var ( srcDatBackend, dstDatBackend backend.BackendStorageFile dataFile *os.File @@ -395,6 +395,8 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str dstDatBackend.WriteAt(sb.Bytes(), 0) newOffset := int64(sb.BlockSize()) + writeThrottler := util.NewWriteThrottler(compactionBytePerSecond) + oldNm.AscendingVisit(func(value needle_map.NeedleValue) error { offset, size := value.Offset, value.Size @@ -419,7 +421,9 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str if _, _, _, err = n.Append(dstDatBackend, sb.Version); err != nil { return fmt.Errorf("cannot append needle: %s", err) } - newOffset += n.DiskSize(version) + delta := n.DiskSize(version) + newOffset += delta + writeThrottler.MaybeSlowdown(delta) glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size) return nil diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 95f43d6ec..51f04c8b1 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -84,7 +84,7 @@ func TestCompaction(t *testing.T) { } startTime := time.Now() - v.Compact2(0) + v.Compact2(0, 0) speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds() t.Logf("compaction speed: %.2f bytes/s", speed)