From 38e4b79125c211a38bc04cc26ece94d5a4d4f234 Mon Sep 17 00:00:00 2001 From: stlpmo Date: Tue, 24 Dec 2019 17:20:34 +0800 Subject: [PATCH] decouple the volume.Destroy() from the operation of unmountVolume() --- weed/storage/disk_location.go | 65 +++++++++++++++++++++++--------- weed/storage/disk_location_ec.go | 14 +++++++ 2 files changed, 62 insertions(+), 17 deletions(-) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 0536f085b..538f7469f 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -71,7 +71,6 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne } else { glog.V(0).Infof("new volume %s error %s", name, e) } - } } } @@ -116,28 +115,46 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) { l.volumesLock.Lock() - for k, v := range l.volumes { - if v.Collection == collection { - e = l.deleteVolumeById(k) - if e != nil { - l.volumesLock.Unlock() - return - } - } - } + delVolsMap := l.unmountVolumeByCollection(collection) l.volumesLock.Unlock() l.ecVolumesLock.Lock() - for k, v := range l.ecVolumes { - if v.Collection == collection { - e = l.deleteEcVolumeById(k) - if e != nil { - l.ecVolumesLock.Unlock() - return + delEcVolsMap := l.unmountEcVolumeByCollection(collection) + l.ecVolumesLock.Unlock() + + errChain := make(chan error, 2) + var wg sync.WaitGroup + wg.Add(2) + go func() { + for _, v := range delVolsMap { + if err := v.Destroy(); err != nil { + errChain <- err } } + wg.Done() + }() + + go func() { + for _, v := range delEcVolsMap { + v.Destroy() + } + wg.Done() + }() + + go func() { + wg.Wait() + close(errChain) + }() + + errBuilder := strings.Builder{} + for err := range errChain { + errBuilder.WriteString(err.Error()) + errBuilder.WriteString("; ") } - l.ecVolumesLock.Unlock() + if errBuilder.Len() > 0 { + e = fmt.Errorf(errBuilder.String()) + } + return } @@ -193,6 +210,20 @@ func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error { return nil } +func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume { + deltaVols := make(map[needle.VolumeId]*Volume, 0) + for k, v := range l.volumes { + if v.Collection == collectionName && !v.isCompacting { + deltaVols[k] = v + } + } + + for k, _ := range deltaVols { + delete(l.volumes, k) + } + return deltaVols +} + func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) { l.volumesLock.Lock() defer l.volumesLock.Unlock() diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index ba0824c6d..f6c44e966 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -169,3 +169,17 @@ func (l *DiskLocation) deleteEcVolumeById(vid needle.VolumeId) (e error) { delete(l.ecVolumes, vid) return } + +func (l *DiskLocation) unmountEcVolumeByCollection(collectionName string) map[needle.VolumeId]*erasure_coding.EcVolume { + deltaVols := make(map[needle.VolumeId]*erasure_coding.EcVolume, 0) + for k, v := range l.ecVolumes { + if v.Collection == collectionName { + deltaVols[k] = v + } + } + + for k, _ := range deltaVols { + delete(l.ecVolumes, k) + } + return deltaVols +}