package topology

import (
	"errors"
	"math/rand"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
)

// NodeId identifies a node among the children of its parent.
type NodeId string

// Node is one element of the topology tree. Each node keeps aggregated
// volume counters; the UpAdjust* methods apply a change to the node and then
// forward it to the parent chain.
type Node interface {
	Id() NodeId
	String() string
	FreeSpace() int64
	ReserveOneVolume(r int64) (*DataNode, error)
	UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
	UpAdjustVolumeCountDelta(volumeCountDelta int64)
	UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64)
	UpAdjustEcShardCountDelta(ecShardCountDelta int64)
	UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
	UpAdjustMaxVolumeId(vid needle.VolumeId)

	GetVolumeCount() int64
	GetEcShardCount() int64
	GetActiveVolumeCount() int64
	GetRemoteVolumeCount() int64
	GetMaxVolumeCount() int64
	GetMaxVolumeId() needle.VolumeId
	SetParent(Node)
	LinkChildNode(node Node)
	UnlinkChildNode(nodeId NodeId)
	CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)

	IsDataNode() bool
	IsRack() bool
	IsDataCenter() bool
	Children() []Node
	Parent() Node

	GetValue() interface{} // get reference to the topology, dc, rack, datanode
}

// NodeImpl is the shared implementation of Node.
//
// The int64 counters are updated with sync/atomic and must therefore also be
// read through sync/atomic. They are kept at the start of the struct; do not
// reorder them without checking the 64-bit alignment rules of sync/atomic.
type NodeImpl struct {
	volumeCount       int64
	remoteVolumeCount int64
	activeVolumeCount int64
	ecShardCount      int64
	maxVolumeCount    int64
	id                NodeId
	parent            Node
	sync.RWMutex      // lock children
	children          map[NodeId]Node
	maxVolumeId       needle.VolumeId

	// for rack, data center, topology
	nodeType string
	value    interface{}
}

// RandomlyPickNodes selects numberOfNodes children of n: one "first" node and
// numberOfNodes-1 "rest" nodes.
//
// The first node is drawn at random from the children for which
// filterFirstNodeFn returns nil. The rest nodes are sampled from the other
// children whose FreeSpace() is positive.
//
// An error is returned when numberOfNodes is smaller than 1, when no child
// passes filterFirstNodeFn (the per-child filter errors are included in the
// message), or when fewer than numberOfNodes-1 rest candidates exist. In the
// last case firstNode and a partially filled restNodes are still returned
// alongside the error.
func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
	// make([]Node, numberOfNodes-1) below would panic on a negative length.
	if numberOfNodes < 1 {
		return nil, nil, errors.New("numberOfNodes must be at least 1")
	}

	var errs []string
	n.RLock()
	// len(n.children) must be read while holding the lock that guards the map.
	candidates := make([]Node, 0, len(n.children))
	for _, node := range n.children {
		if filterErr := filterFirstNodeFn(node); filterErr == nil {
			candidates = append(candidates, node)
		} else {
			errs = append(errs, string(node.Id())+":"+filterErr.Error())
		}
	}
	n.RUnlock()
	if len(candidates) == 0 {
		return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
	}
	firstNode = candidates[rand.Intn(len(candidates))]
	glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())

	restNodes = make([]Node, numberOfNodes-1)
	// Reuse the backing array; firstNode was already copied out of it.
	candidates = candidates[:0]
	n.RLock()
	for _, node := range n.children {
		if node.Id() == firstNode.Id() {
			continue
		}
		if node.FreeSpace() <= 0 {
			continue
		}
		glog.V(2).Infoln("select rest node candidate:", node.Id())
		candidates = append(candidates, node)
	}
	n.RUnlock()
	glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")

	// Reservoir sampling: the first len(restNodes) candidates fill the
	// reservoir, every later candidate replaces a random slot with
	// probability len(restNodes)/(k+1).
	filled := len(restNodes) == 0
	for k, node := range candidates {
		if k < len(restNodes) {
			restNodes[k] = node
			if k == len(restNodes)-1 {
				filled = true
			}
		} else {
			r := rand.Intn(k + 1)
			if r < len(restNodes) {
				restNodes[r] = node
			}
		}
	}
	if !filled {
		glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
		err = errors.New("No enough data node found!")
	}
	return firstNode, restNodes, err
}

