Fix single rule deletion for NodePortLocal on Linux (antrea-io#6284)
The logic for deleting an individual NPL mapping was broken: it
incorrectly believed that the protocol socket was still in use, so the
mapping could never be deleted, putting the NPL controller in an endless
error loop.

The State field in ProtocolSocketData was left over from pre-v1.7
Antrea, back when we would always use the same port number for multiple
protocols for a given Pod IP + port. With the current version of the NPL
implementation, this field is not needed and should be removed. By
removing the field, we avoid the deletion issue.
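
For reference, a sketch of the simplified struct after this change (the
exact definition appears in the port_table.go diff below):

```go
// ProtocolSocketData once the State field (and the protocolSocketState
// type) are removed: the presence of an entry in the port table is now
// what indicates that a mapping is in use.
type ProtocolSocketData struct {
	Protocol string
	socket   io.Closer
}
```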

This patch also ensures that if a rule is only partially cleaned up, we
can attempt to delete it again, by making DeleteRule idempotent. To
identify that a prior deletion attempt failed, we introduce a "defunct"
field in the NPL rule data. If this field is set, the controller knows
that the rule has been partially deleted and that deletion needs to be
attempted again. Without this, the controller could (with the right
sequence of updates) assume that a partially-deleted rule is still
valid, which would break the datapath. I plan on improving the NPL code
further with a follow-up patch, but to keep this patch small (for
backporting), I went with the simplest solution I could think of.
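
As a rough illustration of the idempotent deletion flow, here is a
minimal sketch. The lock, getEntryByPodIPPortProto, the defunct field,
and the rules DeleteRule signature match the diffs below; NodePort,
PodPortRules, and deletePortTableCache are assumed names for this
sketch, and the real implementation is more involved:

```go
// Sketch only, not the actual Antrea implementation.
func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) error {
	pt.tableLock.Lock()
	defer pt.tableLock.Unlock()
	data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol)
	if data == nil {
		// Idempotency: deleting an already-deleted rule is a no-op.
		return nil
	}
	// Mark the rule defunct *before* any cleanup step: if a step below
	// fails, the entry stays in the table but can never be mistaken for
	// a valid rule, and the controller will retry the deletion.
	data.defunct = true
	if err := pt.PodPortRules.DeleteRule(data.NodePort, podIP, podPort, protocol); err != nil {
		return err // iptables rule could not be removed; retry later
	}
	if err := data.Protocol.socket.Close(); err != nil {
		return err // local port could not be released; retry later
	}
	pt.deletePortTableCache(data) // assumed cache-removal helper
	return nil
}
```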

Fixes antrea-io#6281

Signed-off-by: Antonin Bas <antonin.bas@broadcom.com>
antoninbas committed May 7, 2024
1 parent 6cc443a commit 46169a3
Showing 7 changed files with 297 additions and 138 deletions.
22 changes: 12 additions & 10 deletions pkg/agent/nodeportlocal/k8s/npl_controller.go
@@ -493,9 +493,13 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
}
podPorts[targetPortProto] = struct{}{}
portData := c.portTable.GetEntry(podIP, port, protocol)
if portData != nil && !portData.ProtocolInUse(protocol) {
// If the PortTable has an entry for the Pod but does not have an
// entry with protocol, we enforce AddRule for the missing Protocol.
// Special handling for a rule that was previously marked for deletion but could not
// be deleted properly: we have to retry now.
if portData != nil && portData.Defunct() {
klog.InfoS("Deleting defunct rule for Pod to prevent re-use", "pod", klog.KObj(pod), "podIP", podIP, "port", port, "protocol", protocol)
if err := c.portTable.DeleteRule(podIP, port, protocol); err != nil {
return fmt.Errorf("failed to delete defunct rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, port, protocol, err)
}
portData = nil
}
if portData == nil {
@@ -527,13 +531,11 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
// second, delete any existing rule that is not needed based on the current Pod
// specification.
entries := c.portTable.GetDataForPodIP(podIP)
if nplExists {
for _, data := range entries {
proto := data.Protocol
if _, exists := podPorts[util.BuildPortProto(fmt.Sprint(data.PodPort), proto.Protocol)]; !exists {
if err := c.portTable.DeleteRule(podIP, int(data.PodPort), proto.Protocol); err != nil {
return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d, Protocol %s: %v", podIP, data.PodPort, proto.Protocol, err)
}
for _, data := range entries {
proto := data.Protocol
if _, exists := podPorts[util.BuildPortProto(fmt.Sprint(data.PodPort), proto.Protocol)]; !exists {
if err := c.portTable.DeleteRule(podIP, int(data.PodPort), proto.Protocol); err != nil {
return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, data.PodPort, proto.Protocol, err)
}
}
}
191 changes: 161 additions & 30 deletions pkg/agent/nodeportlocal/npl_agent_test.go
@@ -23,6 +23,7 @@ import (
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
"time"

@@ -176,11 +177,12 @@ func getTestSvcWithPortName(portName string) *corev1.Service {

type testData struct {
*testing.T
stopCh chan struct{}
ctrl *gomock.Controller
k8sClient *k8sfake.Clientset
portTable *portcache.PortTable
wg sync.WaitGroup
stopCh chan struct{}
ctrl *gomock.Controller
k8sClient *k8sfake.Clientset
portTable *portcache.PortTable
svcInformer cache.SharedIndexInformer
wg sync.WaitGroup
}

func (t *testData) runWrapper(c *k8s.NPLController) {
@@ -234,30 +236,35 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData {
mockPortOpener.EXPECT().OpenLocalPort(gomock.Any(), gomock.Any()).AnyTimes().Return(&fakeSocket{}, nil)
}

data := &testData{
T: t,
stopCh: make(chan struct{}),
ctrl: mockCtrl,
k8sClient: k8sfake.NewSimpleClientset(objects...),
portTable: newPortTable(mockIPTables, mockPortOpener),
}
k8sClient := k8sfake.NewSimpleClientset(objects...)

portTable := newPortTable(mockIPTables, mockPortOpener)

resyncPeriod := 0 * time.Minute
// informerFactory is initialized and started from cmd/antrea-agent/agent.go
informerFactory := informers.NewSharedInformerFactory(data.k8sClient, resyncPeriod)
informerFactory := informers.NewSharedInformerFactory(k8sClient, resyncPeriod)
listOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", defaultNodeName).String()
}
localPodInformer := coreinformers.NewFilteredPodInformer(
data.k8sClient,
k8sClient,
metav1.NamespaceAll,
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, // NamespaceIndex is used in NPLController.
listOptions,
)
svcInformer := informerFactory.Core().V1().Services().Informer()

c := k8s.NewNPLController(data.k8sClient, localPodInformer, svcInformer, data.portTable, defaultNodeName)
c := k8s.NewNPLController(k8sClient, localPodInformer, svcInformer, portTable, defaultNodeName)

data := &testData{
T: t,
stopCh: make(chan struct{}),
ctrl: mockCtrl,
k8sClient: k8sClient,
portTable: portTable,
svcInformer: svcInformer,
}

data.runWrapper(c)
informerFactory.Start(data.stopCh)
@@ -305,31 +312,41 @@ func (t *testData) tearDown() {
os.Unsetenv("NODE_NAME")
}

func (t *testData) pollForPodAnnotation(podName string, found bool) ([]types.NPLAnnotation, error) {
var data string
var exists bool
func conditionMatchAll([]types.NPLAnnotation) bool {
return true
}

// If conditionFn is nil, we will assume you are looking for a non-existing annotation.
// If you want to match all, use conditionMatchAll as the conditionFn.
func (t *testData) pollForPodAnnotationWithCondition(podName string, conditionFn func([]types.NPLAnnotation) bool) ([]types.NPLAnnotation, error) {
var nplValue []types.NPLAnnotation
// do not use PollImmediate: 1 second is reserved for the controller to do its job and
// update Pod NPL annotations as needed.
err := wait.Poll(time.Second, 20*time.Second, func() (bool, error) {
updatedPod, err := t.k8sClient.CoreV1().Pods(defaultNS).Get(context.TODO(), podName, metav1.GetOptions{})
require.NoError(t, err, "Failed to get Pod")
annotation := updatedPod.GetAnnotations()
data, exists = annotation[types.NPLAnnotationKey]
if found {
return exists, nil
data, exists := annotation[types.NPLAnnotationKey]
if !exists {
return conditionFn == nil, nil
}
return !exists, nil
if conditionFn == nil {
return false, nil
}
if err := json.Unmarshal([]byte(data), &nplValue); err != nil {
return false, err
}
return conditionFn(nplValue), nil
})
return nplValue, err
}

if err != nil {
return []types.NPLAnnotation{}, err
}
if data == "" {
return []types.NPLAnnotation{}, nil
func (t *testData) pollForPodAnnotation(podName string, found bool) ([]types.NPLAnnotation, error) {
var conditionFn func([]types.NPLAnnotation) bool
if found {
conditionFn = conditionMatchAll
}
var nplValue []types.NPLAnnotation
err = json.Unmarshal([]byte(data), &nplValue)
return nplValue, err
return t.pollForPodAnnotationWithCondition(podName, conditionFn)
}

func (t *testData) updateServiceOrFail(testSvc *corev1.Service) {
@@ -497,6 +514,7 @@ func TestPodDelete(t *testing.T) {

// TestAddMultiPortPodSvc creates a Pod and a Service with two target ports.
// It verifies that the Pod's NPL annotation and the local port table are updated with both ports.
// It then updates the Service to remove one of the target ports.
func TestAddMultiPortPodSvc(t *testing.T) {
newPort := 90
testSvc := getTestSvc(defaultPort, int32(newPort))
@@ -510,6 +528,16 @@ func TestAddMultiPortPodSvc(t *testing.T) {
expectedAnnotations.Check(t, value)
assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP))
assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP))

// Remove the second target port.
testSvc.Spec.Ports = testSvc.Spec.Ports[:1]
testData.updateServiceOrFail(testSvc)
// Wait for annotation to be updated (single mapping).
value, err = testData.pollForPodAnnotationWithCondition(testPod.Name, func(value []types.NPLAnnotation) bool { return len(value) == 1 })
require.NoError(t, err, "Poll for annotation check failed")
expectedAnnotations = newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP)
expectedAnnotations.Check(t, value)
assert.False(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP))
}

// TestPodAddMultiPort creates a Pod with multiple ports and a Service with only one target port.
@@ -807,3 +835,106 @@ func TestSyncRulesError(t *testing.T) {
testData, _, _ := setUpWithTestServiceAndPod(t, testConfig, nil)
defer testData.tearDown()
}

func TestSingleRuleDeletionError(t *testing.T) {
newPort := 90
testSvc := getTestSvc(defaultPort, int32(newPort))
testPod := getTestPod()

testConfig := newTestConfig().withCustomPodPortRulesExpectations(func(mockIPTables *rulestesting.MockPodPortRules) {
mockIPTables.EXPECT().AddAllRules(gomock.Any()).AnyTimes()
gomock.InOrder(
mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2),
mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("iptables failure")),
mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()),
)
})

testData := setUp(t, testConfig, testSvc, testPod)
defer testData.tearDown()

value, err := testData.pollForPodAnnotation(testPod.Name, true)
require.NoError(t, err, "Poll for annotation check failed")
expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, newPort, protocolTCP)
expectedAnnotations.Check(t, value)
assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP))
assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP))

// Remove the second target port, to force one mapping to be deleted.
testSvc.Spec.Ports = testSvc.Spec.Ports[:1]
testData.updateServiceOrFail(testSvc)
// The first deletion attempt will fail, but the second should succeed.
// Wait for annotation to be updated (single mapping).
value, err = testData.pollForPodAnnotationWithCondition(testPod.Name, func(value []types.NPLAnnotation) bool { return len(value) == 1 })
require.NoError(t, err, "Poll for annotation check failed")
expectedAnnotations = newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP)
expectedAnnotations.Check(t, value)
assert.False(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP))
}

func TestPreventDefunctRuleReuse(t *testing.T) {
newPort := 90
testSvc := getTestSvc(defaultPort, int32(newPort))
testPod := getTestPod()

var testData *testData

ports := testSvc.Spec.Ports
// This function will be executed synchronously when DeleteRule is called for the first time
// and we simulate a failure. It restores the second target port for the Service, which was
// deleted previously, and waits for the change to be reflected in the informer's
// store. After that, we know that the next time the NPL controller processes the test Pod,
// it will need to ensure that both NPL mappings are configured correctly. Because one of
// the rules will be marked as "defunct", it will first need to delete the rule properly
// before adding it back.
restoreServiceTargetPorts := func() {
testSvc.Spec.Ports = ports
_, err := testData.k8sClient.CoreV1().Services(defaultNS).Update(context.TODO(), testSvc, metav1.UpdateOptions{})
if !assert.NoError(t, err) {
return
}
assert.EventuallyWithT(t, func(c *assert.CollectT) {
obj, exists, err := testData.svcInformer.GetIndexer().GetByKey(testSvc.Namespace + "/" + testSvc.Name)
if !assert.NoError(t, err) || !assert.True(t, exists) {
return
}
svc := obj.(*corev1.Service)
assert.Len(t, svc.Spec.Ports, 2)
}, 2*time.Second, 50*time.Millisecond)
}

var done atomic.Bool

testConfig := newTestConfig().withCustomPodPortRulesExpectations(func(mockIPTables *rulestesting.MockPodPortRules) {
mockIPTables.EXPECT().AddAllRules(gomock.Any()).AnyTimes()
gomock.InOrder(
mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2),
mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
func(nodePort int, podIP string, podPort int, protocol string) { restoreServiceTargetPorts() },
).Return(fmt.Errorf("iptables failure")),
mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()),
mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
func(nodePort int, podIP string, podPort int, protocol string) { done.Store(true) },
),
)
})

testData = setUp(t, testConfig, testSvc, testPod)
defer testData.tearDown()

value, err := testData.pollForPodAnnotation(testPod.Name, true)
require.NoError(t, err, "Poll for annotation check failed")
expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, newPort, protocolTCP)
expectedAnnotations.Check(t, value)
assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP))
assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP))

// Remove the second target port, to force one mapping to be deleted.
testSvc.Spec.Ports = testSvc.Spec.Ports[:1]
testData.updateServiceOrFail(testSvc)

assert.Eventually(t, done.Load, 2*time.Second, 50*time.Millisecond)
assert.Eventually(t, func() bool {
return testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP) && testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)
}, 2*time.Second, 50*time.Millisecond)
}
26 changes: 9 additions & 17 deletions pkg/agent/nodeportlocal/portcache/port_table.go
@@ -32,13 +32,8 @@ const (
PodIPIndex = "podIPIndex"
)

// protocolSocketState represents the state of the socket corresponding to a
// given (Node port, protocol) tuple.
type protocolSocketState int

type ProtocolSocketData struct {
Protocol string
State protocolSocketState
socket io.Closer
}

@@ -47,14 +42,13 @@ type NodePortData struct {
PodPort int
PodIP string
Protocol ProtocolSocketData
// defunct is used to indicate that a rule has been partially deleted: it is no longer
// usable and deletion needs to be re-attempted.
defunct bool
}

func (d *NodePortData) ProtocolInUse(protocol string) bool {
protocolSocketData := d.Protocol
if protocolSocketData.Protocol == protocol {
return protocolSocketData.State == stateInUse
}
return false
func (d *NodePortData) Defunct() bool {
return d.defunct
}

type LocalPortOpener interface {
@@ -204,8 +198,8 @@ func podIPPortProtoFormat(ip string, port int, protocol string) string {
}

func (pt *PortTable) getEntryByPodIPPortProto(ip string, port int, protocol string) *NodePortData {
data, exists := pt.getPortTableCacheFromPodEndpointIndex(podIPPortProtoFormat(ip, port, protocol))
if exists == false {
data, ok := pt.getPortTableCacheFromPodEndpointIndex(podIPPortProtoFormat(ip, port, protocol))
if !ok {
return nil
}
return data
@@ -214,10 +208,8 @@ func (pt *PortTable) getEntryByPodIPPortProto(ip string, port int, protocol string) *NodePortData {
func (pt *PortTable) RuleExists(podIP string, podPort int, protocol string) bool {
pt.tableLock.RLock()
defer pt.tableLock.RUnlock()
if data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol); data != nil {
return data.ProtocolInUse(protocol)
}
return false
data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol)
return data != nil
}

// nodePortProtoFormat formats the nodeport, protocol to string port:protocol.
