package topology

import (
	"fmt"
	"sync/atomic"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// DataNode is the topology node for one server. It embeds NodeImpl and adds
// the server's network addresses; its children are the server's *Disk nodes,
// keyed by disk type.
type DataNode struct {
	NodeImpl
	Ip        string
	Port      int
	GrpcPort  int
	PublicUrl string
	LastSeen  int64 // unix time in seconds
	Counter   int   // in race condition, the previous dataNode was not dead

	IsTerminating bool
}

// NewDataNode builds an empty DataNode identified by id, with fresh disk
// usage counters and no children.
func NewDataNode(id string) *DataNode {
	node := new(DataNode)
	node.id = NodeId(id)
	node.nodeType = "DataNode"
	node.diskUsages = newDiskUsages()
	node.children = make(map[NodeId]Node)
	// Point the embedded NodeImpl back at its concrete owner.
	node.NodeImpl.value = node
	return node
}

// String formats the node's identity and address while holding dn's read lock.
func (dn *DataNode) String() string {
	dn.RLock()
	defer dn.RUnlock()

	base := dn.NodeImpl.String()
	return fmt.Sprintf("Node:%s, Ip:%s, Port:%d, PublicUrl:%s", base, dn.Ip, dn.Port, dn.PublicUrl)
}

// AddOrUpdateVolume records v on the disk matching v.DiskType while holding
// dn's write lock, and forwards the two flags reported by that disk.
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
	dn.Lock()
	defer dn.Unlock()

	isNew, isChangedRO = dn.doAddOrUpdateVolume(v)
	return isNew, isChangedRO
}

// getOrCreateDisk returns the child disk registered under diskType, creating
// and linking a new one when none exists yet. It does not lock dn itself.
func (dn *DataNode) getOrCreateDisk(diskType string) *Disk {
	if existing, ok := dn.children[NodeId(diskType)]; ok {
		return existing.(*Disk)
	}

	var child Node = NewDisk(diskType)
	dn.doLinkChildNode(child)
	return child.(*Disk)
}

// doAddOrUpdateVolume is the lock-free core of AddOrUpdateVolume: it resolves
// the disk for v.DiskType and delegates to it.
func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) {
	return dn.getOrCreateDisk(v.DiskType).AddOrUpdateVolume(v)
}

