Skip to content

Commit

Permalink
Move shutdown telemetry to Service (open-telemetry#6765)
Browse files Browse the repository at this point in the history
This removes any "private" dependency between Collector and Service, which will allow moving the collector to otelcol per the deprecation notice.

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Dec 13, 2022
1 parent 7cb2d76 commit 489e478
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 135 deletions.
11 changes: 11 additions & 0 deletions .chloggen/shutdowntel.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Shut down internal telemetry together with the Service (i.e. every time the config changes).

# One or more tracking issues or pull requests related to the change
issues: [5564]
27 changes: 4 additions & 23 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/service/internal/grpclog"
)

Expand Down Expand Up @@ -75,8 +74,7 @@ func (s State) String() string {
// Collector represents a server providing the OpenTelemetry Collector service.
// Deprecated: [v0.67.0] use otelcol.Collector
type Collector struct {
set CollectorSettings
telemetry *telemetryInitializer
set CollectorSettings

service *service
state *atomic.Int32
Expand All @@ -98,7 +96,6 @@ func New(set CollectorSettings) (*Collector, error) {

return &Collector{
set: set,
telemetry: newColTelemetry(featuregate.GetRegistry()),
state: atomic.NewInt32(int32(StateStarting)),
shutdownChan: make(chan struct{}),
// Per signal.Notify documentation, a size of the channel equaled with
Expand Down Expand Up @@ -145,7 +142,6 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
Config: cfg,
AsyncErrorChannel: col.asyncErrorChannel,
LoggingOptions: col.set.LoggingOptions,
telemetry: col.telemetry,
})
if err != nil {
return err
Expand All @@ -156,7 +152,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
}

if err = col.service.Start(ctx); err != nil {
return multierr.Append(err, col.shutdownServiceAndTelemetry(ctx))
return multierr.Combine(err, col.service.Shutdown(ctx))
}
col.setCollectorState(StateRunning)
return nil
Expand Down Expand Up @@ -238,28 +234,13 @@ func (col *Collector) shutdown(ctx context.Context) error {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown config provider: %w", err))
}

errs = multierr.Append(errs, col.shutdownServiceAndTelemetry(ctx))

col.setCollectorState(StateClosed)

return errs
}

// shutdownServiceAndTelemetry bundles shutting down the service and telemetryInitializer.
// Returned error will be in multierr form and wrapped.
func (col *Collector) shutdownServiceAndTelemetry(ctx context.Context) error {
var errs error

// shutdown service
if err := col.service.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown service after error: %w", err))
}

// TODO: Move this as part of the service shutdown.
// shutdown telemetryInitializer
if err := col.telemetry.shutdown(); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown collector telemetry: %w", err))
}
col.setCollectorState(StateClosed)

return errs
}

Expand Down
22 changes: 0 additions & 22 deletions service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
)

