diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go index 348646f04..08ff97198 100644 --- a/weed/cluster/cluster.go +++ b/weed/cluster/cluster.go @@ -10,7 +10,8 @@ import ( const ( MasterType = "master" - FilerType = "filer" + FilerType = "filer" + BrokerType = "broker" ) type ClusterNode struct { @@ -25,34 +26,34 @@ type Leaders struct { } type Cluster struct { - nodes map[pb.ServerAddress]*ClusterNode - nodesLock sync.RWMutex - leaders *Leaders + filers map[pb.ServerAddress]*ClusterNode + filersLock sync.RWMutex + filerLeaders *Leaders } func NewCluster() *Cluster { return &Cluster{ - nodes: make(map[pb.ServerAddress]*ClusterNode), - leaders: &Leaders{}, + filers: make(map[pb.ServerAddress]*ClusterNode), + filerLeaders: &Leaders{}, } } func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { switch nodeType { case FilerType: - cluster.nodesLock.Lock() - defer cluster.nodesLock.Unlock() - if existingNode, found := cluster.nodes[address]; found { + cluster.filersLock.Lock() + defer cluster.filersLock.Unlock() + if existingNode, found := cluster.filers[address]; found { existingNode.counter++ return nil } - cluster.nodes[address] = &ClusterNode{ + cluster.filers[address] = &ClusterNode{ Address: address, Version: version, counter: 1, createdTs: time.Now(), } - return cluster.ensureLeader(true, nodeType, address) + return cluster.ensureFilerLeaders(true, nodeType, address) case MasterType: } return nil @@ -61,15 +62,15 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { switch nodeType { case FilerType: - cluster.nodesLock.Lock() - defer cluster.nodesLock.Unlock() - if existingNode, found := cluster.nodes[address]; !found { + cluster.filersLock.Lock() + defer cluster.filersLock.Unlock() + if existingNode, found := cluster.filers[address]; !found { return nil } else { existingNode.counter-- if existingNode.counter <= 0 { - delete(cluster.nodes, address) - return cluster.ensureLeader(false, nodeType, address) + delete(cluster.filers, address) + return cluster.ensureFilerLeaders(false, nodeType, address) } } case MasterType: @@ -80,9 +81,9 @@ func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddr func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) { switch nodeType { case FilerType: - cluster.nodesLock.RLock() - defer cluster.nodesLock.RUnlock() - for _, node := range cluster.nodes { + cluster.filersLock.RLock() + defer cluster.filersLock.RUnlock() + for _, node := range cluster.filers { nodes = append(nodes, node) } case MasterType: @@ -91,12 +92,12 @@ func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) } func (cluster *Cluster) IsOneLeader(address pb.ServerAddress) bool { - return cluster.leaders.isOneLeader(address) + return cluster.filerLeaders.isOneLeader(address) } -func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { +func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { if isAdd { - if cluster.leaders.addLeaderIfVacant(address) { + if cluster.filerLeaders.addLeaderIfVacant(address) { // has added the address as one leader result = append(result, &master_pb.KeepConnectedResponse{ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ @@ -117,7 +118,7 @@ func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.Ser }) } } else { - if cluster.leaders.removeLeaderIfExists(address) { + if cluster.filerLeaders.removeLeaderIfExists(address) { result = append(result, &master_pb.KeepConnectedResponse{ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ @@ -132,8 +133,8 @@ func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.Ser var shortestDuration int64 = math.MaxInt64 now := time.Now() var candidateAddress pb.ServerAddress - for _, node := range cluster.nodes { - if cluster.leaders.isOneLeader(node.Address) { + for _, node := range cluster.filers { + if cluster.filerLeaders.isOneLeader(node.Address) { continue } duration := now.Sub(node.createdTs).Nanoseconds() @@ -143,7 +144,7 @@ func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.Ser } } if candidateAddress != "" { - cluster.leaders.addLeaderIfVacant(candidateAddress) + cluster.filerLeaders.addLeaderIfVacant(candidateAddress) // added a new leader result = append(result, &master_pb.KeepConnectedResponse{ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ diff --git a/weed/cluster/cluster_test.go b/weed/cluster/cluster_test.go index dd68d59b9..b56ee501c 100644 --- a/weed/cluster/cluster_test.go +++ b/weed/cluster/cluster_test.go @@ -14,7 +14,7 @@ func TestClusterAddRemoveNodes(t *testing.T) { assert.Equal(t, []pb.ServerAddress{ pb.ServerAddress("111:1"), pb.ServerAddress("111:2"), - }, c.leaders.GetLeaders()) + }, c.filerLeaders.GetLeaders()) c.AddClusterNode("filer", pb.ServerAddress("111:3"), "23.45") c.AddClusterNode("filer", pb.ServerAddress("111:4"), "23.45") @@ -22,7 +22,7 @@ func TestClusterAddRemoveNodes(t *testing.T) { pb.ServerAddress("111:1"), pb.ServerAddress("111:2"), pb.ServerAddress("111:3"), - }, c.leaders.GetLeaders()) + }, c.filerLeaders.GetLeaders()) c.AddClusterNode("filer", pb.ServerAddress("111:5"), "23.45") c.AddClusterNode("filer", pb.ServerAddress("111:6"), "23.45") @@ -31,7 +31,7 @@ func TestClusterAddRemoveNodes(t *testing.T) { pb.ServerAddress("111:1"), pb.ServerAddress("111:2"), pb.ServerAddress("111:3"), - }, c.leaders.GetLeaders()) + }, c.filerLeaders.GetLeaders()) // remove oldest c.RemoveClusterNode("filer", pb.ServerAddress("111:1")) @@ -39,7 +39,7 @@ func TestClusterAddRemoveNodes(t *testing.T) { pb.ServerAddress("111:6"), pb.ServerAddress("111:2"), pb.ServerAddress("111:3"), - }, c.leaders.GetLeaders()) + }, c.filerLeaders.GetLeaders()) // remove oldest c.RemoveClusterNode("filer", pb.ServerAddress("111:1"))