Skip to content
This repository has been archived by the owner on Jan 22, 2021. It is now read-only.

Commit

Permalink
support ipip
Browse files Browse the repository at this point in the history
This commit tries to make calico-bgp-daemon works identical with BIRD
backend.

struct ipamCache is introduced to sync the content under /calico/v1/ipam
everytime we try to update FIB, we lookup this cache and set appropriate
values (LinkIndex and netlink.FLAG_ONLINK) when IPIP is configured.

Signed-off-by: Wataru Ishida <ishida.wataru@lab.ntt.co.jp>
  • Loading branch information
Wataru Ishida committed Mar 7, 2017
1 parent 7dafea6 commit 86e0706
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dist/gobgp:
dist/calico-bgp-daemon: $(SRC_FILES) vendor
mkdir -p $(@D)
go build -v -o dist/calico-bgp-daemon \
-ldflags "-X main.VERSION=$(GOBGPD_VERSION) -s -w" main.go
-ldflags "-X main.VERSION=$(GOBGPD_VERSION) -s -w" main.go ipam.go

build-containerized: clean vendor dist/gobgp
mkdir -p dist
Expand Down
154 changes: 154 additions & 0 deletions ipam.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright (C) 2017 Nippon Telegraph and Telephone Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"encoding/json"
"fmt"
"log"
"strings"
"sync"

etcd "github.com/coreos/etcd/client"
"github.com/osrg/gobgp/table"
"golang.org/x/net/context"
)

type ipPool struct {
CIDR string `json:"cidr"`
IPIP string `json:"ipip"`
Mode string `json:"ipip_mode"`
}

func (lhs *ipPool) equal(rhs *ipPool) bool {
if lhs == rhs {
return true
}
if lhs == nil || rhs == nil {
return false
}
return lhs.CIDR == rhs.CIDR && lhs.IPIP == rhs.IPIP && lhs.Mode == rhs.Mode
}

// Contain returns true if this ipPool contains 'prefix'
func (p *ipPool) contain(prefix string) bool {
k := table.CidrToRadixkey(prefix)
l := table.CidrToRadixkey(p.CIDR)
return strings.HasPrefix(k, l)
}

type ipamCache struct {
mu sync.RWMutex
m map[string]*ipPool
etcdAPI etcd.KeysAPI
updateHandler func(*ipPool) error
}

// match checks whether we have an IP pool which contains the given prefix.
// If we have, it returns the pool.
func (c *ipamCache) match(prefix string) *ipPool {
c.mu.RLock()
defer c.mu.RUnlock()
for _, p := range c.m {
if p.contain(prefix) {
return p
}
}
return nil
}

// update updates the internal map with IPAM updates when the update
// is new addtion to the map or changes the existing item, it calls
// updateHandler
func (c *ipamCache) update(node *etcd.Node, del bool) error {
c.mu.Lock()
defer c.mu.Unlock()
log.Printf("update ipam cache: %s, %v, %t", node.Key, node.Value, del)
if node.Dir {
return nil
}
p := &ipPool{}
if err := json.Unmarshal([]byte(node.Value), p); err != nil {
return err
}
if p.CIDR == "" {
return fmt.Errorf("empty cidr: %s", node.Value)
}
q := c.m[p.CIDR]
if del {
delete(c.m, p.CIDR)
return nil
} else if p.equal(q) {
return nil
}

c.m[p.CIDR] = p

if c.updateHandler != nil {
return c.updateHandler(p)
}
return nil
}

// sync synchronizes the contents under /calico/v1/ipam
func (c *ipamCache) sync() error {
res, err := c.etcdAPI.Get(context.Background(), CALICO_IPAM, &etcd.GetOptions{Recursive: true})
if err != nil {
return err
}

var index uint64
for _, node := range res.Node.Nodes {
if node.ModifiedIndex > index {
index = node.ModifiedIndex
}
if err = c.update(node, false); err != nil {
return err
}
}

watcher := c.etcdAPI.Watcher(CALICO_IPAM, &etcd.WatcherOptions{Recursive: true, AfterIndex: index})
for {
res, err := watcher.Next(context.Background())
if err != nil {
return err
}
del := false
node := res.Node
switch res.Action {
case "set", "create", "update", "compareAndSwap":
case "delete":
del = true
node = res.PrevNode
default:
log.Printf("unhandled action: %s", res.Action)
continue
}
if err = c.update(node, del); err != nil {
return err
}
}
return nil
}

