Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support dynamic interface flow collector #133

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 231 additions & 62 deletions pkg/exporter/probe/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/binary"
"fmt"
"strings"
"sync"
"syscall"

"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -40,17 +42,177 @@ func init() {
}

type flowArgs struct {
Dev string
Dev string `mapstructure:"interface-name"`
}

func getDefaultRouteDevice() (netlink.Link, error) {
filter := &netlink.Route{
Dst: nil,
}
routers, err := netlink.RouteListFiltered(syscall.AF_INET, filter, netlink.RT_FILTER_DST)
if err != nil {
return nil, err
}

if len(routers) == 0 {
return nil, fmt.Errorf("no default route found")
}

if len(routers) > 1 {
return nil, fmt.Errorf("multi default route found")
}

link, err := netlink.LinkByIndex(routers[0].LinkIndex)
if err != nil {
return nil, err
}
return link, nil
}

type linkFlowHelper interface {
start() error
stop() error
}

type dynamicLinkFlowHelper struct {
bpfObjs *bpfObjects
pattern string
done chan struct{}
flows map[int]*ebpfFlow
lock sync.Mutex
}

func (h *dynamicLinkFlowHelper) tryStartLinkFlow(link netlink.Link) {
log.Infof("flow: try start flow on nic %s, index %d", link.Attrs().Name, link.Attrs().Index)
if _, ok := h.flows[link.Attrs().Index]; ok {
log.Warnf("new interface(%s) index %d already exists, skip process", link.Attrs().Name, link.Attrs().Index)
return
}
flow := &ebpfFlow{
dev: link,
bpfObjs: h.bpfObjs,
}

if err := flow.start(); err != nil {
log.Errorf("failed start flow on dev %s", link.Attrs().Name)
return
}

h.flows[link.Attrs().Index] = flow
}

func (h *dynamicLinkFlowHelper) tryStopLinkFlow(name string, index int) {
log.Infof("flow: try stop flow on nic %s, index %d", name, index)
flow, ok := h.flows[index]
if !ok {
log.Warnf("deleted interface index %d not exists, skip process", index)
return
}
_ = flow.stop()
delete(h.flows, index)
}

func (h *dynamicLinkFlowHelper) start() error {
h.done = make(chan struct{})
ch := make(chan netlink.LinkUpdate)
links, err := netlink.LinkList()
if err != nil {
return fmt.Errorf("%s error list link, err: %w", probeName, err)
}
for _, link := range links {
if !strings.HasSuffix(link.Attrs().Name, h.pattern) {
continue
}
h.tryStartLinkFlow(link)
}
go func() {
if err := netlink.LinkSubscribe(ch, h.done); err != nil {
log.Errorf("%s error watch link change, err: %v", probeName, err)
close(h.done)
}
}()
go func() {
h.lock.Lock()
defer h.lock.Unlock()
for {
select {
case change := <-ch:
if !strings.HasSuffix(change.Attrs().Name, h.pattern) {
break
}
switch change.Header.Type {
case syscall.RTM_NEWLINK:
link, err := netlink.LinkByIndex(int(change.Index))
if err != nil {
log.Errorf("failed get new created link by index %d, name %s, err: %v", change.Index, change.Attrs().Name, err)
break
}
h.tryStartLinkFlow(link)
case syscall.RTM_DELLINK:
h.tryStopLinkFlow(change.Attrs().Name, int(change.Index))
}
case <-h.done:
return
}
}
}()
return nil
}

func (h *dynamicLinkFlowHelper) stop() error {
close(h.done)
h.lock.Lock()
defer h.lock.Unlock()
var first error
for _, flow := range h.flows {
if err := flow.stop(); err != nil {
if first == nil {
first = err
}
}
}
return first
}

