Skip to content

Commit

Permalink
Add code to collect flow info from the kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
jzwlqx committed Aug 24, 2023
1 parent d3862a3 commit 6efe5bb
Show file tree
Hide file tree
Showing 12 changed files with 742 additions and 22 deletions.
55 changes: 55 additions & 0 deletions bpf/flow.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#include <vmlinux.h>
#include <bpf_helpers.h>
#include <bpf_tracing.h>
#include <bpf_core_read.h>
#include <inspector.h>

#define TC_ACT_OK 0

// TODO: aggregate all flow-based metrics in one map to save memory.
// Per-flow counters; used as the value type of the insp_flow4_metrics map.
struct flow_metrics {
u64 packets; // number of packets accounted to the flow (+1 per packet in __do_flow)
u64 bytes; // sum of skb->len over the accounted packets
u32 drops; // only ever initialised to 0 in this file
u32 retrans; // only ever initialised to 0 in this file
};

// Per-CPU hash map keyed by the IPv4 flow tuple, holding the counters of up
// to 4096 flows. It is looked up and updated by __do_flow.
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
__type(key, struct flow_tuple_4);
__type(value, struct flow_metrics);
__uint(max_entries, 4096);
} insp_flow4_metrics SEC(".maps");


static inline int __do_flow(struct __sk_buff *skb){
struct flow_tuple_4 tuple = {0};
if(set_flow_tuple4(skb, &tuple) < 0){
goto out;
}

struct flow_metrics *metric = bpf_map_lookup_elem(&insp_flow4_metrics, &tuple);
if(metric){
__sync_fetch_and_add(&metric->packets, 1);
__sync_fetch_and_add(&metric->bytes, skb->len);
}else {
struct flow_metrics m = {1, skb->len, 0, 0};
bpf_map_update_elem(&insp_flow4_metrics, &tuple, &m, BPF_ANY);
}
out:
return TC_ACT_OK;
}

// Program placed in the "tc/ingress" section: accounts the packet via
// __do_flow and returns its verdict (always TC_ACT_OK).
SEC("tc/ingress")
int tc_ingress(struct __sk_buff *skb){
return __do_flow(skb);
}

// Program placed in the "tc/egress" section: accounts the packet via
// __do_flow and returns its verdict (always TC_ACT_OK).
SEC("tc/egress")
int tc_egress(struct __sk_buff *skb){
return __do_flow(skb);
}

// License string emitted into the object's "license" section.
char LICENSE[] SEC("license") = "Dual BSD/GPL";


99 changes: 99 additions & 0 deletions bpf/headers/bpf/bpf_endian.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/* SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) */
#ifndef __BPF_ENDIAN__
#define __BPF_ENDIAN__

/*
 * Isolate byte #n and put it into byte #m, for __u##b type.
 * E.g., moving byte #6 (nnnnnnnn) into byte #1 (mmmmmmmm) for __u64:
 * 1) xxxxxxxx nnnnnnnn xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx mmmmmmmm xxxxxxxx
 * 2) nnnnnnnn xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx mmmmmmmm xxxxxxxx 00000000
 * 3) 00000000 00000000 00000000 00000000 00000000 00000000 00000000 nnnnnnnn
 * 4) 00000000 00000000 00000000 00000000 00000000 00000000 nnnnnnnn 00000000
 *
 * Parameters:
 *   x - value to take the byte from (cast to __u##b first)
 *   b - width of the type in bits (16, 32 or 64 in this header)
 *   n - index of the source byte, counted from the least significant byte
 *   m - index of the destination byte, counted the same way
 *
 * The left shift pushes byte #n to the top of the word (step 2), the right
 * shift brings it down to byte #0 (step 3), and the final left shift moves
 * it to byte #m (step 4).
 */
#define ___bpf_mvb(x, b, n, m) ((__u##b)(x) << (b-(n+1)*8) >> (b-8) << (m*8))

/* Swap the two bytes of a 16-bit value using only shifts and ORs. */
#define ___bpf_swab16(x) ((__u16)( \
___bpf_mvb(x, 16, 0, 1) | \
___bpf_mvb(x, 16, 1, 0)))

/* Reverse the byte order of a 32-bit value using only shifts and ORs. */
#define ___bpf_swab32(x) ((__u32)( \
___bpf_mvb(x, 32, 0, 3) | \
___bpf_mvb(x, 32, 1, 2) | \
___bpf_mvb(x, 32, 2, 1) | \
___bpf_mvb(x, 32, 3, 0)))

/* Reverse the byte order of a 64-bit value using only shifts and ORs. */
#define ___bpf_swab64(x) ((__u64)( \
___bpf_mvb(x, 64, 0, 7) | \
___bpf_mvb(x, 64, 1, 6) | \
___bpf_mvb(x, 64, 2, 5) | \
___bpf_mvb(x, 64, 3, 4) | \
___bpf_mvb(x, 64, 4, 3) | \
___bpf_mvb(x, 64, 5, 2) | \
___bpf_mvb(x, 64, 6, 1) | \
___bpf_mvb(x, 64, 7, 0)))

