Skip to content

Commit

Permalink
feat(exporter): support config hot reload
Browse files Browse the repository at this point in the history
Signed-off-by: xiayu.lyt <xiayu.lyt@alibaba-inc.com>
  • Loading branch information
Lyt99 committed Sep 1, 2023
1 parent 19c0b92 commit d62ebec
Show file tree
Hide file tree
Showing 29 changed files with 516 additions and 190 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/cilium/ebpf v0.9.3
github.com/containerd/containerd v1.6.15
github.com/docker/docker v20.10.22+incompatible
github.com/fsnotify/fsnotify v1.6.0
github.com/golang/snappy v0.0.4
github.com/google/gops v0.3.26
github.com/google/uuid v1.3.0
Expand Down Expand Up @@ -100,7 +101,6 @@ require (
github.com/emicklei/go-restful v2.11.2-0.20200112161605-a7c079c43d51+incompatible // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.1 // indirect
Expand Down
2 changes: 1 addition & 1 deletion pkg/exporter/cmd/diag_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var (
continue
}

go pb.Start(cmd.Context())
go pb.Start(cmd.Context(), "")
go func() {
for evt := range ch {
ets, err := nettop.GetEntityByNetns(int(evt.Netns))
Expand Down
172 changes: 129 additions & 43 deletions pkg/exporter/cmd/eventserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/samber/lo"

"sync"
"time"

Expand All @@ -19,28 +21,26 @@ import (

type EServer struct {
proto.UnimplementedInspectorServer
probes map[string]proto.EventProbe
cpool map[string]chan<- proto.RawEvent
mtx sync.Mutex
ctx context.Context
control chan struct{}
config EventConfig
probes map[string]proto.EventProbe
subscribers map[string]chan<- proto.RawEvent
mtx sync.Mutex
ctx context.Context
control chan struct{}
config EventConfig
eventChan chan proto.RawEvent
lokiDatach chan proto.RawEvent
lokiIngester *lokiwrapper.Ingester
}

func NewEServer(ctx context.Context, config EventConfig) *EServer {
es := &EServer{
probes: make(map[string]proto.EventProbe),
cpool: make(map[string]chan<- proto.RawEvent),
config: config,
mtx: sync.Mutex{},
ctx: ctx,
control: make(chan struct{}),
}

if len(config.Probes) == 0 {
// if no probes configured, keep loop channel empty
slog.Ctx(ctx).Info("new eserver with no probe required")
return es
probes: make(map[string]proto.EventProbe),
subscribers: make(map[string]chan<- proto.RawEvent),
config: config,
mtx: sync.Mutex{},
ctx: ctx,
control: make(chan struct{}),
eventChan: make(chan proto.RawEvent),
}

for _, p := range config.Probes {
Expand All @@ -50,28 +50,126 @@ func NewEServer(ctx context.Context, config EventConfig) *EServer {
continue
}
es.probes[p] = ep
go ep.Start(ctx)
err := ep.Register(es.eventChan)
if err != nil {
slog.Ctx(ctx).Warn("probe register failed", "probe", p)
continue
}
go ep.Start(ctx, proto.ProbeTypeEvent)
slog.Ctx(ctx).Debug("eserver start", "subject", p)
}

// start cache loop
slog.Ctx(ctx).Debug("new eserver start cache loop")
slog.Ctx(ctx).Debug("new eserver start dispatch loop")
go es.dispatcher(ctx, es.control)

err := es.enableLoki()
if err != nil {
slog.Ctx(ctx).Warn("enable loki failed", "err", err)
}
return es
}

func (e *EServer) enableLoki() error {
if e.lokiIngester != nil {
return nil
}

// handle grafana loki ingester preparation
if config.LokiEnable && config.LokiAddress != "" {
if e.config.LokiEnable && e.config.LokiAddress != "" {
slog.Ctx(e.ctx).Debug("enabling loki ingester", "address", e.config.LokiAddress)
datach := make(chan proto.RawEvent)
ingester, err := lokiwrapper.NewLokiIngester(ctx, config.LokiAddress, nettop.GetNodeName())
ingester, err := lokiwrapper.NewLokiIngester(e.ctx, e.config.LokiAddress, nettop.GetNodeName())
if err != nil {
slog.Ctx(ctx).Info("new loki ingester", "err", err, "client", ingester.Name())
slog.Ctx(e.ctx).Info("new loki ingester", "err", err, "client", ingester.Name())
} else {
es.subscribe(ingester.Name(), datach)
go ingester.Watch(ctx, datach)
e.subscribe(ingester.Name(), datach)
go ingester.Watch(e.ctx, datach)
}
e.lokiDatach = datach
e.lokiIngester = ingester
}

return nil
}

func (e *EServer) disableLoki() error {
if e.lokiIngester == nil {
return nil
}

return es
slog.Ctx(e.ctx).Debug("disabling loki ingester")
e.unsubscribe(e.lokiIngester.Name())

err := e.lokiIngester.Close()
if err != nil {
return err
}

close(e.lokiDatach)
e.lokiIngester = nil
e.lokiDatach = nil
return nil
}

// Reload applies config to a running event server.
//
// Probes that are currently enabled but absent from config.Probes are closed
// and unregistered; probes listed in config.Probes but not yet enabled are
// registered on the shared event channel and started. Afterwards the stored
// config is replaced and the Loki ingester is enabled or disabled according to
// config.LokiEnable.
//
// Failures on individual probes are logged and skipped so that one bad probe
// does not block the rest of the reload; the returned error is always nil.
//
// NOTE(review): e.probes is modified here without holding e.mtx. The mutex is
// taken inside subscribe/unsubscribe (reached via enableLoki/disableLoki), so
// locking it around this whole method would self-deadlock. Callers are assumed
// to serialize Reload calls — confirm.
func (e *EServer) Reload(config EventConfig) error {
	enabled := lo.Keys(e.probes)
	// lo.Difference returns (only in first list, only in second list).
	toClose, toStart := lo.Difference(enabled, config.Probes)
	slog.Ctx(e.ctx).Info("reload event probes", "close", toClose, "enable", toStart)

	for _, n := range toClose {
		p, ok := e.probes[n]
		if !ok {
			slog.Ctx(e.ctx).Warn("probe not found in enabled probes, skip.", "probe", n)
			continue
		}

		if err := p.Close(proto.ProbeTypeEvent); err != nil {
			slog.Ctx(e.ctx).Warn("close probe error", "probe", n, "err", err)
			continue
		}

		// The probe has been closed, so drop it from the enabled set even if
		// clearing its channel fails below. Leaving it in e.probes would keep
		// it out of toStart on every later reload, making a restart impossible.
		delete(e.probes, n)

		// clear event channel
		if err := p.Register(nil); err != nil {
			slog.Ctx(e.ctx).Warn("unregister probe error", "probe", n, "err", err)
		}
	}

	for _, n := range toStart {
		p := probe.GetEventProbe(n)
		if p == nil {
			// Log the requested name: p itself is nil here and carries no information.
			slog.Ctx(e.ctx).Info("get event probe nil", "probe", n)
			continue
		}

		// Register before starting, as NewEServer does. A probe that cannot
		// be registered is neither started nor recorded as enabled, so the
		// next reload will try it again.
		if err := p.Register(e.eventChan); err != nil {
			slog.Ctx(e.ctx).Warn("probe register failed", "probe", n, "err", err)
			continue
		}

		e.probes[n] = p
		go p.Start(e.ctx, proto.ProbeTypeEvent)
		slog.Ctx(e.ctx).Debug("eserver start", "subject", n)
	}

	// enableLoki reads e.config, so the new config must be stored first.
	e.config = config

	if config.LokiEnable {
		if err := e.enableLoki(); err != nil {
			slog.Ctx(e.ctx).Warn("enable loki error", "err", err)
		}
	} else {
		if err := e.disableLoki(); err != nil {
			slog.Ctx(e.ctx).Warn("disable loki error", "err", err)
		}
	}

	return nil
}

func (e *EServer) WatchEvent(_ *proto.WatchRequest, srv proto.Inspector_WatchEventServer) error {
Expand Down Expand Up @@ -109,35 +207,23 @@ func (e *EServer) subscribe(client string, ch chan<- proto.RawEvent) {
e.mtx.Lock()
defer e.mtx.Unlock()

e.cpool[client] = ch
e.subscribers[client] = ch
}

func (e *EServer) unsubscribe(client string) {
e.mtx.Lock()
defer e.mtx.Unlock()

delete(e.cpool, client)
delete(e.subscribers, client)
}

func (e *EServer) dispatcher(ctx context.Context, stopc chan struct{}) {
pbs := e.probes
receiver := make(chan proto.RawEvent)
for p, pb := range pbs {
err := pb.Register(receiver)
if err != nil {
slog.Ctx(ctx).Info("register receiver", "probe", p, "err", err)
continue
}
}

slog.Ctx(ctx).Debug("dispatcher", "probes", pbs)
for {
select {

case <-stopc:
slog.Ctx(ctx).Debug("dispatcher exit of sop signal", "probes", pbs)
slog.Ctx(ctx).Debug("event dispatcher exited because of stop signal")
return
case evt := <-receiver:
case evt := <-e.eventChan:
err := e.broadcast(evt)
if err != nil {
slog.Ctx(ctx).Info("dispatcher broadcast", "err", err, "event", evt)
Expand All @@ -149,7 +235,7 @@ func (e *EServer) dispatcher(ctx context.Context, stopc chan struct{}) {
}

func (e *EServer) broadcast(evt proto.RawEvent) error {
pbs := e.cpool
pbs := e.subscribers

ctx, cancelf := context.WithTimeout(e.ctx, 5*time.Second)
defer cancelf()
Expand Down
74 changes: 66 additions & 8 deletions pkg/exporter/cmd/metricserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package cmd
import (
"context"
"fmt"
"sync"

"github.com/samber/lo"

"strings"
"time"
Expand Down Expand Up @@ -34,16 +37,12 @@ var (
func NewMServer(ctx context.Context, config MetricConfig) *MServer {
ms := &MServer{
ctx: ctx,
mtx: sync.Mutex{},
descs: make(map[string]*prometheus.Desc),
config: config,
probes: make(map[string]proto.MetricProbe),
metricCache: cache.New(3*cacheUpdateInterval, 10*cacheUpdateInterval),
}

if len(config.Probes) == 0 {
// if no probes configured, keep loop channel empty
slog.Ctx(ctx).Info("new mserver with no probe required")
return ms
loopctrl: make(chan struct{}),
}

for _, p := range config.Probes {
Expand All @@ -53,7 +52,7 @@ func NewMServer(ctx context.Context, config MetricConfig) *MServer {
continue
}
ms.probes[p] = mp
go mp.Start(ctx)
go mp.Start(ctx, proto.ProbeTypeMetrics)
slog.Ctx(ctx).Debug("new mserver add subject", "subject", p)
}

Expand All @@ -77,14 +76,14 @@ func NewMServer(ctx context.Context, config MetricConfig) *MServer {
}
// start cache loop
slog.Ctx(ctx).Debug("new mserver start cache loop")
ms.loopctrl = make(chan struct{})
go ms.collectLoop(ctx, cacheUpdateInterval, ms.loopctrl)

return ms
}

type MServer struct {
ctx context.Context
mtx sync.Mutex
descs map[string]*prometheus.Desc
config MetricConfig
metricCache *cache.Cache
Expand All @@ -104,7 +103,61 @@ func (s *MServer) Close() {
}
}

// Reload applies config to a running metric server.
//
// Probes that are currently enabled but absent from config.Probes are closed
// and their metric descriptors are removed; probes listed in config.Probes but
// not yet enabled are started. Descriptors for every enabled probe are then
// rebuilt using the verbosity of the new config.
//
// The server mutex is held for the whole call, so Collect and
// collectWorkerSerial never observe a half-applied reload. Failures on
// individual probes are logged and skipped; the returned error is always nil.
func (s *MServer) Reload(config MetricConfig) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	enabled := lo.Keys(s.probes)
	// lo.Difference returns (only in first list, only in second list).
	toClose, toStart := lo.Difference(enabled, config.Probes)
	slog.Ctx(s.ctx).Info("reload metric probes", "close", toClose, "enable", toStart)

	for _, n := range toClose {
		p, ok := s.probes[n]
		if !ok {
			slog.Ctx(s.ctx).Warn("probe not found in enabled probes, skip.", "probe", n)
			continue
		}

		if err := p.Close(proto.ProbeTypeMetrics); err != nil {
			slog.Ctx(s.ctx).Warn("close probe error", "probe", n, "err", err)
			continue
		}

		// Remove the descriptors this probe contributed, using the same
		// prefix filter as registration below, so Collect stops iterating
		// over metrics of a probe that no longer runs.
		// NOTE(review): assumes GetMetricNames is still valid after Close — confirm.
		for _, mname := range p.GetMetricNames() {
			if strings.HasPrefix(mname, n) {
				delete(s.descs, mname)
			}
		}
		delete(s.probes, n)
	}

	for _, n := range toStart {
		p := probe.GetProbe(n)
		if p == nil {
			slog.Ctx(s.ctx).Info("get metric probe nil", "probe", n)
			continue
		}
		s.probes[n] = p
		go p.Start(s.ctx, proto.ProbeTypeMetrics)
		slog.Ctx(s.ctx).Debug("new mserver add subject", "subject", n)
	}

	// Store the new config before rebuilding descriptors so that a change of
	// Verbose takes effect in this reload rather than the next one.
	s.config = config

	for sub, mp := range s.probes {
		for _, mname := range mp.GetMetricNames() {
			if !strings.HasPrefix(mname, sub) {
				continue
			}
			if s.config.Verbose {
				s.descs[mname] = getDescOfMetricVerbose(sub, mname, s.additionalLabels)
			} else {
				s.descs[mname] = getDescOfMetric(sub, mname)
			}
		}
	}

	return nil
}

func (s *MServer) Collect(ch chan<- prometheus.Metric) {
s.mtx.Lock()
defer s.mtx.Unlock()
slog.Ctx(s.ctx).Debug("metric server collect request in", "metric count", len(s.descs))
for mname, desc := range s.descs {
data, err := s.collectOnceCache(s.ctx, mname)
Expand Down Expand Up @@ -204,6 +257,11 @@ func (s *MServer) collectLoop(ctx context.Context, interval time.Duration, stopc

// collectWorkerSerial collect metric data in serial
func (s *MServer) collectWorkerSerial(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()
if len(s.probes) == 0 {
return nil
}
slog.Ctx(s.ctx).Debug("collect worker serial start")
workdone := make(chan struct{})
cstart := time.Now()
Expand Down
Loading

0 comments on commit d62ebec

Please sign in to comment.