Skip to content

Commit

Permalink
Simplify TrafficManager interface
Browse files Browse the repository at this point in the history
There are now only two methods:
- SetupAndEnsureForwardRules
- SetupAndEnsureMasqRules
This will help hide the implementation details that are specific to
iptables and nftables.
  • Loading branch information
thomasferrandiz committed Feb 12, 2024
1 parent 2d46bb0 commit b76def1
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 167 deletions.
173 changes: 41 additions & 132 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/coreos/pkg/flagutil"
"github.com/flannel-io/flannel/pkg/ip"
"github.com/flannel-io/flannel/pkg/ipmatch"
"github.com/flannel-io/flannel/pkg/lease"
"github.com/flannel-io/flannel/pkg/subnet"
etcd "github.com/flannel-io/flannel/pkg/subnet/etcd"
"github.com/flannel-io/flannel/pkg/subnet/kube"
Expand Down Expand Up @@ -338,95 +337,56 @@ func main() {

//Create TrafficManager and instanciate it based on whether we use iptables or nftables
trafficMngr := newTrafficManager()
flannelIPv4Net := ip.IP4Net{}
flannelIpv6Net := ip.IP6Net{}
if config.EnableIPv4 {
flannelIPv4Net, err = config.GetFlannelNetwork(&bn.Lease().Subnet)
if err != nil {
log.Error(err)
cancel()
wg.Wait()
os.Exit(1)
}
}
if config.EnableIPv6 {
flannelIpv6Net, err = config.GetFlannelIPv6Network(&bn.Lease().IPv6Subnet)
if err != nil {
log.Error(err)
cancel()
wg.Wait()
os.Exit(1)
}
}
// Set up ipMasq if needed
if opts.ipMasq {
if config.EnableIPv4 {
net, err := config.GetFlannelNetwork(&bn.Lease().Subnet)
if err != nil {
log.Error(err)
cancel()
wg.Wait()
os.Exit(1)
}
if err = recycleIPTables(trafficMngr, net, bn.Lease()); err != nil {
log.Errorf("Failed to recycle IPTables rules, %v", err)
cancel()
wg.Wait()
os.Exit(1)
}
log.Infof("Setting up masking rules")
trafficMngr.CreateIP4Chain("nat", "FLANNEL-POSTRTG")
getRules := func() []trafficmngr.IPTablesRule {
if config.HasNetworks() {
return trafficMngr.MasqRules(config.Networks, bn.Lease())
} else {
return trafficMngr.MasqRules([]ip.IP4Net{config.Network}, bn.Lease())
}
}
go trafficMngr.SetupAndEnsureIP4Tables(getRules, opts.iptablesResyncSeconds)

}
if config.EnableIPv6 {
ip6net, err := config.GetFlannelIPv6Network(&bn.Lease().IPv6Subnet)
if err != nil {
log.Error(err)
cancel()
wg.Wait()
os.Exit(1)
}
if err = recycleIP6Tables(trafficMngr, ip6net, bn.Lease()); err != nil {
log.Errorf("Failed to recycle IP6Tables rules, %v", err)
cancel()
wg.Wait()
os.Exit(1)
}
log.Infof("Setting up masking ip6 rules")
trafficMngr.CreateIP6Chain("nat", "FLANNEL-POSTRTG")
getRules := func() []trafficmngr.IPTablesRule {
if config.HasIPv6Networks() {
return trafficMngr.MasqIP6Rules(config.IPv6Networks, bn.Lease())
} else {
return trafficMngr.MasqIP6Rules([]ip.IP6Net{config.IPv6Network}, bn.Lease())
}
}
go trafficMngr.SetupAndEnsureIP6Tables(getRules, opts.iptablesResyncSeconds)
prevNetworks := ReadCIDRsFromSubnetFile(opts.subnetFile, "FLANNEL_NETWORK")
prevSubnet := ReadCIDRFromSubnetFile(opts.subnetFile, "FLANNEL_SUBNET")

prevIPv6Networks := ReadIP6CIDRsFromSubnetFile(opts.subnetFile, "FLANNEL_IPV6_NETWORK")
prevIPv6Subnet := ReadIP6CIDRFromSubnetFile(opts.subnetFile, "FLANNEL_IPV6_SUBNET")

err = trafficMngr.SetupAndEnsureMasqRules(flannelIPv4Net, prevSubnet,
prevNetworks,
flannelIpv6Net, prevIPv6Subnet,
prevIPv6Networks,
bn.Lease(),
opts.iptablesResyncSeconds)
if err != nil {
log.Errorf("Failed to setup masq rules, %v", err)
cancel()
wg.Wait()
os.Exit(1)
}
}

// Always enables forwarding rules. This is needed for Docker versions >1.13 (https://docs.docker.com/engine/userguide/networking/default_network/container-communication/#container-communication-between-hosts)
// In Docker 1.12 and earlier, the default FORWARD chain policy was ACCEPT.
// In Docker 1.13 and later, Docker sets the default policy of the FORWARD chain to DROP.
if opts.iptablesForwardRules {
if config.EnableIPv4 {
net, err := config.GetFlannelNetwork(&bn.Lease().Subnet)
if err != nil {
log.Error(err)
cancel()
wg.Wait()
os.Exit(1)
}
log.Infof("Changing default FORWARD chain policy to ACCEPT")
trafficMngr.CreateIP4Chain("filter", "FLANNEL-FWD")
getRules := func() []trafficmngr.IPTablesRule {
return trafficMngr.ForwardRules(net.String())
}
go trafficMngr.SetupAndEnsureIP4Tables(getRules, opts.iptablesResyncSeconds)
}
if config.EnableIPv6 {
ip6net, err := config.GetFlannelIPv6Network(&bn.Lease().IPv6Subnet)
if err != nil {
log.Error(err)
cancel()
wg.Wait()
os.Exit(1)
}
log.Infof("IPv6: Changing default FORWARD chain policy to ACCEPT")
trafficMngr.CreateIP6Chain("filter", "FLANNEL-FWD")
getRules := func() []trafficmngr.IPTablesRule {
return trafficMngr.ForwardRules(ip6net.String())
}
go trafficMngr.SetupAndEnsureIP6Tables(getRules, opts.iptablesResyncSeconds)
}
trafficMngr.SetupAndEnsureForwardRules(
flannelIPv4Net,
flannelIpv6Net,
opts.iptablesResyncSeconds)
}

if err := sm.HandleSubnetFile(opts.subnetFile, config, opts.ipMasq, bn.Lease().Subnet, bn.Lease().IPv6Subnet, bn.MTU()); err != nil {
Expand Down Expand Up @@ -465,57 +425,6 @@ func main() {
os.Exit(0)
}

func recycleIPTables(tm trafficmngr.TrafficManager, nw ip.IP4Net, myLease *lease.Lease) error {
prevNetworks := ReadCIDRsFromSubnetFile(opts.subnetFile, "FLANNEL_NETWORK")
prevSubnet := ReadCIDRFromSubnetFile(opts.subnetFile, "FLANNEL_SUBNET")

//Find the cidr in FLANNEL_NETWORK which contains the podCIDR (i.e. FLANNEL_SUBNET) of this node
prevNetwork := ip.IP4Net{}
for _, net := range prevNetworks {
if net.ContainsCIDR(&prevSubnet) {
prevNetwork = net
break
}
}
// recycle iptables rules only when network configured or subnet leased is not equal to current one.
if prevNetwork != nw && prevSubnet != myLease.Subnet {
log.Infof("Current network or subnet (%v, %v) is not equal to previous one (%v, %v), trying to recycle old iptables rules", nw, myLease.Subnet, prevNetwork, prevSubnet)
newLease := &lease.Lease{
Subnet: prevSubnet,
}
if err := tm.DeleteIP4Tables(tm.MasqRules(prevNetworks, newLease)); err != nil {
return err
}
}
return nil
}

func recycleIP6Tables(tm trafficmngr.TrafficManager, nw ip.IP6Net, myLease *lease.Lease) error {
prevNetworks := ReadIP6CIDRsFromSubnetFile(opts.subnetFile, "FLANNEL_IPV6_NETWORK")
prevSubnet := ReadIP6CIDRFromSubnetFile(opts.subnetFile, "FLANNEL_IPV6_SUBNET")

//Find the cidr in FLANNEL_IPV6_NETWORK which contains the podCIDR (i.e. FLANNEL_IPV6_SUBNET) of this node
prevNetwork := ip.IP6Net{}
for _, net := range prevNetworks {
if net.ContainsCIDR(&prevSubnet) {
prevNetwork = net
break
}
}

// recycle iptables rules only when network configured or subnet leased is not equal to current one.
if prevNetwork.String() != nw.String() && prevSubnet.String() != myLease.IPv6Subnet.String() {
log.Infof("Current ipv6 network or subnet (%v, %v) is not equal to previous one (%v, %v), trying to recycle old ip6tables rules", nw, myLease.IPv6Subnet, prevNetwork, prevSubnet)
lease := &lease.Lease{
IPv6Subnet: prevSubnet,
}
if err := tm.DeleteIP6Tables(tm.MasqIP6Rules(prevNetworks, lease)); err != nil {
return err
}
}
return nil
}

func shutdownHandler(ctx context.Context, sigs chan os.Signal, cancel context.CancelFunc) {
// Wait for the context do be Done or for the signal to come in to shutdown.
select {
Expand Down
16 changes: 16 additions & 0 deletions pkg/subnet/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,19 @@ func (c *Config) HasIPv6Networks() bool {
return false
}
}

func (c *Config) GetNetworks() []ip.IP4Net {
if len(c.Networks) > 0 {
return c.Networks
} else {
return []ip.IP4Net{c.Network}
}
}

func (c *Config) GeIPv6tNetworks() []ip.IP6Net {
if len(c.Networks) > 0 {
return c.IPv6Networks
} else {
return []ip.IP6Net{c.IPv6Network}
}
}
104 changes: 95 additions & 9 deletions pkg/trafficmngr/iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,74 @@ type IPTablesManager struct{}

const kubeProxyMark string = "0x4000/0x4000"

func (iptm IPTablesManager) MasqRules(cluster_cidrs []ip.IP4Net, lease *lease.Lease) []trafficmngr.IPTablesRule {
func (iptm IPTablesManager) SetupAndEnsureMasqRules(flannelIPv4Net, prevSubnet ip.IP4Net,
prevNetworks []ip.IP4Net,
flannelIPv6Net, prevIPv6Subnet ip.IP6Net,
prevIPv6Networks []ip.IP6Net,
currentlease *lease.Lease,
resyncPeriod int) error {
if flannelIPv4Net.String() != "" {
//Find the cidr in FLANNEL_NETWORK which contains the podCIDR (i.e. FLANNEL_SUBNET) of this node
prevNetwork := ip.IP4Net{}
for _, net := range prevNetworks {
if net.ContainsCIDR(&prevSubnet) {
prevNetwork = net
break
}
}
// recycle iptables rules only when network configured or subnet leased is not equal to current one.
if prevNetwork != flannelIPv4Net && prevSubnet != currentlease.Subnet {
log.Infof("Current network or subnet (%v, %v) is not equal to previous one (%v, %v), trying to recycle old iptables rules",
flannelIPv4Net, currentlease.Subnet, prevNetwork, prevSubnet)
newLease := &lease.Lease{
Subnet: prevSubnet,
}
if err := iptm.deleteIP4Tables(iptm.masqRules(prevNetworks, newLease)); err != nil {
return err
}
}

log.Infof("Setting up masking rules")
iptm.CreateIP4Chain("nat", "FLANNEL-POSTRTG")
//Note: doesn't work for multiple networks but we disabled MultiClusterCIDR anyway
getRules := func() []trafficmngr.IPTablesRule {
return iptm.masqRules([]ip.IP4Net{flannelIPv4Net}, currentlease)
}
go iptm.setupAndEnsureIP4Tables(getRules, resyncPeriod)
}
if flannelIPv6Net.String() != "" {
//Find the cidr in FLANNEL_IPV6_NETWORK which contains the podCIDR (i.e. FLANNEL_IPV6_SUBNET) of this node
prevIPv6Network := ip.IP6Net{}
for _, net := range prevIPv6Networks {
if net.ContainsCIDR(&prevIPv6Subnet) {
prevIPv6Network = net
break
}
}
// recycle iptables rules only when network configured or subnet leased is not equal to current one.
if prevIPv6Network != flannelIPv6Net && prevIPv6Subnet != currentlease.IPv6Subnet {
log.Infof("Current network or subnet (%v, %v) is not equal to previous one (%v, %v), trying to recycle old iptables rules",
flannelIPv6Net, currentlease.IPv6Subnet, prevIPv6Network, prevIPv6Subnet)
newLease := &lease.Lease{
IPv6Subnet: prevIPv6Subnet,
}
if err := iptm.deleteIP6Tables(iptm.masqIP6Rules(prevIPv6Networks, newLease)); err != nil {
return err
}
}

log.Infof("Setting up masking rules for IPv6")
iptm.CreateIP6Chain("nat", "FLANNEL-POSTRTG")
//Note: doesn't work for multiple networks but we disabled MultiClusterCIDR anyway
getRules := func() []trafficmngr.IPTablesRule {
return iptm.masqIP6Rules([]ip.IP6Net{flannelIPv6Net}, currentlease)
}
go iptm.setupAndEnsureIP6Tables(getRules, resyncPeriod)
}
return nil
}

func (iptm IPTablesManager) masqRules(cluster_cidrs []ip.IP4Net, lease *lease.Lease) []trafficmngr.IPTablesRule {
pod_cidr := lease.Subnet.String()
ipt, err := iptables.New()
supports_random_fully := false
Expand Down Expand Up @@ -90,7 +157,7 @@ func (iptm IPTablesManager) MasqRules(cluster_cidrs []ip.IP4Net, lease *lease.Le
return rules
}

func (iptm IPTablesManager) MasqIP6Rules(cluster_cidrs []ip.IP6Net, lease *lease.Lease) []trafficmngr.IPTablesRule {
func (iptm IPTablesManager) masqIP6Rules(cluster_cidrs []ip.IP6Net, lease *lease.Lease) []trafficmngr.IPTablesRule {
pod_cidr := lease.IPv6Subnet.String()
ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv6)
supports_random_fully := false
Expand Down Expand Up @@ -141,7 +208,26 @@ func (iptm IPTablesManager) MasqIP6Rules(cluster_cidrs []ip.IP6Net, lease *lease
return rules
}

func (iptm IPTablesManager) ForwardRules(flannelNetwork string) []trafficmngr.IPTablesRule {
func (iptm IPTablesManager) SetupAndEnsureForwardRules(flannelIPv4Network ip.IP4Net, flannelIPv6Network ip.IP6Net, resyncPeriod int) {
if flannelIPv4Network.String() != "" {
log.Infof("Changing default FORWARD chain policy to ACCEPT")
iptm.CreateIP4Chain("filter", "FLANNEL-FWD")
getRules := func() []trafficmngr.IPTablesRule {
return iptm.forwardRules(flannelIPv4Network.String())
}
go iptm.setupAndEnsureIP4Tables(getRules, resyncPeriod)
}
if flannelIPv6Network.String() != "" {
log.Infof("IPv6: Changing default FORWARD chain policy to ACCEPT")
iptm.CreateIP6Chain("filter", "FLANNEL-FWD")
getRules := func() []trafficmngr.IPTablesRule {
return iptm.forwardRules(flannelIPv6Network.String())
}
go iptm.setupAndEnsureIP6Tables(getRules, resyncPeriod)
}
}

func (iptm IPTablesManager) forwardRules(flannelNetwork string) []trafficmngr.IPTablesRule {
return []trafficmngr.IPTablesRule{
// This rule ensure that the flannel iptables rules are executed before other rules on the node
{Table: "filter", Action: "-A", Chain: "FORWARD", Rulespec: []string{"-m", "comment", "--comment", "flanneld forward", "-j", "FLANNEL-FWD"}},
Expand Down Expand Up @@ -281,7 +367,7 @@ func ipTablesBootstrap(ipt IPTables, iptRestore IPTablesRestore, rules []traffic
return nil
}

func (iptm IPTablesManager) SetupAndEnsureIP4Tables(getRules func() []trafficmngr.IPTablesRule, resyncPeriod int) {
func (iptm IPTablesManager) setupAndEnsureIP4Tables(getRules func() []trafficmngr.IPTablesRule, resyncPeriod int) {
rules := getRules()
log.Infof("generated %d rules", len(rules))
ipt, err := iptables.New()
Expand Down Expand Up @@ -320,7 +406,7 @@ func (iptm IPTablesManager) SetupAndEnsureIP4Tables(getRules func() []trafficmng
}
}

func (iptm IPTablesManager) SetupAndEnsureIP6Tables(getRules func() []trafficmngr.IPTablesRule, resyncPeriod int) {
func (iptm IPTablesManager) setupAndEnsureIP6Tables(getRules func() []trafficmngr.IPTablesRule, resyncPeriod int) {
rules := getRules()
ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv6)
if err != nil {
Expand Down Expand Up @@ -358,8 +444,8 @@ func (iptm IPTablesManager) SetupAndEnsureIP6Tables(getRules func() []trafficmng
}
}

// DeleteIP4Tables delete specified iptables rules
func (iptm IPTablesManager) DeleteIP4Tables(rules []trafficmngr.IPTablesRule) error {
// deleteIP4Tables delete specified iptables rules
func (iptm IPTablesManager) deleteIP4Tables(rules []trafficmngr.IPTablesRule) error {
ipt, err := iptables.New()
if err != nil {
// if we can't find iptables, give up and return
Expand All @@ -380,8 +466,8 @@ func (iptm IPTablesManager) DeleteIP4Tables(rules []trafficmngr.IPTablesRule) er
return nil
}

// DeleteIP6Tables delete specified iptables rules
func (iptm IPTablesManager) DeleteIP6Tables(rules []trafficmngr.IPTablesRule) error {
// deleteIP6Tables delete specified iptables rules
func (iptm IPTablesManager) deleteIP6Tables(rules []trafficmngr.IPTablesRule) error {
ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv6)
if err != nil {
// if we can't find iptables, give up and return
Expand Down
2 changes: 1 addition & 1 deletion pkg/trafficmngr/iptables/iptables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestDeleteRules(t *testing.T) {
ipt := &MockIPTables{t: t}
iptr := &MockIPTablesRestore{t: t}
iptm := IPTablesManager{}
baseRules := iptm.MasqRules([]ip.IP4Net{{
baseRules := iptm.masqRules([]ip.IP4Net{{
IP: ip.MustParseIP4("10.0.1.0"),
PrefixLen: 16,
}}, testingLease())
Expand Down
Loading

0 comments on commit b76def1

Please sign in to comment.