Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Pod network after realizing initial NetworkPolicies #5777

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import (
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/util/lazy"
"antrea.io/antrea/pkg/util/podstore"
utilwait "antrea.io/antrea/pkg/util/wait"
"antrea.io/antrea/pkg/version"
)

Expand Down Expand Up @@ -226,9 +227,12 @@ func run(o *Options) error {
// Create an ifaceStore that caches network interfaces managed by this node.
ifaceStore := interfacestore.NewInterfaceStore()

// networkReadyCh is used to notify that the Node's network is ready.
// Functions that rely on the Node's network should wait for the channel to close.
networkReadyCh := make(chan struct{})
// podNetworkWait is used to wait and notify that preconditions for Pod network are ready.
// Processes that are supposed to finish before enabling Pod network should increment the wait group and decrement
// it when finished.
// Processes that enable Pod network should wait for it.
podNetworkWait := utilwait.NewGroup()

// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
Expand Down Expand Up @@ -275,7 +279,7 @@ func run(o *Options) error {
wireguardConfig,
egressConfig,
serviceConfig,
networkReadyCh,
podNetworkWait,
stopCh,
o.nodeType,
o.config.ExternalNode.ExternalNodeNamespace,
Expand Down Expand Up @@ -479,6 +483,7 @@ func run(o *Options) error {
gwPort,
tunPort,
nodeConfig,
podNetworkWait,
)
if err != nil {
return fmt.Errorf("error creating new NetworkPolicy controller: %v", err)
Expand Down Expand Up @@ -550,7 +555,7 @@ func run(o *Options) error {
enableAntreaIPAM,
o.config.DisableTXChecksumOffload,
networkConfig,
networkReadyCh)
podNetworkWait)

err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel)
if err != nil {
Expand Down
23 changes: 7 additions & 16 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/containernetworking/plugins/pkg/ip"
Expand Down Expand Up @@ -57,6 +56,7 @@ import (
"antrea.io/antrea/pkg/util/env"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/k8s"
utilwait "antrea.io/antrea/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -121,9 +121,9 @@ type Initializer struct {
enableL7NetworkPolicy bool
connectUplinkToBridge bool
enableAntreaProxy bool
// networkReadyCh should be closed once the Node's network is ready.
// podNetworkWait should be decremented once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
networkReadyCh chan<- struct{}
podNetworkWait *utilwait.Group
stopCh <-chan struct{}
nodeType config.NodeType
externalNodeNamespace string
Expand All @@ -144,7 +144,7 @@ func NewInitializer(
wireGuardConfig *config.WireGuardConfig,
egressConfig *config.EgressConfig,
serviceConfig *config.ServiceConfig,
networkReadyCh chan<- struct{},
podNetworkWait *utilwait.Group,
stopCh <-chan struct{},
nodeType config.NodeType,
externalNodeNamespace string,
Expand All @@ -168,7 +168,7 @@ func NewInitializer(
egressConfig: egressConfig,
serviceConfig: serviceConfig,
l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{},
networkReadyCh: networkReadyCh,
podNetworkWait: podNetworkWait,
stopCh: stopCh,
nodeType: nodeType,
externalNodeNamespace: externalNodeNamespace,
Expand Down Expand Up @@ -407,9 +407,6 @@ func (i *Initializer) restorePortConfigs() error {
// Initialize sets up agent initial configurations.
func (i *Initializer) Initialize() error {
klog.Info("Setting up node network")
// wg is used to wait for the asynchronous initialization.
var wg sync.WaitGroup

if err := i.initNodeLocalConfig(); err != nil {
return err
}
Expand Down Expand Up @@ -485,23 +482,17 @@ func (i *Initializer) Initialize() error {
}

if i.nodeType == config.K8sNode {
wg.Add(1)
i.podNetworkWait.Increment()
// routeClient.Initialize() should be after i.setupOVSBridge() which
// creates the host gateway interface.
if err := i.routeClient.Initialize(i.nodeConfig, wg.Done); err != nil {
if err := i.routeClient.Initialize(i.nodeConfig, i.podNetworkWait.Done); err != nil {
return err
}

// Install OpenFlow entries on OVS bridge.
if err := i.initOpenFlowPipeline(); err != nil {
return err
}

// The Node's network is ready only when both synchronous and asynchronous initialization are done.
go func() {
wg.Wait()
close(i.networkReadyCh)
}()
} else {
// Install OpenFlow entries on OVS bridge.
if err := i.initOpenFlowPipeline(); err != nil {
Expand Down
46 changes: 30 additions & 16 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/util/wait"
)

type vethPair struct {
Expand Down Expand Up @@ -413,7 +414,7 @@ func parsePrevResult(conf *types.NetworkConfig) error {
return nil
}

func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator) error {
func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait *wait.Group) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you remind me when we flush existing flows in OVS after agent restart? Is that before or after this func?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are asynchonous, likely after this func in practice:

antrea/pkg/agent/agent.go

Lines 568 to 580 in 4ba9451

// Delete stale flows from previous round. We need to wait long enough to ensure
// that all the flow which are still required have received an updated cookie (with
// the new round number), otherwise we would disrupt the dataplane. Unfortunately,
// the time required for convergence may be large and there is no simple way to
// determine when is a right time to perform the cleanup task.
// TODO: introduce a deterministic mechanism through which the different entities
// responsible for installing flows can notify the agent that this deletion
// operation can take place. A waitGroup can be created here and notified when
// full sync in agent networkpolicy controller is complete. This would signal NP
// flows have been synced once. Other mechanisms are still needed for node flows
// fullSync check.
time.Sleep(10 * time.Second)
klog.Info("Deleting stale flows from previous round if any")

We may synchronize them using another WaitGroup, but I feel keeping existing flows until new flows are installed doesn't seem working as expected because it has removed all groups and meters upon initialziation.

// desiredPods is the set of Pods that should be present, based on the
// current list of Pods got from the Kubernetes API.
desiredPods := sets.New[string]()
Expand Down Expand Up @@ -445,21 +446,34 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
missingIfConfigs = append(missingIfConfigs, containerConfig)
continue
}
// This interface matches an existing Pod.
// We rely on the interface cache / store - which is initialized from the persistent
// OVSDB - to map the Pod to its interface configuration. The interface
// configuration includes the parameters we need to replay the flows.
klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName)
if err := pc.ofClient.InstallPodFlows(
containerConfig.InterfaceName,
containerConfig.IPs,
containerConfig.MAC,
uint32(containerConfig.OFPort),
containerConfig.VLANID,
nil,
); err != nil {
klog.ErrorS(err, "Error when re-installing flows for Pod", "Pod", klog.KRef(namespace, name))
}
go func(containerID, pod, namespace string) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before there was a guarantee that the CNI server would not be started until reconciliation was complete (i.e., until this function returned). I assume that since the only thing that happens asynchronously is installation of Pod flows, a race condition should not be possible?

// Do not install Pod flows until all preconditions are met.
podNetworkWait.Wait()
// To avoid race condition with CNIServer CNI event handlers.
containerAccess.lockContainer(containerID)
defer containerAccess.unlockContainer(containerID)

containerConfig, exists := pc.ifaceStore.GetContainerInterface(containerID)
if !exists {
klog.InfoS("The container interface had been deleted, skip installing flows for Pod", "Pod", klog.KRef(namespace, name), "containerID", containerID)
return
}
Comment on lines +456 to +460
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

related to my comment above, I assume this will take care of the case where a CNI DEL comes in before we have a chance to do reconciliation for this Pod?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Pod IP or ofPort is reused, there can be an issue too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for ofport: wouldn't this require a situation where 1) the port has been released (deleted from OVSDB), yet 2) the interface configuration is still present in the ifaceStore? I don't know if that's possible. That would require a CNI DEL with a successful pc.ovsBridgeClient.DeletePort and a failed pc.ifaceStore.DeleteInterface. I don't think it's possible for pc.ifaceStore.DeleteInterface to fail.

for the IP address: I think it's theoretically possible. The host-ipam release happens before OVS port deletion and pc.ifaceStore.DeleteInterface. So we could have this:

  1. A CNI DEL comes in for the Pod
  2. IPAM deletion runs successfully, IP is released
  3. OVS port deletion fails, interface is not deleted from ifaceStore
  4. this logic runs, we call InstallPodFlows for a Pod that has been deleted already
  5. A CNI Add comes in for a new Pod, the same IP address is allocated (unlikely with host-ipam, with IPs being assigned sequentially, but maybe this is the only IP available). Can the flows installed in 4) disrupt networking for the new Pod.
  6. A new CNI DEL comes in for the old Pod (as per the CNI spec, given that the previous one did not succeed).

I guess this situation does not arise when reconciliation runs to completion before the CNI server starts. What do you think @tnqn ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

related to my comment above, I assume this will take care of the case where a CNI DEL comes in before we have a chance to do reconciliation for this Pod?

Yes, the lock and the check are to prevent race with CNI Del handler.

If Pod IP or ofPort is reused, there can be an issue too.

ofport reuse is not a problem as @antoninbas explained.

The situation @antoninbas described for Pod IP reuse is possible, however, not specific to this case if that could happen. Even when processing CNI events, a similar situation can happen:

  1. A CNI DEL comes in for the Pod
  2. IPAM deletion succeeds, released the IP
  3. OVS flow deletion fails
  4. A CNI ADD comes in for a new Pod, the same IP is reused. There is no problem at this point as it should override the IP's L3 flow.
  5. A new CNI DEL comes in for the old Pod, removing some flows installed by 4. This will cause a problem because it could remove the IP's L3 flow.

To avoid it, perhaps we could use a Pod-specific cookie ID to prevent removing Pod flows of Pod A from affecting Pod B even they share same IP.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this situation can already occur before that change, then there is no need to address it in this PR.

To avoid it, perhaps we could use a Pod-specific cookie ID to prevent removing Pod flows of Pod A from affecting Pod B even they share same IP.

That sounds fine.

BTW, would it help to just release the IP address last in the delete sequence, or do we expose ourselves to other problems in this case (e.g., easier to run out of IPs when we have a lot of Pods on a Node, > 200, and a lot of churn)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Agree we need not to resolve all existing issues in this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, would it help to just release the IP address last in the delete sequence, or do we expose ourselves to other problems in this case (e.g., easier to run out of IPs when we have a lot of Pods on a Node, > 200, and a lot of churn)?

Great suggestion. I think it's better because ovs flows references to Pod IPs, deleting references before deleting the referenced resource makes more sense and avoids the above issue.

Even if removing OVS resource succeeds and releasing Pod IP fails, the IP and the ofport has been decoupled, there is no problem to reuse the ofport with another Pod IP, and eventually container runtime should keep calling CNI to release the IP, or the startup cleanup will do it when container runtime behaves wrongly.

I could create another PR for this improvement.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#5788 implements the suggestion.

// This interface matches an existing Pod.
// We rely on the interface cache / store - which is initialized from the persistent
// OVSDB - to map the Pod to its interface configuration. The interface
// configuration includes the parameters we need to replay the flows.
klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName)
if err := pc.ofClient.InstallPodFlows(
containerConfig.InterfaceName,
containerConfig.IPs,
containerConfig.MAC,
uint32(containerConfig.OFPort),
containerConfig.VLANID,
nil,
); err != nil {
klog.ErrorS(err, "Error when re-installing flows for Pod", "Pod", klog.KRef(namespace, name))
}
}(containerConfig.ContainerID, name, namespace)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think having several goroutines instead of a single goroutine taking care of the Pods can help reduce initialization time if we have 100+ Pods on the Node? Or would that be insignificant? I am just wondering if this the reason why you went this route.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just for simple given goroutine is lightweight, otherwise it would need to creata another slice and iterate it.

} else {
// clean-up and delete interface
klog.V(4).InfoS("Deleting interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName)
Expand Down
17 changes: 8 additions & 9 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"antrea.io/antrea/pkg/cni"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -115,8 +116,8 @@ type CNIServer struct {
enableSecondaryNetworkIPAM bool
disableTXChecksumOffload bool
networkConfig *config.NetworkConfig
// networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it.
networkReadyCh <-chan struct{}
// podNetworkWait notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it.
podNetworkWait *wait.Group
}

var supportedCNIVersionSet map[string]bool
Expand Down Expand Up @@ -434,11 +435,9 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
return resp, err
}

select {
case <-time.After(networkReadyTimeout):
klog.ErrorS(nil, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout)
if err := s.podNetworkWait.WaitWithTimeout(networkReadyTimeout); err != nil {
klog.ErrorS(err, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout)
return s.tryAgainLaterResponse(), nil
case <-s.networkReadyCh:
}

result := &ipam.IPAMResult{Result: current.Result{CNIVersion: current.ImplementedSpecVersion}}
Expand Down Expand Up @@ -610,7 +609,7 @@ func New(
routeClient route.Interface,
isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool,
networkConfig *config.NetworkConfig,
networkReadyCh <-chan struct{},
podNetworkWait *wait.Group,
) *CNIServer {
return &CNIServer{
cniSocket: cniSocket,
Expand All @@ -625,7 +624,7 @@ func New(
disableTXChecksumOffload: disableTXChecksumOffload,
enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM,
networkConfig: networkConfig,
networkReadyCh: networkReadyCh,
podNetworkWait: podNetworkWait,
}
}

Expand Down Expand Up @@ -739,7 +738,7 @@ func (s *CNIServer) reconcile() error {
return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err)
}

return s.podConfigurator.reconcile(pods.Items, s.containerAccess)
return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait)
}

func init() {
Expand Down
5 changes: 2 additions & 3 deletions pkg/agent/cniserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"antrea.io/antrea/pkg/cni"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
"antrea.io/antrea/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -663,15 +664,13 @@ func translateRawPrevResult(prevResult *current.Result, cniVersion string) (map[
}

func newCNIServer(t *testing.T) *CNIServer {
networkReadyCh := make(chan struct{})
cniServer := &CNIServer{
cniSocket: testSocket,
nodeConfig: testNodeConfig,
serverVersion: cni.AntreaCNIVersion,
containerAccess: newContainerAccessArbitrator(),
networkReadyCh: networkReadyCh,
podNetworkWait: wait.NewGroup(),
}
close(networkReadyCh)
cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450}
return cniServer
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/agent/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/querier"
"antrea.io/antrea/pkg/util/channel"
utilwait "antrea.io/antrea/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -143,10 +144,11 @@ type Controller struct {
fullSyncGroup sync.WaitGroup
ifaceStore interfacestore.InterfaceStore
// denyConnStore is for storing deny connections for flow exporter.
denyConnStore *connections.DenyConnectionStore
gwPort uint32
tunPort uint32
nodeConfig *config.NodeConfig
denyConnStore *connections.DenyConnectionStore
gwPort uint32
tunPort uint32
nodeConfig *config.NodeConfig
podNetworkWait *utilwait.Group

// The fileStores store runtime.Objects in files and use them as the fallback data source when agent can't connect
// to antrea-controller on startup.
Expand Down Expand Up @@ -181,7 +183,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
v4Enabled bool,
v6Enabled bool,
gwPort, tunPort uint32,
nodeConfig *config.NodeConfig) (*Controller, error) {
nodeConfig *config.NodeConfig,
podNetworkWait *utilwait.Group) (*Controller, error) {
idAllocator := newIDAllocator(asyncRuleDeleteInterval, dnsInterceptRuleID)
c := &Controller{
antreaClientProvider: antreaClientGetter,
Expand All @@ -196,6 +199,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
gwPort: gwPort,
tunPort: tunPort,
nodeConfig: nodeConfig,
podNetworkWait: podNetworkWait.Increment(),
}

if l7NetworkPolicyEnabled {
Expand Down Expand Up @@ -610,6 +614,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
klog.Infof("All watchers have completed full sync, installing flows for init events")
// Batch install all rules in queue after fullSync is finished.
c.processAllItemsInQueue()
c.podNetworkWait.Done()

klog.Infof("Starting NetworkPolicy workers now")
defer c.queue.ShutDown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"antrea.io/antrea/pkg/client/clientset/versioned/fake"
"antrea.io/antrea/pkg/querier"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/wait"
)

const testNamespace = "ns1"
Expand Down Expand Up @@ -76,7 +77,30 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
groupIDAllocator := openflow.NewGroupAllocator()
groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)}
fs := afero.NewMemMapFs()
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, fs, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, nil, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{})
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset},
nil,
nil,
fs,
"node1",
podUpdateChannel,
nil,
groupCounters,
ch2,
true,
true,
true,
true,
false,
nil,
testAsyncDeleteInterval,
"8.8.8.8:53",
config.K8sNode,
true,
false,
config.HostGatewayOFPort,
config.DefaultTunOFPort,
&config.NodeConfig{},
wait.NewGroup())
reconciler := newMockReconciler()
controller.reconciler = reconciler
controller.auditLogger = nil
Expand Down
Loading
Loading