Skip to content

Commit

Permalink
Basic high-availability for auto egress IPs
Browse files Browse the repository at this point in the history
If a namespace has multiple egress IPs, monitor egress traffic and
switch to an alternate egress IP if the currently-selected one appears
dead.
  • Loading branch information
danwinship committed May 2, 2018
1 parent f295bbe commit 928f1cd
Show file tree
Hide file tree
Showing 4 changed files with 483 additions and 29 deletions.
55 changes: 52 additions & 3 deletions pkg/network/node/egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ import (
networkapi "github.com/openshift/origin/pkg/network/apis/network"
"github.com/openshift/origin/pkg/network/common"
networkinformers "github.com/openshift/origin/pkg/network/generated/informers/internalversion"
"github.com/openshift/origin/pkg/util/netutils"

"github.com/vishvananda/netlink"
)

// nodeEgress holds the egress-related state tracked for a single node,
// as built from that node's HostSubnet.
type nodeEgress struct {
// nodeIP is the node's HostSubnet.HostIP; it is also the key under which
// this entry is stored in egressIPWatcher.nodesByNodeIP.
nodeIP string
// sdnIP is the default-gateway address generated from the node's
// HostSubnet CIDR. It is passed to the VXLAN monitor together with nodeIP
// when the node is added. It is empty when the caller supplies "".
sdnIP string
// requestedIPs is the set of egress IPs listed for this node
// (HostSubnet.EgressIPs).
requestedIPs sets.String
// offline is set by updateNode from VXLAN monitor updates. It is consulted
// when choosing the active egress IP for a namespace that requests more
// than one egress IP.
offline bool
}

