diff --git a/pkg/exporter/cmd/eventserver.go b/pkg/exporter/cmd/eventserver.go index 298aa7aa..85fa0d7f 100644 --- a/pkg/exporter/cmd/eventserver.go +++ b/pkg/exporter/cmd/eventserver.go @@ -12,7 +12,7 @@ type EventServer struct { *DynamicProbeServer[probe.EventProbe] } -func NewEventServer(sinks []sink.Sink) (*EventServer, error) { +func newEventServer(sinks []sink.Sink) (*EventServer, error) { var sinkWrappers []*sinkWrapper done := make(chan struct{}) @@ -109,13 +109,18 @@ func (m *EventProbeManager) CreateProbe(config ProbeConfig) (probe.EventProbe, e return probe.CreateEventProbe(config.Name, m.sinkChan, config.Args) } -func (m *EventProbeManager) StartProbe(ctx context.Context, probe probe.EventProbe) error { - log.Infof("start event probe %s", probe.Name()) - return probe.Start(ctx) +func (m *EventProbeManager) StartProbe(ctx context.Context, p probe.EventProbe) error { + log.Infof("start event probe %s", p.Name()) + return p.Start(ctx) } -func (m *EventProbeManager) StopProbe(ctx context.Context, probe probe.EventProbe) error { - return probe.Stop(ctx) +func (m *EventProbeManager) StopProbe(ctx context.Context, p probe.EventProbe) error { + log.Infof("stop event probe %s", p.Name()) + state := p.State() + if state == probe.ProbeStateStopped || state == probe.ProbeStateStopping || state == probe.ProbeStateFailed { + return nil + } + return p.Stop(ctx) } var _ ProbeManager[probe.MetricsProbe] = &MetricsProbeManager{} diff --git a/pkg/exporter/cmd/metricserver.go b/pkg/exporter/cmd/metricserver.go index afa55570..90eb6d19 100644 --- a/pkg/exporter/cmd/metricserver.go +++ b/pkg/exporter/cmd/metricserver.go @@ -11,7 +11,7 @@ import ( log "github.com/sirupsen/logrus" ) -func NewMetricsServer() (*MetricsServer, error) { +func newMetricsServer() (*MetricsServer, error) { r := prometheus.NewRegistry() handler := promhttp.HandlerFor(prometheus.Gatherers{ @@ -37,21 +37,27 @@ func (m *MetricsProbeManager) CreateProbe(config ProbeConfig) (probe.MetricsProb return probe.CreateMetricsProbe(config.Name, config.Args) } -func (m *MetricsProbeManager) StartProbe(ctx context.Context, probe probe.MetricsProbe) error { - log.Infof("start metrics probe %s", probe.Name()) - if err := probe.Start(ctx); err != nil { +func (m *MetricsProbeManager) StartProbe(ctx context.Context, p probe.MetricsProbe) error { + log.Infof("start metrics probe %s", p.Name()) + if err := p.Start(ctx); err != nil { return err } - m.prometheusRegistry.MustRegister(probe) + m.prometheusRegistry.MustRegister(p) return nil } -func (m *MetricsProbeManager) StopProbe(ctx context.Context, probe probe.MetricsProbe) error { - log.Infof("stop metrics probe %s", probe.Name()) - if err := probe.Stop(ctx); err != nil { +func (m *MetricsProbeManager) StopProbe(ctx context.Context, p probe.MetricsProbe) error { + log.Infof("stop metrics probe %s", p.Name()) + + state := p.State() + if state == probe.ProbeStateStopped || state == probe.ProbeStateStopping || state == probe.ProbeStateFailed { + return nil + } + + if err := p.Stop(ctx); err != nil { return err } - m.prometheusRegistry.Unregister(probe) + m.prometheusRegistry.Unregister(p) return nil } diff --git a/pkg/exporter/cmd/server.go b/pkg/exporter/cmd/server.go index dcb92267..3f8d90b2 100644 --- a/pkg/exporter/cmd/server.go +++ b/pkg/exporter/cmd/server.go @@ -280,91 +280,93 @@ func (i *inspServer) reload() error { return nil } +func (i *inspServer) newHTTPServer(cfg *InspServerConfig) *http.Server { + http.Handle("/metrics", i.metricsServer) + http.Handle("/", http.HandlerFunc(defaultPage)) + http.Handle("/status", http.HandlerFunc(i.statusPage)) + if cfg.DebugMode { + reg := prometheus.NewRegistry() + + reg.MustRegister( + collectors.NewGoCollector(), + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), + ) + http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) + } + listenAddr := fmt.Sprintf(":%d", cfg.Port) + log.Infof("inspector start metric server, listenAddr: %s", listenAddr) + return &http.Server{Addr: listenAddr} +} + func (i *inspServer) start(cfg *InspServerConfig) error { if err := gops.Listen(gops.Options{}); err != nil { log.Infof("start gops err: %v", err) } - go func() { - var err error - ctx := context.TODO() - - err = probe.InitAdditionalLabels(cfg.MetricsConfig.AdditionalLabels) - if err != nil { - log.Errorf("failed init additional labels: %v", err) - return - } - - log.Infof("start metrics server") - i.metricsServer, err = NewMetricsServer() - if err != nil { - log.Errorf("failed create metrics server: %v", err) - return - } + var err error + ctx := context.TODO() + err = probe.InitAdditionalLabels(cfg.MetricsConfig.AdditionalLabels) + if err != nil { + return fmt.Errorf("failed init additional labels: %w", err) + } - defer func() { - _ = i.metricsServer.Stop(ctx) - }() + i.metricsServer, err = newMetricsServer() + if err != nil { + return fmt.Errorf("failed create metrics server: %w", err) + } - if err := i.metricsServer.Start(ctx, cfg.MetricsConfig.Probes); err != nil { - log.Errorf("failed start metrics server: %v", err) - return - } + if err := i.metricsServer.Start(ctx, cfg.MetricsConfig.Probes); err != nil { + return fmt.Errorf("failed start metrics server: %w", err) + } - //sink - sinks, err := createSink(cfg.EventConfig.EventSinks) - if err != nil { - log.Errorf("failed create sinks, err: %v", err) - } else if len(sinks) != len(cfg.EventConfig.EventSinks) { - log.Warnf("expected to create %d sinks , but %d were created", len(cfg.EventConfig.EventSinks), len(sinks)) - } + defer func() { + _ = i.metricsServer.Stop(ctx) + }() - log.Infof("start event server") - //TODO create sinks from config - i.eventServer, err = NewEventServer(sinks) - if err != nil { - log.Errorf("failed create event server: %v", err) - return - } + sinks, err := createSink(cfg.EventConfig.EventSinks) + if err != nil { + return fmt.Errorf("failed create sinks, err: %w", err) + } + if len(sinks) != len(cfg.EventConfig.EventSinks) { + log.Warnf("expected to create %d sinks , but %d were created", len(cfg.EventConfig.EventSinks), len(sinks)) + } - defer func() { - _ = i.eventServer.Stop(context.TODO()) - }() + i.eventServer, err = newEventServer(sinks) + if err != nil { + return fmt.Errorf("failed create event server: %w", err) + } - if err := i.eventServer.Start(ctx, cfg.EventConfig.Probes); err != nil { - log.Errorf("failed start event server: %v", err) - return - } + if err = i.eventServer.Start(ctx, cfg.EventConfig.Probes); err != nil { + return fmt.Errorf("failed start event server: %w", err) + } - http.Handle("/metrics", i.metricsServer) - http.Handle("/", http.HandlerFunc(defaultPage)) - http.Handle("/status", http.HandlerFunc(i.statusPage)) - if cfg.DebugMode { - reg := prometheus.NewRegistry() - - reg.MustRegister( - collectors.NewGoCollector(), - collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), - ) - http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) - } - listenAddr := fmt.Sprintf(":%d", cfg.Port) - log.Infof("inspector start metric server, listenAddr: %s", listenAddr) - srv := &http.Server{Addr: listenAddr} - if err := srv.ListenAndServe(); err != nil { - log.Errorf("inspector start metric server err: %v", err) - } + defer func() { + _ = i.eventServer.Stop(ctx) }() done := make(chan struct{}) - if err := i.WatchConfig(done); err != nil { + if err = i.WatchConfig(done); err != nil { log.Errorf("failed watch config, dynamic load would not work: %v", err) } - WaitSignals(i, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) - close(done) + srv := i.newHTTPServer(cfg) + serverClosedChan := make(chan struct{}) + serverClosed := false + go func() { + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Errorf("server error: %v", err) + } + close(serverClosedChan) + serverClosed = true + }() + + WaitSignals(serverClosedChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) + close(done) + if !serverClosed { + _ = srv.Shutdown(ctx) + } return nil } @@ -381,16 +383,15 @@ func createSink(sinkConfigs []EventSinkConfig) ([]sink.Sink, error) { return ret, nil } -func WaitSignals(i *inspServer, sgs ...os.Signal) { +func WaitSignals(done <-chan struct{}, sgs ...os.Signal) { s := make(chan os.Signal, 1) signal.Notify(s, sgs...) - sig := <-s - log.Warnf("recive signal %s, stopping", sig.String()) - if err := i.metricsServer.Stop(i.ctx); err != nil { - log.Errorf("failed stop metrics server, err: %v", err) - } - if err := i.eventServer.Stop(i.ctx); err != nil { - log.Errorf("failed stop event server, err: %v", err) + select { + case sig := <-s: + log.Warnf("recive signal %s, stopping", sig.String()) + return + case <-done: + log.Warnf("recive server close signal") } } diff --git a/pkg/exporter/nettop/cache.go b/pkg/exporter/nettop/cache.go index e202cc5e..1e82f22c 100644 --- a/pkg/exporter/nettop/cache.go +++ b/pkg/exporter/nettop/cache.go @@ -240,7 +240,7 @@ func StartCache(ctx context.Context, sidecarMode bool) error { } func StopCache() { - control <- struct{}{} + close(control) } func cacheDaemonLoop(_ context.Context, control chan struct{}) { @@ -249,10 +249,12 @@ func cacheDaemonLoop(_ context.Context, control chan struct{}) { t := time.NewTicker(cacheUpdateInterval) defer t.Stop() +loop: for { select { case <-control: log.Info("cache daemon loop exit of control signal") + break loop case <-t.C: if err := cachePodsWithTimeout(cacheUpdateInterval); err != nil { log.Errorf("failed cache pods: %v", err)