Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: gc rules #618

Merged
merged 2 commits into from
Apr 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@

import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"net"
"net/netip"
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/vishvananda/netlink"
"k8s.io/apimachinery/pkg/util/sets"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/AliyunContainerService/terway/deviceplugin"
Expand Down Expand Up @@ -79,6 +83,8 @@

wg sync.WaitGroup

gcRulesOnce sync.Once

rpc.UnimplementedTerwayBackendServer
}

Expand Down Expand Up @@ -422,6 +428,21 @@
IPv6: n.enableIPv6,
}

switch pod.PodNetworkType {
case daemon.PodNetworkTypeENIMultiIP:
reply.IPType = rpc.IPType_TypeENIMultiIP
case daemon.PodNetworkTypeVPCIP:
reply.IPType = rpc.IPType_TypeVPCIP
case daemon.PodNetworkTypeVPCENI:
reply.IPType = rpc.IPType_TypeVPCENI

Check warning on line 437 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L431-L437

Added lines #L431 - L437 were not covered by tests

default:
return nil, &types.Error{
Code: types.ErrInternalError,
Msg: "Unknown pod network type",

Check warning on line 442 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L439-L442

Added lines #L439 - L442 were not covered by tests
}
}

// 2. Find old resource info
oldRes, err := n.getPodResource(pod)
if err != nil {
Expand Down Expand Up @@ -510,9 +531,14 @@
}
exist := make(map[string]bool)

existIPs := sets.Set[string]{}

Check warning on line 534 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L534

Added line #L534 was not covered by tests

for _, pod := range pods {
if !pod.SandboxExited {
exist[utils.PodInfoKey(pod.Namespace, pod.Name)] = true
if pod.PodIPs.IPv4 != nil {
existIPs.Insert(pod.PodIPs.IPv4.String())

Check warning on line 540 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L539-L540

Added lines #L539 - L540 were not covered by tests
}
}
}

Expand All @@ -523,6 +549,12 @@
podResources := getPodResources(objList)

for _, podRes := range podResources {
if podRes.PodInfo != nil {
if podRes.PodInfo.PodIPs.IPv4 != nil {
existIPs.Insert(podRes.PodInfo.PodIPs.IPv4.String())

Check warning on line 554 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L552-L554

Added lines #L552 - L554 were not covered by tests
}
}

podID := utils.PodInfoKey(podRes.PodInfo.Namespace, podRes.PodInfo.Name)
if _, ok := exist[podID]; ok {
continue
Expand Down Expand Up @@ -568,9 +600,38 @@
}
serviceLog.Info("removed pod", "pod", podID)
}

if os.Getenv("TERWAY_GC_RULES") == "true" {
n.gcRulesOnce.Do(func() {
gcLeakedRules(existIPs)
})

Check warning on line 607 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L604-L607

Added lines #L604 - L607 were not covered by tests
}

return nil
}

func gcLeakedRules(existIP sets.Set[string]) {
links, err := netlink.LinkList()
if err != nil {
serviceLog.Error(err, "error list links")
return

Check warning on line 617 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L613-L617

Added lines #L613 - L617 were not covered by tests
}

ipvlLinks := lo.Filter(links, func(item netlink.Link, index int) bool {
_, ok := item.(*netlink.IPVlan)
return ok
})

Check warning on line 623 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L620-L623

Added lines #L620 - L623 were not covered by tests

gcRoutes(ipvlLinks, existIP)

Check warning on line 625 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L625

Added line #L625 was not covered by tests

normalLinks := lo.Filter(links, func(item netlink.Link, index int) bool {
_, ok := item.(*netlink.Device)
return ok
})

Check warning on line 630 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L627-L630

Added lines #L627 - L630 were not covered by tests

gcTCFilters(normalLinks, existIP)

Check warning on line 632 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L632

Added line #L632 was not covered by tests
}

func (n *networkService) migrateEIP(ctx context.Context, objs []interface{}) error {
once := sync.Once{}

Expand Down Expand Up @@ -1341,3 +1402,84 @@
}
return podResources
}

func gcRoutes(links []netlink.Link, existIP sets.Set[string]) {
for _, link := range links {
routes, err := netlink.RouteList(link, netlink.FAMILY_V4)
if err != nil {
serviceLog.Error(err, "gc list route", "link", link)
return

Check warning on line 1411 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1406-L1411

Added lines #L1406 - L1411 were not covered by tests
}
for _, route := range routes {
if route.Dst == nil {
continue

Check warning on line 1415 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1413-L1415

Added lines #L1413 - L1415 were not covered by tests
}
// if not found
if existIP.Has(route.Dst.IP.String()) {
continue

Check warning on line 1419 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1418-L1419

Added lines #L1418 - L1419 were not covered by tests
}

serviceLog.Info("gc del route", "route", route)
err = netlink.RouteDel(&route)
if err != nil {
serviceLog.Error(err, "gc del route", "route", route)
return

Check warning on line 1426 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1422-L1426

Added lines #L1422 - L1426 were not covered by tests
}
}
}
}

func gcTCFilters(links []netlink.Link, existIP sets.Set[string]) {

Check warning on line 1432 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1432

Added line #L1432 was not covered by tests
// map ip to u32
toU32List := lo.Map(existIP.UnsortedList(), func(item string, index int) uint32 {
ip := net.ParseIP(item)
if ip == nil {
return 0

Check warning on line 1437 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1434-L1437

Added lines #L1434 - L1437 were not covered by tests
}
return binary.BigEndian.Uint32(ip.To4())

Check warning on line 1439 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1439

Added line #L1439 was not covered by tests
})
u32IPMap := lo.SliceToMap(toU32List, func(item uint32) (uint32, struct{}) {
return item, struct{}{}
})

Check warning on line 1443 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1441-L1443

Added lines #L1441 - L1443 were not covered by tests

for _, link := range links {
filters, err := netlink.FilterList(link, netlink.HANDLE_MIN_EGRESS)
if err != nil {
serviceLog.Error(err, "gc list filter", "link", link)
continue

Check warning on line 1449 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1445-L1449

Added lines #L1445 - L1449 were not covered by tests
}
for _, filter := range filters {
u32, ok := filter.(*netlink.U32)
if !ok {
continue

Check warning on line 1454 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1451-L1454

Added lines #L1451 - L1454 were not covered by tests
}
if u32.Priority != 50001 {
continue

Check warning on line 1457 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1456-L1457

Added lines #L1456 - L1457 were not covered by tests
}

if u32.Sel == nil || len(u32.Sel.Keys) != 1 {
continue

Check warning on line 1461 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1460-L1461

Added lines #L1460 - L1461 were not covered by tests
}
if len(u32.Actions) != 1 {
continue

Check warning on line 1464 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1463-L1464

Added lines #L1463 - L1464 were not covered by tests
}
_, ok = u32.Actions[0].(*netlink.VlanAction)
if !ok {
continue

Check warning on line 1468 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1466-L1468

Added lines #L1466 - L1468 were not covered by tests
}
if u32.Sel.Keys[0].Off != 12 {
continue

Check warning on line 1471 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1470-L1471

Added lines #L1470 - L1471 were not covered by tests
}
_, ok = u32IPMap[u32.Sel.Keys[0].Val]
if ok {
continue

Check warning on line 1475 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1473-L1475

Added lines #L1473 - L1475 were not covered by tests
}

serviceLog.Info("gc tc filter", "filter", filter)
err = netlink.FilterDel(filter)
if err != nil {
serviceLog.Error(err, "gc list filter", "link", link)

Check warning on line 1481 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L1478-L1481

Added lines #L1478 - L1481 were not covered by tests
}
}
}
}
Loading