From 243cf7ed4e4d8df5db4565fe3715ed54d558ec82 Mon Sep 17 00:00:00 2001
From: Thomas Ferrandiz
Date: Mon, 12 Dec 2022 09:01:57 +0000
Subject: [PATCH] Implement MultiClusterCIDR API in flannel

This API requires Kubernetes 1.26 and is available for the vxlan, wireguard
and host-gw backends.

Signed-off-by: Thomas Ferrandiz
---
 Documentation/MultiClusterCIDR/README.md | 137 ++++++++++++++++
 Documentation/kube-flannel-psp.yml       |   4 +-
 Documentation/kube-flannel.yml           |  15 +-
 backend/ipip/ipip.go                     |   7 +-
 backend/udp/udp_amd64.go                 |   6 +-
 backend/vxlan/vxlan.go                   |  12 +-
 backend/wireguard/device.go              |  11 +-
 backend/wireguard/wireguard.go           |  16 +-
 backend/wireguard/wireguard_network.go   |  52 +++++-
 dist/functional-test-k8s.sh              |  10 +-
 go.mod                                   |   2 +-
 main.go                                  | 200 +++++++++++++++--------
 network/iptables.go                      | 162 +++++++++---------
 network/iptables_test.go                 |   9 +-
 pkg/ip/ip6net.go                         |  14 ++
 pkg/ip/ipnet.go                          |  14 ++
 subnet/config.go                         | 110 +++++++++++--
 subnet/config_test.go                    |  22 +++
 subnet/etcd/local_manager.go             |   5 +
 subnet/kube/cluster_cidr.go              | 101 ++++++++++++
 subnet/kube/kube.go                      |  98 ++++++++++-
 subnet/subnet.go                         |  50 ++++++
 22 files changed, 866 insertions(+), 191 deletions(-)
 create mode 100644 Documentation/MultiClusterCIDR/README.md
 create mode 100644 subnet/kube/cluster_cidr.go

diff --git a/Documentation/MultiClusterCIDR/README.md b/Documentation/MultiClusterCIDR/README.md
new file mode 100644
index 000000000..27711c65f
--- /dev/null
+++ b/Documentation/MultiClusterCIDR/README.md
@@ -0,0 +1,137 @@
+Flannel provides experimental support for the new [MultiClusterCIDR API](https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/2593-multiple-cluster-cidrs) introduced as an alpha feature in Kubernetes 1.26.
+
+## Prerequisites
+* A cluster running Kubernetes 1.26 (this was tested on version `1.26.0-alpha.1`)
+* Use flannel version `0.21.0` or later
+* The MultiClusterCIDR API can be used with the vxlan, wireguard and host-gw backends
+
+*Note*: once a PodCIDR is allocated to a node, it cannot be modified or removed, so you need to configure the MultiClusterCIDR API before you add new nodes to your cluster.
+
+## How to use the MultiClusterCIDR API
+### Enable the new API in the control plane
+* Edit `/etc/kubernetes/manifests/kube-controller-manager.yaml` and add the following lines to the `spec.containers.command` section:
+```
+    - --cidr-allocator-type=MultiCIDRRangeAllocator
+    - --feature-gates=MultiCIDRRangeAllocator=true
+```
+
+* Edit `/etc/kubernetes/manifests/kube-apiserver.yaml` and add the following line to the `spec.containers.command` section:
+```
+    - --runtime-config=networking.k8s.io/v1alpha1
+```
+
+Both components should restart automatically, and a default ClusterCIDR resource will be created based on the usual `pod-network-cidr` parameter.
+
+For example:
+```bash
+$ kubectl get clustercidr
+NAME                   PERNODEHOSTBITS   IPV4            IPV6                 AGE
+default-cluster-cidr   8                 10.244.0.0/16   2001:cafe:42::/112   24h
+
+$ kubectl describe clustercidr default-cluster-cidr
+Name:            default-cluster-cidr
+Labels:          <none>
+Annotations:     <none>
+NodeSelector:
+PerNodeHostBits: 8
+IPv4:            10.244.0.0/16
+IPv6:            2001:cafe:42::/112
+Events:          <none>
+```
+
+### Enable the new feature in flannel
+This feature is disabled by default. To enable it, add the following flag to the args of the `kube-flannel` container:
+```
+    - --use-multi-cluster-cidr
+```
+
+Since the subnets used for pod IP addresses are now specified through the new API, you do not need the `Network` and `IPv6Network` sections in the flannel configuration.
+Thus your flannel configuration could look like this:
+```json
+{
+    "EnableIPv6": true,
+    "Backend": {
+        "Type": "host-gw"
+    }
+}
+```
+
+If you leave them in, they will simply be ignored by flannel.
+NOTE: this only applies when using the MultiClusterCIDR API.
+
+### Configure the required `clustercidr` resources
+Before adding nodes to the cluster, you need to add new `clustercidr` resources.
+
+For example:
+```yaml
+apiVersion: networking.k8s.io/v1alpha1
+kind: ClusterCIDR
+metadata:
+  name: my-cidr-1
+spec:
+  nodeSelector:
+    nodeSelectorTerms:
+    - matchExpressions:
+      - key: kubernetes.io/hostname
+        operator: In
+        values:
+        - "worker1"
+  perNodeHostBits: 8
+  ipv4: 10.248.0.0/16
+  ipv6: 2001:cafe:43::/112
+---
+apiVersion: networking.k8s.io/v1alpha1
+kind: ClusterCIDR
+metadata:
+  name: my-cidr-2
+spec:
+  nodeSelector:
+    nodeSelectorTerms:
+    - matchExpressions:
+      - key: kubernetes.io/hostname
+        operator: In
+        values:
+        - "worker2"
+  perNodeHostBits: 8
+  ipv4: 10.247.0.0/16
+  ipv6: ""
+```
+For more details on the `spec` section, see the [feature specification page](https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/2593-multiple-cluster-cidrs#expected-behavior).
+
+*WARNING*: all the fields in the `spec` section are immutable.
+
+For more information on Node Selectors, see [the Kubernetes documentation](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/).
+
+### Add nodes to the cluster
+The new nodes will be allocated a `PodCIDR` based on the configured `clustercidr` resources.
+flannel ensures connectivity between all the pods regardless of the subnet from which each pod's IP address was allocated.
+
+## Notes on the subnet.env file
+flanneld writes a file (located by default at `/run/flannel/subnet.env`) that is used by the flannel CNI plugin, which the kubelet calls every time a pod is added to or removed from the node. This file changes slightly with the new API: `FLANNEL_NETWORK` and `FLANNEL_IPV6_NETWORK` become lists of CIDRs instead of single CIDR entries. They hold the list of CIDRs declared in the `clustercidr` resources of the API. flanneld updates the file every time a new `clustercidr` is created.
+
+As an example, it could look like this:
+```bash
+FLANNEL_NETWORK=10.42.0.0/16,192.168.0.0/16
+FLANNEL_SUBNET=10.42.0.1/24
+FLANNEL_IPV6_NETWORK=2001:cafe:42::/56,2001:cafd:42::/56
+FLANNEL_IPV6_SUBNET=2001:cafe:42::1/64
+FLANNEL_MTU=1450
+FLANNEL_IPMASQ=true
+```
+
+## Notes on using IPv6 with the MultiClusterCIDR API
+The feature is fully compatible with IPv6 and dual-stack networking.
+Each `clustercidr` resource can include an IPv4 and/or an IPv6 subnet.
+If both are provided, the PodCIDR allocated based on this `clustercidr` will be dual-stack.
+The controller allows you to use IPv4, IPv6 and dual-stack `clustercidr` resources all at the same time to facilitate cluster migrations; as a result, it is up to you to keep your IP allocation consistent.
+
+If you want to use dual-stack networking with the new API, we recommend that you do not specify the `--pod-network-cidr` flag to `kubeadm` when installing the cluster, so that you can manually configure the controller later.
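+For example, a minimal control-plane install without a default PodCIDR could look like this (the version value is only an illustration; keep whatever other `kubeadm` flags you normally use):
+```bash
+# No --pod-network-cidr here: PodCIDRs will come from the ClusterCIDR resources.
+kubeadm init --kubernetes-version=v1.26.0
+```
+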
+In that case, when you edit `/etc/kubernetes/manifests/kube-controller-manager.yaml`, add: +``` + - --cidr-allocator-type=MultiCIDRRangeAllocator + - --feature-gates=MultiCIDRRangeAllocator=true + - --cluster-cidr=10.244.0.0/16,2001:cafe:42::/112 #replace with your own default clusterCIDR + - --node-cidr-mask-size-ipv6=120 + - --allocate-node-cidrs +``` diff --git a/Documentation/kube-flannel-psp.yml b/Documentation/kube-flannel-psp.yml index 0a4536516..cd280e294 100644 --- a/Documentation/kube-flannel-psp.yml +++ b/Documentation/kube-flannel-psp.yml @@ -166,8 +166,8 @@ spec: serviceAccountName: flannel initContainers: - name: install-cni-plugin - #image: flannelcni/flannel-cni-plugin:v1.1.0 for ppc64le and mips64le (dockerhub limitations may apply) - image: docker.io/rancher/mirrored-flannelcni-flannel-cni-plugin:v1.1.0 + #image: flannelcni/flannel-cni-plugin:v1.1.2 for ppc64le and mips64le (dockerhub limitations may apply) + image: docker.io/rancher/mirrored-flannelcni-flannel-cni-plugin:v1.1.2 command: - cp args: diff --git a/Documentation/kube-flannel.yml b/Documentation/kube-flannel.yml index 0bc0d084b..694e4b935 100644 --- a/Documentation/kube-flannel.yml +++ b/Documentation/kube-flannel.yml @@ -31,6 +31,13 @@ rules: - nodes/status verbs: - patch +- apiGroups: + - "networking.k8s.io" + resources: + - clustercidrs + verbs: + - list + - watch --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 @@ -123,8 +130,8 @@ spec: serviceAccountName: flannel initContainers: - name: install-cni-plugin - #image: flannelcni/flannel-cni-plugin:v1.1.0 for ppc64le and mips64le (dockerhub limitations may apply) - image: docker.io/rancher/mirrored-flannelcni-flannel-cni-plugin:v1.1.0 + #image: flannelcni/flannel-cni-plugin:v1.1.2 #for ppc64le and mips64le (dockerhub limitations may apply) + image: docker.io/rancher/mirrored-flannelcni-flannel-cni-plugin:v1.1.2 command: - cp args: @@ -135,7 +142,7 @@ spec: - name: cni-plugin mountPath: /opt/cni/bin - name: install-cni - #image: flannelcni/flannel:v0.20.2 for ppc64le and mips64le (dockerhub limitations may apply) + #image: flannelcni/flannel:v0.20.2 #for ppc64le and mips64le (dockerhub limitations may apply) image: docker.io/rancher/mirrored-flannelcni-flannel:v0.20.2 command: - cp @@ -150,7 +157,7 @@ spec: mountPath: /etc/kube-flannel/ containers: - name: kube-flannel - #image: flannelcni/flannel:v0.20.2 for ppc64le and mips64le (dockerhub limitations may apply) + #image: flannelcni/flannel:v0.20.2 #for ppc64le and mips64le (dockerhub limitations may apply) image: docker.io/rancher/mirrored-flannelcni-flannel:v0.20.2 command: - /opt/bin/flanneld diff --git a/backend/ipip/ipip.go b/backend/ipip/ipip.go index a64c90968..ae56f1039 100644 --- a/backend/ipip/ipip.go +++ b/backend/ipip/ipip.go @@ -89,7 +89,12 @@ func (be *IPIPBackend) RegisterNetwork(ctx context.Context, wg *sync.WaitGroup, return nil, fmt.Errorf("failed to acquire lease: %v", err) } - link, err := be.configureIPIPDevice(n.SubnetLease, subnet.GetFlannelNetwork(config)) + net, err := config.GetFlannelNetwork(&n.SubnetLease.Subnet) + if err != nil { + return nil, err + } + + link, err := be.configureIPIPDevice(n.SubnetLease, net) if err != nil { return nil, err diff --git a/backend/udp/udp_amd64.go b/backend/udp/udp_amd64.go index e8dc9f6eb..39a2a2fdd 100644 --- a/backend/udp/udp_amd64.go +++ b/backend/udp/udp_amd64.go @@ -78,11 +78,15 @@ func (be *UdpBackend) RegisterNetwork(ctx context.Context, wg *sync.WaitGroup, c return nil, fmt.Errorf("failed to acquire lease: %v", err) } 
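+	// With the MultiClusterCIDR API there can be several flannel networks;
+	// GetFlannelNetwork returns the one that contains this node's lease and
+	// falls back to the single Network entry when the new API is not in use.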
+ net, err := config.GetFlannelNetwork(&l.Subnet) + if err != nil { + return nil, err + } // Tunnel's subnet is that of the whole overlay network (e.g. /16) // and not that of the individual host (e.g. /24) tunNet := ip.IP4Net{ IP: l.Subnet.IP, - PrefixLen: subnet.GetFlannelNetwork(config).PrefixLen, + PrefixLen: net.PrefixLen, } return newNetwork(be.sm, be.extIface, cfg.Port, tunNet, l) diff --git a/backend/vxlan/vxlan.go b/backend/vxlan/vxlan.go index 7c4542624..8a6227325 100644 --- a/backend/vxlan/vxlan.go +++ b/backend/vxlan/vxlan.go @@ -191,12 +191,20 @@ func (be *VXLANBackend) RegisterNetwork(ctx context.Context, wg *sync.WaitGroup, // This IP is just used as a source address for host to workload traffic (so // the return path for the traffic has an address on the flannel network to use as the destination) if config.EnableIPv4 { - if err := dev.Configure(ip.IP4Net{IP: lease.Subnet.IP, PrefixLen: 32}, subnet.GetFlannelNetwork(config)); err != nil { + net, err := config.GetFlannelNetwork(&lease.Subnet) + if err != nil { + return nil, err + } + if err := dev.Configure(ip.IP4Net{IP: lease.Subnet.IP, PrefixLen: 32}, net); err != nil { return nil, fmt.Errorf("failed to configure interface %s: %w", dev.link.Attrs().Name, err) } } if config.EnableIPv6 { - if err := v6Dev.ConfigureIPv6(ip.IP6Net{IP: lease.IPv6Subnet.IP, PrefixLen: 128}, subnet.GetFlannelIPv6Network(config)); err != nil { + net, err := config.GetFlannelIPv6Network(&lease.IPv6Subnet) + if err != nil { + return nil, err + } + if err := v6Dev.ConfigureIPv6(ip.IP6Net{IP: lease.IPv6Subnet.IP, PrefixLen: 128}, net); err != nil { return nil, fmt.Errorf("failed to configure interface %s: %w", v6Dev.link.Attrs().Name, err) } } diff --git a/backend/wireguard/device.go b/backend/wireguard/device.go index c603cd1e3..4c6afba98 100644 --- a/backend/wireguard/device.go +++ b/backend/wireguard/device.go @@ -219,12 +219,21 @@ func (dev *wgDevice) upAndAddRoute(dst *net.IPNet) error { return fmt.Errorf("failed to set interface %s to UP state: %w", dev.attrs.name, err) } + err = dev.addRoute(dst) + if err != nil { + return fmt.Errorf("failed to add route to destination (%s) to interface (%s): %w", dst, dev.attrs.name, err) + } + return nil +} + +func (dev *wgDevice) addRoute(dst *net.IPNet) error { route := netlink.Route{ LinkIndex: dev.link.Attrs().Index, Scope: netlink.SCOPE_LINK, Dst: dst, } - err = netlink.RouteAdd(&route) + + err := netlink.RouteAdd(&route) if err != nil { return fmt.Errorf("failed to add route %s: %w", dev.attrs.name, err) } diff --git a/backend/wireguard/wireguard.go b/backend/wireguard/wireguard.go index ceb94793d..71e89bcf0 100644 --- a/backend/wireguard/wireguard.go +++ b/backend/wireguard/wireguard.go @@ -149,7 +149,7 @@ func (be *WireguardBackend) RegisterNetwork(ctx context.Context, wg *sync.WaitGr } publicKey = dev.attrs.publicKey.String() } else { - return nil, fmt.Errorf("No valid Mode configured") + return nil, fmt.Errorf("no valid Mode configured") } subnetAttrs, err := newSubnetAttrs(be.extIface.ExtAddr, be.extIface.ExtV6Addr, config.EnableIPv4, config.EnableIPv6, publicKey) @@ -168,17 +168,25 @@ func (be *WireguardBackend) RegisterNetwork(ctx context.Context, wg *sync.WaitGr } if config.EnableIPv4 { - err = dev.Configure(lease.Subnet.IP, subnet.GetFlannelNetwork(config)) + net, err := config.GetFlannelNetwork(&lease.Subnet) + if err != nil { + return nil, err + } + err = dev.Configure(lease.Subnet.IP, net) if err != nil { return nil, err } } if config.EnableIPv6 { + ipv6net, err := 
config.GetFlannelIPv6Network(&lease.IPv6Subnet) + if err != nil { + return nil, err + } if cfg.Mode == Separate { - err = v6Dev.ConfigureV6(lease.IPv6Subnet.IP, subnet.GetFlannelIPv6Network(config)) + err = v6Dev.ConfigureV6(lease.IPv6Subnet.IP, ipv6net) } else { - err = dev.ConfigureV6(lease.IPv6Subnet.IP, subnet.GetFlannelIPv6Network(config)) + err = dev.ConfigureV6(lease.IPv6Subnet.IP, ipv6net) } if err != nil { return nil, err diff --git a/backend/wireguard/wireguard_network.go b/backend/wireguard/wireguard_network.go index 6736485f1..9afb62919 100644 --- a/backend/wireguard/wireguard_network.go +++ b/backend/wireguard/wireguard_network.go @@ -88,7 +88,7 @@ func (n *network) Run(ctx context.Context) { for { select { case evtBatch := <-events: - n.handleSubnetEvents(evtBatch) + n.handleSubnetEvents(ctx, evtBatch) case <-ctx.Done(): return @@ -132,7 +132,7 @@ func (n *network) selectPublicEndpoint(ip4 *ip.IP4, ip6 *ip.IP6) string { return ip4.String() } -func (n *network) handleSubnetEvents(batch []subnet.Event) { +func (n *network) handleSubnetEvents(ctx context.Context, batch []subnet.Event) { for _, event := range batch { switch event.Type { case subnet.EventAdded: @@ -152,7 +152,7 @@ func (n *network) handleSubnetEvents(batch []subnet.Event) { } } wireguardAttrs = v4wireguardAttrs - subnets = append(subnets, event.Lease.Subnet.ToIPNet()) + subnets = append(subnets, event.Lease.Subnet.ToIPNet()) //only used if n.mode != Separate } if event.Lease.EnableIPv6 { @@ -163,7 +163,7 @@ func (n *network) handleSubnetEvents(batch []subnet.Event) { } } wireguardAttrs = v6wireguardAttrs - subnets = append(subnets, event.Lease.IPv6Subnet.ToIPNet()) + subnets = append(subnets, event.Lease.IPv6Subnet.ToIPNet()) //only used if n.mode != Separate } if n.mode == Separate { @@ -176,6 +176,18 @@ func (n *network) handleSubnetEvents(batch []subnet.Event) { []net.IPNet{*event.Lease.Subnet.ToIPNet()}); err != nil { log.Errorf("failed to setup ipv4 peer (%s): %v", v4wireguardAttrs.PublicKey, err) } + netconf, err := n.sm.GetNetworkConfig(ctx) + if err != nil { + log.Errorf("could not read network config: %v", err) + } + flannelnet, err := netconf.GetFlannelNetwork(&event.Lease.Subnet) + if err != nil { + log.Errorf("could not get flannel network: %v", err) + } + + if err := n.dev.addRoute(flannelnet.ToIPNet()); err != nil { + log.Errorf("failed to add ipv4 route to (%s): %v", flannelnet, err) + } } if event.Lease.EnableIPv6 { @@ -187,6 +199,18 @@ func (n *network) handleSubnetEvents(batch []subnet.Event) { []net.IPNet{*event.Lease.IPv6Subnet.ToIPNet()}); err != nil { log.Errorf("failed to setup ipv6 peer (%s): %v", v6wireguardAttrs.PublicKey, err) } + netconf, err := n.sm.GetNetworkConfig(ctx) + if err != nil { + log.Errorf("could not read network config: %v", err) + } + ipv6flannelnet, err := netconf.GetFlannelIPv6Network(&event.Lease.IPv6Subnet) + if err != nil { + log.Errorf("could not get flannel network: %v", err) + } + + if err := n.v6Dev.addRoute(ipv6flannelnet.ToIPNet()); err != nil { + log.Errorf("failed to add ipv6 route to (%s): %v", ipv6flannelnet, err) + } } } else { var publicEndpoint string @@ -211,6 +235,26 @@ func (n *network) handleSubnetEvents(batch []subnet.Event) { peers); err != nil { log.Errorf("failed to setup peer (%s): %v", v4wireguardAttrs.PublicKey, err) } + netconf, err := n.sm.GetNetworkConfig(ctx) + if err != nil { + log.Errorf("could not read network config: %v", err) + } + flannelnet, err := netconf.GetFlannelNetwork(&event.Lease.Subnet) + if err != nil { + log.Errorf("could 
not get flannel network: %v", err) + } + + if err := n.dev.addRoute(flannelnet.ToIPNet()); err != nil { + log.Errorf("failed to add ipv4 route to (%s): %v", flannelnet, err) + } + ipv6flannelnet, err := netconf.GetFlannelIPv6Network(&event.Lease.IPv6Subnet) + if err != nil { + log.Errorf("could not get flannel network: %v", err) + } + + if err := n.dev.addRoute(ipv6flannelnet.ToIPNet()); err != nil { + log.Errorf("failed to add ipv6 route to (%s): %v", ipv6flannelnet, err) + } } case subnet.EventRemoved: diff --git a/dist/functional-test-k8s.sh b/dist/functional-test-k8s.sh index 90f804a52..294c4e6f7 100755 --- a/dist/functional-test-k8s.sh +++ b/dist/functional-test-k8s.sh @@ -250,9 +250,10 @@ check_iptables() { -A POSTROUTING -m comment --comment "flanneld masq" -j FLANNEL-POSTRTG -N FLANNEL-POSTRTG -A FLANNEL-POSTRTG -m mark --mark 0x4000/0x4000 -m comment --comment "flanneld masq" -j RETURN --A FLANNEL-POSTRTG -s 10.10.0.0/16 -d 10.10.0.0/16 -m comment --comment "flanneld masq" -j RETURN --A FLANNEL-POSTRTG -s 10.10.0.0/16 ! -d 224.0.0.0/4 -m comment --comment "flanneld masq" -j MASQUERADE --random-fully +-A FLANNEL-POSTRTG -s 10.10.1.0/24 -d 10.10.0.0/16 -m comment --comment "flanneld masq" -j RETURN +-A FLANNEL-POSTRTG -s 10.10.0.0/16 -d 10.10.1.0/24 -m comment --comment "flanneld masq" -j RETURN -A FLANNEL-POSTRTG ! -s 10.10.0.0/16 -d 10.10.1.0/24 -m comment --comment "flanneld masq" -j RETURN +-A FLANNEL-POSTRTG -s 10.10.0.0/16 ! -d 224.0.0.0/4 -m comment --comment "flanneld masq" -j MASQUERADE --random-fully -A FLANNEL-POSTRTG ! -s 10.10.0.0/16 -d 10.10.0.0/16 -m comment --comment "flanneld masq" -j MASQUERADE --random-fully EOM read -r -d '' POSTROUTING_RULES_FLANNEL2 << EOM @@ -260,9 +261,10 @@ EOM -A POSTROUTING -m comment --comment "flanneld masq" -j FLANNEL-POSTRTG -N FLANNEL-POSTRTG -A FLANNEL-POSTRTG -m mark --mark 0x4000/0x4000 -m comment --comment "flanneld masq" -j RETURN --A FLANNEL-POSTRTG -s 10.10.0.0/16 -d 10.10.0.0/16 -m comment --comment "flanneld masq" -j RETURN --A FLANNEL-POSTRTG -s 10.10.0.0/16 ! -d 224.0.0.0/4 -m comment --comment "flanneld masq" -j MASQUERADE --random-fully +-A FLANNEL-POSTRTG -s 10.10.2.0/24 -d 10.10.0.0/16 -m comment --comment "flanneld masq" -j RETURN +-A FLANNEL-POSTRTG -s 10.10.0.0/16 -d 10.10.2.0/24 -m comment --comment "flanneld masq" -j RETURN -A FLANNEL-POSTRTG ! -s 10.10.0.0/16 -d 10.10.2.0/24 -m comment --comment "flanneld masq" -j RETURN +-A FLANNEL-POSTRTG -s 10.10.0.0/16 ! -d 224.0.0.0/4 -m comment --comment "flanneld masq" -j MASQUERADE --random-fully -A FLANNEL-POSTRTG ! 
-s 10.10.0.0/16 -d 10.10.0.0/16 -m comment --comment "flanneld masq" -j MASQUERADE --random-fully EOM read -r -d '' FORWARD_RULES << EOM diff --git a/go.mod b/go.mod index fd53508d9..1126109ef 100644 --- a/go.mod +++ b/go.mod @@ -127,7 +127,7 @@ require ( honnef.co/go/tools v0.2.2 // indirect k8s.io/klog/v2 v2.70.1 // indirect k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect - k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect + k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect sigs.k8s.io/yaml v1.2.0 // indirect ) diff --git a/main.go b/main.go index f12f68eb6..fb05ce89d 100644 --- a/main.go +++ b/main.go @@ -22,7 +22,6 @@ import ( "net/http" "os" "os/signal" - "path/filepath" "strconv" "strings" "sync" @@ -94,6 +93,7 @@ type CmdLineOpts struct { iptablesForwardRules bool netConfPath string setNodeNetworkUnavailable bool + useMultiClusterCidr bool } var ( @@ -130,6 +130,7 @@ func init() { flannelFlags.BoolVar(&opts.iptablesForwardRules, "iptables-forward-rules", true, "add default accept rules to FORWARD chain in iptables") flannelFlags.StringVar(&opts.netConfPath, "net-config-path", "/etc/kube-flannel/net-conf.json", "path to the network configuration file") flannelFlags.BoolVar(&opts.setNodeNetworkUnavailable, "set-node-network-unavailable", true, "set NodeNetworkUnavailable after ready") + flannelFlags.BoolVar(&opts.useMultiClusterCidr, "use-multi-cluster-cidr", false, "use MultiClusterCIDR API (alpha)") log.InitFlags(nil) @@ -169,7 +170,13 @@ func usage() { func newSubnetManager(ctx context.Context) (subnet.Manager, error) { if opts.kubeSubnetMgr { - return kube.NewSubnetManager(ctx, opts.kubeApiUrl, opts.kubeConfigFile, opts.kubeAnnotationPrefix, opts.netConfPath, opts.setNodeNetworkUnavailable) + return kube.NewSubnetManager(ctx, + opts.kubeApiUrl, + opts.kubeConfigFile, + opts.kubeAnnotationPrefix, + opts.netConfPath, + opts.setNodeNetworkUnavailable, + opts.useMultiClusterCidr) } cfg := &etcd.EtcdConfig{ @@ -333,7 +340,14 @@ func main() { // Set up ipMasq if needed if opts.ipMasq { if config.EnableIPv4 { - if err = recycleIPTables(subnet.GetFlannelNetwork(config), bn.Lease()); err != nil { + net, err := config.GetFlannelNetwork(&bn.Lease().Subnet) + if err != nil { + log.Error(err) + cancel() + wg.Wait() + os.Exit(1) + } + if err = recycleIPTables(net, bn.Lease()); err != nil { log.Errorf("Failed to recycle IPTables rules, %v", err) cancel() wg.Wait() @@ -341,10 +355,25 @@ func main() { } log.Infof("Setting up masking rules") network.CreateIP4Chain("nat", "FLANNEL-POSTRTG") - go network.SetupAndEnsureIP4Tables(network.MasqRules(subnet.GetFlannelNetwork(config), bn.Lease()), opts.iptablesResyncSeconds) + getRules := func() []network.IPTablesRule { + if config.HasNetworks() { + return network.MasqRules(config.Networks, bn.Lease()) + } else { + return network.MasqRules([]ip.IP4Net{config.Network}, bn.Lease()) + } + } + go network.SetupAndEnsureIP4Tables(getRules, opts.iptablesResyncSeconds) + } if config.EnableIPv6 { - if err = recycleIP6Tables(subnet.GetFlannelIPv6Network(config), bn.Lease()); err != nil { + ip6net, err := config.GetFlannelIPv6Network(&bn.Lease().IPv6Subnet) + if err != nil { + log.Error(err) + cancel() + wg.Wait() + os.Exit(1) + } + if err = recycleIP6Tables(ip6net, bn.Lease()); err != nil { log.Errorf("Failed to recycle IP6Tables rules, %v", err) cancel() wg.Wait() @@ -352,7 +381,14 @@ func main() { } log.Infof("Setting up masking ip6 rules") network.CreateIP6Chain("nat", 
"FLANNEL-POSTRTG") - go network.SetupAndEnsureIP6Tables(network.MasqIP6Rules(subnet.GetFlannelIPv6Network(config), bn.Lease()), opts.iptablesResyncSeconds) + getRules := func() []network.IPTablesRule { + if config.HasIPv6Networks() { + return network.MasqIP6Rules(config.IPv6Networks, bn.Lease()) + } else { + return network.MasqIP6Rules([]ip.IP6Net{config.IPv6Network}, bn.Lease()) + } + } + go network.SetupAndEnsureIP6Tables(getRules, opts.iptablesResyncSeconds) } } @@ -361,18 +397,38 @@ func main() { // In Docker 1.13 and later, Docker sets the default policy of the FORWARD chain to DROP. if opts.iptablesForwardRules { if config.EnableIPv4 { + net, err := config.GetFlannelNetwork(&bn.Lease().Subnet) + if err != nil { + log.Error(err) + cancel() + wg.Wait() + os.Exit(1) + } log.Infof("Changing default FORWARD chain policy to ACCEPT") network.CreateIP4Chain("filter", "FLANNEL-FWD") - go network.SetupAndEnsureIP4Tables(network.ForwardRules(subnet.GetFlannelNetwork(config).String()), opts.iptablesResyncSeconds) + getRules := func() []network.IPTablesRule { + return network.ForwardRules(net.String()) + } + go network.SetupAndEnsureIP4Tables(getRules, opts.iptablesResyncSeconds) } if config.EnableIPv6 { + ip6net, err := config.GetFlannelIPv6Network(&bn.Lease().IPv6Subnet) + if err != nil { + log.Error(err) + cancel() + wg.Wait() + os.Exit(1) + } log.Infof("IPv6: Changing default FORWARD chain policy to ACCEPT") network.CreateIP6Chain("filter", "FLANNEL-FWD") - go network.SetupAndEnsureIP6Tables(network.ForwardRules(subnet.GetFlannelIPv6Network(config).String()), opts.iptablesResyncSeconds) + getRules := func() []network.IPTablesRule { + return network.ForwardRules(ip6net.String()) + } + go network.SetupAndEnsureIP6Tables(getRules, opts.iptablesResyncSeconds) } } - if err := WriteSubnetFile(opts.subnetFile, config, opts.ipMasq, bn); err != nil { + if err := sm.HandleSubnetFile(opts.subnetFile, config, opts.ipMasq, bn.Lease().Subnet, bn.Lease().IPv6Subnet, bn.MTU()); err != nil { // Continue, even though it failed. log.Warningf("Failed to write subnet file: %s", err) } else { @@ -409,15 +465,24 @@ func main() { } func recycleIPTables(nw ip.IP4Net, lease *subnet.Lease) error { - prevNetwork := ReadCIDRFromSubnetFile(opts.subnetFile, "FLANNEL_NETWORK") + prevNetworks := ReadCIDRsFromSubnetFile(opts.subnetFile, "FLANNEL_NETWORK") prevSubnet := ReadCIDRFromSubnetFile(opts.subnetFile, "FLANNEL_SUBNET") + + //Find the cidr in FLANNEL_NETWORK which contains the podCIDR (i.e. FLANNEL_SUBNET) of this node + prevNetwork := ip.IP4Net{} + for _, net := range prevNetworks { + if net.ContainsCIDR(&prevSubnet) { + prevNetwork = net + break + } + } // recycle iptables rules only when network configured or subnet leased is not equal to current one. 
if prevNetwork != nw && prevSubnet != lease.Subnet { log.Infof("Current network or subnet (%v, %v) is not equal to previous one (%v, %v), trying to recycle old iptables rules", nw, lease.Subnet, prevNetwork, prevSubnet) lease := &subnet.Lease{ Subnet: prevSubnet, } - if err := network.DeleteIP4Tables(network.MasqRules(prevNetwork, lease)); err != nil { + if err := network.DeleteIP4Tables(network.MasqRules(prevNetworks, lease)); err != nil { return err } } @@ -425,15 +490,25 @@ func recycleIPTables(nw ip.IP4Net, lease *subnet.Lease) error { } func recycleIP6Tables(nw ip.IP6Net, lease *subnet.Lease) error { - prevNetwork := ReadIP6CIDRFromSubnetFile(opts.subnetFile, "FLANNEL_IPV6_NETWORK") + prevNetworks := ReadIP6CIDRsFromSubnetFile(opts.subnetFile, "FLANNEL_IPV6_NETWORK") prevSubnet := ReadIP6CIDRFromSubnetFile(opts.subnetFile, "FLANNEL_IPV6_SUBNET") + + //Find the cidr in FLANNEL_IPV6_NETWORK which contains the podCIDR (i.e. FLANNEL_IPV6_SUBNET) of this node + prevNetwork := ip.IP6Net{} + for _, net := range prevNetworks { + if net.ContainsCIDR(&prevSubnet) { + prevNetwork = net + break + } + } + // recycle iptables rules only when network configured or subnet leased is not equal to current one. if prevNetwork.String() != nw.String() && prevSubnet.String() != lease.IPv6Subnet.String() { log.Infof("Current ipv6 network or subnet (%v, %v) is not equal to previous one (%v, %v), trying to recycle old ip6tables rules", nw, lease.IPv6Subnet, prevNetwork, prevSubnet) lease := &subnet.Lease{ IPv6Subnet: prevSubnet, } - if err := network.DeleteIP6Tables(network.MasqIP6Rules(prevNetwork, lease)); err != nil { + if err := network.DeleteIP6Tables(network.MasqIP6Rules(prevNetworks, lease)); err != nil { return err } } @@ -476,47 +551,6 @@ func getConfig(ctx context.Context, sm subnet.Manager) (*subnet.Config, error) { } } -func WriteSubnetFile(path string, config *subnet.Config, ipMasq bool, bn backend.Network) error { - dir, name := filepath.Split(path) - err := os.MkdirAll(dir, 0755) - if err != nil { - return err - } - tempFile := filepath.Join(dir, "."+name) - f, err := os.Create(tempFile) - if err != nil { - return err - } - if config.EnableIPv4 { - nw := config.Network - sn := bn.Lease().Subnet - // Write out the first usable IP by incrementing sn.IP by one - sn.IncrementIP() - fmt.Fprintf(f, "FLANNEL_NETWORK=%s\n", nw) - fmt.Fprintf(f, "FLANNEL_SUBNET=%s\n", sn) - } - if config.EnableIPv6 { - ip6Nw := config.IPv6Network - ip6Sn := bn.Lease().IPv6Subnet - // Write out the first usable IP by incrementing ip6Sn.IP by one - ip6Sn.IncrementIP() - fmt.Fprintf(f, "FLANNEL_IPV6_NETWORK=%s\n", ip6Nw) - fmt.Fprintf(f, "FLANNEL_IPV6_SUBNET=%s\n", ip6Sn) - } - - fmt.Fprintf(f, "FLANNEL_MTU=%d\n", bn.MTU()) - _, err = fmt.Fprintf(f, "FLANNEL_IPMASQ=%v\n", ipMasq) - f.Close() - if err != nil { - return err - } - - // rename(2) the temporary file to the desired location so that it becomes - // atomically visible with the contents - return os.Rename(tempFile, path) - // TODO - is this safe? What if it's not on the same FS? 
-} - func mustRunHealthz(stopChan <-chan struct{}, wg *sync.WaitGroup) { address := net.JoinHostPort(opts.healthzIP, strconv.Itoa(opts.healthzPort)) log.Infof("Start healthz server on %s", address) @@ -556,33 +590,71 @@ func mustRunHealthz(stopChan <-chan struct{}, wg *sync.WaitGroup) { } func ReadCIDRFromSubnetFile(path string, CIDRKey string) ip.IP4Net { - var prevCIDR ip.IP4Net + prevCIDRs := ReadCIDRsFromSubnetFile(path, CIDRKey) + if len(prevCIDRs) == 0 { + log.Warningf("no subnet found for key: %s in file: %s", CIDRKey, path) + return ip.IP4Net{} + } else if len(prevCIDRs) > 1 { + log.Errorf("error reading subnet: more than 1 entry found for key: %s in file %s: ", CIDRKey, path) + return ip.IP4Net{} + } else { + return prevCIDRs[0] + } +} + +func ReadCIDRsFromSubnetFile(path string, CIDRKey string) []ip.IP4Net { + prevCIDRs := make([]ip.IP4Net, 0) if _, err := os.Stat(path); !os.IsNotExist(err) { prevSubnetVals, err := godotenv.Read(path) if err != nil { log.Errorf("Couldn't fetch previous %s from subnet file at %s: %s", CIDRKey, path, err) } else if prevCIDRString, ok := prevSubnetVals[CIDRKey]; ok { - err = prevCIDR.UnmarshalJSON([]byte(prevCIDRString)) - if err != nil { - log.Errorf("Couldn't parse previous %s from subnet file at %s: %s", CIDRKey, path, err) + cidrs := strings.Split(prevCIDRString, ",") + prevCIDRs = make([]ip.IP4Net, 0) + for i := range cidrs { + _, cidr, err := net.ParseCIDR(cidrs[i]) + if err != nil { + log.Errorf("Couldn't parse previous %s from subnet file at %s: %s", CIDRKey, path, err) + } + prevCIDRs = append(prevCIDRs, ip.FromIPNet(cidr)) } + } } - return prevCIDR + return prevCIDRs } func ReadIP6CIDRFromSubnetFile(path string, CIDRKey string) ip.IP6Net { - var prevCIDR ip.IP6Net + prevCIDRs := ReadIP6CIDRsFromSubnetFile(path, CIDRKey) + if len(prevCIDRs) == 0 { + log.Warningf("no subnet found for key: %s in file: %s", CIDRKey, path) + return ip.IP6Net{} + } else if len(prevCIDRs) > 1 { + log.Errorf("error reading subnet: more than 1 entry found for key: %s in file %s: ", CIDRKey, path) + return ip.IP6Net{} + } else { + return prevCIDRs[0] + } +} + +func ReadIP6CIDRsFromSubnetFile(path string, CIDRKey string) []ip.IP6Net { + prevCIDRs := make([]ip.IP6Net, 0) if _, err := os.Stat(path); !os.IsNotExist(err) { prevSubnetVals, err := godotenv.Read(path) if err != nil { log.Errorf("Couldn't fetch previous %s from subnet file at %s: %s", CIDRKey, path, err) } else if prevCIDRString, ok := prevSubnetVals[CIDRKey]; ok { - err = prevCIDR.UnmarshalJSON([]byte(prevCIDRString)) - if err != nil { - log.Errorf("Couldn't parse previous %s from subnet file at %s: %s", CIDRKey, path, err) + cidrs := strings.Split(prevCIDRString, ",") + prevCIDRs = make([]ip.IP6Net, 0) + for i := range cidrs { + _, cidr, err := net.ParseCIDR(cidrs[i]) + if err != nil { + log.Errorf("Couldn't parse previous %s from subnet file at %s: %s", CIDRKey, path, err) + } + prevCIDRs = append(prevCIDRs, ip.FromIP6Net(cidr)) } + } } - return prevCIDR + return prevCIDRs } diff --git a/network/iptables.go b/network/iptables.go index fd198b019..87a72f1ee 100644 --- a/network/iptables.go +++ b/network/iptables.go @@ -48,88 +48,85 @@ type IPTablesRule struct { const kubeProxyMark string = "0x4000/0x4000" -func MasqRules(ipn ip.IP4Net, lease *subnet.Lease) []IPTablesRule { - n := ipn.String() - sn := lease.Subnet.String() - supports_random_fully := false +func MasqRules(cluster_cidrs []ip.IP4Net, lease *subnet.Lease) []IPTablesRule { + pod_cidr := lease.Subnet.String() ipt, err := iptables.New() - if err == 
nil { - supports_random_fully = ipt.HasRandomFully() - } - - if supports_random_fully { - return []IPTablesRule{ - // This rule ensure that the flannel iptables rules are executed before other rules on the node - {"nat", "-I", "POSTROUTING", []string{"-m", "comment", "--comment", "flanneld masq", "-j", "FLANNEL-POSTRTG"}}, - // This rule will not masquerade traffic marked by the kube-proxy to avoid double NAT bug on some kernel version - {"nat", "-A", "FLANNEL-POSTRTG", []string{"-m", "mark", "--mark", kubeProxyMark, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, - // This rule makes sure we don't NAT traffic within overlay network (e.g. coming out of docker0) - {"nat", "-A", "FLANNEL-POSTRTG", []string{"-s", n, "-d", n, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, - // NAT if it's not multicast traffic - {"nat", "-A", "FLANNEL-POSTRTG", []string{"-s", n, "!", "-d", "224.0.0.0/4", "-m", "comment", "--comment", "flanneld masq", "-j", "MASQUERADE", "--random-fully"}}, - // Prevent performing Masquerade on external traffic which arrives from a Node that owns the container/pod IP address - {"nat", "-A", "FLANNEL-POSTRTG", []string{"!", "-s", n, "-d", sn, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, - // Masquerade anything headed towards flannel from the host - {"nat", "-A", "FLANNEL-POSTRTG", []string{"!", "-s", n, "-d", n, "-m", "comment", "--comment", "flanneld masq", "-j", "MASQUERADE", "--random-fully"}}, - } - } else { - return []IPTablesRule{ - // This rule ensure that the flannel iptables rules are executed before other rules on the node - {"nat", "-I", "POSTROUTING", []string{"-m", "comment", "--comment", "flanneld masq", "-j", "FLANNEL-POSTRTG"}}, - // This rule will not masquerade traffic marked by the kube-proxy to avoid double NAT bug on some kernel version - {"nat", "-A", "FLANNEL-POSTRTG", []string{"-m", "mark", "--mark", kubeProxyMark, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, - // This rule makes sure we don't NAT traffic within overlay network (e.g. 
coming out of docker0) - {"nat", "-A", "FLANNEL-POSTRTG", []string{"-s", n, "-d", n, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, - // NAT if it's not multicast traffic - {"nat", "-A", "FLANNEL-POSTRTG", []string{"-s", n, "!", "-d", "224.0.0.0/4", "-m", "comment", "--comment", "flanneld masq", "-j", "MASQUERADE"}}, - // Prevent performing Masquerade on external traffic which arrives from a Node that owns the container/pod IP address - {"nat", "-A", "FLANNEL-POSTRTG", []string{"!", "-s", n, "-d", sn, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, - // Masquerade anything headed towards flannel from the host - {"nat", "-A", "FLANNEL-POSTRTG", []string{"!", "-s", n, "-d", n, "-m", "comment", "--comment", "flanneld masq", "-j", "MASQUERADE"}}, - } - } + var fully_randomize string + if err == nil && ipt.HasRandomFully() { + fully_randomize = "--random-fully" + } + rules := make([]IPTablesRule, 2) + // This rule ensure that the flannel iptables rules are executed before other rules on the node + rules[0] = IPTablesRule{"nat", "-I", "POSTROUTING", []string{"-m", "comment", "--comment", "flanneld masq", "-j", "FLANNEL-POSTRTG"}} + // This rule will not masquerade traffic marked by the kube-proxy to avoid double NAT bug on some kernel version + rules[1] = IPTablesRule{"nat", "-A", "FLANNEL-POSTRTG", []string{"-m", "mark", "--mark", kubeProxyMark, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}} + for _, ccidr := range cluster_cidrs { + cluster_cidr := ccidr.String() + // This rule makes sure we don't NAT traffic within overlay network (e.g. coming out of docker0), for any of the cluster_cidrs + rules = append(rules, + IPTablesRule{"nat", "-A", "FLANNEL-POSTRTG", []string{"-s", pod_cidr, "-d", cluster_cidr, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, + IPTablesRule{"nat", "-A", "FLANNEL-POSTRTG", []string{"-s", cluster_cidr, "-d", pod_cidr, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, + ) + } + for _, ccidr := range cluster_cidrs { + cluster_cidr := ccidr.String() + // Prevent performing Masquerade on external traffic which arrives from a Node that owns the container/pod IP address + rules = append(rules, IPTablesRule{"nat", "-A", "FLANNEL-POSTRTG", []string{"!", "-s", cluster_cidr, "-d", pod_cidr, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}) + } + for _, ccidr := range cluster_cidrs { + cluster_cidr := ccidr.String() + // NAT if it's not multicast traffic + rules = append(rules, IPTablesRule{"nat", "-A", "FLANNEL-POSTRTG", []string{"-s", cluster_cidr, "!", "-d", "224.0.0.0/4", "-m", "comment", "--comment", "flanneld masq", "-j", "MASQUERADE", fully_randomize}}) + } + for _, ccidr := range cluster_cidrs { + cluster_cidr := ccidr.String() + // Masquerade anything headed towards flannel from the host + rules = append(rules, IPTablesRule{"nat", "-A", "FLANNEL-POSTRTG", []string{"!", "-s", cluster_cidr, "-d", cluster_cidr, "-m", "comment", "--comment", "flanneld masq", "-j", "MASQUERADE", fully_randomize}}) + } + return rules } -func MasqIP6Rules(ipn ip.IP6Net, lease *subnet.Lease) []IPTablesRule { - n := ipn.String() - sn := lease.IPv6Subnet.String() - supports_random_fully := false +func MasqIP6Rules(cluster_cidrs []ip.IP6Net, lease *subnet.Lease) []IPTablesRule { + pod_cidr := lease.IPv6Subnet.String() ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv6) - if err == nil { - supports_random_fully = ipt.HasRandomFully() - } - - if supports_random_fully { - return []IPTablesRule{ - // 
This rule ensure that the flannel iptables rules are executed before other rules on the node - {"nat", "-I", "POSTROUTING", []string{"-m", "comment", "--comment", "flanneld masq", "-j", "FLANNEL-POSTRTG"}}, - // This rule will not masquerade traffic marked by the kube-proxy to avoid double NAT bug on some kernel version - {"nat", "-A", "FLANNEL-POSTRTG", []string{"-m", "mark", "--mark", kubeProxyMark, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, - // This rule makes sure we don't NAT traffic within overlay network (e.g. coming out of docker0) - {"nat", "-A", "FLANNEL-POSTRTG", []string{"-s", n, "-d", n, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, - // NAT if it's not multicast traffic - {"nat", "-A", "FLANNEL-POSTRTG", []string{"-s", n, "!", "-d", "ff00::/8", "-m", "comment", "--comment", "flanneld masq", "-j", "MASQUERADE", "--random-fully"}}, - // Prevent performing Masquerade on external traffic which arrives from a Node that owns the container/pod IP address - {"nat", "-A", "FLANNEL-POSTRTG", []string{"!", "-s", n, "-d", sn, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, - // Masquerade anything headed towards flannel from the host - {"nat", "-A", "FLANNEL-POSTRTG", []string{"!", "-s", n, "-d", n, "-m", "comment", "--comment", "flanneld masq", "-j", "MASQUERADE", "--random-fully"}}, - } - } else { - return []IPTablesRule{ - // This rule ensure that the flannel iptables rules are executed before other rules on the node - {"nat", "-I", "POSTROUTING", []string{"-m", "comment", "--comment", "flanneld masq", "-j", "FLANNEL-POSTRTG"}}, - // This rule will not masquerade traffic marked by the kube-proxy to avoid double NAT bug on some kernel version - {"nat", "-A", "FLANNEL-POSTRTG", []string{"-m", "mark", "--mark", kubeProxyMark, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, - // This rule makes sure we don't NAT traffic within overlay network (e.g. 
coming out of docker0) - {"nat", "-A", "FLANNEL-POSTRTG", []string{"-s", n, "-d", n, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, - // NAT if it's not multicast traffic - {"nat", "-A", "FLANNEL-POSTRTG", []string{"-s", n, "!", "-d", "ff00::/8", "-m", "comment", "--comment", "flanneld masq", "-j", "MASQUERADE"}}, - // Prevent performing Masquerade on external traffic which arrives from a Node that owns the container/pod IP address - {"nat", "-A", "FLANNEL-POSTRTG", []string{"!", "-s", n, "-d", sn, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, - // Masquerade anything headed towards flannel from the host - {"nat", "-A", "FLANNEL-POSTRTG", []string{"!", "-s", n, "-d", n, "-m", "comment", "--comment", "flanneld masq", "-j", "MASQUERADE"}}, - } + var fully_randomize string + if err == nil && ipt.HasRandomFully() { + fully_randomize = "--random-fully" + } + rules := make([]IPTablesRule, 2) + + // This rule ensure that the flannel iptables rules are executed before other rules on the node + rules[0] = IPTablesRule{"nat", "-I", "POSTROUTING", []string{"-m", "comment", "--comment", "flanneld masq", "-j", "FLANNEL-POSTRTG"}} + // This rule will not masquerade traffic marked by the kube-proxy to avoid double NAT bug on some kernel version + rules[1] = IPTablesRule{"nat", "-A", "FLANNEL-POSTRTG", []string{"-m", "mark", "--mark", kubeProxyMark, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}} + + for _, ccidr := range cluster_cidrs { + cluster_cidr := ccidr.String() + // This rule makes sure we don't NAT traffic within overlay network (e.g. coming out of docker0), for any of the cluster_cidrs + rules = append(rules, + IPTablesRule{"nat", "-A", "FLANNEL-POSTRTG", []string{"-s", pod_cidr, "-d", cluster_cidr, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, + IPTablesRule{"nat", "-A", "FLANNEL-POSTRTG", []string{"-s", cluster_cidr, "-d", pod_cidr, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}, + ) + } + for _, ccidr := range cluster_cidrs { + cluster_cidr := ccidr.String() + // Prevent performing Masquerade on external traffic which arrives from a Node that owns the container/pod IP address + rules = append(rules, IPTablesRule{"nat", "-A", "FLANNEL-POSTRTG", []string{"!", "-s", cluster_cidr, "-d", pod_cidr, "-m", "comment", "--comment", "flanneld masq", "-j", "RETURN"}}) + } + for _, ccidr := range cluster_cidrs { + cluster_cidr := ccidr.String() + // NAT if it's not multicast traffic + rules = append(rules, IPTablesRule{"nat", "-A", "FLANNEL-POSTRTG", []string{"-s", cluster_cidr, "!", "-d", "ff00::/8", "-m", "comment", "--comment", "flanneld masq", "-j", "MASQUERADE", fully_randomize}}) + } + for _, ccidr := range cluster_cidrs { + cluster_cidr := ccidr.String() + // Masquerade anything headed towards flannel from the host + rules = append(rules, IPTablesRule{"nat", "-A", "FLANNEL-POSTRTG", []string{"!", "-s", cluster_cidr, "-d", cluster_cidr, "-m", "comment", "--comment", "flanneld masq", "-j", "MASQUERADE", fully_randomize}}) + + } + + return rules } func ForwardRules(flannelNetwork string) []IPTablesRule { @@ -272,7 +269,9 @@ func ipTablesBootstrap(ipt IPTables, iptRestore IPTablesRestore, rules []IPTable return nil } -func SetupAndEnsureIP4Tables(rules []IPTablesRule, resyncPeriod int) { +func SetupAndEnsureIP4Tables(getRules func() []IPTablesRule, resyncPeriod int) { + rules := getRules() + log.Infof("generated %d rules", len(rules)) ipt, err := iptables.New() if err != nil { // if we can't find iptables, give up 
and return @@ -301,7 +300,7 @@ func SetupAndEnsureIP4Tables(rules []IPTablesRule, resyncPeriod int) { for { // Ensure that all the iptables rules exist every 5 seconds - if err := ensureIPTables(ipt, iptRestore, rules); err != nil { + if err := ensureIPTables(ipt, iptRestore, getRules()); err != nil { log.Errorf("Failed to ensure iptables rules: %v", err) } @@ -309,7 +308,8 @@ func SetupAndEnsureIP4Tables(rules []IPTablesRule, resyncPeriod int) { } } -func SetupAndEnsureIP6Tables(rules []IPTablesRule, resyncPeriod int) { +func SetupAndEnsureIP6Tables(getRules func() []IPTablesRule, resyncPeriod int) { + rules := getRules() ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv6) if err != nil { // if we can't find iptables, give up and return @@ -338,7 +338,7 @@ func SetupAndEnsureIP6Tables(rules []IPTablesRule, resyncPeriod int) { for { // Ensure that all the iptables rules exist every 5 seconds - if err := ensureIPTables(ipt, iptRestore, rules); err != nil { + if err := ensureIPTables(ipt, iptRestore, getRules()); err != nil { log.Errorf("Failed to ensure iptables rules: %v", err) } diff --git a/network/iptables_test.go b/network/iptables_test.go index ed3a9cbb3..4d6adcd9a 100644 --- a/network/iptables_test.go +++ b/network/iptables_test.go @@ -119,7 +119,10 @@ func (mock *MockIPTables) AppendUnique(table string, chain string, rulespec ...s func TestDeleteRules(t *testing.T) { ipt := &MockIPTables{t: t} iptr := &MockIPTablesRestore{t: t} - baseRules := MasqRules(ip.IP4Net{}, lease()) + baseRules := MasqRules([]ip.IP4Net{{ + IP: ip.MustParseIP4("10.0.1.0"), + PrefixLen: 16, + }}, lease()) expectedRules := expectedTearDownIPTablesRestoreRules(baseRules) err := ipTablesBootstrap(ipt, iptr, baseRules) @@ -130,8 +133,8 @@ func TestDeleteRules(t *testing.T) { if err != nil { t.Error("Error setting up iptables") } - if len(ipt.rules) != 6 { - t.Errorf("Should be 6 masqRules, there are actually %d: %#v", len(ipt.rules), ipt.rules) + if len(ipt.rules) != 7 { + t.Errorf("Should be 7 masqRules, there are actually %d: %#v", len(ipt.rules), ipt.rules) } iptr.rules = []IPTablesRestoreRules{} diff --git a/pkg/ip/ip6net.go b/pkg/ip/ip6net.go index eafb3f6ae..7de69eeb9 100644 --- a/pkg/ip/ip6net.go +++ b/pkg/ip/ip6net.go @@ -134,6 +134,14 @@ func (n IP6Net) StringSep(hexSep, prefixSep string) string { return fmt.Sprintf("%s%s%d", n.IP.String(), prefixSep, n.PrefixLen) } +func MapIP6ToString(nws []IP6Net) []string { + res := make([]string, len(nws)) + for i := range nws { + res[i] = nws[i].String() + } + return res +} + func (n IP6Net) Network() IP6Net { mask := net.CIDRMask(int(n.PrefixLen), 128) return IP6Net{ @@ -196,6 +204,12 @@ func (n IP6Net) Contains(ip *IP6) bool { return (IP6)(*network).String() == (IP6)(*subnet).String() } +func (n *IP6Net) ContainsCIDR(other *IP6Net) bool { + ones1 := n.Mask() + ones2 := other.Mask() + return ones1.Cmp(ones2) <= 0 && n.Contains(other.IP) +} + func (n IP6Net) Empty() bool { return IsEmpty(n.IP) && n.PrefixLen == uint(0) } diff --git a/pkg/ip/ipnet.go b/pkg/ip/ipnet.go index ae0b8b027..a616b9010 100644 --- a/pkg/ip/ipnet.go +++ b/pkg/ip/ipnet.go @@ -121,6 +121,14 @@ func (n IP4Net) StringSep(octetSep, prefixSep string) string { return fmt.Sprintf("%s%s%d", n.IP.StringSep(octetSep), prefixSep, n.PrefixLen) } +func MapIP4ToString(nws []IP4Net) []string { + res := make([]string, len(nws)) + for i := range nws { + res[i] = nws[i].String() + } + return res +} + func (n IP4Net) Network() IP4Net { return IP4Net{ n.IP & IP4(n.Mask()), @@ -178,6 +186,12 @@ func (n 
IP4Net) Contains(ip IP4) bool {
 	return (uint32(n.IP) & n.Mask()) == (uint32(ip) & n.Mask())
 }
 
+// ContainsCIDR reports whether the whole subnet other is contained in n.
+func (n *IP4Net) ContainsCIDR(other *IP4Net) bool {
+	ones1 := n.Mask()
+	ones2 := other.Mask()
+	return ones1 <= ones2 && n.Contains(other.IP)
+}
+
 func (n IP4Net) Empty() bool {
 	return n.IP == IP4(0) && n.PrefixLen == uint(0)
 }
diff --git a/subnet/config.go b/subnet/config.go
index e428235ca..18b72ea93 100644
--- a/subnet/config.go
+++ b/subnet/config.go
@@ -19,8 +19,11 @@ import (
 	"errors"
 	"fmt"
 	"math/big"
+	"net"
 
 	"github.com/flannel-io/flannel/pkg/ip"
+	"k8s.io/klog"
+	netutils "k8s.io/utils/net"
 )
 
 type Config struct {
@@ -69,9 +72,14 @@ func ParseConfig(s string) (*Config, error) {
 	}
 	cfg.BackendType = bt
 
+	cfg.Networks = make([]ip.IP4Net, 0)
+	cfg.IPv6Networks = make([]ip.IP6Net, 0)
+
 	return cfg, nil
 }
 
+// CheckNetworkConfig checks the coherence of the flannel configuration.
+// It is used only with the local network manager, not with the kubernetes-based manager.
 func CheckNetworkConfig(config *Config) error {
 	if config.EnableIPv4 {
 		if config.Network.Empty() {
@@ -196,20 +204,102 @@ func CheckNetworkConfig(config *Config) error {
 	return nil
 }
 
-func GetFlannelNetwork(config *Config) ip.IP4Net {
-	if len(config.Networks) > 0 {
-		//TODO_TF select network properly
-		return config.Networks[0]
+// GetFlannelNetwork returns the relevant IPv4 network (i.e. clusterCIDR) for subnet sn.
+// If Networks is not empty, GetFlannelNetwork returns the first network that contains subnet sn.
+// If Networks is empty, this means we are not using the MultiClusterCIDR API,
+// so GetFlannelNetwork falls back to the standard behavior and returns the single Network entry.
+func (c *Config) GetFlannelNetwork(sn *ip.IP4Net) (ip.IP4Net, error) {
+	if c.HasNetworks() {
+		for _, net := range c.Networks {
+			if net.ContainsCIDR(sn) {
+				return net, nil
+			}
+		}
+		return ip.IP4Net{}, fmt.Errorf("could not find flannel networks matching subnet %s", sn)
+	} else {
+		emptyNet := ip.IP4Net{}
+		if c.Network != emptyNet {
+			return c.Network, nil
+		} else {
+			return emptyNet, fmt.Errorf("could not find an ipv4 network in the flannel configuration")
+		}
+	}
+}
+
+// GetFlannelIPv6Network returns the relevant IPv6 network (i.e. clusterCIDR) for subnet sn.
+// If Networks is not empty, GetFlannelIPv6Network returns the first network that contains subnet sn.
+// If Networks is empty, this means we are not using the MultiClusterCIDR API +// so GetFlannelIPv6Network falls back to the standard behavior and returns the single IPv6Network entry +func (c *Config) GetFlannelIPv6Network(sn *ip.IP6Net) (ip.IP6Net, error) { + if c.HasIPv6Networks() { + for _, net := range c.IPv6Networks { + if net.ContainsCIDR(sn) { + return net, nil + } + } + return ip.IP6Net{}, fmt.Errorf("could not find flannel ipv6 networks matching subnet %s", sn) + } else { + emptyNet := ip.IP6Net{} + if c.IPv6Network != emptyNet { + return c.IPv6Network, nil + } else { + return emptyNet, fmt.Errorf("could not find an ipv6 network in the flannel configuration") + } + + } +} + +// AddNetwork adds net to either c.Networks or c.IPv6Networks depending on its type +func (c *Config) AddNetwork(net *net.IPNet) { + if netutils.IsIPv4CIDR(net) { + ip4net := ip.FromIPNet(net) + if !c.containsIPv4Network(ip4net) { + c.Networks = append(c.Networks, ip4net) + } + } else if netutils.IsIPv6CIDR(net) { + ip6net := ip.FromIP6Net(net) + if !c.containsIPv6Network(ip6net) { + c.IPv6Networks = append(c.IPv6Networks, ip6net) + } + } else { + klog.Warningf("cannot add unknown CIDR to config: %s", net) + } +} + +func (c *Config) containsIPv4Network(net ip.IP4Net) bool { + for _, ip4net := range c.Networks { + if net.Equal(ip4net) { + return true + } + } + return false +} + +func (c *Config) containsIPv6Network(net ip.IP6Net) bool { + for _, ip6net := range c.IPv6Networks { + if net.Equal(ip6net) { + return true + } + } + return false +} + +// HasNetworks returns true if there is at least 1 IPv4 network in the flannel config, +// false otherwise +func (c *Config) HasNetworks() bool { + if c.Networks != nil { + return len(c.Networks) > 0 } else { - return config.Network + return false } } -func GetFlannelIPv6Network(config *Config) ip.IP6Net { - if len(config.IPv6Networks) > 0 { - //TODO_TF select network properly - return config.IPv6Networks[0] +// HasIPv6Networks returns true if there is at least 1 IPv6 network in the flannel config, +// false otherwise +func (c *Config) HasIPv6Networks() bool { + if c.IPv6Networks != nil { + return len(c.IPv6Networks) > 0 } else { - return config.IPv6Network + return false } } diff --git a/subnet/config_test.go b/subnet/config_test.go index 7e5f7422d..66adbe4b0 100644 --- a/subnet/config_test.go +++ b/subnet/config_test.go @@ -15,6 +15,7 @@ package subnet import ( + "net" "testing" ) @@ -129,3 +130,24 @@ func TestIPv6ConfigOverrides(t *testing.T) { t.Errorf("IPv6SubnetLen mismatch: expected 124, got %d", cfg.IPv6SubnetLen) } } + +func TestIPv6ConfigNetworks(t *testing.T) { + s := `{ "EnableIPv6": true, "IPv6Network": "fc00::/48", "enableIPv4": false }` + + cfg, err := ParseConfig(s) + if err != nil { + t.Fatalf("ParseConfig failed: %s", err) + } + ipv6 := net.ParseIP("fc00::") + + cfg.AddNetwork(&net.IPNet{IP: ipv6, Mask: net.CIDRMask(48, 128)}) + if len(cfg.IPv6Networks) >= 2 { + t.Fatalf("too many elements in IPv6Networks: %s", cfg.IPv6Networks) + } + ipv6 = net.ParseIP("fc01::") + + cfg.AddNetwork(&net.IPNet{IP: ipv6, Mask: net.CIDRMask(48, 128)}) + if len(cfg.IPv6Networks) != 2 { + t.Fatalf("IPv6 network not added properly to IPv6Networks: %s", cfg.IPv6Networks) + } +} diff --git a/subnet/etcd/local_manager.go b/subnet/etcd/local_manager.go index 102b756ca..ed8a17d90 100644 --- a/subnet/etcd/local_manager.go +++ b/subnet/etcd/local_manager.go @@ -454,3 +454,8 @@ func (m *LocalManager) Name() string { } return fmt.Sprintf("Etcd Local Manager with Previous Subnet: 
%s", previousSubnet) } + +// For etcd subnet manager, the file never changes so we just write it once at startup +func (m *LocalManager) HandleSubnetFile(path string, config *subnet.Config, ipMasq bool, sn ip.IP4Net, ipv6sn ip.IP6Net, mtu int) error { + return subnet.WriteSubnetFile(path, config, ipMasq, sn, ipv6sn, mtu) +} diff --git a/subnet/kube/cluster_cidr.go b/subnet/kube/cluster_cidr.go new file mode 100644 index 000000000..1a1d46a43 --- /dev/null +++ b/subnet/kube/cluster_cidr.go @@ -0,0 +1,101 @@ +// Copyright 2022 flannel authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kube + +import ( + "net" + + "github.com/flannel-io/flannel/subnet" + "golang.org/x/net/context" + networkingv1alpha1 "k8s.io/api/networking/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + log "k8s.io/klog" +) + +// handleAddClusterCidr is called every time a clustercidr resource is added +// to the kubernetes cluster. +// In flanneld, we need to add the new CIDRs (IPv4 and/or IPv6) to the configuration +// and update the configuration file used by the flannel cni plugin. +func (ksm *kubeSubnetManager) handleAddClusterCidr(obj interface{}) { + cluster := obj.(*networkingv1alpha1.ClusterCIDR) + if cluster == nil { + log.Errorf("received wrong object: %s", obj) + return + } + if cluster.Spec.IPv4 != "" { + log.Infof("handleAddClusterCidr: registering CIDR [ %s ]\n", cluster.Spec.IPv4) + _, cidr, err := net.ParseCIDR(cluster.Spec.IPv4) + if err != nil { + log.Errorf("error reading cluster spec: %s", err) + return + } + ksm.subnetConf.AddNetwork(cidr) + } + if cluster.Spec.IPv6 != "" { + log.Infof("handleAddClusterCidr: registering CIDR [ %s ]\n", cluster.Spec.IPv6) + _, cidr, err := net.ParseCIDR(cluster.Spec.IPv6) + if err != nil { + log.Errorf("error reading cluster spec: %s", err) + return + } + ksm.subnetConf.AddNetwork(cidr) + } + + err := subnet.WriteSubnetFile(ksm.snFileInfo.path, ksm.subnetConf, ksm.snFileInfo.ipMask, ksm.snFileInfo.sn, ksm.snFileInfo.IPv6sn, ksm.snFileInfo.mtu) + if err != nil { + log.Errorf("error writing subnet file: %s", err) + return + } +} + +// handleDeleteClusterCidr is called when flannel is notified that a clustercidr resource was deleted in the cluster. +// Since this should not happen with the current API, we log an error. +func (ksm *kubeSubnetManager) handleDeleteClusterCidr(obj interface{}) { + log.Error("deleting ClusterCIDR is not supported. This shouldn't get called") +} + +// readFlannelNetworksFromClusterCIDRList calls the k8s API to read all the clustercidr resources +// that exists when flannel starts. The cidrs are used to populate the Networks and IPv6Networks +// entries in the flannel configuration. +// This function is only used once when flannel starts. +// Later, we rely on an Informer to keep the configuration updated. 
+func readFlannelNetworksFromClusterCIDRList(ctx context.Context, c clientset.Interface, sc *subnet.Config) error { + clusters, err := c.NetworkingV1alpha1().ClusterCIDRs().List(ctx, metav1.ListOptions{}) + if err != nil { + return err + } + log.Infof("reading %d ClusterCIDRs from kube api\n", len(clusters.Items)) + for _, item := range clusters.Items { + if item.Spec.IPv4 != "" { + _, cidr, err := net.ParseCIDR(item.Spec.IPv4) + if err != nil { + return err + } + log.Infof("adding IPv4 CIDR %s to config.Networks", cidr) + sc.AddNetwork(cidr) + } + if item.Spec.IPv6 != "" { + _, cidr, err := net.ParseCIDR((item.Spec.IPv6)) + if err != nil { + return err + } + log.Infof("adding IPv6 CIDR %s to config.IPv6Networks", cidr) + sc.AddNetwork(cidr) + } + } + + return nil +} diff --git a/subnet/kube/kube.go b/subnet/kube/kube.go index a72b40741..85ac92242 100644 --- a/subnet/kube/kube.go +++ b/subnet/kube/kube.go @@ -28,6 +28,7 @@ import ( "github.com/flannel-io/flannel/subnet" "golang.org/x/net/context" v1 "k8s.io/api/core/v1" + networkingv1alpha1 "k8s.io/api/networking/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -51,6 +52,14 @@ const ( nodeControllerSyncTimeout = 10 * time.Minute ) +type subnetFileInfo struct { + path string + ipMask bool + sn ip.IP4Net + IPv6sn ip.IP6Net + mtu int +} + type kubeSubnetManager struct { enableIPv4 bool enableIPv6 bool @@ -61,11 +70,13 @@ type kubeSubnetManager struct { nodeController cache.Controller subnetConf *subnet.Config events chan subnet.Event + clusterCIDRController cache.Controller setNodeNetworkUnavailable bool disableNodeInformer bool + snFileInfo *subnetFileInfo } -func NewSubnetManager(ctx context.Context, apiUrl, kubeconfig, prefix, netConfPath string, setNodeNetworkUnavailable bool) (subnet.Manager, error) { +func NewSubnetManager(ctx context.Context, apiUrl, kubeconfig, prefix, netConfPath string, setNodeNetworkUnavailable, useMultiClusterCidr bool) (subnet.Manager, error) { var cfg *rest.Config var err error // Try to build kubernetes config from a master url or a kubeconfig filepath. If neither masterUrl @@ -112,7 +123,14 @@ func NewSubnetManager(ctx context.Context, apiUrl, kubeconfig, prefix, netConfPa return nil, fmt.Errorf("error parsing subnet config: %s", err) } - sm, err := newKubeSubnetManager(ctx, c, sc, nodeName, prefix) + if useMultiClusterCidr { + err = readFlannelNetworksFromClusterCIDRList(ctx, c, sc) + if err != nil { + return nil, fmt.Errorf("error reading flannel networks from k8s api: %s", err) + } + } + + sm, err := newKubeSubnetManager(ctx, c, sc, nodeName, prefix, useMultiClusterCidr) if err != nil { return nil, fmt.Errorf("error creating network manager: %s", err) } @@ -138,7 +156,7 @@ func NewSubnetManager(ctx context.Context, apiUrl, kubeconfig, prefix, netConfPa // newKubeSubnetManager fills the kubeSubnetManager. 
 // watch for kubernetes node updates
-func newKubeSubnetManager(ctx context.Context, c clientset.Interface, sc *subnet.Config, nodeName, prefix string) (*kubeSubnetManager, error) {
+func newKubeSubnetManager(ctx context.Context, c clientset.Interface, sc *subnet.Config, nodeName, prefix string, useMultiClusterCidr bool) (*kubeSubnetManager, error) {
 	var err error
 	var ksm kubeSubnetManager
 	ksm.annotations, err = newAnnotations(prefix)
@@ -208,6 +226,31 @@ func newKubeSubnetManager(ctx context.Context, c clientset.Interface, sc *subnet
 		ksm.nodeController = controller
 		ksm.nodeStore = listers.NewNodeLister(indexer)
 	}
+
+	if useMultiClusterCidr {
+		_, clusterController := cache.NewIndexerInformer(
+			&cache.ListWatch{
+				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+					return ksm.client.NetworkingV1alpha1().ClusterCIDRs().List(ctx, options)
+				},
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					return ksm.client.NetworkingV1alpha1().ClusterCIDRs().Watch(ctx, options)
+				},
+			},
+			&networkingv1alpha1.ClusterCIDR{},
+			resyncPeriod,
+			cache.ResourceEventHandlerFuncs{
+				AddFunc: func(obj interface{}) {
+					ksm.handleAddClusterCidr(obj)
+				},
+				DeleteFunc: func(obj interface{}) {
+					ksm.handleDeleteClusterCidr(obj)
+				},
+			},
+			cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+		)
+		ksm.clusterCIDRController = clusterController
+	}
 	return &ksm, nil
 }
 
@@ -390,19 +433,27 @@ func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *subnet.Le
 		Expiration: time.Now().Add(24 * time.Hour),
 	}
 	if cidr != nil && ksm.enableIPv4 {
-		if !containsCIDR(subnet.GetFlannelNetwork(ksm.subnetConf).ToIPNet(), cidr) {
+		ipnet := ip.FromIPNet(cidr)
+		flannelNet, err := ksm.subnetConf.GetFlannelNetwork(&ipnet)
+		if err != nil {
+			return nil, err
+		}
+		// this check is still needed when the subnets come from the flannel configuration and not the MultiClusterCIDR API
+		if !containsCIDR(flannelNet.ToIPNet(), cidr) {
 			return nil, fmt.Errorf("subnet %q specified in the flannel net config doesn't contain %q PodCIDR of the %q node", ksm.subnetConf.Network, cidr, ksm.nodeName)
 		}
 
 		lease.Subnet = ip.FromIPNet(cidr)
 	}
 	if ipv6Cidr != nil {
-		if subnet.GetFlannelIPv6Network(ksm.subnetConf).IP == nil {
-			return nil, fmt.Errorf("subnet %q specified in the PodCIDR, but doesn't exist in the flannel net config of the %q node", ipv6Cidr, ksm.nodeName)
+		ip6net := ip.FromIP6Net(ipv6Cidr)
+		flannelNet, err := ksm.subnetConf.GetFlannelIPv6Network(&ip6net)
+		if err != nil {
+			return nil, err
 		}
-
-		if !containsCIDR(subnet.GetFlannelIPv6Network(ksm.subnetConf).ToIPNet(), ipv6Cidr) {
-			return nil, fmt.Errorf("subnet %q specified in the flannel net config doesn't contain %q IPv6 PodCIDR of the %q node", subnet.GetFlannelIPv6Network(ksm.subnetConf), ipv6Cidr, ksm.nodeName)
+		// this check is still needed when the subnets come from the flannel configuration and not the MultiClusterCIDR API
+		if !containsCIDR(flannelNet.ToIPNet(), ipv6Cidr) {
+			return nil, fmt.Errorf("subnet %q specified in the flannel net config doesn't contain %q IPv6 PodCIDR of the %q node", flannelNet, ipv6Cidr, ksm.nodeName)
 		}
 
 		lease.IPv6Subnet = ip.FromIP6Net(ipv6Cidr)
@@ -519,6 +570,21 @@ func (ksm *kubeSubnetManager) Name() string {
 
 // CompleteLease Set Kubernetes NodeNetworkUnavailable to false when starting
 // https://kubernetes.io/docs/concepts/architecture/nodes/#condition
 func (ksm *kubeSubnetManager) CompleteLease(ctx context.Context, lease *subnet.Lease, wg *sync.WaitGroup) error {
+	if ksm.clusterCIDRController != nil {
+		// start the clusterCIDR controller after the subnet manager has been fully initialized
+		log.Info("starting clusterCIDR controller...")
+		go ksm.clusterCIDRController.Run(ctx.Done())
+
+		log.Infof("Waiting %s for clusterCIDR controller to sync...", nodeControllerSyncTimeout)
+		err := wait.Poll(time.Second, nodeControllerSyncTimeout, func() (bool, error) {
+			return ksm.clusterCIDRController.HasSynced(), nil
+		})
+
+		if err != nil {
+			return fmt.Errorf("error waiting for clusterCIDR to sync state: %v", err)
+		}
+		log.Info("clusterCIDR controller sync successful")
+	}
 	if !ksm.setNodeNetworkUnavailable {
 		// not set NodeNetworkUnavailable NodeCondition
 		return nil
@@ -546,3 +612,17 @@ func containsCIDR(ipnet1, ipnet2 *net.IPNet) bool {
 	ones2, _ := ipnet2.Mask.Size()
 	return ones1 <= ones2 && ipnet1.Contains(ipnet2.IP)
 }
+
+// HandleSubnetFile writes the configuration file used by the CNI flannel plugin
+// and stores the immutable data in a dedicated struct of the subnet manager
+// so that we can update the file later when a clustercidr resource is created.
+func (m *kubeSubnetManager) HandleSubnetFile(path string, config *subnet.Config, ipMasq bool, sn ip.IP4Net, ipv6sn ip.IP6Net, mtu int) error {
+	m.snFileInfo = &subnetFileInfo{
+		path:   path,
+		ipMask: ipMasq,
+		sn:     sn,
+		IPv6sn: ipv6sn,
+		mtu:    mtu,
+	}
+	return subnet.WriteSubnetFile(path, config, ipMasq, sn, ipv6sn, mtu)
+}
diff --git a/subnet/subnet.go b/subnet/subnet.go
index b8a950f8c..14c8bb024 100644
--- a/subnet/subnet.go
+++ b/subnet/subnet.go
@@ -19,8 +19,11 @@ import (
 	"errors"
 	"fmt"
 	"net"
+	"os"
+	"path/filepath"
 	"regexp"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -142,8 +145,55 @@ func MakeSubnetKey(sn ip.IP4Net, sn6 ip.IP6Net) string {
 	}
 }
 
+func WriteSubnetFile(path string, config *Config, ipMasq bool, sn ip.IP4Net, ipv6sn ip.IP6Net, mtu int) error {
+	dir, name := filepath.Split(path)
+	err := os.MkdirAll(dir, 0755)
+	if err != nil {
+		return err
+	}
+	tempFile := filepath.Join(dir, "."+name)
+	f, err := os.Create(tempFile)
+	if err != nil {
+		return err
+	}
+	if config.EnableIPv4 {
+		if config.HasNetworks() {
+			fmt.Fprintf(f, "FLANNEL_NETWORK=%s\n", strings.Join(ip.MapIP4ToString(config.Networks), ","))
+		} else {
+			fmt.Fprintf(f, "FLANNEL_NETWORK=%s\n", config.Network)
+		}
+		// Write out the first usable IP by incrementing sn.IP by one
+		sn.IncrementIP()
+
+		fmt.Fprintf(f, "FLANNEL_SUBNET=%s\n", sn)
+	}
+	if config.EnableIPv6 {
+		if config.HasIPv6Networks() {
+			fmt.Fprintf(f, "FLANNEL_IPV6_NETWORK=%s\n", strings.Join(ip.MapIP6ToString(config.IPv6Networks), ","))
+		} else {
+			fmt.Fprintf(f, "FLANNEL_IPV6_NETWORK=%s\n", config.IPv6Network)
+		}
+		// Write out the first usable IP by incrementing ipv6sn.IP by one
+		ipv6sn.IncrementIP()
+		fmt.Fprintf(f, "FLANNEL_IPV6_SUBNET=%s\n", ipv6sn)
+	}
+
+	fmt.Fprintf(f, "FLANNEL_MTU=%d\n", mtu)
+	_, err = fmt.Fprintf(f, "FLANNEL_IPMASQ=%v\n", ipMasq)
+	f.Close()
+	if err != nil {
+		return err
+	}
+
+	// rename(2) the temporary file to the desired location so that it becomes
+	// atomically visible with the contents
+	return os.Rename(tempFile, path)
+	// TODO - is this safe? What if it's not on the same FS?
+}
+
 type Manager interface {
 	GetNetworkConfig(ctx context.Context) (*Config, error)
+	HandleSubnetFile(path string, config *Config, ipMasq bool, sn ip.IP4Net, ipv6sn ip.IP6Net, mtu int) error
 	AcquireLease(ctx context.Context, attrs *LeaseAttrs) (*Lease, error)
 	RenewLease(ctx context.Context, lease *Lease) error
 	WatchLease(ctx context.Context, sn ip.IP4Net, sn6 ip.IP6Net, cursor interface{}) (LeaseWatchResult, error)
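
For reviewers who want to sanity-check the containment logic used in `AcquireLease`, here is a small standalone sketch of the `containsCIDR` helper from `subnet/kube/kube.go` with illustrative inputs (the example CIDR values are arbitrary and not taken from the patch):

```go
package main

import (
	"fmt"
	"net"
)

// containsCIDR mirrors the helper in subnet/kube/kube.go: ipnet1 contains
// ipnet2 when ipnet1's prefix is no longer than ipnet2's and ipnet1
// contains ipnet2's network address.
func containsCIDR(ipnet1, ipnet2 *net.IPNet) bool {
	ones1, _ := ipnet1.Mask.Size()
	ones2, _ := ipnet2.Mask.Size()
	return ones1 <= ones2 && ipnet1.Contains(ipnet2.IP)
}

func main() {
	_, flannelNet, _ := net.ParseCIDR("10.244.0.0/16")
	_, podCIDR, _ := net.ParseCIDR("10.244.3.0/24")
	_, foreign, _ := net.ParseCIDR("10.248.3.0/24")

	fmt.Println(containsCIDR(flannelNet, podCIDR)) // true: the PodCIDR lies inside the flannel network
	fmt.Println(containsCIDR(flannelNet, foreign)) // false: AcquireLease would reject this PodCIDR
}
```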
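Because `WriteSubnetFile` above may now emit comma-separated CIDR lists in `FLANNEL_NETWORK` and `FLANNEL_IPV6_NETWORK`, consumers of `subnet.env` have to tolerate both the single-CIDR and the list form. Below is a minimal sketch of such a reader; `parseFlannelNetworks` is a hypothetical helper written for illustration and is not part of this patch:

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"os"
	"strings"
)

// parseFlannelNetworks reads a subnet.env file produced by WriteSubnetFile
// and returns the CIDRs listed under the given key (e.g. FLANNEL_NETWORK),
// accepting either a single CIDR or a comma-separated list.
func parseFlannelNetworks(path, key string) ([]*net.IPNet, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var networks []*net.IPNet
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, key+"=") {
			continue
		}
		// Split on commas so both the legacy single-CIDR form and the
		// new list form are handled the same way.
		for _, s := range strings.Split(strings.TrimPrefix(line, key+"="), ",") {
			_, cidr, err := net.ParseCIDR(strings.TrimSpace(s))
			if err != nil {
				return nil, err
			}
			networks = append(networks, cidr)
		}
	}
	return networks, scanner.Err()
}

func main() {
	nets, err := parseFlannelNetworks("/run/flannel/subnet.env", "FLANNEL_NETWORK")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	for _, n := range nets {
		fmt.Println(n) // one line per configured cluster CIDR
	}
}
```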