Skip to content

Commit

Permalink
Merge hostgw network with ipip network
Browse files Browse the repository at this point in the history
  • Loading branch information
chenchun committed Nov 30, 2017
1 parent 4436858 commit 0444147
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 503 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ K8S_VERSION=v1.6.6
GOARM=7

# These variables can be overridden by setting an environment variable.
TEST_PACKAGES?=pkg/ip subnet subnet/etcdv2 network backend/hostgw
TEST_PACKAGES?=pkg/ip subnet subnet/etcdv2 network backend
TEST_PACKAGES_EXPANDED=$(TEST_PACKAGES:%=github.com/coreos/flannel/%)
PACKAGES?=$(TEST_PACKAGES) network
PACKAGES_EXPANDED=$(PACKAGES:%=github.com/coreos/flannel/%)
Expand Down
178 changes: 178 additions & 0 deletions backend/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
package backend

import (
"bytes"
"net"
"sync"
"time"

"golang.org/x/net/context"

"github.com/coreos/flannel/subnet"
log "github.com/golang/glog"
"github.com/vishvananda/netlink"
)

type ExternalInterface struct {
Expand Down Expand Up @@ -61,3 +66,176 @@ func (n *SimpleNetwork) MTU() int {
func (_ *SimpleNetwork) Run(ctx context.Context) {
<-ctx.Done()
}

const (
routeCheckRetries = 10
)

type RouteNetwork struct {
SimpleNetwork
BackendType string
routes []netlink.Route
SM subnet.Manager
GetRoute func(lease *subnet.Lease) *netlink.Route
Mtu int
LinkIndex int
}

func (n *RouteNetwork) MTU() int {
return n.Mtu
}

func (n *RouteNetwork) Run(ctx context.Context) {
wg := sync.WaitGroup{}

log.Info("Watching for new subnet leases")
evts := make(chan []subnet.Event)
wg.Add(1)
go func() {
subnet.WatchLeases(ctx, n.SM, n.SubnetLease, evts)
wg.Done()
}()

n.routes = make([]netlink.Route, 0, 10)
wg.Add(1)
go func() {
n.routeCheck(ctx)
wg.Done()
}()

defer wg.Wait()

for {
select {
case evtBatch := <-evts:
n.handleSubnetEvents(evtBatch)

case <-ctx.Done():
return
}
}
}

func (n *RouteNetwork) handleSubnetEvents(batch []subnet.Event) {
for _, evt := range batch {
switch evt.Type {
case subnet.EventAdded:
log.Infof("Subnet added: %v via %v", evt.Lease.Subnet, evt.Lease.Attrs.PublicIP)

if evt.Lease.Attrs.BackendType != n.BackendType {
log.Warningf("Ignoring non-%v subnet: type=%v", n.BackendType, evt.Lease.Attrs.BackendType)
continue
}
route := n.GetRoute(&evt.Lease)

n.addToRouteList(*route)
// Check if route exists before attempting to add it
routeList, err := netlink.RouteListFiltered(netlink.FAMILY_V4, &netlink.Route{Dst: route.Dst}, netlink.RT_FILTER_DST)
if err != nil {
log.Warningf("Unable to list routes: %v", err)
}
if len(routeList) > 0 && !routeEqual(routeList[0], *route) {
// Same Dst different Gw or different link index. Remove it, correct route will be added below.
log.Warningf("Replacing existing route to %v via %v dev index %d with %v via %v dev index %d.", evt.Lease.Subnet, routeList[0].Gw, routeList[0].LinkIndex, evt.Lease.Subnet, evt.Lease.Attrs.PublicIP, route.LinkIndex)
if err := netlink.RouteDel(&routeList[0]); err != nil {
log.Errorf("Error deleting route to %v: %v", evt.Lease.Subnet, err)
continue
}
n.removeFromRouteList(routeList[0])
}
if len(routeList) > 0 && routeEqual(routeList[0], *route) {
// Same Dst and same Gw, keep it and do not attempt to add it.
log.Infof("Route to %v via %v dev index %d already exists, skipping.", evt.Lease.Subnet, evt.Lease.Attrs.PublicIP, routeList[0].LinkIndex)
} else if err := netlink.RouteAdd(route); err != nil {
log.Errorf("Error adding route to %v via %v dev index %d: %v", evt.Lease.Subnet, evt.Lease.Attrs.PublicIP, route.LinkIndex, err)
continue
}

case subnet.EventRemoved:
log.Info("Subnet removed: ", evt.Lease.Subnet)

if evt.Lease.Attrs.BackendType != n.BackendType {
log.Warningf("Ignoring non-%v subnet: type=%v", n.BackendType, evt.Lease.Attrs.BackendType)
continue
}

route := n.GetRoute(&evt.Lease)
// Always remove the route from the route list.
n.removeFromRouteList(*route)

if err := netlink.RouteDel(route); err != nil {
log.Errorf("Error deleting route to %v: %v", evt.Lease.Subnet, err)
continue
}

default:
log.Error("Internal error: unknown event type: ", int(evt.Type))
}
}
}

func (n *RouteNetwork) addToRouteList(route netlink.Route) {
for _, r := range n.routes {
if routeEqual(r, route) {
return
}
}
n.routes = append(n.routes, route)
}

func (n *RouteNetwork) removeFromRouteList(route netlink.Route) {
for index, r := range n.routes {
if routeEqual(r, route) {
n.routes = append(n.routes[:index], n.routes[index+1:]...)
return
}
}
}

func (n *RouteNetwork) routeCheck(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(routeCheckRetries * time.Second):
n.checkSubnetExistInRoutes()
}
}
}

