Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(webui): enhance flow ui #239

Merged
merged 1 commit into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ build-controller:
build-btfhack:
CGO_ENABLED=0 go build -o bin/btfhack -ldflags $(ldflags) ./cmd/btfhack

.PHONY: build-btfhack
.PHONY: build-webconsole
build-webconsole:
cd webui && CGO_ENABLED=0 go build -o ../bin/webconsole -ldflags $(ldflags) .

Expand Down
43 changes: 33 additions & 10 deletions pkg/controller/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,9 @@ func (s *Server) PingMesh(ctx *gin.Context) {
}

func (s *Server) GetFlowGraph(ctx *gin.Context) {
var ts time.Time
t := ctx.Query("time")
var ts, fs time.Time
f := ctx.Query("from")
t := ctx.Query("to")
if t != "" {
ti, err := strconv.Atoi(t)
if err != nil {
Expand All @@ -245,33 +246,55 @@ func (s *Server) GetFlowGraph(ctx *gin.Context) {
} else {
ts = time.Now()
}
result, _, err := s.controller.QueryPrometheus(ctx, "kubeskoop_flow_bytes", ts)
if f != "" {
ti, err := strconv.Atoi(f)
if err != nil {
ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": fmt.Sprintf("cannot convert timestamp: %v", err)})
}
fs = time.Unix(int64(ti), 0)
} else {
fs = ts.Add(-15 * time.Minute)
}

r := int(ts.Sub(fs).Seconds())

result, _, err := s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_flow_bytes[%ds])", r), ts)
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
return
}
vector := result.(model.Vector)

podInfo, nodeInfo, err := s.controller.GetPodNodeInfoFromMetrics(ctx, ts)
g, err := graph.FromVector(vector)
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error get pod info from metrics: %v", err)})
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error convert flow metrics to graph: %v", err)})
return
}
g.SetEdgeBytesFromVector(vector)

g, err := graph.FromVector(vector, podInfo, nodeInfo)
result, _, err = s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_flow_packets[%ds])", r), ts)
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error convert flow metrics to graph: %v", err)})
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
return
}
g.SetEdgeBytesFromVector(vector)
vector = result.(model.Vector)
g.SetEdgePacketsFromVector(vector)

result, _, err = s.controller.QueryPrometheus(ctx, "kubeskoop_flow_packets", ts)
result, _, err = s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_packetloss_total[%ds])", r), ts)
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
return
}
vector = result.(model.Vector)
g.SetEdgePacketsFromVector(vector)
g.SetEdgeDroppedFromVector(vector)

result, _, err = s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_tcpretrans_total[%ds])", r), ts)
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
return
}
vector = result.(model.Vector)
g.SetEdgeRetransFromVector(vector)

