Automated cherry pick of #4491: Make memberlist cluster rejoin dead nodes periodically #4528

Merged
2 changes: 1 addition & 1 deletion cmd/antrea-agent/agent.go
@@ -406,7 +406,7 @@ func run(o *Options) error {
return fmt.Errorf("invalid Node Transport IPAddr in Node config: %v", nodeConfig)
}
memberlistCluster, err = memberlist.NewCluster(nodeTransportIP, o.config.ClusterMembershipPort,
nodeConfig.Name, nodeInformer, externalIPPoolInformer,
nodeConfig.Name, nodeInformer, externalIPPoolInformer, nil,
)
if err != nil {
return fmt.Errorf("error creating new memberlist cluster: %v", err)
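
The new trailing nil corresponds to the Memberlist parameter added to NewCluster in pkg/agent/memberlist/cluster.go below: production callers pass nil so that NewCluster builds the real hashicorp/memberlist instance itself, while tests can inject a fake. A minimal sketch of the two call patterns (variable names taken from the surrounding agent.go code; fakeML is a hypothetical test double, not part of this change):

// Production (as in agent.go): NewCluster creates the real memberlist.
cluster, err := memberlist.NewCluster(nodeTransportIP, o.config.ClusterMembershipPort,
	nodeConfig.Name, nodeInformer, externalIPPoolInformer, nil)

// Test: inject any implementation of the Memberlist interface instead.
cluster, err := memberlist.NewCluster(nodeTransportIP, o.config.ClusterMembershipPort,
	nodeConfig.Name, nodeInformer, externalIPPoolInformer, fakeML)
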
1 change: 1 addition & 0 deletions hack/update-codegen-dockerized.sh
@@ -42,6 +42,7 @@ function generate_mocks {
"pkg/agent/cniserver/ipam IPAMDriver testing"
"pkg/agent/flowexporter/connections ConnTrackDumper,NetFilterConnTrack testing"
"pkg/agent/interfacestore InterfaceStore testing"
"pkg/agent/memberlist Memberlist testing"
"pkg/agent/multicast RouteInterface testing"
"pkg/agent/types McastNetworkPolicyController testing"
"pkg/agent/nodeportlocal/portcache LocalPortOpener testing"
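
Each entry in generate_mocks drives a mockgen invocation, so the new line generates a GoMock-based MockMemberlist for the Memberlist interface into a testing sub-package. A hedged sketch of how such a generated mock is typically consumed in a unit test (standard GoMock conventions; the exact import path and alias are assumptions):

import (
	"testing"

	"github.com/golang/mock/gomock"

	memberlisttest "antrea.io/antrea/pkg/agent/memberlist/testing"
)

func TestWithMockMemberlist(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()
	mockML := memberlisttest.NewMockMemberlist(ctrl)
	// Expect Join to be called and report one member successfully contacted.
	mockML.EXPECT().Join(gomock.Any()).Return(1, nil).AnyTimes()
	_ = mockML // the mock would be passed to NewCluster in a real test
}
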
138 changes: 98 additions & 40 deletions pkg/agent/memberlist/cluster.go
@@ -17,7 +17,7 @@ package memberlist
import (
"errors"
"fmt"
"io/ioutil"
"io"
"net"
"reflect"
"sync"
@@ -88,13 +88,20 @@ type Interface interface {
AddClusterEventHandler(handler ClusterNodeEventHandler)
}

type Memberlist interface {
Join(existing []string) (int, error)
Members() []*memberlist.Node
Leave(timeout time.Duration) error
Shutdown() error
}

// Cluster implements ClusterInterface.
type Cluster struct {
bindPort int
// Name of local Node. Node name must be unique in the cluster.
nodeName string

mList *memberlist.Memberlist
mList Memberlist
// consistentHashMap holds the consistent hash maps; when a Node joins the cluster, Add() is used to add its key to the hash,
// and when a Node leaves the cluster, the consistentHashMap should be updated.
consistentHashMap map[string]*consistenthash.Map
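
Narrowing mList from the concrete *memberlist.Memberlist to the small Memberlist interface above is what makes the cluster testable without real network I/O. For illustration, a hand-written fake could be as simple as the following sketch (the fakeMemberlist type is hypothetical and only demonstrates the interface contract):

import (
	"time"

	"github.com/hashicorp/memberlist"
)

// fakeMemberlist records the members passed to Join and reports no remote members.
type fakeMemberlist struct {
	joined []string
}

func (f *fakeMemberlist) Join(existing []string) (int, error) {
	f.joined = append(f.joined, existing...)
	return len(existing), nil
}

func (f *fakeMemberlist) Members() []*memberlist.Node       { return nil }
func (f *fakeMemberlist) Leave(timeout time.Duration) error { return nil }
func (f *fakeMemberlist) Shutdown() error                   { return nil }
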
@@ -129,13 +136,15 @@ func NewCluster(
nodeName string,
nodeInformer coreinformers.NodeInformer,
externalIPPoolInformer crdinformers.ExternalIPPoolInformer,
ml Memberlist, // Parameterized for testing; can be left nil in production code.
) (*Cluster, error) {
// The Node join/leave events will be notified via this channel.
nodeEventCh := make(chan memberlist.NodeEvent, 1024)
c := &Cluster{
bindPort: clusterBindPort,
nodeName: nodeName,
consistentHashMap: make(map[string]*consistenthash.Map),
mList: ml,
nodeEventsCh: nodeEventCh,
nodeInformer: nodeInformer,
nodeLister: nodeInformer.Lister(),
@@ -146,20 +155,24 @@ func NewCluster(
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "externalIPPool"),
}

conf := memberlist.DefaultLocalConfig()
conf.Name = c.nodeName
conf.BindPort = c.bindPort
conf.AdvertisePort = c.bindPort
conf.AdvertiseAddr = nodeIP.String()
conf.Events = &memberlist.ChannelEventDelegate{Ch: nodeEventCh}
conf.LogOutput = ioutil.Discard
klog.V(1).InfoS("New memberlist cluster", "config", conf)

mList, err := memberlist.Create(conf)
if err != nil {
return nil, fmt.Errorf("failed to create memberlist cluster: %v", err)
if ml == nil {
conf := memberlist.DefaultLocalConfig()
conf.Name = c.nodeName
conf.BindPort = c.bindPort
conf.AdvertisePort = c.bindPort
conf.AdvertiseAddr = nodeIP.String()
// Set it to a non-zero value to allow reclaiming Nodes with different addresses, to handle the Node IP update case.
conf.DeadNodeReclaimTime = 10 * time.Millisecond
conf.Events = &memberlist.ChannelEventDelegate{Ch: nodeEventCh}
conf.LogOutput = io.Discard
klog.V(1).InfoS("New memberlist cluster", "config", conf)

mList, err := memberlist.Create(conf)
if err != nil {
return nil, fmt.Errorf("failed to create memberlist cluster: %v", err)
}
c.mList = mList
}
c.mList = mList

nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
@@ -188,13 +201,16 @@ func NewCluster(

func (c *Cluster) handleCreateNode(obj interface{}) {
node := obj.(*corev1.Node)
if member, err := c.newClusterMember(node); err == nil {
_, err := c.mList.Join([]string{member})
if err != nil {
klog.ErrorS(err, "Processing Node CREATE event error, join cluster failed", "member", member)
// Ignore the Node itself.
if node.Name != c.nodeName {
if member, err := c.newClusterMember(node); err == nil {
_, err := c.mList.Join([]string{member})
if err != nil {
klog.ErrorS(err, "Processing Node CREATE event error, join cluster failed", "member", member)
}
} else {
klog.ErrorS(err, "Processing Node CREATE event error", "nodeName", node.Name)
}
} else {
klog.ErrorS(err, "Processing Node CREATE event error", "nodeName", node.Name)
}

affectedEIPs := c.filterEIPsFromNodeLabels(node)
@@ -261,8 +277,7 @@ func (c *Cluster) enqueueExternalIPPool(obj interface{}) {
c.queue.Add(eip.Name)
}

// newClusterMember gets the Node's IP and returns a cluster member "<IP>:<clusterMemberlistPort>"
// representing that Node in the memberlist cluster.
// newClusterMember gets the Node's IP and returns it as a cluster member for memberlist cluster to join.
func (c *Cluster) newClusterMember(node *corev1.Node) (string, error) {
nodeAddrs, err := k8s.GetNodeAddrs(node)
if err != nil {
Expand All @@ -277,11 +292,7 @@ func (c *Cluster) newClusterMember(node *corev1.Node) (string, error) {

func (c *Cluster) filterEIPsFromNodeLabels(node *corev1.Node) sets.String {
pools := sets.NewString()
eips, err := c.externalIPPoolLister.List(labels.Everything())
if err != nil {
klog.ErrorS(err, "Filter ExternalIPPools from nodeLabels failed")
return pools
}
eips, _ := c.externalIPPoolLister.List(labels.Everything())
for _, eip := range eips {
nodeSelector, _ := metav1.LabelSelectorAsSelector(&eip.Spec.NodeSelector)
if nodeSelector.Matches(labels.Set(node.GetLabels())) {
@@ -312,14 +323,64 @@ func (c *Cluster) Run(stopCh <-chan struct{}) {
go wait.Until(c.worker, time.Second, stopCh)
}

for {
select {
case <-stopCh:
return
case nodeEvent := <-c.nodeEventsCh:
c.handleClusterNodeEvents(&nodeEvent)
go func() {
for {
select {
case <-stopCh:
return
case nodeEvent := <-c.nodeEventsCh:
c.handleClusterNodeEvents(&nodeEvent)
}
}
}()

// Rejoin Nodes periodically in case some Nodes are removed from the member list because of long downtime.
go func() {
ticker := time.NewTicker(1 * time.Minute)
for {
select {
case <-stopCh:
return
case <-ticker.C:
c.RejoinNodes()
}
}
}()

<-stopCh
}

// RejoinNodes rejoins Nodes that were removed from the member list by memberlist because they were unreachable for more
// than 15 seconds (the GossipToTheDeadTime we are using). Without it, once there is a network downtime lasting more
// than 15 seconds, the agent wouldn't try to reach any other Node and would think it's the only alive Node until it's
// restarted.
func (c *Cluster) RejoinNodes() {
nodes, _ := c.nodeLister.List(labels.Everything())
aliveNodes := c.AliveNodes()
var membersToJoin []string
for _, node := range nodes {
if !aliveNodes.Has(node.Name) {
member, err := c.newClusterMember(node)
if err != nil {
klog.ErrorS(err, "Failed to generate cluster member to join", "Node", node.Name)
continue
}
membersToJoin = append(membersToJoin, member)
}
}
// Every known Node is alive, do nothing.
if len(membersToJoin) == 0 {
return
}
// The Join method returns an error only when none of the given members could be reached.
numSuccess, err := c.mList.Join(membersToJoin)
if err != nil {
klog.ErrorS(err, "Failed to rejoin any members", "members", membersToJoin)
} else if numSuccess != len(membersToJoin) {
klog.ErrorS(err, "Failed to rejoin some members", "members", membersToJoin, "numSuccess", numSuccess)
} else {
klog.InfoS("Rejoined all members", "members", membersToJoin)
}
}
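
Given a fake like the one sketched earlier, the rejoin path is straightforward to exercise: make the Node lister return a Node that AliveNodes() does not report, call RejoinNodes, and assert that Join was attempted. A rough outline under the same hypothetical names (Cluster wiring elided; this is not the project's actual test):

func TestRejoinNodes(t *testing.T) {
	fake := &fakeMemberlist{}
	// Construction elided: assume c is a *Cluster whose nodeLister returns
	// "node1" and "node2", whose mList is fake, and whose AliveNodes()
	// reports only the local Node as alive.
	c.RejoinNodes()
	if len(fake.joined) == 0 {
		t.Fatal("expected RejoinNodes to re-Join the dead members")
	}
}
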

func (c *Cluster) worker() {
@@ -419,12 +480,9 @@ func (c *Cluster) handleClusterNodeEvents(nodeEvent *memberlist.NodeEvent) {
// if the Node has failed, the ExternalIPPools' consistentHash may have changed, and the affected ExternalIPPools should be enqueued.
coreNode, err := c.nodeLister.Get(node.Name)
if err != nil {
if apierrors.IsNotFound(err) {
// Node has been deleted, and deleteNode handler has been executed.
klog.ErrorS(err, "Processing Node event, not found", "eventType", event)
return
}
klog.ErrorS(err, "Processing Node event, get Node failed", "eventType", event)
// It means the Node has been deleted, no further processing is needed as handleDeleteNode has enqueued
// related ExternalIPPools.
klog.InfoS("Received a Node event but did not find the Node object", "eventType", mapNodeEventType[event], "nodeName", node.Name)
return
}
affectedEIPs := c.filterEIPsFromNodeLabels(coreNode)