// UpdateVolumes detects new/deleted/changed volumes on a volume server
// used in master to notify master clients of these changes.
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changedVolumes []storage.VolumeInfo) { actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v } dn.Lock() defer dn.Unlock() existingVolumes := dn.getVolumes() for _, v := range existingVolumes { vid := v.Id if _, ok := actualVolumeMap[vid]; !ok { glog.V(0).Infoln("Deleting volume id:", vid) disk := dn.getOrCreateDisk(v.DiskType) delete(disk.volumes, vid) deletedVolumes = append(deletedVolumes, v) deltaDiskUsages := newDiskUsages() deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) deltaDiskUsage.volumeCount = -1 if v.IsRemote() { deltaDiskUsage.remoteVolumeCount = -1 } if !v.ReadOnly { deltaDiskUsage.activeVolumeCount = -1 } disk.UpAdjustDiskUsageDelta(deltaDiskUsages) } } for _, v := range actualVolumes { isNew, isChanged := dn.doAddOrUpdateVolume(v) if isNew { newVolumes = append(newVolumes, v) } if isChanged { changedVolumes = append(changedVolumes, v) } } return } func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) { dn.Lock() defer dn.Unlock() for _, v := range deletedVolumes { disk := dn.getOrCreateDisk(v.DiskType) if _, found := disk.volumes[v.Id]; !found { continue } delete(disk.volumes, v.Id) deltaDiskUsages := newDiskUsages() deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) deltaDiskUsage.volumeCount = -1 if v.IsRemote() { deltaDiskUsage.remoteVolumeCount = -1 } if !v.ReadOnly { deltaDiskUsage.activeVolumeCount = -1 } disk.UpAdjustDiskUsageDelta(deltaDiskUsages) } for _, v := range newVolumes { dn.doAddOrUpdateVolume(v) } return } func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) { deltaDiskUsages := newDiskUsages() for diskType, maxVolumeCount := range maxVolumeCounts { if maxVolumeCount == 0 { // the volume server may have set the max to zero continue } dt := 
types.ToDiskType(diskType) currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt) currentDiskUsageMaxVolumeCount := atomic.LoadInt64(¤tDiskUsage.maxVolumeCount) if currentDiskUsageMaxVolumeCount == int64(maxVolumeCount) { continue } disk := dn.getOrCreateDisk(dt.String()) deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt) deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsageMaxVolumeCount disk.UpAdjustDiskUsageDelta(deltaDiskUsages) } } func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { dn.RLock() for _, c := range dn.children { disk := c.(*Disk) ret = append(ret, disk.GetVolumes()...) } dn.RUnlock() return ret } func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo, err error) { dn.RLock() defer dn.RUnlock() found := false for _, c := range dn.children { disk := c.(*Disk) vInfo, found = disk.volumes[id] if found { break } } if found { return vInfo, nil } else { return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found") } } func (dn *DataNode) GetDataCenter() *DataCenter { rack := dn.Parent() if rack == nil { return nil } dcNode := rack.Parent() if dcNode == nil { return nil } dcValue := dcNode.GetValue() return dcValue.(*DataCenter) } func (dn *DataNode) GetDataCenterId() string { if dc := dn.GetDataCenter(); dc != nil { return string(dc.Id()) } return "" } func (dn *DataNode) GetRack() *Rack { return dn.Parent().(*NodeImpl).value.(*Rack) } func (dn *DataNode) GetTopology() *Topology { p := dn.Parent() for p.Parent() != nil { p = p.Parent() } t := p.(*Topology) return t } func (dn *DataNode) MatchLocation(ip string, port int) bool { return dn.Ip == ip && dn.Port == port } func (dn *DataNode) Url() string { return util.JoinHostPort(dn.Ip, dn.Port) } func (dn *DataNode) ServerAddress() pb.ServerAddress { return pb.NewServerAddress(dn.Ip, dn.Port, dn.GrpcPort) } type DataNodeInfo struct { Url string `json:"Url"` PublicUrl string `json:"PublicUrl"` Volumes int64 `json:"Volumes"` EcShards int64 
`json:"EcShards"` Max int64 `json:"Max"` VolumeIds string `json:"VolumeIds"` } func (dn *DataNode) ToInfo() (info DataNodeInfo) { info.Url = dn.Url() info.PublicUrl = dn.PublicUrl // aggregated volume info var volumeCount, ecShardCount, maxVolumeCount int64 var volumeIds string for _, diskUsage := range dn.diskUsages.usages { volumeCount += diskUsage.volumeCount ecShardCount += diskUsage.ecShardCount maxVolumeCount += diskUsage.maxVolumeCount } for _, disk := range dn.Children() { d := disk.(*Disk) volumeIds += " " + d.GetVolumeIds() } info.Volumes = volumeCount info.EcShards = ecShardCount info.Max = maxVolumeCount info.VolumeIds = volumeIds return } func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { m := &master_pb.DataNodeInfo{ Id: string(dn.Id()), DiskInfos: make(map[string]*master_pb.DiskInfo), GrpcPort: uint32(dn.GrpcPort), } for _, c := range dn.Children() { disk := c.(*Disk) m.DiskInfos[string(disk.Id())] = disk.ToDiskInfo() } return m } // GetVolumeIds returns the human readable volume ids limited to count of max 100. func (dn *DataNode) GetVolumeIds() string { dn.RLock() defer dn.RUnlock() existingVolumes := dn.getVolumes() ids := make([]int, 0, len(existingVolumes)) for k := range existingVolumes { ids = append(ids, int(k)) } return util.HumanReadableIntsMax(100, ids...) } func (dn *DataNode) getVolumes() []storage.VolumeInfo { var existingVolumes []storage.VolumeInfo for _, c := range dn.children { disk := c.(*Disk) existingVolumes = append(existingVolumes, disk.GetVolumes()...) } return existingVolumes }