Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(exporter): support config hot reload #101

Merged
merged 1 commit into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/cilium/ebpf v0.9.3
github.com/containerd/containerd v1.6.15
github.com/docker/docker v20.10.22+incompatible
github.com/fsnotify/fsnotify v1.6.0
github.com/golang/snappy v0.0.4
github.com/google/gops v0.3.26
github.com/google/uuid v1.3.0
Expand Down Expand Up @@ -100,7 +101,6 @@ require (
github.com/emicklei/go-restful v2.11.2-0.20200112161605-a7c079c43d51+incompatible // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.1 // indirect
Expand Down
2 changes: 1 addition & 1 deletion pkg/exporter/cmd/diag_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var (
continue
}

go pb.Start(cmd.Context())
go pb.Start(cmd.Context(), "")
go func() {
for evt := range ch {
ets, err := nettop.GetEntityByNetns(int(evt.Netns))
Expand Down
172 changes: 129 additions & 43 deletions pkg/exporter/cmd/eventserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/samber/lo"

"sync"
"time"

Expand All @@ -19,28 +21,26 @@ import (

type EServer struct {
proto.UnimplementedInspectorServer
probes map[string]proto.EventProbe
cpool map[string]chan<- proto.RawEvent
mtx sync.Mutex
ctx context.Context
control chan struct{}
config EventConfig
probes map[string]proto.EventProbe
subscribers map[string]chan<- proto.RawEvent
mtx sync.Mutex
ctx context.Context
control chan struct{}
config EventConfig
eventChan chan proto.RawEvent
lokiDatach chan proto.RawEvent
lokiIngester *lokiwrapper.Ingester
}

func NewEServer(ctx context.Context, config EventConfig) *EServer {
es := &EServer{
probes: make(map[string]proto.EventProbe),
cpool: make(map[string]chan<- proto.RawEvent),
config: config,
mtx: sync.Mutex{},
ctx: ctx,
control: make(chan struct{}),
}

if len(config.Probes) == 0 {
// if no probes configured, keep loop channel empty
slog.Ctx(ctx).Info("new eserver with no probe required")
return es
probes: make(map[string]proto.EventProbe),
subscribers: make(map[string]chan<- proto.RawEvent),
config: config,
mtx: sync.Mutex{},
ctx: ctx,
control: make(chan struct{}),
eventChan: make(chan proto.RawEvent),
}

for _, p := range config.Probes {
Expand All @@ -50,28 +50,126 @@ func NewEServer(ctx context.Context, config EventConfig) *EServer {
continue
}
es.probes[p] = ep
go ep.Start(ctx)
err := ep.Register(es.eventChan)
if err != nil {
slog.Ctx(ctx).Warn("probe register failed", "probe", p)
continue
}
go ep.Start(ctx, proto.ProbeTypeEvent)
slog.Ctx(ctx).Debug("eserver start", "subject", p)
}

// start cache loop
slog.Ctx(ctx).Debug("new eserver start cache loop")
slog.Ctx(ctx).Debug("new eserver start dispatch loop")
go es.dispatcher(ctx, es.control)

err := es.enableLoki()
if err != nil {
slog.Ctx(ctx).Warn("enable loki failed", "err", err)
}
return es
}

func (e *EServer) enableLoki() error {
if e.lokiIngester != nil {
return nil
}

// handle grafana loki ingester preparation
if config.LokiEnable && config.LokiAddress != "" {
if e.config.LokiEnable && e.config.LokiAddress != "" {
slog.Ctx(e.ctx).Debug("enabling loki ingester", "address", e.config.LokiAddress)
datach := make(chan proto.RawEvent)
ingester, err := lokiwrapper.NewLokiIngester(ctx, config.LokiAddress, nettop.GetNodeName())
ingester, err := lokiwrapper.NewLokiIngester(e.ctx, e.config.LokiAddress, nettop.GetNodeName())
if err != nil {
slog.Ctx(ctx).Info("new loki ingester", "err", err, "client", ingester.Name())
slog.Ctx(e.ctx).Info("new loki ingester", "err", err, "client", ingester.Name())
} else {
es.subscribe(ingester.Name(), datach)
go ingester.Watch(ctx, datach)
e.subscribe(ingester.Name(), datach)
go ingester.Watch(e.ctx, datach)
}
e.lokiDatach = datach
e.lokiIngester = ingester
}

return nil
}

func (e *EServer) disableLoki() error {
if e.lokiIngester == nil {
return nil
}

return es
slog.Ctx(e.ctx).Debug("disabling loki ingester")
e.unsubscribe(e.lokiIngester.Name())

err := e.lokiIngester.Close()
if err != nil {
return err
}

close(e.lokiDatach)
e.lokiIngester = nil
e.lokiDatach = nil
return nil
}

// Reload applies a new event configuration to a running server.
//
// Probes that are currently enabled but absent from config.Probes are closed
// and have their event channel cleared; probes listed in config.Probes that
// are not yet enabled are registered against the shared event channel and
// then started. Afterwards the stored config is replaced and the Loki
// ingester is enabled or disabled according to config.LokiEnable.
//
// Per-probe failures are logged and skipped, so the method always returns
// nil.
func (e *EServer) Reload(config EventConfig) error {
	enabled := lo.Keys(e.probes)
	// lo.Difference returns (only in enabled, only in config.Probes).
	toClose, toStart := lo.Difference(enabled, config.Probes)
	slog.Ctx(e.ctx).Info("reload event probes", "close", toClose, "enable", toStart)

	for _, n := range toClose {
		p, ok := e.probes[n]
		if !ok {
			slog.Ctx(e.ctx).Warn("probe not found in enabled probes, skip.", "probe", n)
			continue
		}

		if err := p.Close(proto.ProbeTypeEvent); err != nil {
			slog.Ctx(e.ctx).Warn("close probe error", "probe", n, "err", err)
			continue
		}

		// clear event channel
		if err := p.Register(nil); err != nil {
			slog.Ctx(e.ctx).Warn("unregister probe error", "probe", n, "err", err)
			continue
		}

		delete(e.probes, n)
	}

	for _, n := range toStart {
		p := probe.GetEventProbe(n)
		if p == nil {
			// Log the requested name; p itself is nil here.
			slog.Ctx(e.ctx).Info("get event probe nil", "probe", n)
			continue
		}

		// Register before starting (the same order NewEServer uses) so a
		// probe that cannot deliver events is neither started nor tracked,
		// and a later reload can retry it.
		if err := p.Register(e.eventChan); err != nil {
			slog.Ctx(e.ctx).Info("register receiver", "probe", n, "err", err)
			continue
		}

		e.probes[n] = p
		go p.Start(e.ctx, proto.ProbeTypeEvent)
		slog.Ctx(e.ctx).Debug("eserver start", "subject", n)
	}

	// enableLoki reads e.config, so the new config must be stored first.
	e.config = config

	if config.LokiEnable {
		if err := e.enableLoki(); err != nil {
			slog.Ctx(e.ctx).Warn("enable loki error", "err", err)
		}
	} else if err := e.disableLoki(); err != nil {
		slog.Ctx(e.ctx).Warn("disable loki error", "err", err)
	}

	return nil
}

func (e *EServer) WatchEvent(_ *proto.WatchRequest, srv proto.Inspector_WatchEventServer) error {
Expand Down Expand Up @@ -109,35 +207,23 @@ func (e *EServer) subscribe(client string, ch chan<- proto.RawEvent) {
e.mtx.Lock()
defer e.mtx.Unlock()

e.cpool[client] = ch
e.subscribers[client] = ch
}

func (e *EServer) unsubscribe(client string) {
e.mtx.Lock()
defer e.mtx.Unlock()

delete(e.cpool, client)
delete(e.subscribers, client)
}

func (e *EServer) dispatcher(ctx context.Context, stopc chan struct{}) {
pbs := e.probes
receiver := make(chan proto.RawEvent)
for p, pb := range pbs {
err := pb.Register(receiver)
if err != nil {
slog.Ctx(ctx).Info("register receiver", "probe", p, "err", err)
continue
}
}

slog.Ctx(ctx).Debug("dispatcher", "probes", pbs)
for {
select {

case <-stopc:
slog.Ctx(ctx).Debug("dispatcher exit of sop signal", "probes", pbs)
slog.Ctx(ctx).Debug("event dispatcher exited because of stop signal")
return
case evt := <-receiver:
case evt := <-e.eventChan:
err := e.broadcast(evt)
if err != nil {
slog.Ctx(ctx).Info("dispatcher broadcast", "err", err, "event", evt)
Expand All @@ -149,7 +235,7 @@ func (e *EServer) dispatcher(ctx context.Context, stopc chan struct{}) {
}

func (e *EServer) broadcast(evt proto.RawEvent) error {
pbs := e.cpool
pbs := e.subscribers

ctx, cancelf := context.WithTimeout(e.ctx, 5*time.Second)
defer cancelf()
Expand Down
74 changes: 66 additions & 8 deletions pkg/exporter/cmd/metricserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package cmd
import (
"context"
"fmt"
"sync"

"github.com/samber/lo"

"strings"
"time"
Expand Down Expand Up @@ -34,16 +37,12 @@ var (
func NewMServer(ctx context.Context, config MetricConfig) *MServer {
ms := &MServer{
ctx: ctx,
mtx: sync.Mutex{},
descs: make(map[string]*prometheus.Desc),
config: config,
probes: make(map[string]proto.MetricProbe),
metricCache: cache.New(3*cacheUpdateInterval, 10*cacheUpdateInterval),
}

if len(config.Probes) == 0 {
// if no probes configured, keep loop channel empty
slog.Ctx(ctx).Info("new mserver with no probe required")
return ms
loopctrl: make(chan struct{}),
}

for _, p := range config.Probes {
Expand All @@ -53,7 +52,7 @@ func NewMServer(ctx context.Context, config MetricConfig) *MServer {
continue
}
ms.probes[p] = mp
go mp.Start(ctx)
go mp.Start(ctx, proto.ProbeTypeMetrics)
slog.Ctx(ctx).Debug("new mserver add subject", "subject", p)
}

Expand All @@ -77,14 +76,14 @@ func NewMServer(ctx context.Context, config MetricConfig) *MServer {
}
// start cache loop
slog.Ctx(ctx).Debug("new mserver start cache loop")
ms.loopctrl = make(chan struct{})
go ms.collectLoop(ctx, cacheUpdateInterval, ms.loopctrl)

return ms
}

type MServer struct {
ctx context.Context
mtx sync.Mutex
descs map[string]*prometheus.Desc
config MetricConfig
metricCache *cache.Cache
Expand All @@ -104,7 +103,61 @@ func (s *MServer) Close() {
}
}

// Reload applies a new metric configuration to a running server.
//
// Probes that are currently enabled but absent from config.Probes are closed
// and their metric descriptors removed; probes listed in config.Probes that
// are not yet enabled are fetched and started. The new config is then stored
// and the descriptors of all enabled probes are rebuilt, so a changed Verbose
// setting takes effect on this reload.
//
// The server mutex is held for the whole call. Per-probe failures are logged
// and skipped, so the method always returns nil.
func (s *MServer) Reload(config MetricConfig) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	enabled := lo.Keys(s.probes)
	// lo.Difference returns (only in enabled, only in config.Probes).
	toClose, toStart := lo.Difference(enabled, config.Probes)
	slog.Ctx(s.ctx).Info("reload metric probes", "close", toClose, "enable", toStart)

	for _, n := range toClose {
		p, ok := s.probes[n]
		if !ok {
			slog.Ctx(s.ctx).Warn("probe not found in enabled probes, skip.", "probe", n)
			continue
		}

		// Capture the metric names before closing; they are needed to drop
		// the probe's descriptors once the close succeeds.
		mnames := p.GetMetricNames()

		if err := p.Close(proto.ProbeTypeMetrics); err != nil {
			slog.Ctx(s.ctx).Warn("close probe error", "probe", n, "err", err)
			continue
		}

		// Remove exactly the descriptors the registration loop below would
		// have added for this probe, so Collect stops iterating over them.
		for _, mname := range mnames {
			if strings.HasPrefix(mname, n) {
				delete(s.descs, mname)
			}
		}
		delete(s.probes, n)
	}

	for _, n := range toStart {
		p := probe.GetProbe(n)
		if p == nil {
			slog.Ctx(s.ctx).Info("get metric probe nil", "probe", n)
			continue
		}
		s.probes[n] = p
		go p.Start(s.ctx, proto.ProbeTypeMetrics)
		slog.Ctx(s.ctx).Debug("new mserver add subject", "subject", n)
	}

	// Store the new config before rebuilding descriptors; otherwise the
	// previous Verbose setting would be used for this reload.
	s.config = config

	for sub, mp := range s.probes {
		for _, mname := range mp.GetMetricNames() {
			if !strings.HasPrefix(mname, sub) {
				continue
			}
			if s.config.Verbose {
				s.descs[mname] = getDescOfMetricVerbose(sub, mname, s.additionalLabels)
			} else {
				s.descs[mname] = getDescOfMetric(sub, mname)
			}
		}
	}

	return nil
}

func (s *MServer) Collect(ch chan<- prometheus.Metric) {
s.mtx.Lock()
defer s.mtx.Unlock()
slog.Ctx(s.ctx).Debug("metric server collect request in", "metric count", len(s.descs))
for mname, desc := range s.descs {
data, err := s.collectOnceCache(s.ctx, mname)
Expand Down Expand Up @@ -204,6 +257,11 @@ func (s *MServer) collectLoop(ctx context.Context, interval time.Duration, stopc

// collectWorkerSerial collect metric data in serial
func (s *MServer) collectWorkerSerial(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()
if len(s.probes) == 0 {
return nil
}
slog.Ctx(s.ctx).Debug("collect worker serial start")
workdone := make(chan struct{})
cstart := time.Now()
Expand Down
Loading
Loading