type namespaceEgress struct {
Expand All @@ -48,6 +51,7 @@ type egressIPWatcher struct {

networkInformers networkinformers.SharedInformerFactory
iptables *NodeIPTables
vxlanMonitor *egressVXLANMonitor

nodesByNodeIP map[string]*nodeEgress
namespacesByVNID map[uint32]*namespaceEgress
Expand Down Expand Up @@ -87,6 +91,10 @@ func (eip *egressIPWatcher) Start(networkInformers networkinformers.SharedInform
eip.networkInformers = networkInformers
eip.iptables = iptables

updates := make(chan *egressVXLANNode)
eip.vxlanMonitor = newEgressVXLANMonitor(eip.oc.ovs, updates)
go eip.watchVXLAN(updates)

eip.watchHostSubnets()
eip.watchNetNamespaces()
return nil
Expand Down Expand Up @@ -179,17 +187,23 @@ func (eip *egressIPWatcher) handleAddOrUpdateHostSubnet(obj, _ interface{}, even
hs := obj.(*networkapi.HostSubnet)
glog.V(5).Infof("Watch %s event for HostSubnet %q", eventType, hs.Name)

eip.updateNodeEgress(hs.HostIP, hs.EgressIPs)
_, cidr, err := net.ParseCIDR(hs.Subnet)
if err != nil {
utilruntime.HandleError(fmt.Errorf("could not parse HostSubnet %q CIDR: %v", hs.Name, err))
}
sdnIP := netutils.GenerateDefaultGateway(cidr).String()

eip.updateNodeEgress(hs.HostIP, sdnIP, hs.EgressIPs)
}

// handleDeleteHostSubnet handles the deletion of a HostSubnet: it logs the
// event and then calls updateNodeEgress for the subnet's HostIP with no
// egress IPs, which clears that node's egress state.
func (eip *egressIPWatcher) handleDeleteHostSubnet(obj interface{}) {
hs := obj.(*networkapi.HostSubnet)
glog.V(5).Infof("Watch %s event for HostSubnet %q", watch.Deleted, hs.Name)

eip.updateNodeEgress(hs.HostIP, nil)
eip.updateNodeEgress(hs.HostIP, "", nil)
}

func (eip *egressIPWatcher) updateNodeEgress(nodeIP string, nodeEgressIPs []string) {
func (eip *egressIPWatcher) updateNodeEgress(nodeIP, sdnIP string, nodeEgressIPs []string) {
eip.Lock()
defer eip.Unlock()

Expand All @@ -200,11 +214,18 @@ func (eip *egressIPWatcher) updateNodeEgress(nodeIP string, nodeEgressIPs []stri
}
node = &nodeEgress{
nodeIP: nodeIP,
sdnIP: sdnIP,
requestedIPs: sets.NewString(),
}
eip.nodesByNodeIP[nodeIP] = node
if eip.vxlanMonitor != nil && node.nodeIP != eip.localIP {
eip.vxlanMonitor.AddNode(node.nodeIP, node.sdnIP)
}
} else if len(nodeEgressIPs) == 0 {
delete(eip.nodesByNodeIP, nodeIP)
if eip.vxlanMonitor != nil {
eip.vxlanMonitor.RemoveNode(node.nodeIP)
}
}
oldRequestedIPs := node.requestedIPs
node.requestedIPs = sets.NewString(nodeEgressIPs...)
Expand Down Expand Up @@ -350,6 +371,8 @@ func (eip *egressIPWatcher) syncEgressNamespaceState(ns *namespaceEgress) error
if active == nil {
if eg.assignedNodeIP == "" {
glog.V(4).Infof("VNID %d cannot use unassigned egress IP %s", ns.vnid, eg.ip)
} else if len(ns.requestedIPs) > 1 && eg.nodes[0].offline {
glog.V(4).Infof("VNID %d cannot use egress IP %s on offline node %s", ns.vnid, eg.ip, eg.assignedNodeIP)
} else {
active = eg
}
Expand Down Expand Up @@ -429,3 +452,29 @@ func (eip *egressIPWatcher) releaseEgressIP(egressIP, mark string) error {

return nil
}

// watchVXLAN consumes node status updates from the given channel and applies
// each one through updateNode. It blocks until the channel is closed, so it
// is meant to be run in its own goroutine.
func (eip *egressIPWatcher) watchVXLAN(updates chan *egressVXLANNode) {
	for {
		update, ok := <-updates
		if !ok {
			// Channel closed: nothing more will arrive.
			return
		}
		eip.updateNode(update.nodeIP, update.offline)
	}
}

// updateNode records a new online/offline status for the node identified by
// nodeIP and then re-evaluates the egress IPs that node has requested.
//
// If the node is not (or is no longer) present in nodesByNodeIP, the VXLAN
// monitor is asked to stop tracking it and nothing else happens.
//
// The watcher's lock is held for the duration of the call.
func (eip *egressIPWatcher) updateNode(nodeIP string, offline bool) {
	eip.Lock()
	defer eip.Unlock()

	node := eip.nodesByNodeIP[nodeIP]
	if node == nil {
		// The monitor is only created in Start(); guard against a nil
		// monitor the same way the other vxlanMonitor call sites do.
		if eip.vxlanMonitor != nil {
			eip.vxlanMonitor.RemoveNode(nodeIP)
		}
		return
	}

	node.offline = offline

	// Flag every known egress IP requested by this node as changed so that
	// the following sync reconsiders which egress IP each namespace uses.
	for _, ip := range node.requestedIPs.UnsortedList() {
		if eg := eip.egressIPs[ip]; eg != nil {
			eip.egressIPChanged(eg)
		}
	}
	eip.syncEgressIPs()
}
52 changes: 26 additions & 26 deletions pkg/network/node/egressip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ func setupEgressIPWatcher(t *testing.T) (*egressIPWatcher, []string) {
func TestEgressIP(t *testing.T) {
eip, flows := setupEgressIPWatcher(t)

eip.updateNodeEgress("172.17.0.3", []string{})
eip.updateNodeEgress("172.17.0.4", []string{})
eip.updateNodeEgress("172.17.0.3", "", []string{})
eip.updateNodeEgress("172.17.0.4", "", []string{})
eip.deleteNamespaceEgress(42)
eip.deleteNamespaceEgress(43)

Expand All @@ -168,7 +168,7 @@ func TestEgressIP(t *testing.T) {
t.Fatalf("%v", err)
}

eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100"}) // Added .100
eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100"}) // Added .100
err = assertNoNetlinkChanges(eip)
if err != nil {
t.Fatalf("%v", err)
Expand All @@ -179,8 +179,8 @@ func TestEgressIP(t *testing.T) {
}

// Assign HostSubnet.EgressIP first, then NetNamespace.EgressIP, with a remote EgressIP
eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.101", "172.17.0.100"}) // Added .101
eip.updateNodeEgress("172.17.0.5", []string{"172.17.0.105"}) // Added .105
eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.101", "172.17.0.100"}) // Added .101
eip.updateNodeEgress("172.17.0.5", "", []string{"172.17.0.105"}) // Added .105
err = assertNoNetlinkChanges(eip)
if err != nil {
t.Fatalf("%v", err)
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestEgressIP(t *testing.T) {
t.Fatalf("%v", err)
}

eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.102", "172.17.0.104"}) // Added .102, .104
eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.102", "172.17.0.104"}) // Added .102, .104
err = assertNetlinkChange(eip, "claim 172.17.0.104")
if err != nil {
t.Fatalf("%v", err)
Expand All @@ -245,7 +245,7 @@ func TestEgressIP(t *testing.T) {
}

// Assign HostSubnet.EgressIP first, then NetNamespace.EgressIP, with a local EgressIP
eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.102", "172.17.0.103"}) // Added .103, Dropped .104
eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.102", "172.17.0.103"}) // Added .103, Dropped .104
err = assertNoNetlinkChanges(eip)
if err != nil {
t.Fatalf("%v", err)
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestEgressIP(t *testing.T) {
}

// Drop remote node EgressIP
eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100"}) // Dropped .101
eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100"}) // Dropped .101
err = assertNoNetlinkChanges(eip)
if err != nil {
t.Fatalf("%v", err)
Expand All @@ -299,7 +299,7 @@ func TestEgressIP(t *testing.T) {
}

// Drop local node EgressIP
eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.102"}) // Dropped .103
eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.102"}) // Dropped .103
err = assertNetlinkChange(eip, "release 172.17.0.103")
if err != nil {
t.Fatalf("%v", err)
Expand All @@ -310,7 +310,7 @@ func TestEgressIP(t *testing.T) {
}

// Add them back, swapped
eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100", "172.17.0.103"}) // Added .103
eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100", "172.17.0.103"}) // Added .103
err = assertNoNetlinkChanges(eip)
if err != nil {
t.Fatalf("%v", err)
Expand All @@ -320,7 +320,7 @@ func TestEgressIP(t *testing.T) {
t.Fatalf("%v", err)
}

eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.101", "172.17.0.102"}) // Added .101
eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.101", "172.17.0.102"}) // Added .101
err = assertNetlinkChange(eip, "claim 172.17.0.101")
if err != nil {
t.Fatalf("%v", err)
Expand All @@ -335,7 +335,7 @@ func TestMultipleNamespaceEgressIPs(t *testing.T) {
eip, flows := setupEgressIPWatcher(t)

eip.updateNamespaceEgress(42, []string{"172.17.0.100"})
eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100"})
eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100"})
err := assertOVSChanges(eip, &flows,
egressOVSChange{vnid: 42, egress: Remote, remote: "172.17.0.3"},
)
Expand All @@ -351,7 +351,7 @@ func TestMultipleNamespaceEgressIPs(t *testing.T) {
}

// Now assigning that IP to a node should switch OVS to use that since it's first in the list
eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.101"})
eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.101"})
err = assertOVSChanges(eip, &flows,
egressOVSChange{vnid: 42, egress: Local},
)
Expand All @@ -369,7 +369,7 @@ func TestMultipleNamespaceEgressIPs(t *testing.T) {
}

// Removing the original egress IP from its node should leave us with one working IP
eip.updateNodeEgress("172.17.0.3", nil)
eip.updateNodeEgress("172.17.0.3", "", nil)
err = assertOVSChanges(eip, &flows,
egressOVSChange{vnid: 42, egress: Local},
)
Expand All @@ -378,7 +378,7 @@ func TestMultipleNamespaceEgressIPs(t *testing.T) {
}

// Removing the remaining egress IP should now kill the namespace
eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.200"})
eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.200"})
err = assertOVSChanges(eip, &flows,
egressOVSChange{vnid: 42, egress: Dropped},
)
Expand All @@ -387,8 +387,8 @@ func TestMultipleNamespaceEgressIPs(t *testing.T) {
}

// Now add the egress IPs back...
eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100"})
eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.101"})
eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100"})
eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.101"})
err = assertOVSChanges(eip, &flows,
egressOVSChange{vnid: 42, egress: Remote, remote: "172.17.0.3"},
)
Expand Down Expand Up @@ -439,7 +439,7 @@ func TestNodeIPAsEgressIP(t *testing.T) {
eip, flows := setupEgressIPWatcher(t)

// Trying to assign node IP as egress IP should fail. (It will log an error but this test doesn't notice that.)
eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.4", "172.17.0.102"})
eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.4", "172.17.0.102"})
err := assertNoNetlinkChanges(eip)
if err != nil {
t.Fatalf("%v", err)
Expand All @@ -454,7 +454,7 @@ func TestDuplicateNodeEgressIPs(t *testing.T) {
eip, flows := setupEgressIPWatcher(t)

eip.updateNamespaceEgress(42, []string{"172.17.0.100"})
eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100"})
eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100"})
err := assertOVSChanges(eip, &flows, egressOVSChange{vnid: 42, egress: Remote, remote: "172.17.0.3"})
if err != nil {
t.Fatalf("%v", err)
Expand All @@ -463,7 +463,7 @@ func TestDuplicateNodeEgressIPs(t *testing.T) {
// Adding the Egress IP to another node should not work and should cause the
// namespace to start dropping traffic. (And in particular, even though we're
// adding the Egress IP to the local node, there should not be a netlink change.)
eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.100"})
eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.100"})
err = assertNoNetlinkChanges(eip)
if err != nil {
t.Fatalf("%v", err)
Expand All @@ -474,7 +474,7 @@ func TestDuplicateNodeEgressIPs(t *testing.T) {
}

// Removing the duplicate node egressIP should restore traffic to the broken namespace
eip.updateNodeEgress("172.17.0.4", []string{})
eip.updateNodeEgress("172.17.0.4", "", []string{})
err = assertNoNetlinkChanges(eip)
if err != nil {
t.Fatalf("%v", err)
Expand All @@ -485,7 +485,7 @@ func TestDuplicateNodeEgressIPs(t *testing.T) {
}

// As above, but with a remote node IP
eip.updateNodeEgress("172.17.0.5", []string{"172.17.0.100"})
eip.updateNodeEgress("172.17.0.5", "", []string{"172.17.0.100"})
err = assertOVSChanges(eip, &flows, egressOVSChange{vnid: 42, egress: Dropped})
if err != nil {
t.Fatalf("%v", err)
Expand All @@ -507,7 +507,7 @@ func TestDuplicateNodeEgressIPs(t *testing.T) {

// Removing the original egress node should result in the "duplicate" egress node
// now being used.
eip.updateNodeEgress("172.17.0.3", []string{})
eip.updateNodeEgress("172.17.0.3", "", []string{})
err = assertOVSChanges(eip, &flows, egressOVSChange{vnid: 42, egress: Remote, remote: "172.17.0.5"})
if err != nil {
t.Fatalf("%v", err)
Expand All @@ -518,7 +518,7 @@ func TestDuplicateNamespaceEgressIPs(t *testing.T) {
eip, flows := setupEgressIPWatcher(t)

eip.updateNamespaceEgress(42, []string{"172.17.0.100"})
eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100"})
eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100"})
err := assertOVSChanges(eip, &flows, egressOVSChange{vnid: 42, egress: Remote, remote: "172.17.0.3"})
if err != nil {
t.Fatalf("%v", err)
Expand Down Expand Up @@ -560,7 +560,7 @@ func TestDuplicateNamespaceEgressIPs(t *testing.T) {
// cause the rules to get deleted and then added back in the opposite order,
// which assertNoOVSChanges() would complain about, so we have to use
// assertOVSChanges() instead.
eip.updateNodeEgress("172.17.0.3", []string{})
eip.updateNodeEgress("172.17.0.3", "", []string{})
err = assertOVSChanges(eip, &flows,
egressOVSChange{vnid: 42, egress: Dropped},
egressOVSChange{vnid: 43, egress: Dropped},
Expand All @@ -569,7 +569,7 @@ func TestDuplicateNamespaceEgressIPs(t *testing.T) {
t.Fatalf("%v", err)
}

eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100"})
eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100"})
err = assertOVSChanges(eip, &flows,
egressOVSChange{vnid: 42, egress: Dropped},
egressOVSChange{vnid: 43, egress: Dropped},
Expand Down
Loading

0 comments on commit 928f1cd

Please sign in to comment.