Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: probe state #279

Merged
merged 2 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions pkg/exporter/cmd/eventserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type EventServer struct {
*DynamicProbeServer[probe.EventProbe]
}

func NewEventServer(sinks []sink.Sink) (*EventServer, error) {
func newEventServer(sinks []sink.Sink) (*EventServer, error) {
var sinkWrappers []*sinkWrapper

done := make(chan struct{})
Expand Down Expand Up @@ -109,13 +109,18 @@ func (m *EventProbeManager) CreateProbe(config ProbeConfig) (probe.EventProbe, e
return probe.CreateEventProbe(config.Name, m.sinkChan, config.Args)
}

func (m *EventProbeManager) StartProbe(ctx context.Context, probe probe.EventProbe) error {
log.Infof("start event probe %s", probe.Name())
return probe.Start(ctx)
func (m *EventProbeManager) StartProbe(ctx context.Context, p probe.EventProbe) error {
log.Infof("start event probe %s", p.Name())
return p.Start(ctx)
}

func (m *EventProbeManager) StopProbe(ctx context.Context, probe probe.EventProbe) error {
return probe.Stop(ctx)
func (m *EventProbeManager) StopProbe(ctx context.Context, p probe.EventProbe) error {
log.Infof("stop event probe %s", p.Name())
state := p.State()
if state == probe.ProbeStateStopped || state == probe.ProbeStateStopping || state == probe.ProbeStateFailed {
return nil
}
return p.Stop(ctx)
}

var _ ProbeManager[probe.MetricsProbe] = &MetricsProbeManager{}
24 changes: 15 additions & 9 deletions pkg/exporter/cmd/metricserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
log "github.com/sirupsen/logrus"
)

func NewMetricsServer() (*MetricsServer, error) {
func newMetricsServer() (*MetricsServer, error) {

r := prometheus.NewRegistry()
handler := promhttp.HandlerFor(prometheus.Gatherers{
Expand All @@ -37,21 +37,27 @@ func (m *MetricsProbeManager) CreateProbe(config ProbeConfig) (probe.MetricsProb
return probe.CreateMetricsProbe(config.Name, config.Args)
}

func (m *MetricsProbeManager) StartProbe(ctx context.Context, probe probe.MetricsProbe) error {
log.Infof("start metrics probe %s", probe.Name())
if err := probe.Start(ctx); err != nil {
func (m *MetricsProbeManager) StartProbe(ctx context.Context, p probe.MetricsProbe) error {
log.Infof("start metrics probe %s", p.Name())
if err := p.Start(ctx); err != nil {
return err
}
m.prometheusRegistry.MustRegister(probe)
m.prometheusRegistry.MustRegister(p)
return nil
}

func (m *MetricsProbeManager) StopProbe(ctx context.Context, probe probe.MetricsProbe) error {
log.Infof("stop metrics probe %s", probe.Name())
if err := probe.Stop(ctx); err != nil {
func (m *MetricsProbeManager) StopProbe(ctx context.Context, p probe.MetricsProbe) error {
log.Infof("stop metrics probe %s", p.Name())

state := p.State()
if state == probe.ProbeStateStopped || state == probe.ProbeStateStopping || state == probe.ProbeStateFailed {
return nil
}

if err := p.Stop(ctx); err != nil {
return err
}
m.prometheusRegistry.Unregister(probe)
m.prometheusRegistry.Unregister(p)
return nil
}

Expand Down
147 changes: 74 additions & 73 deletions pkg/exporter/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,91 +280,93 @@ func (i *inspServer) reload() error {
return nil
}

func (i *inspServer) newHTTPServer(cfg *InspServerConfig) *http.Server {
http.Handle("/metrics", i.metricsServer)
http.Handle("/", http.HandlerFunc(defaultPage))
http.Handle("/status", http.HandlerFunc(i.statusPage))
if cfg.DebugMode {
reg := prometheus.NewRegistry()

reg.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)
http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
}
listenAddr := fmt.Sprintf(":%d", cfg.Port)
log.Infof("inspector start metric server, listenAddr: %s", listenAddr)
return &http.Server{Addr: listenAddr}
}

func (i *inspServer) start(cfg *InspServerConfig) error {
if err := gops.Listen(gops.Options{}); err != nil {
log.Infof("start gops err: %v", err)
}

go func() {
var err error
ctx := context.TODO()

err = probe.InitAdditionalLabels(cfg.MetricsConfig.AdditionalLabels)
if err != nil {
log.Errorf("failed init additional labels: %v", err)
return
}

log.Infof("start metrics server")
i.metricsServer, err = NewMetricsServer()
if err != nil {
log.Errorf("failed create metrics server: %v", err)
return
}
var err error
ctx := context.TODO()
err = probe.InitAdditionalLabels(cfg.MetricsConfig.AdditionalLabels)
if err != nil {
return fmt.Errorf("failed init additional labels: %w", err)
}

defer func() {
_ = i.metricsServer.Stop(ctx)
}()
i.metricsServer, err = newMetricsServer()
if err != nil {
return fmt.Errorf("failed create metrics server: %w", err)
}

if err := i.metricsServer.Start(ctx, cfg.MetricsConfig.Probes); err != nil {
log.Errorf("failed start metrics server: %v", err)
return
}
if err := i.metricsServer.Start(ctx, cfg.MetricsConfig.Probes); err != nil {
return fmt.Errorf("failed start metrics server: %w", err)
}

//sink
sinks, err := createSink(cfg.EventConfig.EventSinks)
if err != nil {
log.Errorf("failed create sinks, err: %v", err)
} else if len(sinks) != len(cfg.EventConfig.EventSinks) {
log.Warnf("expected to create %d sinks , but %d were created", len(cfg.EventConfig.EventSinks), len(sinks))
}
defer func() {
_ = i.metricsServer.Stop(ctx)
}()

log.Infof("start event server")
//TODO create sinks from config
i.eventServer, err = NewEventServer(sinks)
if err != nil {
log.Errorf("failed create event server: %v", err)
return
}
sinks, err := createSink(cfg.EventConfig.EventSinks)
if err != nil {
return fmt.Errorf("failed create sinks, err: %w", err)
}
if len(sinks) != len(cfg.EventConfig.EventSinks) {
log.Warnf("expected to create %d sinks , but %d were created", len(cfg.EventConfig.EventSinks), len(sinks))
}

defer func() {
_ = i.eventServer.Stop(context.TODO())
}()
i.eventServer, err = newEventServer(sinks)
if err != nil {
return fmt.Errorf("failed create event server: %w", err)
}

if err := i.eventServer.Start(ctx, cfg.EventConfig.Probes); err != nil {
log.Errorf("failed start event server: %v", err)
return
}
if err = i.eventServer.Start(ctx, cfg.EventConfig.Probes); err != nil {
return fmt.Errorf("failed start event server: %w", err)
}

http.Handle("/metrics", i.metricsServer)
http.Handle("/", http.HandlerFunc(defaultPage))
http.Handle("/status", http.HandlerFunc(i.statusPage))
if cfg.DebugMode {
reg := prometheus.NewRegistry()

reg.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)
http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
}
listenAddr := fmt.Sprintf(":%d", cfg.Port)
log.Infof("inspector start metric server, listenAddr: %s", listenAddr)
srv := &http.Server{Addr: listenAddr}
if err := srv.ListenAndServe(); err != nil {
log.Errorf("inspector start metric server err: %v", err)
}
defer func() {
_ = i.eventServer.Stop(ctx)
}()

done := make(chan struct{})

if err := i.WatchConfig(done); err != nil {
if err = i.WatchConfig(done); err != nil {
log.Errorf("failed watch config, dynamic load would not work: %v", err)
}

WaitSignals(i, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
close(done)
srv := i.newHTTPServer(cfg)
serverClosedChan := make(chan struct{})
serverClosed := false
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Errorf("server error: %v", err)
}

close(serverClosedChan)
serverClosed = true
}()

WaitSignals(serverClosedChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
close(done)
if !serverClosed {
_ = srv.Shutdown(ctx)
}
return nil
}

Expand All @@ -381,16 +383,15 @@ func createSink(sinkConfigs []EventSinkConfig) ([]sink.Sink, error) {
return ret, nil
}

func WaitSignals(i *inspServer, sgs ...os.Signal) {
func WaitSignals(done <-chan struct{}, sgs ...os.Signal) {
s := make(chan os.Signal, 1)
signal.Notify(s, sgs...)
sig := <-s
log.Warnf("recive signal %s, stopping", sig.String())
if err := i.metricsServer.Stop(i.ctx); err != nil {
log.Errorf("failed stop metrics server, err: %v", err)
}
if err := i.eventServer.Stop(i.ctx); err != nil {
log.Errorf("failed stop event server, err: %v", err)
select {
case sig := <-s:
log.Warnf("recive signal %s, stopping", sig.String())
return
case <-done:
log.Warnf("recive server close signal")
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/exporter/nettop/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func StartCache(ctx context.Context, sidecarMode bool) error {
}

func StopCache() {
control <- struct{}{}
close(control)
}

func cacheDaemonLoop(_ context.Context, control chan struct{}) {
Expand All @@ -249,10 +249,12 @@ func cacheDaemonLoop(_ context.Context, control chan struct{}) {
t := time.NewTicker(cacheUpdateInterval)
defer t.Stop()

loop:
for {
select {
case <-control:
log.Info("cache daemon loop exit of control signal")
break loop
case <-t.C:
if err := cachePodsWithTimeout(cacheUpdateInterval); err != nil {
log.Errorf("failed cache pods: %v", err)
Expand Down
Loading