/* LLVM's BPF target selects the endianness of the CPU
 * it compiles on, or the user specifies (bpfel/bpfeb),
 * respectively. The used __BYTE_ORDER__ is defined by
 * the compiler, we cannot rely on __BYTE_ORDER from
 * libc headers, since it doesn't reflect the actual
 * requested byte order.
 *
 * Note, LLVM's BPF target has different __builtin_bswapX()
 * semantics. It does map to BPF_ALU | BPF_END | BPF_TO_BE
 * in bpfel and bpfeb case, which means below, that we map
 * to cpu_to_be16(). We could use it unconditionally in BPF
 * case, but better not rely on it, so that this header here
 * can be used from application and BPF program side, which
 * use different targets.
 *
 * Each conversion comes in two flavours: the plain one uses a
 * compiler builtin, the __bpf_constant_* one uses the shift-based
 * ___bpf_swabNN() macros.
 */
#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
/* Little-endian target: every conversion is a byte swap. */
# define __bpf_ntohs(x) __builtin_bswap16(x)
# define __bpf_htons(x) __builtin_bswap16(x)
# define __bpf_constant_ntohs(x) ___bpf_swab16(x)
# define __bpf_constant_htons(x) ___bpf_swab16(x)
# define __bpf_ntohl(x) __builtin_bswap32(x)
# define __bpf_htonl(x) __builtin_bswap32(x)
# define __bpf_constant_ntohl(x) ___bpf_swab32(x)
# define __bpf_constant_htonl(x) ___bpf_swab32(x)
# define __bpf_be64_to_cpu(x) __builtin_bswap64(x)
# define __bpf_cpu_to_be64(x) __builtin_bswap64(x)
# define __bpf_constant_be64_to_cpu(x) ___bpf_swab64(x)
# define __bpf_constant_cpu_to_be64(x) ___bpf_swab64(x)
#elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
/* Big-endian target: every conversion is the identity. */
# define __bpf_ntohs(x) (x)
# define __bpf_htons(x) (x)
# define __bpf_constant_ntohs(x) (x)
# define __bpf_constant_htons(x) (x)
# define __bpf_ntohl(x) (x)
# define __bpf_htonl(x) (x)
# define __bpf_constant_ntohl(x) (x)
# define __bpf_constant_htonl(x) (x)
# define __bpf_be64_to_cpu(x) (x)
# define __bpf_cpu_to_be64(x) (x)
# define __bpf_constant_be64_to_cpu(x) (x)
# define __bpf_constant_cpu_to_be64(x) (x)
#else
/* Neither byte order is reported by the compiler: refuse to build. */
# error "Fix your compiler's __BYTE_ORDER__?!"
#endif

/*
 * Public byte-order helpers for 16-, 32- and 64-bit values.
 * Each one selects the __bpf_constant_* variant when its argument is a
 * compile-time constant (per __builtin_constant_p) and the builtin-based
 * variant otherwise.
 */
#define bpf_htons(x) \
(__builtin_constant_p(x) ? \
__bpf_constant_htons(x) : __bpf_htons(x))
#define bpf_ntohs(x) \
(__builtin_constant_p(x) ? \
__bpf_constant_ntohs(x) : __bpf_ntohs(x))
#define bpf_htonl(x) \
(__builtin_constant_p(x) ? \
__bpf_constant_htonl(x) : __bpf_htonl(x))
#define bpf_ntohl(x) \
(__builtin_constant_p(x) ? \
__bpf_constant_ntohl(x) : __bpf_ntohl(x))
#define bpf_cpu_to_be64(x) \
(__builtin_constant_p(x) ? \
__bpf_constant_cpu_to_be64(x) : __bpf_cpu_to_be64(x))
#define bpf_be64_to_cpu(x) \
(__builtin_constant_p(x) ? \
__bpf_constant_be64_to_cpu(x) : __bpf_be64_to_cpu(x))

#endif /* __BPF_ENDIAN__ */
59 changes: 59 additions & 0 deletions bpf/headers/inspector.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
// +build ignore

#include <bpf/bpf_endian.h>
#include "bpf_core_read.h"
#include "bpf_helpers.h"
#include "bpf_tracing.h"
Expand All @@ -19,6 +20,14 @@

#define PERF_MAX_STACK_DEPTH 32

/*
 * IPv4 flow key filled by set_flow_tuple4(). All fields are copied from the
 * packet headers without byte-order conversion.
 * NOTE(review): with natural alignment there are presumably 3 padding bytes
 * after proto; zero the whole struct before using it as a map key - confirm.
 */