func TestStateString(t *testing.T) {
Expand Down Expand Up @@ -243,26 +241,6 @@ func TestCollectorStartInvalidConfig(t *testing.T) {
assert.Error(t, col.Run(context.Background()))
}

func TestCollectorStartWithOpenCensusMetrics(t *testing.T) {
for _, tc := range ownMetricsTestCases() {
t.Run(tc.name, func(t *testing.T) {
testCollectorStartHelper(t, newColTelemetry(featuregate.NewRegistry()), tc)
})
}
}

func TestCollectorStartWithOpenTelemetryMetrics(t *testing.T) {
for _, tc := range ownMetricsTestCases() {
t.Run(tc.name, func(t *testing.T) {
registry := featuregate.NewRegistry()
obsreportconfig.RegisterInternalMetricFeatureGate(registry)
colTel := newColTelemetry(registry)
require.NoError(t, colTel.registry.Apply(map[string]bool{obsreportconfig.UseOtelForInternalMetricsfeatureGateID: true}))
testCollectorStartHelper(t, colTel, tc)
})
}
}

func TestCollectorStartWithTraceContextPropagation(t *testing.T) {
tests := []struct {
file string
Expand Down
15 changes: 10 additions & 5 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal/proctelemetry"
"go.opentelemetry.io/collector/service/telemetry"
Expand All @@ -41,6 +42,10 @@ type service struct {
}

func newService(set *settings) (*service, error) {
reg := set.registry
if reg == nil {
reg = featuregate.GetRegistry()
}
srv := &service{
buildInfo: set.BuildInfo,
config: set.Config,
Expand All @@ -49,12 +54,10 @@ func newService(set *settings) (*service, error) {
buildInfo: set.BuildInfo,
asyncErrorChannel: set.AsyncErrorChannel,
},
telemetryInitializer: set.telemetry,
telemetryInitializer: newColTelemetry(reg),
}

var err error
srv.telemetry, err = telemetry.New(context.Background(), telemetry.Settings{
ZapOptions: set.LoggingOptions}, set.Config.Service.Telemetry)
srv.telemetry, err = telemetry.New(context.Background(), telemetry.Settings{ZapOptions: set.LoggingOptions}, set.Config.Service.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to get logger: %w", err)
}
Expand Down Expand Up @@ -131,7 +134,9 @@ func (srv *service) Shutdown(ctx context.Context) error {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry: %w", err))
}

// TODO: Shutdown MeterProvider.
if err := srv.telemetryInitializer.shutdown(); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown collector telemetry: %w", err))
}
return errs
}

Expand Down
86 changes: 32 additions & 54 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package service
import (
"bufio"
"context"
"fmt"
"net/http"
"path/filepath"
"strings"
Expand Down Expand Up @@ -207,44 +206,34 @@ func TestServiceTelemetryCleanupOnError(t *testing.T) {
invalidCfg, err := invalidProvider.Get(context.Background(), factories)
require.NoError(t, err)

// Read valid yaml config from file
validProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}))
require.NoError(t, err)
validCfg, err := validProvider.Get(context.Background(), factories)
require.NoError(t, err)

// Create a service with an invalid config and expect an error
telemetryOne := newColTelemetry(featuregate.NewRegistry())
_, err = newService(&settings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
Config: invalidCfg,
telemetry: telemetryOne,
})
require.Error(t, err)

// Read valid yaml config from file
validProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}))
require.NoError(t, err)
validCfg, err := validProvider.Get(context.Background(), factories)
require.NoError(t, err)

// Create a service with a valid config and expect no error
telemetryTwo := newColTelemetry(featuregate.NewRegistry())
srv, err := newService(&settings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
Config: validCfg,
telemetry: telemetryTwo,
})
require.NoError(t, err)

// For safety ensure everything is cleaned up
t.Cleanup(func() {
assert.NoError(t, telemetryOne.shutdown())
assert.NoError(t, telemetryTwo.shutdown())
assert.NoError(t, srv.Shutdown(context.Background()))
})
assert.NoError(t, srv.Shutdown(context.Background()))
}

func TestServiceTelemetryWithOpenCensusMetrics(t *testing.T) {
for _, tc := range ownMetricsTestCases() {
t.Run(tc.name, func(t *testing.T) {
testCollectorStartHelper(t, newColTelemetry(featuregate.NewRegistry()), tc)
testCollectorStartHelper(t, featuregate.NewRegistry(), tc)
})
}
}
Expand All @@ -256,12 +245,12 @@ func TestServiceTelemetryWithOpenTelemetryMetrics(t *testing.T) {
obsreportconfig.RegisterInternalMetricFeatureGate(registry)
colTel := newColTelemetry(registry)
require.NoError(t, colTel.registry.Apply(map[string]bool{obsreportconfig.UseOtelForInternalMetricsfeatureGateID: true}))
testCollectorStartHelper(t, colTel, tc)
testCollectorStartHelper(t, registry, tc)
})
}
}

