Skip to content

Commit

Permalink
add codes that used to collect flow info from kernel
Browse files Browse the repository at this point in the history
we should refactor some framework code to support complete flow feature
  • Loading branch information
jzwlqx committed Aug 24, 2023
1 parent d3862a3 commit 8bdcbd2
Show file tree
Hide file tree
Showing 12 changed files with 741 additions and 22 deletions.
55 changes: 55 additions & 0 deletions bpf/flow.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#include <vmlinux.h>
#include <bpf_helpers.h>
#include <bpf_tracing.h>
#include <bpf_core_read.h>
#include <inspector.h>

#define TC_ACT_OK 0

//todo aggregate all flow based metrics in one map to save memory.
struct flow_metrics {
u64 packets;
u64 bytes;
u32 drops;
u32 retrans;
};

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
__type(key, struct flow_tuple_4);
__type(value, struct flow_metrics);
__uint(max_entries, 4096);
} insp_flow4_metrics SEC(".maps");


static inline int __do_flow(struct __sk_buff *skb){
struct flow_tuple_4 tuple = {0};
if(set_flow_tuple4(skb, &tuple) < 0){
goto out;
}

struct flow_metrics *metric = bpf_map_lookup_elem(&insp_flow4_metrics, &tuple);
if(metric){
__sync_fetch_and_add(&metric->packets, 1);
__sync_fetch_and_add(&metric->bytes, skb->len);
}else {
struct flow_metrics m = {1, skb->len, 0, 0};
bpf_map_update_elem(&insp_flow4_metrics, &tuple, &m, BPF_ANY);
}
out:
return TC_ACT_OK;
}

SEC("tc/ingress")
int tc_ingress(struct __sk_buff *skb){
return __do_flow(skb);
}

SEC("tc/egress")
int tc_egress(struct __sk_buff *skb){
return __do_flow(skb);
}

char LICENSE[] SEC("license") = "Dual BSD/GPL";


99 changes: 99 additions & 0 deletions bpf/headers/bpf/bpf_endian.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/* SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) */
#ifndef __BPF_ENDIAN__
#define __BPF_ENDIAN__

/*
* Isolate byte #n and put it into byte #m, for __u##b type.
* E.g., moving byte #6 (nnnnnnnn) into byte #1 (mmmmmmmm) for __u64:
* 1) xxxxxxxx nnnnnnnn xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx mmmmmmmm xxxxxxxx
* 2) nnnnnnnn xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx mmmmmmmm xxxxxxxx 00000000
* 3) 00000000 00000000 00000000 00000000 00000000 00000000 00000000 nnnnnnnn
* 4) 00000000 00000000 00000000 00000000 00000000 00000000 nnnnnnnn 00000000
*/
#define ___bpf_mvb(x, b, n, m) ((__u##b)(x) << (b-(n+1)*8) >> (b-8) << (m*8))

#define ___bpf_swab16(x) ((__u16)( \
___bpf_mvb(x, 16, 0, 1) | \
___bpf_mvb(x, 16, 1, 0)))

#define ___bpf_swab32(x) ((__u32)( \
___bpf_mvb(x, 32, 0, 3) | \
___bpf_mvb(x, 32, 1, 2) | \
___bpf_mvb(x, 32, 2, 1) | \
___bpf_mvb(x, 32, 3, 0)))

#define ___bpf_swab64(x) ((__u64)( \
___bpf_mvb(x, 64, 0, 7) | \
___bpf_mvb(x, 64, 1, 6) | \
___bpf_mvb(x, 64, 2, 5) | \
___bpf_mvb(x, 64, 3, 4) | \
___bpf_mvb(x, 64, 4, 3) | \
___bpf_mvb(x, 64, 5, 2) | \
___bpf_mvb(x, 64, 6, 1) | \
___bpf_mvb(x, 64, 7, 0)))

