diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 94e99c8f6..cf01b5bd8 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -2,10 +2,11 @@ package weed_server
 
 import (
 	"fmt"
+	"time"
+
 	"github.com/chrislusf/seaweedfs/weed/security"
 	"github.com/spf13/viper"
 	"google.golang.org/grpc"
-	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -75,7 +76,7 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
 			return
 		}
 		if in.GetVolumeSizeLimit() != 0 {
-			vs.store.VolumeSizeLimit = in.GetVolumeSizeLimit()
+			vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
 		}
 		if in.GetLeader() != "" && masterNode != in.GetLeader() {
 			glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go
index ad3bd3f7a..a3b574324 100644
--- a/weed/storage/needle_map_memory.go
+++ b/weed/storage/needle_map_memory.go
@@ -47,9 +47,7 @@ func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error) {
 
 func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
 	e := WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error {
-		if key > nm.MaximumFileKey {
-			nm.MaximumFileKey = key
-		}
+		nm.MaybeSetMaxFileKey(key)
 		if !offset.IsZero() && size != TombstoneFileSize {
 			nm.FileCounter++
 			nm.FileByteCounter = nm.FileByteCounter + uint64(size)
@@ -67,7 +65,7 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
 		}
 		return nil
 	})
-	glog.V(1).Infof("max file key: %d for file: %s", nm.MaximumFileKey, file.Name())
+	glog.V(1).Infof("max file key: %d for file: %s", nm.MaxFileKey(), file.Name())
 	return nm, e
 }
 
diff --git a/weed/storage/needle_map_metric.go b/weed/storage/needle_map_metric.go
index cc3d9e028..0e2e16964 100644
--- a/weed/storage/needle_map_metric.go
+++ b/weed/storage/needle_map_metric.go
@@ -2,51 +2,64 @@ package storage
 
 import (
 	"fmt"
+	"os"
+	"sync/atomic"
+
 	. "github.com/chrislusf/seaweedfs/weed/storage/types"
 	"github.com/willf/bloom"
-	"os"
 )
 
 type mapMetric struct {
-	DeletionCounter     int      `json:"DeletionCounter"`
-	FileCounter         int      `json:"FileCounter"`
-	DeletionByteCounter uint64   `json:"DeletionByteCounter"`
-	FileByteCounter     uint64   `json:"FileByteCounter"`
-	MaximumFileKey      NeedleId `json:"MaxFileKey"`
+	DeletionCounter     uint32 `json:"DeletionCounter"`
+	FileCounter         uint32 `json:"FileCounter"`
+	DeletionByteCounter uint64 `json:"DeletionByteCounter"`
+	FileByteCounter     uint64 `json:"FileByteCounter"`
+	MaximumFileKey      uint64 `json:"MaxFileKey"`
 }
 
 func (mm *mapMetric) logDelete(deletedByteCount uint32) {
-	mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount)
-	mm.DeletionCounter++
+	mm.LogDeletionCounter(deletedByteCount)
 }
 
 func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) {
-	if key > mm.MaximumFileKey {
-		mm.MaximumFileKey = key
+	mm.MaybeSetMaxFileKey(key)
+	mm.LogFileCounter(newSize)
+	if oldSize > 0 && oldSize != TombstoneFileSize {
+		mm.LogDeletionCounter(oldSize)
 	}
-	mm.FileCounter++
-	mm.FileByteCounter = mm.FileByteCounter + uint64(newSize)
+}
+func (mm *mapMetric) LogFileCounter(newSize uint32) {
+	atomic.AddUint32(&mm.FileCounter, 1)
+	atomic.AddUint64(&mm.FileByteCounter, uint64(newSize))
+}
+func (mm *mapMetric) LogDeletionCounter(oldSize uint32) {
 	if oldSize > 0 {
-		mm.DeletionCounter++
-		mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize)
+		atomic.AddUint32(&mm.DeletionCounter, 1)
+		atomic.AddUint64(&mm.DeletionByteCounter, uint64(oldSize))
+	}
+}
+func (mm *mapMetric) ContentSize() uint64 {
+	return atomic.LoadUint64(&mm.FileByteCounter)
+}
+func (mm *mapMetric) DeletedSize() uint64 {
+	return atomic.LoadUint64(&mm.DeletionByteCounter)
+}
+func (mm *mapMetric) FileCount() int {
+	return int(atomic.LoadUint32(&mm.FileCounter))
+}
+func (mm *mapMetric) DeletedCount() int {
+	return int(atomic.LoadUint32(&mm.DeletionCounter))
+}
+func (mm *mapMetric) MaxFileKey() NeedleId {
+	t := atomic.LoadUint64(&mm.MaximumFileKey)
+	return NeedleId(t)
+}
+func (mm *mapMetric) MaybeSetMaxFileKey(key NeedleId) {
+	if key > mm.MaxFileKey() {
+		atomic.StoreUint64(&mm.MaximumFileKey, uint64(key))
 	}
 }