// IsDataNode reports whether the node type is "DataNode".
func (n *NodeImpl) IsDataNode() bool {
	return n.nodeType == "DataNode"
}

// IsRack reports whether the node type is "Rack".
func (n *NodeImpl) IsRack() bool {
	return n.nodeType == "Rack"
}

// IsDataCenter reports whether the node type is "DataCenter".
func (n *NodeImpl) IsDataCenter() bool {
	return n.nodeType == "DataCenter"
}

// String returns the node id prefixed by the String() of every ancestor,
// separated by ":".
func (n *NodeImpl) String() string {
	if n.parent != nil {
		return n.parent.String() + ":" + string(n.id)
	}
	return string(n.id)
}

// Id returns the node id.
func (n *NodeImpl) Id() NodeId {
	return n.id
}

// FreeSpace returns the number of free volume slots:
// maxVolumeCount + remoteVolumeCount - volumeCount, further reduced by
// ecShardCount/erasure_coding.DataShardsCount + 1 when the node holds any EC
// shards. The result may be zero or negative.
//
// The counters are loaded atomically because they are modified with
// atomic.AddInt64; each one is loaded separately, so the result is not a
// consistent snapshot across counters.
func (n *NodeImpl) FreeSpace() int64 {
	freeVolumeSlotCount := atomic.LoadInt64(&n.maxVolumeCount) +
		atomic.LoadInt64(&n.remoteVolumeCount) -
		atomic.LoadInt64(&n.volumeCount)
	if ecShardCount := atomic.LoadInt64(&n.ecShardCount); ecShardCount > 0 {
		freeVolumeSlotCount = freeVolumeSlotCount - ecShardCount/erasure_coding.DataShardsCount - 1
	}
	return freeVolumeSlotCount
}

// SetParent sets the parent node; nil detaches the node.
func (n *NodeImpl) SetParent(node Node) {
	n.parent = node
}

// Children returns a snapshot slice of the child nodes, taken under the read
// lock. The result is nil when there are no children.
func (n *NodeImpl) Children() (ret []Node) {
	n.RLock()
	defer n.RUnlock()
	for _, c := range n.children {
		ret = append(ret, c)
	}
	return ret
}

// Parent returns the parent node, or nil when none is set.
func (n *NodeImpl) Parent() Node {
	return n.parent
}

// GetValue returns the value stored on the node; per the Node interface this
// is a reference to the topology, dc, rack or datanode.
func (n *NodeImpl) GetValue() interface{} {
	return n.value
}

// ReserveOneVolume walks the children and returns the data node that owns the
// r-th free slot, where every child contributes FreeSpace() slots.
//
// Children without free space are skipped. While r is at least a child's free
// space, that amount is subtracted from r and the walk moves on. Otherwise a
// data-node child is returned directly, and any other child is asked
// recursively with the remaining r. If the recursive call fails the walk
// continues with the next child; an error is returned when no child yields a
// data node.
func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) {
	n.RLock()
	defer n.RUnlock()
	for _, node := range n.children {
		freeSpace := node.FreeSpace()
		if freeSpace <= 0 {
			continue
		}
		if r >= freeSpace {
			r -= freeSpace
			continue
		}
		// FreeSpace is read again here, as in the original logic, since the
		// counters can change between the two reads.
		if node.IsDataNode() && node.FreeSpace() > 0 {
			return node.(*DataNode), nil
		}
		if assignedNode, err = node.ReserveOneVolume(r); err == nil {
			return assignedNode, nil
		}
	}
	return nil, errors.New("No free volume slot found!")
}

// UpAdjustMaxVolumeCountDelta adds maxVolumeCountDelta (can be negative) to
// this node's max volume count and forwards it to the parent chain.
func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative
	atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta)
	if n.parent != nil {
		n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
	}
}

// UpAdjustVolumeCountDelta adds volumeCountDelta (can be negative) to this
// node's volume count and forwards it to the parent chain.
func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative
	atomic.AddInt64(&n.volumeCount, volumeCountDelta)
	if n.parent != nil {
		n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
	}
}

// UpAdjustRemoteVolumeCountDelta adds remoteVolumeCountDelta (can be
// negative) to this node's remote volume count and forwards it to the parent
// chain.
func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative
	atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta)
	if n.parent != nil {
		n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta)
	}
}

