From 3137777d8395111f6c1eb4b3653e13f4961b8510 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 22 Mar 2020 16:21:42 -0700 Subject: [PATCH] volume: automatically detect max volume count --- weed/server/master_grpc_server.go | 5 +++++ weed/server/volume_grpc_client_to_master.go | 7 +++++- weed/storage/disk_location.go | 16 +++++++++++++ weed/storage/disk_location_ec.go | 7 ++++++ weed/storage/erasure_coding/ec_volume.go | 7 ++++++ weed/storage/store.go | 25 +++++++++++++++++++++ 6 files changed, 66 insertions(+), 1 deletion(-) diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 84087df8b..cfe5fd9c0 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -81,6 +81,11 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } } + if dn.GetMaxVolumeCount() != int64(heartbeat.MaxVolumeCount) { + delta := int64(heartbeat.MaxVolumeCount) - dn.GetMaxVolumeCount() + dn.UpAdjustMaxVolumeCountDelta(delta) + } + glog.V(4).Infof("master received heartbeat %s", heartbeat.String()) message := &master_pb.VolumeLocation{ Url: dn.Url(), diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 1f4d9df10..517eb4bc0 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -80,8 +80,13 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi doneChan <- err return } - if in.GetVolumeSizeLimit() != 0 { + if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() { vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit()) + if vs.store.MaybeAdjustVolumeMax() { + if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + } + } } if in.GetLeader() != "" && masterNode != in.GetLeader() && !isSameIP(in.GetLeader(), masterNode) { glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index f15303282..3c8a7b864 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -275,3 +275,19 @@ func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.FileInfo, bool) { return nil, false } + +func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) { + + l.volumesLock.RLock() + defer l.volumesLock.RUnlock() + + for _, vol := range l.volumes { + if vol.IsReadOnly() { + continue + } + datSize, idxSize, _ := vol.FileStat() + unUsedSpace += volumeSizeLimit - (datSize + idxSize) + } + + return +} diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index f6c44e966..72d3e2b3e 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -183,3 +183,10 @@ func (l *DiskLocation) unmountEcVolumeByCollection(collectionName string) map[ne } return deltaVols } + +func (l *DiskLocation) EcVolumesLen() int { + l.ecVolumesLock.RLock() + defer l.ecVolumesLock.RUnlock() + + return len(l.ecVolumes) +} diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 3d9aa2cff..eef53765f 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -152,6 +152,13 @@ func (ev *EcVolume) ShardSize() int64 { return 0 } +func (ev *EcVolume) Size() (size int64) { + for _, shard := range ev.Shards { + size += shard.Size() + } + return +} + func (ev *EcVolume) CreatedAt() time.Time { return ev.ecxCreatedAt } diff --git a/weed/storage/store.go b/weed/storage/store.go index 76fe4de27..4ef3682d8 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -12,6 +12,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" . "github.com/chrislusf/seaweedfs/weed/storage/types" @@ -99,6 +100,9 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) { max := 0 for _, location := range s.Locations { currentFreeCount := location.MaxVolumeCount - location.VolumesLen() + currentFreeCount *= erasure_coding.DataShardsCount + currentFreeCount -= location.EcVolumesLen() + currentFreeCount /= erasure_coding.DataShardsCount if currentFreeCount > max { max = currentFreeCount ret = location @@ -382,3 +386,24 @@ func (s *Store) SetVolumeSizeLimit(x uint64) { func (s *Store) GetVolumeSizeLimit() uint64 { return atomic.LoadUint64(&s.volumeSizeLimit) } + +func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) { + volumeSizeLimit := s.GetVolumeSizeLimit() + for _, diskLocation := range s.Locations { + if diskLocation.MaxVolumeCount == 0 { + diskStatus := stats.NewDiskStatus(diskLocation.Directory) + unusedSpace := diskLocation.UnUsedSpace(volumeSizeLimit) + unclaimedSpaces := int64(diskStatus.Free) - int64(unusedSpace) + volCount := diskLocation.VolumesLen() + maxVolumeCount := volCount + if unclaimedSpaces > int64(volumeSizeLimit) { + maxVolumeCount += int(uint64(unclaimedSpaces)/volumeSizeLimit) - 1 + } + diskLocation.MaxVolumeCount = maxVolumeCount + glog.V(0).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%d/MB", + diskLocation.Directory, maxVolumeCount, unclaimedSpaces/1024/1024, unusedSpace/1024/1024, volumeSizeLimit/1024/1024) + hasChanges = true + } + } + return +}