From b335f81a4fb51e57b0da37b2b662e823369459f6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 3 May 2019 17:22:39 -0700 Subject: [PATCH] volume: add option to limit compaction speed --- weed/command/backup.go | 2 +- weed/command/compact.go | 2 +- weed/command/server.go | 1 + weed/command/volume.go | 3 ++ weed/server/volume_grpc_vacuum.go | 2 +- weed/server/volume_server.go | 26 ++++++++------- weed/storage/store_vacuum.go | 4 +-- weed/storage/volume_vacuum.go | 52 ++++++++++++++++++++++-------- weed/storage/volume_vacuum_test.go | 10 ++++-- 9 files changed, 69 insertions(+), 33 deletions(-) diff --git a/weed/command/backup.go b/weed/command/backup.go index 51a89a5af..022e784c7 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -91,7 +91,7 @@ func runBackup(cmd *Command, args []string) bool { } if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { - if err = v.Compact(0); err != nil { + if err = v.Compact(0, 0); err != nil { fmt.Printf("Compact Volume before synchronizing %v\n", err) return true } diff --git a/weed/command/compact.go b/weed/command/compact.go index 3ac09259e..79d50c095 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -43,7 +43,7 @@ func runCompact(cmd *Command, args []string) bool { glog.Fatalf("Load Volume [ERROR] %s\n", err) } if *compactMethod == 0 { - if err = v.Compact(preallocate); err != nil { + if err = v.Compact(preallocate, 0); err != nil { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } else { diff --git a/weed/command/server.go b/weed/command/server.go index 228594ad0..ce402f1cd 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -94,6 +94,7 @@ func init() { serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.") serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") + serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") s3Options.filerBucketsPath = cmdServer.Flag.String("s3.filer.dir.buckets", "/buckets", "folder on filer to store all buckets") diff --git a/weed/command/volume.go b/weed/command/volume.go index b87555456..4d34fbc1e 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -43,6 +43,7 @@ type VolumeServerOptions struct { readRedirect *bool cpuProfile *string memProfile *string + compactionMBPerSecond *int } func init() { @@ -63,6 +64,7 @@ func init() { v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.") v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file") v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") + v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit compaction speed in mega bytes per second") } var cmdVolume = &Command{ @@ -157,6 +159,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v strings.Split(masters, ","), *v.pulseSeconds, *v.dataCenter, *v.rack, v.whiteList, *v.fixJpgOrientation, *v.readRedirect, + *v.compactionMBPerSecond, ) listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go index 4aa6588cb..24f982241 100644 --- a/weed/server/volume_grpc_vacuum.go +++ b/weed/server/volume_grpc_vacuum.go @@ -28,7 +28,7 @@ func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_ser resp := &volume_server_pb.VacuumVolumeCompactResponse{} - err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate) + err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond) if err != nil { glog.Errorf("compact volume %d: %v", req.VolumeId, err) diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 8e77ec570..be1b433f7 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -20,9 +20,10 @@ type VolumeServer struct { guard *security.Guard grpcDialOption grpc.DialOption - needleMapKind storage.NeedleMapType - FixJpgOrientation bool - ReadRedirect bool + needleMapKind storage.NeedleMapType + FixJpgOrientation bool + ReadRedirect bool + compactionBytePerSecond int64 } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, @@ -33,20 +34,23 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, dataCenter string, rack string, whiteList []string, fixJpgOrientation bool, - readRedirect bool) *VolumeServer { + readRedirect bool, + compactionMBPerSecond int, +) *VolumeServer { v := viper.GetViper() signingKey := v.GetString("jwt.signing.key") enableUiAccess := v.GetBool("access.ui") vs := &VolumeServer{ - pulseSeconds: pulseSeconds, - dataCenter: dataCenter, - rack: rack, - needleMapKind: needleMapKind, - FixJpgOrientation: fixJpgOrientation, - ReadRedirect: readRedirect, - grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"), + pulseSeconds: pulseSeconds, + dataCenter: dataCenter, + rack: rack, + needleMapKind: needleMapKind, + FixJpgOrientation: fixJpgOrientation, + ReadRedirect: readRedirect, + grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"), + compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, } vs.MasterNodes = masterNodes vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go index 5f982a8c3..b1f1a6277 100644 --- a/weed/storage/store_vacuum.go +++ b/weed/storage/store_vacuum.go @@ -14,9 +14,9 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) { } return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId) } -func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64) error { +func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error { if v := s.findVolume(vid); v != nil { - return v.Compact(preallocate) + return v.Compact(preallocate, compactionBytePerSecond) } return fmt.Errorf("volume id %d is not found during compact", vid) } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index e31908764..d8141cf71 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -18,7 +18,7 @@ func (v *Volume) garbageLevel() float64 { return float64(v.nm.DeletedSize()) / float64(v.ContentSize()) } -func (v *Volume) Compact(preallocate int64) error { +func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error { glog.V(3).Infof("Compacting volume %d ...", v.Id) //no need to lock for copy on write //v.accessLock.Lock() @@ -29,7 +29,7 @@ func (v *Volume) Compact(preallocate int64) error { v.lastCompactIndexOffset = v.nm.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) - return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate) + return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond) } func (v *Volume) Compact2() error { @@ -236,12 +236,15 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI } type VolumeFileScanner4Vacuum struct { - version needle.Version - v *Volume - dst *os.File - nm *NeedleMap - newOffset int64 - now uint64 + version needle.Version + v *Volume + dst *os.File + nm *NeedleMap + newOffset int64 + now uint64 + compactionBytePerSecond int64 + lastSizeCounter int64 + lastSizeCheckTime time.Time } func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error { @@ -269,13 +272,32 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in if _, _, _, err := n.Append(scanner.dst, scanner.v.Version()); err != nil { return fmt.Errorf("cannot append needle: %s", err) } - scanner.newOffset += n.DiskSize(scanner.version) + delta := n.DiskSize(scanner.version) + scanner.newOffset += delta + scanner.maybeSlowdown(delta) glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", scanner.newOffset, "data_size", n.Size) } return nil } +func (scanner *VolumeFileScanner4Vacuum) maybeSlowdown(delta int64) { + if scanner.compactionBytePerSecond > 0 { + scanner.lastSizeCounter += delta + now := time.Now() + elapsedDuration := now.Sub(scanner.lastSizeCheckTime) + if elapsedDuration > 100*time.Millisecond { + overLimitBytes := scanner.lastSizeCounter - scanner.compactionBytePerSecond/10 + if overLimitBytes > 0 { + overRatio := float64(overLimitBytes) / float64(scanner.compactionBytePerSecond) + sleepTime := time.Duration(overRatio*1000) * time.Millisecond + // glog.V(0).Infof("currently %d bytes, limit to %d bytes, over by %d bytes, sleeping %v over %.4f", scanner.lastSizeCounter, scanner.compactionBytePerSecond/10, overLimitBytes, sleepTime, overRatio) + time.Sleep(sleepTime) + } + scanner.lastSizeCounter, scanner.lastSizeCheckTime = 0, time.Now() + } + } +} -func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64) (err error) { +func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) { var ( dst, idx *os.File ) @@ -290,10 +312,12 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca defer idx.Close() scanner := &VolumeFileScanner4Vacuum{ - v: v, - now: uint64(time.Now().Unix()), - nm: NewBtreeNeedleMap(idx), - dst: dst, + v: v, + now: uint64(time.Now().Unix()), + nm: NewBtreeNeedleMap(idx), + dst: dst, + compactionBytePerSecond: compactionBytePerSecond, + lastSizeCheckTime: time.Now(), } err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner) return diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index d038eeda3..54899c788 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -5,6 +5,7 @@ import ( "math/rand" "os" "testing" + "time" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" @@ -72,8 +73,8 @@ func TestCompaction(t *testing.T) { t.Fatalf("volume creation: %v", err) } - beforeCommitFileCount := 1000 - afterCommitFileCount := 1000 + beforeCommitFileCount := 10000 + afterCommitFileCount := 10000 infos := make([]*needleInfo, beforeCommitFileCount+afterCommitFileCount) @@ -81,7 +82,10 @@ func TestCompaction(t *testing.T) { doSomeWritesDeletes(i, v, t, infos) } - v.Compact(0) + startTime := time.Now() + v.Compact(0, 1024*1024) + speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds() + t.Logf("compaction speed: %.2f bytes/s", speed) for i := 1; i <= afterCommitFileCount; i++ { doSomeWritesDeletes(i+beforeCommitFileCount, v, t, infos)