func (n *RouteNetwork) checkSubnetExistInRoutes() {
routeList, err := netlink.RouteList(nil, netlink.FAMILY_V4)
if err == nil {
for _, route := range n.routes {
exist := false
for _, r := range routeList {
if r.Dst == nil {
continue
}
if routeEqual(r, route) {
exist = true
break
}
}
if !exist {
if err := netlink.RouteAdd(&route); err != nil {
if nerr, ok := err.(net.Error); !ok {
log.Errorf("Error recovering route to %v: %v, %v", route.Dst, route.Gw, nerr)
}
continue
} else {
log.Infof("Route recovered %v : %v", route.Dst, route.Gw)
}
}
}
} else {
log.Errorf("Error fetching route list. Will automatically retry: %v", err)
}
}

func routeEqual(x, y netlink.Route) bool {
if x.Dst.IP.Equal(y.Dst.IP) && x.Gw.Equal(y.Gw) && bytes.Equal(x.Dst.Mask, y.Dst.Mask) && x.LinkIndex == y.LinkIndex {
return true
}
return false
}
34 changes: 22 additions & 12 deletions backend/hostgw/hostgw_network_test.go → backend/common_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015 flannel authors
// Copyright 2017 flannel authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -13,13 +13,12 @@
// limitations under the License.
// +build !windows

package hostgw
package backend