// create new IPAM cache
func newIPAMCache(api etcd.KeysAPI, updateHandler func(*ipPool) error) *ipamCache {
return &ipamCache{
m: make(map[string]*ipPool),
updateHandler: updateHandler,
etcdAPI: api,
}
}
116 changes: 103 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
// Copyright (C) 2016-2017 Nippon Telegraph and Telephone Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,6 +49,7 @@ const (
CALICO_PREFIX = "/calico"
CALICO_BGP = CALICO_PREFIX + "/bgp/v1"
CALICO_AGGR = CALICO_PREFIX + "/ipam/v2/host"
CALICO_IPAM = CALICO_PREFIX + "/v1/ipam"

defaultDialTimeout = 30 * time.Second

Expand Down Expand Up @@ -106,6 +107,7 @@ type Server struct {
etcd etcd.KeysAPI
ipv4 net.IP
ipv6 net.IP
ipam *ipamCache
}

func NewServer() (*Server, error) {
Expand Down Expand Up @@ -172,27 +174,75 @@ func (s *Server) Serve() {
}

if err := s.bgpServer.Start(globalConfig); err != nil {
log.Fatal(err)
log.Fatal("failed to start BGP server:", err)
}

if err := s.initialPolicySetting(); err != nil {
log.Fatal(err)
}

// monitor routes from other BGP peers and update FIB
s.t.Go(s.watchBGPPath)
s.ipam = newIPAMCache(s.etcd, s.ipamUpdateHandler)
// sync IPAM and call ipamUpdateHandler
s.t.Go(func() error { return fmt.Errorf("syncIPAM: %s", s.ipam.sync()) })
// watch routes from other BGP peers and update FIB
s.t.Go(func() error { return fmt.Errorf("watchBGPPath: %s", s.watchBGPPath()) })
// watch prefix assigned and announce to other BGP peers
s.t.Go(s.watchPrefix)
s.t.Go(func() error { return fmt.Errorf("watchPrefix: %s", s.watchPrefix()) })
// watch BGP configuration
s.t.Go(s.watchBGPConfig)
s.t.Go(func() error { return fmt.Errorf("watchBGPConfig: %s", s.watchBGPConfig()) })
// watch routes added by kernel and announce to other BGP peers
s.t.Go(s.watchKernelRoute)
s.t.Go(func() error { return fmt.Errorf("watchKernelRoute: %s", s.watchKernelRoute()) })

<-s.t.Dying()
log.Fatal(s.t.Err())

}

func isCrossSubnet(gw net.IP, subnet net.IPNet) bool {
p := &ipPool{CIDR: subnet.String()}
result := !p.contain(gw.String() + "/32")
return result
}

func (s *Server) ipamUpdateHandler(pool *ipPool) error {
filter := &netlink.Route{
Protocol: RTPROT_GOBGP,
}
list, err := netlink.RouteListFiltered(netlink.FAMILY_V4, filter, netlink.RT_FILTER_PROTOCOL)
if err != nil {
return err
}
node, err := s.client.Nodes().Get(calicoapi.NodeMetadata{Name: os.Getenv(NODENAME)})
if err != nil {
return err
}

for _, route := range list {
if route.Dst == nil {
continue
}
if pool.contain(route.Dst.String()) {
ipip := pool.IPIP != ""
if pool.Mode == "cross-subnet" && !isCrossSubnet(route.Gw, node.Spec.BGP.IPv4Address.Network().IPNet) {
ipip = false
}
if ipip {
i, err := net.InterfaceByName(pool.IPIP)
if err != nil {
return err
}
route.LinkIndex = i.Index
route.SetFlag(netlink.FLAG_ONLINK)
} else {
route.LinkIndex = 0
route.Flags = 0
}
return netlink.RouteReplace(&route)
}
}
return nil
}

