diff --git a/.chloggen/shutdowntel.yaml b/.chloggen/shutdowntel.yaml new file mode 100755 index 00000000000..1342ef97eda --- /dev/null +++ b/.chloggen/shutdowntel.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Shutdown internal telemetry with the Service (every time config changes). + +# One or more tracking issues or pull requests related to the change +issues: [5564] diff --git a/service/collector.go b/service/collector.go index eab9630c1ed..db16e3de92e 100644 --- a/service/collector.go +++ b/service/collector.go @@ -28,7 +28,6 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" - "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/service/internal/grpclog" ) @@ -75,8 +74,7 @@ func (s State) String() string { // Collector represents a server providing the OpenTelemetry Collector service. // Deprecated: [v0.67.0] use otelcol.Collector type Collector struct { - set CollectorSettings - telemetry *telemetryInitializer + set CollectorSettings service *service state *atomic.Int32 @@ -98,7 +96,6 @@ func New(set CollectorSettings) (*Collector, error) { return &Collector{ set: set, - telemetry: newColTelemetry(featuregate.GetRegistry()), state: atomic.NewInt32(int32(StateStarting)), shutdownChan: make(chan struct{}), // Per signal.Notify documentation, a size of the channel equaled with @@ -145,7 +142,6 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { Config: cfg, AsyncErrorChannel: col.asyncErrorChannel, LoggingOptions: col.set.LoggingOptions, - telemetry: col.telemetry, }) if err != nil { return err @@ -156,7 +152,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { } if err = col.service.Start(ctx); err != nil { - return multierr.Append(err, col.shutdownServiceAndTelemetry(ctx)) + return multierr.Combine(err, col.service.Shutdown(ctx)) } col.setCollectorState(StateRunning) return nil @@ -238,28 +234,13 @@ func (col *Collector) shutdown(ctx context.Context) error { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown config provider: %w", err)) } - errs = multierr.Append(errs, col.shutdownServiceAndTelemetry(ctx)) - - col.setCollectorState(StateClosed) - - return errs -} - -// shutdownServiceAndTelemetry bundles shutting down the service and telemetryInitializer. -// Returned error will be in multierr form and wrapped. -func (col *Collector) shutdownServiceAndTelemetry(ctx context.Context) error { - var errs error - // shutdown service if err := col.service.Shutdown(ctx); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown service after error: %w", err)) } - // TODO: Move this as part of the service shutdown. - // shutdown telemetryInitializer - if err := col.telemetry.shutdown(); err != nil { - errs = multierr.Append(errs, fmt.Errorf("failed to shutdown collector telemetry: %w", err)) - } + col.setCollectorState(StateClosed) + return errs } diff --git a/service/collector_test.go b/service/collector_test.go index 11ca02502e8..7e6a1f62e0d 100644 --- a/service/collector_test.go +++ b/service/collector_test.go @@ -29,8 +29,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/featuregate" - "go.opentelemetry.io/collector/internal/obsreportconfig" ) func TestStateString(t *testing.T) { @@ -243,26 +241,6 @@ func TestCollectorStartInvalidConfig(t *testing.T) { assert.Error(t, col.Run(context.Background())) } -func TestCollectorStartWithOpenCensusMetrics(t *testing.T) { - for _, tc := range ownMetricsTestCases() { - t.Run(tc.name, func(t *testing.T) { - testCollectorStartHelper(t, newColTelemetry(featuregate.NewRegistry()), tc) - }) - } -} - -func TestCollectorStartWithOpenTelemetryMetrics(t *testing.T) { - for _, tc := range ownMetricsTestCases() { - t.Run(tc.name, func(t *testing.T) { - registry := featuregate.NewRegistry() - obsreportconfig.RegisterInternalMetricFeatureGate(registry) - colTel := newColTelemetry(registry) - require.NoError(t, colTel.registry.Apply(map[string]bool{obsreportconfig.UseOtelForInternalMetricsfeatureGateID: true})) - testCollectorStartHelper(t, colTel, tc) - }) - } -} - func TestCollectorStartWithTraceContextPropagation(t *testing.T) { tests := []struct { file string diff --git a/service/service.go b/service/service.go index 5e87429d1cd..f42fc68977d 100644 --- a/service/service.go +++ b/service/service.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/proctelemetry" "go.opentelemetry.io/collector/service/telemetry" @@ -41,6 +42,10 @@ type service struct { } func newService(set *settings) (*service, error) { + reg := set.registry + if reg == nil { + reg = featuregate.GetRegistry() + } srv := &service{ buildInfo: set.BuildInfo, config: set.Config, @@ -49,12 +54,10 @@ func newService(set *settings) (*service, error) { buildInfo: set.BuildInfo, asyncErrorChannel: set.AsyncErrorChannel, }, - telemetryInitializer: set.telemetry, + telemetryInitializer: newColTelemetry(reg), } - var err error - srv.telemetry, err = telemetry.New(context.Background(), telemetry.Settings{ - ZapOptions: set.LoggingOptions}, set.Config.Service.Telemetry) + srv.telemetry, err = telemetry.New(context.Background(), telemetry.Settings{ZapOptions: set.LoggingOptions}, set.Config.Service.Telemetry) if err != nil { return nil, fmt.Errorf("failed to get logger: %w", err) } @@ -131,7 +134,9 @@ func (srv *service) Shutdown(ctx context.Context) error { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry: %w", err)) } - // TODO: Shutdown MeterProvider. + if err := srv.telemetryInitializer.shutdown(); err != nil { + errs = multierr.Append(errs, fmt.Errorf("failed to shutdown collector telemetry: %w", err)) + } return errs } diff --git a/service/service_test.go b/service/service_test.go index d8105d1a7f3..fad0e455f53 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -17,7 +17,6 @@ package service import ( "bufio" "context" - "fmt" "net/http" "path/filepath" "strings" @@ -207,44 +206,34 @@ func TestServiceTelemetryCleanupOnError(t *testing.T) { invalidCfg, err := invalidProvider.Get(context.Background(), factories) require.NoError(t, err) - // Read valid yaml config from file - validProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")})) - require.NoError(t, err) - validCfg, err := validProvider.Get(context.Background(), factories) - require.NoError(t, err) - // Create a service with an invalid config and expect an error - telemetryOne := newColTelemetry(featuregate.NewRegistry()) _, err = newService(&settings{ BuildInfo: component.NewDefaultBuildInfo(), Factories: factories, Config: invalidCfg, - telemetry: telemetryOne, }) require.Error(t, err) + // Read valid yaml config from file + validProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")})) + require.NoError(t, err) + validCfg, err := validProvider.Get(context.Background(), factories) + require.NoError(t, err) + // Create a service with a valid config and expect no error - telemetryTwo := newColTelemetry(featuregate.NewRegistry()) srv, err := newService(&settings{ BuildInfo: component.NewDefaultBuildInfo(), Factories: factories, Config: validCfg, - telemetry: telemetryTwo, }) require.NoError(t, err) - - // For safety ensure everything is cleaned up - t.Cleanup(func() { - assert.NoError(t, telemetryOne.shutdown()) - assert.NoError(t, telemetryTwo.shutdown()) - assert.NoError(t, srv.Shutdown(context.Background())) - }) + assert.NoError(t, srv.Shutdown(context.Background())) } func TestServiceTelemetryWithOpenCensusMetrics(t *testing.T) { for _, tc := range ownMetricsTestCases() { t.Run(tc.name, func(t *testing.T) { - testCollectorStartHelper(t, newColTelemetry(featuregate.NewRegistry()), tc) + testCollectorStartHelper(t, featuregate.NewRegistry(), tc) }) } } @@ -256,12 +245,12 @@ func TestServiceTelemetryWithOpenTelemetryMetrics(t *testing.T) { obsreportconfig.RegisterInternalMetricFeatureGate(registry) colTel := newColTelemetry(registry) require.NoError(t, colTel.registry.Apply(map[string]bool{obsreportconfig.UseOtelForInternalMetricsfeatureGateID: true})) - testCollectorStartHelper(t, colTel, tc) + testCollectorStartHelper(t, registry, tc) }) } } -func testCollectorStartHelper(t *testing.T, telemetry *telemetryInitializer, tc ownMetricsTestCase) { +func testCollectorStartHelper(t *testing.T, reg *featuregate.Registry, tc ownMetricsTestCase) { factories, err := componenttest.NopFactories() zpagesExt := zpagesextension.NewFactory() factories.Extensions[zpagesExt.Type()] = zpagesExt @@ -301,27 +290,29 @@ func testCollectorStartHelper(t *testing.T, telemetry *telemetryInitializer, tc cfg, err := cfgProvider.Get(context.Background(), factories) require.NoError(t, err) - srv, err := newService(&settings{ - BuildInfo: component.BuildInfo{Version: "test version"}, - Factories: factories, - Config: cfg, - LoggingOptions: []zap.Option{zap.Hooks(hook)}, - telemetry: telemetry, - }) - require.NoError(t, err) - - require.NoError(t, srv.Start(context.Background())) - // Sleep for 1 second to ensure the http server is started. - time.Sleep(1 * time.Second) - assert.True(t, loggingHookCalled) - assertMetrics(t, metricsAddr, tc.expectedLabels) - assertZPages(t, zpagesAddr) - require.NoError(t, srv.Shutdown(context.Background())) - require.NoError(t, telemetry.shutdown()) + // Create a service, check for metrics, shutdown and repeat to ensure that telemetry can be started/shutdown and started again. + for i := 0; i < 2; i++ { + srv, err := newService(&settings{ + BuildInfo: component.BuildInfo{Version: "test version"}, + Factories: factories, + Config: cfg, + LoggingOptions: []zap.Option{zap.Hooks(hook)}, + registry: reg, + }) + require.NoError(t, err) + + require.NoError(t, srv.Start(context.Background())) + // Sleep for 1 second to ensure the http server is started. + time.Sleep(1 * time.Second) + assert.True(t, loggingHookCalled) + assertMetrics(t, metricsAddr, tc.expectedLabels) + assertZPages(t, zpagesAddr) + require.NoError(t, srv.Shutdown(context.Background())) + } } -// TestServiceTelemetryReusable tests that a single telemetryInitializer can be reused in multiple services -func TestServiceTelemetryReusable(t *testing.T) { +// TestServiceTelemetryRestart tests that the service correctly restarts the telemetry server. +func TestServiceTelemetryRestart(t *testing.T) { factories, err := componenttest.NopFactories() require.NoError(t, err) @@ -332,22 +323,15 @@ func TestServiceTelemetryReusable(t *testing.T) { require.NoError(t, err) // Create a service - telemetry := newColTelemetry(featuregate.NewRegistry()) - // For safety ensure everything is cleaned up - t.Cleanup(func() { - assert.NoError(t, telemetry.shutdown()) - }) - srvOne, err := newService(&settings{ BuildInfo: component.NewDefaultBuildInfo(), Factories: factories, Config: validCfg, - telemetry: telemetry, }) require.NoError(t, err) // URL of the telemetry service metrics endpoint - telemetryURL := fmt.Sprintf("http://%s/metrics", telemetry.server.Addr) + telemetryURL := "http://localhost:8888/metrics" // Start the service require.NoError(t, srvOne.Start(context.Background())) @@ -368,7 +352,6 @@ func TestServiceTelemetryReusable(t *testing.T) { BuildInfo: component.NewDefaultBuildInfo(), Factories: factories, Config: validCfg, - telemetry: telemetry, }) require.NoError(t, err) @@ -392,17 +375,12 @@ func createExampleService(t *testing.T, factories component.Factories) *service cfg, err := prov.Get(context.Background(), factories) require.NoError(t, err) - telemetry := newColTelemetry(featuregate.NewRegistry()) srv, err := newService(&settings{ BuildInfo: component.NewDefaultBuildInfo(), Factories: factories, Config: cfg, - telemetry: telemetry, }) require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, telemetry.shutdown()) - }) return srv } diff --git a/service/settings.go b/service/settings.go index e77a14cb47c..02348c95f8f 100644 --- a/service/settings.go +++ b/service/settings.go @@ -18,6 +18,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/featuregate" ) // settings holds configuration for building a new service. @@ -38,7 +39,7 @@ type settings struct { LoggingOptions []zap.Option // For testing purpose only. - telemetry *telemetryInitializer + registry *featuregate.Registry } // CollectorSettings holds configuration for creating a new Collector. diff --git a/service/telemetry.go b/service/telemetry.go index a6d8af64208..87fe150f94d 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -20,7 +20,6 @@ import ( "fmt" "net/http" "strings" - "sync" "unicode" ocprom "contrib.go.opencensus.io/exporter/prometheus" @@ -69,8 +68,7 @@ type telemetryInitializer struct { ocRegistry *ocmetric.Registry mp metric.MeterProvider - server *http.Server - doInitOnce sync.Once + server *http.Server } func newColTelemetry(registry *featuregate.Registry) *telemetryInitializer { @@ -81,33 +79,15 @@ func newColTelemetry(registry *featuregate.Registry) *telemetryInitializer { } func (tel *telemetryInitializer) init(buildInfo component.BuildInfo, logger *zap.Logger, cfg telemetry.Config, asyncErrorChannel chan error) error { - var err error - tel.doInitOnce.Do( - func() { - if cfg.Metrics.Level == configtelemetry.LevelNone || cfg.Metrics.Address == "" { - logger.Info( - "Skipping telemetry setup.", - zap.String(zapKeyTelemetryAddress, cfg.Metrics.Address), - zap.String(zapKeyTelemetryLevel, cfg.Metrics.Level.String()), - ) - return - } - - err = tel.initOnce(buildInfo, logger, cfg) - if err == nil { - go func() { - if serveErr := tel.server.ListenAndServe(); serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) { - asyncErrorChannel <- serveErr - } - }() - } - - }, - ) - return err -} + if cfg.Metrics.Level == configtelemetry.LevelNone || cfg.Metrics.Address == "" { + logger.Info( + "Skipping telemetry setup.", + zap.String(zapKeyTelemetryAddress, cfg.Metrics.Address), + zap.String(zapKeyTelemetryLevel, cfg.Metrics.Level.String()), + ) + return nil + } -func (tel *telemetryInitializer) initOnce(buildInfo component.BuildInfo, logger *zap.Logger, cfg telemetry.Config) error { logger.Info("Setting up own telemetry...") // Construct telemetry attributes from build info and config's resource attributes. @@ -152,6 +132,12 @@ func (tel *telemetryInitializer) initOnce(buildInfo component.BuildInfo, logger Handler: mux, } + go func() { + if serveErr := tel.server.ListenAndServe(); serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) { + asyncErrorChannel <- serveErr + } + }() + return nil } diff --git a/service/telemetry_test.go b/service/telemetry_test.go index 2df182c3ba0..8bd2ae28f95 100644 --- a/service/telemetry_test.go +++ b/service/telemetry_test.go @@ -32,8 +32,10 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/internal/obsreportconfig" + "go.opentelemetry.io/collector/internal/testutil" semconv "go.opentelemetry.io/collector/semconv/v1.5.0" "go.opentelemetry.io/collector/service/telemetry" ) @@ -152,9 +154,13 @@ func TestTelemetryInit(t *testing.T) { Resource: map[string]*string{ semconv.AttributeServiceInstanceID: &testInstanceID, }, + Metrics: telemetry.MetricsConfig{ + Level: configtelemetry.LevelDetailed, + Address: testutil.GetAvailableLocalAddress(t), + }, } - err := tel.initOnce(buildInfo, zap.NewNop(), cfg) + err := tel.init(buildInfo, zap.NewNop(), cfg, make(chan error)) require.NoError(t, err) defer func() { require.NoError(t, tel.shutdown())