import (
"net"
"testing"

"github.com/coreos/flannel/backend"
"github.com/coreos/flannel/pkg/ip"
"github.com/coreos/flannel/pkg/ns"
"github.com/coreos/flannel/subnet"
Expand All @@ -40,26 +39,37 @@ func TestRouteCache(t *testing.T) {
if err := netlink.LinkSetUp(lo); err != nil {
t.Fatal(err)
}
nw := network{extIface: &backend.ExternalInterface{Iface: &net.Interface{Index: lo.Attrs().Index}}}
nw := RouteNetwork{
SimpleNetwork: SimpleNetwork{
ExtIface: &ExternalInterface{Iface: &net.Interface{Index: lo.Attrs().Index}},
},
}
nw.GetRoute = func(lease *subnet.Lease) *netlink.Route {
return &netlink.Route{
Dst: lease.Subnet.ToIPNet(),
Gw: lease.Attrs.PublicIP.ToIP(),
LinkIndex: nw.LinkIndex,
}
}
gw1, gw2 := ip.FromIP(net.ParseIP("127.0.0.1")), ip.FromIP(net.ParseIP("127.0.0.2"))
subnet1 := ip.IP4Net{IP: ip.FromIP(net.ParseIP("192.168.0.0")), PrefixLen: 24}
nw.handleSubnetEvents([]subnet.Event{
{Type: subnet.EventAdded, Lease: subnet.Lease{Subnet: subnet1, Attrs: subnet.LeaseAttrs{PublicIP: gw1, BackendType: "host-gw"}}},
})
if len(nw.rl) != 1 {
t.Fatal(nw.rl)
if len(nw.routes) != 1 {
t.Fatal(nw.routes)
}
if !routeEqual(nw.rl[0], netlink.Route{Dst: subnet1.ToIPNet(), Gw: gw1.ToIP()}) {
t.Fatal(nw.rl[0])
if !routeEqual(nw.routes[0], netlink.Route{Dst: subnet1.ToIPNet(), Gw: gw1.ToIP()}) {
t.Fatal(nw.routes[0])
}
// change gateway of previous route
nw.handleSubnetEvents([]subnet.Event{
{Type: subnet.EventAdded, Lease: subnet.Lease{
Subnet: subnet1, Attrs: subnet.LeaseAttrs{PublicIP: gw2, BackendType: "host-gw"}}}})
if len(nw.rl) != 1 {
t.Fatal(nw.rl)
if len(nw.routes) != 1 {
t.Fatal(nw.routes)
}
if !routeEqual(nw.rl[0], netlink.Route{Dst: subnet1.ToIPNet(), Gw: gw2.ToIP()}) {
t.Fatal(nw.rl[0])
if !routeEqual(nw.routes[0], netlink.Route{Dst: subnet1.ToIPNet(), Gw: gw2.ToIP()}) {
t.Fatal(nw.routes[0])
}
}
28 changes: 17 additions & 11 deletions backend/hostgw/hostgw.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,17 @@ import (
"github.com/coreos/flannel/backend"
"github.com/coreos/flannel/pkg/ip"
"github.com/coreos/flannel/subnet"
"github.com/vishvananda/netlink"
"golang.org/x/net/context"
)

func init() {
backend.Register("host-gw", New)
}

const (
routeCheckRetries = 10
)

type HostgwBackend struct {
sm subnet.Manager
extIface *backend.ExternalInterface
networks map[string]*network
}

func New(sm subnet.Manager, extIface *backend.ExternalInterface) (backend.Backend, error) {
Expand All @@ -46,16 +42,26 @@ func New(sm subnet.Manager, extIface *backend.ExternalInterface) (backend.Backen
be := &HostgwBackend{
sm: sm,
extIface: extIface,
networks: make(map[string]*network),
}

return be, nil
}

func (be *HostgwBackend) RegisterNetwork(ctx context.Context, config *subnet.Config) (backend.Network, error) {
n := &network{
extIface: be.extIface,
sm: be.sm,
n := &backend.RouteNetwork{
SimpleNetwork: backend.SimpleNetwork{
ExtIface: be.extIface,
},
SM: be.sm,
BackendType: "host-gw",
Mtu: be.extIface.Iface.MTU,
LinkIndex: be.extIface.Iface.Index,
}
n.GetRoute = func(lease *subnet.Lease) *netlink.Route {
return &netlink.Route{
Dst: lease.Subnet.ToIPNet(),
Gw: lease.Attrs.PublicIP.ToIP(),
LinkIndex: n.LinkIndex,
}
}

attrs := subnet.LeaseAttrs{
Expand All @@ -66,7 +72,7 @@ func (be *HostgwBackend) RegisterNetwork(ctx context.Context, config *subnet.Con
l, err := be.sm.AcquireLease(ctx, &attrs)
switch err {
case nil:
n.lease = l
n.SubnetLease = l

case context.Canceled, context.DeadlineExceeded:
return nil, err
Expand Down
Loading

0 comments on commit 0444147

Please sign in to comment.