Skip to content

Commit

Permalink
Merge pull request #220 from jzwlqx/feature/label-pktdrops
Browse files Browse the repository at this point in the history
add tuple labels to packet loss
  • Loading branch information
BSWANG authored Mar 27, 2024
2 parents 8fe253e + 4ab42fc commit e6e9efb
Show file tree
Hide file tree
Showing 16 changed files with 270 additions and 283 deletions.
14 changes: 5 additions & 9 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
FROM docker.io/library/golang:1.20.5-alpine AS build
# --build-arg GOPROXY=https://goproxy.cn,direct
ARG GOPROXY
# --build-arg ALPINE_MIRROR=mirrors.aliyun.com
FROM docker.io/library/golang:1.22.1-alpine AS build

ARG ALPINE_MIRROR
ENV GOPROXY=$GOPROXY
ENV ALPINE_MIRROR=$ALPINE_MIRROR

RUN if [ ! -z "$ALPINE_MIRROR" ]; then sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories; fi && \
apk add gcc g++ linux-headers git make bash && \
go env -w GOPROXY=$GOPROXY
apk add gcc g++ linux-headers git make bash

WORKDIR /go/src/github.com/alibaba/kubeskoop/
RUN go env -w GOMODCACHE=/root/.cache/go-build
Expand All @@ -24,10 +20,10 @@ ADD ./webui /webconsole
RUN yarn install && yarn build

FROM docker.io/library/alpine:3.19 as base
ARG GOPROXY

ARG ALPINE_MIRROR
ENV GOPROXY=$GOPROXY
ENV ALPINE_MIRROR=$ALPINE_MIRROR

RUN if [ ! -z "$ALPINE_MIRROR" ]; then sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories; fi && \
apk add --no-cache \
iproute2 \
Expand Down
5 changes: 1 addition & 4 deletions bpf/headers/inspector.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ struct flow_tuple_4 {
};

union addr {
unsigned char v6addr[16];
u32 v4addr;
struct {
u64 d1;
u64 d2;
} v6addr;
} __attribute__((packed));

struct skb_meta {
Expand Down
37 changes: 1 addition & 36 deletions bpf/packetloss.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,12 @@ struct kfree_skb_args {
};

struct insp_pl_event_t {
char target[TASK_COMM_LEN];
struct tuple tuple;
struct skb_meta skb_meta;
u32 pid;
u32 cpu;
u64 location;
s64 stack_id;
};

struct insp_pl_metric_t {
u64 location;
u32 netns;
u8 protocol;
};

struct insp_pl_event_t *unused_event __attribute__((unused));
struct insp_pl_metric_t *unused_event2 __attribute__((unused));

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, struct insp_pl_metric_t);
__type(value, u64);
__uint(max_entries, 4096);
} insp_pl_metric SEC(".maps");
const struct insp_pl_event_t *unused_insp_pl_event_t __attribute__((unused));

