From 233b41ba6af37f32f7109da2ba46e3c74c7add3d Mon Sep 17 00:00:00 2001 From: "xiayu.lyt" Date: Thu, 31 Aug 2023 21:05:20 +0800 Subject: [PATCH] feat(exporter): support config hot reload Signed-off-by: xiayu.lyt --- go.mod | 2 +- pkg/exporter/cmd/diag_event.go | 2 +- pkg/exporter/cmd/eventserver.go | 172 +++++++++++++----- pkg/exporter/cmd/metricserver.go | 74 +++++++- pkg/exporter/cmd/server.go | 57 ++++-- pkg/exporter/loki/client.go | 6 + pkg/exporter/nettop/cache.go | 2 +- pkg/exporter/probe/flow/flow.go | 4 +- pkg/exporter/probe/metric.go | 2 +- .../probe/nlconntrack/conntrackevents.go | 5 +- pkg/exporter/probe/nlqdisc/nlqdiscstats.go | 6 +- pkg/exporter/probe/procfd/procfd.go | 6 +- pkg/exporter/probe/procio/procio.go | 6 +- .../probe/procipvs/ipvsservicestats.go | 6 +- pkg/exporter/probe/procnetdev/procnetdev.go | 6 +- pkg/exporter/probe/procnetstat/procnetstat.go | 6 +- pkg/exporter/probe/procsnmp/procsnmp.go | 6 +- pkg/exporter/probe/procsock/procsock.go | 6 +- pkg/exporter/probe/procsoftnet/procsoftnet.go | 6 +- pkg/exporter/probe/proctcpsummary/proctcp.go | 6 +- .../probe/tracebiolatency/tracebiolatency.go | 8 +- pkg/exporter/probe/tracekernel/tracekernel.go | 44 +++-- .../tracenetiftxlatency.go | 46 +++-- .../probe/tracenetsoftirq/tracenetsoftirq.go | 42 +++-- .../probe/tracepacketloss/packetloss.go | 40 ++-- .../probe/tracesocketlatency/socketlatency.go | 42 +++-- .../probe/tracetcpreset/tracetcpreset.go | 44 +++-- .../probe/tracevirtcmdlat/tracevirtcmdlat.go | 43 +++-- pkg/exporter/proto/proto.go | 11 +- 29 files changed, 516 insertions(+), 190 deletions(-) diff --git a/go.mod b/go.mod index dfef27b0..897d279e 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/cilium/ebpf v0.9.3 github.com/containerd/containerd v1.6.15 github.com/docker/docker v20.10.22+incompatible + github.com/fsnotify/fsnotify v1.6.0 github.com/golang/snappy v0.0.4 github.com/google/gops v0.3.26 github.com/google/uuid v1.3.0 @@ -100,7 +101,6 @@ require ( github.com/emicklei/go-restful v2.11.2-0.20200112161605-a7c079c43d51+incompatible // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.1 // indirect diff --git a/pkg/exporter/cmd/diag_event.go b/pkg/exporter/cmd/diag_event.go index b45b1607..953cbff1 100644 --- a/pkg/exporter/cmd/diag_event.go +++ b/pkg/exporter/cmd/diag_event.go @@ -50,7 +50,7 @@ var ( continue } - go pb.Start(cmd.Context()) + go pb.Start(cmd.Context(), "") go func() { for evt := range ch { ets, err := nettop.GetEntityByNetns(int(evt.Netns)) diff --git a/pkg/exporter/cmd/eventserver.go b/pkg/exporter/cmd/eventserver.go index 22093c9f..7c79123d 100644 --- a/pkg/exporter/cmd/eventserver.go +++ b/pkg/exporter/cmd/eventserver.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "github.com/samber/lo" + "sync" "time" @@ -19,28 +21,26 @@ import ( type EServer struct { proto.UnimplementedInspectorServer - probes map[string]proto.EventProbe - cpool map[string]chan<- proto.RawEvent - mtx sync.Mutex - ctx context.Context - control chan struct{} - config EventConfig + probes map[string]proto.EventProbe + subscribers map[string]chan<- proto.RawEvent + mtx sync.Mutex + ctx context.Context + control chan struct{} + config EventConfig + eventChan chan proto.RawEvent + lokiDatach chan proto.RawEvent + lokiIngester *lokiwrapper.Ingester } func NewEServer(ctx context.Context, config EventConfig) *EServer { es := &EServer{ - probes: make(map[string]proto.EventProbe), - cpool: make(map[string]chan<- proto.RawEvent), - config: config, - mtx: sync.Mutex{}, - ctx: ctx, - control: make(chan struct{}), - } - - if len(config.Probes) == 0 { - // if no probes configured, keep loop channel empty - slog.Ctx(ctx).Info("new eserver with no probe required") - return es + probes: make(map[string]proto.EventProbe), + subscribers: make(map[string]chan<- proto.RawEvent), + config: config, + mtx: sync.Mutex{}, + ctx: ctx, + control: make(chan struct{}), + eventChan: make(chan proto.RawEvent), } for _, p := range config.Probes { @@ -50,28 +50,126 @@ func NewEServer(ctx context.Context, config EventConfig) *EServer { continue } es.probes[p] = ep - go ep.Start(ctx) + err := ep.Register(es.eventChan) + if err != nil { + slog.Ctx(ctx).Warn("probe register failed", "probe", p) + continue + } + go ep.Start(ctx, proto.ProbeTypeEvent) slog.Ctx(ctx).Debug("eserver start", "subject", p) } // start cache loop - slog.Ctx(ctx).Debug("new eserver start cache loop") + slog.Ctx(ctx).Debug("new eserver start dispatch loop") go es.dispatcher(ctx, es.control) + err := es.enableLoki() + if err != nil { + slog.Ctx(ctx).Warn("enable loki failed", "err", err) + } + return es +} + +func (e *EServer) enableLoki() error { + if e.lokiIngester != nil { + return nil + } + // handle grafana loki ingester preparation - if config.LokiEnable && config.LokiAddress != "" { + if e.config.LokiEnable && e.config.LokiAddress != "" { + slog.Ctx(e.ctx).Debug("enabling loki ingester", "address", e.config.LokiAddress) datach := make(chan proto.RawEvent) - ingester, err := lokiwrapper.NewLokiIngester(ctx, config.LokiAddress, nettop.GetNodeName()) + ingester, err := lokiwrapper.NewLokiIngester(e.ctx, e.config.LokiAddress, nettop.GetNodeName()) if err != nil { - slog.Ctx(ctx).Info("new loki ingester", "err", err, "client", ingester.Name()) + slog.Ctx(e.ctx).Info("new loki ingester", "err", err, "client", ingester.Name()) } else { - es.subscribe(ingester.Name(), datach) - go ingester.Watch(ctx, datach) + e.subscribe(ingester.Name(), datach) + go ingester.Watch(e.ctx, datach) } + e.lokiDatach = datach + e.lokiIngester = ingester + } + return nil +} + +func (e *EServer) disableLoki() error { + if e.lokiIngester == nil { + return nil } - return es + slog.Ctx(e.ctx).Debug("disabling loki ingester") + e.unsubscribe(e.lokiIngester.Name()) + + err := e.lokiIngester.Close() + if err != nil { + return err + } + + close(e.lokiDatach) + e.lokiIngester = nil + e.lokiDatach = nil + return nil +} + +func (e *EServer) Reload(config EventConfig) error { + enabled := lo.Keys(e.probes) + toClose, toStart := lo.Difference(enabled, config.Probes) + slog.Ctx(e.ctx).Info("reload event probes", "close", toClose, "enable", toStart) + + for _, n := range toClose { + p, ok := e.probes[n] + if !ok { + slog.Ctx(e.ctx).Warn("probe not found in enabled probes, skip.", "probe", n) + continue + } + + err := p.Close(proto.ProbeTypeEvent) + if err != nil { + slog.Ctx(e.ctx).Warn("close probe error", "probe", n, "err", err) + continue + } + + // clear event channel + err = p.Register(nil) + if err != nil { + slog.Ctx(e.ctx).Warn("unregister probe error", "probe", n, "err", err) + continue + } + + delete(e.probes, n) + } + + for _, n := range toStart { + p := probe.GetEventProbe(n) + if p == nil { + slog.Ctx(e.ctx).Info("get event probe nil", "probe", p) + continue + } + e.probes[n] = p + go p.Start(e.ctx, proto.ProbeTypeEvent) + slog.Ctx(e.ctx).Debug("eserver start", "subject", p) + + err := p.Register(e.eventChan) + if err != nil { + slog.Ctx(e.ctx).Info("register receiver", "probe", p, "err", err) + continue + } + } + + e.config = config + + if config.LokiEnable { + if err := e.enableLoki(); err != nil { + slog.Ctx(e.ctx).Warn("enable loki error", "err", err) + } + } else { + if err := e.disableLoki(); err != nil { + slog.Ctx(e.ctx).Warn("disable loki error", "err", err) + } + } + + return nil } func (e *EServer) WatchEvent(_ *proto.WatchRequest, srv proto.Inspector_WatchEventServer) error { @@ -109,35 +207,23 @@ func (e *EServer) subscribe(client string, ch chan<- proto.RawEvent) { e.mtx.Lock() defer e.mtx.Unlock() - e.cpool[client] = ch + e.subscribers[client] = ch } func (e *EServer) unsubscribe(client string) { e.mtx.Lock() defer e.mtx.Unlock() - delete(e.cpool, client) + delete(e.subscribers, client) } func (e *EServer) dispatcher(ctx context.Context, stopc chan struct{}) { - pbs := e.probes - receiver := make(chan proto.RawEvent) - for p, pb := range pbs { - err := pb.Register(receiver) - if err != nil { - slog.Ctx(ctx).Info("register receiver", "probe", p, "err", err) - continue - } - } - - slog.Ctx(ctx).Debug("dispatcher", "probes", pbs) for { select { - case <-stopc: - slog.Ctx(ctx).Debug("dispatcher exit of sop signal", "probes", pbs) + slog.Ctx(ctx).Debug("event dispatcher exited because of stop signal") return - case evt := <-receiver: + case evt := <-e.eventChan: err := e.broadcast(evt) if err != nil { slog.Ctx(ctx).Info("dispatcher broadcast", "err", err, "event", evt) @@ -149,7 +235,7 @@ func (e *EServer) dispatcher(ctx context.Context, stopc chan struct{}) { } func (e *EServer) broadcast(evt proto.RawEvent) error { - pbs := e.cpool + pbs := e.subscribers ctx, cancelf := context.WithTimeout(e.ctx, 5*time.Second) defer cancelf() diff --git a/pkg/exporter/cmd/metricserver.go b/pkg/exporter/cmd/metricserver.go index 45458360..04a6190b 100644 --- a/pkg/exporter/cmd/metricserver.go +++ b/pkg/exporter/cmd/metricserver.go @@ -3,6 +3,9 @@ package cmd import ( "context" "fmt" + "sync" + + "github.com/samber/lo" "strings" "time" @@ -34,16 +37,12 @@ var ( func NewMServer(ctx context.Context, config MetricConfig) *MServer { ms := &MServer{ ctx: ctx, + mtx: sync.Mutex{}, descs: make(map[string]*prometheus.Desc), config: config, probes: make(map[string]proto.MetricProbe), metricCache: cache.New(3*cacheUpdateInterval, 10*cacheUpdateInterval), - } - - if len(config.Probes) == 0 { - // if no probes configured, keep loop channel empty - slog.Ctx(ctx).Info("new mserver with no probe required") - return ms + loopctrl: make(chan struct{}), } for _, p := range config.Probes { @@ -53,7 +52,7 @@ func NewMServer(ctx context.Context, config MetricConfig) *MServer { continue } ms.probes[p] = mp - go mp.Start(ctx) + go mp.Start(ctx, proto.ProbeTypeMetrics) slog.Ctx(ctx).Debug("new mserver add subject", "subject", p) } @@ -77,7 +76,6 @@ func NewMServer(ctx context.Context, config MetricConfig) *MServer { } // start cache loop slog.Ctx(ctx).Debug("new mserver start cache loop") - ms.loopctrl = make(chan struct{}) go ms.collectLoop(ctx, cacheUpdateInterval, ms.loopctrl) return ms @@ -85,6 +83,7 @@ func NewMServer(ctx context.Context, config MetricConfig) *MServer { type MServer struct { ctx context.Context + mtx sync.Mutex descs map[string]*prometheus.Desc config MetricConfig metricCache *cache.Cache @@ -104,7 +103,61 @@ func (s *MServer) Close() { } } +func (s *MServer) Reload(config MetricConfig) error { + s.mtx.Lock() + defer s.mtx.Unlock() + + enabled := lo.Keys(s.probes) + toClose, toStart := lo.Difference(enabled, config.Probes) + slog.Ctx(s.ctx).Info("reload metric probes", "close", toClose, "enable", toStart) + + for _, n := range toClose { + p, ok := s.probes[n] + if !ok { + slog.Ctx(s.ctx).Warn("probe not found in enabled probes, skip.", "probe", n) + continue + } + + err := p.Close(proto.ProbeTypeMetrics) + if err != nil { + slog.Ctx(s.ctx).Warn("close probe error", "probe", n, "err", err) + continue + } + delete(s.probes, n) + } + + for _, n := range toStart { + p := probe.GetProbe(n) + if p == nil { + slog.Ctx(s.ctx).Info("get metric probe nil", "probe", n) + continue + } + s.probes[n] = p + go p.Start(s.ctx, proto.ProbeTypeMetrics) + slog.Ctx(s.ctx).Debug("new mserver add subject", "subject", n) + } + + for sub, mp := range s.probes { + mnames := mp.GetMetricNames() + for _, mname := range mnames { + if !strings.HasPrefix(mname, sub) { + continue + } + if s.config.Verbose { + s.descs[mname] = getDescOfMetricVerbose(sub, mname, s.additionalLabels) + } else { + s.descs[mname] = getDescOfMetric(sub, mname) + } + } + } + + s.config = config + return nil +} + func (s *MServer) Collect(ch chan<- prometheus.Metric) { + s.mtx.Lock() + defer s.mtx.Unlock() slog.Ctx(s.ctx).Debug("metric server collect request in", "metric count", len(s.descs)) for mname, desc := range s.descs { data, err := s.collectOnceCache(s.ctx, mname) @@ -204,6 +257,11 @@ func (s *MServer) collectLoop(ctx context.Context, interval time.Duration, stopc // collectWorkerSerial collect metric data in serial func (s *MServer) collectWorkerSerial(ctx context.Context) error { + s.mtx.Lock() + defer s.mtx.Unlock() + if len(s.probes) == 0 { + return nil + } slog.Ctx(s.ctx).Debug("collect worker serial start") workdone := make(chan struct{}) cstart := time.Now() diff --git a/pkg/exporter/cmd/server.go b/pkg/exporter/cmd/server.go index 0d252fa7..ff03e029 100644 --- a/pkg/exporter/cmd/server.go +++ b/pkg/exporter/cmd/server.go @@ -13,6 +13,8 @@ import ( "os/signal" "syscall" + "github.com/fsnotify/fsnotify" + "github.com/alibaba/kubeskoop/pkg/exporter/nettop" "github.com/alibaba/kubeskoop/pkg/exporter/probe" "github.com/alibaba/kubeskoop/pkg/exporter/proto" @@ -68,10 +70,14 @@ var ( defer nettop.StopCache() // config hot reload process - // insp.v.OnConfigChange(func(e fsnotify.Event) { - - // }) - // insp.v.WatchConfig() + insp.v.OnConfigChange(func(e fsnotify.Event) { + log.Ctx(insp.ctx).Info("Start reload config") + if err := insp.reload(); err != nil { + log.Ctx(insp.ctx).Warn("Reload config error", "err", err) + } + log.Ctx(insp.ctx).Info("Config reload succeed.") + }) + insp.v.WatchConfig() // block here err = insp.start() @@ -120,9 +126,11 @@ type EventConfig struct { } type inspServer struct { - v viper.Viper - config inspServerConfig - ctx context.Context + v viper.Viper + config inspServerConfig + ctx context.Context + mserver *MServer + eserver *EServer } func (i *inspServer) MergeConfig() error { @@ -148,22 +156,43 @@ func (i *inspServer) MergeConfig() error { return nil } +func (i *inspServer) reload() error { + cfg := inspServerConfig{} + err := i.v.Unmarshal(&cfg) + if err != nil { + return err + } + + err = i.mserver.Reload(cfg.Mconfig) + if err != nil { + return fmt.Errorf("reload metric server error: %s", err) + } + + err = i.eserver.Reload(cfg.Econfig) + if err != nil { + return fmt.Errorf("reload event server error: %s", err) + } + + i.config = cfg + return nil +} + func (i *inspServer) start() error { if err := gops.Listen(gops.Options{}); err != nil { log.Ctx(i.ctx).Info("start gops", "err", err) } go func() { - ms := NewMServer(i.ctx, i.config.Mconfig) - defer ms.Close() + i.mserver = NewMServer(i.ctx, i.config.Mconfig) + defer i.mserver.Close() r := prometheus.NewRegistry() - r.MustRegister(ms) + r.MustRegister(i.mserver) handler := promhttp.HandlerFor(prometheus.Gatherers{ r, }, promhttp.HandlerOpts{}) http.Handle("/metrics", handler) - http.Handle("/", http.HandlerFunc(defaulPage)) + http.Handle("/", http.HandlerFunc(defaultPage)) http.Handle("/config", http.HandlerFunc(i.configPage)) http.Handle("/status", http.HandlerFunc(status)) if i.config.DebugMode { @@ -185,8 +214,8 @@ func (i *inspServer) start() error { go func() { s := grpc.NewServer() - e := NewEServer(i.ctx, i.config.Econfig) - proto.RegisterInspectorServer(s, e) + i.eserver = NewEServer(i.ctx, i.config.Econfig) + proto.RegisterInspectorServer(s, i.eserver) listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", i.config.Econfig.Port)) if err != nil { log.Ctx(i.ctx).Warn("inspector start event server", "port", i.config.Econfig.Port, "err", err) @@ -211,7 +240,7 @@ func WaitSignals(ctx context.Context, sgs ...os.Signal) { <-s } -func defaulPage(w http.ResponseWriter, _ *http.Request) { +func defaultPage(w http.ResponseWriter, _ *http.Request) { // nolint w.Write([]byte(` Net Exporter diff --git a/pkg/exporter/loki/client.go b/pkg/exporter/loki/client.go index d5a2eb08..1bb6ece2 100644 --- a/pkg/exporter/loki/client.go +++ b/pkg/exporter/loki/client.go @@ -95,6 +95,12 @@ func (i *Ingester) Name() string { return i.name } +func (i *Ingester) Close() error { + i.quit <- struct{}{} + i.waitGroup.Wait() + return nil +} + type Ingester struct { name string PushURL string diff --git a/pkg/exporter/nettop/cache.go b/pkg/exporter/nettop/cache.go index 9889abf0..c7d3cfa0 100644 --- a/pkg/exporter/nettop/cache.go +++ b/pkg/exporter/nettop/cache.go @@ -158,7 +158,7 @@ func (e *Entity) GetPids() []int { } func StartCache(ctx context.Context) error { - slog.Ctx(ctx).Info("nettop cache loop statrt", "interval", cacheUpdateInterval) + slog.Ctx(ctx).Info("nettop cache loop start", "interval", cacheUpdateInterval) return cacheDaemonLoop(ctx, control) } diff --git a/pkg/exporter/probe/flow/flow.go b/pkg/exporter/probe/flow/flow.go index da3cf859..e8d7b226 100644 --- a/pkg/exporter/probe/flow/flow.go +++ b/pkg/exporter/probe/flow/flow.go @@ -39,7 +39,7 @@ func GetProbe() proto.MetricProbe { return probe } -func (f *Probe) Start(_ context.Context) { +func (f *Probe) Start(_ context.Context, _ proto.ProbeType) { log.Info("flow probe starting...") eth0, err := netlink.LinkByName(dev) @@ -197,7 +197,7 @@ func replaceQdisc(link netlink.Link) error { return netlink.QdiscReplace(qdisc) } -func (f *Probe) Close() error { +func (f *Probe) Close(_ proto.ProbeType) error { if f.enable { return bpfObjs.Close() } diff --git a/pkg/exporter/probe/metric.go b/pkg/exporter/probe/metric.go index 991fdfe7..1739c26b 100644 --- a/pkg/exporter/probe/metric.go +++ b/pkg/exporter/probe/metric.go @@ -43,7 +43,7 @@ func init() { availmprobes["socketlatency"] = tracesocketlatency.GetProbe() availmprobes["packetloss"] = tracepacketloss.GetProbe() availmprobes["net_softirq"] = tracenetsoftirq.GetProbe() - availmprobes["netiftxlat"] = tracenetif.GetProbe() + availmprobes["netiftxlatency"] = tracenetif.GetProbe() availmprobes["kernellatency"] = tracekernel.GetProbe() availmprobes["tcpsummary"] = proctcpsummary.GetProbe() availmprobes["virtcmdlatency"] = tracevirtcmdlat.GetProbe() diff --git a/pkg/exporter/probe/nlconntrack/conntrackevents.go b/pkg/exporter/probe/nlconntrack/conntrackevents.go index 5f85f874..6be421e0 100644 --- a/pkg/exporter/probe/nlconntrack/conntrackevents.go +++ b/pkg/exporter/probe/nlconntrack/conntrackevents.go @@ -58,7 +58,7 @@ func (p *Probe) Ready() bool { return true } -func (p *Probe) Close() error { +func (p *Probe) Close(_ proto.ProbeType) error { p.mtx.Lock() defer p.mtx.Unlock() @@ -71,8 +71,7 @@ func (p *Probe) GetEventNames() []string { return events } -func (p *Probe) Start(ctx context.Context) { - +func (p *Probe) Start(ctx context.Context, _ proto.ProbeType) { p.mtx.Lock() p.enable = true p.mtx.Unlock() diff --git a/pkg/exporter/probe/nlqdisc/nlqdiscstats.go b/pkg/exporter/probe/nlqdisc/nlqdiscstats.go index b6e1644e..f8337ccc 100644 --- a/pkg/exporter/probe/nlqdisc/nlqdiscstats.go +++ b/pkg/exporter/probe/nlqdisc/nlqdiscstats.go @@ -10,6 +10,8 @@ import ( "sync" "syscall" + "github.com/alibaba/kubeskoop/pkg/exporter/proto" + "github.com/alibaba/kubeskoop/pkg/exporter/nettop" "github.com/mdlayher/netlink" @@ -69,7 +71,7 @@ func (p *Probe) Name() string { return ModuleName } -func (p *Probe) Start(_ context.Context) { +func (p *Probe) Start(_ context.Context, _ proto.ProbeType) { } func (p *Probe) Ready() bool { @@ -96,7 +98,7 @@ func (p *Probe) GetMetricNames() []string { return res } -func (p *Probe) Close() error { +func (p *Probe) Close(_ proto.ProbeType) error { p.mtx.Lock() defer p.mtx.Unlock() diff --git a/pkg/exporter/probe/procfd/procfd.go b/pkg/exporter/probe/procfd/procfd.go index 3b1c4ec0..003444e1 100644 --- a/pkg/exporter/probe/procfd/procfd.go +++ b/pkg/exporter/probe/procfd/procfd.go @@ -6,6 +6,8 @@ import ( "os" "strings" + "github.com/alibaba/kubeskoop/pkg/exporter/proto" + "github.com/alibaba/kubeskoop/pkg/exporter/nettop" "golang.org/x/exp/slog" @@ -28,11 +30,11 @@ func GetProbe() *ProcFD { return probe } -func (s *ProcFD) Close() error { +func (s *ProcFD) Close(_ proto.ProbeType) error { return nil } -func (s *ProcFD) Start(_ context.Context) { +func (s *ProcFD) Start(_ context.Context, _ proto.ProbeType) { } func (s *ProcFD) Ready() bool { diff --git a/pkg/exporter/probe/procio/procio.go b/pkg/exporter/probe/procio/procio.go index 656292d2..6122e4ad 100644 --- a/pkg/exporter/probe/procio/procio.go +++ b/pkg/exporter/probe/procio/procio.go @@ -7,6 +7,8 @@ import ( "os" "strings" + "github.com/alibaba/kubeskoop/pkg/exporter/proto" + "github.com/alibaba/kubeskoop/pkg/exporter/nettop" "github.com/prometheus/procfs" @@ -35,11 +37,11 @@ func GetProbe() *ProcIO { return probe } -func (s *ProcIO) Close() error { +func (s *ProcIO) Close(_ proto.ProbeType) error { return nil } -func (s *ProcIO) Start(_ context.Context) { +func (s *ProcIO) Start(_ context.Context, _ proto.ProbeType) { } func (s *ProcIO) Ready() bool { diff --git a/pkg/exporter/probe/procipvs/ipvsservicestats.go b/pkg/exporter/probe/procipvs/ipvsservicestats.go index 641f7f8d..58267f27 100644 --- a/pkg/exporter/probe/procipvs/ipvsservicestats.go +++ b/pkg/exporter/probe/procipvs/ipvsservicestats.go @@ -9,6 +9,8 @@ import ( "os" "strconv" "strings" + + "github.com/alibaba/kubeskoop/pkg/exporter/proto" ) const maxBufferSize = 1024 * 1024 @@ -52,11 +54,11 @@ func (p *ProcIPVS) Name() string { return ModuleName } -func (p *ProcIPVS) Close() error { +func (p *ProcIPVS) Close(_ proto.ProbeType) error { return nil } -func (p *ProcIPVS) Start(_ context.Context) { +func (p *ProcIPVS) Start(_ context.Context, _ proto.ProbeType) { } func (p *ProcIPVS) Ready() bool { diff --git a/pkg/exporter/probe/procnetdev/procnetdev.go b/pkg/exporter/probe/procnetdev/procnetdev.go index 1efb82de..89df0cd2 100644 --- a/pkg/exporter/probe/procnetdev/procnetdev.go +++ b/pkg/exporter/probe/procnetdev/procnetdev.go @@ -7,6 +7,8 @@ import ( "strings" "sync" + "github.com/alibaba/kubeskoop/pkg/exporter/proto" + "github.com/alibaba/kubeskoop/pkg/exporter/nettop" "github.com/prometheus/procfs" @@ -43,11 +45,11 @@ func GetProbe() *ProcNetdev { return probe } -func (s *ProcNetdev) Close() error { +func (s *ProcNetdev) Close(_ proto.ProbeType) error { return nil } -func (s *ProcNetdev) Start(_ context.Context) { +func (s *ProcNetdev) Start(_ context.Context, _ proto.ProbeType) { } func (s *ProcNetdev) Ready() bool { diff --git a/pkg/exporter/probe/procnetstat/procnetstat.go b/pkg/exporter/probe/procnetstat/procnetstat.go index 8838ed1c..2d86faed 100644 --- a/pkg/exporter/probe/procnetstat/procnetstat.go +++ b/pkg/exporter/probe/procnetstat/procnetstat.go @@ -10,6 +10,8 @@ import ( "strconv" "strings" + "github.com/alibaba/kubeskoop/pkg/exporter/proto" + "github.com/alibaba/kubeskoop/pkg/exporter/nettop" "golang.org/x/exp/slog" @@ -103,11 +105,11 @@ func GetProbe() *ProcNetstat { return probe } -func (s *ProcNetstat) Close() error { +func (s *ProcNetstat) Close(_ proto.ProbeType) error { return nil } -func (s *ProcNetstat) Start(_ context.Context) { +func (s *ProcNetstat) Start(_ context.Context, _ proto.ProbeType) { } func (s *ProcNetstat) Ready() bool { diff --git a/pkg/exporter/probe/procsnmp/procsnmp.go b/pkg/exporter/probe/procsnmp/procsnmp.go index 9c2f7338..1cfb81db 100644 --- a/pkg/exporter/probe/procsnmp/procsnmp.go +++ b/pkg/exporter/probe/procsnmp/procsnmp.go @@ -10,6 +10,8 @@ import ( "strings" "sync" + "github.com/alibaba/kubeskoop/pkg/exporter/proto" + "github.com/alibaba/kubeskoop/pkg/exporter/nettop" "golang.org/x/exp/slog" @@ -80,11 +82,11 @@ func GetProbe() *ProcSNMP { type ProcSNMP struct { } -func (s *ProcSNMP) Close() error { +func (s *ProcSNMP) Close(_ proto.ProbeType) error { return nil } -func (s *ProcSNMP) Start(_ context.Context) { +func (s *ProcSNMP) Start(_ context.Context, _ proto.ProbeType) { } func (s *ProcSNMP) Ready() bool { diff --git a/pkg/exporter/probe/procsock/procsock.go b/pkg/exporter/probe/procsock/procsock.go index 594dc7ea..371a670d 100644 --- a/pkg/exporter/probe/procsock/procsock.go +++ b/pkg/exporter/probe/procsock/procsock.go @@ -7,6 +7,8 @@ import ( "errors" "fmt" + "github.com/alibaba/kubeskoop/pkg/exporter/proto" + "io" "os" "strconv" @@ -36,11 +38,11 @@ func GetProbe() *ProcSock { return probe } -func (s *ProcSock) Close() error { +func (s *ProcSock) Close(_ proto.ProbeType) error { return nil } -func (s *ProcSock) Start(_ context.Context) { +func (s *ProcSock) Start(_ context.Context, _ proto.ProbeType) { } func (s *ProcSock) Ready() bool { diff --git a/pkg/exporter/probe/procsoftnet/procsoftnet.go b/pkg/exporter/probe/procsoftnet/procsoftnet.go index e2bcd693..be959b51 100644 --- a/pkg/exporter/probe/procsoftnet/procsoftnet.go +++ b/pkg/exporter/probe/procsoftnet/procsoftnet.go @@ -5,6 +5,8 @@ import ( "context" "fmt" + "github.com/alibaba/kubeskoop/pkg/exporter/proto" + "io" "os" "strconv" @@ -39,11 +41,11 @@ func GetProbe() *ProcSoftnet { return probe } -func (s *ProcSoftnet) Close() error { +func (s *ProcSoftnet) Close(_ proto.ProbeType) error { return nil } -func (s *ProcSoftnet) Start(_ context.Context) { +func (s *ProcSoftnet) Start(_ context.Context, _ proto.ProbeType) { } func (s *ProcSoftnet) Ready() bool { diff --git a/pkg/exporter/probe/proctcpsummary/proctcp.go b/pkg/exporter/probe/proctcpsummary/proctcp.go index 1d46856c..c2e1f93a 100644 --- a/pkg/exporter/probe/proctcpsummary/proctcp.go +++ b/pkg/exporter/probe/proctcpsummary/proctcp.go @@ -6,6 +6,8 @@ import ( "encoding/hex" "fmt" + "github.com/alibaba/kubeskoop/pkg/exporter/proto" + "io" "net" "os" @@ -79,11 +81,11 @@ func GetProbe() *ProcTCP { return probe } -func (s *ProcTCP) Close() error { +func (s *ProcTCP) Close(_ proto.ProbeType) error { return nil } -func (s *ProcTCP) Start(_ context.Context) { +func (s *ProcTCP) Start(_ context.Context, _ proto.ProbeType) { } func (s *ProcTCP) Ready() bool { diff --git a/pkg/exporter/probe/tracebiolatency/tracebiolatency.go b/pkg/exporter/probe/tracebiolatency/tracebiolatency.go index 3968f11c..7c973a30 100644 --- a/pkg/exporter/probe/tracebiolatency/tracebiolatency.go +++ b/pkg/exporter/probe/tracebiolatency/tracebiolatency.go @@ -61,12 +61,14 @@ func (p *BiolatencyProbe) Ready() bool { return p.enable } -func (p *BiolatencyProbe) Close() error { +func (p *BiolatencyProbe) Close(_ proto.ProbeType) error { if p.enable { for _, link := range links { link.Close() } links = []link.Link{} + p.enable = false + p.once = sync.Once{} } if perfReader != nil { @@ -81,7 +83,7 @@ func (p *BiolatencyProbe) GetEventNames() []string { return events } -func (p *BiolatencyProbe) Start(ctx context.Context) { +func (p *BiolatencyProbe) Start(ctx context.Context, _ proto.ProbeType) { p.once.Do(func() { err := start() if err != nil { @@ -92,7 +94,7 @@ func (p *BiolatencyProbe) Start(ctx context.Context) { }) if !p.enable { - // if load failed, do nat start process + // if load failed, do not start process return } diff --git a/pkg/exporter/probe/tracekernel/tracekernel.go b/pkg/exporter/probe/tracekernel/tracekernel.go index d645a97d..cd24bb2e 100644 --- a/pkg/exporter/probe/tracekernel/tracekernel.go +++ b/pkg/exporter/probe/tracekernel/tracekernel.go @@ -46,7 +46,7 @@ const ( var ( ModuleName = "insp_kernellatency" // nolint - probe = &KernelLatencyProbe{once: sync.Once{}, mtx: sync.Mutex{}} + probe = &KernelLatencyProbe{once: sync.Once{}, mtx: sync.Mutex{}, enabledProbes: map[proto.ProbeType]bool{}} objs = bpfObjects{} links = []link.Link{} @@ -66,10 +66,11 @@ func init() { } type KernelLatencyProbe struct { - enable bool - sub chan<- proto.RawEvent - once sync.Once - mtx sync.Mutex + enable bool + sub chan<- proto.RawEvent + once sync.Once + mtx sync.Mutex + enabledProbes map[proto.ProbeType]bool } func (p *KernelLatencyProbe) Name() string { @@ -84,14 +85,28 @@ func (p *KernelLatencyProbe) GetEventNames() []string { return events } -func (p *KernelLatencyProbe) Close() error { - if p.enable { - for _, link := range links { - link.Close() - } - links = []link.Link{} +func (p *KernelLatencyProbe) Close(probeType proto.ProbeType) error { + if !p.enable { + return nil + } + + if _, ok := p.enabledProbes[probeType]; !ok { + return nil + } + if len(p.enabledProbes) > 1 { + delete(p.enabledProbes, probeType) + return nil } + for _, link := range links { + link.Close() + } + links = []link.Link{} + p.enable = false + p.once = sync.Once{} + metricsMap = map[string]map[uint32]uint64{} + + delete(p.enabledProbes, probeType) return nil } @@ -111,10 +126,12 @@ func (p *KernelLatencyProbe) Collect(_ context.Context) (map[string]map[uint32]u return metricsMap, nil } -func (p *KernelLatencyProbe) Start(ctx context.Context) { +func (p *KernelLatencyProbe) Start(ctx context.Context, probeType proto.ProbeType) { if p.enable { + p.enabledProbes[probeType] = true return } + p.once.Do(func() { err := loadSync() if err != nil { @@ -125,9 +142,10 @@ func (p *KernelLatencyProbe) Start(ctx context.Context) { }) if !p.enable { - // if load failed, do nat start process + // if load failed, do not start process return } + p.enabledProbes[probeType] = true go p.startRX(ctx) // go p.startTX(ctx) diff --git a/pkg/exporter/probe/tracenetiftxlatency/tracenetiftxlatency.go b/pkg/exporter/probe/tracenetiftxlatency/tracenetiftxlatency.go index f72168b7..b0c7c14e 100644 --- a/pkg/exporter/probe/tracenetiftxlatency/tracenetiftxlatency.go +++ b/pkg/exporter/probe/tracenetiftxlatency/tracenetiftxlatency.go @@ -34,7 +34,7 @@ const ( var ( ModuleName = "insp_netiftxlat" // nolint - probe = &NetifTxlatencyProbe{once: sync.Once{}} + probe = &NetifTxlatencyProbe{once: sync.Once{}, enabledProbes: map[proto.ProbeType]bool{}} links = []link.Link{} events = []string{"TXLAT_QDISC_100MS", "TXLAT_NETDEV_100MS"} metrics = []string{TXLAT_QDISC_SLOW, TXLAT_NETDEV_SLOW} @@ -54,18 +54,20 @@ func init() { } type NetifTxlatencyProbe struct { - enable bool - once sync.Once - sub chan<- proto.RawEvent - mtx sync.Mutex + enable bool + once sync.Once + sub chan<- proto.RawEvent + mtx sync.Mutex + enabledProbes map[proto.ProbeType]bool } func (p *NetifTxlatencyProbe) Name() string { return ModuleName } -func (p *NetifTxlatencyProbe) Start(ctx context.Context) { +func (p *NetifTxlatencyProbe) Start(ctx context.Context, probeType proto.ProbeType) { if p.enable { + p.enabledProbes[probeType] = true return } @@ -80,9 +82,10 @@ func (p *NetifTxlatencyProbe) Start(ctx context.Context) { }) if !p.enable { - // if load failed, do nat start process + // if load failed, do not start process return } + p.enabledProbes[probeType] = true slog.Debug("start probe", "module", ModuleName) if perfReader == nil { @@ -152,19 +155,32 @@ func (p *NetifTxlatencyProbe) Ready() bool { return p.enable } -func (p *NetifTxlatencyProbe) Close() error { - if p.enable { - for _, link := range links { - link.Close() - } - links = []link.Link{} - } - +func (p *NetifTxlatencyProbe) Close(probeType proto.ProbeType) error { if perfReader != nil { perfReader.Close() perfReader = nil } + if !p.enable { + return nil + } + + if _, ok := p.enabledProbes[probeType]; !ok { + return nil + } + if len(p.enabledProbes) > 1 { + delete(p.enabledProbes, probeType) + return nil + } + + for _, link := range links { + link.Close() + } + links = []link.Link{} + p.enable = false + p.once = sync.Once{} + metricsMap = map[string]map[uint32]uint64{} + return nil } diff --git a/pkg/exporter/probe/tracenetsoftirq/tracenetsoftirq.go b/pkg/exporter/probe/tracenetsoftirq/tracenetsoftirq.go index 44d53d10..ebf7525a 100644 --- a/pkg/exporter/probe/tracenetsoftirq/tracenetsoftirq.go +++ b/pkg/exporter/probe/tracenetsoftirq/tracenetsoftirq.go @@ -30,7 +30,7 @@ const ( var ( ModuleName = "insp_netsoftirq" // nolint - probe = &NetSoftirqProbe{once: sync.Once{}, mtx: sync.Mutex{}} + probe = &NetSoftirqProbe{once: sync.Once{}, mtx: sync.Mutex{}, enabledProbes: map[proto.ProbeType]bool{}} objs = bpfObjects{} links = []link.Link{} metricsMap = map[string]map[uint32]uint64{} @@ -52,10 +52,11 @@ func init() { } type NetSoftirqProbe struct { - enable bool - sub chan<- proto.RawEvent - once sync.Once - mtx sync.Mutex + enable bool + sub chan<- proto.RawEvent + once sync.Once + mtx sync.Mutex + enabledProbes map[proto.ProbeType]bool } func (p *NetSoftirqProbe) Name() string { @@ -75,23 +76,37 @@ func (p *NetSoftirqProbe) GetMetricNames() []string { } func (p *NetSoftirqProbe) Collect(_ context.Context) (map[string]map[uint32]uint64, error) { - return metricsMap, nil } -func (p *NetSoftirqProbe) Close() error { - if p.enable { - for _, link := range links { - link.Close() - } - links = []link.Link{} +func (p *NetSoftirqProbe) Close(probeType proto.ProbeType) error { + if !p.enable { + return nil + } + + if _, ok := p.enabledProbes[probeType]; !ok { + return nil + } + if len(p.enabledProbes) > 1 { + delete(p.enabledProbes, probeType) + return nil } + for _, link := range links { + link.Close() + } + links = []link.Link{} + p.enable = false + p.once = sync.Once{} + metricsMap = map[string]map[uint32]uint64{} + + delete(p.enabledProbes, probeType) return nil } -func (p *NetSoftirqProbe) Start(ctx context.Context) { +func (p *NetSoftirqProbe) Start(ctx context.Context, probeType proto.ProbeType) { if p.enable { + p.enabledProbes[probeType] = true return } @@ -108,6 +123,7 @@ func (p *NetSoftirqProbe) Start(ctx context.Context) { // if load failed, do not start process return } + p.enabledProbes[probeType] = true reader, err := perf.NewReader(objs.bpfMaps.InspSoftirqEvents, int(unsafe.Sizeof(bpfInspSoftirqEventT{}))) if err != nil { diff --git a/pkg/exporter/probe/tracepacketloss/packetloss.go b/pkg/exporter/probe/tracepacketloss/packetloss.go index fb2bf260..e237867a 100644 --- a/pkg/exporter/probe/tracepacketloss/packetloss.go +++ b/pkg/exporter/probe/tracepacketloss/packetloss.go @@ -48,7 +48,7 @@ var ( tcprcvSymbol = "tcp_v4_rcv" tcpdorcvSymbol = "tcp_v4_do_rcv" - probe = &PacketLossProbe{} + probe = &PacketLossProbe{enabledProbes: map[proto.ProbeType]bool{}} objs = bpfObjects{} links = []link.Link{} events = []string{PACKETLOSS} @@ -94,10 +94,11 @@ func GetProbe() *PacketLossProbe { } type PacketLossProbe struct { - enable bool - sub chan<- proto.RawEvent - once sync.Once - mtx sync.Mutex + enable bool + sub chan<- proto.RawEvent + once sync.Once + mtx sync.Mutex + enabledProbes map[proto.ProbeType]bool } func (p *PacketLossProbe) Name() string { @@ -120,14 +121,27 @@ func (p *PacketLossProbe) GetMetricNames() []string { return res } -func (p *PacketLossProbe) Close() error { - if p.enable { - for _, link := range links { - link.Close() - } - links = []link.Link{} +func (p *PacketLossProbe) Close(probeType proto.ProbeType) error { + if !p.enable { + return nil + } + + if _, ok := p.enabledProbes[probeType]; !ok { + return nil + } + if len(p.enabledProbes) > 1 { + delete(p.enabledProbes, probeType) + return nil + } + + for _, link := range links { + link.Close() } + links = []link.Link{} + p.enable = false + p.once = sync.Once{} + delete(p.enabledProbes, probeType) return nil } @@ -215,8 +229,9 @@ func (p *PacketLossProbe) Collect(ctx context.Context) (map[string]map[uint32]ui return resMap, nil } -func (p *PacketLossProbe) Start(ctx context.Context) { +func (p *PacketLossProbe) Start(ctx context.Context, probeType proto.ProbeType) { if p.enable { + p.enabledProbes[probeType] = true return } @@ -233,6 +248,7 @@ func (p *PacketLossProbe) Start(ctx context.Context) { // if load failed, do not start process return } + p.enabledProbes[probeType] = true reader, err := perf.NewReader(objs.bpfMaps.InspPlEvent, int(unsafe.Sizeof(bpfInspPlEventT{}))) if err != nil { diff --git a/pkg/exporter/probe/tracesocketlatency/socketlatency.go b/pkg/exporter/probe/tracesocketlatency/socketlatency.go index ad41f198..1a4b407a 100644 --- a/pkg/exporter/probe/tracesocketlatency/socketlatency.go +++ b/pkg/exporter/probe/tracesocketlatency/socketlatency.go @@ -55,7 +55,7 @@ const ( ) var ( - probe = &SocketLatencyProbe{once: sync.Once{}, mtx: sync.Mutex{}} + probe = &SocketLatencyProbe{once: sync.Once{}, mtx: sync.Mutex{}, enabledProbes: map[proto.ProbeType]bool{}} objs = bpfObjects{} links = []link.Link{} events = []string{SOCKETLAT_READSLOW, SOCKETLAT_SENDSLOW} @@ -68,10 +68,11 @@ func GetProbe() *SocketLatencyProbe { } type SocketLatencyProbe struct { - enable bool - sub chan<- proto.RawEvent - once sync.Once - mtx sync.Mutex + enable bool + sub chan<- proto.RawEvent + once sync.Once + mtx sync.Mutex + enabledProbes map[proto.ProbeType]bool } func (p *SocketLatencyProbe) Name() string { @@ -86,14 +87,27 @@ func (p *SocketLatencyProbe) GetEventNames() []string { return events } -func (p *SocketLatencyProbe) Close() error { - if p.enable { - for _, link := range links { - link.Close() - } - links = []link.Link{} +func (p *SocketLatencyProbe) Close(probeType proto.ProbeType) error { + if !p.enable { + return nil + } + + if _, ok := p.enabledProbes[probeType]; !ok { + return nil + } + if len(p.enabledProbes) > 1 { + delete(p.enabledProbes, probeType) + return nil + } + + for _, link := range links { + link.Close() } + links = []link.Link{} + p.enable = false + p.once = sync.Once{} + delete(p.enabledProbes, probeType) return nil } @@ -113,9 +127,10 @@ func (p *SocketLatencyProbe) Register(receiver chan<- proto.RawEvent) error { return nil } -func (p *SocketLatencyProbe) Start(ctx context.Context) { +func (p *SocketLatencyProbe) Start(ctx context.Context, probeType proto.ProbeType) { // metric and events both start probe if p.enable { + p.enabledProbes[probeType] = true return } p.once.Do(func() { @@ -128,9 +143,10 @@ func (p *SocketLatencyProbe) Start(ctx context.Context) { }) if !p.enable { - // if load failed, do nat start process + // if load failed, do not start process return } + p.enabledProbes[probeType] = true p.startEventPoll(ctx) } diff --git a/pkg/exporter/probe/tracetcpreset/tracetcpreset.go b/pkg/exporter/probe/tracetcpreset/tracetcpreset.go index 2b1dc1fc..a482c404 100644 --- a/pkg/exporter/probe/tracetcpreset/tracetcpreset.go +++ b/pkg/exporter/probe/tracetcpreset/tracetcpreset.go @@ -36,7 +36,7 @@ const ( var ( ModuleName = "insp_tcpreset" // nolint objs = bpfObjects{} - probe = &TCPResetProbe{once: sync.Once{}, mtx: sync.Mutex{}} + probe = &TCPResetProbe{once: sync.Once{}, mtx: sync.Mutex{}, enabledProbes: map[proto.ProbeType]bool{}} links = []link.Link{} events = []string{TCPRESET_NOSOCK, TCPRESET_ACTIVE, TCPRESET_PROCESS, TCPRESET_RECEIVE} @@ -47,10 +47,11 @@ func GetProbe() *TCPResetProbe { } type TCPResetProbe struct { - enable bool - sub chan<- proto.RawEvent - once sync.Once - mtx sync.Mutex + enable bool + sub chan<- proto.RawEvent + once sync.Once + mtx sync.Mutex + enabledProbes map[proto.ProbeType]bool } func (p *TCPResetProbe) Name() string { @@ -65,14 +66,27 @@ func (p *TCPResetProbe) GetEventNames() []string { return events } -func (p *TCPResetProbe) Close() error { - if p.enable { - for _, link := range links { - link.Close() - } - links = []link.Link{} +func (p *TCPResetProbe) Close(probeType proto.ProbeType) error { + if !p.enable { + return nil + } + + if _, ok := p.enabledProbes[probeType]; !ok { + return nil + } + if len(p.enabledProbes) > 1 { + delete(p.enabledProbes, probeType) + return nil } + for _, link := range links { + link.Close() + } + links = []link.Link{} + p.enable = false + p.once = sync.Once{} + + delete(p.enabledProbes, probeType) return nil } @@ -84,7 +98,12 @@ func (p *TCPResetProbe) Register(receiver chan<- proto.RawEvent) error { return nil } -func (p *TCPResetProbe) Start(ctx context.Context) { +func (p *TCPResetProbe) Start(ctx context.Context, probeType proto.ProbeType) { + if p.enable { + p.enabledProbes[probeType] = true + return + } + p.once.Do(func() { err := loadSync() if err != nil { @@ -98,6 +117,7 @@ func (p *TCPResetProbe) Start(ctx context.Context) { // if load failed, do not start process return } + p.enabledProbes[probeType] = true reader, err := perf.NewReader(objs.bpfMaps.InspTcpresetEvents, int(unsafe.Sizeof(bpfInspTcpresetEventT{}))) if err != nil { diff --git a/pkg/exporter/probe/tracevirtcmdlat/tracevirtcmdlat.go b/pkg/exporter/probe/tracevirtcmdlat/tracevirtcmdlat.go index 3c13d8b2..7b68d118 100644 --- a/pkg/exporter/probe/tracevirtcmdlat/tracevirtcmdlat.go +++ b/pkg/exporter/probe/tracevirtcmdlat/tracevirtcmdlat.go @@ -33,7 +33,7 @@ const ( ) var ( - probe = &VirtcmdLatencyProbe{once: sync.Once{}, mtx: sync.Mutex{}} + probe = &VirtcmdLatencyProbe{once: sync.Once{}, mtx: sync.Mutex{}, enabledProbes: map[proto.ProbeType]bool{}} objs = bpfObjects{} links = []link.Link{} events = []string{VIRTCMDEXCUTE} @@ -55,10 +55,11 @@ func init() { } type VirtcmdLatencyProbe struct { - enable bool - sub chan<- proto.RawEvent - once sync.Once - mtx sync.Mutex + enable bool + sub chan<- proto.RawEvent + once sync.Once + mtx sync.Mutex + enabledProbes map[proto.ProbeType]bool } func (p *VirtcmdLatencyProbe) Name() string { @@ -77,14 +78,28 @@ func (p *VirtcmdLatencyProbe) GetEventNames() []string { return events } -func (p *VirtcmdLatencyProbe) Close() error { - if p.enable { - for _, link := range links { - link.Close() - } - links = []link.Link{} +func (p *VirtcmdLatencyProbe) Close(probeType proto.ProbeType) error { + if !p.enable { + return nil + } + + if _, ok := p.enabledProbes[probeType]; !ok { + return nil } + if len(p.enabledProbes) > 1 { + delete(p.enabledProbes, probeType) + return nil + } + + for _, link := range links { + link.Close() + } + links = []link.Link{} + p.enable = false + p.once = sync.Once{} + metricsMap = map[string]map[uint32]uint64{} + delete(p.enabledProbes, probeType) return nil } @@ -100,9 +115,10 @@ func (p *VirtcmdLatencyProbe) Register(receiver chan<- proto.RawEvent) error { return nil } -func (p *VirtcmdLatencyProbe) Start(ctx context.Context) { +func (p *VirtcmdLatencyProbe) Start(ctx context.Context, probeType proto.ProbeType) { // metric and events both start probe if p.enable { + p.enabledProbes[probeType] = true return } p.once.Do(func() { @@ -115,9 +131,10 @@ func (p *VirtcmdLatencyProbe) Start(ctx context.Context) { }) if !p.enable { - // if load failed, do nat start process + // if load failed, do not start process return } + p.enabledProbes[probeType] = true reader, err := perf.NewReader(objs.bpfMaps.InspVirtcmdlatEvents, int(unsafe.Sizeof(bpfInspVirtcmdlatEventT{}))) if err != nil { diff --git a/pkg/exporter/proto/proto.go b/pkg/exporter/proto/proto.go index 9cec39a6..9aba8ca1 100644 --- a/pkg/exporter/proto/proto.go +++ b/pkg/exporter/proto/proto.go @@ -6,6 +6,13 @@ import ( //go:generate protoc --go_out=. ./inspector.proto +type ProbeType string + +var ( + ProbeTypeMetrics ProbeType = "metrics" + ProbeTypeEvent ProbeType = "event" +) + type RawEvent struct { Netns uint32 EventType string @@ -13,8 +20,8 @@ type RawEvent struct { } type Probe interface { - Start(ctx context.Context) - Close() error + Start(ctx context.Context, probeType ProbeType) + Close(probeType ProbeType) error Ready() bool Name() string }