struct flow_tuple_4 {
unsigned char proto; /* copied from iphdr->protocol */
u32 src; /* copied from iphdr->saddr */
u32 dst; /* copied from iphdr->daddr */
u16 sport; /* TCP/UDP source port; not written for other protocols */
u16 dport; /* TCP/UDP destination port; not written for other protocols */
};

union addr {
u32 v4addr;
struct {
Expand Down Expand Up @@ -101,6 +110,56 @@ static __always_inline u32 get_netns(struct sk_buff *skb) {
return netns;
}

/*
 * set_flow_tuple4 - fill an IPv4 flow key from the packet behind skb.
 * @skb:   packet context; parsing starts at the Ethernet header at skb->data.
 * @tuple: key to fill. Only the fields that could be parsed are written, so
 *         the caller should pass in a fully zeroed struct.
 *
 * Addresses and ports are copied as found in the headers (no byte-order
 * conversion). Ports are filled for TCP and UDP only; for every other IPv4
 * protocol, and for non-first IP fragments (which carry no L4 header), they
 * are left untouched.
 *
 * Return: 0 when the tuple describes an IPv4 packet, -1 when the packet is
 * truncated, malformed, or not IPv4 (IPv6 is not supported yet).
 */
static __always_inline int set_flow_tuple4(struct __sk_buff *skb, struct flow_tuple_4 *tuple){
	void *data = (void *)(long)skb->data;
	void *data_end = (void *)(long)skb->data_end;
	struct ethhdr *eth = data;
	u16 l4_off = 0;

	if (data + sizeof(*eth) > data_end)
		return -1;

	/*
	 * Anything but IPv4 is reported as a failure; returning success here
	 * would make the caller account the packet under an all-zero tuple.
	 */
	if (eth->h_proto != bpf_htons(ETH_P_IP))
		return -1;

	struct iphdr *iph = data + sizeof(*eth);

	if (data + sizeof(*eth) + sizeof(*iph) > data_end)
		return -1;

	/* ihl counts 32-bit words; fewer than 5 cannot hold the fixed header. */
	if (iph->ihl < 5)
		return -1;

	tuple->src = iph->saddr;
	tuple->dst = iph->daddr;
	tuple->proto = iph->protocol;

	/*
	 * The low 13 bits of frag_off are the fragment offset. A non-zero
	 * offset means this is not the first fragment, so what follows the IP
	 * header is payload rather than a TCP/UDP header.
	 */
	if (iph->frag_off & bpf_htons(0x1FFF))
		return 0;

	l4_off = sizeof(*eth) + iph->ihl * 4;

	if (iph->protocol == IPPROTO_TCP) {
		struct tcphdr *tcph = data + l4_off;

		if (data + l4_off + sizeof(*tcph) > data_end)
			return -1;

		tuple->sport = tcph->source;
		tuple->dport = tcph->dest;
	} else if (iph->protocol == IPPROTO_UDP) {
		struct udphdr *udph = data + l4_off;

		if (data + l4_off + sizeof(*udph) > data_end)
			return -1;

		tuple->sport = udph->source;
		tuple->dport = udph->dest;
	}

	return 0;
}

static __always_inline void set_tuple(struct sk_buff *skb, struct tuple *tpl) {
unsigned char *skb_head = 0;
u16 l3_off;
Expand Down
3 changes: 2 additions & 1 deletion deploy/config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
debugmode: true
debugmode: false
metric_config:
interval: 15
port: 9102
Expand All @@ -10,6 +10,7 @@ metric_config:
- tcp
- tcpext
- udp
- flow
event_config:
port: 19102
loki_enable: false
Expand Down
2 changes: 1 addition & 1 deletion pkg/exporter/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var (
Level: slog.DebugLevel,
}

slog.SetDefault(slog.New(opts.NewJSONHandler(os.Stderr)))
slog.SetDefault(slog.New(opts.NewTextHandler(os.Stdout)))
} else {
slog.SetDefault(slog.New(slog.NewTextHandler(io.Discard)))
}
Expand Down
45 changes: 25 additions & 20 deletions pkg/exporter/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,23 @@ import (
"os/signal"
"syscall"

"golang.org/x/exp/slog"

"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"github.com/alibaba/kubeskoop/pkg/exporter/probe"
"github.com/alibaba/kubeskoop/pkg/exporter/proto"

_ "net/http" //for golangci-lint
_ "net/http/pprof" //for golangci-lint once more

gops "github.com/google/gops/agent"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/exp/slog"
log "golang.org/x/exp/slog"
"google.golang.org/grpc"
)

Expand All @@ -36,28 +41,28 @@ var (
Run: func(cmd *cobra.Command, args []string) {
insp := &inspServer{
v: *viper.New(),
ctx: slog.NewContext(context.Background(), slog.Default()),
ctx: log.NewContext(context.Background(), slog.Default()),
}

insp.v.SetConfigFile(configPath)
err := insp.MergeConfig()
if err != nil {
slog.Ctx(insp.ctx).Info("merge config", "err", err)
log.Ctx(insp.ctx).Info("merge config", "err", err)
return
}

if insp.config.DebugMode {
opts := slog.HandlerOptions{
opts := log.HandlerOptions{
AddSource: true,
Level: slog.DebugLevel,
Level: log.DebugLevel,
}
insp.ctx = slog.NewContext(context.Background(), slog.New(opts.NewJSONHandler(os.Stderr)))
insp.ctx = log.NewContext(context.Background(), slog.New(opts.NewJSONHandler(os.Stdout)))
} else {
opts := slog.HandlerOptions{
opts := log.HandlerOptions{
AddSource: false,
Level: slog.InfoLevel,
Level: log.InfoLevel,
}
insp.ctx = slog.NewContext(context.Background(), slog.New(opts.NewJSONHandler(os.Stderr)))
insp.ctx = log.NewContext(context.Background(), slog.New(opts.NewJSONHandler(os.Stdout)))
}

// nolint
Expand All @@ -73,7 +78,7 @@ var (
// block here
err = insp.start()
if err != nil {
slog.Ctx(insp.ctx).Info("start server", "err", err)
log.Ctx(insp.ctx).Info("start server", "err", err)
return
}
},
Expand Down Expand Up @@ -126,17 +131,17 @@ func (i *inspServer) MergeConfig() error {
err := i.v.ReadInConfig()
if err != nil {
if _, ok := err.(viper.ConfigFileNotFoundError); ok {
slog.Ctx(i.ctx).Info("validate config", "path", configPath, "err", err)
log.Ctx(i.ctx).Info("validate config", "path", configPath, "err", err)
return errors.Wrapf(err, "no such config")
}
slog.Ctx(i.ctx).Info("validate config", "err", err)
log.Ctx(i.ctx).Info("validate config", "err", err)
return err
}

cfg := &inspServerConfig{}
err = i.v.Unmarshal(cfg)
if err != nil {
slog.Ctx(i.ctx).Info("validate unmarshal config", "err", err)
log.Ctx(i.ctx).Info("validate unmarshal config", "err", err)
return err
}

Expand All @@ -147,7 +152,7 @@ func (i *inspServer) MergeConfig() error {

func (i *inspServer) start() error {
if err := gops.Listen(gops.Options{}); err != nil {
slog.Ctx(i.ctx).Info("start gops", "err", err)
log.Ctx(i.ctx).Info("start gops", "err", err)
}

go func() {
Expand All @@ -173,10 +178,10 @@ func (i *inspServer) start() error {
http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
}
listenaddr := fmt.Sprintf(":%d", i.config.Mconfig.Port)
slog.Ctx(i.ctx).Info("inspector start metric server", "listenaddr", listenaddr)
log.Ctx(i.ctx).Info("inspector start metric server", "listenaddr", listenaddr)
srv := &http.Server{Addr: listenaddr}
if err := srv.ListenAndServe(); err != nil {
slog.Ctx(i.ctx).Info("inspector start metric server", "err", err, "listenaddr", listenaddr)
log.Ctx(i.ctx).Info("inspector start metric server", "err", err, "listenaddr", listenaddr)
}
}()

Expand All @@ -186,13 +191,13 @@ func (i *inspServer) start() error {
proto.RegisterInspectorServer(s, e)
listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", i.config.Econfig.Port))
if err != nil {
slog.Ctx(i.ctx).Warn("inspector start event server", "port", i.config.Econfig.Port, "err", err)
log.Ctx(i.ctx).Warn("inspector start event server", "port", i.config.Econfig.Port, "err", err)
return
}
slog.Ctx(i.ctx).Info("inspector eserver serve", "port", i.config.Econfig.Port)
log.Ctx(i.ctx).Info("inspector eserver serve", "port", i.config.Econfig.Port)
// grpc server block there, handle it with goroutine
if err := s.Serve(listener); err != nil {
slog.Ctx(i.ctx).Warn("inspector eserver serve", "port", i.config.Econfig.Port, "err", err)
log.Ctx(i.ctx).Warn("inspector eserver serve", "port", i.config.Econfig.Port, "err", err)
return
}
}()
Expand All @@ -202,7 +207,7 @@ func (i *inspServer) start() error {
}

func WaitSignals(ctx context.Context, sgs ...os.Signal) {
slog.Ctx(ctx).Info("keep running and start waiting for signals")
log.Ctx(ctx).Info("keep running and start waiting for signals")
s := make(chan os.Signal, 1)
signal.Notify(s, sgs...)
<-s
Expand Down
Loading

0 comments on commit 6efe5bb

Please sign in to comment.