struct {
__uint(type, BPF_MAP_TYPE_STACK_TRACE);
Expand All @@ -56,26 +38,9 @@ struct {
SEC("tracepoint/skb/kfree_skb")
int kfree_skb(struct kfree_skb_args *args) {
struct sk_buff *skb = (struct sk_buff *)args->skb;
struct insp_pl_metric_t mkey = {0};
struct insp_pl_event_t event = {0};

set_tuple(skb, &event.tuple);
set_meta(skb, &event.skb_meta);

mkey.netns = get_netns(skb);
mkey.location = (u64)args->location;
mkey.protocol = (u8)args->protocol;
u64 *valp = bpf_map_lookup_elem(&insp_pl_metric, &mkey);
if (!valp) {
u64 initval = 1;
bpf_map_update_elem(&insp_pl_metric, &mkey, &initval, 0);
} else {
__sync_fetch_and_add(valp, 1);
}

bpf_get_current_comm(&event.target, sizeof(event.target));
event.pid = bpf_get_current_pid_tgid()>> 32;
event.cpu = bpf_get_smp_processor_id();
event.location = (u64)args->location;
event.stack_id = bpf_get_stackid((struct pt_regs *)args, &insp_pl_stack,
KERN_STACKID_FLAGS);
Expand Down
Empty file added bpf/tcpretrans.c
Empty file.
1 change: 1 addition & 0 deletions deploy/net-exporter-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ event:
sinks:
- name: stderr



2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.6
github.com/jmoiron/sqlx v1.3.5
github.com/json-iterator/go v1.1.12
github.com/mattn/go-sqlite3 v1.14.6
github.com/mattn/go-sqlite3 v1.14.22
github.com/mdlayher/netlink v1.7.1
github.com/mitchellh/mapstructure v1.5.0
github.com/moby/ipvs v1.1.0
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,9 @@ github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp
github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o=
github.com/mattn/go-shellwords v1.0.6/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o=
github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
Expand Down
18 changes: 17 additions & 1 deletion pkg/exporter/bpfutil/format.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package bpfutil

import (
"encoding/binary"
"fmt"
"net"
"strings"
"syscall"
)

// GetAddrStr get string format ip address,default in ipv4
func GetAddrStr(proto uint16, addr [16]byte) string {
func GetAddrStr(proto uint16, addr [16]uint8) string {
switch proto {
case syscall.ETH_P_IPV6:
return fmt.Sprintf("[%s]", net.IP(addr[:]).String())
Expand All @@ -17,6 +18,21 @@ func GetAddrStr(proto uint16, addr [16]byte) string {
}
}

func GetV4AddrStr(addr uint32) string {
var bytes [4]byte
bytes[0] = byte(addr & 0xff)
bytes[1] = byte(addr >> 8 & 0xff)
bytes[2] = byte(addr >> 16 & 0xff)
bytes[3] = byte(addr >> 24 & 0xff)
return fmt.Sprintf("%d.%d.%d.%d", bytes[0], bytes[1], bytes[2], bytes[3])
}

func Htons(i uint16) uint16 {
data := make([]byte, 2)
binary.BigEndian.PutUint16(data, i)
return binary.LittleEndian.Uint16(data)
}

/*
enum {
TCP_ESTABLISHED = 1,
Expand Down
43 changes: 7 additions & 36 deletions pkg/exporter/probe/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package flow

import (
"context"
"encoding/binary"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -218,7 +217,7 @@ func metricsProbeCreator(args flowArgs) (probe.MetricsProbe, error) {
opts := probe.BatchMetricsOpts{
Namespace: probe.MetricsNamespace,
Subsystem: probeName,
VariableLabels: []string{"protocol", "src", "src_type", "src_node", "src_namespace", "src_pod", "dst", "dst_type", "dst_node", "dst_namespace", "dst_pod", "sport", "dport"},
VariableLabels: probe.TupleMetricsLabels,
SingleMetricsOpts: []probe.SingleMetricsOpts{
{Name: metricsBytes, ValueType: prometheus.CounterValue},
{Name: metricsPackets, ValueType: prometheus.CounterValue},
Expand Down Expand Up @@ -252,12 +251,8 @@ func (p *metricsProbe) Stop(_ context.Context) error {
}
return p.bpfObjs.Close()
}

func (p *metricsProbe) collectOnce(emit probe.Emit) error {
htons := func(port uint16) uint16 {
data := make([]byte, 2)
binary.BigEndian.PutUint16(data, port)
return binary.LittleEndian.Uint16(data)
}
var values []bpfFlowMetrics
var key bpfFlowTuple4
iterator := p.bpfObjs.bpfMaps.InspFlow4Metrics.Iterate()
Expand All @@ -273,21 +268,6 @@ func (p *metricsProbe) collectOnce(emit probe.Emit) error {
val.Packets += values[i].Packets
}

var protocol string

switch key.Proto {
case 1:
protocol = "icmp"
case 6:
protocol = "tcp"
case 17:
protocol = "udp"
case 132:
protocol = "sctp"
default:
log.Errorf("%s unknown ip protocol number %d", probeName, key.Proto)
}

ipInfo := func(ip string) []string {
info := nettop.GetIPInfo(ip)
if info == nil {
Expand All @@ -305,16 +285,16 @@ func (p *metricsProbe) collectOnce(emit probe.Emit) error {
return []string{"unknown", "", "", ""}
}

labels := []string{protocol}
srcIP := toIPString(key.Src)
labels := []string{bpfutil.GetProtoStr(key.Proto)}
srcIP := bpfutil.GetV4AddrStr(key.Src)
labels = append(labels, srcIP)
labels = append(labels, ipInfo(srcIP)...)

dstIP := toIPString(key.Dst)
dstIP := bpfutil.GetV4AddrStr(key.Dst)
labels = append(labels, dstIP)
labels = append(labels, ipInfo(dstIP)...)
labels = append(labels, fmt.Sprintf("%d", htons(key.Sport)))
labels = append(labels, fmt.Sprintf("%d", htons(key.Dport)))
labels = append(labels, fmt.Sprintf("%d", bpfutil.Htons(key.Sport)))
labels = append(labels, fmt.Sprintf("%d", bpfutil.Htons(key.Dport)))

emit("bytes", labels, float64(val.Bytes))
emit("packets", labels, float64(val.Packets))
Expand Down Expand Up @@ -366,15 +346,6 @@ func (f *ebpfFlow) cleanup() error {
return cleanQdisc(f.dev)
}

func toIPString(addr uint32) string {
var bytes [4]byte
bytes[0] = byte(addr & 0xff)
bytes[1] = byte(addr >> 8 & 0xff)
bytes[2] = byte(addr >> 16 & 0xff)
bytes[3] = byte(addr >> 24 & 0xff)
return fmt.Sprintf("%d.%d.%d.%d", bytes[0], bytes[1], bytes[2], bytes[3])
}

func (f *ebpfFlow) setupTCFilter(link netlink.Link) error {
if err := replaceQdisc(link); err != nil {
return errors.Wrapf(err, "failed replace qdics clsact for dev %s", link.Attrs().Name)
Expand Down
87 changes: 87 additions & 0 deletions pkg/exporter/probe/legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package probe
import (
"fmt"

"github.com/alibaba/kubeskoop/pkg/exporter/bpfutil"
"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)

var legacyMetricsLabels = []string{"target_node", "target_namespace", "target_pod", "node", "namespace", "pod"}
var StandardMetricsLabels = []string{"k8s_node", "k8s_namespace", "k8s_pod"}
var TupleMetricsLabels = []string{"protocol", "src", "src_type", "src_node", "src_namespace", "src_pod", "dst", "dst_type", "dst_node", "dst_namespace", "dst_pod", "sport", "dport"}

func BuildStandardMetricsLabelValues(entity *nettop.Entity) []string {
return []string{nettop.GetNodeName(), entity.GetPodNamespace(), entity.GetPodName()}
Expand Down Expand Up @@ -110,6 +112,91 @@ func LagacyEventLabels(netns uint32) []Label {
}
}

func BuildTupleMetricsLabels(tuple *Tuple) []string {
ipInfo := func(ip string) []string {
info := nettop.GetIPInfo(ip)
if info == nil {
return []string{"unknown", "", "", ""}
}

switch info.Type {
case nettop.IPTypeNode:
return []string{"node", info.NodeName, "", ""}
case nettop.IPTypePod:
return []string{"pod", "", info.PodNamespace, info.PodName}
default:
log.Warningf("unknown ip type %s for %s", ip, info.Type)
}
return []string{"unknown", "", "", ""}
}

labels := []string{bpfutil.GetProtoStr(tuple.Protocol)}
labels = append(labels, tuple.Src)
labels = append(labels, ipInfo(tuple.Src)...)

labels = append(labels, tuple.Dst)
labels = append(labels, ipInfo(tuple.Dst)...)
labels = append(labels, fmt.Sprintf("%d", bpfutil.Htons(tuple.Sport)))
labels = append(labels, fmt.Sprintf("%d", bpfutil.Htons(tuple.Dport)))
return labels
}

func BuildTupleEventLabels(tuple *Tuple) []Label {
ipInfo := func(prefix, ip string) (ret []Label) {
var values [4]string

defer func() {
ret = []Label{
{Name: prefix + "_type", Value: values[0]},
{Name: prefix + "_node", Value: values[1]},
{Name: prefix + "_namespace", Value: values[2]},
{Name: prefix + "_pod", Value: values[3]},
}

}()

info := nettop.GetIPInfo(ip)
if info == nil {
values = [...]string{"unknown", "", "", ""}
return
}

switch info.Type {
case nettop.IPTypeNode:
values = [...]string{"node", info.NodeName, "", ""}
case nettop.IPTypePod:
values = [...]string{"pod", "", info.PodNamespace, info.PodName}
default:
log.Warningf("unknown ip type %s for %s", ip, info.Type)
}
values = [...]string{"unknown", "", "", ""}
return
}

labels := []Label{
{Name: "protocol", Value: bpfutil.GetProtoStr(tuple.Protocol)},
}
labels = append(labels, Label{
Name: "src", Value: tuple.Src,
})

labels = append(labels, ipInfo("src", tuple.Src)...)

labels = append(labels, Label{
Name: "dst", Value: tuple.Dst,
})
labels = append(labels, ipInfo("dst", tuple.Dst)...)
labels = append(labels, Label{
Name: "sport", Value: fmt.Sprintf("%d", bpfutil.Htons(tuple.Sport)),
})

labels = append(labels, Label{
Name: "dport", Value: fmt.Sprintf("%d", bpfutil.Htons(tuple.Dport)),
})

return labels
}

func CopyLegacyMetricsMap(m map[string]map[uint32]uint64) map[string]map[uint32]uint64 {
ret := make(map[string]map[uint32]uint64)
for key, nsMap := range m {
Expand Down
Loading

0 comments on commit e6e9efb

Please sign in to comment.