jstr, err := g.ToJSON()
if err != nil {
Expand Down
117 changes: 52 additions & 65 deletions pkg/controller/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,79 +26,41 @@ type Edge struct {
Protocol string `json:"protocol"`
Bytes int `json:"bytes"`
Packets int `json:"packets"`
Dropped int `json:"dropped"`
Retrans int `json:"retrans"`
}

type FlowGraph struct {
Nodes map[string]*Node `json:"nodes"`
Edges map[string]*Edge `json:"edges"`
}

type podInfo struct {
Name string
Namespace string
NodeName string
IP string
}

type nodeInfo struct {
NodeName string
IP string
}

func NewFlowGraph() *FlowGraph {
return &FlowGraph{
Nodes: make(map[string]*Node),
Edges: make(map[string]*Edge),
}
}

func toPodInfo(m model.Vector) (map[string]podInfo, error) {
ret := make(map[string]podInfo)
for _, m := range m {
if _, ok := ret[string(m.Metric["ip"])]; ok {
continue
}
ret[string(m.Metric["ip"])] = podInfo{
Name: string(m.Metric["pod_name"]),
Namespace: string(m.Metric["pod_namespace"]),
NodeName: string(m.Metric["node_name"]),
IP: string(m.Metric["ip"]),
}
}
return ret, nil
}

func toNodeInfo(m model.Vector) (map[string]nodeInfo, error) {
ret := make(map[string]nodeInfo)
for _, m := range m {
if _, ok := ret[string(m.Metric["ip"])]; ok {
continue
}
ret[string(m.Metric["ip"])] = nodeInfo{
NodeName: string(m.Metric["node_name"]),
IP: string(m.Metric["ip"]),
}
}
return ret, nil
}

func createNode(ip string, podInfo map[string]podInfo, nodeInfo map[string]nodeInfo) Node {
func createNode(t, ip, podNamespace, podName, nodeName string) Node {
n := Node{
ID: ip,
IP: ip,
}

if i, ok := podInfo[ip]; ok {
switch t {
case "pod":
n.Type = "pod"
n.Name = i.Name
n.Namespace = i.Namespace
n.NodeName = i.NodeName
} else if i, ok := nodeInfo[ip]; ok {
n.Name = podName
n.Namespace = podNamespace
n.NodeName = nodeName
case "node":
n.Type = "node"
n.NodeName = i.NodeName
} else {
n.NodeName = nodeName
default:
n.Type = "external"
}

return n
}

Expand All @@ -112,7 +74,9 @@ func getEdgeID(v *model.Sample) string {
protocol, src, sport, dst, dport)
}

func createEdge(src, dst string, v *model.Sample) Edge {
func createEdge(v *model.Sample) Edge {
src := string(v.Metric["src"])
dst := string(v.Metric["dst"])
sport, _ := strconv.Atoi(string(v.Metric["sport"]))
dport, _ := strconv.Atoi(string(v.Metric["dport"]))
protocol := string(v.Metric["protocol"])
Expand All @@ -126,26 +90,31 @@ func createEdge(src, dst string, v *model.Sample) Edge {
}
}

func FromVector(m model.Vector, podInfo model.Vector, nodeInfo model.Vector) (*FlowGraph, error) {
func FromVector(m model.Vector) (*FlowGraph, error) {
g := NewFlowGraph()
pi, err := toPodInfo(podInfo)
if err != nil {
return nil, err
}
ni, err := toNodeInfo(nodeInfo)
if err != nil {
return nil, err
}
for _, v := range m {
src := string(v.Metric["src"])
dst := string(v.Metric["dst"])
g.AddNode(createNode(src, pi, ni))
g.AddNode(createNode(dst, pi, ni))
g.AddEdge(createEdge(src, dst, v))
g.AddNodesFromSample(v)
g.AddEdge(createEdge(v))
}
return g, nil
}

func (g *FlowGraph) AddNodesFromSample(v *model.Sample) {
ip := string(v.Metric["src"])
t := string(v.Metric["src_type"])
podName := string(v.Metric["src_pod"])
podNamespace := string(v.Metric["src_namespace"])
nodeName := string(v.Metric["src_node"])
g.AddNode(createNode(t, ip, podNamespace, podName, nodeName))

ip = string(v.Metric["dst"])
t = string(v.Metric["dst_type"])
podName = string(v.Metric["dst_pod"])
podNamespace = string(v.Metric["dst_namespace"])
nodeName = string(v.Metric["dst_node"])
g.AddNode(createNode(t, ip, podNamespace, podName, nodeName))
}

func (g *FlowGraph) AddNode(n Node) {
if _, ok := g.Nodes[n.ID]; !ok {
g.Nodes[n.ID] = &n
Expand Down Expand Up @@ -176,6 +145,24 @@ func (g *FlowGraph) SetEdgePacketsFromVector(m model.Vector) {
}
}

func (g *FlowGraph) SetEdgeDroppedFromVector(m model.Vector) {
for _, v := range m {
id := getEdgeID(v)
if _, ok := g.Edges[id]; ok {
g.Edges[id].Dropped = int(v.Value)
}
}
}

func (g *FlowGraph) SetEdgeRetransFromVector(m model.Vector) {
for _, v := range m {
id := getEdgeID(v)
if _, ok := g.Edges[id]; ok {
g.Edges[id].Retrans = int(v.Value)
}
}
}

func (g *FlowGraph) ToJSON() ([]byte, error) {
ret := struct {
Nodes []*Node `json:"nodes"`
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/service/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"time"

log "github.com/sirupsen/logrus"

promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
)
Expand All @@ -13,6 +15,7 @@ func (c *controller) QueryPrometheus(ctx context.Context, query string, ts time.
if c.promClient == nil {
return nil, nil, errors.New("prometheus client is not initialized")
}
log.Infof("Querying prometheus %s", query)
a := promv1.NewAPI(c.promClient)

return a.Query(ctx, query, ts)
Expand Down
1 change: 1 addition & 0 deletions webui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"bizcharts": "^3.5.10",
"build-plugin-ice-i18n": "^0.2.2",
"d3": "^7.8.5",
"d3-force": "^3.0.0",
"moment": "^2.28.0",
"react": "^18.2.0",
"react-d3-force-layout": "^1.0.1",
Expand Down
2 changes: 1 addition & 1 deletion webui/src/pages/config/types.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"name": "udp"
},
{
"name": "info"
"name": "tcpretrans"
},
{
"name": "qdisc"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.flowGraph {
height: '80vh';
}
Loading
Loading