func metricsProbeCreator(args flowArgs) (probe.MetricsProbe, error) {
p := &metricsProbe{}

if args.Dev == "" {
args.Dev = "eth0"
}
log.Infof("flow: auto detect network device with default route")
dev, err := getDefaultRouteDevice()
if err != nil {
return nil, fmt.Errorf("fail detect default route dev, err: %w", err)
}
log.Infof("flow: default network device %s", dev.Attrs().Name)

p := &metricsProbe{
args: args,
p.helper = &ebpfFlow{
dev: dev,
bpfObjs: &p.bpfObjs,
}
} else {
pattern := strings.TrimSuffix(args.Dev, "*")
if pattern != args.Dev {
log.Infof("flow: network device pattern %s", pattern)
p.helper = &dynamicLinkFlowHelper{
bpfObjs: &p.bpfObjs,
pattern: pattern,
done: make(chan struct{}),
flows: make(map[int]*ebpfFlow),
}
} else {
link, err := netlink.LinkByName(pattern)
if err != nil {
return nil, fmt.Errorf("cannot get network interface by name %s, err: %w", pattern, err)
}

log.Infof("flow: network device %s", pattern)
p.helper = &ebpfFlow{
bpfObjs: &p.bpfObjs,
dev: link,
}
}
}

opts := probe.BatchMetricsOpts{
Namespace: probe.MetricsNamespace,
Subsystem: probeName,
Expand All @@ -66,46 +228,28 @@ func metricsProbeCreator(args flowArgs) (probe.MetricsProbe, error) {

type metricsProbe struct {
bpfObjs bpfObjects
args flowArgs
helper linkFlowHelper
}

func (p *metricsProbe) Start(_ context.Context) error {
//TODO watch every netns create/destroy
if err := p.loadAndAttachBPF(); err != nil {
if err := p.loadBPF(); err != nil {
var verifierError *ebpf.VerifierError
log.Error("failed load ebpf program", err)
if errors.As(err, &verifierError) {
log.Warn("detail", strings.Join(verifierError.Log, "\n"))
}

return err
}

return nil
return p.helper.start()
}

func (p *metricsProbe) Stop(_ context.Context) error {
return p.cleanup()
}

func (p *metricsProbe) cleanup() error {
//TODO only clean qdisc after replace qdisc successfully
link, err := netlink.LinkByName(p.args.Dev)
if err == nil {
_ = cleanQdisc(link)
if err := p.helper.stop(); err != nil {
return err
}
return p.bpfObjs.Close()
}

func toIPString(addr uint32) string {
var bytes [4]byte
bytes[0] = byte(addr & 0xff)
bytes[1] = byte(addr >> 8 & 0xff)
bytes[2] = byte(addr >> 16 & 0xff)
bytes[3] = byte(addr >> 24 & 0xff)
return fmt.Sprintf("%d.%d.%d.%d", bytes[0], bytes[1], bytes[2], bytes[3])
}

func (p *metricsProbe) collectOnce(emit probe.Emit) error {
htons := func(port uint16) uint16 {
data := make([]byte, 2)
Expand Down Expand Up @@ -156,7 +300,60 @@ func (p *metricsProbe) collectOnce(emit probe.Emit) error {
return nil
}

func (p *metricsProbe) setupTCFilter(link netlink.Link) error {
func (p *metricsProbe) loadBPF() error {
if err := rlimit.RemoveMemlock(); err != nil {
return fmt.Errorf("remove limit failed: %s", err.Error())
}

opts := ebpf.CollectionOptions{}

opts.Programs = ebpf.ProgramOptions{
KernelTypes: bpfutil.LoadBTFSpecOrNil(),
}

// Load pre-compiled programs and maps into the kernel.
if err := loadBpfObjects(&p.bpfObjs, &opts); err != nil {
return fmt.Errorf("failed loading objects: %w", err)
}
return nil
}

type ebpfFlow struct {
dev netlink.Link
bpfObjs *bpfObjects
}

func (f *ebpfFlow) start() error {
err := f.attachBPF()
if err != nil {
log.Errorf("%s failed attach ebpf to dev %s, cleanup", probeName, f.dev)
_ = f.cleanup()
}
return err
}

func (f *ebpfFlow) stop() error {
err := f.cleanup()
if err != nil {
log.Errorf("failed stop flow on dev %s", f.dev.Attrs().Name)
}
return err
}

func (f *ebpfFlow) cleanup() error {
return cleanQdisc(f.dev)
}

func toIPString(addr uint32) string {
var bytes [4]byte
bytes[0] = byte(addr & 0xff)
bytes[1] = byte(addr >> 8 & 0xff)
bytes[2] = byte(addr >> 16 & 0xff)
bytes[3] = byte(addr >> 24 & 0xff)
return fmt.Sprintf("%d.%d.%d.%d", bytes[0], bytes[1], bytes[2], bytes[3])
}

func (f *ebpfFlow) setupTCFilter(link netlink.Link) error {
if err := replaceQdisc(link); err != nil {
return errors.Wrapf(err, "failed replace qdics clsact for dev %s", link.Attrs().Name)
}
Expand All @@ -169,11 +366,11 @@ func (p *metricsProbe) setupTCFilter(link netlink.Link) error {
case ingress:
directionName = "ingress"
filterParent = netlink.HANDLE_MIN_INGRESS
prog = p.bpfObjs.bpfPrograms.TcIngress
prog = f.bpfObjs.bpfPrograms.TcIngress
case egress:
directionName = "egress"
filterParent = netlink.HANDLE_MIN_EGRESS
prog = p.bpfObjs.bpfPrograms.TcEgress
prog = f.bpfObjs.bpfPrograms.TcEgress
default:
return fmt.Errorf("invalid direction value: %d", direction)
}
Expand Down Expand Up @@ -206,37 +403,9 @@ func (p *metricsProbe) setupTCFilter(link netlink.Link) error {
return nil
}

func (p *metricsProbe) loadBPF() error {
if err := rlimit.RemoveMemlock(); err != nil {
return fmt.Errorf("remove limit failed: %s", err.Error())
}

opts := ebpf.CollectionOptions{}

opts.Programs = ebpf.ProgramOptions{
LogLevel: ebpf.LogLevelInstruction | ebpf.LogLevelBranch | ebpf.LogLevelStats,
KernelTypes: bpfutil.LoadBTFSpecOrNil(),
}

// Load pre-compiled programs and maps into the kernel.
if err := loadBpfObjects(&p.bpfObjs, &opts); err != nil {
return fmt.Errorf("failed loading objects: %w", err)
}
return nil
}

func (p *metricsProbe) loadAndAttachBPF() error {
eth0, err := netlink.LinkByName(p.args.Dev)
if err != nil {
return fmt.Errorf("fail get link %s, err: %w", p.args.Dev, err)
}

if err := p.loadBPF(); err != nil {
return err
}

if err := p.setupTCFilter(eth0); err != nil {
return fmt.Errorf("failed replace %s qdisc with clsact, err: %v", p.args.Dev, err)
func (f *ebpfFlow) attachBPF() error {
if err := f.setupTCFilter(f.dev); err != nil {
return fmt.Errorf("failed replace %s qdisc with clsact, err: %v", f.dev, err)
}
return nil
}
Expand Down
Loading
Loading