diff --git a/go.mod b/go.mod
index dfef27b0..897d279e 100644
--- a/go.mod
+++ b/go.mod
@@ -14,6 +14,7 @@ require (
github.com/cilium/ebpf v0.9.3
github.com/containerd/containerd v1.6.15
github.com/docker/docker v20.10.22+incompatible
+ github.com/fsnotify/fsnotify v1.6.0
github.com/golang/snappy v0.0.4
github.com/google/gops v0.3.26
github.com/google/uuid v1.3.0
@@ -100,7 +101,6 @@ require (
github.com/emicklei/go-restful v2.11.2-0.20200112161605-a7c079c43d51+incompatible // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/fatih/color v1.13.0 // indirect
- github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.1 // indirect
diff --git a/pkg/exporter/cmd/diag_event.go b/pkg/exporter/cmd/diag_event.go
index b45b1607..953cbff1 100644
--- a/pkg/exporter/cmd/diag_event.go
+++ b/pkg/exporter/cmd/diag_event.go
@@ -50,7 +50,7 @@ var (
continue
}
- go pb.Start(cmd.Context())
+ go pb.Start(cmd.Context(), "")
go func() {
for evt := range ch {
ets, err := nettop.GetEntityByNetns(int(evt.Netns))
diff --git a/pkg/exporter/cmd/eventserver.go b/pkg/exporter/cmd/eventserver.go
index 22093c9f..7c79123d 100644
--- a/pkg/exporter/cmd/eventserver.go
+++ b/pkg/exporter/cmd/eventserver.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"
+ "github.com/samber/lo"
+
"sync"
"time"
@@ -19,28 +21,26 @@ import (
type EServer struct {
proto.UnimplementedInspectorServer
- probes map[string]proto.EventProbe
- cpool map[string]chan<- proto.RawEvent
- mtx sync.Mutex
- ctx context.Context
- control chan struct{}
- config EventConfig
+ probes map[string]proto.EventProbe
+ subscribers map[string]chan<- proto.RawEvent
+ mtx sync.Mutex
+ ctx context.Context
+ control chan struct{}
+ config EventConfig
+ eventChan chan proto.RawEvent
+ lokiDatach chan proto.RawEvent
+ lokiIngester *lokiwrapper.Ingester
}
func NewEServer(ctx context.Context, config EventConfig) *EServer {
es := &EServer{
- probes: make(map[string]proto.EventProbe),
- cpool: make(map[string]chan<- proto.RawEvent),
- config: config,
- mtx: sync.Mutex{},
- ctx: ctx,
- control: make(chan struct{}),
- }
-
- if len(config.Probes) == 0 {
- // if no probes configured, keep loop channel empty
- slog.Ctx(ctx).Info("new eserver with no probe required")
- return es
+ probes: make(map[string]proto.EventProbe),
+ subscribers: make(map[string]chan<- proto.RawEvent),
+ config: config,
+ mtx: sync.Mutex{},
+ ctx: ctx,
+ control: make(chan struct{}),
+ eventChan: make(chan proto.RawEvent),
}
for _, p := range config.Probes {
@@ -50,28 +50,126 @@ func NewEServer(ctx context.Context, config EventConfig) *EServer {
continue
}
es.probes[p] = ep
- go ep.Start(ctx)
+ err := ep.Register(es.eventChan)
+ if err != nil {
+ slog.Ctx(ctx).Warn("probe register failed", "probe", p)
+ continue
+ }
+ go ep.Start(ctx, proto.ProbeTypeEvent)
slog.Ctx(ctx).Debug("eserver start", "subject", p)
}
// start cache loop
- slog.Ctx(ctx).Debug("new eserver start cache loop")
+ slog.Ctx(ctx).Debug("new eserver start dispatch loop")
go es.dispatcher(ctx, es.control)
+ err := es.enableLoki()
+ if err != nil {
+ slog.Ctx(ctx).Warn("enable loki failed", "err", err)
+ }
+ return es
+}
+
+func (e *EServer) enableLoki() error {
+ if e.lokiIngester != nil {
+ return nil
+ }
+
// handle grafana loki ingester preparation
- if config.LokiEnable && config.LokiAddress != "" {
+ if e.config.LokiEnable && e.config.LokiAddress != "" {
+ slog.Ctx(e.ctx).Debug("enabling loki ingester", "address", e.config.LokiAddress)
datach := make(chan proto.RawEvent)
- ingester, err := lokiwrapper.NewLokiIngester(ctx, config.LokiAddress, nettop.GetNodeName())
+ ingester, err := lokiwrapper.NewLokiIngester(e.ctx, e.config.LokiAddress, nettop.GetNodeName())
if err != nil {
- slog.Ctx(ctx).Info("new loki ingester", "err", err, "client", ingester.Name())
+ slog.Ctx(e.ctx).Info("new loki ingester", "err", err, "client", ingester.Name())
} else {
- es.subscribe(ingester.Name(), datach)
- go ingester.Watch(ctx, datach)
+ e.subscribe(ingester.Name(), datach)
+ go ingester.Watch(e.ctx, datach)
}
+ e.lokiDatach = datach
+ e.lokiIngester = ingester
+ }
+ return nil
+}
+
+func (e *EServer) disableLoki() error {
+ if e.lokiIngester == nil {
+ return nil
}
- return es
+ slog.Ctx(e.ctx).Debug("disabling loki ingester")
+ e.unsubscribe(e.lokiIngester.Name())
+
+ err := e.lokiIngester.Close()
+ if err != nil {
+ return err
+ }
+
+ close(e.lokiDatach)
+ e.lokiIngester = nil
+ e.lokiDatach = nil
+ return nil
+}
+
+func (e *EServer) Reload(config EventConfig) error {
+ enabled := lo.Keys(e.probes)
+ toClose, toStart := lo.Difference(enabled, config.Probes)
+ slog.Ctx(e.ctx).Info("reload event probes", "close", toClose, "enable", toStart)
+
+ for _, n := range toClose {
+ p, ok := e.probes[n]
+ if !ok {
+ slog.Ctx(e.ctx).Warn("probe not found in enabled probes, skip.", "probe", n)
+ continue
+ }
+
+ err := p.Close(proto.ProbeTypeEvent)
+ if err != nil {
+ slog.Ctx(e.ctx).Warn("close probe error", "probe", n, "err", err)
+ continue
+ }
+
+ // clear event channel
+ err = p.Register(nil)
+ if err != nil {
+ slog.Ctx(e.ctx).Warn("unregister probe error", "probe", n, "err", err)
+ continue
+ }
+
+ delete(e.probes, n)
+ }
+
+ for _, n := range toStart {
+ p := probe.GetEventProbe(n)
+ if p == nil {
+ slog.Ctx(e.ctx).Info("get event probe nil", "probe", p)
+ continue
+ }
+ e.probes[n] = p
+ go p.Start(e.ctx, proto.ProbeTypeEvent)
+ slog.Ctx(e.ctx).Debug("eserver start", "subject", p)
+
+ err := p.Register(e.eventChan)
+ if err != nil {
+ slog.Ctx(e.ctx).Info("register receiver", "probe", p, "err", err)
+ continue
+ }
+ }
+
+ e.config = config
+
+ if config.LokiEnable {
+ if err := e.enableLoki(); err != nil {
+ slog.Ctx(e.ctx).Warn("enable loki error", "err", err)
+ }
+ } else {
+ if err := e.disableLoki(); err != nil {
+ slog.Ctx(e.ctx).Warn("disable loki error", "err", err)
+ }
+ }
+
+ return nil
}
func (e *EServer) WatchEvent(_ *proto.WatchRequest, srv proto.Inspector_WatchEventServer) error {
@@ -109,35 +207,23 @@ func (e *EServer) subscribe(client string, ch chan<- proto.RawEvent) {
e.mtx.Lock()
defer e.mtx.Unlock()
- e.cpool[client] = ch
+ e.subscribers[client] = ch
}
func (e *EServer) unsubscribe(client string) {
e.mtx.Lock()
defer e.mtx.Unlock()
- delete(e.cpool, client)
+ delete(e.subscribers, client)
}
func (e *EServer) dispatcher(ctx context.Context, stopc chan struct{}) {
- pbs := e.probes
- receiver := make(chan proto.RawEvent)
- for p, pb := range pbs {
- err := pb.Register(receiver)
- if err != nil {
- slog.Ctx(ctx).Info("register receiver", "probe", p, "err", err)
- continue
- }
- }
-
- slog.Ctx(ctx).Debug("dispatcher", "probes", pbs)
for {
select {
-
case <-stopc:
- slog.Ctx(ctx).Debug("dispatcher exit of sop signal", "probes", pbs)
+ slog.Ctx(ctx).Debug("event dispatcher exited because of stop signal")
return
- case evt := <-receiver:
+ case evt := <-e.eventChan:
err := e.broadcast(evt)
if err != nil {
slog.Ctx(ctx).Info("dispatcher broadcast", "err", err, "event", evt)
@@ -149,7 +235,7 @@ func (e *EServer) dispatcher(ctx context.Context, stopc chan struct{}) {
}
func (e *EServer) broadcast(evt proto.RawEvent) error {
- pbs := e.cpool
+ pbs := e.subscribers
ctx, cancelf := context.WithTimeout(e.ctx, 5*time.Second)
defer cancelf()
diff --git a/pkg/exporter/cmd/metricserver.go b/pkg/exporter/cmd/metricserver.go
index 45458360..04a6190b 100644
--- a/pkg/exporter/cmd/metricserver.go
+++ b/pkg/exporter/cmd/metricserver.go
@@ -3,6 +3,9 @@ package cmd
import (
"context"
"fmt"
+ "sync"
+
+ "github.com/samber/lo"
"strings"
"time"
@@ -34,16 +37,12 @@ var (
func NewMServer(ctx context.Context, config MetricConfig) *MServer {
ms := &MServer{
ctx: ctx,
+ mtx: sync.Mutex{},
descs: make(map[string]*prometheus.Desc),
config: config,
probes: make(map[string]proto.MetricProbe),
metricCache: cache.New(3*cacheUpdateInterval, 10*cacheUpdateInterval),
- }
-
- if len(config.Probes) == 0 {
- // if no probes configured, keep loop channel empty
- slog.Ctx(ctx).Info("new mserver with no probe required")
- return ms
+ loopctrl: make(chan struct{}),
}
for _, p := range config.Probes {
@@ -53,7 +52,7 @@ func NewMServer(ctx context.Context, config MetricConfig) *MServer {
continue
}
ms.probes[p] = mp
- go mp.Start(ctx)
+ go mp.Start(ctx, proto.ProbeTypeMetrics)
slog.Ctx(ctx).Debug("new mserver add subject", "subject", p)
}
@@ -77,7 +76,6 @@ func NewMServer(ctx context.Context, config MetricConfig) *MServer {
}
// start cache loop
slog.Ctx(ctx).Debug("new mserver start cache loop")
- ms.loopctrl = make(chan struct{})
go ms.collectLoop(ctx, cacheUpdateInterval, ms.loopctrl)
return ms
@@ -85,6 +83,7 @@ func NewMServer(ctx context.Context, config MetricConfig) *MServer {
type MServer struct {
ctx context.Context
+ mtx sync.Mutex
descs map[string]*prometheus.Desc
config MetricConfig
metricCache *cache.Cache
@@ -104,7 +103,61 @@ func (s *MServer) Close() {
}
}
+func (s *MServer) Reload(config MetricConfig) error {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+
+ enabled := lo.Keys(s.probes)
+ toClose, toStart := lo.Difference(enabled, config.Probes)
+ slog.Ctx(s.ctx).Info("reload metric probes", "close", toClose, "enable", toStart)
+
+ for _, n := range toClose {
+ p, ok := s.probes[n]
+ if !ok {
+ slog.Ctx(s.ctx).Warn("probe not found in enabled probes, skip.", "probe", n)
+ continue
+ }
+
+ err := p.Close(proto.ProbeTypeMetrics)
+ if err != nil {
+ slog.Ctx(s.ctx).Warn("close probe error", "probe", n, "err", err)
+ continue
+ }
+ delete(s.probes, n)
+ }
+
+ for _, n := range toStart {
+ p := probe.GetProbe(n)
+ if p == nil {
+ slog.Ctx(s.ctx).Info("get metric probe nil", "probe", n)
+ continue
+ }
+ s.probes[n] = p
+ go p.Start(s.ctx, proto.ProbeTypeMetrics)
+ slog.Ctx(s.ctx).Debug("new mserver add subject", "subject", n)
+ }
+
+ for sub, mp := range s.probes {
+ mnames := mp.GetMetricNames()
+ for _, mname := range mnames {
+ if !strings.HasPrefix(mname, sub) {
+ continue
+ }
+ if s.config.Verbose {
+ s.descs[mname] = getDescOfMetricVerbose(sub, mname, s.additionalLabels)
+ } else {
+ s.descs[mname] = getDescOfMetric(sub, mname)
+ }
+ }
+ }
+
+ s.config = config
+ return nil
+}
+
func (s *MServer) Collect(ch chan<- prometheus.Metric) {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
slog.Ctx(s.ctx).Debug("metric server collect request in", "metric count", len(s.descs))
for mname, desc := range s.descs {
data, err := s.collectOnceCache(s.ctx, mname)
@@ -204,6 +257,11 @@ func (s *MServer) collectLoop(ctx context.Context, interval time.Duration, stopc
// collectWorkerSerial collect metric data in serial
func (s *MServer) collectWorkerSerial(ctx context.Context) error {
+ s.mtx.Lock()
+ defer s.mtx.Unlock()
+ if len(s.probes) == 0 {
+ return nil
+ }
slog.Ctx(s.ctx).Debug("collect worker serial start")
workdone := make(chan struct{})
cstart := time.Now()
diff --git a/pkg/exporter/cmd/server.go b/pkg/exporter/cmd/server.go
index 0d252fa7..ff03e029 100644
--- a/pkg/exporter/cmd/server.go
+++ b/pkg/exporter/cmd/server.go
@@ -13,6 +13,8 @@ import (
"os/signal"
"syscall"
+ "github.com/fsnotify/fsnotify"
+
"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"github.com/alibaba/kubeskoop/pkg/exporter/probe"
"github.com/alibaba/kubeskoop/pkg/exporter/proto"
@@ -68,10 +70,14 @@ var (
defer nettop.StopCache()
// config hot reload process
- // insp.v.OnConfigChange(func(e fsnotify.Event) {
-
- // })
- // insp.v.WatchConfig()
+ insp.v.OnConfigChange(func(e fsnotify.Event) {
+ log.Ctx(insp.ctx).Info("Start reload config")
+ if err := insp.reload(); err != nil {
+ log.Ctx(insp.ctx).Warn("Reload config error", "err", err)
+ }
+ log.Ctx(insp.ctx).Info("Config reload succeed.")
+ })
+ insp.v.WatchConfig()
// block here
err = insp.start()
@@ -120,9 +126,11 @@ type EventConfig struct {
}
type inspServer struct {
- v viper.Viper
- config inspServerConfig
- ctx context.Context
+ v viper.Viper
+ config inspServerConfig
+ ctx context.Context
+ mserver *MServer
+ eserver *EServer
}
func (i *inspServer) MergeConfig() error {
@@ -148,22 +156,43 @@ func (i *inspServer) MergeConfig() error {
return nil
}
+func (i *inspServer) reload() error {
+ cfg := inspServerConfig{}
+ err := i.v.Unmarshal(&cfg)
+ if err != nil {
+ return err
+ }
+
+ err = i.mserver.Reload(cfg.Mconfig)
+ if err != nil {
+ return fmt.Errorf("reload metric server error: %s", err)
+ }
+
+ err = i.eserver.Reload(cfg.Econfig)
+ if err != nil {
+ return fmt.Errorf("reload event server error: %s", err)
+ }
+
+ i.config = cfg
+ return nil
+}
+
func (i *inspServer) start() error {
if err := gops.Listen(gops.Options{}); err != nil {
log.Ctx(i.ctx).Info("start gops", "err", err)
}
go func() {
- ms := NewMServer(i.ctx, i.config.Mconfig)
- defer ms.Close()
+ i.mserver = NewMServer(i.ctx, i.config.Mconfig)
+ defer i.mserver.Close()
r := prometheus.NewRegistry()
- r.MustRegister(ms)
+ r.MustRegister(i.mserver)
handler := promhttp.HandlerFor(prometheus.Gatherers{
r,
}, promhttp.HandlerOpts{})
http.Handle("/metrics", handler)
- http.Handle("/", http.HandlerFunc(defaulPage))
+ http.Handle("/", http.HandlerFunc(defaultPage))
http.Handle("/config", http.HandlerFunc(i.configPage))
http.Handle("/status", http.HandlerFunc(status))
if i.config.DebugMode {
@@ -185,8 +214,8 @@ func (i *inspServer) start() error {
go func() {
s := grpc.NewServer()
- e := NewEServer(i.ctx, i.config.Econfig)
- proto.RegisterInspectorServer(s, e)
+ i.eserver = NewEServer(i.ctx, i.config.Econfig)
+ proto.RegisterInspectorServer(s, i.eserver)
listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", i.config.Econfig.Port))
if err != nil {
log.Ctx(i.ctx).Warn("inspector start event server", "port", i.config.Econfig.Port, "err", err)
@@ -211,7 +240,7 @@ func WaitSignals(ctx context.Context, sgs ...os.Signal) {
<-s
}
-func defaulPage(w http.ResponseWriter, _ *http.Request) {
+func defaultPage(w http.ResponseWriter, _ *http.Request) {
// nolint
w.Write([]byte(`
Net Exporter
diff --git a/pkg/exporter/loki/client.go b/pkg/exporter/loki/client.go
index d5a2eb08..1bb6ece2 100644
--- a/pkg/exporter/loki/client.go
+++ b/pkg/exporter/loki/client.go
@@ -95,6 +95,12 @@ func (i *Ingester) Name() string {
return i.name
}
+func (i *Ingester) Close() error {
+ i.quit <- struct{}{}
+ i.waitGroup.Wait()
+ return nil
+}
+
type Ingester struct {
name string
PushURL string
diff --git a/pkg/exporter/nettop/cache.go b/pkg/exporter/nettop/cache.go
index 9889abf0..c7d3cfa0 100644
--- a/pkg/exporter/nettop/cache.go
+++ b/pkg/exporter/nettop/cache.go
@@ -158,7 +158,7 @@ func (e *Entity) GetPids() []int {
}
func StartCache(ctx context.Context) error {
- slog.Ctx(ctx).Info("nettop cache loop statrt", "interval", cacheUpdateInterval)
+ slog.Ctx(ctx).Info("nettop cache loop start", "interval", cacheUpdateInterval)
return cacheDaemonLoop(ctx, control)
}
diff --git a/pkg/exporter/probe/flow/flow.go b/pkg/exporter/probe/flow/flow.go
index da3cf859..e8d7b226 100644
--- a/pkg/exporter/probe/flow/flow.go
+++ b/pkg/exporter/probe/flow/flow.go
@@ -39,7 +39,7 @@ func GetProbe() proto.MetricProbe {
return probe
}
-func (f *Probe) Start(_ context.Context) {
+func (f *Probe) Start(_ context.Context, _ proto.ProbeType) {
log.Info("flow probe starting...")
eth0, err := netlink.LinkByName(dev)
@@ -197,7 +197,7 @@ func replaceQdisc(link netlink.Link) error {
return netlink.QdiscReplace(qdisc)
}
-func (f *Probe) Close() error {
+func (f *Probe) Close(_ proto.ProbeType) error {
if f.enable {
return bpfObjs.Close()
}
diff --git a/pkg/exporter/probe/metric.go b/pkg/exporter/probe/metric.go
index 991fdfe7..1739c26b 100644
--- a/pkg/exporter/probe/metric.go
+++ b/pkg/exporter/probe/metric.go
@@ -43,7 +43,7 @@ func init() {
availmprobes["socketlatency"] = tracesocketlatency.GetProbe()
availmprobes["packetloss"] = tracepacketloss.GetProbe()
availmprobes["net_softirq"] = tracenetsoftirq.GetProbe()
- availmprobes["netiftxlat"] = tracenetif.GetProbe()
+ availmprobes["netiftxlatency"] = tracenetif.GetProbe()
availmprobes["kernellatency"] = tracekernel.GetProbe()
availmprobes["tcpsummary"] = proctcpsummary.GetProbe()
availmprobes["virtcmdlatency"] = tracevirtcmdlat.GetProbe()
diff --git a/pkg/exporter/probe/nlconntrack/conntrackevents.go b/pkg/exporter/probe/nlconntrack/conntrackevents.go
index 5f85f874..6be421e0 100644
--- a/pkg/exporter/probe/nlconntrack/conntrackevents.go
+++ b/pkg/exporter/probe/nlconntrack/conntrackevents.go
@@ -58,7 +58,7 @@ func (p *Probe) Ready() bool {
return true
}
-func (p *Probe) Close() error {
+func (p *Probe) Close(_ proto.ProbeType) error {
p.mtx.Lock()
defer p.mtx.Unlock()
@@ -71,8 +71,7 @@ func (p *Probe) GetEventNames() []string {
return events
}
-func (p *Probe) Start(ctx context.Context) {
-
+func (p *Probe) Start(ctx context.Context, _ proto.ProbeType) {
p.mtx.Lock()
p.enable = true
p.mtx.Unlock()
diff --git a/pkg/exporter/probe/nlqdisc/nlqdiscstats.go b/pkg/exporter/probe/nlqdisc/nlqdiscstats.go
index b6e1644e..f8337ccc 100644
--- a/pkg/exporter/probe/nlqdisc/nlqdiscstats.go
+++ b/pkg/exporter/probe/nlqdisc/nlqdiscstats.go
@@ -10,6 +10,8 @@ import (
"sync"
"syscall"
+ "github.com/alibaba/kubeskoop/pkg/exporter/proto"
+
"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"github.com/mdlayher/netlink"
@@ -69,7 +71,7 @@ func (p *Probe) Name() string {
return ModuleName
}
-func (p *Probe) Start(_ context.Context) {
+func (p *Probe) Start(_ context.Context, _ proto.ProbeType) {
}
func (p *Probe) Ready() bool {
@@ -96,7 +98,7 @@ func (p *Probe) GetMetricNames() []string {
return res
}
-func (p *Probe) Close() error {
+func (p *Probe) Close(_ proto.ProbeType) error {
p.mtx.Lock()
defer p.mtx.Unlock()
diff --git a/pkg/exporter/probe/procfd/procfd.go b/pkg/exporter/probe/procfd/procfd.go
index 3b1c4ec0..003444e1 100644
--- a/pkg/exporter/probe/procfd/procfd.go
+++ b/pkg/exporter/probe/procfd/procfd.go
@@ -6,6 +6,8 @@ import (
"os"
"strings"
+ "github.com/alibaba/kubeskoop/pkg/exporter/proto"
+
"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"golang.org/x/exp/slog"
@@ -28,11 +30,11 @@ func GetProbe() *ProcFD {
return probe
}
-func (s *ProcFD) Close() error {
+func (s *ProcFD) Close(_ proto.ProbeType) error {
return nil
}
-func (s *ProcFD) Start(_ context.Context) {
+func (s *ProcFD) Start(_ context.Context, _ proto.ProbeType) {
}
func (s *ProcFD) Ready() bool {
diff --git a/pkg/exporter/probe/procio/procio.go b/pkg/exporter/probe/procio/procio.go
index 656292d2..6122e4ad 100644
--- a/pkg/exporter/probe/procio/procio.go
+++ b/pkg/exporter/probe/procio/procio.go
@@ -7,6 +7,8 @@ import (
"os"
"strings"
+ "github.com/alibaba/kubeskoop/pkg/exporter/proto"
+
"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"github.com/prometheus/procfs"
@@ -35,11 +37,11 @@ func GetProbe() *ProcIO {
return probe
}
-func (s *ProcIO) Close() error {
+func (s *ProcIO) Close(_ proto.ProbeType) error {
return nil
}
-func (s *ProcIO) Start(_ context.Context) {
+func (s *ProcIO) Start(_ context.Context, _ proto.ProbeType) {
}
func (s *ProcIO) Ready() bool {
diff --git a/pkg/exporter/probe/procipvs/ipvsservicestats.go b/pkg/exporter/probe/procipvs/ipvsservicestats.go
index 641f7f8d..58267f27 100644
--- a/pkg/exporter/probe/procipvs/ipvsservicestats.go
+++ b/pkg/exporter/probe/procipvs/ipvsservicestats.go
@@ -9,6 +9,8 @@ import (
"os"
"strconv"
"strings"
+
+ "github.com/alibaba/kubeskoop/pkg/exporter/proto"
)
const maxBufferSize = 1024 * 1024
@@ -52,11 +54,11 @@ func (p *ProcIPVS) Name() string {
return ModuleName
}
-func (p *ProcIPVS) Close() error {
+func (p *ProcIPVS) Close(_ proto.ProbeType) error {
return nil
}
-func (p *ProcIPVS) Start(_ context.Context) {
+func (p *ProcIPVS) Start(_ context.Context, _ proto.ProbeType) {
}
func (p *ProcIPVS) Ready() bool {
diff --git a/pkg/exporter/probe/procnetdev/procnetdev.go b/pkg/exporter/probe/procnetdev/procnetdev.go
index 1efb82de..89df0cd2 100644
--- a/pkg/exporter/probe/procnetdev/procnetdev.go
+++ b/pkg/exporter/probe/procnetdev/procnetdev.go
@@ -7,6 +7,8 @@ import (
"strings"
"sync"
+ "github.com/alibaba/kubeskoop/pkg/exporter/proto"
+
"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"github.com/prometheus/procfs"
@@ -43,11 +45,11 @@ func GetProbe() *ProcNetdev {
return probe
}
-func (s *ProcNetdev) Close() error {
+func (s *ProcNetdev) Close(_ proto.ProbeType) error {
return nil
}
-func (s *ProcNetdev) Start(_ context.Context) {
+func (s *ProcNetdev) Start(_ context.Context, _ proto.ProbeType) {
}
func (s *ProcNetdev) Ready() bool {
diff --git a/pkg/exporter/probe/procnetstat/procnetstat.go b/pkg/exporter/probe/procnetstat/procnetstat.go
index 8838ed1c..2d86faed 100644
--- a/pkg/exporter/probe/procnetstat/procnetstat.go
+++ b/pkg/exporter/probe/procnetstat/procnetstat.go
@@ -10,6 +10,8 @@ import (
"strconv"
"strings"
+ "github.com/alibaba/kubeskoop/pkg/exporter/proto"
+
"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"golang.org/x/exp/slog"
@@ -103,11 +105,11 @@ func GetProbe() *ProcNetstat {
return probe
}
-func (s *ProcNetstat) Close() error {
+func (s *ProcNetstat) Close(_ proto.ProbeType) error {
return nil
}
-func (s *ProcNetstat) Start(_ context.Context) {
+func (s *ProcNetstat) Start(_ context.Context, _ proto.ProbeType) {
}
func (s *ProcNetstat) Ready() bool {
diff --git a/pkg/exporter/probe/procsnmp/procsnmp.go b/pkg/exporter/probe/procsnmp/procsnmp.go
index 9c2f7338..1cfb81db 100644
--- a/pkg/exporter/probe/procsnmp/procsnmp.go
+++ b/pkg/exporter/probe/procsnmp/procsnmp.go
@@ -10,6 +10,8 @@ import (
"strings"
"sync"
+ "github.com/alibaba/kubeskoop/pkg/exporter/proto"
+
"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"golang.org/x/exp/slog"
@@ -80,11 +82,11 @@ func GetProbe() *ProcSNMP {
type ProcSNMP struct {
}
-func (s *ProcSNMP) Close() error {
+func (s *ProcSNMP) Close(_ proto.ProbeType) error {
return nil
}
-func (s *ProcSNMP) Start(_ context.Context) {
+func (s *ProcSNMP) Start(_ context.Context, _ proto.ProbeType) {
}
func (s *ProcSNMP) Ready() bool {
diff --git a/pkg/exporter/probe/procsock/procsock.go b/pkg/exporter/probe/procsock/procsock.go
index 594dc7ea..371a670d 100644
--- a/pkg/exporter/probe/procsock/procsock.go
+++ b/pkg/exporter/probe/procsock/procsock.go
@@ -7,6 +7,8 @@ import (
"errors"
"fmt"
+ "github.com/alibaba/kubeskoop/pkg/exporter/proto"
+
"io"
"os"
"strconv"
@@ -36,11 +38,11 @@ func GetProbe() *ProcSock {
return probe
}
-func (s *ProcSock) Close() error {
+func (s *ProcSock) Close(_ proto.ProbeType) error {
return nil
}
-func (s *ProcSock) Start(_ context.Context) {
+func (s *ProcSock) Start(_ context.Context, _ proto.ProbeType) {
}
func (s *ProcSock) Ready() bool {
diff --git a/pkg/exporter/probe/procsoftnet/procsoftnet.go b/pkg/exporter/probe/procsoftnet/procsoftnet.go
index e2bcd693..be959b51 100644
--- a/pkg/exporter/probe/procsoftnet/procsoftnet.go
+++ b/pkg/exporter/probe/procsoftnet/procsoftnet.go
@@ -5,6 +5,8 @@ import (
"context"
"fmt"
+ "github.com/alibaba/kubeskoop/pkg/exporter/proto"
+
"io"
"os"
"strconv"
@@ -39,11 +41,11 @@ func GetProbe() *ProcSoftnet {
return probe
}
-func (s *ProcSoftnet) Close() error {
+func (s *ProcSoftnet) Close(_ proto.ProbeType) error {
return nil
}
-func (s *ProcSoftnet) Start(_ context.Context) {
+func (s *ProcSoftnet) Start(_ context.Context, _ proto.ProbeType) {
}
func (s *ProcSoftnet) Ready() bool {
diff --git a/pkg/exporter/probe/proctcpsummary/proctcp.go b/pkg/exporter/probe/proctcpsummary/proctcp.go
index 1d46856c..c2e1f93a 100644
--- a/pkg/exporter/probe/proctcpsummary/proctcp.go
+++ b/pkg/exporter/probe/proctcpsummary/proctcp.go
@@ -6,6 +6,8 @@ import (
"encoding/hex"
"fmt"
+ "github.com/alibaba/kubeskoop/pkg/exporter/proto"
+
"io"
"net"
"os"
@@ -79,11 +81,11 @@ func GetProbe() *ProcTCP {
return probe
}
-func (s *ProcTCP) Close() error {
+func (s *ProcTCP) Close(_ proto.ProbeType) error {
return nil
}
-func (s *ProcTCP) Start(_ context.Context) {
+func (s *ProcTCP) Start(_ context.Context, _ proto.ProbeType) {
}
func (s *ProcTCP) Ready() bool {
diff --git a/pkg/exporter/probe/tracebiolatency/tracebiolatency.go b/pkg/exporter/probe/tracebiolatency/tracebiolatency.go
index 3968f11c..7c973a30 100644
--- a/pkg/exporter/probe/tracebiolatency/tracebiolatency.go
+++ b/pkg/exporter/probe/tracebiolatency/tracebiolatency.go
@@ -61,12 +61,14 @@ func (p *BiolatencyProbe) Ready() bool {
return p.enable
}
-func (p *BiolatencyProbe) Close() error {
+func (p *BiolatencyProbe) Close(_ proto.ProbeType) error {
if p.enable {
for _, link := range links {
link.Close()
}
links = []link.Link{}
+ p.enable = false
+ p.once = sync.Once{}
}
if perfReader != nil {
@@ -81,7 +83,7 @@ func (p *BiolatencyProbe) GetEventNames() []string {
return events
}
-func (p *BiolatencyProbe) Start(ctx context.Context) {
+func (p *BiolatencyProbe) Start(ctx context.Context, _ proto.ProbeType) {
p.once.Do(func() {
err := start()
if err != nil {
@@ -92,7 +94,7 @@ func (p *BiolatencyProbe) Start(ctx context.Context) {
})
if !p.enable {
- // if load failed, do nat start process
+ // if load failed, do not start process
return
}
diff --git a/pkg/exporter/probe/tracekernel/tracekernel.go b/pkg/exporter/probe/tracekernel/tracekernel.go
index d645a97d..cd24bb2e 100644
--- a/pkg/exporter/probe/tracekernel/tracekernel.go
+++ b/pkg/exporter/probe/tracekernel/tracekernel.go
@@ -46,7 +46,7 @@ const (
var (
ModuleName = "insp_kernellatency" // nolint
- probe = &KernelLatencyProbe{once: sync.Once{}, mtx: sync.Mutex{}}
+ probe = &KernelLatencyProbe{once: sync.Once{}, mtx: sync.Mutex{}, enabledProbes: map[proto.ProbeType]bool{}}
objs = bpfObjects{}
links = []link.Link{}
@@ -66,10 +66,11 @@ func init() {
}
type KernelLatencyProbe struct {
- enable bool
- sub chan<- proto.RawEvent
- once sync.Once
- mtx sync.Mutex
+ enable bool
+ sub chan<- proto.RawEvent
+ once sync.Once
+ mtx sync.Mutex
+ enabledProbes map[proto.ProbeType]bool
}
func (p *KernelLatencyProbe) Name() string {
@@ -84,14 +85,28 @@ func (p *KernelLatencyProbe) GetEventNames() []string {
return events
}
-func (p *KernelLatencyProbe) Close() error {
- if p.enable {
- for _, link := range links {
- link.Close()
- }
- links = []link.Link{}
+func (p *KernelLatencyProbe) Close(probeType proto.ProbeType) error {
+ if !p.enable {
+ return nil
+ }
+
+ if _, ok := p.enabledProbes[probeType]; !ok {
+ return nil
+ }
+ if len(p.enabledProbes) > 1 {
+ delete(p.enabledProbes, probeType)
+ return nil
}
+ for _, link := range links {
+ link.Close()
+ }
+ links = []link.Link{}
+ p.enable = false
+ p.once = sync.Once{}
+ metricsMap = map[string]map[uint32]uint64{}
+
+ delete(p.enabledProbes, probeType)
return nil
}
@@ -111,10 +126,12 @@ func (p *KernelLatencyProbe) Collect(_ context.Context) (map[string]map[uint32]u
return metricsMap, nil
}
-func (p *KernelLatencyProbe) Start(ctx context.Context) {
+func (p *KernelLatencyProbe) Start(ctx context.Context, probeType proto.ProbeType) {
if p.enable {
+ p.enabledProbes[probeType] = true
return
}
+
p.once.Do(func() {
err := loadSync()
if err != nil {
@@ -125,9 +142,10 @@ func (p *KernelLatencyProbe) Start(ctx context.Context) {
})
if !p.enable {
- // if load failed, do nat start process
+ // if load failed, do not start process
return
}
+ p.enabledProbes[probeType] = true
go p.startRX(ctx)
// go p.startTX(ctx)
diff --git a/pkg/exporter/probe/tracenetiftxlatency/tracenetiftxlatency.go b/pkg/exporter/probe/tracenetiftxlatency/tracenetiftxlatency.go
index f72168b7..b0c7c14e 100644
--- a/pkg/exporter/probe/tracenetiftxlatency/tracenetiftxlatency.go
+++ b/pkg/exporter/probe/tracenetiftxlatency/tracenetiftxlatency.go
@@ -34,7 +34,7 @@ const (
var (
ModuleName = "insp_netiftxlat" // nolint
- probe = &NetifTxlatencyProbe{once: sync.Once{}}
+ probe = &NetifTxlatencyProbe{once: sync.Once{}, enabledProbes: map[proto.ProbeType]bool{}}
links = []link.Link{}
events = []string{"TXLAT_QDISC_100MS", "TXLAT_NETDEV_100MS"}
metrics = []string{TXLAT_QDISC_SLOW, TXLAT_NETDEV_SLOW}
@@ -54,18 +54,20 @@ func init() {
}
type NetifTxlatencyProbe struct {
- enable bool
- once sync.Once
- sub chan<- proto.RawEvent
- mtx sync.Mutex
+ enable bool
+ once sync.Once
+ sub chan<- proto.RawEvent
+ mtx sync.Mutex
+ enabledProbes map[proto.ProbeType]bool
}
func (p *NetifTxlatencyProbe) Name() string {
return ModuleName
}
-func (p *NetifTxlatencyProbe) Start(ctx context.Context) {
+func (p *NetifTxlatencyProbe) Start(ctx context.Context, probeType proto.ProbeType) {
if p.enable {
+ p.enabledProbes[probeType] = true
return
}
@@ -80,9 +82,10 @@ func (p *NetifTxlatencyProbe) Start(ctx context.Context) {
})
if !p.enable {
- // if load failed, do nat start process
+ // if load failed, do not start process
return
}
+ p.enabledProbes[probeType] = true
slog.Debug("start probe", "module", ModuleName)
if perfReader == nil {
@@ -152,19 +155,32 @@ func (p *NetifTxlatencyProbe) Ready() bool {
return p.enable
}
-func (p *NetifTxlatencyProbe) Close() error {
- if p.enable {
- for _, link := range links {
- link.Close()
- }
- links = []link.Link{}
- }
-
+func (p *NetifTxlatencyProbe) Close(probeType proto.ProbeType) error {
if perfReader != nil {
perfReader.Close()
perfReader = nil
}
+ if !p.enable {
+ return nil
+ }
+
+ if _, ok := p.enabledProbes[probeType]; !ok {
+ return nil
+ }
+ if len(p.enabledProbes) > 1 {
+ delete(p.enabledProbes, probeType)
+ return nil
+ }
+
+ for _, link := range links {
+ link.Close()
+ }
+ links = []link.Link{}
+ p.enable = false
+ p.once = sync.Once{}
+ metricsMap = map[string]map[uint32]uint64{}
+
return nil
}
diff --git a/pkg/exporter/probe/tracenetsoftirq/tracenetsoftirq.go b/pkg/exporter/probe/tracenetsoftirq/tracenetsoftirq.go
index 44d53d10..ebf7525a 100644
--- a/pkg/exporter/probe/tracenetsoftirq/tracenetsoftirq.go
+++ b/pkg/exporter/probe/tracenetsoftirq/tracenetsoftirq.go
@@ -30,7 +30,7 @@ const (
var (
ModuleName = "insp_netsoftirq" // nolint
- probe = &NetSoftirqProbe{once: sync.Once{}, mtx: sync.Mutex{}}
+ probe = &NetSoftirqProbe{once: sync.Once{}, mtx: sync.Mutex{}, enabledProbes: map[proto.ProbeType]bool{}}
objs = bpfObjects{}
links = []link.Link{}
metricsMap = map[string]map[uint32]uint64{}
@@ -52,10 +52,11 @@ func init() {
}
type NetSoftirqProbe struct {
- enable bool
- sub chan<- proto.RawEvent
- once sync.Once
- mtx sync.Mutex
+ enable bool
+ sub chan<- proto.RawEvent
+ once sync.Once
+ mtx sync.Mutex
+ enabledProbes map[proto.ProbeType]bool
}
func (p *NetSoftirqProbe) Name() string {
@@ -75,23 +76,37 @@ func (p *NetSoftirqProbe) GetMetricNames() []string {
}
func (p *NetSoftirqProbe) Collect(_ context.Context) (map[string]map[uint32]uint64, error) {
-
return metricsMap, nil
}
-func (p *NetSoftirqProbe) Close() error {
- if p.enable {
- for _, link := range links {
- link.Close()
- }
- links = []link.Link{}
+func (p *NetSoftirqProbe) Close(probeType proto.ProbeType) error {
+ if !p.enable {
+ return nil
+ }
+
+ if _, ok := p.enabledProbes[probeType]; !ok {
+ return nil
+ }
+ if len(p.enabledProbes) > 1 {
+ delete(p.enabledProbes, probeType)
+ return nil
}
+ for _, link := range links {
+ link.Close()
+ }
+ links = []link.Link{}
+ p.enable = false
+ p.once = sync.Once{}
+ metricsMap = map[string]map[uint32]uint64{}
+
+ delete(p.enabledProbes, probeType)
return nil
}
-func (p *NetSoftirqProbe) Start(ctx context.Context) {
+func (p *NetSoftirqProbe) Start(ctx context.Context, probeType proto.ProbeType) {
if p.enable {
+ p.enabledProbes[probeType] = true
return
}
@@ -108,6 +123,7 @@ func (p *NetSoftirqProbe) Start(ctx context.Context) {
// if load failed, do not start process
return
}
+ p.enabledProbes[probeType] = true
reader, err := perf.NewReader(objs.bpfMaps.InspSoftirqEvents, int(unsafe.Sizeof(bpfInspSoftirqEventT{})))
if err != nil {
diff --git a/pkg/exporter/probe/tracepacketloss/packetloss.go b/pkg/exporter/probe/tracepacketloss/packetloss.go
index fb2bf260..e237867a 100644
--- a/pkg/exporter/probe/tracepacketloss/packetloss.go
+++ b/pkg/exporter/probe/tracepacketloss/packetloss.go
@@ -48,7 +48,7 @@ var (
tcprcvSymbol = "tcp_v4_rcv"
tcpdorcvSymbol = "tcp_v4_do_rcv"
- probe = &PacketLossProbe{}
+ probe = &PacketLossProbe{enabledProbes: map[proto.ProbeType]bool{}}
objs = bpfObjects{}
links = []link.Link{}
events = []string{PACKETLOSS}
@@ -94,10 +94,11 @@ func GetProbe() *PacketLossProbe {
}
type PacketLossProbe struct {
- enable bool
- sub chan<- proto.RawEvent
- once sync.Once
- mtx sync.Mutex
+ enable bool
+ sub chan<- proto.RawEvent
+ once sync.Once
+ mtx sync.Mutex
+ enabledProbes map[proto.ProbeType]bool
}
func (p *PacketLossProbe) Name() string {
@@ -120,14 +121,27 @@ func (p *PacketLossProbe) GetMetricNames() []string {
return res
}
-func (p *PacketLossProbe) Close() error {
- if p.enable {
- for _, link := range links {
- link.Close()
- }
- links = []link.Link{}
+func (p *PacketLossProbe) Close(probeType proto.ProbeType) error {
+ if !p.enable {
+ return nil
+ }
+
+ if _, ok := p.enabledProbes[probeType]; !ok {
+ return nil
+ }
+ if len(p.enabledProbes) > 1 {
+ delete(p.enabledProbes, probeType)
+ return nil
+ }
+
+ for _, link := range links {
+ link.Close()
}
+ links = []link.Link{}
+ p.enable = false
+ p.once = sync.Once{}
+ delete(p.enabledProbes, probeType)
return nil
}
@@ -215,8 +229,9 @@ func (p *PacketLossProbe) Collect(ctx context.Context) (map[string]map[uint32]ui
return resMap, nil
}
-func (p *PacketLossProbe) Start(ctx context.Context) {
+func (p *PacketLossProbe) Start(ctx context.Context, probeType proto.ProbeType) {
if p.enable {
+ p.enabledProbes[probeType] = true
return
}
@@ -233,6 +248,7 @@ func (p *PacketLossProbe) Start(ctx context.Context) {
// if load failed, do not start process
return
}
+ p.enabledProbes[probeType] = true
reader, err := perf.NewReader(objs.bpfMaps.InspPlEvent, int(unsafe.Sizeof(bpfInspPlEventT{})))
if err != nil {
diff --git a/pkg/exporter/probe/tracesocketlatency/socketlatency.go b/pkg/exporter/probe/tracesocketlatency/socketlatency.go
index ad41f198..1a4b407a 100644
--- a/pkg/exporter/probe/tracesocketlatency/socketlatency.go
+++ b/pkg/exporter/probe/tracesocketlatency/socketlatency.go
@@ -55,7 +55,7 @@ const (
)
var (
- probe = &SocketLatencyProbe{once: sync.Once{}, mtx: sync.Mutex{}}
+ probe = &SocketLatencyProbe{once: sync.Once{}, mtx: sync.Mutex{}, enabledProbes: map[proto.ProbeType]bool{}}
objs = bpfObjects{}
links = []link.Link{}
events = []string{SOCKETLAT_READSLOW, SOCKETLAT_SENDSLOW}
@@ -68,10 +68,11 @@ func GetProbe() *SocketLatencyProbe {
}
type SocketLatencyProbe struct {
- enable bool
- sub chan<- proto.RawEvent
- once sync.Once
- mtx sync.Mutex
+ enable bool
+ sub chan<- proto.RawEvent
+ once sync.Once
+ mtx sync.Mutex
+ enabledProbes map[proto.ProbeType]bool
}
func (p *SocketLatencyProbe) Name() string {
@@ -86,14 +87,27 @@ func (p *SocketLatencyProbe) GetEventNames() []string {
return events
}
-func (p *SocketLatencyProbe) Close() error {
- if p.enable {
- for _, link := range links {
- link.Close()
- }
- links = []link.Link{}
+func (p *SocketLatencyProbe) Close(probeType proto.ProbeType) error {
+ if !p.enable {
+ return nil
+ }
+
+ if _, ok := p.enabledProbes[probeType]; !ok {
+ return nil
+ }
+ if len(p.enabledProbes) > 1 {
+ delete(p.enabledProbes, probeType)
+ return nil
+ }
+
+ for _, link := range links {
+ link.Close()
}
+ links = []link.Link{}
+ p.enable = false
+ p.once = sync.Once{}
+ delete(p.enabledProbes, probeType)
return nil
}
@@ -113,9 +127,10 @@ func (p *SocketLatencyProbe) Register(receiver chan<- proto.RawEvent) error {
return nil
}
-func (p *SocketLatencyProbe) Start(ctx context.Context) {
+func (p *SocketLatencyProbe) Start(ctx context.Context, probeType proto.ProbeType) {
// metric and events both start probe
if p.enable {
+ p.enabledProbes[probeType] = true
return
}
p.once.Do(func() {
@@ -128,9 +143,10 @@ func (p *SocketLatencyProbe) Start(ctx context.Context) {
})
if !p.enable {
- // if load failed, do nat start process
+ // if load failed, do not start process
return
}
+ p.enabledProbes[probeType] = true
p.startEventPoll(ctx)
}
diff --git a/pkg/exporter/probe/tracetcpreset/tracetcpreset.go b/pkg/exporter/probe/tracetcpreset/tracetcpreset.go
index 2b1dc1fc..a482c404 100644
--- a/pkg/exporter/probe/tracetcpreset/tracetcpreset.go
+++ b/pkg/exporter/probe/tracetcpreset/tracetcpreset.go
@@ -36,7 +36,7 @@ const (
var (
ModuleName = "insp_tcpreset" // nolint
objs = bpfObjects{}
- probe = &TCPResetProbe{once: sync.Once{}, mtx: sync.Mutex{}}
+ probe = &TCPResetProbe{once: sync.Once{}, mtx: sync.Mutex{}, enabledProbes: map[proto.ProbeType]bool{}}
links = []link.Link{}
events = []string{TCPRESET_NOSOCK, TCPRESET_ACTIVE, TCPRESET_PROCESS, TCPRESET_RECEIVE}
@@ -47,10 +47,11 @@ func GetProbe() *TCPResetProbe {
}
type TCPResetProbe struct {
- enable bool
- sub chan<- proto.RawEvent
- once sync.Once
- mtx sync.Mutex
+ enable bool
+ sub chan<- proto.RawEvent
+ once sync.Once
+ mtx sync.Mutex
+ enabledProbes map[proto.ProbeType]bool
}
func (p *TCPResetProbe) Name() string {
@@ -65,14 +66,27 @@ func (p *TCPResetProbe) GetEventNames() []string {
return events
}
-func (p *TCPResetProbe) Close() error {
- if p.enable {
- for _, link := range links {
- link.Close()
- }
- links = []link.Link{}
+func (p *TCPResetProbe) Close(probeType proto.ProbeType) error {
+ if !p.enable {
+ return nil
+ }
+
+ if _, ok := p.enabledProbes[probeType]; !ok {
+ return nil
+ }
+ if len(p.enabledProbes) > 1 {
+ delete(p.enabledProbes, probeType)
+ return nil
}
+ for _, link := range links {
+ link.Close()
+ }
+ links = []link.Link{}
+ p.enable = false
+ p.once = sync.Once{}
+
+ delete(p.enabledProbes, probeType)
return nil
}
@@ -84,7 +98,12 @@ func (p *TCPResetProbe) Register(receiver chan<- proto.RawEvent) error {
return nil
}
-func (p *TCPResetProbe) Start(ctx context.Context) {
+func (p *TCPResetProbe) Start(ctx context.Context, probeType proto.ProbeType) {
+ if p.enable {
+ p.enabledProbes[probeType] = true
+ return
+ }
+
p.once.Do(func() {
err := loadSync()
if err != nil {
@@ -98,6 +117,7 @@ func (p *TCPResetProbe) Start(ctx context.Context) {
// if load failed, do not start process
return
}
+ p.enabledProbes[probeType] = true
reader, err := perf.NewReader(objs.bpfMaps.InspTcpresetEvents, int(unsafe.Sizeof(bpfInspTcpresetEventT{})))
if err != nil {
diff --git a/pkg/exporter/probe/tracevirtcmdlat/tracevirtcmdlat.go b/pkg/exporter/probe/tracevirtcmdlat/tracevirtcmdlat.go
index 3c13d8b2..7b68d118 100644
--- a/pkg/exporter/probe/tracevirtcmdlat/tracevirtcmdlat.go
+++ b/pkg/exporter/probe/tracevirtcmdlat/tracevirtcmdlat.go
@@ -33,7 +33,7 @@ const (
)
var (
- probe = &VirtcmdLatencyProbe{once: sync.Once{}, mtx: sync.Mutex{}}
+ probe = &VirtcmdLatencyProbe{once: sync.Once{}, mtx: sync.Mutex{}, enabledProbes: map[proto.ProbeType]bool{}}
objs = bpfObjects{}
links = []link.Link{}
events = []string{VIRTCMDEXCUTE}
@@ -55,10 +55,11 @@ func init() {
}
type VirtcmdLatencyProbe struct {
- enable bool
- sub chan<- proto.RawEvent
- once sync.Once
- mtx sync.Mutex
+ enable bool
+ sub chan<- proto.RawEvent
+ once sync.Once
+ mtx sync.Mutex
+ enabledProbes map[proto.ProbeType]bool
}
func (p *VirtcmdLatencyProbe) Name() string {
@@ -77,14 +78,28 @@ func (p *VirtcmdLatencyProbe) GetEventNames() []string {
return events
}
-func (p *VirtcmdLatencyProbe) Close() error {
- if p.enable {
- for _, link := range links {
- link.Close()
- }
- links = []link.Link{}
+func (p *VirtcmdLatencyProbe) Close(probeType proto.ProbeType) error {
+ if !p.enable {
+ return nil
+ }
+
+ if _, ok := p.enabledProbes[probeType]; !ok {
+ return nil
}
+ if len(p.enabledProbes) > 1 {
+ delete(p.enabledProbes, probeType)
+ return nil
+ }
+
+ for _, link := range links {
+ link.Close()
+ }
+ links = []link.Link{}
+ p.enable = false
+ p.once = sync.Once{}
+ metricsMap = map[string]map[uint32]uint64{}
+ delete(p.enabledProbes, probeType)
return nil
}
@@ -100,9 +115,10 @@ func (p *VirtcmdLatencyProbe) Register(receiver chan<- proto.RawEvent) error {
return nil
}
-func (p *VirtcmdLatencyProbe) Start(ctx context.Context) {
+func (p *VirtcmdLatencyProbe) Start(ctx context.Context, probeType proto.ProbeType) {
// metric and events both start probe
if p.enable {
+ p.enabledProbes[probeType] = true
return
}
p.once.Do(func() {
@@ -115,9 +131,10 @@ func (p *VirtcmdLatencyProbe) Start(ctx context.Context) {
})
if !p.enable {
- // if load failed, do nat start process
+ // if load failed, do not start process
return
}
+ p.enabledProbes[probeType] = true
reader, err := perf.NewReader(objs.bpfMaps.InspVirtcmdlatEvents, int(unsafe.Sizeof(bpfInspVirtcmdlatEventT{})))
if err != nil {
diff --git a/pkg/exporter/proto/proto.go b/pkg/exporter/proto/proto.go
index 9cec39a6..9aba8ca1 100644
--- a/pkg/exporter/proto/proto.go
+++ b/pkg/exporter/proto/proto.go
@@ -6,6 +6,13 @@ import (
//go:generate protoc --go_out=. ./inspector.proto
+type ProbeType string
+
+var (
+ ProbeTypeMetrics ProbeType = "metrics"
+ ProbeTypeEvent ProbeType = "event"
+)
+
type RawEvent struct {
Netns uint32
EventType string
@@ -13,8 +20,8 @@ type RawEvent struct {
}
type Probe interface {
- Start(ctx context.Context)
- Close() error
+ Start(ctx context.Context, probeType ProbeType)
+ Close(probeType ProbeType) error
Ready() bool
Name() string
}