-func (mm mapMetric) ContentSize() uint64 {
-	return mm.FileByteCounter
-}
-func (mm mapMetric) DeletedSize() uint64 {
-	return mm.DeletionByteCounter
-}
-func (mm mapMetric) FileCount() int {
-	return mm.FileCounter
-}
-func (mm mapMetric) DeletedCount() int {
-	return mm.DeletionCounter
-}
-func (mm mapMetric) MaxFileKey() NeedleId {
-	return mm.MaximumFileKey
-}
 
 
 func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
 	mm = &mapMetric{}
@@ -56,9 +69,7 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
 		bf = bloom.NewWithEstimates(uint(entryCount), 0.001)
 	},
 	func(key NeedleId, offset Offset, size uint32) error {
-		if key > mm.MaximumFileKey {
-			mm.MaximumFileKey = key
-		}
+		mm.MaybeSetMaxFileKey(key)
 		NeedleIdToBytes(buf, key)
 		if size != TombstoneFileSize {
 			mm.FileByteCounter += uint64(size)
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 56e973738..d866d2e11 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -2,6 +2,8 @@ package storage
 
 import (
 	"fmt"
+	"sync/atomic"
+
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	. "github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -22,7 +24,7 @@ type Store struct {
 	dataCenter      string //optional informaton, overwriting master setting if exists
 	rack            string //optional information, overwriting master setting if exists
 	connected       bool
-	VolumeSizeLimit uint64 //read from the master
+	volumeSizeLimit uint64 //read from the master
 	Client          master_pb.Seaweed_SendHeartbeatClient
 	NeedleMapType   NeedleMapType
 	NewVolumeIdChan chan VolumeId
@@ -30,7 +32,7 @@
 }
 
 func (s *Store) String() (str string) {
-	str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.VolumeSizeLimit)
+	str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit())
 	return
 }
 
@@ -150,7 +152,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
 			if maxFileKey < v.nm.MaxFileKey() {
 				maxFileKey = v.nm.MaxFileKey()
 			}
-			if !v.expired(s.VolumeSizeLimit) {
+			if !v.expired(s.GetVolumeSizeLimit()) {
 				volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
 			} else {
 				if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
@@ -192,7 +194,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
 		if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
 			_, size, err = v.writeNeedle(n)
 		} else {
-			err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.VolumeSizeLimit, v.ContentSize())
+			err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
 		}
 		return
 	}
@@ -255,3 +257,11 @@ func (s *Store) DeleteVolume(i VolumeId) error {
 
 	return fmt.Errorf("Volume %d not found on disk", i)
 }
+
+func (s *Store) SetVolumeSizeLimit(x uint64) {
+	atomic.StoreUint64(&s.volumeSizeLimit, x)
+}
+
+func (s *Store) GetVolumeSizeLimit() uint64 {
+	return atomic.LoadUint64(&s.volumeSizeLimit)
+}