Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #5739: Store NetworkPolicy in filesystem as fallback data source #5777: Enable Pod network after realizing initial NetworkPolicies #5795: Support Local ExternalTrafficPolicy for Services with ExternalIPs #5798: Fix unit test TestReconcile #5833: Enable IPv4/IPv6 forwarding on demand automatically #5860

17 changes: 12 additions & 5 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net"
"time"

"github.com/spf13/afero"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -78,6 +79,7 @@ import (
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/util/podstore"
utilwait "antrea.io/antrea/pkg/util/wait"
"antrea.io/antrea/pkg/version"
)

Expand Down Expand Up @@ -219,9 +221,12 @@ func run(o *Options) error {
// Create an ifaceStore that caches network interfaces managed by this node.
ifaceStore := interfacestore.NewInterfaceStore()

// networkReadyCh is used to notify that the Node's network is ready.
// Functions that rely on the Node's network should wait for the channel to close.
networkReadyCh := make(chan struct{})
// podNetworkWait is used to wait and notify that preconditions for Pod network are ready.
// Processes that are supposed to finish before enabling Pod network should increment the wait group and decrement
// it when finished.
// Processes that enable Pod network should wait for it.
podNetworkWait := utilwait.NewGroup()

// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
Expand Down Expand Up @@ -266,7 +271,7 @@ func run(o *Options) error {
wireguardConfig,
egressConfig,
serviceConfig,
networkReadyCh,
podNetworkWait,
stopCh,
o.nodeType,
o.config.ExternalNode.ExternalNodeNamespace,
Expand Down Expand Up @@ -446,6 +451,7 @@ func run(o *Options) error {
antreaClientProvider,
ofClient,
ifaceStore,
afero.NewOsFs(),
nodeKey,
podUpdateChannel,
externalEntityUpdateChannel,
Expand All @@ -465,6 +471,7 @@ func run(o *Options) error {
gwPort,
tunPort,
nodeConfig,
podNetworkWait,
)
if err != nil {
return fmt.Errorf("error creating new NetworkPolicy controller: %v", err)
Expand Down Expand Up @@ -536,7 +543,7 @@ func run(o *Options) error {
enableAntreaIPAM,
o.config.DisableTXChecksumOffload,
networkConfig,
networkReadyCh)
podNetworkWait)

if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
cniPodInfoStore = cnipodcache.NewCNIPodInfoStore()
Expand Down
23 changes: 7 additions & 16 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/containernetworking/plugins/pkg/ip"
Expand Down Expand Up @@ -58,6 +57,7 @@ import (
"antrea.io/antrea/pkg/util/env"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/k8s"
utilwait "antrea.io/antrea/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -119,9 +119,9 @@ type Initializer struct {
l7NetworkPolicyConfig *config.L7NetworkPolicyConfig
enableL7NetworkPolicy bool
connectUplinkToBridge bool
// networkReadyCh should be closed once the Node's network is ready.
// podNetworkWait should be decremented once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
networkReadyCh chan<- struct{}
podNetworkWait *utilwait.Group
stopCh <-chan struct{}
nodeType config.NodeType
externalNodeNamespace string
Expand All @@ -142,7 +142,7 @@ func NewInitializer(
wireGuardConfig *config.WireGuardConfig,
egressConfig *config.EgressConfig,
serviceConfig *config.ServiceConfig,
networkReadyCh chan<- struct{},
podNetworkWait *utilwait.Group,
stopCh <-chan struct{},
nodeType config.NodeType,
externalNodeNamespace string,
Expand All @@ -165,7 +165,7 @@ func NewInitializer(
egressConfig: egressConfig,
serviceConfig: serviceConfig,
l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{},
networkReadyCh: networkReadyCh,
podNetworkWait: podNetworkWait,
stopCh: stopCh,
nodeType: nodeType,
externalNodeNamespace: externalNodeNamespace,
Expand Down Expand Up @@ -403,9 +403,6 @@ func (i *Initializer) restorePortConfigs() error {
// Initialize sets up agent initial configurations.
func (i *Initializer) Initialize() error {
klog.Info("Setting up node network")
// wg is used to wait for the asynchronous initialization.
var wg sync.WaitGroup

if err := i.initNodeLocalConfig(); err != nil {
return err
}
Expand Down Expand Up @@ -481,23 +478,17 @@ func (i *Initializer) Initialize() error {
}

if i.nodeType == config.K8sNode {
wg.Add(1)
i.podNetworkWait.Increment()
// routeClient.Initialize() should be after i.setupOVSBridge() which
// creates the host gateway interface.
if err := i.routeClient.Initialize(i.nodeConfig, wg.Done); err != nil {
if err := i.routeClient.Initialize(i.nodeConfig, i.podNetworkWait.Done); err != nil {
return err
}

// Install OpenFlow entries on OVS bridge.
if err := i.initOpenFlowPipeline(); err != nil {
return err
}

// The Node's network is ready only when both synchronous and asynchronous initialization are done.
go func() {
wg.Wait()
close(i.networkReadyCh)
}()
} else {
// Install OpenFlow entries on OVS bridge.
if err := i.initOpenFlowPipeline(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/cniserver/interface_configuration_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (ns *fakeNS) clear() {
}

func createNS(t *testing.T, waitForComplete bool) *fakeNS {
nsPath := generateUUID(t)
nsPath := generateUUID()
fakeNs := &fakeNS{path: nsPath, fd: uintptr(unsafe.Pointer(&nsPath)), waitCompleted: waitForComplete, stopCh: make(chan struct{})}
validNSs.Store(nsPath, fakeNs)
return fakeNs
Expand Down
46 changes: 30 additions & 16 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/util/wait"
)

type vethPair struct {
Expand Down Expand Up @@ -416,7 +417,7 @@ func parsePrevResult(conf *types.NetworkConfig) error {
return nil
}

func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator) error {
func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait *wait.Group) error {
// desiredPods is the set of Pods that should be present, based on the
// current list of Pods got from the Kubernetes API.
desiredPods := sets.New[string]()
Expand All @@ -441,21 +442,34 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
missingIfConfigs = append(missingIfConfigs, containerConfig)
continue
}
// This interface matches an existing Pod.
// We rely on the interface cache / store - which is initialized from the persistent
// OVSDB - to map the Pod to its interface configuration. The interface
// configuration includes the parameters we need to replay the flows.
klog.V(4).Infof("Syncing interface %s for Pod %s", containerConfig.InterfaceName, namespacedName)
if err := pc.ofClient.InstallPodFlows(
containerConfig.InterfaceName,
containerConfig.IPs,
containerConfig.MAC,
uint32(containerConfig.OFPort),
containerConfig.VLANID,
nil,
); err != nil {
klog.Errorf("Error when re-installing flows for Pod %s", namespacedName)
}
go func(containerID, pod, namespace string) {
// Do not install Pod flows until all preconditions are met.
podNetworkWait.Wait()
// To avoid race condition with CNIServer CNI event handlers.
containerAccess.lockContainer(containerID)
defer containerAccess.unlockContainer(containerID)

containerConfig, exists := pc.ifaceStore.GetContainerInterface(containerID)
if !exists {
klog.InfoS("The container interface had been deleted, skip installing flows for Pod", "Pod", klog.KRef(namespace, pod), "containerID", containerID)
return
}
// This interface matches an existing Pod.
// We rely on the interface cache / store - which is initialized from the persistent
// OVSDB - to map the Pod to its interface configuration. The interface
// configuration includes the parameters we need to replay the flows.
klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, pod), "iface", containerConfig.InterfaceName)
if err := pc.ofClient.InstallPodFlows(
containerConfig.InterfaceName,
containerConfig.IPs,
containerConfig.MAC,
uint32(containerConfig.OFPort),
containerConfig.VLANID,
nil,
); err != nil {
klog.ErrorS(err, "Error when re-installing flows for Pod", "Pod", klog.KRef(namespace, pod))
}
}(containerConfig.ContainerID, containerConfig.PodName, containerConfig.PodNamespace)
} else {
// clean-up and delete interface
klog.V(4).Infof("Deleting interface %s", containerConfig.InterfaceName)
Expand Down
20 changes: 10 additions & 10 deletions pkg/agent/cniserver/pod_configuration_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestConnectInterceptedInterface(t *testing.T) {
testPodName := "test-pod"
podNamespace := testPodNamespace
hostInterfaceName := util.GenerateContainerInterfaceName(testPodName, testPodNamespace, testPodInfraContainerID)
containerID := generateUUID(t)
containerID := generateUUID()
containerNetNS := "container-ns"
containerDev := "eth0"

Expand Down Expand Up @@ -210,7 +210,7 @@ func TestConnectInterceptedInterface(t *testing.T) {
if tc.migratedRoute {
mockRoute.EXPECT().MigrateRoutesToGw(hostInterfaceName).Return(tc.migrateRouteErr)
}
ovsPortID := generateUUID(t)
ovsPortID := generateUUID()
if tc.connectedOVS {
mockOVSBridgeClient.EXPECT().CreatePort(hostInterfaceName, gomock.Any(), gomock.Any()).Return(ovsPortID, tc.createOVSPortErr).Times(1)
if tc.createOVSPortErr == nil {
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestConnectInterceptedInterface(t *testing.T) {

func TestCreateOVSPort(t *testing.T) {
controller := gomock.NewController(t)
containerID := generateUUID(t)
containerID := generateUUID()
podName := "p0"
podNamespace := testPodNamespace

Expand Down Expand Up @@ -271,10 +271,10 @@ func TestCreateOVSPort(t *testing.T) {
containerConfig := buildContainerConfig(tc.portName, containerID, podName, podNamespace, &current.Interface{Mac: "01:02:03:04:05:06"}, ipamResult.IPs, tc.vlanID)
attachInfo := BuildOVSPortExternalIDs(containerConfig)
if tc.createOVSPort {
mockOVSBridgeClient.EXPECT().CreatePort(tc.portName, tc.portName, attachInfo).Times(1).Return(generateUUID(t), nil)
mockOVSBridgeClient.EXPECT().CreatePort(tc.portName, tc.portName, attachInfo).Times(1).Return(generateUUID(), nil)
}
if tc.createOVSAccessPort {
mockOVSBridgeClient.EXPECT().CreateAccessPort(tc.portName, tc.portName, attachInfo, tc.vlanID).Times(1).Return(generateUUID(t), nil)
mockOVSBridgeClient.EXPECT().CreateAccessPort(tc.portName, tc.portName, attachInfo, tc.vlanID).Times(1).Return(generateUUID(), nil)
}
_, err := podConfigurator.createOVSPort(tc.portName, attachInfo, tc.vlanID)
assert.NoError(t, err)
Expand All @@ -283,8 +283,8 @@ func TestCreateOVSPort(t *testing.T) {
}

func TestParseOVSPortInterfaceConfig(t *testing.T) {
containerID := generateUUID(t)
portUUID := generateUUID(t)
containerID := generateUUID()
portUUID := generateUUID()
ofPort := int32(1)
containerIPs := "1.1.1.2,aabb:1122::101:102"
parsedIPs := []net.IP{net.ParseIP("1.1.1.2"), net.ParseIP("aabb:1122::101:102")}
Expand Down Expand Up @@ -398,14 +398,14 @@ func TestParseOVSPortInterfaceConfig(t *testing.T) {
func TestCheckHostInterface(t *testing.T) {
controller := gomock.NewController(t)
hostIfaceName := "port1"
containerID := generateUUID(t)
containerID := generateUUID()
containerIntf := &current.Interface{Name: ifname, Sandbox: netns, Mac: "01:02:03:04:05:06"}
interfaces := []*current.Interface{containerIntf, {Name: hostIfaceName}}
containeIPs := ipamResult.IPs
ifaceMAC, _ := net.ParseMAC("01:02:03:04:05:06")
containerInterface := interfacestore.NewContainerInterface(hostIfaceName, containerID, "pod1", testPodNamespace, ifaceMAC, []net.IP{containerIP}, 1)
containerInterface.OVSPortConfig = &interfacestore.OVSPortConfig{
PortUUID: generateUUID(t),
PortUUID: generateUUID(),
OFPort: int32(10),
}

Expand Down Expand Up @@ -454,7 +454,7 @@ func TestCheckHostInterface(t *testing.T) {

func TestConfigureSriovSecondaryInterface(t *testing.T) {
controller := gomock.NewController(t)
containerID := generateUUID(t)
containerID := generateUUID()
containerNS := "containerNS"

for _, tc := range []struct {
Expand Down
17 changes: 8 additions & 9 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"antrea.io/antrea/pkg/cni"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -118,8 +119,8 @@ type CNIServer struct {
disableTXChecksumOffload bool
secondaryNetworkEnabled bool
networkConfig *config.NetworkConfig
// networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it.
networkReadyCh <-chan struct{}
// podNetworkWait notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it.
podNetworkWait *wait.Group
}

var supportedCNIVersionSet map[string]bool
Expand Down Expand Up @@ -441,11 +442,9 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
return resp, err
}

select {
case <-time.After(networkReadyTimeout):
klog.ErrorS(nil, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout)
if err := s.podNetworkWait.WaitWithTimeout(networkReadyTimeout); err != nil {
klog.ErrorS(err, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout)
return s.tryAgainLaterResponse(), nil
case <-s.networkReadyCh:
}

result := &ipam.IPAMResult{Result: current.Result{CNIVersion: current.ImplementedSpecVersion}}
Expand Down Expand Up @@ -634,7 +633,7 @@ func New(
routeClient route.Interface,
isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool,
networkConfig *config.NetworkConfig,
networkReadyCh <-chan struct{},
podNetworkWait *wait.Group,
) *CNIServer {
return &CNIServer{
cniSocket: cniSocket,
Expand All @@ -650,7 +649,7 @@ func New(
disableTXChecksumOffload: disableTXChecksumOffload,
enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM,
networkConfig: networkConfig,
networkReadyCh: networkReadyCh,
podNetworkWait: podNetworkWait,
}
}

Expand Down Expand Up @@ -773,7 +772,7 @@ func (s *CNIServer) reconcile() error {
return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err)
}

return s.podConfigurator.reconcile(pods.Items, s.containerAccess)
return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait)
}

func init() {
Expand Down
Loading
Loading