/* LLVM's BPF target selects the endianness of the CPU
* it compiles on, or the user specifies (bpfel/bpfeb),
* respectively. The used __BYTE_ORDER__ is defined by
* the compiler, we cannot rely on __BYTE_ORDER from
* libc headers, since it doesn't reflect the actual
* requested byte order.
*
* Note, LLVM's BPF target has different __builtin_bswapX()
* semantics. It does map to BPF_ALU | BPF_END | BPF_TO_BE
* in bpfel and bpfeb case, which means below, that we map
* to cpu_to_be16(). We could use it unconditionally in BPF
* case, but better not rely on it, so that this header here
* can be used from application and BPF program side, which
* use different targets.
*/
#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
# define __bpf_ntohs(x) __builtin_bswap16(x)
# define __bpf_htons(x) __builtin_bswap16(x)
# define __bpf_constant_ntohs(x) ___bpf_swab16(x)
# define __bpf_constant_htons(x) ___bpf_swab16(x)
# define __bpf_ntohl(x) __builtin_bswap32(x)
# define __bpf_htonl(x) __builtin_bswap32(x)
# define __bpf_constant_ntohl(x) ___bpf_swab32(x)
# define __bpf_constant_htonl(x) ___bpf_swab32(x)
# define __bpf_be64_to_cpu(x) __builtin_bswap64(x)
# define __bpf_cpu_to_be64(x) __builtin_bswap64(x)
# define __bpf_constant_be64_to_cpu(x) ___bpf_swab64(x)
# define __bpf_constant_cpu_to_be64(x) ___bpf_swab64(x)
#elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
# define __bpf_ntohs(x) (x)
# define __bpf_htons(x) (x)
# define __bpf_constant_ntohs(x) (x)
# define __bpf_constant_htons(x) (x)
# define __bpf_ntohl(x) (x)
# define __bpf_htonl(x) (x)
# define __bpf_constant_ntohl(x) (x)
# define __bpf_constant_htonl(x) (x)
# define __bpf_be64_to_cpu(x) (x)
# define __bpf_cpu_to_be64(x) (x)
# define __bpf_constant_be64_to_cpu(x) (x)
# define __bpf_constant_cpu_to_be64(x) (x)
#else
# error "Fix your compiler's __BYTE_ORDER__?!"
#endif

#define bpf_htons(x) \
(__builtin_constant_p(x) ? \
__bpf_constant_htons(x) : __bpf_htons(x))
#define bpf_ntohs(x) \
(__builtin_constant_p(x) ? \
__bpf_constant_ntohs(x) : __bpf_ntohs(x))
#define bpf_htonl(x) \
(__builtin_constant_p(x) ? \
__bpf_constant_htonl(x) : __bpf_htonl(x))
#define bpf_ntohl(x) \
(__builtin_constant_p(x) ? \
__bpf_constant_ntohl(x) : __bpf_ntohl(x))
#define bpf_cpu_to_be64(x) \
(__builtin_constant_p(x) ? \
__bpf_constant_cpu_to_be64(x) : __bpf_cpu_to_be64(x))
#define bpf_be64_to_cpu(x) \
(__builtin_constant_p(x) ? \
__bpf_constant_be64_to_cpu(x) : __bpf_be64_to_cpu(x))

#endif /* __BPF_ENDIAN__ */
59 changes: 59 additions & 0 deletions bpf/headers/inspector.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
// +build ignore

#include <bpf/bpf_endian.h>
#include "bpf_core_read.h"
#include "bpf_helpers.h"
#include "bpf_tracing.h"
Expand All @@ -19,6 +20,14 @@

#define PERF_MAX_STACK_DEPTH 32

struct flow_tuple_4 {
unsigned char proto;
u32 src;
u32 dst;
u16 sport;
u16 dport;
};

