Skip to content

Commit

Permalink
Do not apply Egress to traffic destined for ServiceCIDRs
Browse files Browse the repository at this point in the history
When AntreaProxy is asked to skip some Services or is not running at
all, Pod-to-Service traffic would be forwarded to Egress Node and be
load-balanced remotely, as opposed to locally, which could incur
performance issue and unexpected behaviors.

This patch installs flows to prevent traffic destined for ServiceCIDRs
from being SNAT'd.

Signed-off-by: Quan Tian <qtian@vmware.com>
  • Loading branch information
tnqn committed Sep 27, 2023
1 parent f6ff2d6 commit cb64db3
Show file tree
Hide file tree
Showing 10 changed files with 296 additions and 68 deletions.
2 changes: 1 addition & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func run(o *Options) error {
if o.enableEgress {
egressController, err = egress.NewEgressController(
ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName,
memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, o.config.Egress.MaxEgressIPsPerNode,
memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, serviceCIDRProvider, o.config.Egress.MaxEgressIPsPerNode,
)
if err != nil {
return fmt.Errorf("error creating new Egress controller: %v", err)
Expand Down
52 changes: 52 additions & 0 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"antrea.io/antrea/pkg/agent/memberlist"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/servicecidr"
"antrea.io/antrea/pkg/agent/types"
cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
Expand Down Expand Up @@ -147,6 +148,11 @@ type EgressController struct {
ipAssigner ipassigner.IPAssigner

egressIPScheduler *egressIPScheduler

serviceCIDRInterface servicecidr.Interface
serviceCIDRUpdateCh chan struct{}
// Declared for testing.
serviceCIDRUpdateRetryDelay time.Duration
}

func NewEgressController(
Expand All @@ -161,6 +167,7 @@ func NewEgressController(
egressInformer crdinformers.EgressInformer,
nodeInformers coreinformers.NodeInformer,
podUpdateSubscriber channel.Subscriber,
serviceCIDRInterface servicecidr.Interface,
maxEgressIPsPerNode int,
) (*EgressController, error) {
c := &EgressController{
Expand All @@ -181,6 +188,10 @@ func NewEgressController(
localIPDetector: ipassigner.NewLocalIPDetector(),
idAllocator: newIDAllocator(minEgressMark, maxEgressMark),
cluster: cluster,
serviceCIDRInterface: serviceCIDRInterface,
// One buffer is enough as we just use it to ensure the target handler is executed once.
serviceCIDRUpdateCh: make(chan struct{}, 1),
serviceCIDRUpdateRetryDelay: 10 * time.Second,
}
ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice)
if err != nil {
Expand Down Expand Up @@ -214,6 +225,7 @@ func NewEgressController(
podUpdateSubscriber.Subscribe(c.processPodUpdate)
c.localIPDetector.AddEventHandler(c.onLocalIPUpdate)
c.egressIPScheduler.AddEventHandler(c.onEgressIPSchedule)
c.serviceCIDRInterface.AddEventHandler(c.onServiceCIDRUpdate)
return c, nil
}

Expand All @@ -222,6 +234,44 @@ func (c *EgressController) onEgressIPSchedule(egress string) {
c.queue.Add(egress)
}

// onServiceCIDRUpdate will be called when ServiceCIDRs change.
// It ensures updateServiceCIDRs will be executed once after this call.
func (c *EgressController) onServiceCIDRUpdate(_ []*net.IPNet) {
select {
case c.serviceCIDRUpdateCh <- struct{}{}:
default:
// The previous event is not processed yet, discard the new event.
}
}

func (c *EgressController) updateServiceCIDRs(stopCh <-chan struct{}) {
timer := time.NewTimer(0)
defer timer.Stop()
<-timer.C // Consume the first tick.
for {
select {
case <-stopCh:
return
case <-c.serviceCIDRUpdateCh:
klog.V(2).InfoS("Received service CIDR update")
case <-timer.C:
klog.V(2).InfoS("Service CIDR update timer expired")
}
serviceCIDRs, err := c.serviceCIDRInterface.GetServiceCIDRs()
if err != nil {
klog.ErrorS(err, "Failed to get Service CIDRs")
// No need to retry in this case as the Service CIDRs won't be available until it receives a service CIDRs update.
continue
}
err = c.ofClient.InstallSNATBypassServiceFlows(serviceCIDRs)
if err != nil {
klog.ErrorS(err, "Failed to install SNAT bypass flows for Service CIDRs, will retry", "serviceCIDRs", serviceCIDRs)
// Schedule a retry as it should be transient error.
timer.Reset(c.serviceCIDRUpdateRetryDelay)
}
}
}

// processPodUpdate will be called when CNIServer publishes a Pod update event.
// It triggers reconciling the effective Egress of the Pod.
func (c *EgressController) processPodUpdate(e interface{}) {
Expand Down Expand Up @@ -314,6 +364,8 @@ func (c *EgressController) Run(stopCh <-chan struct{}) {

go wait.NonSlidingUntil(c.watchEgressGroup, 5*time.Second, stopCh)

go c.updateServiceCIDRs(stopCh)

for i := 0; i < defaultWorkers; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}
Expand Down
87 changes: 69 additions & 18 deletions pkg/agent/controller/egress/egress_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"antrea.io/antrea/pkg/agent/memberlist"
openflowtest "antrea.io/antrea/pkg/agent/openflow/testing"
routetest "antrea.io/antrea/pkg/agent/route/testing"
servicecidrtest "antrea.io/antrea/pkg/agent/servicecidr/testing"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
Expand All @@ -49,6 +50,7 @@ import (
fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/k8s"
)

Expand Down Expand Up @@ -128,14 +130,15 @@ func mockNewIPAssigner(ipAssigner ipassigner.IPAssigner) func() {

type fakeController struct {
*EgressController
mockController *gomock.Controller
mockOFClient *openflowtest.MockClient
mockRouteClient *routetest.MockInterface
crdClient *fakeversioned.Clientset
crdInformerFactory crdinformers.SharedInformerFactory
informerFactory informers.SharedInformerFactory
mockIPAssigner *ipassignertest.MockIPAssigner
podUpdateChannel *channel.SubscribableChannel
mockController *gomock.Controller
mockOFClient *openflowtest.MockClient
mockRouteClient *routetest.MockInterface
crdClient *fakeversioned.Clientset
crdInformerFactory crdinformers.SharedInformerFactory
informerFactory informers.SharedInformerFactory
mockIPAssigner *ipassignertest.MockIPAssigner
mockServiceCIDRInterface *servicecidrtest.MockInterface
podUpdateChannel *channel.SubscribableChannel
}

func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeController {
Expand Down Expand Up @@ -163,7 +166,8 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
addPodInterface(ifaceStore, "ns4", "pod4", 4)

podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100)

mockServiceCIDRProvider := servicecidrtest.NewMockInterface(controller)
mockServiceCIDRProvider.EXPECT().AddEventHandler(gomock.Any())
egressController, _ := NewEgressController(mockOFClient,
&antreaClientGetter{clientset},
crdClient,
Expand All @@ -175,19 +179,21 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
egressInformer,
nodeInformer,
podUpdateChannel,
mockServiceCIDRProvider,
255,
)
egressController.localIPDetector = localIPDetector
return &fakeController{
EgressController: egressController,
mockController: controller,
mockOFClient: mockOFClient,
mockRouteClient: mockRouteClient,
crdClient: crdClient,
crdInformerFactory: crdInformerFactory,
informerFactory: informerFactory,
mockIPAssigner: mockIPAssigner,
podUpdateChannel: podUpdateChannel,
EgressController: egressController,
mockController: controller,
mockOFClient: mockOFClient,
mockRouteClient: mockRouteClient,
crdClient: crdClient,
crdInformerFactory: crdInformerFactory,
informerFactory: informerFactory,
mockIPAssigner: mockIPAssigner,
mockServiceCIDRInterface: mockServiceCIDRProvider,
podUpdateChannel: podUpdateChannel,
}
}

Expand Down Expand Up @@ -1093,6 +1099,51 @@ func TestGetEgressIPByMark(t *testing.T) {
}
}

func TestUpdateServiceCIDRs(t *testing.T) {
c := newFakeController(t, nil)
stopCh := make(chan struct{})
defer close(stopCh)
// Retry immediately.
c.serviceCIDRUpdateRetryDelay = 0

serviceCIDRs := []*net.IPNet{
ip.MustParseCIDR("10.96.0.0/16"),
ip.MustParseCIDR("1096::/64"),
}
assert.Len(t, c.serviceCIDRUpdateCh, 0)
// Call the handler the 1st time, it should enqueue an event.
c.onServiceCIDRUpdate(serviceCIDRs)
assert.Len(t, c.serviceCIDRUpdateCh, 1)
// Call the handler the 2nd time, it should not block and should discard the event.
c.onServiceCIDRUpdate(serviceCIDRs)
assert.Len(t, c.serviceCIDRUpdateCh, 1)

// In the 1st round, returning the ServiceCIDRs fails, it should not retry.
c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(nil, fmt.Errorf("not initialized"))

go c.updateServiceCIDRs(stopCh)

// Wait for the event to be processed.
require.Eventually(t, func() bool {
return len(c.serviceCIDRUpdateCh) == 0
}, time.Second, 100*time.Millisecond)
// In the 2nd round, returning the ServiceCIDR succeeds but installing flows fails, it should retry.
c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(serviceCIDRs, nil)
c.mockOFClient.EXPECT().InstallSNATBypassServiceFlows(serviceCIDRs).Return(fmt.Errorf("transient error"))
// In the 3rd round, both succeed.
finishCh := make(chan struct{})
c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(serviceCIDRs, nil)
c.mockOFClient.EXPECT().InstallSNATBypassServiceFlows(serviceCIDRs).Do(func(_ []*net.IPNet) { close(finishCh) }).Return(nil)
// Enqueue only one event as the 2nd failure is supposed to trigger a retry.
c.onServiceCIDRUpdate(serviceCIDRs)

select {
case <-finishCh:
case <-time.After(time.Second):
t.Errorf("InstallSNATBypassServiceFlows didn't succeed in time")
}
}

func checkQueueItemExistence(t *testing.T, queue workqueue.RateLimitingInterface, items ...string) {
t.Logf("queue len %d", queue.Len())
require.Eventually(t, func() bool {
Expand Down
18 changes: 17 additions & 1 deletion pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ type Client interface {
// are removed from PolicyRule.From, else from PolicyRule.To.
DeletePolicyRuleAddress(ruleID uint32, addrType types.AddressType, addresses []types.Address, priority *uint16) error

// InstallSNATBypassServiceFlows installs flows to prevent traffic destined for the specified Service CIDRs from
// being SNAT'd. Otherwise, such Pod-to-Service traffic would be forwarded to Egress Node and be load-balanced
// remotely, as opposed to locally, when AntreaProxy is asked to skip some Services or is not running at all.
// Calling the method with new CIDRs will override the flows installed for previous CIDRs.
InstallSNATBypassServiceFlows(serviceCIDRs []*net.IPNet) error

// InstallSNATMarkFlows installs flows for a local SNAT IP. On Linux, a
// single flow is added to mark the packets tunnelled from remote Nodes
// that should be SNAT'd with the SNAT IP.
Expand All @@ -144,7 +150,7 @@ type Client interface {

// InstallPodSNATFlows installs the SNAT flows for a local Pod. If the
// SNAT IP for the Pod is on the local Node, a non-zero SNAT ID should
// allocated for the SNAT IP, and the installed flow sets the SNAT IP
// be allocated for the SNAT IP, and the installed flow sets the SNAT IP
// mark on the egress packets from the ofPort; if the SNAT IP is on a
// remote Node, snatMark should be set to 0, and the installed flow
// tunnels egress packets to the remote Node using the SNAT IP as the
Expand Down Expand Up @@ -980,6 +986,16 @@ func (c *client) generatePipelines() {
}
}

func (c *client) InstallSNATBypassServiceFlows(serviceCIDRs []*net.IPNet) error {
var flows []binding.Flow
for _, serviceCIDR := range serviceCIDRs {
flows = append(flows, c.featureEgress.snatSkipCIDRFlow(*serviceCIDR))
}
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.modifyFlows(c.featureEgress.cachedFlows, "svc-cidrs", flows)
}

func (c *client) InstallSNATMarkFlows(snatIP net.IP, mark uint32) error {
flow := c.featureEgress.snatIPFromTunnelFlow(snatIP, mark)
cacheKey := fmt.Sprintf("s%x", mark)
Expand Down
81 changes: 81 additions & 0 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,87 @@ func Test_client_GetServiceFlowKeys(t *testing.T) {
assert.ElementsMatch(t, expectedFlowKeys, flowKeys)
}

func Test_client_InstallSNATBypassServiceFlows(t *testing.T) {
testCases := []struct {
name string
serviceCIDRs []*net.IPNet
newServiceCIDRs []*net.IPNet
expectedFlows []string
expectedNewFlows []string
}{
{
name: "IPv4",
serviceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("10.96.0.0/24"),
},
newServiceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("10.96.0.0/16"),
},
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/24 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
expectedNewFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/16 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
{
name: "IPv6",
serviceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("1096::/80"),
},
newServiceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("1096::/64"),
},
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/80 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
expectedNewFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/64 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
{
name: "dual-stack",
serviceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("10.96.0.0/24"),
utilip.MustParseCIDR("1096::/80"),
},
newServiceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("10.96.0.0/16"),
utilip.MustParseCIDR("1096::/64"),
},
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/24 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
"cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/80 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
expectedNewFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/16 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
"cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/64 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
m := oftest.NewMockOFEntryOperations(ctrl)

fc := newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap)
defer resetPipelines()

m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
assert.NoError(t, fc.InstallSNATBypassServiceFlows(tc.serviceCIDRs))
fCacheI, ok := fc.featureEgress.cachedFlows.Load("svc-cidrs")
require.True(t, ok)
assert.ElementsMatch(t, tc.expectedFlows, getFlowStrings(fCacheI))

m.EXPECT().BundleOps(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
assert.NoError(t, fc.InstallSNATBypassServiceFlows(tc.newServiceCIDRs))
fCacheI, ok = fc.featureEgress.cachedFlows.Load("svc-cidrs")
require.True(t, ok)
assert.ElementsMatch(t, tc.expectedNewFlows, getFlowStrings(fCacheI))
})
}
}

func Test_client_InstallSNATMarkFlows(t *testing.T) {
mark := uint32(100)

Expand Down
Loading

0 comments on commit cb64db3

Please sign in to comment.