Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #5017: Avoid ServiceCIDR flapping on agent start #5495: Do not apply Egress to traffic destined for ServiceCIDRs #5530

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ func run(o *Options) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Must start after registering all event handlers.
go serviceCIDRProvider.Run(stopCh)

// Get all available NodePort addresses.
var nodePortAddressesIPv4, nodePortAddressesIPv6 []net.IP
if o.config.AntreaProxy.ProxyAll {
Expand Down Expand Up @@ -488,7 +491,7 @@ func run(o *Options) error {
if o.enableEgress {
egressController, err = egress.NewEgressController(
ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName,
memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, o.config.Egress.MaxEgressIPsPerNode,
memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, serviceCIDRProvider, o.config.Egress.MaxEgressIPsPerNode,
)
if err != nil {
return fmt.Errorf("error creating new Egress controller: %v", err)
Expand Down
52 changes: 52 additions & 0 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"antrea.io/antrea/pkg/agent/memberlist"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/servicecidr"
"antrea.io/antrea/pkg/agent/types"
cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
Expand Down Expand Up @@ -147,6 +148,11 @@ type EgressController struct {
ipAssigner ipassigner.IPAssigner

egressIPScheduler *egressIPScheduler

serviceCIDRInterface servicecidr.Interface
serviceCIDRUpdateCh chan struct{}
// Declared for testing.
serviceCIDRUpdateRetryDelay time.Duration
}

func NewEgressController(
Expand All @@ -161,6 +167,7 @@ func NewEgressController(
egressInformer crdinformers.EgressInformer,
nodeInformers coreinformers.NodeInformer,
podUpdateSubscriber channel.Subscriber,
serviceCIDRInterface servicecidr.Interface,
maxEgressIPsPerNode int,
) (*EgressController, error) {
c := &EgressController{
Expand All @@ -181,6 +188,10 @@ func NewEgressController(
localIPDetector: ipassigner.NewLocalIPDetector(),
idAllocator: newIDAllocator(minEgressMark, maxEgressMark),
cluster: cluster,
serviceCIDRInterface: serviceCIDRInterface,
// A buffer size of 1 is enough: the channel only records that an update is pending, so that the target handler is executed once.
serviceCIDRUpdateCh: make(chan struct{}, 1),
serviceCIDRUpdateRetryDelay: 10 * time.Second,
}
ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice)
if err != nil {
Expand Down Expand Up @@ -214,6 +225,7 @@ func NewEgressController(
podUpdateSubscriber.Subscribe(c.processPodUpdate)
c.localIPDetector.AddEventHandler(c.onLocalIPUpdate)
c.egressIPScheduler.AddEventHandler(c.onEgressIPSchedule)
c.serviceCIDRInterface.AddEventHandler(c.onServiceCIDRUpdate)
return c, nil
}

Expand All @@ -222,6 +234,44 @@ func (c *EgressController) onEgressIPSchedule(egress string) {
c.queue.Add(egress)
}

// onServiceCIDRUpdate is the handler invoked whenever the set of ServiceCIDRs changes.
// It only records that an update is pending so that updateServiceCIDRs runs once after
// this call; the CIDRs passed in are ignored because updateServiceCIDRs fetches the
// current ones itself.
func (c *EgressController) onServiceCIDRUpdate(_ []*net.IPNet) {
	notification := struct{}{}
	select {
	case c.serviceCIDRUpdateCh <- notification:
		// An update is now pending.
	default:
		// An earlier notification has not been consumed yet; it will cover this change
		// too, so drop the new one instead of blocking the caller.
	}
}

// updateServiceCIDRs runs until stopCh is closed. Every time it is woken up, either by a
// notification on serviceCIDRUpdateCh or by the expiry of the retry timer, it fetches the
// current Service CIDRs from serviceCIDRInterface and installs the SNAT bypass flows for them.
// If fetching the CIDRs fails it simply waits for the next notification; if installing the
// flows fails it schedules a retry after serviceCIDRUpdateRetryDelay.
func (c *EgressController) updateServiceCIDRs(stopCh <-chan struct{}) {
	// Create the timer already expired and consume its tick, so that it stays idle until a
	// retry is explicitly scheduled with Reset.
	timer := time.NewTimer(0)
	defer timer.Stop()
	<-timer.C // Consume the first tick.
	for {
		select {
		case <-stopCh:
			return
		case <-c.serviceCIDRUpdateCh:
			klog.V(2).InfoS("Received service CIDR update")
		case <-timer.C:
			klog.V(2).InfoS("Service CIDR update timer expired")
		}
		serviceCIDRs, err := c.serviceCIDRInterface.GetServiceCIDRs()
		if err != nil {
			klog.ErrorS(err, "Failed to get Service CIDRs")
			// No need to retry in this case as the Service CIDRs won't be available until it receives a service CIDRs update.
			continue
		}
		err = c.ofClient.InstallSNATBypassServiceFlows(serviceCIDRs)
		if err != nil {
			klog.ErrorS(err, "Failed to install SNAT bypass flows for Service CIDRs, will retry", "serviceCIDRs", serviceCIDRs)
			// Schedule a retry as it should be transient error.
			// The timer may have fired while this iteration was handling an update event, which
			// would leave a stale tick in timer.C. Stop the timer and drain the channel before
			// Reset so that the retry really happens after the configured delay rather than
			// immediately. The non-blocking receive covers the case where the tick was already
			// consumed by the select above.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(c.serviceCIDRUpdateRetryDelay)
		}
	}
}

// processPodUpdate will be called when CNIServer publishes a Pod update event.
// It triggers reconciling the effective Egress of the Pod.
func (c *EgressController) processPodUpdate(e interface{}) {
Expand Down Expand Up @@ -314,6 +364,8 @@ func (c *EgressController) Run(stopCh <-chan struct{}) {

go wait.NonSlidingUntil(c.watchEgressGroup, 5*time.Second, stopCh)

go c.updateServiceCIDRs(stopCh)

for i := 0; i < defaultWorkers; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}
Expand Down
87 changes: 69 additions & 18 deletions pkg/agent/controller/egress/egress_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"antrea.io/antrea/pkg/agent/memberlist"
openflowtest "antrea.io/antrea/pkg/agent/openflow/testing"
routetest "antrea.io/antrea/pkg/agent/route/testing"
servicecidrtest "antrea.io/antrea/pkg/agent/servicecidr/testing"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
Expand All @@ -49,6 +50,7 @@ import (
fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/k8s"
)

Expand Down Expand Up @@ -128,14 +130,15 @@ func mockNewIPAssigner(ipAssigner ipassigner.IPAssigner) func() {

type fakeController struct {
*EgressController
mockController *gomock.Controller
mockOFClient *openflowtest.MockClient
mockRouteClient *routetest.MockInterface
crdClient *fakeversioned.Clientset
crdInformerFactory crdinformers.SharedInformerFactory
informerFactory informers.SharedInformerFactory
mockIPAssigner *ipassignertest.MockIPAssigner
podUpdateChannel *channel.SubscribableChannel
mockController *gomock.Controller
mockOFClient *openflowtest.MockClient
mockRouteClient *routetest.MockInterface
crdClient *fakeversioned.Clientset
crdInformerFactory crdinformers.SharedInformerFactory
informerFactory informers.SharedInformerFactory
mockIPAssigner *ipassignertest.MockIPAssigner
mockServiceCIDRInterface *servicecidrtest.MockInterface
podUpdateChannel *channel.SubscribableChannel
}

func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeController {
Expand Down Expand Up @@ -163,7 +166,8 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
addPodInterface(ifaceStore, "ns4", "pod4", 4)

podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100)

mockServiceCIDRProvider := servicecidrtest.NewMockInterface(controller)
mockServiceCIDRProvider.EXPECT().AddEventHandler(gomock.Any())
egressController, _ := NewEgressController(mockOFClient,
&antreaClientGetter{clientset},
crdClient,
Expand All @@ -175,19 +179,21 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
egressInformer,
nodeInformer,
podUpdateChannel,
mockServiceCIDRProvider,
255,
)
egressController.localIPDetector = localIPDetector
return &fakeController{
EgressController: egressController,
mockController: controller,
mockOFClient: mockOFClient,
mockRouteClient: mockRouteClient,
crdClient: crdClient,
crdInformerFactory: crdInformerFactory,
informerFactory: informerFactory,
mockIPAssigner: mockIPAssigner,
podUpdateChannel: podUpdateChannel,
EgressController: egressController,
mockController: controller,
mockOFClient: mockOFClient,
mockRouteClient: mockRouteClient,
crdClient: crdClient,
crdInformerFactory: crdInformerFactory,
informerFactory: informerFactory,
mockIPAssigner: mockIPAssigner,
mockServiceCIDRInterface: mockServiceCIDRProvider,
podUpdateChannel: podUpdateChannel,
}
}

Expand Down Expand Up @@ -1093,6 +1099,51 @@ func TestGetEgressIPByMark(t *testing.T) {
}
}

// TestUpdateServiceCIDRs exercises the ServiceCIDR update path of the Egress controller:
//   - onServiceCIDRUpdate must never block and must keep at most one pending event;
//   - updateServiceCIDRs must not retry when GetServiceCIDRs fails;
//   - updateServiceCIDRs must retry on its own when InstallSNATBypassServiceFlows fails.
func TestUpdateServiceCIDRs(t *testing.T) {
c := newFakeController(t, nil)
stopCh := make(chan struct{})
// Closing stopCh on exit terminates the updateServiceCIDRs goroutine started below.
defer close(stopCh)
// Retry immediately.
c.serviceCIDRUpdateRetryDelay = 0

serviceCIDRs := []*net.IPNet{
ip.MustParseCIDR("10.96.0.0/16"),
ip.MustParseCIDR("1096::/64"),
}
assert.Len(t, c.serviceCIDRUpdateCh, 0)
// Call the handler the 1st time, it should enqueue an event.
c.onServiceCIDRUpdate(serviceCIDRs)
assert.Len(t, c.serviceCIDRUpdateCh, 1)
// Call the handler the 2nd time, it should not block and should discard the event.
c.onServiceCIDRUpdate(serviceCIDRs)
assert.Len(t, c.serviceCIDRUpdateCh, 1)

// In the 1st round, returning the ServiceCIDRs fails, it should not retry.
// No InstallSNATBypassServiceFlows expectation is registered for this round.
c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(nil, fmt.Errorf("not initialized"))

go c.updateServiceCIDRs(stopCh)

// Wait for the event to be processed.
require.Eventually(t, func() bool {
return len(c.serviceCIDRUpdateCh) == 0
}, time.Second, 100*time.Millisecond)
// In the 2nd round, returning the ServiceCIDR succeeds but installing flows fails, it should retry.
c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(serviceCIDRs, nil)
c.mockOFClient.EXPECT().InstallSNATBypassServiceFlows(serviceCIDRs).Return(fmt.Errorf("transient error"))
// In the 3rd round, both succeed.
// finishCh is closed from inside the mocked call to signal that the 3rd round was reached.
finishCh := make(chan struct{})
c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(serviceCIDRs, nil)
c.mockOFClient.EXPECT().InstallSNATBypassServiceFlows(serviceCIDRs).Do(func(_ []*net.IPNet) { close(finishCh) }).Return(nil)
// Enqueue only one event as the 2nd failure is supposed to trigger a retry.
c.onServiceCIDRUpdate(serviceCIDRs)

// The 3rd round can only be reached through the retry, so a timeout here means no retry happened.
select {
case <-finishCh:
case <-time.After(time.Second):
t.Errorf("InstallSNATBypassServiceFlows didn't succeed in time")
}
}

func checkQueueItemExistence(t *testing.T, queue workqueue.RateLimitingInterface, items ...string) {
t.Logf("queue len %d", queue.Len())
require.Eventually(t, func() bool {
Expand Down
18 changes: 17 additions & 1 deletion pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ type Client interface {
// are removed from PolicyRule.From, else from PolicyRule.To.
DeletePolicyRuleAddress(ruleID uint32, addrType types.AddressType, addresses []types.Address, priority *uint16) error

// InstallSNATBypassServiceFlows installs flows to prevent traffic destined for the specified Service CIDRs from
// being SNAT'd. Otherwise, such Pod-to-Service traffic would be forwarded to Egress Node and be load-balanced
// remotely, as opposed to locally, when AntreaProxy is asked to skip some Services or is not running at all.
// Calling the method with new CIDRs will override the flows installed for previous CIDRs.
InstallSNATBypassServiceFlows(serviceCIDRs []*net.IPNet) error

// InstallSNATMarkFlows installs flows for a local SNAT IP. On Linux, a
// single flow is added to mark the packets tunnelled from remote Nodes
// that should be SNAT'd with the SNAT IP.
Expand All @@ -144,7 +150,7 @@ type Client interface {

// InstallPodSNATFlows installs the SNAT flows for a local Pod. If the
// SNAT IP for the Pod is on the local Node, a non-zero SNAT ID should
// allocated for the SNAT IP, and the installed flow sets the SNAT IP
// be allocated for the SNAT IP, and the installed flow sets the SNAT IP
// mark on the egress packets from the ofPort; if the SNAT IP is on a
// remote Node, snatMark should be set to 0, and the installed flow
// tunnels egress packets to the remote Node using the SNAT IP as the
Expand Down Expand Up @@ -980,6 +986,16 @@ func (c *client) generatePipelines() {
}
}

// InstallSNATBypassServiceFlows builds one SNAT-skip flow per given Service CIDR and stores
// the resulting set under the "svc-cidrs" key of the Egress feature's flow cache via
// modifyFlows. The flows are built before replayMutex is read-locked; the lock is held
// only for the modifyFlows call.
func (c *client) InstallSNATBypassServiceFlows(serviceCIDRs []*net.IPNet) error {
	var bypassFlows []binding.Flow
	for i := range serviceCIDRs {
		cidr := serviceCIDRs[i]
		bypassFlows = append(bypassFlows, c.featureEgress.snatSkipCIDRFlow(*cidr))
	}

	c.replayMutex.RLock()
	defer c.replayMutex.RUnlock()

	const cacheKey = "svc-cidrs"
	return c.modifyFlows(c.featureEgress.cachedFlows, cacheKey, bypassFlows)
}

func (c *client) InstallSNATMarkFlows(snatIP net.IP, mark uint32) error {
flow := c.featureEgress.snatIPFromTunnelFlow(snatIP, mark)
cacheKey := fmt.Sprintf("s%x", mark)
Expand Down
81 changes: 81 additions & 0 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,87 @@ func Test_client_GetServiceFlowKeys(t *testing.T) {
assert.ElementsMatch(t, expectedFlowKeys, flowKeys)
}

// Test_client_InstallSNATBypassServiceFlows checks, for IPv4, IPv6 and dual-stack inputs, that
// InstallSNATBypassServiceFlows caches the expected flows under the "svc-cidrs" key, and that
// a second call with different CIDRs replaces the cached flows instead of adding to them.
func Test_client_InstallSNATBypassServiceFlows(t *testing.T) {
testCases := []struct {
name string
// serviceCIDRs is passed to the first call, newServiceCIDRs to the second one.
serviceCIDRs []*net.IPNet
newServiceCIDRs []*net.IPNet
// expectedFlows / expectedNewFlows are the cached flow strings expected after the
// first / second call respectively.
expectedFlows []string
expectedNewFlows []string
}{
{
name: "IPv4",
serviceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("10.96.0.0/24"),
},
newServiceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("10.96.0.0/16"),
},
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/24 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
expectedNewFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/16 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
{
name: "IPv6",
serviceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("1096::/80"),
},
newServiceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("1096::/64"),
},
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/80 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
expectedNewFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/64 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
{
name: "dual-stack",
serviceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("10.96.0.0/24"),
utilip.MustParseCIDR("1096::/80"),
},
newServiceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("10.96.0.0/16"),
utilip.MustParseCIDR("1096::/64"),
},
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/24 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
"cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/80 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
expectedNewFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/16 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
"cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/64 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
m := oftest.NewMockOFEntryOperations(ctrl)

// NOTE(review): the two boolean arguments presumably enable the features needed by
// this test (e.g. Egress) - confirm against newFakeClient.
fc := newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap)
defer resetPipelines()

// First call: nothing is cached under "svc-cidrs" yet, a single AddAll is expected.
m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
assert.NoError(t, fc.InstallSNATBypassServiceFlows(tc.serviceCIDRs))
fCacheI, ok := fc.featureEgress.cachedFlows.Load("svc-cidrs")
require.True(t, ok)
assert.ElementsMatch(t, tc.expectedFlows, getFlowStrings(fCacheI))

// Second call: the flows already exist, a single BundleOps is expected and the cache
// must now hold only the flows for the new CIDRs.
m.EXPECT().BundleOps(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
assert.NoError(t, fc.InstallSNATBypassServiceFlows(tc.newServiceCIDRs))
fCacheI, ok = fc.featureEgress.cachedFlows.Load("svc-cidrs")
require.True(t, ok)
assert.ElementsMatch(t, tc.expectedNewFlows, getFlowStrings(fCacheI))
})
}
}

func Test_client_InstallSNATMarkFlows(t *testing.T) {
mark := uint32(100)

Expand Down
Loading