// UpAdjustEcShardCountDelta adds ecShardCountDelta (can be negative) to this
// node's EC shard count and forwards it to the parent chain.
func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative
	atomic.AddInt64(&n.ecShardCount, ecShardCountDelta)
	if n.parent != nil {
		n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta)
	}
}

// UpAdjustActiveVolumeCountDelta adds activeVolumeCountDelta (can be
// negative) to this node's active volume count and forwards it to the parent
// chain.
func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative
	atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta)
	if n.parent != nil {
		n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
	}
}

// UpAdjustMaxVolumeId raises the recorded max volume id to vid when vid is
// larger, and forwards the update to the parent chain. Smaller or equal ids
// are ignored, so the value never decreases.
//
// NOTE(review): maxVolumeId is read and written here without a lock or
// atomics; confirm callers serialize these updates.
func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) {
	if n.maxVolumeId < vid {
		n.maxVolumeId = vid
		if n.parent != nil {
			n.parent.UpAdjustMaxVolumeId(vid)
		}
	}
}

// GetMaxVolumeId returns the largest volume id recorded on this node.
func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
	return n.maxVolumeId
}

// GetVolumeCount returns the volume count (atomic load).
func (n *NodeImpl) GetVolumeCount() int64 {
	return atomic.LoadInt64(&n.volumeCount)
}

// GetEcShardCount returns the EC shard count (atomic load).
func (n *NodeImpl) GetEcShardCount() int64 {
	return atomic.LoadInt64(&n.ecShardCount)
}

// GetRemoteVolumeCount returns the remote volume count (atomic load).
func (n *NodeImpl) GetRemoteVolumeCount() int64 {
	return atomic.LoadInt64(&n.remoteVolumeCount)
}

// GetActiveVolumeCount returns the active volume count (atomic load).
func (n *NodeImpl) GetActiveVolumeCount() int64 {
	return atomic.LoadInt64(&n.activeVolumeCount)
}

// GetMaxVolumeCount returns the max volume count (atomic load).
func (n *NodeImpl) GetMaxVolumeCount() int64 {
	return atomic.LoadInt64(&n.maxVolumeCount)
}

// LinkChildNode registers node as a child of n unless a child with the same
// id already exists. On insertion the child's counters and max volume id are
// added to n (and, through the UpAdjust* methods, to n's ancestors) and the
// child's parent is set to n.
func (n *NodeImpl) LinkChildNode(node Node) {
	n.Lock()
	defer n.Unlock()
	if n.children[node.Id()] != nil {
		return
	}
	n.children[node.Id()] = node
	n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
	n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
	n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
	n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount())
	n.UpAdjustEcShardCountDelta(node.GetEcShardCount())
	n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
	node.SetParent(n)
	glog.V(0).Infoln(n, "adds child", node.Id())
}

// UnlinkChildNode removes the child with the given id, if present. The
// child's parent is cleared and its counters are subtracted from n and n's
// ancestors. The max volume id is left untouched.
func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
	n.Lock()
	defer n.Unlock()
	node := n.children[nodeId]
	if node == nil {
		return
	}
	node.SetParent(nil)
	delete(n.children, node.Id())
	n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
	n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount())
	n.UpAdjustEcShardCountDelta(-node.GetEcShardCount())
	n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
	n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
	glog.V(0).Infoln(n, "removes", node.Id())
}

// CollectDeadNodeAndFullVolumes sends every volume whose size is at least
// volumeSizeLimit to the topology's chanFullVolumes channel.
//
// On a rack, each child is asserted to be a *DataNode (the assertion panics
// otherwise) and its volumes are inspected. On any other node the call is
// forwarded to each child. freshThreshHold is only passed down; this
// implementation does not use it.
func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
	if n.IsRack() {
		for _, c := range n.Children() {
			dn := c.(*DataNode) //can not cast n to DataNode
			for _, v := range dn.GetVolumes() {
				if uint64(v.Size) >= volumeSizeLimit {
					n.GetTopology().chanFullVolumes <- v
				}
			}
		}
	} else {
		for _, c := range n.Children() {
			c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
		}
	}
}

// GetTopology follows Parent() links up to the root node and returns the
// root's value as a *Topology. It panics if that value is not a *Topology.
func (n *NodeImpl) GetTopology() *Topology {
	var p Node
	p = n
	for p.Parent() != nil {
		p = p.Parent()
	}
	return p.GetValue().(*Topology)
}