diff --git a/hack/update-codegen-dockerized.sh b/hack/update-codegen-dockerized.sh
index 5ce9e09f4a8..dc5d0a38eea 100755
--- a/hack/update-codegen-dockerized.sh
+++ b/hack/update-codegen-dockerized.sh
@@ -42,6 +42,7 @@ function generate_mocks {
     "pkg/agent/cniserver/ipam IPAMDriver testing"
     "pkg/agent/flowexporter/connections ConnTrackDumper,NetFilterConnTrack testing"
     "pkg/agent/interfacestore InterfaceStore testing"
+    "pkg/agent/memberlist Memberlist testing"
     "pkg/agent/multicast RouteInterface testing"
     "pkg/agent/types McastNetworkPolicyController testing"
     "pkg/agent/nodeportlocal/portcache LocalPortOpener testing"
diff --git a/pkg/agent/memberlist/cluster.go b/pkg/agent/memberlist/cluster.go
index ee4d2c8de80..6ee8c879b04 100644
--- a/pkg/agent/memberlist/cluster.go
+++ b/pkg/agent/memberlist/cluster.go
@@ -88,13 +88,20 @@ type Interface interface {
     AddClusterEventHandler(handler ClusterNodeEventHandler)
 }
 
+// Memberlist is the interface that wraps the methods of hashicorp/memberlist used by Cluster,
+// so that a mock implementation can be injected in unit tests.
+type Memberlist interface {
+    Join(existing []string) (int, error)
+    Members() []*memberlist.Node
+    Leave(timeout time.Duration) error
+    Shutdown() error
+}
+
 // Cluster implements ClusterInterface.
 type Cluster struct {
     bindPort int
     // Name of local Node. Node name must be unique in the cluster.
     nodeName string
-    mList *memberlist.Memberlist
+    mList Memberlist
     // consistentHash hold the consistentHashMap, when a Node join cluster, use method Add() to add a key to the hash.
     // when a Node leave the cluster, the consistentHashMap should be update.
     consistentHashMap map[string]*consistenthash.Map
@@ -129,7 +136,7 @@ func NewCluster(
     nodeName string,
     nodeInformer coreinformers.NodeInformer,
     externalIPPoolInformer crdinformers.ExternalIPPoolInformer,
-    transport memberlist.Transport, // Parameterized for testing, could be left nil for production code.
+    ml Memberlist, // Parameterized for testing; production code should pass nil, in which case a real memberlist is created.
 ) (*Cluster, error) {
     // The Node join/leave events will be notified via it.
     nodeEventCh := make(chan memberlist.NodeEvent, 1024)
@@ -137,6 +144,7 @@ func NewCluster(
         bindPort:          clusterBindPort,
         nodeName:          nodeName,
         consistentHashMap: make(map[string]*consistenthash.Map),
+        mList:             ml,
         nodeEventsCh:      nodeEventCh,
         nodeInformer:      nodeInformer,
         nodeLister:        nodeInformer.Lister(),
@@ -147,21 +155,22 @@ func NewCluster(
         queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "externalIPPool"),
     }
 
-    conf := memberlist.DefaultLocalConfig()
-    conf.Name = c.nodeName
-    conf.Transport = transport
-    conf.BindPort = c.bindPort
-    conf.AdvertisePort = c.bindPort
-    conf.AdvertiseAddr = nodeIP.String()
-    conf.Events = &memberlist.ChannelEventDelegate{Ch: nodeEventCh}
-    conf.LogOutput = io.Discard
-    klog.V(1).InfoS("New memberlist cluster", "config", conf)
-
-    mList, err := memberlist.Create(conf)
-    if err != nil {
-        return nil, fmt.Errorf("failed to create memberlist cluster: %v", err)
+    // Use the injected Memberlist when one is provided (tests); otherwise create a real memberlist.
+    if ml == nil {
+        conf := memberlist.DefaultLocalConfig()
+        conf.Name = c.nodeName
+        conf.BindPort = c.bindPort
+        conf.AdvertisePort = c.bindPort
+        conf.AdvertiseAddr = nodeIP.String()
+        conf.Events = &memberlist.ChannelEventDelegate{Ch: nodeEventCh}
+        conf.LogOutput = io.Discard
+        klog.V(1).InfoS("New memberlist cluster", "config", conf)
+
+        mList, err := memberlist.Create(conf)
+        if err != nil {
+            return nil, fmt.Errorf("failed to create memberlist cluster: %v", err)
+        }
+        c.mList = mList
     }
-    c.mList = mList
 
     nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
         cache.ResourceEventHandlerFuncs{
@@ -190,13 +199,16 @@ func NewCluster(
 func (c *Cluster) handleCreateNode(obj interface{}) {
     node := obj.(*corev1.Node)
-    if member, err := c.newClusterMember(node); err == nil {
-        _, err := c.mList.Join([]string{member})
-        if err != nil {
-            klog.ErrorS(err, "Processing Node CREATE event error, join cluster failed", "member", member)
+    // Ignore the local Node itself: it is already a member of its own memberlist cluster.
+    if node.Name != c.nodeName {
+        if member, err := c.newClusterMember(node); err == nil {
+            _, err := c.mList.Join([]string{member})
+            if err != nil {
+                klog.ErrorS(err, "Processing Node CREATE event error, join cluster failed", "member", member)
+            }
+        } else {
+            klog.ErrorS(err, "Processing Node CREATE event error", "nodeName", node.Name)
+        }
-    } else {
-        klog.ErrorS(err, "Processing Node CREATE event error", "nodeName", node.Name)
     }
 
     affectedEIPs := c.filterEIPsFromNodeLabels(node)
@@ -263,8 +275,7 @@ func (c *Cluster) enqueueExternalIPPool(obj interface{}) {
     c.queue.Add(eip.Name)
 }
 
-// newClusterMember gets the Node's IP and returns a cluster member "<ip>:<port>"
-// representing that Node in the memberlist cluster.
+// newClusterMember gets the Node's IP and returns it as a cluster member for the memberlist cluster to join.
 func (c *Cluster) newClusterMember(node *corev1.Node) (string, error) {
     nodeAddrs, err := k8s.GetNodeAddrs(node)
     if err != nil {
@@ -279,11 +290,7 @@ func (c *Cluster) newClusterMember(node *corev1.Node) (string, error) {
 func (c *Cluster) filterEIPsFromNodeLabels(node *corev1.Node) sets.String {
     pools := sets.NewString()
-    eips, err := c.externalIPPoolLister.List(labels.Everything())
-    if err != nil {
-        klog.ErrorS(err, "Filter ExternalIPPools from nodeLabels failed")
-        return pools
-    }
+    // Listing from the informer cache with labels.Everything() never returns an error, so it is safe to ignore.
+    eips, _ := c.externalIPPoolLister.List(labels.Everything())
     for _, eip := range eips {
         nodeSelector, _ := metav1.LabelSelectorAsSelector(&eip.Spec.NodeSelector)
         if nodeSelector.Matches(labels.Set(node.GetLabels())) {
@@ -314,14 +321,64 @@ func (c *Cluster) Run(stopCh <-chan struct{}) {
         go wait.Until(c.worker, time.Second, stopCh)
     }
 
-    for {
-        select {
-        case <-stopCh:
-            return
-        case nodeEvent := <-c.nodeEventsCh:
-            c.handleClusterNodeEvents(&nodeEvent)
+    go func() {
+        for {
+            select {
+            case <-stopCh:
+                return
+            case nodeEvent := <-c.nodeEventsCh:
+                c.handleClusterNodeEvents(&nodeEvent)
+            }
+        }
+    }()
+
+    // Rejoin Nodes periodically in case some Nodes were removed from the member list because of a long downtime.
+    go func() {
+        ticker := time.NewTicker(1 * time.Minute)
+        defer ticker.Stop()
+        for {
+            select {
+            case <-stopCh:
+                return
+            case <-ticker.C:
+                c.RejoinNodes()
+            }
+        }
+    }()
+
+    <-stopCh
+}
+
+// RejoinNodes rejoins Nodes that were removed from the member list by memberlist because they were unreachable for more
+// than 30 seconds (the default GossipToTheDeadTime of memberlist). Without it, once there is a network downtime lasting
+// more than 30 seconds, the agent wouldn't try to reach any other Node and would think it's the only alive Node until
+// it's restarted.
+func (c *Cluster) RejoinNodes() {
+    nodes, _ := c.nodeLister.List(labels.Everything())
+    aliveNodes := c.AliveNodes()
+    var membersToJoin []string
+    for _, node := range nodes {
+        if !aliveNodes.Has(node.Name) {
+            member, err := c.newClusterMember(node)
+            if err != nil {
+                klog.ErrorS(err, "Failed to generate cluster member to join", "Node", node.Name)
+                continue
+            }
+            membersToJoin = append(membersToJoin, member)
         }
     }
+    // All known Nodes are alive; nothing to do.
+    if len(membersToJoin) == 0 {
+        return
+    }
+    // The Join method returns an error only if none of the members could be reached.
+    numSuccess, err := c.mList.Join(membersToJoin)
+    if err != nil {
+        klog.ErrorS(err, "Failed to rejoin any members", "members", membersToJoin)
+    } else if numSuccess != len(membersToJoin) {
+        // Join succeeded partially, so err is nil in this branch; log the partial failure explicitly.
+        klog.ErrorS(nil, "Failed to rejoin some members", "members", membersToJoin, "numSuccess", numSuccess)
+    } else {
+        klog.InfoS("Rejoined all members", "members", membersToJoin)
+    }
 }
 
 func (c *Cluster) worker() {
@@ -421,12 +478,9 @@ func (c *Cluster) handleClusterNodeEvents(nodeEvent *memberlist.NodeEvent) {
     // if the Node has failed, ExternalIPPools consistentHash maybe changed, and affected ExternalIPPool should be enqueued.
     coreNode, err := c.nodeLister.Get(node.Name)
     if err != nil {
-        if apierrors.IsNotFound(err) {
-            // Node has been deleted, and deleteNode handler has been executed.
-            klog.ErrorS(err, "Processing Node event, not found", "eventType", event)
-            return
-        }
-        klog.ErrorS(err, "Processing Node event, get Node failed", "eventType", event)
+        // The Node must have been deleted. No further processing is needed, as handleDeleteNode has already enqueued
+        // the related ExternalIPPools.
+ klog.InfoS("Received a Node event but did not find the Node object", "eventType", mapNodeEventType[event], "nodeName", node.Name) return } affectedEIPs := c.filterEIPsFromNodeLabels(coreNode) diff --git a/pkg/agent/memberlist/cluster_test.go b/pkg/agent/memberlist/cluster_test.go index d337657e464..785b76f5bd5 100644 --- a/pkg/agent/memberlist/cluster_test.go +++ b/pkg/agent/memberlist/cluster_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" "github.com/hashicorp/memberlist" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -33,10 +34,12 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/consistenthash" + memberlisttest "antrea.io/antrea/pkg/agent/memberlist/testing" "antrea.io/antrea/pkg/apis" crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" + "antrea.io/antrea/pkg/util/ip" ) type fakeCluster struct { @@ -45,19 +48,15 @@ type fakeCluster struct { crdClient *fakeversioned.Clientset } -func newFakeCluster(nodeConfig *config.NodeConfig, stopCh <-chan struct{}) (*fakeCluster, error) { - clientset := fake.NewSimpleClientset() +func newFakeCluster(nodeConfig *config.NodeConfig, stopCh <-chan struct{}, memberlist Memberlist, objs ...runtime.Object) (*fakeCluster, error) { + clientset := fake.NewSimpleClientset(objs...) informerFactory := informers.NewSharedInformerFactory(clientset, 0) nodeInformer := informerFactory.Core().V1().Nodes() - crdClient := fakeversioned.NewSimpleClientset([]runtime.Object{}...) + crdClient := fakeversioned.NewSimpleClientset() crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0) ipPoolInformer := crdInformerFactory.Crd().V1alpha2().ExternalIPPools() - ip := net.ParseIP("127.0.0.1") - // Use mock network to avoid port conflict with system network and any impact on the system network. - mockNetwork := &memberlist.MockNetwork{} - mockTransport := mockNetwork.NewTransport(nodeConfig.Name) - cluster, err := NewCluster(ip, apis.AntreaAgentClusterMembershipPort, nodeConfig.Name, nodeInformer, ipPoolInformer, mockTransport) + cluster, err := NewCluster(nodeConfig.NodeIPv4Addr.IP, apis.AntreaAgentClusterMembershipPort, nodeConfig.Name, nodeInformer, ipPoolInformer, memberlist) if err != nil { return nil, err } @@ -135,36 +134,58 @@ func TestCluster_Run(t *testing.T) { } for _, tCase := range testCases { t.Run(tCase.name, func(t *testing.T) { + controller := gomock.NewController(t) + stopCh := make(chan struct{}) + stoppedCh := make(chan struct{}) + defer func() { + // Make sure mock controller is closed after Run() finishes. 
+                close(stopCh)
+                <-stoppedCh
+                controller.Finish()
+            }()
+
             nodeConfig := &config.NodeConfig{
                 Name:         localNodeName,
                 NodeIPv4Addr: &net.IPNet{IP: net.IPv4(127, 0, 0, 1), Mask: net.IPv4Mask(255, 255, 255, 255)},
             }
-            stopCh := make(chan struct{})
-            defer close(stopCh)
-            fakeCluster, err := newFakeCluster(nodeConfig, stopCh)
+            mockMemberlist := memberlisttest.NewMockMemberlist(controller)
+            fakeCluster, err := newFakeCluster(nodeConfig, stopCh, mockMemberlist)
             if err != nil {
                 t.Fatalf("New fake memberlist server error: %v", err)
             }
 
-            eip := tCase.externalIPPool
-            assert.NoError(t, createExternalIPPool(fakeCluster.crdClient, eip))
+            mockMemberlist.EXPECT().Leave(time.Second)
+            mockMemberlist.EXPECT().Shutdown()
+            mockMemberlist.EXPECT().Members().Return([]*memberlist.Node{
+                {Name: localNodeName},
+            }).AnyTimes()
+
+            assert.NoError(t, createExternalIPPool(fakeCluster.crdClient, tCase.externalIPPool))
             assert.NoError(t, createNode(fakeCluster.clientSet, tCase.localNode))
 
-            go fakeCluster.cluster.Run(stopCh)
+            go func() {
+                defer close(stoppedCh)
+                fakeCluster.cluster.Run(stopCh)
+            }()
 
-            tCase.egress.Spec.ExternalIPPool = eip.Name
             assert.NoError(t, wait.Poll(100*time.Millisecond, time.Second, func() (done bool, err error) {
-                res, err := fakeCluster.cluster.ShouldSelectIP(tCase.egress.Spec.EgressIP, eip.Name)
+                res, err := fakeCluster.cluster.ShouldSelectIP(tCase.egress.Spec.EgressIP, tCase.externalIPPool.Name)
                 return err == nil && res == tCase.expectEgressSelectResult, nil
             }), "select Node result for Egress does not match")
-            assert.Equal(t, 1, fakeCluster.cluster.mList.NumMembers(), "expected alive Node num is 1")
         })
     }
 }
 
 func TestCluster_RunClusterEvents(t *testing.T) {
+    controller := gomock.NewController(t)
     stopCh := make(chan struct{})
-    defer close(stopCh)
+    stoppedCh := make(chan struct{})
+    defer func() {
+        // Make sure the mock controller verifies its expectations only after Run() returns.
+        close(stopCh)
+        <-stoppedCh
+        controller.Finish()
+    }()
 
     nodeName := "localNodeName"
     nodeConfig := &config.NodeConfig{
@@ -183,8 +204,8 @@ func TestCluster_RunClusterEvents(t *testing.T) {
         ObjectMeta: metav1.ObjectMeta{Name: "fakeEgress1", UID: "fakeUID1"},
         Spec:       crdv1a2.EgressSpec{ExternalIPPool: fakeEIP1.Name, EgressIP: "1.1.1.2"},
     }
-
-    fakeCluster, err := newFakeCluster(nodeConfig, stopCh)
+    mockMemberlist := memberlisttest.NewMockMemberlist(controller)
+    fakeCluster, err := newFakeCluster(nodeConfig, stopCh, mockMemberlist)
     if err != nil {
         t.Fatalf("New fake memberlist server error: %v", err)
     }
@@ -193,42 +214,45 @@ func TestCluster_RunClusterEvents(t *testing.T) {
         t.Logf("Detected cluster Node event, running fake handler, obj: %s", objName)
     })
 
+    mockMemberlist.EXPECT().Leave(time.Second)
+    mockMemberlist.EXPECT().Shutdown()
+    mockMemberlist.EXPECT().Members().Return([]*memberlist.Node{
+        {Name: nodeName},
+    }).AnyTimes()
     // Create local Node and ExternalIPPool.
     assert.NoError(t, createNode(fakeCluster.clientSet, localNode))
     assert.NoError(t, createExternalIPPool(fakeCluster.crdClient, fakeEIP1))
 
-    go fakeCluster.cluster.Run(stopCh)
+    go func() {
+        defer close(stoppedCh)
+        fakeCluster.cluster.Run(stopCh)
+    }()
 
     // Test updating Node labels.
     testCasesUpdateNode := []struct {
         name                     string
         expectEgressSelectResult bool
         newNodeLabels            map[string]string
-        egress                   *crdv1a2.Egress
     }{
         {
             name:                     "Update Node with the same labels then local Node should not be selected",
             expectEgressSelectResult: false,
             newNodeLabels:            localNode.Labels,
-            egress:                   fakeEgress1,
         },
         {
             name:                     "Update Node with matched labels then local Node should be selected",
             expectEgressSelectResult: true,
             newNodeLabels:            map[string]string{"env": "pro"},
-            egress:                   fakeEgress1,
         },
         {
             name:                     "Update Node with different but matched labels then local Node should be selected",
             expectEgressSelectResult: true,
             newNodeLabels:            map[string]string{"env": "pro", "env1": "test"},
-            egress:                   fakeEgress1,
         },
         {
             name:                     "Update Node with not matched labels then local Node should not be selected",
             expectEgressSelectResult: false,
             newNodeLabels:            map[string]string{"env": "test"},
-            egress:                   fakeEgress1,
         },
     }
     updateNode := func(node *v1.Node) {
@@ -242,7 +266,7 @@ func TestCluster_RunClusterEvents(t *testing.T) {
             localNode.Labels = tCase.newNodeLabels
             updateNode(localNode)
             assert.NoError(t, wait.Poll(100*time.Millisecond, time.Second, func() (done bool, err error) {
-                res, err := fakeCluster.cluster.ShouldSelectIP(tCase.egress.Spec.EgressIP, tCase.egress.Spec.ExternalIPPool)
+                res, err := fakeCluster.cluster.ShouldSelectIP(fakeEgress1.Spec.EgressIP, fakeEgress1.Spec.ExternalIPPool)
                 return err == nil && res == tCase.expectEgressSelectResult, nil
             }), "select Node result for Egress does not match")
         })
@@ -371,6 +395,7 @@ func TestCluster_RunClusterEvents(t *testing.T) {
     assertEgressSelectResult(fakeEgress2, false, true)
     assertEgressSelectResult(fakeEgress1, false, false)
 
+    mockMemberlist.EXPECT().Join([]string{"1.1.1.1"})
     // Test creating Node with valid IP.
     fakeNode1 := &v1.Node{
         ObjectMeta: metav1.ObjectMeta{Name: "fakeNode1"},
@@ -615,3 +640,44 @@ func BenchmarkCluster_ShouldSelect(b *testing.B) {
         })
     }
 }
+
+func TestCluster_RejoinNodes(t *testing.T) {
+    localNodeConfig := &config.NodeConfig{
+        Name:         "node1",
+        NodeIPv4Addr: ip.MustParseCIDR("10.0.0.1/24"),
+    }
+    node1 := &v1.Node{
+        ObjectMeta: metav1.ObjectMeta{Name: "node1"},
+        Status:     v1.NodeStatus{Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "10.0.0.1"}}},
+    }
+    node2 := &v1.Node{
+        ObjectMeta: metav1.ObjectMeta{Name: "node2"},
+        Status:     v1.NodeStatus{Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "10.0.0.2"}}},
+    }
+    node3 := &v1.Node{
+        ObjectMeta: metav1.ObjectMeta{Name: "node3"},
+        Status:     v1.NodeStatus{Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "10.0.0.3"}}},
+    }
+    stopCh := make(chan struct{})
+    defer close(stopCh)
+    controller := gomock.NewController(t)
+    defer controller.Finish()
+    mockMemberlist := memberlisttest.NewMockMemberlist(controller)
+    // handleCreateNode is expected to join node2 and node3 when the informer caches sync; node1 is the local Node
+    // and is skipped.
+    mockMemberlist.EXPECT().Join([]string{"10.0.0.2"})
+    mockMemberlist.EXPECT().Join([]string{"10.0.0.3"})
+    fakeCluster, _ := newFakeCluster(localNodeConfig, stopCh, mockMemberlist, node1, node2, node3)
+
+    // node3 is missing from the member list, so it should be rejoined.
+    mockMemberlist.EXPECT().Members().Return([]*memberlist.Node{
+        {Name: "node1"},
+        {Name: "node2"},
+    })
+    mockMemberlist.EXPECT().Join([]string{"10.0.0.3"})
+    fakeCluster.cluster.RejoinNodes()
+
+    // node2 is missing from the member list, so it should be rejoined.
+    mockMemberlist.EXPECT().Members().Return([]*memberlist.Node{
+        {Name: "node1"},
+        {Name: "node3"},
+    })
+    mockMemberlist.EXPECT().Join([]string{"10.0.0.2"})
+    fakeCluster.cluster.RejoinNodes()
+}
diff --git a/pkg/agent/memberlist/testing/mock_memberlist.go b/pkg/agent/memberlist/testing/mock_memberlist.go
new file mode 100644
index 00000000000..fa002e288df
--- /dev/null
+++ b/pkg/agent/memberlist/testing/mock_memberlist.go
@@ -0,0 +1,107 @@
+// Copyright 2022 Antrea Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: antrea.io/antrea/pkg/agent/memberlist (interfaces: Memberlist)
+
+// Package testing is a generated GoMock package.
+package testing
+
+import (
+    gomock "github.com/golang/mock/gomock"
+    memberlist "github.com/hashicorp/memberlist"
+    reflect "reflect"
+    time "time"
+)
+
+// MockMemberlist is a mock of Memberlist interface
+type MockMemberlist struct {
+    ctrl     *gomock.Controller
+    recorder *MockMemberlistMockRecorder
+}
+
+// MockMemberlistMockRecorder is the mock recorder for MockMemberlist
+type MockMemberlistMockRecorder struct {
+    mock *MockMemberlist
+}
+
+// NewMockMemberlist creates a new mock instance
+func NewMockMemberlist(ctrl *gomock.Controller) *MockMemberlist {
+    mock := &MockMemberlist{ctrl: ctrl}
+    mock.recorder = &MockMemberlistMockRecorder{mock}
+    return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockMemberlist) EXPECT() *MockMemberlistMockRecorder {
+    return m.recorder
+}
+
+// Join mocks base method
+func (m *MockMemberlist) Join(arg0 []string) (int, error) {
+    m.ctrl.T.Helper()
+    ret := m.ctrl.Call(m, "Join", arg0)
+    ret0, _ := ret[0].(int)
+    ret1, _ := ret[1].(error)
+    return ret0, ret1
+}
+
+// Join indicates an expected call of Join
+func (mr *MockMemberlistMockRecorder) Join(arg0 interface{}) *gomock.Call {
+    mr.mock.ctrl.T.Helper()
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Join", reflect.TypeOf((*MockMemberlist)(nil).Join), arg0)
+}
+
+// Leave mocks base method
+func (m *MockMemberlist) Leave(arg0 time.Duration) error {
+    m.ctrl.T.Helper()
+    ret := m.ctrl.Call(m, "Leave", arg0)
+    ret0, _ := ret[0].(error)
+    return ret0
+}
+
+// Leave indicates an expected call of Leave
+func (mr *MockMemberlistMockRecorder) Leave(arg0 interface{}) *gomock.Call {
+    mr.mock.ctrl.T.Helper()
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Leave", reflect.TypeOf((*MockMemberlist)(nil).Leave), arg0)
+}
+
+// Members mocks base method
+func (m *MockMemberlist) Members() []*memberlist.Node {
+    m.ctrl.T.Helper()
+    ret := m.ctrl.Call(m, "Members")
+    ret0, _ := ret[0].([]*memberlist.Node)
+    return ret0
+}
+
+// Members indicates an expected call of Members
+func (mr *MockMemberlistMockRecorder) Members() *gomock.Call {
+    mr.mock.ctrl.T.Helper()
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Members", reflect.TypeOf((*MockMemberlist)(nil).Members))
+}
+
+// Shutdown mocks base method
+func (m *MockMemberlist) Shutdown() error {
+    m.ctrl.T.Helper()
+    ret := m.ctrl.Call(m, "Shutdown")
+    ret0, _ := ret[0].(error)
+    return ret0
+}
+
+// Shutdown indicates an expected call of Shutdown
+func (mr *MockMemberlistMockRecorder) Shutdown() *gomock.Call {
+    mr.mock.ctrl.T.Helper()
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shutdown", reflect.TypeOf((*MockMemberlist)(nil).Shutdown))
+}
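
A minimal usage sketch (not part of the patch above): the change replaces the concrete *memberlist.Memberlist with the narrow Memberlist interface, so tests inject a mock while production code passes nil and lets NewCluster create the real instance. The sketch below shows how that seam can be exercised, reusing only helpers and imports that appear in cluster_test.go in this diff; the test name and the final assertion are illustrative assumptions, not code from the patch.

func TestCluster_WithInjectedMemberlist(t *testing.T) {
    controller := gomock.NewController(t)
    defer controller.Finish()
    mockMemberlist := memberlisttest.NewMockMemberlist(controller)
    // The expectation stands in for real gossip state: the local Node is the only alive member.
    mockMemberlist.EXPECT().Members().Return([]*memberlist.Node{{Name: "node1"}}).AnyTimes()

    stopCh := make(chan struct{})
    defer close(stopCh)
    nodeConfig := &config.NodeConfig{
        Name:         "node1",
        NodeIPv4Addr: ip.MustParseCIDR("10.0.0.1/24"),
    }
    // Because the injected Memberlist is non-nil, NewCluster skips memberlist.Create()
    // entirely, so the test involves no real networking.
    fakeCluster, err := newFakeCluster(nodeConfig, stopCh, mockMemberlist)
    if err != nil {
        t.Fatalf("failed to create fake cluster: %v", err)
    }
    // AliveNodes() is derived from the mocked Members() call above.
    assert.True(t, fakeCluster.cluster.AliveNodes().Has("node1"))
}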