union addr {
u32 v4addr;
struct {
Expand Down Expand Up @@ -101,6 +110,56 @@ static __always_inline u32 get_netns(struct sk_buff *skb) {
return netns;
}

static __always_inline int set_flow_tuple4(struct __sk_buff *skb, struct flow_tuple_4 *tuple){
void *data = (void *)(long)skb->data;
struct ethhdr *eth = data;
void *data_end = (void *)(long)skb->data_end;
u16 l4_off = 0;
const char fmt[] = "source port %d\n";
//u16 bytes = 0;

if (data + sizeof(*eth) > data_end)
return -1;

if (eth->h_proto == bpf_htons(ETH_P_IP)) {
struct iphdr *iph = data + sizeof(*eth);

if (data + sizeof(*eth) + sizeof(*iph) > data_end)
return -1;

tuple->src = iph->saddr;
tuple->dst = iph->daddr;
tuple->proto = iph->protocol;

l4_off = sizeof(*eth) + iph->ihl * 4;

if (iph->protocol == IPPROTO_TCP){
struct tcphdr *tcph = data + l4_off;

if (data + l4_off + sizeof(*tcph) > data_end)
return -1;

tuple->sport = tcph->source;
tuple->dport = tcph->dest;
//bytes = tcph->doff * 4;
}else if(iph->protocol == IPPROTO_UDP){
struct udphdr *udph = data + l4_off;
if(data + l4_off + sizeof(*udph) > data_end)
return -1;

tuple->sport = udph->source;
tuple->dport = udph->dest;
//bytes = tcph->len;
}


} else if (eth->h_proto == bpf_htons(ETH_P_IPV6)) {
//not supported yet
}

return 0;
}

static __always_inline void set_tuple(struct sk_buff *skb, struct tuple *tpl) {
unsigned char *skb_head = 0;
u16 l3_off;
Expand Down
3 changes: 2 additions & 1 deletion deploy/config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
debugmode: true
debugmode: false
metric_config:
interval: 15
port: 9102
Expand All @@ -10,6 +10,7 @@ metric_config:
- tcp
- tcpext
- udp
- flow
event_config:
port: 19102
loki_enable: false
Expand Down
2 changes: 1 addition & 1 deletion pkg/exporter/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var (
Level: slog.DebugLevel,
}

slog.SetDefault(slog.New(opts.NewJSONHandler(os.Stderr)))
slog.SetDefault(slog.New(opts.NewTextHandler(os.Stdout)))
} else {
slog.SetDefault(slog.New(slog.NewTextHandler(io.Discard)))
}
Expand Down
45 changes: 25 additions & 20 deletions pkg/exporter/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,23 @@ import (
"os/signal"
"syscall"

"golang.org/x/exp/slog"

"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"github.com/alibaba/kubeskoop/pkg/exporter/probe"
"github.com/alibaba/kubeskoop/pkg/exporter/proto"

_ "net/http"

Check warning on line 22 in pkg/exporter/cmd/server.go

View workflow job for this annotation

GitHub Actions / lint

blank-imports: a blank import should be only in a main or test package, or have a comment justifying it (revive)
_ "net/http/pprof"

gops "github.com/google/gops/agent"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/exp/slog"
log "golang.org/x/exp/slog"
"google.golang.org/grpc"
)

Expand All @@ -36,28 +41,28 @@ var (
Run: func(cmd *cobra.Command, args []string) {
insp := &inspServer{
v: *viper.New(),
ctx: slog.NewContext(context.Background(), slog.Default()),
ctx: log.NewContext(context.Background(), slog.Default()),
}

insp.v.SetConfigFile(configPath)
err := insp.MergeConfig()
if err != nil {
slog.Ctx(insp.ctx).Info("merge config", "err", err)
log.Ctx(insp.ctx).Info("merge config", "err", err)
return
}

if insp.config.DebugMode {
opts := slog.HandlerOptions{
opts := log.HandlerOptions{
AddSource: true,
Level: slog.DebugLevel,
Level: log.DebugLevel,
}
insp.ctx = slog.NewContext(context.Background(), slog.New(opts.NewJSONHandler(os.Stderr)))
insp.ctx = log.NewContext(context.Background(), slog.New(opts.NewJSONHandler(os.Stdout)))
} else {
opts := slog.HandlerOptions{
opts := log.HandlerOptions{
AddSource: false,
Level: slog.InfoLevel,
Level: log.InfoLevel,
}
insp.ctx = slog.NewContext(context.Background(), slog.New(opts.NewJSONHandler(os.Stderr)))
insp.ctx = log.NewContext(context.Background(), slog.New(opts.NewJSONHandler(os.Stdout)))
}

// nolint
Expand All @@ -73,7 +78,7 @@ var (
// block here
err = insp.start()
if err != nil {
slog.Ctx(insp.ctx).Info("start server", "err", err)
log.Ctx(insp.ctx).Info("start server", "err", err)
return
}
},
Expand Down Expand Up @@ -126,17 +131,17 @@ func (i *inspServer) MergeConfig() error {
err := i.v.ReadInConfig()
if err != nil {
if _, ok := err.(viper.ConfigFileNotFoundError); ok {
slog.Ctx(i.ctx).Info("validate config", "path", configPath, "err", err)
log.Ctx(i.ctx).Info("validate config", "path", configPath, "err", err)
return errors.Wrapf(err, "no such config")
}
slog.Ctx(i.ctx).Info("validate config", "err", err)
log.Ctx(i.ctx).Info("validate config", "err", err)
return err
}

cfg := &inspServerConfig{}
err = i.v.Unmarshal(cfg)
if err != nil {
slog.Ctx(i.ctx).Info("validate unmarshal config", "err", err)
log.Ctx(i.ctx).Info("validate unmarshal config", "err", err)
return err
}

Expand All @@ -147,7 +152,7 @@ func (i *inspServer) MergeConfig() error {

func (i *inspServer) start() error {
if err := gops.Listen(gops.Options{}); err != nil {
slog.Ctx(i.ctx).Info("start gops", "err", err)
log.Ctx(i.ctx).Info("start gops", "err", err)
}

go func() {
Expand All @@ -173,10 +178,10 @@ func (i *inspServer) start() error {
http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
}
listenaddr := fmt.Sprintf(":%d", i.config.Mconfig.Port)
slog.Ctx(i.ctx).Info("inspector start metric server", "listenaddr", listenaddr)
log.Ctx(i.ctx).Info("inspector start metric server", "listenaddr", listenaddr)
srv := &http.Server{Addr: listenaddr}
if err := srv.ListenAndServe(); err != nil {
slog.Ctx(i.ctx).Info("inspector start metric server", "err", err, "listenaddr", listenaddr)
log.Ctx(i.ctx).Info("inspector start metric server", "err", err, "listenaddr", listenaddr)
}
}()

Expand All @@ -186,13 +191,13 @@ func (i *inspServer) start() error {
proto.RegisterInspectorServer(s, e)
listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", i.config.Econfig.Port))
if err != nil {
slog.Ctx(i.ctx).Warn("inspector start event server", "port", i.config.Econfig.Port, "err", err)
log.Ctx(i.ctx).Warn("inspector start event server", "port", i.config.Econfig.Port, "err", err)
return
}
slog.Ctx(i.ctx).Info("inspector eserver serve", "port", i.config.Econfig.Port)
log.Ctx(i.ctx).Info("inspector eserver serve", "port", i.config.Econfig.Port)
// grpc server block there, handle it with goroutine
if err := s.Serve(listener); err != nil {
slog.Ctx(i.ctx).Warn("inspector eserver serve", "port", i.config.Econfig.Port, "err", err)
log.Ctx(i.ctx).Warn("inspector eserver serve", "port", i.config.Econfig.Port, "err", err)
return
}
}()
Expand All @@ -202,7 +207,7 @@ func (i *inspServer) start() error {
}

func WaitSignals(ctx context.Context, sgs ...os.Signal) {
slog.Ctx(ctx).Info("keep running and start waiting for signals")
log.Ctx(ctx).Info("keep running and start waiting for signals")
s := make(chan os.Signal, 1)
signal.Notify(s, sgs...)
<-s
Expand Down
Loading

0 comments on commit 8bdcbd2

Please sign in to comment.