diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 92b7ae82710..ee294a77672 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -60,7 +60,6 @@ import ( "antrea.io/antrea/pkg/agent/querier" "antrea.io/antrea/pkg/agent/route" "antrea.io/antrea/pkg/agent/secondarynetwork" - "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" "antrea.io/antrea/pkg/agent/servicecidr" "antrea.io/antrea/pkg/agent/stats" support "antrea.io/antrea/pkg/agent/supportbundlecollection" @@ -535,7 +534,6 @@ func run(o *Options) error { } var cniServer *cniserver.CNIServer - var cniPodInfoStore cnipodcache.CNIPodInfoStore var externalNodeController *externalnode.ExternalNodeController var localExternalNodeInformer cache.SharedIndexInformer @@ -554,17 +552,9 @@ func run(o *Options) error { networkConfig, networkReadyCh) - if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { - cniPodInfoStore = cnipodcache.NewCNIPodInfoStore() - err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, cniPodInfoStore) - if err != nil { - return fmt.Errorf("error initializing CNI server with cniPodInfoStore cache: %v", err) - } - } else { - err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, nil) - if err != nil { - return fmt.Errorf("error initializing CNI server: %v", err) - } + err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel) + if err != nil { + return fmt.Errorf("error initializing CNI server with cniPodInfoStore cache: %v", err) } } else { listOptions := func(options *metav1.ListOptions) { @@ -700,8 +690,8 @@ func run(o *Options) error { if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { if err := secondarynetwork.Initialize( o.config.ClientConnection, o.config.KubeAPIServerOverride, - k8sClient, localPodInformer.Get(), nodeConfig.Name, cniPodInfoStore, - stopCh, + k8sClient, localPodInformer.Get(), nodeConfig.Name, + podUpdateChannel, stopCh, &o.config.SecondaryNetwork, ovsdbConnection); err != nil { return fmt.Errorf("failed to initialize secondary network: %v", err) } diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index 9564fbb6e26..9f6398af48b 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -31,7 +31,6 @@ import ( "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/route" - "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" agenttypes "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/ovs/ovsconfig" @@ -72,8 +71,6 @@ type podConfigurator struct { // podUpdateNotifier is used for notifying updates of local Pods to other components which may benefit from this // information, i.e. NetworkPolicyController, EgressController. podUpdateNotifier channel.Notifier - // consumed by secondary network creation. - podInfoStore cnipodcache.CNIPodInfoStore } func newPodConfigurator( @@ -86,7 +83,6 @@ func newPodConfigurator( isOvsHardwareOffloadEnabled bool, disableTXChecksumOffload bool, podUpdateNotifier channel.Notifier, - podInfoStore cnipodcache.CNIPodInfoStore, ) (*podConfigurator, error) { ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled, disableTXChecksumOffload) if err != nil { @@ -100,7 +96,6 @@ func newPodConfigurator( gatewayMAC: gatewayMAC, ifConfigurator: ifConfigurator, podUpdateNotifier: podUpdateNotifier, - podInfoStore: podInfoStore, }, nil } @@ -243,7 +238,8 @@ func (pc *podConfigurator) configureInterfaces( } var containerConfig *interfacestore.InterfaceConfig - if containerConfig, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, hostIface, containerIface, result.IPs, result.VLANID, containerAccess); err != nil { + if containerConfig, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, containerNetNS, + hostIface, containerIface, result.IPs, result.VLANID, containerAccess); err != nil { return fmt.Errorf("failed to connect to ovs for container %s: %v", containerID, err) } defer func() { @@ -486,7 +482,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain return nil } -func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, containerConfig *interfacestore.InterfaceConfig) error { +func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName, netNS string, containerConfig *interfacestore.InterfaceConfig) error { // create OVS Port and add attach container configuration into external_ids containerID := containerConfig.ContainerID klog.V(2).Infof("Adding OVS port %s for container %s", ovsPortName, containerID) @@ -519,8 +515,9 @@ func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, conta event := agenttypes.PodUpdate{ PodName: containerConfig.PodName, PodNamespace: containerConfig.PodNamespace, - IsAdd: true, ContainerID: containerConfig.ContainerID, + NetNS: netNS, + IsAdd: true, } pc.podUpdateNotifier.Notify(event) return nil @@ -548,8 +545,8 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface event := agenttypes.PodUpdate{ PodName: containerConfig.PodName, PodNamespace: containerConfig.PodNamespace, - IsAdd: false, ContainerID: containerConfig.ContainerID, + IsAdd: false, } pc.podUpdateNotifier.Notify(event) klog.Infof("Removed interfaces for container %s", containerID) @@ -577,8 +574,8 @@ func (pc *podConfigurator) connectInterceptedInterface( if err = pc.routeClient.MigrateRoutesToGw(hostIface.Name); err != nil { return fmt.Errorf("connectInterceptedInterface failed to migrate: %w", err) } - _, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, hostIface, - containerIface, containerIPs, 0, containerAccess) + _, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, containerNetNS, + hostIface, containerIface, containerIPs, 0, containerAccess) return err } diff --git a/pkg/agent/cniserver/pod_configuration_linux.go b/pkg/agent/cniserver/pod_configuration_linux.go index 4376d32b1a6..1ca96ff2c33 100644 --- a/pkg/agent/cniserver/pod_configuration_linux.go +++ b/pkg/agent/cniserver/pod_configuration_linux.go @@ -29,6 +29,7 @@ func (pc *podConfigurator) connectInterfaceToOVS( podName string, podNamespace string, containerID string, + netNS string, hostIface *current.Interface, containerIface *current.Interface, ips []*current.IPConfig, @@ -38,7 +39,7 @@ func (pc *podConfigurator) connectInterfaceToOVS( // Use the outer veth interface name as the OVS port name. ovsPortName := hostIface.Name containerConfig := buildContainerConfig(ovsPortName, containerID, podName, podNamespace, containerIface, ips, vlanID) - return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, containerConfig) + return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, netNS, containerConfig) } func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) { diff --git a/pkg/agent/cniserver/pod_configuration_linux_test.go b/pkg/agent/cniserver/pod_configuration_linux_test.go index 1c1a8fefd4d..62e65069fc8 100644 --- a/pkg/agent/cniserver/pod_configuration_linux_test.go +++ b/pkg/agent/cniserver/pod_configuration_linux_test.go @@ -501,7 +501,7 @@ func createPodConfigurator(controller *gomock.Controller, testIfaceConfigurator mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) - configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) + configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) configurator.ifConfigurator = testIfaceConfigurator return configurator } diff --git a/pkg/agent/cniserver/pod_configuration_windows.go b/pkg/agent/cniserver/pod_configuration_windows.go index 1302e66bf58..9259b156d79 100644 --- a/pkg/agent/cniserver/pod_configuration_windows.go +++ b/pkg/agent/cniserver/pod_configuration_windows.go @@ -66,6 +66,7 @@ func (pc *podConfigurator) connectInterfaceToOVS( podName string, podNamespace string, containerID string, + netNS string, hostIface *current.Interface, containerIface *current.Interface, ips []*current.IPConfig, @@ -87,7 +88,7 @@ func (pc *podConfigurator) connectInterfaceToOVS( // HNSEndpoint/HostComputeEndpoint, the current implementation will still work. It will choose the synchronized // way to create OVS port. if hostInterfaceExistsFunc(hostIfAlias) { - return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, containerConfig) + return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, netNS, containerConfig) } klog.V(2).Infof("Adding OVS port %s for container %s", ovsPortName, containerID) ovsAttachInfo := BuildOVSPortExternalIDs(containerConfig) diff --git a/pkg/agent/cniserver/secondary.go b/pkg/agent/cniserver/secondary.go index ae01aed0dff..bfe94751e99 100644 --- a/pkg/agent/cniserver/secondary.go +++ b/pkg/agent/cniserver/secondary.go @@ -24,7 +24,7 @@ import ( ) func NewSecondaryInterfaceConfigurator(ovsBridgeClient ovsconfig.OVSBridgeClient) (*podConfigurator, error) { - return newPodConfigurator(ovsBridgeClient, nil, nil, nil, nil, ovsconfig.OVSDatapathSystem, false, false, nil, nil) + return newPodConfigurator(ovsBridgeClient, nil, nil, nil, nil, ovsconfig.OVSDatapathSystem, false, false, nil) } // ConfigureSriovSecondaryInterface configures a SR-IOV secondary interface for a Pod. diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index a99522d549d..1426fdc131c 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -39,7 +39,6 @@ import ( "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/route" - "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" "antrea.io/antrea/pkg/agent/util" cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1" "antrea.io/antrea/pkg/cni" @@ -115,7 +114,6 @@ type CNIServer struct { // Enable AntreaIPAM for secondary networks implementd by other CNIs. enableSecondaryNetworkIPAM bool disableTXChecksumOffload bool - secondaryNetworkEnabled bool networkConfig *config.NetworkConfig // networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. networkReadyCh <-chan struct{} @@ -523,13 +521,6 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (* // mark success as true to avoid rollback success = true - if s.secondaryNetworkEnabled { - // Go cache the CNI server info at CNIConfigInfo cache, for podWatch usage - cniInfo := &cnipodcache.CNIConfigInfo{CNIVersion: cniVersion, PodName: podName, PodNamespace: podNamespace, - ContainerID: cniConfig.ContainerId, ContainerNetNS: netNS, PodCNIDeleted: false} - s.podConfigurator.podInfoStore.AddCNIConfigInfo(cniInfo) - } - return resultToResponse(cniResult), nil } @@ -558,16 +549,7 @@ func (s *CNIServer) cmdDel(_ context.Context, cniConfig *CNIConfig) (*cnipb.CniC return s.configInterfaceFailureResponse(err), nil } klog.InfoS("CmdDel for container succeeded", "container", cniConfig.ContainerId) - if s.secondaryNetworkEnabled { - podName := string(cniConfig.K8S_POD_NAME) - podNamespace := string(cniConfig.K8S_POD_NAMESPACE) - containerInfo := s.podConfigurator.podInfoStore.GetCNIConfigInfoByContainerID(podName, podNamespace, cniConfig.ContainerId) - if containerInfo != nil { - // Update PodCNIDeleted = true. - // This is to let Podwatch controller know that the CNI server cleaned up this Pod's primary network configuration. - s.podConfigurator.podInfoStore.SetPodCNIDeleted(containerInfo) - } - } + return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil } @@ -652,21 +634,12 @@ func (s *CNIServer) Initialize( ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, podUpdateNotifier channel.Notifier, - podInfoStore cnipodcache.CNIPodInfoStore, ) error { var err error - // If podInfoStore is not nil, secondaryNetwork configuration is supported. - if podInfoStore != nil { - s.secondaryNetworkEnabled = true - } else { - s.secondaryNetworkEnabled = false - } - s.podConfigurator, err = newPodConfigurator( ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC, ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), - s.disableTXChecksumOffload, - podUpdateNotifier, podInfoStore) + s.disableTXChecksumOffload, podUpdateNotifier) if err != nil { return fmt.Errorf("error during initialize podConfigurator: %v", err) } diff --git a/pkg/agent/cniserver/server_linux_test.go b/pkg/agent/cniserver/server_linux_test.go index 844f2b99361..4760fc2ab93 100644 --- a/pkg/agent/cniserver/server_linux_test.go +++ b/pkg/agent/cniserver/server_linux_test.go @@ -40,7 +40,6 @@ import ( "antrea.io/antrea/pkg/agent/interfacestore" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" routetest "antrea.io/antrea/pkg/agent/route/testing" - "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" "antrea.io/antrea/pkg/agent/util" cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1" "antrea.io/antrea/pkg/ovs/ovsconfig" @@ -98,7 +97,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Ifname = ifname cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "" - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) + cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -109,7 +108,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "0000:03:00.6" prevResult.Interfaces = []*current.Interface{hostIface, containerIface} - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) + cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, false, channel.NewSubscribableChannel("PodUpdate", 100)) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -122,7 +121,7 @@ func TestRemoveInterface(t *testing.T) { ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") - podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) + podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) require.Nil(t, err, "No error expected in podConfigurator constructor") containerMAC, _ := net.ParseMAC("aa:bb:cc:dd:ee:ff") @@ -191,7 +190,7 @@ func TestRemoveInterface(t *testing.T) { }) } -func newMockCNIServer(t *testing.T, controller *gomock.Controller, ipamDriver ipam.IPAMDriver, ipamType string, enableSecondaryNetworkIPAM, isChaining, secondaryNetworkEnabled bool) *CNIServer { +func newMockCNIServer(t *testing.T, controller *gomock.Controller, ipamDriver ipam.IPAMDriver, ipamType string, enableSecondaryNetworkIPAM, isChaining bool) *CNIServer { mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() @@ -203,13 +202,9 @@ func newMockCNIServer(t *testing.T, controller *gomock.Controller, ipamDriver ip gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") gateway := &config.GatewayConfig{Name: "", IPv4: gwIPv4, MAC: gwMAC} cniServer.nodeConfig = &config.NodeConfig{Name: "node1", PodIPv4CIDR: nodePodCIDRv4, GatewayConfig: gateway} - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) + cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) cniServer.enableSecondaryNetworkIPAM = enableSecondaryNetworkIPAM cniServer.isChaining = isChaining - cniServer.secondaryNetworkEnabled = secondaryNetworkEnabled - if secondaryNetworkEnabled { - cniServer.podConfigurator.podInfoStore = cnipodcache.NewCNIPodInfoStore() - } cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450} return cniServer } @@ -244,7 +239,6 @@ func TestCmdAdd(t *testing.T) { cniType string enableSecondaryNetworkIPAM bool isChaining bool - secondaryNetworkEnabled bool connectOVS bool migrateRoute bool addLocalIPAMRoute bool @@ -306,22 +300,11 @@ func TestCmdAdd(t *testing.T) { addLocalIPAMRoute: true, addLocalIPAMRouteError: fmt.Errorf("failed to configure route"), containerIfaceExist: false, - }, { - name: "add-secondary-network", - podName: "pod4", - ipamType: "test-cni-ipam", - ipamAdd: true, - enableSecondaryNetworkIPAM: false, - secondaryNetworkEnabled: true, - isChaining: false, - connectOVS: true, - addLocalIPAMRoute: true, - containerIfaceExist: true, }, } { t.Run(tc.name, func(t *testing.T) { defer mockGetNSPath(nil)() - cniserver := newMockCNIServer(t, controller, ipamMock, tc.ipamType, tc.enableSecondaryNetworkIPAM, tc.isChaining, tc.secondaryNetworkEnabled) + cniserver := newMockCNIServer(t, controller, ipamMock, tc.ipamType, tc.enableSecondaryNetworkIPAM, tc.isChaining) testIfaceConfigurator := newTestInterfaceConfigurator() requestMsg, hostInterfaceName := createCNIRequestAndInterfaceName(t, tc.podName, tc.cniType, ipamResult, tc.ipamType, true) testIfaceConfigurator.hostIfaceName = hostInterfaceName @@ -383,10 +366,6 @@ func TestCmdAdd(t *testing.T) { successResponse := resultToResponse(versionedResult) assert.Equal(t, successResponse, resp) } - if tc.secondaryNetworkEnabled { - cniConfigInfo := cniserver.podConfigurator.podInfoStore.GetCNIConfigInfoByContainerID(tc.podName, testPodNamespace, containerID) - assert.NotNil(t, cniConfigInfo) - } }) } } @@ -407,7 +386,6 @@ func TestCmdDel(t *testing.T) { cniType string enableSecondaryNetworkIPAM bool isChaining bool - secondaryNetworkEnabled bool disconnectOVS bool migrateRoute bool delLocalIPAMRoute bool @@ -470,20 +448,9 @@ func TestCmdDel(t *testing.T) { delLocalIPAMRoute: true, delLocalIPAMRouteError: fmt.Errorf("unable to delete flexible IPAM rule"), }, - { - name: "del-secondary-network", - podName: "pod4", - ipamType: "test-delete", - ipamDel: true, - enableSecondaryNetworkIPAM: false, - secondaryNetworkEnabled: true, - isChaining: false, - disconnectOVS: true, - delLocalIPAMRoute: true, - }, } { t.Run(tc.name, func(t *testing.T) { - cniserver := newMockCNIServer(t, controller, ipamMock, tc.ipamType, tc.enableSecondaryNetworkIPAM, tc.isChaining, tc.secondaryNetworkEnabled) + cniserver := newMockCNIServer(t, controller, ipamMock, tc.ipamType, tc.enableSecondaryNetworkIPAM, tc.isChaining) requestMsg, hostInterfaceName := createCNIRequestAndInterfaceName(t, tc.podName, tc.cniType, ipamResult, tc.ipamType, true) containerID := requestMsg.CniArgs.ContainerId containerIfaceConfig := interfacestore.NewContainerInterface(hostInterfaceName, containerID, tc.podName, testPodNamespace, containerVethMac, []net.IP{net.ParseIP("10.1.2.100")}, 0) @@ -492,11 +459,6 @@ func TestCmdDel(t *testing.T) { testIfaceConfigurator := newTestInterfaceConfigurator() testIfaceConfigurator.hostIfaceName = hostInterfaceName cniserver.podConfigurator.ifConfigurator = testIfaceConfigurator - if tc.secondaryNetworkEnabled { - cniInfo := &cnipodcache.CNIConfigInfo{CNIVersion: supportedCNIVersion, PodName: tc.podName, PodNamespace: testPodNamespace, - ContainerID: containerID, ContainerNetNS: netns, PodCNIDeleted: false} - cniserver.podConfigurator.podInfoStore.AddCNIConfigInfo(cniInfo) - } if tc.ipamDel { if tc.enableSecondaryNetworkIPAM { ipamSecondaryNetworkDel = func(cniArgs *cnipb.CniCmdArgs, k8sArgs *types.K8sArgs, networkConfig *types.NetworkConfig) error { @@ -530,11 +492,6 @@ func TestCmdDel(t *testing.T) { assert.Equal(t, emptyResponse, resp) } } - if tc.secondaryNetworkEnabled { - cniConfigInfo := cniserver.podConfigurator.podInfoStore.GetCNIConfigInfoByContainerID(tc.podName, testPodNamespace, containerID) - assert.NotNil(t, cniConfigInfo) - assert.True(t, cniConfigInfo.PodCNIDeleted) - } }) } } @@ -566,7 +523,7 @@ func TestCmdCheck(t *testing.T) { } t.Run("secondary-IPAM", func(t *testing.T) { ipamType := ipam.AntreaIPAMType - cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, true, false, false) + cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, true, false) requestMsg, _ := prepareRequest("pod0", "cniType", ipamType, false) ipamSecondaryNetworkCheck = func(cniArgs *cnipb.CniCmdArgs, k8sArgs *types.K8sArgs, networkConfig *types.NetworkConfig) error { return nil @@ -580,7 +537,7 @@ func TestCmdCheck(t *testing.T) { }) t.Run("secondary-IPAM-failure", func(t *testing.T) { ipamType := ipam.AntreaIPAMType - cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, true, false, false) + cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, true, false) requestMsg, _ := prepareRequest("pod0", "cniType", ipamType, false) ipamSecondaryNetworkCheck = func(cniArgs *cnipb.CniCmdArgs, k8sArgs *types.K8sArgs, networkConfig *types.NetworkConfig) error { return errors.New("failed to check secondary IPAM response") @@ -600,7 +557,7 @@ func TestCmdCheck(t *testing.T) { }) t.Run("chaining", func(t *testing.T) { ipamType := "test-check" - cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, false, true, false) + cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, false, true) requestMsg, hostInterfaceName := prepareRequest("pod1", "", ipamType, true) testIfaceConfigurator := newTestInterfaceConfigurator() testIfaceConfigurator.hostIfaceName = hostInterfaceName @@ -611,7 +568,7 @@ func TestCmdCheck(t *testing.T) { }) t.Run("check-general-cni", func(t *testing.T) { ipamType := "test-check" - cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, false, false, false) + cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, false, false) requestMsg, hostInterfaceName := prepareRequest("pod2", "", ipamType, true) testIfaceConfigurator := newTestInterfaceConfigurator() testIfaceConfigurator.hostIfaceName = hostInterfaceName @@ -638,7 +595,7 @@ func TestReconcile(t *testing.T) { cniServer := newCNIServer(t) cniServer.routeClient = mockRoute gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) + cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) cniServer.podConfigurator.ifConfigurator = newTestInterfaceConfigurator() cniServer.nodeConfig = &config.NodeConfig{ Name: nodeName, diff --git a/pkg/agent/cniserver/server_windows_test.go b/pkg/agent/cniserver/server_windows_test.go index 5e26c3041e3..2786dd2a8ae 100644 --- a/pkg/agent/cniserver/server_windows_test.go +++ b/pkg/agent/cniserver/server_windows_test.go @@ -293,7 +293,7 @@ func newMockCNIServer(t *testing.T, controller *gomock.Controller, podUpdateNoti gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") gateway := &config.GatewayConfig{Name: "", IPv4: gwIPv4, MAC: gwMAC} cniServer.nodeConfig = &config.NodeConfig{Name: "node1", PodIPv4CIDR: nodePodCIDRv4, GatewayConfig: gateway} - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, podUpdateNotifier, nil) + cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, podUpdateNotifier) return cniServer } @@ -947,7 +947,7 @@ func TestReconcile(t *testing.T) { pod4IfaceName := "iface4" pod4Iface := containerIfaces["iface4"] waiter := newAsyncWaiter(pod4Iface.PodName, pod4Iface.ContainerID) - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, waiter.notifier, nil) + cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, waiter.notifier) cniServer.nodeConfig = &config.NodeConfig{Name: nodeName} // Re-install Pod1 flows diff --git a/pkg/agent/secondarynetwork/cnipodcache/types.go b/pkg/agent/secondarynetwork/cnipodcache/types.go index 83fa41d30b7..657ce431c2d 100644 --- a/pkg/agent/secondarynetwork/cnipodcache/types.go +++ b/pkg/agent/secondarynetwork/cnipodcache/types.go @@ -15,7 +15,6 @@ package cnipodcache type CNIConfigInfo struct { - CNIVersion string PodName string PodNamespace string ContainerID string diff --git a/pkg/agent/secondarynetwork/init.go b/pkg/agent/secondarynetwork/init.go index eda73d7a824..abc3351c001 100644 --- a/pkg/agent/secondarynetwork/init.go +++ b/pkg/agent/secondarynetwork/init.go @@ -26,10 +26,10 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/interfacestore" - "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" "antrea.io/antrea/pkg/agent/secondarynetwork/podwatch" agentconfig "antrea.io/antrea/pkg/config/agent" "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/util/channel" "antrea.io/antrea/pkg/util/k8s" ) @@ -46,7 +46,7 @@ func Initialize( k8sClient clientset.Interface, podInformer cache.SharedIndexInformer, nodeName string, - podCache cnipodcache.CNIPodInfoStore, + podUpdateSubscriber channel.Subscriber, stopCh <-chan struct{}, config *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB) error { @@ -66,7 +66,7 @@ func Initialize( // k8s.v1.cni.cncf.io/networks Annotation defined. if podWatchController, err := podwatch.NewPodController( k8sClient, netAttachDefClient, podInformer, - nodeName, podCache, ovsBridgeClient); err != nil { + nodeName, podUpdateSubscriber, ovsBridgeClient); err != nil { return err } else { go podWatchController.Run(stopCh) diff --git a/pkg/agent/secondarynetwork/podwatch/controller.go b/pkg/agent/secondarynetwork/podwatch/controller.go index 86d684bb833..fd6d9f8c500 100644 --- a/pkg/agent/secondarynetwork/podwatch/controller.go +++ b/pkg/agent/secondarynetwork/podwatch/controller.go @@ -37,10 +37,12 @@ import ( "antrea.io/antrea/pkg/agent/cniserver" "antrea.io/antrea/pkg/agent/cniserver/ipam" - "antrea.io/antrea/pkg/agent/cniserver/types" + cnitypes "antrea.io/antrea/pkg/agent/cniserver/types" cnipodcache "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" + "antrea.io/antrea/pkg/agent/types" crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/util/channel" ) const ( @@ -70,7 +72,7 @@ type InterfaceConfigurator interface { } type IPAMAllocator interface { - SecondaryNetworkAllocate(podOwner *crdv1a2.PodOwner, networkConfig *types.NetworkConfig) (*ipam.IPAMResult, error) + SecondaryNetworkAllocate(podOwner *crdv1a2.PodOwner, networkConfig *cnitypes.NetworkConfig) (*ipam.IPAMResult, error) SecondaryNetworkRelease(podOwner *crdv1a2.PodOwner) error } @@ -80,6 +82,7 @@ type PodController struct { queue workqueue.RateLimitingInterface podInformer cache.SharedIndexInformer nodeName string + podUpdateSubscriber channel.Subscriber podCache cnipodcache.CNIPodInfoStore ovsBridgeClient ovsconfig.OVSBridgeClient interfaceConfigurator InterfaceConfigurator @@ -92,7 +95,7 @@ func NewPodController( netAttachDefClient netdefclient.K8sCniCncfIoV1Interface, podInformer cache.SharedIndexInformer, nodeName string, - podCache cnipodcache.CNIPodInfoStore, + podUpdateSubscriber channel.Subscriber, ovsBridgeClient ovsconfig.OVSBridgeClient, ) (*PodController, error) { interfaceConfigurator, err := cniserver.NewSecondaryInterfaceConfigurator(ovsBridgeClient) @@ -105,7 +108,8 @@ func NewPodController( queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "podcontroller"), podInformer: podInformer, nodeName: nodeName, - podCache: podCache, + podUpdateSubscriber: podUpdateSubscriber, + podCache: cnipodcache.NewCNIPodInfoStore(), ovsBridgeClient: ovsBridgeClient, interfaceConfigurator: interfaceConfigurator, ipamAllocator: ipam.GetSecondaryNetworkAllocator(), @@ -118,11 +122,16 @@ func NewPodController( }, resyncPeriod, ) + // podUpdateSubscriber can be nil with test code. + if podUpdateSubscriber != nil { + // Subscribe Pod CNI add/del events. + podUpdateSubscriber.Subscribe(pc.processCNIUpdate) + } return &pc, nil } -func podKeyGet(pod *corev1.Pod) string { - return pod.Namespace + "/" + pod.Name +func podKeyGet(podName, podNamespace string) string { + return podNamespace + "/" + podName } func generatePodSecondaryIfaceName(podCNIInfo *cnipodcache.CNIConfigInfo) (string, error) { @@ -147,9 +156,8 @@ func (pc *PodController) deletePodSecondaryNetwork(podCNIInfo *cnipodcache.CNICo // NOTE: SR-IOV VF interface clean-up, upon Pod delete will be handled by SR-IOV device // plugin. Not handled here. for iface, interfaceInfo := range podCNIInfo.Interfaces { - klog.V(1).InfoS("Deleting secondary interface", - "Pod", klog.KRef(podCNIInfo.PodNamespace, podCNIInfo.PodName), - "interface", iface, "interfaceInfo", *interfaceInfo) + klog.InfoS("Deleting secondary interface", + "Pod", klog.KRef(podCNIInfo.PodNamespace, podCNIInfo.PodName), "interface", iface) if interfaceInfo.NetworkType == vlanNetworkType { if err := pc.interfaceConfigurator.DeleteVLANSecondaryInterface(podCNIInfo.ContainerID, interfaceInfo.HostInterfaceName, interfaceInfo.OVSPortUUID); err != nil { @@ -188,10 +196,29 @@ func (pc *PodController) enqueuePod(obj interface{}) { return } } - podKey := podKeyGet(pod) + podKey := podKeyGet(pod.Name, pod.Namespace) pc.queue.Add(podKey) } +// processCNIUpdate will be called when CNIServer publishes a Pod update event. +func (pc *PodController) processCNIUpdate(e interface{}) { + event := e.(types.PodUpdate) + if event.IsAdd { + // Go cache the CNI server info at CNIConfigInfo cache, for podWatch usage + cniInfo := &cnipodcache.CNIConfigInfo{PodName: event.PodName, PodNamespace: event.PodNamespace, + ContainerID: event.ContainerID, ContainerNetNS: event.NetNS, PodCNIDeleted: false} + pc.podCache.AddCNIConfigInfo(cniInfo) + } else { + containerInfo := pc.podCache.GetCNIConfigInfoByContainerID(event.PodName, event.PodNamespace, event.ContainerID) + if containerInfo != nil { + // Update PodCNIDeleted = true. + // This is to let Podwatch controller know that the CNI server cleaned up this Pod's primary network configuration. + pc.podCache.SetPodCNIDeleted(containerInfo) + } + } + pc.queue.Add(podKeyGet(event.PodName, event.PodNamespace)) +} + // handleAddUpdatePod handles Pod Add, Update events and updates annotation if required. func (pc *PodController) handleAddUpdatePod(obj interface{}) error { var err error @@ -217,7 +244,7 @@ func (pc *PodController) handleAddUpdatePod(obj interface{}) error { // Avoid processing Pod annotation, if we already have at least one secondary network successfully configured on this Pod. // We do not support/handle Annotation updates yet. if len(podCNIInfo.Interfaces) > 0 { - klog.V(1).InfoS("Secondary network already configured on this Pod and annotation update not supported, skipping update", "pod", klog.KObj(pod)) + klog.V(1).InfoS("Secondary network already configured on this Pod and annotation update not supported, skipping update", "Pod", klog.KObj(pod)) return nil } @@ -232,7 +259,7 @@ func (pc *PodController) handleAddUpdatePod(obj interface{}) error { err = pc.configurePodSecondaryNetwork(pod, networklist, podCNIInfo) if err != nil { - klog.ErrorS(err, "Error when configuring secondary network", "pod", klog.KObj(pod)) + klog.ErrorS(err, "Error when configuring secondary network", "Pod", klog.KObj(pod)) if len(podCNIInfo.Interfaces) == 0 { // Return error to requeue and retry. return err @@ -251,6 +278,7 @@ func (pc *PodController) handleRemovePod(key string) error { for _, info := range podCNIInfo { // Delete all secondary interfaces and release IPAM. if err = pc.deletePodSecondaryNetwork(info); err != nil { + klog.ErrorS(err, "Error when deleting secondary network", "Pod", klog.KRef(info.PodNamespace, info.PodName)) // Return error to requeue Pod delete. return err } else { diff --git a/pkg/agent/secondarynetwork/podwatch/controller_test.go b/pkg/agent/secondarynetwork/podwatch/controller_test.go index 71f2e16ddfe..63572d4353c 100644 --- a/pkg/agent/secondarynetwork/podwatch/controller_test.go +++ b/pkg/agent/secondarynetwork/podwatch/controller_test.go @@ -209,8 +209,8 @@ func TestPodControllerRun(t *testing.T) { netdefclient, informerFactory.Core().V1().Pods().Informer(), testNode, - podCache, - nil) + nil, nil) + podController.podCache = podCache podController.interfaceConfigurator = interfaceConfigurator podController.ipamAllocator = mockIPAM diff --git a/pkg/agent/types/event.go b/pkg/agent/types/event.go index eea70f66e42..297a4efb709 100644 --- a/pkg/agent/types/event.go +++ b/pkg/agent/types/event.go @@ -17,6 +17,7 @@ package types type PodUpdate struct { PodNamespace string PodName string - IsAdd bool ContainerID string + NetNS string + IsAdd bool } diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 846980eec0b..1852f4dbea1 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -574,7 +574,7 @@ func newTester() *cmdAddDelTester { routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, tester.networkReadyCh) - tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100), nil) + tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100)) ctx := context.Background() tester.ctx = ctx return tester @@ -795,7 +795,7 @@ func TestCNIServerChaining(t *testing.T) { ifaceStore := interfacestore.NewInterfaceStore() ovsServiceMock.EXPECT().IsHardwareOffloadEnabled().Return(false).AnyTimes() ovsServiceMock.EXPECT().GetOVSDatapathType().Return(ovsconfig.OVSDatapathSystem).AnyTimes() - err = server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100), nil) + err = server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100)) testRequire.Nil(err) } @@ -928,7 +928,7 @@ func TestCNIServerGCForHostLocalIPAM(t *testing.T) { ) // call Initialize, which will run reconciliation and perform host-local IPAM garbage collection - server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100), nil) + server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100)) getIPs := func(cID string) []net.IP { ipamStore, err := disk.New("antrea", "")