From 86e070616474d141c6dbce281814dc7aacd9fff7 Mon Sep 17 00:00:00 2001 From: Wataru Ishida Date: Tue, 6 Dec 2016 02:42:46 -0500 Subject: [PATCH] support ipip This commit tries to make calico-bgp-daemon works identical with BIRD backend. struct ipamCache is introduced to sync the content under /calico/v1/ipam everytime we try to update FIB, we lookup this cache and set appropriate values (LinkIndex and netlink.FLAG_ONLINK) when IPIP is configured. Signed-off-by: Wataru Ishida --- Makefile | 2 +- ipam.go | 154 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 116 ++++++++++++++++++++++++++++++++++++----- 3 files changed, 258 insertions(+), 14 deletions(-) create mode 100644 ipam.go diff --git a/Makefile b/Makefile index e72107d..eb25246 100644 --- a/Makefile +++ b/Makefile @@ -30,7 +30,7 @@ dist/gobgp: dist/calico-bgp-daemon: $(SRC_FILES) vendor mkdir -p $(@D) go build -v -o dist/calico-bgp-daemon \ - -ldflags "-X main.VERSION=$(GOBGPD_VERSION) -s -w" main.go + -ldflags "-X main.VERSION=$(GOBGPD_VERSION) -s -w" main.go ipam.go build-containerized: clean vendor dist/gobgp mkdir -p dist diff --git a/ipam.go b/ipam.go new file mode 100644 index 0000000..ca3d07a --- /dev/null +++ b/ipam.go @@ -0,0 +1,154 @@ +// Copyright (C) 2017 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "encoding/json" + "fmt" + "log" + "strings" + "sync" + + etcd "github.com/coreos/etcd/client" + "github.com/osrg/gobgp/table" + "golang.org/x/net/context" +) + +type ipPool struct { + CIDR string `json:"cidr"` + IPIP string `json:"ipip"` + Mode string `json:"ipip_mode"` +} + +func (lhs *ipPool) equal(rhs *ipPool) bool { + if lhs == rhs { + return true + } + if lhs == nil || rhs == nil { + return false + } + return lhs.CIDR == rhs.CIDR && lhs.IPIP == rhs.IPIP && lhs.Mode == rhs.Mode +} + +// Contain returns true if this ipPool contains 'prefix' +func (p *ipPool) contain(prefix string) bool { + k := table.CidrToRadixkey(prefix) + l := table.CidrToRadixkey(p.CIDR) + return strings.HasPrefix(k, l) +} + +type ipamCache struct { + mu sync.RWMutex + m map[string]*ipPool + etcdAPI etcd.KeysAPI + updateHandler func(*ipPool) error +} + +// match checks whether we have an IP pool which contains the given prefix. +// If we have, it returns the pool. +func (c *ipamCache) match(prefix string) *ipPool { + c.mu.RLock() + defer c.mu.RUnlock() + for _, p := range c.m { + if p.contain(prefix) { + return p + } + } + return nil +} + +// update updates the internal map with IPAM updates when the update +// is new addtion to the map or changes the existing item, it calls +// updateHandler +func (c *ipamCache) update(node *etcd.Node, del bool) error { + c.mu.Lock() + defer c.mu.Unlock() + log.Printf("update ipam cache: %s, %v, %t", node.Key, node.Value, del) + if node.Dir { + return nil + } + p := &ipPool{} + if err := json.Unmarshal([]byte(node.Value), p); err != nil { + return err + } + if p.CIDR == "" { + return fmt.Errorf("empty cidr: %s", node.Value) + } + q := c.m[p.CIDR] + if del { + delete(c.m, p.CIDR) + return nil + } else if p.equal(q) { + return nil + } + + c.m[p.CIDR] = p + + if c.updateHandler != nil { + return c.updateHandler(p) + } + return nil +} + +// sync synchronizes the contents under /calico/v1/ipam +func (c *ipamCache) sync() error { + res, err := c.etcdAPI.Get(context.Background(), CALICO_IPAM, &etcd.GetOptions{Recursive: true}) + if err != nil { + return err + } + + var index uint64 + for _, node := range res.Node.Nodes { + if node.ModifiedIndex > index { + index = node.ModifiedIndex + } + if err = c.update(node, false); err != nil { + return err + } + } + + watcher := c.etcdAPI.Watcher(CALICO_IPAM, &etcd.WatcherOptions{Recursive: true, AfterIndex: index}) + for { + res, err := watcher.Next(context.Background()) + if err != nil { + return err + } + del := false + node := res.Node + switch res.Action { + case "set", "create", "update", "compareAndSwap": + case "delete": + del = true + node = res.PrevNode + default: + log.Printf("unhandled action: %s", res.Action) + continue + } + if err = c.update(node, del); err != nil { + return err + } + } + return nil +} + +// create new IPAM cache +func newIPAMCache(api etcd.KeysAPI, updateHandler func(*ipPool) error) *ipamCache { + return &ipamCache{ + m: make(map[string]*ipPool), + updateHandler: updateHandler, + etcdAPI: api, + } +} diff --git a/main.go b/main.go index 043d3b5..1bf49a0 100644 --- a/main.go +++ b/main.go @@ -1,4 +1,4 @@ -// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation. +// Copyright (C) 2016-2017 Nippon Telegraph and Telephone Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ const ( CALICO_PREFIX = "/calico" CALICO_BGP = CALICO_PREFIX + "/bgp/v1" CALICO_AGGR = CALICO_PREFIX + "/ipam/v2/host" + CALICO_IPAM = CALICO_PREFIX + "/v1/ipam" defaultDialTimeout = 30 * time.Second @@ -106,6 +107,7 @@ type Server struct { etcd etcd.KeysAPI ipv4 net.IP ipv6 net.IP + ipam *ipamCache } func NewServer() (*Server, error) { @@ -172,27 +174,75 @@ func (s *Server) Serve() { } if err := s.bgpServer.Start(globalConfig); err != nil { - log.Fatal(err) + log.Fatal("failed to start BGP server:", err) } if err := s.initialPolicySetting(); err != nil { log.Fatal(err) } - // monitor routes from other BGP peers and update FIB - s.t.Go(s.watchBGPPath) + s.ipam = newIPAMCache(s.etcd, s.ipamUpdateHandler) + // sync IPAM and call ipamUpdateHandler + s.t.Go(func() error { return fmt.Errorf("syncIPAM: %s", s.ipam.sync()) }) + // watch routes from other BGP peers and update FIB + s.t.Go(func() error { return fmt.Errorf("watchBGPPath: %s", s.watchBGPPath()) }) // watch prefix assigned and announce to other BGP peers - s.t.Go(s.watchPrefix) + s.t.Go(func() error { return fmt.Errorf("watchPrefix: %s", s.watchPrefix()) }) // watch BGP configuration - s.t.Go(s.watchBGPConfig) + s.t.Go(func() error { return fmt.Errorf("watchBGPConfig: %s", s.watchBGPConfig()) }) // watch routes added by kernel and announce to other BGP peers - s.t.Go(s.watchKernelRoute) + s.t.Go(func() error { return fmt.Errorf("watchKernelRoute: %s", s.watchKernelRoute()) }) <-s.t.Dying() log.Fatal(s.t.Err()) } +func isCrossSubnet(gw net.IP, subnet net.IPNet) bool { + p := &ipPool{CIDR: subnet.String()} + result := !p.contain(gw.String() + "/32") + return result +} + +func (s *Server) ipamUpdateHandler(pool *ipPool) error { + filter := &netlink.Route{ + Protocol: RTPROT_GOBGP, + } + list, err := netlink.RouteListFiltered(netlink.FAMILY_V4, filter, netlink.RT_FILTER_PROTOCOL) + if err != nil { + return err + } + node, err := s.client.Nodes().Get(calicoapi.NodeMetadata{Name: os.Getenv(NODENAME)}) + if err != nil { + return err + } + + for _, route := range list { + if route.Dst == nil { + continue + } + if pool.contain(route.Dst.String()) { + ipip := pool.IPIP != "" + if pool.Mode == "cross-subnet" && !isCrossSubnet(route.Gw, node.Spec.BGP.IPv4Address.Network().IPNet) { + ipip = false + } + if ipip { + i, err := net.InterfaceByName(pool.IPIP) + if err != nil { + return err + } + route.LinkIndex = i.Index + route.SetFlag(netlink.FLAG_ONLINK) + } else { + route.LinkIndex = 0 + route.Flags = 0 + } + return netlink.RouteReplace(&route) + } + } + return nil +} + func (s *Server) getNodeASN() (numorstring.ASNumber, error) { return s.getPeerASN(os.Getenv(NODENAME)) } @@ -512,7 +562,15 @@ func (s *Server) watchBGPConfig() error { if err != nil { return err } - log.Printf("watch: %v", res) + prev := "" + if res.PrevNode != nil { + prev = res.PrevNode.Value + } + log.Printf("watch: action: %s, key: %s node: %s, prev-node: %s", res.Action, res.Node.Key, res.Node.Value, prev) + if res.Action == "set" && res.Node.Value == prev { + log.Printf("same value. ignore") + continue + } handleNonMeshNeighbor := func(neighborType string) error { switch res.Action { @@ -549,6 +607,9 @@ func (s *Server) watchBGPConfig() error { continue } deleteNeighbor := func(node *etcd.Node) error { + if node.Value == "" { + return nil + } n := &bgpconfig.Neighbor{ Config: bgpconfig.NeighborConfig{ NeighborAddress: node.Value, @@ -570,6 +631,9 @@ func (s *Server) watchBGPConfig() error { return err } } + if res.Node.Value == "" { + continue + } asn, err := s.getPeerASN(host) if err != nil { return err @@ -662,6 +726,7 @@ func (s *Server) watchKernelRoute() error { return err } for update := range ch { + log.Printf("kernel update: %s", update) if update.Table == syscall.RT_TABLE_MAIN && (update.Protocol == syscall.RTPROT_KERNEL || update.Protocol == syscall.RTPROT_BOOT) { isWithdrawal := false switch update.Type { @@ -672,11 +737,11 @@ func (s *Server) watchKernelRoute() error { log.Printf("unhandled rtm type: %d", update.Type) continue } - log.Printf("kernel update: %s", update) path, err := s.makePath(update.Dst.String(), isWithdrawal) if err != nil { return err } + log.Printf("made path from kernel update: %s", path) if _, err = s.bgpServer.AddPath("", []*bgptable.Path{path}); err != nil { return err } @@ -687,7 +752,7 @@ func (s *Server) watchKernelRoute() error { // injectRoute is a helper function to inject BGP routes to linux kernel // TODO: multipath support -func injectRoute(path *bgptable.Path) error { +func (s *Server) injectRoute(path *bgptable.Path) error { nexthop := path.GetNexthop() nlri := path.GetNlri() dst, _ := netlink.ParseIPNet(nlri.String()) @@ -696,11 +761,35 @@ func injectRoute(path *bgptable.Path) error { Gw: nexthop, Protocol: RTPROT_GOBGP, } + + if dst.IP.To4() != nil { + if p := s.ipam.match(nlri.String()); p != nil { + ipip := p.IPIP != "" + + node, err := s.client.Nodes().Get(calicoapi.NodeMetadata{Name: os.Getenv(NODENAME)}) + if err != nil { + return err + } + + if p.Mode == "cross-subnet" && !isCrossSubnet(route.Gw, node.Spec.BGP.IPv4Address.Network().IPNet) { + ipip = false + } + if ipip { + i, err := net.InterfaceByName(p.IPIP) + if err != nil { + return err + } + route.LinkIndex = i.Index + route.SetFlag(netlink.FLAG_ONLINK) + } + } + } + if path.IsWithdraw { log.Printf("removed route %s from kernel", nlri) return netlink.RouteDel(route) } - log.Printf("added route %s to kernel", nlri) + log.Printf("added route %s to kernel %s", nlri, route) return netlink.RouteReplace(route) } @@ -719,7 +808,7 @@ func (s *Server) watchBGPPath() error { if path.IsLocal() { continue } - if err := injectRoute(path); err != nil { + if err := s.injectRoute(path); err != nil { return err } } @@ -863,10 +952,11 @@ func main() { os.Exit(0) } - logrus.SetLevel(logrus.DebugLevel) + logrus.SetLevel(logrus.InfoLevel) server, err := NewServer() if err != nil { + log.Printf("failed to create new server") log.Fatal(err) }