diff --git a/weed/command/server.go b/weed/command/server.go index a99d5a858..4c39b099a 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -55,6 +55,8 @@ var ( serverDisableHttp = cmdServer.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.") + volumeFreeDiskSpaceWatermark = cmdServer.Flag.String("volume.freeDiskSpaceWatermark", "0", "minimum free disk space(in percents). If free disk space lower this value - all volumes marks as ReadOnly") + // pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") @@ -206,7 +208,8 @@ func runServer(cmd *Command, args []string) bool { // start volume server { - go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption) + go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption, *volumeFreeDiskSpaceWatermark) + } startMaster(masterOptions, serverWhiteList) diff --git a/weed/command/volume.go b/weed/command/volume.go index bb63a1f86..7f976192a 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -52,6 +52,7 @@ type VolumeServerOptions struct { memProfile *string compactionMBPerSecond *int fileSizeLimitMB *int + freeDiskSpaceWatermark []float32 } func init() { @@ -87,6 +88,7 @@ var ( volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.") volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + freeDiskSpaceWatermark = cmdVolume.Flag.String("freeDiskSpaceWatermark", "0", "minimum free disk space(in percents). If free disk space lower this value - all volumes marks as ReadOnly") ) func runVolume(cmd *Command, args []string) bool { @@ -96,12 +98,12 @@ func runVolume(cmd *Command, args []string) bool { runtime.GOMAXPROCS(runtime.NumCPU()) grace.SetupProfiling(*v.cpuProfile, *v.memProfile) - v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption) + v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption, *freeDiskSpaceWatermark) return true } -func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) { +func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption, freeDiskSpaceWatermark string) { // Set multiple folders and each folder's max volume count limit' v.folders = strings.Split(volumeFolders, ",") @@ -116,6 +118,16 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v if len(v.folders) != len(v.folderMaxLimits) { glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(v.folders), len(v.folderMaxLimits)) } + freeDiskSpaceWatermarkStrings := strings.Split(freeDiskSpaceWatermark, ",") + for _, freeString := range freeDiskSpaceWatermarkStrings { + + if value, e := strconv.ParseFloat(freeString, 32); e == nil { + v.freeDiskSpaceWatermark = append(v.freeDiskSpaceWatermark, float32(value)) + } else { + glog.Fatalf("The value specified in -freeDiskSpaceWatermark not a valid value %s", freeString) + } + } + for _, folder := range v.folders { if err := util.TestFolderWritable(folder); err != nil { glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err) @@ -159,7 +171,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, *v.ip, *v.port, *v.publicUrl, - v.folders, v.folderMaxLimits, + v.folders, v.folderMaxLimits, v.freeDiskSpaceWatermark, volumeNeedleMapKind, strings.Split(masters, ","), 5, *v.dataCenter, *v.rack, v.whiteList, diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 2d716edc1..86e02cd92 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -35,7 +35,7 @@ type VolumeServer struct { func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, publicUrl string, - folders []string, maxCounts []int, + folders []string, maxCounts []int, freeDiskSpaceWatermark []float32, needleMapKind storage.NeedleMapType, masterNodes []string, pulseSeconds int, dataCenter string, rack string, @@ -68,8 +68,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, } vs.SeedMasterNodes = masterNodes - vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) - + vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, freeDiskSpaceWatermark, vs.needleMapKind) vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) handleStaticResources(adminMux) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 088763c45..ba4d25635 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -1,11 +1,14 @@ package storage import ( - "fmt" - "io/ioutil" - "os" - "strings" - "sync" + "fmt" + "github.com/chrislusf/seaweedfs/weed/stats" + "io/ioutil" + "os" + "path/filepath" + "strings" + "sync" + "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" @@ -15,6 +18,7 @@ import ( type DiskLocation struct { Directory string MaxVolumeCount int + FreeDiskSpaceWatermark float32 volumes map[needle.VolumeId]*Volume volumesLock sync.RWMutex @@ -23,10 +27,11 @@ type DiskLocation struct { ecVolumesLock sync.RWMutex } -func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { - location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount} +func NewDiskLocation(dir string, maxVolumeCount int, freeDiskSpaceWatermark float32) *DiskLocation { + location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount, FreeDiskSpaceWatermark: freeDiskSpaceWatermark} location.volumes = make(map[needle.VolumeId]*Volume) location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume) + go location.CheckDiskSpace() return location } @@ -293,3 +298,21 @@ func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) return } + +func (l *DiskLocation) CheckDiskSpace() { + lastStat := false + t := time.NewTicker(time.Minute) + for _ = range t.C { + if dir, e := filepath.Abs(l.Directory); e == nil { + s := stats.NewDiskStatus(dir) + if (s.PercentFree < l.FreeDiskSpaceWatermark) != lastStat { + lastStat = !lastStat + for _, v := range l.volumes { + v.SetLowDiskSpace(lastStat) + } + + } + } + } + +} \ No newline at end of file diff --git a/weed/storage/store.go b/weed/storage/store.go index 14881ffde..2aff8c93f 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -48,11 +48,11 @@ func (s *Store) String() (str string) { return } -func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { +func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, freeDiskSpaceWatermark []float32, needleMapKind NeedleMapType) (s *Store) { s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { - location := NewDiskLocation(dirnames[i], maxVolumeCounts[i]) + location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], freeDiskSpaceWatermark[i]) location.loadExistingVolumes(needleMapKind) s.Locations = append(s.Locations, location) stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i])) diff --git a/weed/storage/volume.go b/weed/storage/volume.go index df63360a1..e10f5afaa 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -27,6 +27,7 @@ type Volume struct { needleMapKind NeedleMapType noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete + lowDiskSpace bool hasRemoteFile bool // if the volume has a remote file MemoryMapMaxSizeMb uint32 @@ -45,6 +46,11 @@ type Volume struct { volumeInfo *volume_server_pb.VolumeInfo } +func (v *Volume) SetLowDiskSpace(lowDiskSpace bool) { + glog.V(0).Infof("SetLowDiskSpace id %d value %t", v.Id, lowDiskSpace) + v.lowDiskSpace = lowDiskSpace +} + func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb, @@ -244,5 +250,5 @@ func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) { } func (v *Volume) IsReadOnly() bool { - return v.noWriteOrDelete || v.noWriteCanDelete + return v.noWriteOrDelete || v.noWriteCanDelete || v.lowDiskSpace } diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index 0dccdd0f2..d18dd6af0 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -41,7 +41,7 @@ func (dn *DataNode) String() string { return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl) } -func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) { +func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { dn.Lock() defer dn.Unlock() if oldV, ok := dn.volumes[v.Id]; !ok { @@ -64,12 +64,13 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) { dn.UpAdjustRemoteVolumeCountDelta(-1) } } + isChangedRO = dn.volumes[v.Id].ReadOnly != v.ReadOnly dn.volumes[v.Id] = v } return } -func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) { +func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) { actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v @@ -91,10 +92,13 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume } dn.Unlock() for _, v := range actualVolumes { - isNew := dn.AddOrUpdateVolume(v) + isNew, isChangedRO := dn.AddOrUpdateVolume(v) if isNew { newVolumes = append(newVolumes, v) } + if isChangedRO { + changeRO = append(changeRO, v) + } } return } diff --git a/weed/topology/topology.go b/weed/topology/topology.go index c24cab9d6..993f444a7 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -212,13 +212,18 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati } } // find out the delta volumes - newVolumes, deletedVolumes = dn.UpdateVolumes(volumeInfos) + var changedVolumes []storage.VolumeInfo + newVolumes, deletedVolumes, changedVolumes = dn.UpdateVolumes(volumeInfos) for _, v := range newVolumes { t.RegisterVolumeLayout(v, dn) } for _, v := range deletedVolumes { t.UnRegisterVolumeLayout(v, dn) } + for _, v := range changedVolumes { + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + vl.ensureCorrectWritables(&v) + } return }