diff --git a/weed/command/backup.go b/weed/command/backup.go index a6d660006..0f6bed225 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -121,7 +121,7 @@ func runBackup(cmd *Command, args []string) bool { } if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { - if err = v.Compact(0, 0); err != nil { + if err = v.Compact2(30 * 1024 * 1024 * 1024); err != nil { fmt.Printf("Compact Volume before synchronizing %v\n", err) return true } diff --git a/weed/command/compact.go b/weed/command/compact.go index 4a54f5670..85313b749 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -17,6 +17,9 @@ var cmdCompact = &Command{ The compacted .dat file is stored as .cpd file. The compacted .idx file is stored as .cpx file. + For method=0, it compacts based on the .dat file, works if .idx file is corrupted. + For method=1, it compacts based on the .idx file, works if deletion happened but not written to .dat files. + `, } @@ -47,7 +50,7 @@ func runCompact(cmd *Command, args []string) bool { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } else { - if err = v.Compact2(); err != nil { + if err = v.Compact2(preallocate); err != nil { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index e4273f1b2..37dee7889 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -34,14 +34,12 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { nm.FileCounter++ nm.FileByteCounter = nm.FileByteCounter + uint64(size) oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size) - // glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) if !oldOffset.IsZero() && oldSize != TombstoneFileSize { nm.DeletionCounter++ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } } else { oldSize := nm.m.Delete(NeedleId(key)) - // glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) nm.DeletionCounter++ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } diff --git a/weed/storage/needle_map_metric_test.go b/weed/storage/needle_map_metric_test.go new file mode 100644 index 000000000..ae2177a30 --- /dev/null +++ b/weed/storage/needle_map_metric_test.go @@ -0,0 +1,31 @@ +package storage + +import ( + "io/ioutil" + "math/rand" + "testing" + + "github.com/chrislusf/seaweedfs/weed/glog" + . "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +func TestFastLoadingNeedleMapMetrics(t *testing.T) { + + idxFile, _ := ioutil.TempFile("", "tmp.idx") + nm := NewCompactNeedleMap(idxFile) + + for i := 0; i < 10000; i++ { + nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), uint32(1)) + if rand.Float32() < 0.2 { + nm.Delete(Uint64ToNeedleId(uint64(rand.Int63n(int64(i))+1)), Uint32ToOffset(uint32(0))) + } + } + + mm, _ := newNeedleMapMetricFromIndexFile(idxFile) + + glog.V(0).Infof("FileCount expected %d actual %d", nm.FileCount(), mm.FileCount()) + glog.V(0).Infof("DeletedSize expected %d actual %d", nm.DeletedSize(), mm.DeletedSize()) + glog.V(0).Infof("ContentSize expected %d actual %d", nm.ContentSize(), mm.ContentSize()) + glog.V(0).Infof("DeletedCount expected %d actual %d", nm.DeletedCount(), mm.DeletedCount()) + glog.V(0).Infof("MaxFileKey expected %d actual %d", nm.MaxFileKey(), mm.MaxFileKey()) +} diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go index b1f1a6277..5dacb71bf 100644 --- a/weed/storage/store_vacuum.go +++ b/weed/storage/store_vacuum.go @@ -16,7 +16,7 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) { } func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error { if v := s.findVolume(vid); v != nil { - return v.Compact(preallocate, compactionBytePerSecond) + return v.Compact2(preallocate) // compactionBytePerSecond } return fmt.Errorf("volume id %d is not found during compact", vid) } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index d87a58302..a09939447 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -75,7 +75,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind if err == nil && alsoLoadIndex { var indexFile *os.File if v.noWriteOrDelete { - glog.V(1).Infoln("open to read file", fileName+".idx") + glog.V(0).Infoln("open to read file", fileName+".idx") if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); err != nil { return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, err) } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 704f1f4ef..434b5989d 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -20,9 +20,19 @@ func (v *Volume) garbageLevel() float64 { if v.ContentSize() == 0 { return 0 } - return float64(v.DeletedSize()) / float64(v.ContentSize()) + deletedSize := v.DeletedSize() + fileSize := v.ContentSize() + if v.DeletedCount() > 0 && v.DeletedSize() == 0 { + // this happens for .sdx converted back to normal .idx + // where deleted entry size is missing + datFileSize, _, _ := v.FileStat() + deletedSize = datFileSize - fileSize - super_block.SuperBlockSize + fileSize = datFileSize + } + return float64(deletedSize) / float64(fileSize) } +// compact a volume based on deletions in .dat files func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error { if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory @@ -45,7 +55,8 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond) } -func (v *Volume) Compact2() error { +// compact a volume based on deletions in .idx files +func (v *Volume) Compact2(preallocate int64) error { if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory return nil @@ -58,8 +69,10 @@ func (v *Volume) Compact2() error { }() filePath := v.FileName() + v.lastCompactIndexOffset = v.IndexFileSize() + v.lastCompactRevision = v.SuperBlock.CompactionRevision glog.V(3).Infof("creating copies for volume %d ...", v.Id) - return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx") + return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx", preallocate) } func (v *Volume) CommitCompact() error { @@ -140,6 +153,7 @@ func fetchCompactRevisionFromDatFile(datBackend backend.BackendStorageFile) (com return superBlock.CompactionRevision, nil } +// if old .dat and .idx files are updated, this func tries to apply the same changes to new files accordingly func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) { var indexSize int64 @@ -150,6 +164,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI oldDatBackend := backend.NewDiskFile(oldDatFile) defer oldDatBackend.Close() + // skip if the old .idx file has not changed if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil { return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err) } @@ -157,6 +172,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI return nil } + // fail if the old .dat file has changed to a new revision oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatBackend) if err != nil { return fmt.Errorf("fetchCompactRevisionFromDatFile src %s failed: %v", oldDatFile.Name(), err) @@ -337,14 +353,14 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca return } -func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { +func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate int64) (err error) { var ( - dst, oldIndexFile *os.File + dstDatBackend backend.BackendStorageFile + oldIndexFile *os.File ) - if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil { + if dstDatBackend, err = createVolumeFile(dstName, preallocate, 0); err != nil { return } - dstDatBackend := backend.NewDiskFile(dst) defer dstDatBackend.Close() if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil { @@ -357,7 +373,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { now := uint64(time.Now().Unix()) v.SuperBlock.CompactionRevision++ - dst.Write(v.SuperBlock.Bytes()) + dstDatBackend.WriteAt(v.SuperBlock.Bytes(), 0) newOffset := int64(v.SuperBlock.BlockSize()) idx2.WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error { diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 07b9f70c1..95f43d6ec 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -84,7 +84,7 @@ func TestCompaction(t *testing.T) { } startTime := time.Now() - v.Compact(0, 1024*1024) + v.Compact2(0) speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds() t.Logf("compaction speed: %.2f bytes/s", speed)