func (s *Server) getNodeASN() (numorstring.ASNumber, error) {
return s.getPeerASN(os.Getenv(NODENAME))
}
Expand Down Expand Up @@ -512,7 +562,15 @@ func (s *Server) watchBGPConfig() error {
if err != nil {
return err
}
log.Printf("watch: %v", res)
prev := ""
if res.PrevNode != nil {
prev = res.PrevNode.Value
}
log.Printf("watch: action: %s, key: %s node: %s, prev-node: %s", res.Action, res.Node.Key, res.Node.Value, prev)
if res.Action == "set" && res.Node.Value == prev {
log.Printf("same value. ignore")
continue
}

handleNonMeshNeighbor := func(neighborType string) error {
switch res.Action {
Expand Down Expand Up @@ -549,6 +607,9 @@ func (s *Server) watchBGPConfig() error {
continue
}
deleteNeighbor := func(node *etcd.Node) error {
if node.Value == "" {
return nil
}
n := &bgpconfig.Neighbor{
Config: bgpconfig.NeighborConfig{
NeighborAddress: node.Value,
Expand All @@ -570,6 +631,9 @@ func (s *Server) watchBGPConfig() error {
return err
}
}
if res.Node.Value == "" {
continue
}
asn, err := s.getPeerASN(host)
if err != nil {
return err
Expand Down Expand Up @@ -662,6 +726,7 @@ func (s *Server) watchKernelRoute() error {
return err
}
for update := range ch {
log.Printf("kernel update: %s", update)
if update.Table == syscall.RT_TABLE_MAIN && (update.Protocol == syscall.RTPROT_KERNEL || update.Protocol == syscall.RTPROT_BOOT) {
isWithdrawal := false
switch update.Type {
Expand All @@ -672,11 +737,11 @@ func (s *Server) watchKernelRoute() error {
log.Printf("unhandled rtm type: %d", update.Type)
continue
}
log.Printf("kernel update: %s", update)
path, err := s.makePath(update.Dst.String(), isWithdrawal)
if err != nil {
return err
}
log.Printf("made path from kernel update: %s", path)
if _, err = s.bgpServer.AddPath("", []*bgptable.Path{path}); err != nil {
return err
}
Expand All @@ -687,7 +752,7 @@ func (s *Server) watchKernelRoute() error {

// injectRoute is a helper function to inject BGP routes to linux kernel
// TODO: multipath support
func injectRoute(path *bgptable.Path) error {
func (s *Server) injectRoute(path *bgptable.Path) error {
nexthop := path.GetNexthop()
nlri := path.GetNlri()
dst, _ := netlink.ParseIPNet(nlri.String())
Expand All @@ -696,11 +761,35 @@ func injectRoute(path *bgptable.Path) error {
Gw: nexthop,
Protocol: RTPROT_GOBGP,
}

if dst.IP.To4() != nil {
if p := s.ipam.match(nlri.String()); p != nil {
ipip := p.IPIP != ""

node, err := s.client.Nodes().Get(calicoapi.NodeMetadata{Name: os.Getenv(NODENAME)})
if err != nil {
return err
}

if p.Mode == "cross-subnet" && !isCrossSubnet(route.Gw, node.Spec.BGP.IPv4Address.Network().IPNet) {
ipip = false
}
if ipip {
i, err := net.InterfaceByName(p.IPIP)
if err != nil {
return err
}
route.LinkIndex = i.Index
route.SetFlag(netlink.FLAG_ONLINK)
}
}
}

if path.IsWithdraw {
log.Printf("removed route %s from kernel", nlri)
return netlink.RouteDel(route)
}
log.Printf("added route %s to kernel", nlri)
log.Printf("added route %s to kernel %s", nlri, route)
return netlink.RouteReplace(route)
}

Expand All @@ -719,7 +808,7 @@ func (s *Server) watchBGPPath() error {
if path.IsLocal() {
continue
}
if err := injectRoute(path); err != nil {
if err := s.injectRoute(path); err != nil {
return err
}
}
Expand Down Expand Up @@ -863,10 +952,11 @@ func main() {
os.Exit(0)
}

logrus.SetLevel(logrus.DebugLevel)
logrus.SetLevel(logrus.InfoLevel)

server, err := NewServer()
if err != nil {
log.Printf("failed to create new server")
log.Fatal(err)
}

Expand Down

0 comments on commit 86e0706

Please sign in to comment.