diff --git a/pkg/exporter/probe/flow/flow.go b/pkg/exporter/probe/flow/flow.go index 6999a964..e9d402ed 100644 --- a/pkg/exporter/probe/flow/flow.go +++ b/pkg/exporter/probe/flow/flow.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "fmt" "strings" + "sync" "syscall" "github.com/prometheus/client_golang/prometheus" @@ -41,46 +42,174 @@ func init() { } type flowArgs struct { - Dev string + Dev string `mapstructure:"interface"` } -func getDefaultRouteDevice() (string, error) { +func getDefaultRouteDevice() (netlink.Link, error) { filter := &netlink.Route{ Dst: nil, } routers, err := netlink.RouteListFiltered(syscall.AF_INET, filter, netlink.RT_FILTER_DST) if err != nil { - return "", err + return nil, err } if len(routers) == 0 { - return "", fmt.Errorf("no default route found") + return nil, fmt.Errorf("no default route found") } if len(routers) > 1 { - return "", fmt.Errorf("multi default route found") + return nil, fmt.Errorf("multi default route found") } + link, err := netlink.LinkByIndex(routers[0].LinkIndex) if err != nil { - return "", err + return nil, err + } + return link, nil +} + +type linkFlowHelper interface { + start() error + stop() error +} + +type dynamicLinkFlowHelper struct { + bpfObjs *bpfObjects + pattern string + done chan struct{} + flows map[int]*ebpfFlow + lock sync.Mutex +} + +func (h *dynamicLinkFlowHelper) tryStartLinkFlow(link netlink.Link) { + log.Infof("flow: try start flow on nic %s, index %d", link.Attrs().Name, link.Attrs().Index) + if _, ok := h.flows[link.Attrs().Index]; ok { + log.Warnf("new interface(%s) index %d already exists, skip process", link.Attrs().Name, link.Attrs().Index) + return + } + flow := &ebpfFlow{ + dev: link, + bpfObjs: h.bpfObjs, + } + + if err := flow.start(); err != nil { + log.Errorf("failed start flow on dev %s", link.Attrs().Name) + return + } + + h.flows[link.Attrs().Index] = flow +} + +func (h *dynamicLinkFlowHelper) tryStopLinkFlow(name string, index int) { + log.Infof("flow: try stop flow on nic %s, index %d", name, index) + flow, ok := h.flows[index] + if !ok { + log.Warnf("deleted interface index %d not exists, skip process", index) + return + } + _ = flow.stop() + delete(h.flows, index) +} + +func (h *dynamicLinkFlowHelper) start() error { + h.done = make(chan struct{}) + ch := make(chan netlink.LinkUpdate) + links, err := netlink.LinkList() + if err != nil { + return fmt.Errorf("%s error list link, err: %w", probeName, err) } - return link.Attrs().Name, nil + for _, link := range links { + if !strings.HasSuffix(link.Attrs().Name, h.pattern) { + continue + } + h.tryStartLinkFlow(link) + } + go func() { + if err := netlink.LinkSubscribe(ch, h.done); err != nil { + log.Errorf("%s error watch link change, err: %v", probeName, err) + close(h.done) + } + }() + go func() { + h.lock.Lock() + defer h.lock.Unlock() + for { + select { + case change := <-ch: + if !strings.HasSuffix(change.Attrs().Name, h.pattern) { + break + } + switch change.Header.Type { + case syscall.RTM_NEWLINK: + link, err := netlink.LinkByIndex(int(change.Index)) + if err != nil { + log.Errorf("failed get new created link by index %d, name %s, err: %v", change.Index, change.Attrs().Name, err) + break + } + h.tryStartLinkFlow(link) + case syscall.RTM_DELLINK: + h.tryStopLinkFlow(change.Attrs().Name, int(change.Index)) + } + case <-h.done: + return + } + } + }() + return nil +} + +func (h *dynamicLinkFlowHelper) stop() error { + close(h.done) + h.lock.Lock() + defer h.lock.Unlock() + var first error + for _, flow := range h.flows { + if err := flow.stop(); err != nil { + if first == nil { + first = err + } + } + } + return first } func metricsProbeCreator(args flowArgs) (probe.MetricsProbe, error) { p := &metricsProbe{} if args.Dev == "" { - var err error - p.dev, err = getDefaultRouteDevice() + log.Infof("flow: auto detect network device with default route") + dev, err := getDefaultRouteDevice() if err != nil { return nil, fmt.Errorf("fail detect default route dev, err: %w", err) } + log.Infof("flow: default network device %s", dev.Attrs().Name) + + p.helper = &ebpfFlow{ + dev: dev, + bpfObjs: &p.bpfObjs, + } } else { pattern := strings.TrimSuffix(args.Dev, "*") - p.dev = pattern if pattern != args.Dev { - p.wildcard = true + log.Infof("flow: network device pattern %s", pattern) + p.helper = &dynamicLinkFlowHelper{ + bpfObjs: &p.bpfObjs, + pattern: pattern, + done: make(chan struct{}), + flows: make(map[int]*ebpfFlow), + } + } else { + link, err := netlink.LinkByName(pattern) + if err != nil { + return nil, fmt.Errorf("cannot get network interface by name %s, err: %w", pattern, err) + } + + log.Infof("flow: network device %s", pattern) + p.helper = &ebpfFlow{ + bpfObjs: &p.bpfObjs, + dev: link, + } } } @@ -98,31 +227,8 @@ func metricsProbeCreator(args flowArgs) (probe.MetricsProbe, error) { } type metricsProbe struct { - bpfObjs bpfObjects - dev string - wildcard bool - done chan struct{} -} - -type ebpfFlow struct { - dev string -} - -func (f *ebpfFlow) start() error { - if err := f.loadAndAttachBPF(); err != nil { - var verifierError *ebpf.VerifierError - log.Error("failed load ebpf program", err) - if errors.As(err, &verifierError) { - log.Warn("detail", strings.Join(verifierError.Log, "\n")) - } - - return err - } - return nil -} - -func (f *ebpfFlow) stop() error { - return nil + bpfObjs bpfObjects + helper linkFlowHelper } func (p *metricsProbe) Start(_ context.Context) error { @@ -135,37 +241,15 @@ func (p *metricsProbe) Start(_ context.Context) error { return err } - p.done = make(chan struct{}) - if p.wildcard { - go p.watchNICs() - } else { - - } - return nil + return p.helper.start() } func (p *metricsProbe) Stop(_ context.Context) error { - return p.bpfObjs.Close() -} - -func (p *metricsProbe) cleanup() error { - //TODO only clean qdisc after replace qdisc successfully - link, err := netlink.LinkByName(p.args.Dev) - if err == nil { - _ = cleanQdisc(link) + if err := p.helper.stop(); err != nil { + return err } return p.bpfObjs.Close() } - -func toIPString(addr uint32) string { - var bytes [4]byte - bytes[0] = byte(addr & 0xff) - bytes[1] = byte(addr >> 8 & 0xff) - bytes[2] = byte(addr >> 16 & 0xff) - bytes[3] = byte(addr >> 24 & 0xff) - return fmt.Sprintf("%d.%d.%d.%d", bytes[0], bytes[1], bytes[2], bytes[3]) -} - func (p *metricsProbe) collectOnce(emit probe.Emit) error { htons := func(port uint16) uint16 { data := make([]byte, 2) @@ -216,7 +300,60 @@ func (p *metricsProbe) collectOnce(emit probe.Emit) error { return nil } -func (p *metricsProbe) setupTCFilter(link netlink.Link) error { +func (p *metricsProbe) loadBPF() error { + if err := rlimit.RemoveMemlock(); err != nil { + return fmt.Errorf("remove limit failed: %s", err.Error()) + } + + opts := ebpf.CollectionOptions{} + + opts.Programs = ebpf.ProgramOptions{ + KernelTypes: bpfutil.LoadBTFSpecOrNil(), + } + + // Load pre-compiled programs and maps into the kernel. + if err := loadBpfObjects(&p.bpfObjs, &opts); err != nil { + return fmt.Errorf("failed loading objects: %w", err) + } + return nil +} + +type ebpfFlow struct { + dev netlink.Link + bpfObjs *bpfObjects +} + +func (f *ebpfFlow) start() error { + err := f.attachBPF() + if err != nil { + log.Errorf("%s failed attach ebpf to dev %s, cleanup", probeName, f.dev) + _ = f.cleanup() + } + return err +} + +func (f *ebpfFlow) stop() error { + err := f.cleanup() + if err != nil { + log.Errorf("failed stop flow on dev %s", f.dev.Attrs().Name) + } + return err +} + +func (f *ebpfFlow) cleanup() error { + return cleanQdisc(f.dev) +} + +func toIPString(addr uint32) string { + var bytes [4]byte + bytes[0] = byte(addr & 0xff) + bytes[1] = byte(addr >> 8 & 0xff) + bytes[2] = byte(addr >> 16 & 0xff) + bytes[3] = byte(addr >> 24 & 0xff) + return fmt.Sprintf("%d.%d.%d.%d", bytes[0], bytes[1], bytes[2], bytes[3]) +} + +func (f *ebpfFlow) setupTCFilter(link netlink.Link) error { if err := replaceQdisc(link); err != nil { return errors.Wrapf(err, "failed replace qdics clsact for dev %s", link.Attrs().Name) } @@ -229,11 +366,11 @@ func (p *metricsProbe) setupTCFilter(link netlink.Link) error { case ingress: directionName = "ingress" filterParent = netlink.HANDLE_MIN_INGRESS - prog = p.bpfObjs.bpfPrograms.TcIngress + prog = f.bpfObjs.bpfPrograms.TcIngress case egress: directionName = "egress" filterParent = netlink.HANDLE_MIN_EGRESS - prog = p.bpfObjs.bpfPrograms.TcEgress + prog = f.bpfObjs.bpfPrograms.TcEgress default: return fmt.Errorf("invalid direction value: %d", direction) } @@ -266,36 +403,9 @@ func (p *metricsProbe) setupTCFilter(link netlink.Link) error { return nil } -func (p *metricsProbe) loadBPF() error { - if err := rlimit.RemoveMemlock(); err != nil { - return fmt.Errorf("remove limit failed: %s", err.Error()) - } - - opts := ebpf.CollectionOptions{} - - opts.Programs = ebpf.ProgramOptions{ - KernelTypes: bpfutil.LoadBTFSpecOrNil(), - } - - // Load pre-compiled programs and maps into the kernel. - if err := loadBpfObjects(&p.bpfObjs, &opts); err != nil { - return fmt.Errorf("failed loading objects: %w", err) - } - return nil -} - -func (p *metricsProbe) loadAndAttachBPF() error { - eth0, err := netlink.LinkByName(p.args.Dev) - if err != nil { - return fmt.Errorf("fail get link %s, err: %w", p.args.Dev, err) - } - - if err := p.loadBPF(); err != nil { - return err - } - - if err := p.setupTCFilter(eth0); err != nil { - return fmt.Errorf("failed replace %s qdisc with clsact, err: %v", p.args.Dev, err) +func (f *ebpfFlow) attachBPF() error { + if err := f.setupTCFilter(f.dev); err != nil { + return fmt.Errorf("failed replace %s qdisc with clsact, err: %v", f.dev, err) } return nil } diff --git a/pkg/exporter/probe/flow/flow_test.go b/pkg/exporter/probe/flow/flow_test.go index 26c5929a..c6dc6399 100644 --- a/pkg/exporter/probe/flow/flow_test.go +++ b/pkg/exporter/probe/flow/flow_test.go @@ -1,12 +1,13 @@ package flow import ( - "github.com/vishvananda/netlink" "syscall" "testing" + + "github.com/vishvananda/netlink" ) -func TestX(t *testing.T) { +func TestDefaultRouterDev(t *testing.T) { filter := &netlink.Route{ Dst: nil, } @@ -19,5 +20,20 @@ func TestX(t *testing.T) { for _, r := range routers { t.Logf("t: %d, link: %d, r: %s", r.Type, r.LinkIndex, r.Dst.String()) } - t.Fail() +} + +func TestLinkWatch(t *testing.T) { + changes := make(chan netlink.LinkUpdate) + done := make(chan struct{}) + if err := netlink.LinkSubscribe(changes, done); err != nil { + t.Logf("failed watch, err: %v", err) + } + + go func() { + for c := range changes { + t.Logf("mstype: %d, change %d, index %d,name: %s. newlink: %d", + c.Header.Type, c.Change, c.Index, c.Link.Attrs().Name, syscall.RTM_NEWLINK) + } + }() + <-done }