func testCollectorStartHelper(t *testing.T, telemetry *telemetryInitializer, tc ownMetricsTestCase) {
func testCollectorStartHelper(t *testing.T, reg *featuregate.Registry, tc ownMetricsTestCase) {
factories, err := componenttest.NopFactories()
zpagesExt := zpagesextension.NewFactory()
factories.Extensions[zpagesExt.Type()] = zpagesExt
Expand Down Expand Up @@ -301,27 +290,29 @@ func testCollectorStartHelper(t *testing.T, telemetry *telemetryInitializer, tc
cfg, err := cfgProvider.Get(context.Background(), factories)
require.NoError(t, err)

srv, err := newService(&settings{
BuildInfo: component.BuildInfo{Version: "test version"},
Factories: factories,
Config: cfg,
LoggingOptions: []zap.Option{zap.Hooks(hook)},
telemetry: telemetry,
})
require.NoError(t, err)

require.NoError(t, srv.Start(context.Background()))
// Sleep for 1 second to ensure the http server is started.
time.Sleep(1 * time.Second)
assert.True(t, loggingHookCalled)
assertMetrics(t, metricsAddr, tc.expectedLabels)
assertZPages(t, zpagesAddr)
require.NoError(t, srv.Shutdown(context.Background()))
require.NoError(t, telemetry.shutdown())
// Create a service, check for metrics, shut it down, and repeat, to ensure that telemetry can be started, shut down, and started again.
for i := 0; i < 2; i++ {
srv, err := newService(&settings{
BuildInfo: component.BuildInfo{Version: "test version"},
Factories: factories,
Config: cfg,
LoggingOptions: []zap.Option{zap.Hooks(hook)},
registry: reg,
})
require.NoError(t, err)

require.NoError(t, srv.Start(context.Background()))
// Sleep for 1 second to ensure the http server is started.
time.Sleep(1 * time.Second)
assert.True(t, loggingHookCalled)
assertMetrics(t, metricsAddr, tc.expectedLabels)
assertZPages(t, zpagesAddr)
require.NoError(t, srv.Shutdown(context.Background()))
}
}

// TestServiceTelemetryReusable tests that a single telemetryInitializer can be reused in multiple services
func TestServiceTelemetryReusable(t *testing.T) {
// TestServiceTelemetryRestart tests that the service correctly restarts the telemetry server.
func TestServiceTelemetryRestart(t *testing.T) {
factories, err := componenttest.NopFactories()
require.NoError(t, err)

Expand All @@ -332,22 +323,15 @@ func TestServiceTelemetryReusable(t *testing.T) {
require.NoError(t, err)

// Create a service
telemetry := newColTelemetry(featuregate.NewRegistry())
// For safety ensure everything is cleaned up
t.Cleanup(func() {
assert.NoError(t, telemetry.shutdown())
})

srvOne, err := newService(&settings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
Config: validCfg,
telemetry: telemetry,
})
require.NoError(t, err)

// URL of the telemetry service metrics endpoint
telemetryURL := fmt.Sprintf("http://%s/metrics", telemetry.server.Addr)
telemetryURL := "http://localhost:8888/metrics"

// Start the service
require.NoError(t, srvOne.Start(context.Background()))
Expand All @@ -368,7 +352,6 @@ func TestServiceTelemetryReusable(t *testing.T) {
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
Config: validCfg,
telemetry: telemetry,
})
require.NoError(t, err)

Expand All @@ -392,17 +375,12 @@ func createExampleService(t *testing.T, factories component.Factories) *service
cfg, err := prov.Get(context.Background(), factories)
require.NoError(t, err)

telemetry := newColTelemetry(featuregate.NewRegistry())
srv, err := newService(&settings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
Config: cfg,
telemetry: telemetry,
})
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, telemetry.shutdown())
})
return srv
}

Expand Down
3 changes: 2 additions & 1 deletion service/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
)

// settings holds configuration for building a new service.
Expand All @@ -38,7 +39,7 @@ type settings struct {
LoggingOptions []zap.Option

// For testing purpose only.
telemetry *telemetryInitializer
registry *featuregate.Registry
}

// CollectorSettings holds configuration for creating a new Collector.
Expand Down
Loading

0 comments on commit 489e478

Please sign in to comment.