1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-07-02 07:06:44 +02:00

volume: sync volume file right before compaction

fix https://github.com/chrislusf/seaweedfs/issues/1237
This commit is contained in:
Chris Lu 2020-03-19 23:54:52 -07:00
parent 709f231e23
commit 81797a059a
5 changed files with 22 additions and 0 deletions

View file

@ -19,6 +19,7 @@ type BackendStorageFile interface {
io.Closer
GetStat() (datSize int64, modTime time.Time, err error)
Name() string
Sync() error
}
type BackendStorage interface {

View file

@ -48,3 +48,7 @@ func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) {
func (df *DiskFile) Name() string {
return df.fullFilePath
}
func (df *DiskFile) Sync() error {
return df.File.Sync()
}

View file

@ -58,3 +58,7 @@ func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err er
func (mmf *MemoryMappedFile) Name() string {
return mmf.mm.File.Name()
}
func (mm *MemoryMappedFile) Sync() error {
return nil
}

View file

@ -179,3 +179,7 @@ func (s3backendStorageFile S3BackendStorageFile) GetStat() (datSize int64, modTi
func (s3backendStorageFile S3BackendStorageFile) Name() string {
return s3backendStorageFile.key
}
func (s3backendStorageFile S3BackendStorageFile) Sync() error {
return nil
}

View file

@ -53,6 +53,9 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
v.lastCompactIndexOffset = v.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
if err := v.DataBackend.Sync(); err != nil {
glog.V(0).Infof("compact fail to sync volume %d", v.Id)
}
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)
}
@ -73,6 +76,9 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) erro
v.lastCompactIndexOffset = v.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
if err := v.DataBackend.Sync(); err != nil {
glog.V(0).Infof("compact2 fail to sync volume %d", v.Id)
}
return copyDataBasedOnIndexFile(filePath+".dat", filePath+".idx", filePath+".cpd", filePath+".cpx", v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond)
}
@ -93,6 +99,9 @@ func (v *Volume) CommitCompact() error {
glog.V(3).Infof("Got volume %d committing lock...", v.Id)
v.nm.Close()
if v.DataBackend != nil {
if err := v.DataBackend.Sync(); err != nil {
glog.V(0).Infof("fail to sync volume %d", v.Id)
}
if err := v.DataBackend.Close(); err != nil {
glog.V(0).Infof("fail to close volume %d", v.Id)
}