From 11ae823a435a7cbf1ce0ea842575e2a916a8fd47 Mon Sep 17 00:00:00 2001 From: hjiajing Date: Fri, 17 Mar 2023 16:57:05 +0800 Subject: [PATCH] Add WireGuard tunnels for Antrea Multi-cluster cross-cluster traffic Add a traffic encryption mode for cross-cluster traffic. If WireGuard enabled, corresponding WireGuard configuration will be created on the Gateway Node. And all cross-cluster traffic will go through the WireGuard tunnel to remote Gateway. Signed-off-by: hjiajing --- build/charts/antrea/README.md | 4 +- build/charts/antrea/conf/antrea-agent.conf | 10 + .../antrea/templates/agent/clusterrole.yaml | 1 + build/charts/antrea/values.yaml | 11 +- build/yamls/antrea-aks.yml | 15 +- build/yamls/antrea-eks.yml | 15 +- build/yamls/antrea-gke.yml | 15 +- build/yamls/antrea-ipsec.yml | 15 +- build/yamls/antrea.yml | 15 +- cmd/antrea-agent/agent.go | 5 +- cmd/antrea-agent/options.go | 16 + docs/network-requirements.md | 21 +- .../multicluster/v1alpha1/gateway_types.go | 12 +- .../v1alpha1/zz_generated.deepcopy.go | 25 ++ .../antrea-multicluster-leader-global.yml | 16 + .../yamls/antrea-multicluster-member.yml | 14 + .../gateway_webhook.go | 8 +- ...ster.crd.antrea.io_clusterinfoimports.yaml | 7 + .../multicluster.crd.antrea.io_gateways.yaml | 7 + ...cluster.crd.antrea.io_resourceexports.yaml | 8 + ...cluster.crd.antrea.io_resourceimports.yaml | 8 + .../multicluster/member/gateway_controller.go | 38 +- .../member/gateway_controller_test.go | 30 ++ pkg/agent/agent.go | 22 +- pkg/agent/multicluster/mc_route_controller.go | 287 +++++++++++++-- .../multicluster/mc_route_controller_test.go | 335 +++++++++++++++++- pkg/agent/openflow/client.go | 4 +- pkg/agent/openflow/multicluster.go | 2 +- pkg/agent/route/interfaces.go | 7 + pkg/agent/route/route_linux.go | 27 ++ pkg/agent/route/route_windows.go | 8 + pkg/agent/route/testing/mock_route.go | 28 ++ pkg/agent/wireguard/client_linux.go | 65 ++-- pkg/agent/wireguard/client_test.go | 23 +- pkg/agent/wireguard/client_windows.go | 4 +- pkg/agent/wireguard/client_windows_test.go | 2 +- pkg/agent/wireguard/interface.go | 10 +- pkg/agent/wireguard/testing/mock_wireguard.go | 42 +++ pkg/apis/ports.go | 2 + pkg/config/agent/config.go | 7 + 40 files changed, 1058 insertions(+), 133 deletions(-) create mode 100644 pkg/agent/wireguard/testing/mock_wireguard.go diff --git a/build/charts/antrea/README.md b/build/charts/antrea/README.md index bf11667c2d1..9a4702fb39d 100644 --- a/build/charts/antrea/README.md +++ b/build/charts/antrea/README.md @@ -90,6 +90,8 @@ Kubernetes: `>= 1.16.0-0` | multicluster.enablePodToPodConnectivity | bool | `false` | Enable Multi-cluster Pod to Pod connectivity. | | multicluster.enableStretchedNetworkPolicy | bool | `false` | Enable Multi-cluster NetworkPolicy. Multi-cluster Gateway must be enabled to enable StretchedNetworkPolicy. | | multicluster.namespace | string | `""` | The Namespace where Antrea Multi-cluster Controller is running. The default is antrea-agent's Namespace. | +| multicluster.trafficEncryptionMode | string | `"none"` | Determines how cross-cluster traffic is encrypted. It has the following options: - none (default): Cross-cluster traffic will not be encrypted. - wireGuard: Enable WireGuard for tunnel traffic encryption. | +| multicluster.wireGuard.port | int | `51821` | WireGuard tunnel port for cross-cluster traffic. | | noSNAT | bool | `false` | Whether or not to SNAT (using the Node IP) the egress traffic from a Pod to the external network. | | nodeIPAM.clusterCIDRs | list | `[]` | CIDR ranges to use when allocating Pod IP addresses. | | nodeIPAM.enable | bool | `false` | Enable Node IPAM in Antrea | @@ -114,7 +116,7 @@ Kubernetes: `>= 1.16.0-0` | tlsCipherSuites | string | `""` | Comma-separated list of cipher suites that will be used by the Antrea APIservers. If empty, the default Go Cipher Suites will be used. See https://golang.org/pkg/crypto/tls/#pkg-constants. | | tlsMinVersion | string | `""` | TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. | | trafficEncapMode | string | `"encap"` | Determines how traffic is encapsulated. It must be one of "encap", "noEncap", "hybrid", or "networkPolicyOnly". | -| trafficEncryptionMode | string | `"none"` | Determines how tunnel traffic is encrypted. Currently encryption only works with encap mode.It must be one of "none", "ipsec", "wireGuard". | +| trafficEncryptionMode | string | `"none"` | Determines how tunnel traffic is encrypted. Currently encryption only works with encap mode. It must be one of "none", "ipsec", "wireGuard". | | transportInterface | string | `""` | Name of the interface on Node which is used for tunneling or routing the traffic across Nodes. | | transportInterfaceCIDRs | list | `[]` | Network CIDRs of the interface on Node which is used for tunneling or routing the traffic across Nodes. | | tunnelCsum | bool | `false` | TunnelCsum determines whether to compute UDP encapsulation header (Geneve or VXLAN) checksums on outgoing packets. For Linux kernel before Mar 2021, UDP checksum must be present to trigger GRO on the receiver for better performance of Geneve and VXLAN tunnels. The issue has been fixed by https://github.com/torvalds/linux/commit/89e5c58fc1e2857ccdaae506fb8bc5fed57ee063, thus computing UDP checksum is no longer necessary. It should only be set to true when you are using an unpatched Linux kernel and observing poor transfer performance. | diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index 7a80dbb25d4..0566ccf1d67 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -346,6 +346,16 @@ multicluster: enableStretchedNetworkPolicy: {{ .enableStretchedNetworkPolicy }} # Enable Pod to Pod connectivity. enablePodToPodConnectivity: {{ .enablePodToPodConnectivity }} +# Determines how cross-cluster traffic is encrypted. +# It has the following options: +# - none (default): Cross-cluster traffic will not be encrypted. +# - wireGuard: Use WireGuard to encrypt traffic. + trafficEncryptionMode: {{ .trafficEncryptionMode | quote }} +# WireGuard tunnel configuration for cross-cluster traffic. +# It only works when multicluster.trafficEncryptionMode is wireGuard. + wireGuard: + # WireGuard tunnel port for cross-cluster traffic. + port: {{ .wireGuard.port }} {{- end }} {{- if .Values.featureGates.SecondaryNetwork }} diff --git a/build/charts/antrea/templates/agent/clusterrole.yaml b/build/charts/antrea/templates/agent/clusterrole.yaml index 384d3d4e8ac..7db11aebb8e 100644 --- a/build/charts/antrea/templates/agent/clusterrole.yaml +++ b/build/charts/antrea/templates/agent/clusterrole.yaml @@ -209,6 +209,7 @@ rules: - get - list - watch + - patch - apiGroups: - multicluster.crd.antrea.io resources: diff --git a/build/charts/antrea/values.yaml b/build/charts/antrea/values.yaml index d70c2638533..988339e9bf5 100644 --- a/build/charts/antrea/values.yaml +++ b/build/charts/antrea/values.yaml @@ -24,7 +24,7 @@ tunnelPort: 0 # observing poor transfer performance. tunnelCsum: false # -- Determines how tunnel traffic is encrypted. Currently encryption only works -# with encap mode.It must be one of "none", "ipsec", "wireGuard". +# with encap mode. It must be one of "none", "ipsec", "wireGuard". trafficEncryptionMode: "none" # -- Enable bridging mode of Pod network on Nodes, in which the Node's transport # interface is connected to the OVS bridge. @@ -342,6 +342,15 @@ multicluster: enableStretchedNetworkPolicy: false # -- Enable Multi-cluster Pod to Pod connectivity. enablePodToPodConnectivity: false + # -- Determines how cross-cluster traffic is encrypted. + # It has the following options: + # - none (default): Cross-cluster traffic will not be encrypted. + # - wireGuard: Enable WireGuard for tunnel traffic encryption. + trafficEncryptionMode: "none" + # WireGuard tunnel configuration for cross-cluster traffic. + wireGuard: + # -- WireGuard tunnel port for cross-cluster traffic. + port: 51821 testing: ## -- enable code coverage measurement (used when testing Antrea only). diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 56f13f6eecb..540489e82c6 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -3262,6 +3262,16 @@ data: enableStretchedNetworkPolicy: false # Enable Pod to Pod connectivity. enablePodToPodConnectivity: false + # Determines how cross-cluster traffic is encrypted. + # It has the following options: + # - none (default): Cross-cluster traffic will not be encrypted. + # - wireGuard: Use WireGuard to encrypt traffic. + trafficEncryptionMode: "none" + # WireGuard tunnel configuration for cross-cluster traffic. + # It only works when multicluster.trafficEncryptionMode is wireGuard. + wireGuard: + # WireGuard tunnel port for cross-cluster traffic. + port: 51821 antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -3744,6 +3754,7 @@ rules: - get - list - watch + - patch - apiGroups: - multicluster.crd.antrea.io resources: @@ -4323,7 +4334,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 3af5beafa4cc20ba7f963ed5409de8af66dbd1e185d98a56601d18edf74faba1 + checksum/config: 1deeecf01f782d7520200f50723554203d03b1130dc698488aea43ff6588e522 labels: app: antrea component: antrea-agent @@ -4564,7 +4575,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 3af5beafa4cc20ba7f963ed5409de8af66dbd1e185d98a56601d18edf74faba1 + checksum/config: 1deeecf01f782d7520200f50723554203d03b1130dc698488aea43ff6588e522 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 97b869e9a03..3d2729d2530 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -3262,6 +3262,16 @@ data: enableStretchedNetworkPolicy: false # Enable Pod to Pod connectivity. enablePodToPodConnectivity: false + # Determines how cross-cluster traffic is encrypted. + # It has the following options: + # - none (default): Cross-cluster traffic will not be encrypted. + # - wireGuard: Use WireGuard to encrypt traffic. + trafficEncryptionMode: "none" + # WireGuard tunnel configuration for cross-cluster traffic. + # It only works when multicluster.trafficEncryptionMode is wireGuard. + wireGuard: + # WireGuard tunnel port for cross-cluster traffic. + port: 51821 antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -3744,6 +3754,7 @@ rules: - get - list - watch + - patch - apiGroups: - multicluster.crd.antrea.io resources: @@ -4323,7 +4334,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 3af5beafa4cc20ba7f963ed5409de8af66dbd1e185d98a56601d18edf74faba1 + checksum/config: 1deeecf01f782d7520200f50723554203d03b1130dc698488aea43ff6588e522 labels: app: antrea component: antrea-agent @@ -4565,7 +4576,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 3af5beafa4cc20ba7f963ed5409de8af66dbd1e185d98a56601d18edf74faba1 + checksum/config: 1deeecf01f782d7520200f50723554203d03b1130dc698488aea43ff6588e522 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index e3ceaf2ad93..1a2800b9d58 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -3262,6 +3262,16 @@ data: enableStretchedNetworkPolicy: false # Enable Pod to Pod connectivity. enablePodToPodConnectivity: false + # Determines how cross-cluster traffic is encrypted. + # It has the following options: + # - none (default): Cross-cluster traffic will not be encrypted. + # - wireGuard: Use WireGuard to encrypt traffic. + trafficEncryptionMode: "none" + # WireGuard tunnel configuration for cross-cluster traffic. + # It only works when multicluster.trafficEncryptionMode is wireGuard. + wireGuard: + # WireGuard tunnel port for cross-cluster traffic. + port: 51821 antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -3744,6 +3754,7 @@ rules: - get - list - watch + - patch - apiGroups: - multicluster.crd.antrea.io resources: @@ -4323,7 +4334,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 530b8f5633759918bc625c6c3e13b8927c2854687f7a5c5bfd420f1c1e15e3cf + checksum/config: 48031b31477519b363d88a8964b59e4f2afc445e00c41578b020eb8cd3d8922c labels: app: antrea component: antrea-agent @@ -4562,7 +4573,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 530b8f5633759918bc625c6c3e13b8927c2854687f7a5c5bfd420f1c1e15e3cf + checksum/config: 48031b31477519b363d88a8964b59e4f2afc445e00c41578b020eb8cd3d8922c labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index cef78e82f20..505e6b54655 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -3275,6 +3275,16 @@ data: enableStretchedNetworkPolicy: false # Enable Pod to Pod connectivity. enablePodToPodConnectivity: false + # Determines how cross-cluster traffic is encrypted. + # It has the following options: + # - none (default): Cross-cluster traffic will not be encrypted. + # - wireGuard: Use WireGuard to encrypt traffic. + trafficEncryptionMode: "none" + # WireGuard tunnel configuration for cross-cluster traffic. + # It only works when multicluster.trafficEncryptionMode is wireGuard. + wireGuard: + # WireGuard tunnel port for cross-cluster traffic. + port: 51821 antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -3757,6 +3767,7 @@ rules: - get - list - watch + - patch - apiGroups: - multicluster.crd.antrea.io resources: @@ -4336,7 +4347,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 55bb24adab5185aad87b77004a64ea6a5736a85ed5e35d3f2e565d746e26dcf6 + checksum/config: dae3f1d0ff2e557477f7bbc88eadeb287d2bf4e71a68a7d57cc78fd9c30c8062 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -4621,7 +4632,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 55bb24adab5185aad87b77004a64ea6a5736a85ed5e35d3f2e565d746e26dcf6 + checksum/config: dae3f1d0ff2e557477f7bbc88eadeb287d2bf4e71a68a7d57cc78fd9c30c8062 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 567204110fd..fbaccadcd05 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -3262,6 +3262,16 @@ data: enableStretchedNetworkPolicy: false # Enable Pod to Pod connectivity. enablePodToPodConnectivity: false + # Determines how cross-cluster traffic is encrypted. + # It has the following options: + # - none (default): Cross-cluster traffic will not be encrypted. + # - wireGuard: Use WireGuard to encrypt traffic. + trafficEncryptionMode: "none" + # WireGuard tunnel configuration for cross-cluster traffic. + # It only works when multicluster.trafficEncryptionMode is wireGuard. + wireGuard: + # WireGuard tunnel port for cross-cluster traffic. + port: 51821 antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -3744,6 +3754,7 @@ rules: - get - list - watch + - patch - apiGroups: - multicluster.crd.antrea.io resources: @@ -4323,7 +4334,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ad7f3df72a2eadf6a2ab30580d81d6ffc43838b464119d100e0646c348214524 + checksum/config: 261db0ce9a97fec9ea24c49d637d5f72612024f1ecd158ae0ab0577211a900a6 labels: app: antrea component: antrea-agent @@ -4562,7 +4573,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ad7f3df72a2eadf6a2ab30580d81d6ffc43838b464119d100e0646c348214524 + checksum/config: 261db0ce9a97fec9ea24c49d637d5f72612024f1ecd158ae0ab0577211a900a6 labels: app: antrea component: antrea-controller diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 99b7369864e..6b2a7173136 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -344,8 +344,9 @@ func run(o *Options) error { ciImportInformer, ofClient, nodeConfig, - o.config.Multicluster.EnableStretchedNetworkPolicy, - o.config.Multicluster.EnablePodToPodConnectivity, + networkConfig, + routeClient, + o.config.Multicluster, ) if networkConfig.TrafficEncapMode != config.TrafficEncapModeEncap { mcPodRouteController = mcroute.NewMCPodRouteController( diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index 6d5ab837194..899819c7cb8 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -178,6 +178,9 @@ func (o *Options) setDefaults() { } else { o.setExternalNodeDefaultOptions() } + if o.config.Multicluster.EnableGateway { + o.setMulticlusterDefaultOptions() + } } func (o *Options) validateTLSOptions() error { @@ -324,6 +327,10 @@ func (o *Options) validateMulticlusterConfig(encapMode config.TrafficEncapModeTy if !o.config.Multicluster.EnableGateway && o.config.Multicluster.EnableStretchedNetworkPolicy { return fmt.Errorf("Multi-cluster Gateway must be enabled to enable StretchedNetworkPolicy") } + _, multiclusterEncryptionMode := config.GetTrafficEncryptionModeFromStr(o.config.Multicluster.TrafficEncryptionMode) + if multiclusterEncryptionMode == config.TrafficEncryptionModeWireGuard && encryptionMode != config.TrafficEncryptionModeNone { + return fmt.Errorf("Antrea Multi-cluster WireGuard does not support in-cluster encryption mode %s", o.config.TrafficEncryptionMode) + } if encapMode.SupportsEncap() && encryptionMode == config.TrafficEncryptionModeWireGuard { return fmt.Errorf("Multi-cluster Gateway doesn't support in-cluster WireGuard encryption") @@ -618,3 +625,12 @@ func (o *Options) setExternalNodeDefaultOptions() { o.config.ExternalNode.ExternalNodeNamespace = "default" } } + +func (o *Options) setMulticlusterDefaultOptions() { + _, trafficEncryptionModeType := config.GetTrafficEncryptionModeFromStr(o.config.Multicluster.TrafficEncryptionMode) + if trafficEncryptionModeType == config.TrafficEncryptionModeWireGuard { + if o.config.Multicluster.WireGuard.Port == 0 { + o.config.Multicluster.WireGuard.Port = apis.MulticlusterWireGuardListenPort + } + } +} diff --git a/docs/network-requirements.md b/docs/network-requirements.md index cb7cbf05abd..ed80456882a 100644 --- a/docs/network-requirements.md +++ b/docs/network-requirements.md @@ -3,16 +3,17 @@ Antrea has a few network requirements to get started, ensure that your hosts and firewalls allow the necessary traffic based on your configuration. -| Configuration | Host(s) | ports/protocols | Other | -|-------------------------------|---------------------|--------------------------------------------|------------------------------| -| Antrea with VXLAN enabled | All | UDP 4789 | | -| Antrea with Geneve enabled | All | UDP 6081 | | -| Antrea with STT enabled | All | TCP 7471 | | -| Antrea with GRE enabled | All | IP Protocol ID 47 | No support for IPv6 clusters | -| Antrea with IPsec ESP enabled | All | IP protocol ID 50 and 51, UDP 500 and 4500 | | -| Antrea with WireGuard enabled | All | UDP 51820 | | -| All | kube-apiserver host | TCP 443 or 6443\* | | -| All | All | TCP 10349, 10350, 10351, UDP 10351 | | +| Configuration | Host(s) | ports/protocols | Other | +|------------------------------------------------|----------------------------|--------------------------------------------|------------------------------| +| Antrea with VXLAN enabled | All | UDP 4789 | | +| Antrea with Geneve enabled | All | UDP 6081 | | +| Antrea with STT enabled | All | TCP 7471 | | +| Antrea with GRE enabled | All | IP Protocol ID 47 | No support for IPv6 clusters | +| Antrea with IPsec ESP enabled | All | IP protocol ID 50 and 51, UDP 500 and 4500 | | +| Antrea with WireGuard enabled | All | UDP 51820 | | +| Antrea Multi-cluster with WireGuard encryption | Multi-cluster Gateway Node | UDP 51821 | | +| All | kube-apiserver host | TCP 443 or 6443\* | | +| All | All | TCP 10349, 10350, 10351, UDP 10351 | | \* _The value passed to kube-apiserver using the --secure-port flag. If you cannot locate this, check the targetPort value returned by kubectl get svc kubernetes -o yaml._ diff --git a/multicluster/apis/multicluster/v1alpha1/gateway_types.go b/multicluster/apis/multicluster/v1alpha1/gateway_types.go index d90aa4295a8..004b843edd3 100644 --- a/multicluster/apis/multicluster/v1alpha1/gateway_types.go +++ b/multicluster/apis/multicluster/v1alpha1/gateway_types.go @@ -25,6 +25,12 @@ type GatewayInfo struct { GatewayIP string `json:"gatewayIP,omitempty"` } +// WireGuardInfo includes information of a WireGuard tunnel. +type WireGuardInfo struct { + // Public key of the WireGuard tunnel. + PublicKey string `json:"publicKey,omitempty"` +} + // +genclient //+kubebuilder:object:root=true //+kubebuilder:subresource:status @@ -42,7 +48,8 @@ type Gateway struct { // In-cluster tunnel IP of the Gateway. InternalIP string `json:"internalIP,omitempty"` // Service CIDR of the local member cluster. - ServiceCIDR string `json:"serviceCIDR,omitempty"` + ServiceCIDR string `json:"serviceCIDR,omitempty"` + WireGuard *WireGuardInfo `json:"wireGuard,omitempty"` } type ClusterInfo struct { @@ -53,7 +60,8 @@ type ClusterInfo struct { // GatewayInfos has information of Gateways GatewayInfos []GatewayInfo `json:"gatewayInfos,omitempty"` // PodCIDRs is the Pod IP address CIDRs. - PodCIDRs []string `json:"podCIDRs,omitempty"` + PodCIDRs []string `json:"podCIDRs,omitempty"` + WireGuard *WireGuardInfo `json:"wireGuard,omitempty"` } //+kubebuilder:object:root=true diff --git a/multicluster/apis/multicluster/v1alpha1/zz_generated.deepcopy.go b/multicluster/apis/multicluster/v1alpha1/zz_generated.deepcopy.go index 1ab5154cc7c..316a7099a9e 100644 --- a/multicluster/apis/multicluster/v1alpha1/zz_generated.deepcopy.go +++ b/multicluster/apis/multicluster/v1alpha1/zz_generated.deepcopy.go @@ -56,6 +56,11 @@ func (in *ClusterInfo) DeepCopyInto(out *ClusterInfo) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.WireGuard != nil { + in, out := &in.WireGuard, &out.WireGuard + *out = new(WireGuardInfo) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterInfo. @@ -385,6 +390,11 @@ func (in *Gateway) DeepCopyInto(out *Gateway) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + if in.WireGuard != nil { + in, out := &in.WireGuard, &out.WireGuard + *out = new(WireGuardInfo) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Gateway. @@ -1034,3 +1044,18 @@ func (in *ServiceExport) DeepCopy() *ServiceExport { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *WireGuardInfo) DeepCopyInto(out *WireGuardInfo) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WireGuardInfo. +func (in *WireGuardInfo) DeepCopy() *WireGuardInfo { + if in == nil { + return nil + } + out := new(WireGuardInfo) + in.DeepCopyInto(out) + return out +} diff --git a/multicluster/build/yamls/antrea-multicluster-leader-global.yml b/multicluster/build/yamls/antrea-multicluster-leader-global.yml index b361d645ec4..9e130736ccd 100644 --- a/multicluster/build/yamls/antrea-multicluster-leader-global.yml +++ b/multicluster/build/yamls/antrea-multicluster-leader-global.yml @@ -387,6 +387,14 @@ spec: serviceCIDR: description: ServiceCIDR is the IP ranges used by Service ClusterIP. type: string + wireGuard: + description: WireGuardInfo includes information of a WireGuard + tunnel. + properties: + publicKey: + description: Public key of the WireGuard tunnel. + type: string + type: object type: object clusterNetworkPolicy: description: If exported resource is AntreaClusterNetworkPolicy. @@ -3079,6 +3087,14 @@ spec: serviceCIDR: description: ServiceCIDR is the IP ranges used by Service ClusterIP. type: string + wireGuard: + description: WireGuardInfo includes information of a WireGuard + tunnel. + properties: + publicKey: + description: Public key of the WireGuard tunnel. + type: string + type: object type: object clusternetworkpolicy: description: If imported resource is AntreaClusterNetworkPolicy. diff --git a/multicluster/build/yamls/antrea-multicluster-member.yml b/multicluster/build/yamls/antrea-multicluster-member.yml index 13f02e73882..15211442dcb 100644 --- a/multicluster/build/yamls/antrea-multicluster-member.yml +++ b/multicluster/build/yamls/antrea-multicluster-member.yml @@ -119,6 +119,13 @@ spec: serviceCIDR: description: ServiceCIDR is the IP ranges used by Service ClusterIP. type: string + wireGuard: + description: WireGuardInfo includes information of a WireGuard tunnel. + properties: + publicKey: + description: Public key of the WireGuard tunnel. + type: string + type: object type: object status: description: ClusterInfoImportStatus defines the observed state of ClusterInfoImport. @@ -401,6 +408,13 @@ spec: serviceCIDR: description: Service CIDR of the local member cluster. type: string + wireGuard: + description: WireGuardInfo includes information of a WireGuard tunnel. + properties: + publicKey: + description: Public key of the WireGuard tunnel. + type: string + type: object type: object served: true storage: true diff --git a/multicluster/cmd/multicluster-controller/gateway_webhook.go b/multicluster/cmd/multicluster-controller/gateway_webhook.go index b05f6c545d2..a62095ddacf 100644 --- a/multicluster/cmd/multicluster-controller/gateway_webhook.go +++ b/multicluster/cmd/multicluster-controller/gateway_webhook.go @@ -30,6 +30,10 @@ import ( mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" ) +const ( + antreaAgentSAName = "antrea-agent" +) + //+kubebuilder:webhook:path=/validate-multicluster-crd-antrea-io-v1alpha1-gateway,mutating=false,failurePolicy=fail,sideEffects=None,groups=multicluster.crd.antrea.io,resources=gateways,verbs=create;update,versions=v1alpha1,name=vgateway.kb.io,admissionReviewVersions={v1,v1beta1} // Gateway validator @@ -56,8 +60,8 @@ func (v *gatewayValidator) Handle(ctx context.Context, req admission.Request) ad klog.ErrorS(err, "Error getting ServiceAccount name", "Gateway", req.Namespace+"/"+req.Name) return admission.Errored(http.StatusBadRequest, err) } - if saName != mcControllerSAName { - return admission.Errored(http.StatusPreconditionFailed, fmt.Errorf("Gateway can only be created or updated by Antrea Multi-cluster controller")) + if saName != mcControllerSAName && saName != antreaAgentSAName { + return admission.Errored(http.StatusPreconditionFailed, fmt.Errorf("Gateway can only be created or updated by Antrea Agent or Multi-cluster Controller")) } } return admission.Allowed("") diff --git a/multicluster/config/crd/bases/multicluster.crd.antrea.io_clusterinfoimports.yaml b/multicluster/config/crd/bases/multicluster.crd.antrea.io_clusterinfoimports.yaml index 5bb209cbfc9..dd20354d6fe 100644 --- a/multicluster/config/crd/bases/multicluster.crd.antrea.io_clusterinfoimports.yaml +++ b/multicluster/config/crd/bases/multicluster.crd.antrea.io_clusterinfoimports.yaml @@ -65,6 +65,13 @@ spec: serviceCIDR: description: ServiceCIDR is the IP ranges used by Service ClusterIP. type: string + wireGuard: + description: WireGuardInfo includes information of a WireGuard tunnel. + properties: + publicKey: + description: Public key of the WireGuard tunnel. + type: string + type: object type: object status: description: ClusterInfoImportStatus defines the observed state of ClusterInfoImport. diff --git a/multicluster/config/crd/bases/multicluster.crd.antrea.io_gateways.yaml b/multicluster/config/crd/bases/multicluster.crd.antrea.io_gateways.yaml index e04914d3dfa..3f899ecd4a5 100644 --- a/multicluster/config/crd/bases/multicluster.crd.antrea.io_gateways.yaml +++ b/multicluster/config/crd/bases/multicluster.crd.antrea.io_gateways.yaml @@ -53,6 +53,13 @@ spec: serviceCIDR: description: Service CIDR of the local member cluster. type: string + wireGuard: + description: WireGuardInfo includes information of a WireGuard tunnel. + properties: + publicKey: + description: Public key of the WireGuard tunnel. + type: string + type: object type: object served: true storage: true diff --git a/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceexports.yaml b/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceexports.yaml index fc5d680b9a5..67eb0134d15 100644 --- a/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceexports.yaml +++ b/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceexports.yaml @@ -82,6 +82,14 @@ spec: serviceCIDR: description: ServiceCIDR is the IP ranges used by Service ClusterIP. type: string + wireGuard: + description: WireGuardInfo includes information of a WireGuard + tunnel. + properties: + publicKey: + description: Public key of the WireGuard tunnel. + type: string + type: object type: object clusterNetworkPolicy: description: If exported resource is AntreaClusterNetworkPolicy. diff --git a/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceimports.yaml b/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceimports.yaml index 1e0dc7a17c2..f95f24a776c 100644 --- a/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceimports.yaml +++ b/multicluster/config/crd/bases/multicluster.crd.antrea.io_resourceimports.yaml @@ -80,6 +80,14 @@ spec: serviceCIDR: description: ServiceCIDR is the IP ranges used by Service ClusterIP. type: string + wireGuard: + description: WireGuardInfo includes information of a WireGuard + tunnel. + properties: + publicKey: + description: Public key of the WireGuard tunnel. + type: string + type: object type: object clusternetworkpolicy: description: If imported resource is AntreaClusterNetworkPolicy. diff --git a/multicluster/controllers/multicluster/member/gateway_controller.go b/multicluster/controllers/multicluster/member/gateway_controller.go index 3bd27ffffd8..b80d4c19d68 100644 --- a/multicluster/controllers/multicluster/member/gateway_controller.go +++ b/multicluster/controllers/multicluster/member/gateway_controller.go @@ -139,12 +139,7 @@ func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.R Name: r.localClusterID, Namespace: r.namespace, } - resExportSpec.ClusterInfo = &mcsv1alpha1.ClusterInfo{ - ClusterID: r.localClusterID, - ServiceCIDR: gw.ServiceCIDR, - PodCIDRs: r.podCIDRs, - GatewayInfos: []mcsv1alpha1.GatewayInfo{{GatewayIP: gw.GatewayIP}}, - } + resExportSpec.ClusterInfo = r.getClusterInfo(gw) klog.V(2).InfoS("Updating ClusterInfo kind of ResourceExport", "clusterinfo", klog.KObj(existingResExport), "gateway", req.NamespacedName) existingResExport.Spec = resExportSpec @@ -162,16 +157,7 @@ func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.R Name: r.localClusterID, Namespace: r.namespace, } - resExportSpec.ClusterInfo = &mcsv1alpha1.ClusterInfo{ - ClusterID: r.localClusterID, - ServiceCIDR: gateway.ServiceCIDR, - PodCIDRs: r.podCIDRs, - GatewayInfos: []mcsv1alpha1.GatewayInfo{ - { - GatewayIP: gateway.GatewayIP, - }, - }, - } + resExportSpec.ClusterInfo = r.getClusterInfo(gateway) resExport := &mcsv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: r.leaderNamespace, @@ -198,3 +184,23 @@ func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error { }). Complete(r) } + +func (r *GatewayReconciler) getClusterInfo(gateway *mcsv1alpha1.Gateway) *mcsv1alpha1.ClusterInfo { + clusterInfo := &mcsv1alpha1.ClusterInfo{ + ClusterID: r.localClusterID, + ServiceCIDR: gateway.ServiceCIDR, + PodCIDRs: r.podCIDRs, + GatewayInfos: []mcsv1alpha1.GatewayInfo{ + { + GatewayIP: gateway.GatewayIP, + }, + }, + } + if gateway.WireGuard != nil && gateway.WireGuard.PublicKey != "" { + clusterInfo.WireGuard = &mcsv1alpha1.WireGuardInfo{ + PublicKey: gateway.WireGuard.PublicKey, + } + } + + return clusterInfo +} diff --git a/multicluster/controllers/multicluster/member/gateway_controller_test.go b/multicluster/controllers/multicluster/member/gateway_controller_test.go index 4cbac884a08..5d289ec3a2c 100644 --- a/multicluster/controllers/multicluster/member/gateway_controller_test.go +++ b/multicluster/controllers/multicluster/member/gateway_controller_test.go @@ -185,3 +185,33 @@ func TestGatewayReconciler(t *testing.T) { }) } } + +func TestGetClusterInfo(t *testing.T) { + fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects().Build() + r := NewGatewayReconciler(fakeClient, common.TestScheme, "default", []string{"10.200.1.1/16"}, nil) + gw := &mcsv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gw", + }, + ServiceCIDR: "10.100.0.0/16", + GatewayIP: "10.10.1.1", + InternalIP: "10.10.1.1", + WireGuard: &mcsv1alpha1.WireGuardInfo{ + PublicKey: "key", + }, + } + expectedClusterInfo := &mcsv1alpha1.ClusterInfo{ + GatewayInfos: []mcsv1alpha1.GatewayInfo{ + { + GatewayIP: "10.10.1.1", + }, + }, + ServiceCIDR: "10.100.0.0/16", + PodCIDRs: []string{"10.200.1.1/16"}, + WireGuard: &mcsv1alpha1.WireGuardInfo{ + PublicKey: "key", + }, + } + + assert.Equal(t, expectedClusterInfo, r.getClusterInfo(gw)) +} diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index da32d2f3be1..dc8da7ed0f6 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -1086,13 +1086,31 @@ func (i *Initializer) waitForIPsecMonitorDaemon() error { // initializeWireguard checks if preconditions are met for using WireGuard and initializes WireGuard client or cleans up. func (i *Initializer) initializeWireGuard() error { i.wireGuardConfig.MTU = i.nodeConfig.NodeTransportInterfaceMTU - config.WireGuardOverhead - wgClient, err := wireguard.New(i.client, i.nodeConfig, i.wireGuardConfig) + wgClient, err := wireguard.New(i.nodeConfig, i.wireGuardConfig) if err != nil { return err } i.wireGuardClient = wgClient - return i.wireGuardClient.Init() + publicKey, err := i.wireGuardClient.Init(nil, nil) + if err != nil { + return err + } + + patch, _ := json.Marshal(map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]string{ + types.NodeWireGuardPublicAnnotationKey: publicKey, + }, + }, + }) + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + _, err := i.client.CoreV1().Nodes().Patch(context.TODO(), i.nodeConfig.Name, apitypes.MergePatchType, patch, metav1.PatchOptions{}, "status") + return err + }); err != nil { + return fmt.Errorf("error when patching the Node with the '%s' annotation: %w", types.NodeWireGuardPublicAnnotationKey, err) + } + return err } // readIPSecPSK reads the IPsec PSK value from environment variable ANTREA_IPSEC_PSK diff --git a/pkg/agent/multicluster/mc_route_controller.go b/pkg/agent/multicluster/mc_route_controller.go index 133ec539eb1..0593213c529 100644 --- a/pkg/agent/multicluster/mc_route_controller.go +++ b/pkg/agent/multicluster/mc_route_controller.go @@ -15,14 +15,20 @@ package multicluster import ( + "context" + "encoding/json" "fmt" "net" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + apitypes "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -32,6 +38,9 @@ import ( mclisters "antrea.io/antrea/multicluster/pkg/client/listers/multicluster/v1alpha1" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow" + antrearoute "antrea.io/antrea/pkg/agent/route" + "antrea.io/antrea/pkg/agent/wireguard" + "antrea.io/antrea/pkg/config/agent" ) const ( @@ -43,9 +52,14 @@ const ( minRetryDelay = 2 * time.Second maxRetryDelay = 120 * time.Second - // Default number of workers processing a resource change - defaultWorkers = 1 - workerItemKey = "key" + workerItemKey = "key" + + multiclusterWireGuardInterface = "antrea-mc-wg0" + multiclusterWireGuardPublicKey = "publicKey" +) + +var ( + wireGuardNewFunc = wireguard.New ) // MCDefaultRouteController watches Gateway and ClusterInfoImport events. @@ -54,7 +68,11 @@ const ( type MCDefaultRouteController struct { mcClient mcclientset.Interface ofClient openflow.Client + routeClient antrearoute.Interface + wireGuardClient wireguard.Interface nodeConfig *config.NodeConfig + networkConfig *config.NetworkConfig + wireGuardConfig *config.WireGuardConfig gwInformer mcinformersv1alpha1.GatewayInformer gwLister mclisters.GatewayLister gwListerSynced cache.InformerSynced @@ -65,13 +83,16 @@ type MCDefaultRouteController struct { // installedCIImports is for saving ClusterInfos which have been processed // in MCDefaultRouteController. Need to use mutex to protect 'installedCIImports' if // we change the number of 'defaultWorkers'. - installedCIImports map[string]*mcv1alpha1.ClusterInfoImport - // Need to use mutex to protect 'installedActiveGW' if we change - // the number of 'defaultWorkers' to run multiple go routines to handle - // events. - installedActiveGW *mcv1alpha1.Gateway + installedCIImports map[string]*mcv1alpha1.ClusterInfoImport + installedWireGuardPeers map[string]*mcv1alpha1.ClusterInfoImport + // Need to use mutex to protect 'installedActiveGW' if we change to + // use multiple go routines to handle events + installedActiveGW *mcv1alpha1.Gateway + // The Namespace where Antrea Multi-cluster Controller is running. + namespace string enableStretchedNetworkPolicy bool enablePodToPodConnectivity bool + wireGuardInitialized bool } func NewMCDefaultRouteController( @@ -80,13 +101,16 @@ func NewMCDefaultRouteController( ciImportInformer mcinformersv1alpha1.ClusterInfoImportInformer, client openflow.Client, nodeConfig *config.NodeConfig, - enableStretchedNetworkPolicy bool, - enablePodToPodConnectivity bool, + networkConfig *config.NetworkConfig, + routeClient antrearoute.Interface, + multiclusterConfig agent.MulticlusterConfig, ) *MCDefaultRouteController { controller := &MCDefaultRouteController{ mcClient: mcClient, ofClient: client, + routeClient: routeClient, nodeConfig: nodeConfig, + networkConfig: networkConfig, gwInformer: gwInformer, gwLister: gwInformer.Lister(), gwListerSynced: gwInformer.Informer().HasSynced, @@ -95,8 +119,18 @@ func NewMCDefaultRouteController( ciImportListerSynced: ciImportInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "gatewayroute"), installedCIImports: make(map[string]*mcv1alpha1.ClusterInfoImport), - enableStretchedNetworkPolicy: enableStretchedNetworkPolicy, - enablePodToPodConnectivity: enablePodToPodConnectivity, + installedWireGuardPeers: make(map[string]*mcv1alpha1.ClusterInfoImport), + namespace: multiclusterConfig.Namespace, + enableStretchedNetworkPolicy: multiclusterConfig.EnableStretchedNetworkPolicy, + enablePodToPodConnectivity: multiclusterConfig.EnablePodToPodConnectivity, + } + _, trafficEncryptionMode := config.GetTrafficEncryptionModeFromStr(multiclusterConfig.TrafficEncryptionMode) + if trafficEncryptionMode == config.TrafficEncryptionModeWireGuard { + controller.wireGuardConfig = &config.WireGuardConfig{ + Port: multiclusterConfig.WireGuard.Port, + Name: multiclusterWireGuardInterface, + MTU: controller.nodeConfig.NodeTransportInterfaceMTU - controller.networkConfig.MTUDeduction - config.WireGuardOverhead, + } } controller.gwInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ @@ -182,7 +216,7 @@ func (c *MCDefaultRouteController) enqueueClusterInfoImport(obj interface{}, isD c.queue.Add(workerItemKey) } -// Run will create defaultWorkers workers (go routines) which will process +// Run will create a worker (go routines) which will process // the Gateway events from the workqueue. func (c *MCDefaultRouteController) Run(stopCh <-chan struct{}) { defer c.queue.ShutDown() @@ -193,9 +227,7 @@ func (c *MCDefaultRouteController) Run(stopCh <-chan struct{}) { return } - for i := 0; i < defaultWorkers; i++ { - go wait.Until(c.worker, time.Second, stopCh) - } + go wait.Until(c.worker, time.Second, stopCh) <-stopCh } @@ -213,20 +245,182 @@ func (c *MCDefaultRouteController) processNextWorkItem() bool { } defer c.queue.Done(obj) - if k, ok := obj.(string); !ok { + key, ok := obj.(string) + if !ok { c.queue.Forget(obj) klog.InfoS("Expected string in work queue but got", "object", obj) return true - } else if err := c.syncMCFlows(); err == nil { - c.queue.Forget(k) + } + + syncFn := func() error { + if c.wireGuardConfig != nil { + if err := c.syncWireGuard(); err != nil { + return err + } + } + return c.syncMCFlows() + } + if err := syncFn(); err == nil { + c.queue.Forget(key) } else { // Put the item back on the workqueue to handle any transient errors. - c.queue.AddRateLimited(k) - klog.ErrorS(err, "Error syncing key, requeuing", "key", k) + c.queue.AddRateLimited(key) + klog.ErrorS(err, "Error syncing key, requeuing", "key", key) } return true } +// syncWireGuard reconciles WireGuard configurations in following way: +// 1. If the current Node is the Multi-cluster Gateway Node, controller will try to initialize corresponding WireGuard +// configuration and route on the host, then add all existing Gateway Nodes in other member clusters as WireGuard peers. +// 2. If the current Node is not Multi-cluster Gateway Node, controller will try to clean up WireGuard configurations. +// +// Note: MCDefaultRouteController runs only one worker to process Gateway and ClusterInfoImport. So we do not need +// any synchronization mechanism. +func (c *MCDefaultRouteController) syncWireGuard() error { + gateway, err := c.getActiveGateway() + if err != nil { + return err + } + + amIGateway := gateway != nil && gateway.Name == c.nodeConfig.Name + if c.wireGuardClient != nil && (!amIGateway || !c.wireGuardInitialized) { + if err := c.cleanUpWireGuard(); err != nil { + return err + } + c.wireGuardInitialized = false + } + if !amIGateway { + return nil + } + if !c.wireGuardInitialized { + if initErr := c.initializeWireGuard(gateway); initErr != nil { + if err := c.cleanUpWireGuard(); err != nil { + klog.ErrorS(err, "Failed to clean up WireGuard") + } + return initErr + } + c.wireGuardInitialized = true + } + ciImports, err := c.ciImportLister.List(labels.Everything()) + if err != nil { + return err + } + + desiredCIImports := sets.NewString() + var updateErr []error + for _, ciImport := range ciImports { + desiredCIImports.Insert(ciImport.Name) + if ciImportCache, ok := c.installedWireGuardPeers[ciImport.Name]; ok && !isWireGuardInfoChanged(ciImportCache, ciImport) { + klog.V(2).InfoS("The ClusterInfoImport did not change, skip updating WireGuard peer", "ClusterInfoImport", klog.KObj(ciImport)) + } + if err = c.addWireGuardRouteAndPeer(ciImport); err != nil { + klog.ErrorS(err, "Failed to update WireGuard peer", "ClusterInfoImport", klog.KObj(ciImport)) + updateErr = append(updateErr, err) + } + c.installedWireGuardPeers[ciImport.Name] = ciImport + } + if len(updateErr) > 0 { + return utilerrors.NewAggregate(updateErr) + } + + // Check cache and existing ClusterInfoImports, clean up routes and WireGuard peers of the + // removed ClusterInfoImports. + for ciName, ciImport := range c.installedWireGuardPeers { + if desiredCIImports.Has(ciName) { + continue + } + if err := c.removeWireGuardRouteAndPeer(ciImport); err != nil { + return err + } + delete(c.installedWireGuardPeers, ciName) + } + + return nil +} + +func (c *MCDefaultRouteController) removeWireGuardRouteAndPeer(ciImport *mcv1alpha1.ClusterInfoImport) error { + remoteGatewayIP, _, _ := net.ParseCIDR(ciImport.Spec.ServiceCIDR) + dstCIDR := net.IPNet{IP: remoteGatewayIP, Mask: net.CIDRMask(32, 32)} + if err := c.routeClient.DeleteRouteForLink(&dstCIDR, c.wireGuardConfig.LinkIndex); err != nil { + return err + } + return c.wireGuardClient.DeletePeer(ciImport.Name) +} + +// addWireGuardRouteAndPeer tries to update a WireGuard peer with ClusterInfoImport. If updating successfully, +// it will also create host route to WireGuard peer. +func (c *MCDefaultRouteController) addWireGuardRouteAndPeer(ciImport *mcv1alpha1.ClusterInfoImport) error { + if ciImport.Spec.WireGuard == nil || ciImport.Spec.WireGuard.PublicKey == "" { + klog.V(2).InfoS("ClusterInfoImport's WireGuard field has not been initialized, skip it", "ClusterInfoImport", klog.KObj(ciImport)) + return nil + } + + klog.V(2).InfoS("Updating WireGuard peer with ClusterInfoImport", "ClusterInfoImport", klog.KObj(ciImport)) + // The cross-cluster traffic will be both encapsulated and encrypted. To avoid routing loop, we use a tunnel endpoint + // IP different from the WireGuard endpoint IP. Since the ServiceCIDR is guaranteed to be unique across member clusters, + // we choose the ServiceCIDR's network address as the tunnel endpoint IP. For instance, if a cluster's ServiceCIDR is + // 10.96.0.0/16, 10.96.0.0 will be used as the tunnel endpoint IP of the cluster's Gateway Node. + remoteWireGuardIP, _, err := net.ParseCIDR(ciImport.Spec.ServiceCIDR) + if err != nil { + return err + } + remoteWireGuardNet := &net.IPNet{IP: remoteWireGuardIP, Mask: net.CIDRMask(32, 32)} + + gatewayIP := net.ParseIP(ciImport.Spec.GatewayInfos[0].GatewayIP) + allowedIPs := []*net.IPNet{remoteWireGuardNet} + if err := c.wireGuardClient.UpdatePeer(ciImport.Name, ciImport.Spec.WireGuard.PublicKey, gatewayIP, allowedIPs); err != nil { + return err + } + + klog.V(2).InfoS("Adding route on the host", "CIDR", remoteWireGuardNet, "device", c.wireGuardConfig.Name) + return c.routeClient.AddRouteForLink(remoteWireGuardNet, c.wireGuardConfig.LinkIndex) +} + +// initializeWireGuard initializes the WireGuard interface and client. +// It will also update Gateway's WireGuard field. +func (c *MCDefaultRouteController) initializeWireGuard(gateway *mcv1alpha1.Gateway) error { + wgClient, err := wireGuardNewFunc(c.nodeConfig, c.wireGuardConfig) + if err != nil { + return err + } + c.wireGuardClient = wgClient + + wireGuardInterfaceIP, _, err := net.ParseCIDR(gateway.ServiceCIDR) + if err != nil { + return err + } + publicKey, err := c.wireGuardClient.Init(wireGuardInterfaceIP, nil) + if err != nil { + return err + } + + patch, _ := json.Marshal(map[string]interface{}{ + "wireGuard": map[string]interface{}{ + multiclusterWireGuardPublicKey: publicKey, + }, + }) + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + _, err := c.mcClient.MulticlusterV1alpha1().Gateways(c.namespace).Patch(context.TODO(), c.nodeConfig.Name, apitypes.MergePatchType, patch, + metav1.PatchOptions{}) + return err + }); err != nil { + return fmt.Errorf("error when patching the Gateway with WireGuard information, error: %s", err) + } + + return nil +} + +// cleanUpWireGuard deletes the WireGuard interface on the host. +// The WireGuard route will also be deleted automatically when the interface is deleted. +func (c *MCDefaultRouteController) cleanUpWireGuard() error { + if err := c.wireGuardClient.CleanUp(); err != nil { + return err + } + c.wireGuardClient = nil + return nil +} + func (c *MCDefaultRouteController) syncMCFlows() error { startTime := time.Now() defer func() { @@ -325,7 +519,7 @@ func (c *MCDefaultRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.G func (c *MCDefaultRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gateway, ciImport *mcv1alpha1.ClusterInfoImport, installedCIImp *mcv1alpha1.ClusterInfoImport, activeGWChanged bool) error { - tunnelPeerIPToRemoteGW := getPeerGatewayIP(ciImport.Spec) + tunnelPeerIPToRemoteGW := getPeerGatewayTunnelIP(ciImport.Spec, c.wireGuardConfig != nil) if tunnelPeerIPToRemoteGW == nil { klog.ErrorS(nil, "The ClusterInfoImport has no valid Gateway IP, skip it", "clusterinfoimport", klog.KObj(ciImport)) return nil @@ -333,7 +527,7 @@ func (c *MCDefaultRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1 var ciImportNoChange bool if installedCIImp != nil { - oldTunnelPeerIPToRemoteGW := getPeerGatewayIP(installedCIImp.Spec) + oldTunnelPeerIPToRemoteGW := getPeerGatewayTunnelIP(installedCIImp.Spec, c.wireGuardConfig != nil) ciImportNoChange = oldTunnelPeerIPToRemoteGW.Equal(tunnelPeerIPToRemoteGW) && installedCIImp.Spec.ServiceCIDR == ciImport.Spec.ServiceCIDR if c.enablePodToPodConnectivity { ciImportNoChange = ciImportNoChange && sets.NewString(installedCIImp.Spec.PodCIDRs...).Equal(sets.NewString(ciImport.Spec.PodCIDRs...)) @@ -358,7 +552,11 @@ func (c *MCDefaultRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1 } if activeGW.Name == c.nodeConfig.Name { klog.V(2).InfoS("Adding/updating flows to remote Gateway Node for Multi-cluster traffic", "clusterinfoimport", ciImport.Name, "cidrs", allCIDRs) - localGatewayIP := net.ParseIP(activeGW.GatewayIP) + localGatewayIP := getLocalGatewayIP(activeGW, c.wireGuardConfig != nil) + if localGatewayIP == nil { + klog.V(2).InfoS("Local Gateway IP has not been allocated, skip", "gateway", klog.KObj(activeGW)) + return nil + } if err := c.ofClient.InstallMulticlusterGatewayFlows( ciImport.Name, peerConfigs, @@ -436,10 +634,47 @@ func generatePeerConfigs(subnets []string, gatewayIP net.IP) (map[*net.IPNet]net return peerConfigs, nil } -// getPeerGatewayIP will always return the first Gateway IP. -func getPeerGatewayIP(spec mcv1alpha1.ClusterInfo) net.IP { +// If WireGuard is disabled, getPeerGatewayTunnelIP will return Gateway's GatewayIP. +// If WireGuard is enabled, the WireGuard interfaces use the first IP address of ServiceCIDR +// as its IP address. So getPeerGatewayTunnelIP will return the first IP of the ServiceCIDR +// as the remote Gateway tunnel IP. +func getPeerGatewayTunnelIP(spec mcv1alpha1.ClusterInfo, enableWireGuard bool) net.IP { + if enableWireGuard { + if spec.ServiceCIDR == "" { + klog.InfoS("The ServiceCIDR of the peer cluster has not been updated, skip it", "clusterID", spec.ClusterID) + return nil + } + _, serviceCIDR, _ := net.ParseCIDR(spec.ServiceCIDR) + return serviceCIDR.IP + } if len(spec.GatewayInfos) == 0 { return nil } return net.ParseIP(spec.GatewayInfos[0].GatewayIP) } + +func getLocalGatewayIP(gateway *mcv1alpha1.Gateway, enableWireGuard bool) net.IP { + if enableWireGuard { + if gateway.ServiceCIDR == "" { + klog.InfoS("The ServiceCIDR of the Gateway has not been updated, skip it", "Gateway", klog.KObj(gateway)) + return nil + } + localGatewayIP, _, _ := net.ParseCIDR(gateway.ServiceCIDR) + return localGatewayIP + } + return net.ParseIP(gateway.GatewayIP) +} + +// isWireGuardInfoChanged checks the information in ClusterInfoImport needed by WireGuard change or not. +func isWireGuardInfoChanged(cache, cur *mcv1alpha1.ClusterInfoImport) bool { + if cache.Spec.ServiceCIDR != cur.Spec.ServiceCIDR { + return true + } + if cache.Spec.WireGuard == nil && cur.Spec.WireGuard == nil { + return false + } + if cache.Spec.WireGuard == nil || cur.Spec.WireGuard == nil { + return true + } + return cache.Spec.WireGuard.PublicKey != cur.Spec.WireGuard.PublicKey +} diff --git a/pkg/agent/multicluster/mc_route_controller_test.go b/pkg/agent/multicluster/mc_route_controller_test.go index 7e82226f0c0..da05db5ab1d 100644 --- a/pkg/agent/multicluster/mc_route_controller_test.go +++ b/pkg/agent/multicluster/mc_route_controller_test.go @@ -21,13 +21,21 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" mcfake "antrea.io/antrea/multicluster/pkg/client/clientset/versioned/fake" mcinformers "antrea.io/antrea/multicluster/pkg/client/informers/externalversions" "antrea.io/antrea/pkg/agent/config" oftest "antrea.io/antrea/pkg/agent/openflow/testing" + antrearoute "antrea.io/antrea/pkg/agent/route" + routemock "antrea.io/antrea/pkg/agent/route/testing" + "antrea.io/antrea/pkg/agent/wireguard" + wireguardmock "antrea.io/antrea/pkg/agent/wireguard/testing" + "antrea.io/antrea/pkg/config/agent" ) type fakeRouteController struct { @@ -37,7 +45,13 @@ type fakeRouteController struct { ofClient *oftest.MockClient } -func newMCDefaultRouteController(t *testing.T, nodeConfig *config.NodeConfig) *fakeRouteController { +func newMCDefaultRouteController(t *testing.T, + nodeConfig *config.NodeConfig, + networkConfig *config.NetworkConfig, + wireGuardConfig agent.WireGuardConfig, + routeClient antrearoute.Interface, + trafficEncryptionMode string, +) *fakeRouteController { mcClient := mcfake.NewSimpleClientset() mcInformerFactory := mcinformers.NewSharedInformerFactoryWithOptions(mcClient, 60*time.Second, @@ -45,6 +59,16 @@ func newMCDefaultRouteController(t *testing.T, nodeConfig *config.NodeConfig) *f ) gwInformer := mcInformerFactory.Multicluster().V1alpha1().Gateways() ciImportInformer := mcInformerFactory.Multicluster().V1alpha1().ClusterInfoImports() + + multiclusterConfig := agent.MulticlusterConfig{ + Enable: true, + EnableGateway: true, + Namespace: "default", + EnableStretchedNetworkPolicy: true, + EnablePodToPodConnectivity: true, + WireGuard: wireGuardConfig, + TrafficEncryptionMode: trafficEncryptionMode, + } ctrl := gomock.NewController(t) ofClient := oftest.NewMockClient(ctrl) c := NewMCDefaultRouteController( @@ -53,8 +77,9 @@ func newMCDefaultRouteController(t *testing.T, nodeConfig *config.NodeConfig) *f ciImportInformer, ofClient, nodeConfig, - true, - true, + networkConfig, + routeClient, + multiclusterConfig, ) return &fakeRouteController{ MCDefaultRouteController: c, @@ -67,6 +92,7 @@ func newMCDefaultRouteController(t *testing.T, nodeConfig *config.NodeConfig) *f var ( gw1CreationTime = metav1.NewTime(time.Now()) gw2CreationTime = metav1.NewTime(time.Now().Add(10 * time.Minute)) + gw4CreationTime = metav1.NewTime(time.Now()) gateway1 = mcv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ Name: "node-1", @@ -88,6 +114,20 @@ var ( gw1GatewayIP = net.ParseIP(gateway1.GatewayIP) gw2InternalIP = net.ParseIP(gateway2.InternalIP) + gateway4 = mcv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-4", + Namespace: "default", + CreationTimestamp: gw4CreationTime, + }, + GatewayIP: "172.17.0.14", + InternalIP: "192.17.0.14", + ServiceCIDR: "10.100.0.0/16", + WireGuard: &mcv1alpha1.WireGuardInfo{ + PublicKey: "key", + }, + } + clusterInfoImport1 = mcv1alpha1.ClusterInfoImport{ ObjectMeta: metav1.ObjectMeta{ Name: "cluster-b-default-clusterinfo", @@ -119,10 +159,98 @@ var ( }, }, } + + clusterInfoImport3 = mcv1alpha1.ClusterInfoImport{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster-d-default-clusterinfo", + Namespace: "default", + }, + Spec: mcv1alpha1.ClusterInfo{ + ClusterID: "cluster-d", + ServiceCIDR: "14.14.4.0/12", + GatewayInfos: []mcv1alpha1.GatewayInfo{ + { + GatewayIP: "12.13.0.10", + }, + }, + PodCIDRs: []string{ + "10.10.0.0/16", + }, + WireGuard: &mcv1alpha1.WireGuardInfo{ + PublicKey: "key", + }, + }, + } ) +func TestMCRouteControllerAsWireGuardGateway(t *testing.T) { + ctrl := gomock.NewController(t) + mockInterface := routemock.NewMockInterface(ctrl) + networkConfig := &config.NetworkConfig{} + c := newMCDefaultRouteController(t, + &config.NodeConfig{ + Name: "node-4", + PodIPv4CIDR: &net.IPNet{ + IP: net.ParseIP("10.10.0.0"), + }, + }, + networkConfig, + agent.WireGuardConfig{}, + mockInterface, + "wireGuard", + ) + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + wireGuardNewFunc = func(nodeConfig *config.NodeConfig, wireGuardConfig *config.WireGuardConfig) (wireguard.Interface, error) { + return &wireguardmock.MockWireGuardClient{}, nil + } + + finishCh := make(chan struct{}) + go func() { + defer close(finishCh) + + // Create Gateway4 + c.mcClient.MulticlusterV1alpha1().Gateways(gateway4.GetNamespace()).Create(context.TODO(), + &gateway4, metav1.CreateOptions{}) + c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), true).Times(1) + c.processNextWorkItem() + c.processNextWorkItem() + + // Create ClusterInfoImport3 + c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport3.GetNamespace()). + Create(context.TODO(), &clusterInfoImport3, metav1.CreateOptions{}) + peerNodeIP3 := getPeerGatewayTunnelIP(clusterInfoImport3.Spec, true) + c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport3.Name, + gomock.Any(), peerNodeIP3, gomock.Any(), true).Times(1) + mockInterface.EXPECT().AddRouteForLink(gomock.Any(), 0).Times(1) + c.processNextWorkItem() + + // Delete Gateway + c.mcClient.MulticlusterV1alpha1().Gateways(gateway4.GetNamespace()).Delete(context.TODO(), + gateway4.Name, metav1.DeleteOptions{}) + c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport3.Name).Times(1) + c.processNextWorkItem() + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("Test didn't finish in time") + case <-finishCh: + } +} + func TestMCRouteControllerAsGateway(t *testing.T) { - c := newMCDefaultRouteController(t, &config.NodeConfig{Name: "node-1"}) + c := newMCDefaultRouteController( + t, + &config.NodeConfig{Name: "node-1"}, + &config.NetworkConfig{}, + agent.WireGuardConfig{}, + nil, + "none", + ) defer c.queue.ShutDown() stopCh := make(chan struct{}) @@ -143,14 +271,14 @@ func TestMCRouteControllerAsGateway(t *testing.T) { // Create two ClusterInfoImports c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport1.GetNamespace()). Create(context.TODO(), &clusterInfoImport1, metav1.CreateOptions{}) - peerNodeIP1 := getPeerGatewayIP(clusterInfoImport1.Spec) + peerNodeIP1 := getPeerGatewayTunnelIP(clusterInfoImport1.Spec, false) c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, gomock.Any(), peerNodeIP1, gw1GatewayIP, true).Times(1) c.processNextWorkItem() c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport2.GetNamespace()). Create(context.TODO(), &clusterInfoImport2, metav1.CreateOptions{}) - peerNodeIP2 := getPeerGatewayIP(clusterInfoImport2.Spec) + peerNodeIP2 := getPeerGatewayTunnelIP(clusterInfoImport2.Spec, false) c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport2.Name, gomock.Any(), peerNodeIP2, gw1GatewayIP, true).Times(1) c.processNextWorkItem() @@ -207,7 +335,14 @@ func TestMCRouteControllerAsGateway(t *testing.T) { } func TestMCRouteControllerAsRegularNode(t *testing.T) { - c := newMCDefaultRouteController(t, &config.NodeConfig{Name: "node-3"}) + c := newMCDefaultRouteController( + t, + &config.NodeConfig{Name: "node-3"}, + &config.NetworkConfig{}, + agent.WireGuardConfig{}, + nil, + "none", + ) defer c.queue.ShutDown() stopCh := make(chan struct{}) @@ -290,3 +425,189 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { case <-finishCh: } } + +func TestRemoveWireGuardRouteAndPeer(t *testing.T) { + ctrl := gomock.NewController(t) + mockInterface := routemock.NewMockInterface(ctrl) + networkConfig := &config.NetworkConfig{} + c := newMCDefaultRouteController(t, + &config.NodeConfig{ + Name: "node-4", + PodIPv4CIDR: &net.IPNet{ + IP: net.ParseIP("10.10.0.0/16"), + }, + }, + networkConfig, + agent.WireGuardConfig{}, + mockInterface, + "wireGuard", + ) + c.wireGuardClient = &wireguardmock.MockWireGuardClient{} + defer c.queue.ShutDown() + mockInterface.EXPECT().DeleteRouteForLink(gomock.Any(), gomock.Any()).Times(1) + + testCases := []struct { + name string + ciImport *mcv1alpha1.ClusterInfoImport + expectError error + }{ + { + name: "remove peer successfully", + ciImport: &mcv1alpha1.ClusterInfoImport{ + Spec: mcv1alpha1.ClusterInfo{ + ServiceCIDR: "10.100.0.0/16", + }, + }, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expectError, c.removeWireGuardRouteAndPeer(tt.ciImport)) + }) + } +} + +func TestEnqueueGateway(t *testing.T) { + ctrl := gomock.NewController(t) + mockInterface := routemock.NewMockInterface(ctrl) + networkConfig := &config.NetworkConfig{} + c := newMCDefaultRouteController(t, + &config.NodeConfig{ + Name: "node-4", + PodIPv4CIDR: &net.IPNet{ + IP: net.ParseIP("10.10.0.0/16"), + }, + }, + networkConfig, + agent.WireGuardConfig{}, + mockInterface, + "wireGuard", + ) + c.wireGuardClient = &wireguardmock.MockWireGuardClient{} + defer c.queue.ShutDown() + + testCases := []struct { + name string + obj interface{} + isDeleted bool + expectNum int + }{ + { + name: "gateway is deleted, enqueue successfully", + obj: &mcv1alpha1.Gateway{}, + isDeleted: true, + expectNum: 1, + }, + { + name: "gateway deleted, enqueue successfully", + obj: &mcv1alpha1.Gateway{ + InternalIP: "10.10.1.1", + GatewayIP: "10.10.1.1", + }, + isDeleted: true, + expectNum: 1, + }, + { + name: "gateway not deleted, enqueue failed", + obj: &mcv1alpha1.Gateway{}, + isDeleted: false, + expectNum: 0, + }, + { + name: "unexpect object", + obj: map[string]string{}, + isDeleted: false, + expectNum: 0, + }, + + { + name: "invalid delete state", + obj: cache.DeletedFinalStateUnknown{ + Obj: map[string]string{}, + }, + isDeleted: false, + expectNum: 0, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + c.queue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "gatewayroute") + c.enqueueGateway(tt.obj, tt.isDeleted) + assert.Equal(t, tt.expectNum, c.queue.Len()) + }) + } +} + +func TestEnqueueClusterInfoImport(t *testing.T) { + ctrl := gomock.NewController(t) + mockInterface := routemock.NewMockInterface(ctrl) + networkConfig := &config.NetworkConfig{} + c := newMCDefaultRouteController(t, + &config.NodeConfig{ + Name: "node-4", + PodIPv4CIDR: &net.IPNet{ + IP: net.ParseIP("10.10.0.0/16"), + }, + }, + networkConfig, + agent.WireGuardConfig{}, + mockInterface, + "wireGuard", + ) + c.wireGuardClient = &wireguardmock.MockWireGuardClient{} + defer c.queue.ShutDown() + + testCases := []struct { + name string + obj interface{} + isDeleted bool + expectNum int + }{ + { + name: "ClusterInfoImport without GatewayInfo", + obj: &mcv1alpha1.ClusterInfoImport{}, + isDeleted: false, + expectNum: 0, + }, + { + name: "ClusterInfoImport without Gateway IP", + obj: &mcv1alpha1.ClusterInfoImport{ + Spec: mcv1alpha1.ClusterInfo{ + GatewayInfos: []mcv1alpha1.GatewayInfo{ + { + GatewayIP: "abc", + }, + }, + }, + }, + isDeleted: false, + expectNum: 0, + }, + { + name: "unexpected object", + obj: map[string]string{}, + expectNum: 0, + }, + { + name: "invalid delete state", + obj: cache.DeletedFinalStateUnknown{}, + expectNum: 0, + }, + { + name: "ClusterInfoImport delete, enqueue successfully", + obj: &mcv1alpha1.ClusterInfoImport{}, + isDeleted: true, + expectNum: 1, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + c.queue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "gatewayroute") + c.enqueueClusterInfoImport(tt.obj, tt.isDeleted) + assert.Equal(t, tt.expectNum, c.queue.Len()) + }) + } +} diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 03064d039b9..9743aca5c8d 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -1455,7 +1455,7 @@ func (c *client) InstallMulticlusterNodeFlows(clusterID string, var flows []binding.Flow localGatewayMAC := c.nodeConfig.GatewayConfig.MAC for peerCIDR, remoteGatewayIP := range peerConfigs { - flows = append(flows, c.featureMulticluster.l3FwdFlowToRemoteViaTun(localGatewayMAC, *peerCIDR, tunnelPeerIP, remoteGatewayIP, enableStretchedNetworkPolicy)...) + flows = append(flows, c.featureMulticluster.l3FwdFlowToRemoteGateway(localGatewayMAC, *peerCIDR, tunnelPeerIP, remoteGatewayIP, enableStretchedNetworkPolicy)...) } return c.modifyFlows(c.featureMulticluster.cachedFlows, cacheKey, flows) } @@ -1473,7 +1473,7 @@ func (c *client) InstallMulticlusterGatewayFlows(clusterID string, var flows []binding.Flow localGatewayMAC := c.nodeConfig.GatewayConfig.MAC for peerCIDR, remoteGatewayIP := range peerConfigs { - flows = append(flows, c.featureMulticluster.l3FwdFlowToRemoteViaTun(localGatewayMAC, *peerCIDR, tunnelPeerIP, remoteGatewayIP, enableStretchedNetworkPolicy)...) + flows = append(flows, c.featureMulticluster.l3FwdFlowToRemoteGateway(localGatewayMAC, *peerCIDR, tunnelPeerIP, remoteGatewayIP, enableStretchedNetworkPolicy)...) // Add SNAT flows to change cross-cluster packets' source IP to local Gateway IP. flows = append(flows, c.featureMulticluster.snatConntrackFlows(*peerCIDR, localGatewayIP)...) } diff --git a/pkg/agent/openflow/multicluster.go b/pkg/agent/openflow/multicluster.go index e4bfac68741..a9b7009487f 100644 --- a/pkg/agent/openflow/multicluster.go +++ b/pkg/agent/openflow/multicluster.go @@ -68,7 +68,7 @@ func (f *featureMulticluster) replayFlows() []binding.Flow { return getCachedFlows(f.cachedFlows) } -func (f *featureMulticluster) l3FwdFlowToRemoteViaTun( +func (f *featureMulticluster) l3FwdFlowToRemoteGateway( localGatewayMAC net.HardwareAddr, peerServiceCIDR net.IPNet, tunnelPeer net.IP, diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index 8975c98f6bd..149246a1c74 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -80,4 +80,11 @@ type Interface interface { // DeleteLocalAntreaFlexibleIPAMPodRule is used to delete related IP set entries when an AntreaFlexibleIPAM Pod is deleted. DeleteLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error + + // AddRouteForLink adds a route entry for a specific link in format: + // "dstCIDR" dev "link" scope link + AddRouteForLink(dstCIDR *net.IPNet, linkIndex int) error + + // DeleteRouteForLink deletes a route entry for a specific link. + DeleteRouteForLink(dstCIDR *net.IPNet, linkIndex int) error } diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 51497536112..712fb297ed5 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -1551,6 +1551,33 @@ func (c *Client) deleteNodeIP(podCIDR *net.IPNet) error { return nil } +func (c *Client) AddRouteForLink(cidr *net.IPNet, linkIndex int) error { + route := &netlink.Route{ + Scope: netlink.SCOPE_LINK, + Dst: cidr, + LinkIndex: linkIndex, + } + + return c.netlink.RouteReplace(route) +} + +func (c *Client) DeleteRouteForLink(cidr *net.IPNet, linkIndex int) error { + route := &netlink.Route{ + Scope: netlink.SCOPE_LINK, + Dst: cidr, + LinkIndex: linkIndex, + } + + if err := c.netlink.RouteDel(route); err != nil { + if err.Error() == "no such process" { + klog.V(2).InfoS("Failed to delete WireGuard CIDR route since the route does not exist", "route", route) + return nil + } + return err + } + return nil +} + func getTransProtocolStr(protocol binding.Protocol) string { if protocol == binding.ProtocolTCP || protocol == binding.ProtocolTCPv6 { return "tcp" diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 82257b9a995..53c0eb86ec4 100644 --- a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -572,3 +572,11 @@ func generateNeigh(ip net.IP, linkIndex int) *util.Neighbor { State: "Permanent", } } + +func (c *Client) AddRouteForLink(dstCIDR *net.IPNet, linkIndex int) error { + return errors.New("AddRouteForLink is not implemented on Windows") +} + +func (c *Client) DeleteRouteForLink(dstCIDR *net.IPNet, linkIndex int) error { + return errors.New("DeleteRouteForLink is not implemented on Windows") +} diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index 8d5a3256530..f0404b8de6c 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -92,6 +92,20 @@ func (mr *MockInterfaceMockRecorder) AddNodePort(arg0, arg1, arg2 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNodePort", reflect.TypeOf((*MockInterface)(nil).AddNodePort), arg0, arg1, arg2) } +// AddRouteForLink mocks base method +func (m *MockInterface) AddRouteForLink(arg0 *net.IPNet, arg1 int) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddRouteForLink", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddRouteForLink indicates an expected call of AddRouteForLink +func (mr *MockInterfaceMockRecorder) AddRouteForLink(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRouteForLink", reflect.TypeOf((*MockInterface)(nil).AddRouteForLink), arg0, arg1) +} + // AddRoutes mocks base method func (m *MockInterface) AddRoutes(arg0 *net.IPNet, arg1 string, arg2, arg3 net.IP) error { m.ctrl.T.Helper() @@ -162,6 +176,20 @@ func (mr *MockInterfaceMockRecorder) DeleteNodePort(arg0, arg1, arg2 interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNodePort", reflect.TypeOf((*MockInterface)(nil).DeleteNodePort), arg0, arg1, arg2) } +// DeleteRouteForLink mocks base method +func (m *MockInterface) DeleteRouteForLink(arg0 *net.IPNet, arg1 int) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteRouteForLink", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteRouteForLink indicates an expected call of DeleteRouteForLink +func (mr *MockInterfaceMockRecorder) DeleteRouteForLink(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteRouteForLink", reflect.TypeOf((*MockInterface)(nil).DeleteRouteForLink), arg0, arg1) +} + // DeleteRoutes mocks base method func (m *MockInterface) DeleteRoutes(arg0 *net.IPNet) error { m.ctrl.T.Helper() diff --git a/pkg/agent/wireguard/client_linux.go b/pkg/agent/wireguard/client_linux.go index 8054d810731..b34c4631952 100644 --- a/pkg/agent/wireguard/client_linux.go +++ b/pkg/agent/wireguard/client_linux.go @@ -18,8 +18,6 @@ package wireguard import ( - "context" - "encoding/json" "errors" "fmt" "io" @@ -31,14 +29,9 @@ import ( "golang.org/x/sys/unix" "golang.zx2c4.com/wireguard/wgctrl" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - apitypes "k8s.io/apimachinery/pkg/types" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/config" - "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" ) @@ -65,14 +58,13 @@ var ( type client struct { wgClient wgctrlClient nodeName string - k8sClient clientset.Interface privateKey wgtypes.Key peerPublicKeyByNodeName *sync.Map wireGuardConfig *config.WireGuardConfig gatewayConfig *config.GatewayConfig } -func New(clientSet clientset.Interface, nodeConfig *config.NodeConfig, wireGuardConfig *config.WireGuardConfig) (Interface, error) { +func New(nodeConfig *config.NodeConfig, wireGuardConfig *config.WireGuardConfig) (Interface, error) { wgClient, err := wgctrl.New() if err != nil { return nil, err @@ -83,7 +75,6 @@ func New(clientSet clientset.Interface, nodeConfig *config.NodeConfig, wireGuard c := &client{ wgClient: wgClient, nodeName: nodeConfig.Name, - k8sClient: clientSet, wireGuardConfig: wireGuardConfig, peerPublicKeyByNodeName: &sync.Map{}, gatewayConfig: nodeConfig.GatewayConfig, @@ -91,31 +82,42 @@ func New(clientSet clientset.Interface, nodeConfig *config.NodeConfig, wireGuard return c, nil } -func (client *client) Init() error { +func (client *client) Init(ipv4 net.IP, ipv6 net.IP) (string, error) { link := &netlink.Wireguard{LinkAttrs: netlink.LinkAttrs{Name: client.wireGuardConfig.Name, MTU: client.wireGuardConfig.MTU}} err := linkAdd(link) // Ignore existing link as it may have already been created or managed by userspace process. if err != nil && !errors.Is(err, unix.EEXIST) { if errors.Is(err, unix.EOPNOTSUPP) { - return fmt.Errorf("WireGuard not supported by the Linux kernel (netlink: %w), make sure the WireGuard kernel module is loaded", err) + return "", fmt.Errorf("WireGuard not supported by the Linux kernel (netlink: %w), make sure the WireGuard kernel module is loaded", err) } - return err + return "", err } if err := linkSetUp(link); err != nil { - return err + return "", err } // Configure the IP addresses same as Antrea gateway so iptables MASQUERADE target will select it as source address. // It's necessary to make Service traffic requiring SNAT (e.g. host to ClusterIP, external to NodePort) accepted by // peer Node and to make their response routed back correctly. + // If ipv4 or ipv6 is not provided, the IP address from client's Gateway configuration will be used. // It uses "/32" mask for IPv4 address and "/128" mask for IPv6 address to avoid impacting routes on Antrea gateway. var gatewayIPs []*net.IPNet - if client.gatewayConfig.IPv4 != nil { + if ipv4 != nil { + gatewayIPs = append(gatewayIPs, &net.IPNet{ + IP: ipv4, + Mask: net.CIDRMask(32, 32), + }) + } else if client.gatewayConfig.IPv4 != nil { gatewayIPs = append(gatewayIPs, &net.IPNet{ IP: client.gatewayConfig.IPv4, Mask: net.CIDRMask(32, 32), }) } - if client.gatewayConfig.IPv6 != nil { + if ipv6 != nil { + gatewayIPs = append(gatewayIPs, &net.IPNet{ + IP: ipv6, + Mask: net.CIDRMask(128, 128), + }) + } else if client.gatewayConfig.IPv6 != nil { gatewayIPs = append(gatewayIPs, &net.IPNet{ IP: client.gatewayConfig.IPv6, Mask: net.CIDRMask(128, 128), @@ -123,12 +125,12 @@ func (client *client) Init() error { } // This must be executed after netlink.LinkSetUp as the latter ensures link.Attrs().Index is set. if err := utilConfigureLinkAddresses(link.Attrs().Index, gatewayIPs); err != nil { - return err + return "", err } client.wireGuardConfig.LinkIndex = link.Attrs().Index wgDev, err := client.wgClient.Device(client.wireGuardConfig.Name) if err != nil { - return err + return "", err } client.privateKey = wgDev.PrivateKey // WireGuard private key will be persistent across agent restarts. So we only need to @@ -136,7 +138,7 @@ func (client *client) Init() error { if client.privateKey == zeroKey { newPkey, err := wgtypes.GeneratePrivateKey() if err != nil { - return err + return "", err } client.privateKey = newPkey } @@ -145,21 +147,8 @@ func (client *client) Init() error { ListenPort: &client.wireGuardConfig.Port, ReplacePeers: false, } - patch, _ := json.Marshal(map[string]interface{}{ - "metadata": map[string]interface{}{ - "annotations": map[string]string{ - types.NodeWireGuardPublicAnnotationKey: client.privateKey.PublicKey().String(), - }, - }, - }) - if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - _, err := client.k8sClient.CoreV1().Nodes().Patch(context.TODO(), client.nodeName, apitypes.MergePatchType, patch, metav1.PatchOptions{}, "status") - return err - }); err != nil { - return fmt.Errorf("error when patching the Node with the '%s' annotation: %w", types.NodeWireGuardPublicAnnotationKey, err) - } - return client.wgClient.ConfigureDevice(client.wireGuardConfig.Name, cfg) + return client.privateKey.PublicKey().String(), client.wgClient.ConfigureDevice(client.wireGuardConfig.Name, cfg) } func (client *client) RemoveStalePeers(currentPeerPublickeys map[string]string) error { @@ -256,3 +245,13 @@ func (client *client) DeletePeer(nodeName string) error { client.peerPublicKeyByNodeName.Delete(nodeName) return nil } + +func (client *client) CleanUp() error { + if err := netlink.LinkDel(&netlink.Device{ + LinkAttrs: netlink.LinkAttrs{ + Name: client.wireGuardConfig.Name, Index: client.wireGuardConfig.LinkIndex, + }}); err != nil && err.Error() != "no such device" { + return err + } + return nil +} diff --git a/pkg/agent/wireguard/client_test.go b/pkg/agent/wireguard/client_test.go index 9627467aa25..cb10e6cda82 100644 --- a/pkg/agent/wireguard/client_test.go +++ b/pkg/agent/wireguard/client_test.go @@ -28,9 +28,6 @@ import ( "github.com/vishvananda/netlink" "golang.org/x/sys/unix" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" "antrea.io/antrea/pkg/agent/config" ) @@ -385,8 +382,7 @@ func Test_DeletePeer(t *testing.T) { } func Test_New(t *testing.T) { - client := fake.NewSimpleClientset() - _, err := New(client, &config.NodeConfig{Name: "test"}, &config.WireGuardConfig{}) + _, err := New(&config.NodeConfig{Name: "test"}, &config.WireGuardConfig{}) require.NoError(t, err) } @@ -397,6 +393,8 @@ func Test_Init(t *testing.T) { lindSetupErr error utilConfigErr error expectedErr string + extraIPv4 net.IP + extraIPv6 net.IP }{ { name: "init successfully", @@ -421,14 +419,17 @@ func Test_Init(t *testing.T) { utilConfigErr: errors.New("link address config failed"), expectedErr: "link address config failed", }, + { + name: "init successfully with provided IPv4 address", + extraIPv4: net.ParseIP("192.168.0.0"), + }, + { + name: "init successfully with provided IPv6 address", + extraIPv6: net.ParseIP("0000:0000:0000:0000:0000:0000:0000:0000"), + }, } client := getFakeClient() - client.k8sClient = fake.NewSimpleClientset(&corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "fake-node-1", - }, - }) client.gatewayConfig = &config.GatewayConfig{ IPv4: net.ParseIP("192.168.0.2"), IPv6: net.ParseIP("fd12:ab:34:a001::11"), @@ -446,7 +447,7 @@ func Test_Init(t *testing.T) { return tt.utilConfigErr } - err := client.Init() + _, err := client.Init(tt.extraIPv4, tt.extraIPv6) if tt.expectedErr != "" { assert.Equal(t, tt.expectedErr, err.Error()) } else { diff --git a/pkg/agent/wireguard/client_windows.go b/pkg/agent/wireguard/client_windows.go index eb20d2ff802..a377b846d5d 100644 --- a/pkg/agent/wireguard/client_windows.go +++ b/pkg/agent/wireguard/client_windows.go @@ -20,11 +20,9 @@ package wireguard import ( "fmt" - clientset "k8s.io/client-go/kubernetes" - "antrea.io/antrea/pkg/agent/config" ) -func New(clientSet clientset.Interface, nodeConfig *config.NodeConfig, wireGuardConfig *config.WireGuardConfig) (Interface, error) { +func New(nodeConfig *config.NodeConfig, wireGuardConfig *config.WireGuardConfig) (Interface, error) { return nil, fmt.Errorf("WireGuard is not implemented for windows") } diff --git a/pkg/agent/wireguard/client_windows_test.go b/pkg/agent/wireguard/client_windows_test.go index d910436e2cb..b18de5487b9 100644 --- a/pkg/agent/wireguard/client_windows_test.go +++ b/pkg/agent/wireguard/client_windows_test.go @@ -24,6 +24,6 @@ import ( ) func TestNew(t *testing.T) { - _, err := New(nil, nil, nil) + _, err := New(nil, nil) assert.Equal(t, "WireGuard is not implemented for windows", err.Error()) } diff --git a/pkg/agent/wireguard/interface.go b/pkg/agent/wireguard/interface.go index 4b11b33709c..99c285d7042 100644 --- a/pkg/agent/wireguard/interface.go +++ b/pkg/agent/wireguard/interface.go @@ -20,14 +20,18 @@ import ( type Interface interface { // Init initializes the WireGuard client and sets up the WireGuard device. - // It will generate a new private key if necessary and update the public key to the Node's annotation. - Init() error + // It will generate a new private key if necessary. + // If IPv4 or IPv6 address specified, it will be set as the WireGuard interface's + // IP address. + Init(ipv4 net.IP, ipv6 net.IP) (string, error) // UpdatePeer updates WireGuard peer by provided public key and Node IPs. // It will create a new WireGuard peer if the specified Node is not present in WireGuard device. - UpdatePeer(nodeName, publicKeyString string, peerNodeIP net.IP, podCIDRs []*net.IPNet) error + UpdatePeer(nodeName, publicKeyString string, peerNodeIP net.IP, allowedIPs []*net.IPNet) error // RemoveStalePeers reads existing WireGuard peers from the WireGuard device and deletes those which are not in currentPeerPublickeys. // currentPeerPublickeys is a map of Node names to public keys. It is useful to clean up stale WireGuard peers upon antrea starting. RemoveStalePeers(currentPeerPublickeys map[string]string) error // DeletePeer deletes the WireGuard peer by Node name. DeletePeer(nodeName string) error + // CleanUp cleans the network interface on the host created by WireGuard client. + CleanUp() error } diff --git a/pkg/agent/wireguard/testing/mock_wireguard.go b/pkg/agent/wireguard/testing/mock_wireguard.go new file mode 100644 index 00000000000..4c06f591e2f --- /dev/null +++ b/pkg/agent/wireguard/testing/mock_wireguard.go @@ -0,0 +1,42 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package testing + +import ( + "net" +) + +type MockWireGuardClient struct{} + +func (m *MockWireGuardClient) Init(ipv4 net.IP, ipv6 net.IP) (string, error) { + return "key", nil +} + +func (m *MockWireGuardClient) CleanUp() error { + return nil +} + +func (m *MockWireGuardClient) UpdatePeer(nodeName, publicKeyString string, peerNodeIP net.IP, allowedIPs []*net.IPNet) error { + return nil +} + +func (m *MockWireGuardClient) RemoveStalePeers(currentPeerPublickeys map[string]string) error { + return nil +} + +func (m *MockWireGuardClient) DeletePeer(nodeName string) error { + return nil +} diff --git a/pkg/apis/ports.go b/pkg/apis/ports.go index 4843e783676..c13b045536f 100644 --- a/pkg/apis/ports.go +++ b/pkg/apis/ports.go @@ -27,4 +27,6 @@ const ( AntreaAgentClusterMembershipPort = 10351 // WireGuardListenPort is the default port for WireGuard encrypted traffic. WireGuardListenPort = 51820 + // MulticlusterWireGuardListenPort is the default port for Multi-cluster WireGuard encrypted traffic. + MulticlusterWireGuardListenPort = 51821 ) diff --git a/pkg/config/agent/config.go b/pkg/config/agent/config.go index 03d586eac38..cdc3d623e0c 100644 --- a/pkg/config/agent/config.go +++ b/pkg/config/agent/config.go @@ -300,6 +300,13 @@ type MulticlusterConfig struct { // clusters directly. This feature also requires Pod CIDRs to be provided in the Multi-cluster Controller // configuration. EnablePodToPodConnectivity bool `yaml:"enablePodToPodConnectivity,omitempty"` + // Antrea Multi-cluster WireGuard tunnel configuration. + WireGuard WireGuardConfig `yaml:"wireGuard,omitempty"` + // Determines how cross-cluster traffic is encrypted. + // It has the following options: + // - none (default): Cross-cluster traffic will not be encrypted. + // - wireGuard: Enable WireGuard for tunnel traffic encryption. + TrafficEncryptionMode string `yaml:"trafficEncryptionMode,omitempty"` } type ExternalNodeConfig struct {