From 1bed5983598313c8c46514aa501bde25e2870947 Mon Sep 17 00:00:00 2001
From: Bogdan Drutu
Date: Fri, 10 Jun 2022 13:00:10 -0700
Subject: [PATCH] Refactor pipelines builder, fix some issues

* Unconfigured receivers were not identified; this was not a real problem in final binaries since the validation of the config catches this.
* Allow configurations to contain "unused" receivers (receivers that are configured but not used in any pipeline); this was already possible for exporters and processors.
* Remove the enforcement/check that Receiver factories create the same instance for the same config.

Signed-off-by: Bogdan Drutu
---
 CHANGELOG.md | 5 +-
 service/internal/builder/exporters_builder.go | 188 --------
 .../builder/exporters_builder_test.go | 114 -----
 service/internal/builder/factories_test.go | 77 ---
 service/internal/builder/pipelines_builder.go | 283 -----------
 .../builder/pipelines_builder_test.go | 230 ---------
 service/internal/builder/receivers_builder.go | 293 -----------
 .../builder/receivers_builder_test.go | 284 -----------
 .../builder/testdata/pipelines_builder.yaml | 40 --
 .../builder/testdata/unused_receiver.yaml | 12 -
 .../{builder => pipelines}/capabilities.go | 2 +-
 .../capabilities_test.go | 2 +-
 service/internal/pipelines/pipelines.go | 456 ++++++++++++++++--
 service/internal/pipelines/pipelines_test.go | 215 ++++++++-
 service/service.go | 4 +-
 15 files changed, 611 insertions(+), 1594 deletions(-)
 delete mode 100644 service/internal/builder/exporters_builder.go
 delete mode 100644 service/internal/builder/exporters_builder_test.go
 delete mode 100644 service/internal/builder/factories_test.go
 delete mode 100644 service/internal/builder/pipelines_builder.go
 delete mode 100644 service/internal/builder/pipelines_builder_test.go
 delete mode 100644 service/internal/builder/receivers_builder.go
 delete mode 100644 service/internal/builder/receivers_builder_test.go
 delete mode 100644 service/internal/builder/testdata/pipelines_builder.yaml
 delete mode 100644 service/internal/builder/testdata/unused_receiver.yaml
 rename service/internal/{builder => pipelines}/capabilities.go (94%)
 rename service/internal/{builder => pipelines}/capabilities_test.go (99%)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8b18166537b..6d913e63c1a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -42,7 +42,10 @@
 - Update sum field of exponential histograms to make it optional (#5530)
 - Remove redundant extension shutdown call (#5532)
-
+- Refactor pipelines builder, fix some issues (#5512)
+  - Unconfigured receivers were not identified; this was not a real problem in final binaries since the validation of the config catches this.
+  - Allow configurations to contain "unused" receivers (receivers that are configured but not used in any pipeline); this was already possible for exporters and processors.
+  - Remove the enforcement/check that Receiver factories create the same instance for the same config.
 
 ## v0.53.0 Beta
 
 ### 🛑 Breaking changes 🛑

diff --git a/service/internal/builder/exporters_builder.go b/service/internal/builder/exporters_builder.go
deleted file mode 100644
index de0e6ebb46a..00000000000
--- a/service/internal/builder/exporters_builder.go
+++ /dev/null
@@ -1,188 +0,0 @@
-// Copyright The OpenTelemetry Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package builder // import "go.opentelemetry.io/collector/service/internal/builder" - -import ( - "context" - "errors" - "fmt" - - "go.uber.org/multierr" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/service/internal/components" -) - -// BuiltExporters is a map of exporters created from exporter configs. -type BuiltExporters struct { - settings component.TelemetrySettings - exporters map[config.DataType]map[config.ComponentID]component.Exporter -} - -// StartAll starts all exporters. -func (exps BuiltExporters) StartAll(ctx context.Context, host component.Host) error { - for dt, expByID := range exps.exporters { - for expID, exp := range expByID { - expLogger := exporterLogger(exps.settings.Logger, expID, dt) - expLogger.Info("Exporter is starting...") - if err := exp.Start(ctx, components.NewHostWrapper(host, expLogger)); err != nil { - return err - } - expLogger.Info("Exporter started.") - } - } - return nil -} - -// ShutdownAll stops all exporters. -func (exps BuiltExporters) ShutdownAll(ctx context.Context) error { - var errs error - for _, expByID := range exps.exporters { - for _, exp := range expByID { - errs = multierr.Append(errs, exp.Shutdown(ctx)) - } - } - return errs -} - -func (exps BuiltExporters) ToMapByDataType() map[config.DataType]map[config.ComponentID]component.Exporter { - exportersMap := make(map[config.DataType]map[config.ComponentID]component.Exporter) - - exportersMap[config.TracesDataType] = make(map[config.ComponentID]component.Exporter, len(exps.exporters[config.TracesDataType])) - exportersMap[config.MetricsDataType] = make(map[config.ComponentID]component.Exporter, len(exps.exporters[config.MetricsDataType])) - exportersMap[config.LogsDataType] = make(map[config.ComponentID]component.Exporter, len(exps.exporters[config.LogsDataType])) - - for dt, expByID := range exps.exporters { - for expID, exp := range expByID { - exportersMap[dt][expID] = exp - } - } - - return exportersMap -} - -// BuildExporters builds Exporters from config. -func BuildExporters( - ctx context.Context, - settings component.TelemetrySettings, - buildInfo component.BuildInfo, - cfg *config.Config, - factories map[config.Type]component.ExporterFactory, -) (*BuiltExporters, error) { - exps := &BuiltExporters{ - settings: settings, - exporters: make(map[config.DataType]map[config.ComponentID]component.Exporter), - } - - // Go over all pipelines. The data type of the pipeline defines what data type - // each exporter is expected to receive. - - // Iterate over pipelines. - for pipelineID, pipeline := range cfg.Service.Pipelines { - dt := pipelineID.Type() - if _, ok := exps.exporters[dt]; !ok { - exps.exporters[dt] = make(map[config.ComponentID]component.Exporter) - } - expByID := exps.exporters[dt] - - // Iterate over all exporters for this pipeline. - for _, expID := range pipeline.Exporters { - // If already created an exporter for this [DataType, ComponentID] nothing to do, will reuse this instance. 
- if _, ok := expByID[expID]; ok { - continue - } - - set := component.ExporterCreateSettings{ - TelemetrySettings: settings, - BuildInfo: buildInfo, - } - set.TelemetrySettings.Logger = exporterLogger(settings.Logger, expID, dt) - - expCfg, existsCfg := cfg.Exporters[expID] - if !existsCfg { - return nil, fmt.Errorf("exporter %q is not configured", expID) - } - - factory, existsFactory := factories[expID.Type()] - if !existsFactory { - return nil, fmt.Errorf("exporter factory not found for type: %s", expID.Type()) - } - - exp, err := buildExporter(ctx, factory, set, expCfg, pipelineID) - if err != nil { - return nil, err - } - - expByID[expID] = exp - } - } - return exps, nil -} - -func buildExporter( - ctx context.Context, - factory component.ExporterFactory, - set component.ExporterCreateSettings, - cfg config.Exporter, - pipelineID config.ComponentID, -) (component.Exporter, error) { - var err error - var exporter component.Exporter - switch pipelineID.Type() { - case config.TracesDataType: - exporter, err = factory.CreateTracesExporter(ctx, set, cfg) - - case config.MetricsDataType: - exporter, err = factory.CreateMetricsExporter(ctx, set, cfg) - - case config.LogsDataType: - exporter, err = factory.CreateLogsExporter(ctx, set, cfg) - - default: - // Could not create because this exporter does not support this data type. - return nil, exporterTypeMismatchErr(cfg, pipelineID) - } - - if err != nil { - if errors.Is(err, component.ErrDataTypeIsNotSupported) { - // Could not create because this exporter does not support this data type. - return nil, exporterTypeMismatchErr(cfg, pipelineID) - } - return nil, fmt.Errorf("error creating %v exporter: %w", cfg.ID(), err) - } - - set.Logger.Info("Exporter was built.") - - return exporter, nil -} - -func exporterTypeMismatchErr( - config config.Exporter, - pipelineID config.ComponentID, -) error { - return fmt.Errorf( - "pipeline %q of data type %q has an exporter %v, which does not support that data type", - pipelineID, pipelineID.Type(), config.ID(), - ) -} - -func exporterLogger(logger *zap.Logger, id config.ComponentID, dt config.DataType) *zap.Logger { - return logger.With( - zap.String(components.ZapKindKey, components.ZapKindExporter), - zap.String(components.ZapDataTypeKey, string(dt)), - zap.String(components.ZapNameKey, id.String())) -} diff --git a/service/internal/builder/exporters_builder_test.go b/service/internal/builder/exporters_builder_test.go deleted file mode 100644 index 64ecd0f7bbd..00000000000 --- a/service/internal/builder/exporters_builder_test.go +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package builder - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/internal/testcomponents" -) - -func TestBuildExporters(t *testing.T) { - factories, err := componenttest.NopFactories() - assert.NoError(t, err) - - cfg := &config.Config{ - Exporters: map[config.ComponentID]config.Exporter{ - config.NewComponentID("nop"): factories.Exporters["nop"].CreateDefaultConfig(), - }, - - Service: config.Service{ - Pipelines: map[config.ComponentID]*config.Pipeline{ - config.NewComponentID("traces"): { - Exporters: []config.ComponentID{config.NewComponentID("nop")}, - }, - config.NewComponentID("metrics"): { - Exporters: []config.ComponentID{config.NewComponentID("nop")}, - }, - config.NewComponentID("logs"): { - Exporters: []config.ComponentID{config.NewComponentID("nop")}, - }, - }, - }, - } - - exporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.NoError(t, err) - - exps := exporters.ToMapByDataType() - require.Len(t, exps, 3) - assert.NotNil(t, exps[config.TracesDataType][config.NewComponentID("nop")]) - assert.NotNil(t, exps[config.MetricsDataType][config.NewComponentID("nop")]) - assert.NotNil(t, exps[config.LogsDataType][config.NewComponentID("nop")]) - - // Ensure it can be started. - assert.NoError(t, exporters.StartAll(context.Background(), componenttest.NewNopHost())) - - // Ensure it can be stopped. - assert.NoError(t, exporters.ShutdownAll(context.Background())) - - // Remove the pipeline so that the exporter is not attached to any pipeline. - // This should result in creating an exporter that has none of consumption - // functions set. 
- cfg.Service.Pipelines = map[config.ComponentID]*config.Pipeline{} - exporters, err = BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.NoError(t, err) - - exps = exporters.ToMapByDataType() - require.Len(t, exps, 3) - assert.Len(t, exps[config.TracesDataType], 0) - assert.Len(t, exps[config.MetricsDataType], 0) - assert.Len(t, exps[config.LogsDataType], 0) -} - -func TestBuildExportersStartStopAll(t *testing.T) { - traceExporter := &testcomponents.ExampleExporter{} - metricExporter := &testcomponents.ExampleExporter{} - logsExporter := &testcomponents.ExampleExporter{} - exps := &BuiltExporters{ - settings: componenttest.NewNopTelemetrySettings(), - exporters: map[config.DataType]map[config.ComponentID]component.Exporter{ - config.TracesDataType: { - config.NewComponentID("example"): traceExporter, - }, - config.MetricsDataType: { - config.NewComponentID("example"): metricExporter, - }, - config.LogsDataType: { - config.NewComponentID("example"): logsExporter, - }, - }, - } - assert.False(t, traceExporter.Started) - assert.False(t, metricExporter.Started) - assert.False(t, logsExporter.Started) - - assert.NoError(t, exps.StartAll(context.Background(), componenttest.NewNopHost())) - assert.True(t, traceExporter.Started) - assert.True(t, metricExporter.Started) - assert.True(t, logsExporter.Started) - - assert.NoError(t, exps.ShutdownAll(context.Background())) - assert.True(t, traceExporter.Stopped) - assert.True(t, metricExporter.Stopped) - assert.True(t, logsExporter.Stopped) -} diff --git a/service/internal/builder/factories_test.go b/service/internal/builder/factories_test.go deleted file mode 100644 index 988997d838e..00000000000 --- a/service/internal/builder/factories_test.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package builder - -import ( - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/internal/testcomponents" -) - -func createTestFactories() component.Factories { - exampleReceiverFactory := testcomponents.ExampleReceiverFactory - exampleProcessorFactory := testcomponents.ExampleProcessorFactory - exampleExporterFactory := testcomponents.ExampleExporterFactory - badReceiverFactory := newBadReceiverFactory() - badProcessorFactory := newBadProcessorFactory() - badExporterFactory := newBadExporterFactory() - - factories := component.Factories{ - Receivers: map[config.Type]component.ReceiverFactory{ - exampleReceiverFactory.Type(): exampleReceiverFactory, - badReceiverFactory.Type(): badReceiverFactory, - }, - Processors: map[config.Type]component.ProcessorFactory{ - exampleProcessorFactory.Type(): exampleProcessorFactory, - badProcessorFactory.Type(): badProcessorFactory, - }, - Exporters: map[config.Type]component.ExporterFactory{ - exampleExporterFactory.Type(): exampleExporterFactory, - badExporterFactory.Type(): badExporterFactory, - }, - } - - return factories -} - -func newBadReceiverFactory() component.ReceiverFactory { - return component.NewReceiverFactory("bf", func() config.Receiver { - return &struct { - config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct - }{ - ReceiverSettings: config.NewReceiverSettings(config.NewComponentID("bf")), - } - }) -} - -func newBadProcessorFactory() component.ProcessorFactory { - return component.NewProcessorFactory("bf", func() config.Processor { - return &struct { - config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct - }{ - ProcessorSettings: config.NewProcessorSettings(config.NewComponentID("bf")), - } - }) -} - -func newBadExporterFactory() component.ExporterFactory { - return component.NewExporterFactory("bf", func() config.Exporter { - return &struct { - config.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct - }{ - ExporterSettings: config.NewExporterSettings(config.NewComponentID("bf")), - } - }) -} diff --git a/service/internal/builder/pipelines_builder.go b/service/internal/builder/pipelines_builder.go deleted file mode 100644 index 22fad0a23d5..00000000000 --- a/service/internal/builder/pipelines_builder.go +++ /dev/null @@ -1,283 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package builder // import "go.opentelemetry.io/collector/service/internal/builder" - -import ( - "context" - "fmt" - - "go.uber.org/multierr" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/fanoutconsumer" -) - -// builtPipeline is a pipeline that is built based on a config. -// It can have a trace and/or a metrics consumer (the consumer is either the first -// processor in the pipeline or the exporter if pipeline has no processors). -type builtPipeline struct { - logger *zap.Logger - firstTC consumer.Traces - firstMC consumer.Metrics - firstLC consumer.Logs - - // Config is the configuration of this Pipeline. - Config *config.Pipeline - // MutatesData is set to true if any processors in the pipeline - // can mutate the TraceData or MetricsData input argument. - MutatesData bool - - Processors []component.Processor -} - -// BuiltPipelines is a map of build pipelines created from pipeline configs. -type BuiltPipelines map[config.ComponentID]*builtPipeline - -func (bps BuiltPipelines) StartProcessors(ctx context.Context, host component.Host) error { - for _, bp := range bps { - bp.logger.Info("Pipeline is starting...") - hostWrapper := components.NewHostWrapper(host, bp.logger) - // Start in reverse order, starting from the back of processors pipeline. - // This is important so that processors that are earlier in the pipeline and - // reference processors that are later in the pipeline do not start sending - // data to later pipelines which are not yet started. - for i := len(bp.Processors) - 1; i >= 0; i-- { - if err := bp.Processors[i].Start(ctx, hostWrapper); err != nil { - return err - } - } - bp.logger.Info("Pipeline is started.") - } - return nil -} - -func (bps BuiltPipelines) ShutdownProcessors(ctx context.Context) error { - var errs error - for _, bp := range bps { - bp.logger.Info("Pipeline is shutting down...") - for _, p := range bp.Processors { - errs = multierr.Append(errs, p.Shutdown(ctx)) - } - bp.logger.Info("Pipeline is shutdown.") - } - - return errs -} - -// pipelinesBuilder builds Pipelines from config. -type pipelinesBuilder struct { - settings component.TelemetrySettings - buildInfo component.BuildInfo - config *config.Config - exporters *BuiltExporters - factories map[config.Type]component.ProcessorFactory -} - -// BuildPipelines builds pipeline processors from config. Requires exporters to be already -// built via BuildExporters. -func BuildPipelines( - settings component.TelemetrySettings, - buildInfo component.BuildInfo, - config *config.Config, - exporters *BuiltExporters, - factories map[config.Type]component.ProcessorFactory, -) (BuiltPipelines, error) { - pb := &pipelinesBuilder{settings, buildInfo, config, exporters, factories} - - pipelineProcessors := make(BuiltPipelines) - for pipelineID, pipeline := range pb.config.Service.Pipelines { - bp, err := pb.buildPipeline(context.Background(), pipelineID, pipeline) - if err != nil { - return nil, err - } - pipelineProcessors[pipelineID] = bp - } - - return pipelineProcessors, nil -} - -// Builds a pipeline of processors. Returns the first processor in the pipeline. -// The last processor in the pipeline will be plugged to fan out the data into exporters -// that are configured for this pipeline. 
-func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineID config.ComponentID, pipelineCfg *config.Pipeline) (*builtPipeline, error) { - - // BuildProcessors the pipeline backwards. - - // First create a consumer junction point that fans out the data to all exporters. - var tc consumer.Traces - var mc consumer.Metrics - var lc consumer.Logs - - // Take into consideration the Capabilities for the exporter as well. - mutatesConsumedData := false - switch pipelineID.Type() { - case config.TracesDataType: - tc = pb.buildFanoutExportersTracesConsumer(pipelineCfg.Exporters) - mutatesConsumedData = tc.Capabilities().MutatesData - case config.MetricsDataType: - mc = pb.buildFanoutExportersMetricsConsumer(pipelineCfg.Exporters) - mutatesConsumedData = mc.Capabilities().MutatesData - case config.LogsDataType: - lc = pb.buildFanoutExportersLogsConsumer(pipelineCfg.Exporters) - mutatesConsumedData = lc.Capabilities().MutatesData - } - - processors := make([]component.Processor, len(pipelineCfg.Processors)) - - // Now build the processors backwards, starting from the last one. - // The last processor points to consumer which fans out to exporters, then - // the processor itself becomes a consumer for the one that precedes it in - // in the pipeline and so on. - for i := len(pipelineCfg.Processors) - 1; i >= 0; i-- { - procID := pipelineCfg.Processors[i] - - procCfg, existsCfg := pb.config.Processors[procID] - if !existsCfg { - return nil, fmt.Errorf("processor %q is not configured", procID) - } - - factory, existsFactory := pb.factories[procID.Type()] - if !existsFactory { - return nil, fmt.Errorf("processor factory for type %q is not configured", procID.Type()) - } - - // This processor must point to the next consumer and then - // it becomes the next for the previous one (previous in the pipeline, - // which we will build in the next loop iteration). - var err error - set := component.ProcessorCreateSettings{ - TelemetrySettings: component.TelemetrySettings{ - Logger: pb.settings.Logger.With( - zap.String(components.ZapKindKey, components.ZapKindProcessor), - zap.String(components.ZapNameKey, procID.String()), - zap.String(components.ZapKindPipeline, pipelineID.String())), - TracerProvider: pb.settings.TracerProvider, - MeterProvider: pb.settings.MeterProvider, - MetricsLevel: pb.config.Telemetry.Metrics.Level, - }, - BuildInfo: pb.buildInfo, - } - - switch pipelineID.Type() { - case config.TracesDataType: - var proc component.TracesProcessor - if proc, err = factory.CreateTracesProcessor(ctx, set, procCfg, tc); err != nil { - return nil, fmt.Errorf("error creating processor %q in pipeline %q: %w", procID, pipelineID, err) - } - // Check if the factory really created the processor. - if proc == nil { - return nil, fmt.Errorf("factory for %v produced a nil processor", procID) - } - mutatesConsumedData = mutatesConsumedData || proc.Capabilities().MutatesData - processors[i] = proc - tc = proc - case config.MetricsDataType: - var proc component.MetricsProcessor - if proc, err = factory.CreateMetricsProcessor(ctx, set, procCfg, mc); err != nil { - return nil, fmt.Errorf("error creating processor %q in pipeline %q: %w", procID, pipelineID, err) - } - // Check if the factory really created the processor. 
- if proc == nil { - return nil, fmt.Errorf("factory for %v produced a nil processor", procID) - } - mutatesConsumedData = mutatesConsumedData || proc.Capabilities().MutatesData - processors[i] = proc - mc = proc - - case config.LogsDataType: - var proc component.LogsProcessor - if proc, err = factory.CreateLogsProcessor(ctx, set, procCfg, lc); err != nil { - return nil, fmt.Errorf("error creating processor %q in pipeline %q: %w", procID, pipelineID, err) - } - // Check if the factory really created the processor. - if proc == nil { - return nil, fmt.Errorf("factory for %v produced a nil processor", procID) - } - mutatesConsumedData = mutatesConsumedData || proc.Capabilities().MutatesData - processors[i] = proc - lc = proc - - default: - return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %s is not supported", - procID, pipelineID, pipelineID.Type()) - } - } - - pipelineLogger := pb.settings.Logger.With(zap.String(components.ZapKindKey, components.ZapKindPipeline), - zap.String(components.ZapNameKey, pipelineID.String())) - pipelineLogger.Info("Pipeline was built.") - - // Some consumers may not correctly implement the Capabilities, - // and ignore the next consumer when calculated the Capabilities. - // Because of this wrap the first consumer if any consumers in the pipeline - // mutate the data and the first says that it doesn't. - if tc != nil { - tc = wrapTraces(tc, consumer.Capabilities{MutatesData: mutatesConsumedData}) - } - if mc != nil { - mc = wrapMetrics(mc, consumer.Capabilities{MutatesData: mutatesConsumedData}) - } - if lc != nil { - lc = wrapLogs(lc, consumer.Capabilities{MutatesData: mutatesConsumedData}) - } - bp := &builtPipeline{ - logger: pipelineLogger, - firstTC: tc, - firstMC: mc, - firstLC: lc, - Config: pipelineCfg, - MutatesData: mutatesConsumedData, - Processors: processors, - } - - return bp, nil -} - -func (pb *pipelinesBuilder) buildFanoutExportersTracesConsumer(exporterIDs []config.ComponentID) consumer.Traces { - tracesExporters := pb.exporters.exporters[config.TracesDataType] - var exporters []consumer.Traces - for _, expID := range exporterIDs { - exporters = append(exporters, tracesExporters[expID].(consumer.Traces)) - } - - // Create a junction point that fans out to all exporters. - return fanoutconsumer.NewTraces(exporters) -} - -func (pb *pipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterIDs []config.ComponentID) consumer.Metrics { - metricsExporters := pb.exporters.exporters[config.MetricsDataType] - var exporters []consumer.Metrics - for _, expID := range exporterIDs { - exporters = append(exporters, metricsExporters[expID].(consumer.Metrics)) - } - - // Create a junction point that fans out to all exporters. - return fanoutconsumer.NewMetrics(exporters) -} - -func (pb *pipelinesBuilder) buildFanoutExportersLogsConsumer(exporterIDs []config.ComponentID) consumer.Logs { - logsExporters := pb.exporters.exporters[config.LogsDataType] - var exporters []consumer.Logs - for _, expID := range exporterIDs { - exporters = append(exporters, logsExporters[expID].(consumer.Logs)) - } - - // Create a junction point that fans out to all exporters. 
- return fanoutconsumer.NewLogs(exporters) -} diff --git a/service/internal/builder/pipelines_builder_test.go b/service/internal/builder/pipelines_builder_test.go deleted file mode 100644 index 596ab96454d..00000000000 --- a/service/internal/builder/pipelines_builder_test.go +++ /dev/null @@ -1,230 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package builder - -import ( - "context" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/internal/testcomponents" - "go.opentelemetry.io/collector/internal/testdata" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/service/servicetest" -) - -func TestBuildPipelines(t *testing.T) { - tests := []struct { - name string - pipelineID config.ComponentID - exporterNames []config.ComponentID - }{ - { - name: "one-exporter", - pipelineID: config.NewComponentID("traces"), - exporterNames: []config.ComponentID{config.NewComponentID("exampleexporter")}, - }, - { - name: "multi-exporter", - pipelineID: config.NewComponentIDWithName("traces", "2"), - exporterNames: []config.ComponentID{config.NewComponentID("exampleexporter"), config.NewComponentIDWithName("exampleexporter", "2")}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - testPipeline(t, test.pipelineID, test.exporterNames) - }) - } -} - -func createExampleConfig(dataType config.DataType) *config.Config { - exampleReceiverFactory := testcomponents.ExampleReceiverFactory - exampleProcessorFactory := testcomponents.ExampleProcessorFactory - exampleExporterFactory := testcomponents.ExampleExporterFactory - - cfg := &config.Config{ - Receivers: map[config.ComponentID]config.Receiver{ - config.NewComponentID(exampleReceiverFactory.Type()): exampleReceiverFactory.CreateDefaultConfig(), - }, - Processors: map[config.ComponentID]config.Processor{ - config.NewComponentID(exampleProcessorFactory.Type()): exampleProcessorFactory.CreateDefaultConfig(), - }, - Exporters: map[config.ComponentID]config.Exporter{ - config.NewComponentID(exampleExporterFactory.Type()): exampleExporterFactory.CreateDefaultConfig(), - }, - Service: config.Service{ - Pipelines: map[config.ComponentID]*config.Pipeline{ - config.NewComponentID(dataType): { - Receivers: []config.ComponentID{config.NewComponentID(exampleReceiverFactory.Type())}, - Processors: []config.ComponentID{config.NewComponentID(exampleProcessorFactory.Type())}, - Exporters: []config.ComponentID{config.NewComponentID(exampleExporterFactory.Type())}, - }, - }, - }, - } - return cfg -} - -func TestBuildPipelines_BuildVarious(t *testing.T) { - - factories := createTestFactories() - - tests := []struct { - dataType config.DataType - shouldFail bool - }{ - { - dataType: config.LogsDataType, - shouldFail: false, - }, - { 
- dataType: "nosuchdatatype", - shouldFail: true, - }, - } - - for _, test := range tests { - t.Run(string(test.dataType), func(t *testing.T) { - cfg := createExampleConfig(test.dataType) - - // BuildProcessors the pipeline - allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - if test.shouldFail { - assert.Error(t, err) - return - } - - require.NoError(t, err) - require.Len(t, allExporters.ToMapByDataType()[config.LogsDataType], 1) - pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors) - - assert.NoError(t, err) - require.NotNil(t, pipelineProcessors) - - err = pipelineProcessors.StartProcessors(context.Background(), componenttest.NewNopHost()) - assert.NoError(t, err) - - processor := pipelineProcessors[config.NewComponentID(test.dataType)] - - // Ensure pipeline has its fields correctly populated. - require.NotNil(t, processor) - assert.Nil(t, processor.firstTC) - assert.Nil(t, processor.firstMC) - assert.NotNil(t, processor.firstLC) - - // Compose the list of created exporters. - exporterIDs := []config.ComponentID{config.NewComponentID("exampleexporter")} - var exporters []component.Exporter - for _, expID := range exporterIDs { - // Ensure exporter is created. - exp := allExporters.exporters[test.dataType][expID] - require.NotNil(t, exp) - exporters = append(exporters, exp) - } - - // Send Logs via processor and verify that all exporters of the pipeline receive it. - - // First check that there are no logs in the exporters yet. - var exporterConsumers []*testcomponents.ExampleExporter - for _, exporter := range exporters { - expConsumer := exporter.(*testcomponents.ExampleExporter) - exporterConsumers = append(exporterConsumers, expConsumer) - require.Equal(t, len(expConsumer.Logs), 0) - } - - // Send one custom data. - log := plog.Logs{} - require.NoError(t, processor.firstLC.ConsumeLogs(context.Background(), log)) - - // Now verify received data. - for _, expConsumer := range exporterConsumers { - // Check that the trace is received by exporter. - require.Equal(t, 1, len(expConsumer.Logs)) - - // Verify that span is successfully delivered. - assert.EqualValues(t, log, expConsumer.Logs[0]) - } - - err = pipelineProcessors.ShutdownProcessors(context.Background()) - assert.NoError(t, err) - }) - } -} - -func testPipeline(t *testing.T, pipelineID config.ComponentID, exporterIDs []config.ComponentID) { - factories, err := testcomponents.ExampleComponents() - assert.NoError(t, err) - cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "pipelines_builder.yaml"), factories) - // Unmarshal the config - require.Nil(t, err) - - // BuildProcessors the pipeline - allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.NoError(t, err) - pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors) - - assert.NoError(t, err) - require.NotNil(t, pipelineProcessors) - - assert.NoError(t, pipelineProcessors.StartProcessors(context.Background(), componenttest.NewNopHost())) - - processor := pipelineProcessors[pipelineID] - - // Ensure pipeline has its fields correctly populated. 
- require.NotNil(t, processor) - assert.NotNil(t, processor.firstTC) - assert.Nil(t, processor.firstMC) - - // Compose the list of created exporters. - var exporters []component.Exporter - for _, expID := range exporterIDs { - // Ensure exporter is created. - exp := allExporters.exporters[config.TracesDataType][expID] - require.NotNil(t, exp) - exporters = append(exporters, exp) - } - - // Send TraceData via processor and verify that all exporters of the pipeline receive it. - - // First check that there are no traces in the exporters yet. - var exporterConsumers []*testcomponents.ExampleExporter - for _, exporter := range exporters { - expConsumer := exporter.(*testcomponents.ExampleExporter) - exporterConsumers = append(exporterConsumers, expConsumer) - require.Equal(t, len(expConsumer.Traces), 0) - } - - td := testdata.GenerateTracesOneSpan() - require.NoError(t, processor.firstTC.ConsumeTraces(context.Background(), td)) - - // Now verify received data. - for _, expConsumer := range exporterConsumers { - // Check that the trace is received by exporter. - require.Equal(t, 1, len(expConsumer.Traces)) - - // Verify that span is successfully delivered. - assert.EqualValues(t, td, expConsumer.Traces[0]) - } - - err = pipelineProcessors.ShutdownProcessors(context.Background()) - assert.NoError(t, err) -} diff --git a/service/internal/builder/receivers_builder.go b/service/internal/builder/receivers_builder.go deleted file mode 100644 index 4db4cab8665..00000000000 --- a/service/internal/builder/receivers_builder.go +++ /dev/null @@ -1,293 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package builder // import "go.opentelemetry.io/collector/service/internal/builder" - -import ( - "context" - "errors" - "fmt" - - "go.uber.org/multierr" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/fanoutconsumer" -) - -var errUnusedReceiver = errors.New("receiver defined but not used by any pipeline") - -// builtReceiver is a receiver that is built based on a config. It can have -// a trace and/or a metrics component. -type builtReceiver struct { - logger *zap.Logger - Receiver component.Receiver -} - -// Start starts the receiver. -func (rcv *builtReceiver) Start(ctx context.Context, host component.Host) error { - return rcv.Receiver.Start(ctx, components.NewHostWrapper(host, rcv.logger)) -} - -// Shutdown stops the receiver. -func (rcv *builtReceiver) Shutdown(ctx context.Context) error { - return rcv.Receiver.Shutdown(ctx) -} - -// Receivers is a map of receivers created from receiver configs. -type Receivers map[config.ComponentID]*builtReceiver - -// ShutdownAll stops all receivers. 
-func (rcvs Receivers) ShutdownAll(ctx context.Context) error { - var err error - for _, rcv := range rcvs { - err = multierr.Append(err, rcv.Shutdown(ctx)) - } - - return err -} - -// StartAll starts all receivers. -func (rcvs Receivers) StartAll(ctx context.Context, host component.Host) error { - for _, rcv := range rcvs { - rcv.logger.Info("Receiver is starting...") - - if err := rcv.Start(ctx, host); err != nil { - return err - } - rcv.logger.Info("Receiver started.") - } - return nil -} - -// receiversBuilder builds receivers from config. -type receiversBuilder struct { - config *config.Config - builtPipelines BuiltPipelines - factories map[config.Type]component.ReceiverFactory -} - -// BuildReceivers builds Receivers from config. -func BuildReceivers( - settings component.TelemetrySettings, - buildInfo component.BuildInfo, - cfg *config.Config, - builtPipelines BuiltPipelines, - factories map[config.Type]component.ReceiverFactory, -) (Receivers, error) { - rb := &receiversBuilder{cfg, builtPipelines, factories} - - receivers := make(Receivers) - for recvID, recvCfg := range cfg.Receivers { - set := component.ReceiverCreateSettings{ - TelemetrySettings: component.TelemetrySettings{ - Logger: settings.Logger.With( - zap.String(components.ZapKindKey, components.ZapKindReceiver), - zap.String(components.ZapNameKey, recvID.String())), - TracerProvider: settings.TracerProvider, - MeterProvider: settings.MeterProvider, - MetricsLevel: cfg.Telemetry.Metrics.Level, - }, - BuildInfo: buildInfo, - } - - rcv, err := rb.buildReceiver(context.Background(), set, recvID, recvCfg) - if err != nil { - if errors.Is(err, errUnusedReceiver) { - set.Logger.Info("Ignoring receiver as it is not used by any pipeline") - continue - } - return nil, err - } - receivers[recvID] = rcv - } - - return receivers, nil -} - -// hasReceiver returns true if the pipeline is attached to specified receiver. -func hasReceiver(pipeline *config.Pipeline, receiverID config.ComponentID) bool { - for _, id := range pipeline.Receivers { - if id == receiverID { - return true - } - } - return false -} - -type attachedPipelines map[config.DataType][]*builtPipeline - -func (rb *receiversBuilder) findPipelinesToAttach(receiverID config.ComponentID) (attachedPipelines, error) { - // A receiver may be attached to multiple pipelines. Pipelines may consume different - // data types. We need to compile the list of pipelines of each type that must be - // attached to this receiver according to configuration. - - pipelinesToAttach := make(attachedPipelines) - - // Iterate over all pipelines. - for pipelineID, pipelineCfg := range rb.config.Service.Pipelines { - // Get the first processor of the pipeline. - pipelineProcessor := rb.builtPipelines[pipelineID] - if pipelineProcessor == nil { - return nil, fmt.Errorf("cannot find pipeline %q", pipelineID) - } - - // Is this receiver attached to the pipeline? - if hasReceiver(pipelineCfg, receiverID) { - if _, exists := pipelinesToAttach[pipelineID.Type()]; !exists { - pipelinesToAttach[pipelineID.Type()] = make([]*builtPipeline, 0) - } - - // Yes, add it to the list of pipelines of corresponding data type. 
- pipelinesToAttach[pipelineID.Type()] = append(pipelinesToAttach[pipelineID.Type()], pipelineProcessor) - } - } - - return pipelinesToAttach, nil -} - -func attachReceiverToPipelines( - ctx context.Context, - set component.ReceiverCreateSettings, - factory component.ReceiverFactory, - dataType config.DataType, - id config.ComponentID, - cfg config.Receiver, - rcv *builtReceiver, - builtPipelines []*builtPipeline, -) error { - // There are pipelines of the specified data type that must be attached to - // the receiver. Create the receiver of corresponding data type and make - // sure its output is fanned out to all attached pipelines. - var err error - var createdReceiver component.Receiver - - switch dataType { - case config.TracesDataType: - junction := buildFanoutTraceConsumer(builtPipelines) - createdReceiver, err = factory.CreateTracesReceiver(ctx, set, cfg, junction) - - case config.MetricsDataType: - junction := buildFanoutMetricConsumer(builtPipelines) - createdReceiver, err = factory.CreateMetricsReceiver(ctx, set, cfg, junction) - - case config.LogsDataType: - junction := buildFanoutLogConsumer(builtPipelines) - createdReceiver, err = factory.CreateLogsReceiver(ctx, set, cfg, junction) - - default: - err = component.ErrDataTypeIsNotSupported - } - - if err != nil { - if errors.Is(err, component.ErrDataTypeIsNotSupported) { - return fmt.Errorf( - "receiver %v does not support %s but it was used in a %s pipeline", - id, dataType, dataType) - } - return fmt.Errorf("cannot create receiver %v: %w", id, err) - } - - // Check if the factory really created the receiver. - if createdReceiver == nil { - return fmt.Errorf("factory for %v produced a nil receiver", id) - } - - if rcv.Receiver != nil { - // The receiver was previously created for this config. This can happen if the - // same receiver type supports more than one data type. In that case we expect - // that CreateTracesReceiver and CreateMetricsReceiver return the same value. - if rcv.Receiver != createdReceiver { - return fmt.Errorf( - "factory for %q is implemented incorrectly: "+ - "CreateTracesReceiver, CreateMetricsReceiver and CreateLogsReceiver must return "+ - "the same receiver pointer when creating receivers of different data types", - id, - ) - } - } - rcv.Receiver = createdReceiver - - set.Logger.Info("Receiver was built.", zap.String("datatype", string(dataType))) - - return nil -} - -func (rb *receiversBuilder) buildReceiver(ctx context.Context, set component.ReceiverCreateSettings, id config.ComponentID, cfg config.Receiver) (*builtReceiver, error) { - - // First find pipelines that must be attached to this receiver. - pipelinesToAttach, err := rb.findPipelinesToAttach(id) - if err != nil { - return nil, err - } - - // Prepare to build the receiver. - factory := rb.factories[id.Type()] - if factory == nil { - return nil, fmt.Errorf("receiver factory not found for: %v", cfg.ID()) - } - rcv := &builtReceiver{ - logger: set.Logger, - } - - // Now we have list of pipelines broken down by data type. Iterate for each data type. - for dataType, pipelines := range pipelinesToAttach { - if len(pipelines) == 0 { - // No pipelines of this data type are attached to this receiver. - continue - } - - // Attach the corresponding part of the receiver to all pipelines that require - // this data type. 
- if err = attachReceiverToPipelines(ctx, set, factory, dataType, id, cfg, rcv, pipelines); err != nil { - return nil, err - } - } - - if rcv.Receiver == nil { - return nil, errUnusedReceiver - } - - return rcv, nil -} - -func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.Traces { - var pipelineConsumers []consumer.Traces - for _, pipeline := range pipelines { - pipelineConsumers = append(pipelineConsumers, pipeline.firstTC) - } - // Create a junction point that fans out to all pipelines. - return fanoutconsumer.NewTraces(pipelineConsumers) -} - -func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.Metrics { - var pipelineConsumers []consumer.Metrics - for _, pipeline := range pipelines { - pipelineConsumers = append(pipelineConsumers, pipeline.firstMC) - } - // Create a junction point that fans out to all pipelines. - return fanoutconsumer.NewMetrics(pipelineConsumers) -} - -func buildFanoutLogConsumer(pipelines []*builtPipeline) consumer.Logs { - var pipelineConsumers []consumer.Logs - for _, pipeline := range pipelines { - pipelineConsumers = append(pipelineConsumers, pipeline.firstLC) - } - // Create a junction point that fans out to all pipelines. - return fanoutconsumer.NewLogs(pipelineConsumers) -} diff --git a/service/internal/builder/receivers_builder_test.go b/service/internal/builder/receivers_builder_test.go deleted file mode 100644 index 21eed8b4867..00000000000 --- a/service/internal/builder/receivers_builder_test.go +++ /dev/null @@ -1,284 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package builder - -import ( - "context" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/internal/testcomponents" - "go.opentelemetry.io/collector/internal/testdata" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/service/servicetest" -) - -type testCase struct { - name string - receiverID config.ComponentID - exporterIDs []config.ComponentID - spanDuplicationByExporter map[config.ComponentID]int - hasTraces bool - hasMetrics bool -} - -func TestBuildReceivers(t *testing.T) { - tests := []testCase{ - { - name: "one-exporter", - receiverID: config.NewComponentID("examplereceiver"), - exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, - hasTraces: true, - hasMetrics: true, - }, - { - name: "multi-exporter", - receiverID: config.NewComponentIDWithName("examplereceiver", "2"), - exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter"), config.NewComponentIDWithName("exampleexporter", "2")}, - hasTraces: true, - }, - { - name: "multi-metrics-receiver", - receiverID: config.NewComponentIDWithName("examplereceiver", "3"), - exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter"), config.NewComponentIDWithName("exampleexporter", "2")}, - hasTraces: false, - hasMetrics: true, - }, - { - name: "multi-receiver-multi-exporter", - receiverID: config.NewComponentIDWithName("examplereceiver", "multi"), - exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter"), config.NewComponentIDWithName("exampleexporter", "2")}, - - // Check pipelines_builder.yaml to understand this case. - // We have 2 pipelines, one exporting to one exporter, the other - // exporting to both exporters, so we expect a duplication on - // one of the exporters, but not on the other. - spanDuplicationByExporter: map[config.ComponentID]int{ - config.NewComponentID("exampleexporter"): 2, config.NewComponentIDWithName("exampleexporter", "2"): 1, - }, - hasTraces: true, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - testReceivers(t, test) - }) - } -} - -func testReceivers(t *testing.T, test testCase) { - factories, err := testcomponents.ExampleComponents() - assert.NoError(t, err) - - cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "pipelines_builder.yaml"), factories) - require.NoError(t, err) - - // Build the pipeline - allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.NoError(t, err) - pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors) - assert.NoError(t, err) - receivers, err := BuildReceivers(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, pipelineProcessors, factories.Receivers) - - assert.NoError(t, err) - require.NotNil(t, receivers) - - receiver := receivers[test.receiverID] - - // Ensure receiver has its fields correctly populated. - require.NotNil(t, receiver) - - assert.NotNil(t, receiver.Receiver) - - // Compose the list of created exporters. 
- var exporters []component.Exporter - for _, expID := range test.exporterIDs { - // Ensure exporter is created. - exp := allExporters.exporters[config.TracesDataType][expID] - require.NotNil(t, exp) - exporters = append(exporters, exp) - } - - // Send TraceData via receiver and verify that all exporters of the pipeline receive it. - - // First check that there are no traces in the exporters yet. - for _, exporter := range exporters { - consumer := exporter.(*testcomponents.ExampleExporter) - require.Equal(t, len(consumer.Traces), 0) - require.Equal(t, len(consumer.Metrics), 0) - } - - td := testdata.GenerateTracesOneSpan() - if test.hasTraces { - traceProducer := receiver.Receiver.(*testcomponents.ExampleReceiver) - assert.NoError(t, traceProducer.ConsumeTraces(context.Background(), td)) - } - - md := testdata.GenerateMetricsOneMetric() - if test.hasMetrics { - metricsProducer := receiver.Receiver.(*testcomponents.ExampleReceiver) - assert.NoError(t, metricsProducer.ConsumeMetrics(context.Background(), md)) - } - - // Now verify received data. - for _, expID := range test.exporterIDs { - // Validate traces. - if test.hasTraces { - var spanDuplicationCount int - if test.spanDuplicationByExporter != nil { - spanDuplicationCount = test.spanDuplicationByExporter[expID] - } else { - spanDuplicationCount = 1 - } - - traceConsumer := allExporters.exporters[config.TracesDataType][expID].(*testcomponents.ExampleExporter) - require.Equal(t, spanDuplicationCount, len(traceConsumer.Traces)) - - for i := 0; i < spanDuplicationCount; i++ { - assert.EqualValues(t, td, traceConsumer.Traces[i]) - } - } - - // Validate metrics. - if test.hasMetrics { - metricsConsumer := allExporters.exporters[config.MetricsDataType][expID].(*testcomponents.ExampleExporter) - require.Equal(t, 1, len(metricsConsumer.Metrics)) - assert.EqualValues(t, md, metricsConsumer.Metrics[0]) - } - } -} - -func TestBuildReceiversBuildCustom(t *testing.T) { - factories := createTestFactories() - - tests := []struct { - dataType config.DataType - shouldFail bool - }{ - { - dataType: config.LogsDataType, - shouldFail: false, - }, - { - dataType: "nosuchdatatype", - shouldFail: true, - }, - } - - for _, test := range tests { - t.Run(string(test.dataType), func(t *testing.T) { - cfg := createExampleConfig(test.dataType) - - // Build the pipeline - allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - if test.shouldFail { - assert.Error(t, err) - return - } - - assert.NoError(t, err) - pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors) - assert.NoError(t, err) - receivers, err := BuildReceivers(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, pipelineProcessors, factories.Receivers) - - assert.NoError(t, err) - require.NotNil(t, receivers) - - receiver := receivers[config.NewComponentID("examplereceiver")] - - // Ensure receiver has its fields correctly populated. - require.NotNil(t, receiver) - - assert.NotNil(t, receiver.Receiver) - - // Compose the list of created exporters. - exporterIDs := []config.ComponentID{config.NewComponentID("exampleexporter")} - var exporters []component.Exporter - for _, expID := range exporterIDs { - // Ensure exporter is created. 
- exp := allExporters.exporters[config.LogsDataType][expID] - require.NotNil(t, exp) - exporters = append(exporters, exp) - } - - // Send Data via receiver and verify that all exporters of the pipeline receive it. - - // First check that there are no traces in the exporters yet. - for _, exporter := range exporters { - consumer := exporter.(*testcomponents.ExampleExporter) - require.Equal(t, len(consumer.Logs), 0) - } - - // Send one data. - log := plog.Logs{} - producer := receiver.Receiver.(*testcomponents.ExampleReceiver) - require.NoError(t, producer.ConsumeLogs(context.Background(), log)) - - // Now verify received data. - for _, exporter := range exporters { - // Validate exported data. - consumer := exporter.(*testcomponents.ExampleExporter) - require.Equal(t, 1, len(consumer.Logs)) - assert.EqualValues(t, log, consumer.Logs[0]) - } - }) - } -} - -func TestBuildReceivers_StartAll(t *testing.T) { - receivers := make(Receivers) - receiver := &testcomponents.ExampleReceiver{} - - receivers[config.NewComponentID("example")] = &builtReceiver{ - logger: zap.NewNop(), - Receiver: receiver, - } - - assert.False(t, receiver.Started) - assert.NoError(t, receivers.StartAll(context.Background(), componenttest.NewNopHost())) - assert.True(t, receiver.Started) - - assert.False(t, receiver.Stopped) - assert.NoError(t, receivers.ShutdownAll(context.Background())) - assert.True(t, receiver.Stopped) -} - -func TestBuildReceivers_Unused(t *testing.T) { - factories, err := testcomponents.ExampleComponents() - assert.NoError(t, err) - - cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "unused_receiver.yaml"), factories) - assert.NoError(t, err) - - // Build the pipeline - allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.NoError(t, err) - pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors) - assert.NoError(t, err) - receivers, err := BuildReceivers(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, pipelineProcessors, factories.Receivers) - assert.NoError(t, err) - assert.NotNil(t, receivers) - - assert.NoError(t, receivers.StartAll(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, receivers.ShutdownAll(context.Background())) -} diff --git a/service/internal/builder/testdata/pipelines_builder.yaml b/service/internal/builder/testdata/pipelines_builder.yaml deleted file mode 100644 index ef965adb5fa..00000000000 --- a/service/internal/builder/testdata/pipelines_builder.yaml +++ /dev/null @@ -1,40 +0,0 @@ -receivers: - examplereceiver: - examplereceiver/2: - examplereceiver/3: - examplereceiver/multi: - -processors: - exampleprocessor: - -exporters: - exampleexporter: - exampleexporter/2: - -service: - pipelines: - traces: - receivers: [examplereceiver, examplereceiver/multi] - processors: [exampleprocessor] - exporters: [exampleexporter] - - traces/2: - receivers: [examplereceiver/2, examplereceiver/multi] - processors: [exampleprocessor] - exporters: [exampleexporter, exampleexporter/2] - - metrics: - receivers: [examplereceiver] - exporters: [exampleexporter] - - metrics/2: - receivers: [examplereceiver/3] - exporters: [exampleexporter] - - metrics/3: - receivers: [examplereceiver/3] - exporters: [exampleexporter/2] - - logs: - receivers: [examplereceiver/3] - exporters: [exampleexporter/2] diff --git 
a/service/internal/builder/testdata/unused_receiver.yaml b/service/internal/builder/testdata/unused_receiver.yaml deleted file mode 100644 index f8dc448ac28..00000000000 --- a/service/internal/builder/testdata/unused_receiver.yaml +++ /dev/null @@ -1,12 +0,0 @@ -receivers: - examplereceiver: - examplereceiver/2: -processors: -exporters: - exampleexporter: - -service: - pipelines: - traces: - receivers: [examplereceiver] - exporters: [exampleexporter] \ No newline at end of file diff --git a/service/internal/builder/capabilities.go b/service/internal/pipelines/capabilities.go similarity index 94% rename from service/internal/builder/capabilities.go rename to service/internal/pipelines/capabilities.go index b2caefc38e4..6414f239f56 100644 --- a/service/internal/builder/capabilities.go +++ b/service/internal/pipelines/capabilities.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package builder // import "go.opentelemetry.io/collector/service/internal/builder" +package pipelines // import "go.opentelemetry.io/collector/service/internal/pipelines" import ( "go.opentelemetry.io/collector/consumer" diff --git a/service/internal/builder/capabilities_test.go b/service/internal/pipelines/capabilities_test.go similarity index 99% rename from service/internal/builder/capabilities_test.go rename to service/internal/pipelines/capabilities_test.go index 9ce53faef49..39350281d72 100644 --- a/service/internal/builder/capabilities_test.go +++ b/service/internal/pipelines/capabilities_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package builder +package pipelines import ( "context" diff --git a/service/internal/pipelines/pipelines.go b/service/internal/pipelines/pipelines.go index 8e8461d52da..6b0fd2101ce 100644 --- a/service/internal/pipelines/pipelines.go +++ b/service/internal/pipelines/pipelines.go @@ -16,14 +16,18 @@ package pipelines // import "go.opentelemetry.io/collector/service/internal/pipe import ( "context" + "fmt" "net/http" "sort" "go.uber.org/multierr" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/service/internal/builder" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/fanoutconsumer" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -33,38 +37,32 @@ const ( zComponentKind = "zcomponentkind" ) -// Pipelines is set of all pipelines created from exporter configs. -type Pipelines struct { - telemetry component.TelemetrySettings +// baseConsumer redeclared here since not public in consumer package. May consider to make that public. +type baseConsumer interface { + Capabilities() consumer.Capabilities +} - exporters *builder.BuiltExporters - pipelines builder.BuiltPipelines - receivers builder.Receivers +type builtComponent struct { + id config.ComponentID + comp component.Component } -// Build builds all pipelines from config. 
-func Build(ctx context.Context, telemetry component.TelemetrySettings, buildInfo component.BuildInfo, cfg *config.Config, factories component.Factories) (*Pipelines, error) { - exporters, err := builder.BuildExporters(ctx, telemetry, buildInfo, cfg, factories.Exporters) - if err != nil { - return nil, err - } +type builtPipeline struct { + lastConsumer baseConsumer - pipelines, err := builder.BuildPipelines(telemetry, buildInfo, cfg, exporters, factories.Processors) - if err != nil { - return nil, err - } + receivers []builtComponent + processors []builtComponent + exporters []builtComponent +} - receivers, err := builder.BuildReceivers(telemetry, buildInfo, cfg, pipelines, factories.Receivers) - if err != nil { - return nil, err - } +// Pipelines is a set of all pipelines created from pipeline configs. +type Pipelines struct { + telemetry component.TelemetrySettings + + allReceivers map[config.DataType]map[config.ComponentID]component.Receiver + allExporters map[config.DataType]map[config.ComponentID]component.Exporter - return &Pipelines{ - telemetry: telemetry, - receivers: receivers, - exporters: exporters, - pipelines: pipelines, - }, nil + pipelines map[config.ComponentID]*builtPipeline } // StartAll starts all pipelines. @@ -74,18 +72,39 @@ func Build(ctx context.Context, telemetry component.TelemetrySettings, buildInfo // later in the pipeline do not start sending data to later components which are not yet started. func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { bps.telemetry.Logger.Info("Starting exporters...") - if err := bps.exporters.StartAll(ctx, host); err != nil { - return err + for dt, expByID := range bps.allExporters { + for expID, exp := range expByID { + expLogger := exporterLogger(bps.telemetry.Logger, expID, dt) + expLogger.Info("Exporter is starting...") + if err := exp.Start(ctx, components.NewHostWrapper(host, expLogger)); err != nil { + return err + } + expLogger.Info("Exporter started.") + } } bps.telemetry.Logger.Info("Starting processors...") - if err := bps.pipelines.StartProcessors(ctx, host); err != nil { - return err + for pipelineID, bp := range bps.pipelines { + for i := len(bp.processors) - 1; i >= 0; i-- { + procLogger := processorLogger(bps.telemetry.Logger, bp.processors[i].id, pipelineID) + procLogger.Info("Processor is starting...") + if err := bp.processors[i].comp.Start(ctx, components.NewHostWrapper(host, procLogger)); err != nil { + return err + } + procLogger.Info("Processor started.") + } } bps.telemetry.Logger.Info("Starting receivers...") - if err := bps.receivers.StartAll(ctx, host); err != nil { - return err + for dt, recvByID := range bps.allReceivers { + for recvID, recv := range recvByID { + recvLogger := receiverLogger(bps.telemetry.Logger, recvID, dt) + recvLogger.Info("Receiver is starting...") + if err := recv.Start(ctx, components.NewHostWrapper(host, recvLogger)); err != nil { + return err + } + recvLogger.Info("Receiver started.") + } } return nil } @@ -97,19 +116,43 @@ func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { func (bps *Pipelines) ShutdownAll(ctx context.Context) error { var errs error bps.telemetry.Logger.Info("Stopping receivers...") - errs = multierr.Append(errs, bps.receivers.ShutdownAll(ctx)) + for _, recvByID := range bps.allReceivers { + for _, recv := range recvByID { + errs = multierr.Append(errs, recv.Shutdown(ctx)) + } + } bps.telemetry.Logger.Info("Stopping processors...") - errs = multierr.Append(errs, bps.pipelines.ShutdownProcessors(ctx)) + 
for _, bp := range bps.pipelines { + for _, p := range bp.processors { + errs = multierr.Append(errs, p.comp.Shutdown(ctx)) + } + } bps.telemetry.Logger.Info("Stopping exporters...") - errs = multierr.Append(errs, bps.exporters.ShutdownAll(ctx)) + for _, expByID := range bps.allExporters { + for _, exp := range expByID { + errs = multierr.Append(errs, exp.Shutdown(ctx)) + } + } return errs } func (bps *Pipelines) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter { - return bps.exporters.ToMapByDataType() + exportersMap := make(map[config.DataType]map[config.ComponentID]component.Exporter) + + exportersMap[config.TracesDataType] = make(map[config.ComponentID]component.Exporter, len(bps.allExporters[config.TracesDataType])) + exportersMap[config.MetricsDataType] = make(map[config.ComponentID]component.Exporter, len(bps.allExporters[config.MetricsDataType])) + exportersMap[config.LogsDataType] = make(map[config.ComponentID]component.Exporter, len(bps.allExporters[config.LogsDataType])) + + for dt, expByID := range bps.allExporters { + for expID, exp := range expByID { + exportersMap[dt][expID] = exp + } + } + + return exportersMap } func (bps *Pipelines) HandleZPages(w http.ResponseWriter, r *http.Request) { @@ -134,27 +177,352 @@ func (bps *Pipelines) HandleZPages(w http.ResponseWriter, r *http.Request) { zpages.WriteHTMLPageFooter(w) } +// Build builds all pipelines from config. +func Build(ctx context.Context, settings component.TelemetrySettings, buildInfo component.BuildInfo, cfg *config.Config, factories component.Factories) (*Pipelines, error) { + exps := &Pipelines{ + telemetry: settings, + allReceivers: make(map[config.DataType]map[config.ComponentID]component.Receiver), + allExporters: make(map[config.DataType]map[config.ComponentID]component.Exporter), + pipelines: make(map[config.ComponentID]*builtPipeline, len(cfg.Service.Pipelines)), + } + + receiversConsumers := make(map[config.DataType]map[config.ComponentID][]baseConsumer) + + // Iterate over all pipelines, and create exporters, then processors. + // Receivers cannot be created since we need to know all consumers, a.k.a. we need all pipelines built up to the + // first processor. + for pipelineID, pipeline := range cfg.Service.Pipelines { + // The data type of the pipeline defines what data type each exporter is expected to receive. + if _, ok := exps.allExporters[pipelineID.Type()]; !ok { + exps.allExporters[pipelineID.Type()] = make(map[config.ComponentID]component.Exporter) + } + expByID := exps.allExporters[pipelineID.Type()] + + bp := &builtPipeline{ + receivers: make([]builtComponent, len(pipeline.Receivers)), + processors: make([]builtComponent, len(pipeline.Processors)), + exporters: make([]builtComponent, len(pipeline.Exporters)), + } + exps.pipelines[pipelineID] = bp + + // Iterate over all Exporters for this pipeline. + for i, expID := range pipeline.Exporters { + // If already created an exporter for this [DataType, ComponentID] nothing to do, will reuse this instance. + if exp, ok := expByID[expID]; ok { + bp.exporters[i] = builtComponent{id: expID, comp: exp} + continue + } + + exp, err := buildExporter(ctx, settings, buildInfo, cfg.Exporters, factories.Exporters, expID, pipelineID) + if err != nil { + return nil, err + } + + bp.exporters[i] = builtComponent{id: expID, comp: exp} + expByID[expID] = exp + } + + // Build a fan out consumer to all exporters. 
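+ // The fan-out consumer becomes the initial lastConsumer of this pipeline; the processors
+ // below are chained onto it in reverse order, so data flows receivers -> processors (in
+ // config order) -> fan-out consumer -> exporters.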
+ switch pipelineID.Type() { + case config.TracesDataType: + bp.lastConsumer = buildFanOutExportersTracesConsumer(bp.exporters) + case config.MetricsDataType: + bp.lastConsumer = buildFanOutExportersMetricsConsumer(bp.exporters) + case config.LogsDataType: + bp.lastConsumer = buildFanOutExportersLogsConsumer(bp.exporters) + default: + return nil, fmt.Errorf("create fan-out exporter in pipeline %q, data type %q is not supported", pipelineID, pipelineID.Type()) + } + + mutatesConsumedData := bp.lastConsumer.Capabilities().MutatesData + // Build the processors backwards, starting from the last one. + // The last processor points to the fan out consumer to all Exporters, then the processor itself becomes a + // consumer for the one that precedes it in the pipeline and so on. + for i := len(pipeline.Processors) - 1; i >= 0; i-- { + procID := pipeline.Processors[i] + + proc, err := buildProcessor(ctx, settings, buildInfo, cfg.Processors, factories.Processors, procID, pipelineID, bp.lastConsumer) + if err != nil { + return nil, err + } + + bp.processors[i] = builtComponent{id: procID, comp: proc} + bp.lastConsumer = proc.(baseConsumer) + mutatesConsumedData = mutatesConsumedData || bp.lastConsumer.Capabilities().MutatesData + } + + // Some consumers may not correctly implement the Capabilities, and ignore the next consumer when calculating the Capabilities. + // Because of this, wrap the first consumer if any consumer in the pipeline mutates the data and the first one claims that it doesn't. + switch pipelineID.Type() { + case config.TracesDataType: + bp.lastConsumer = capTraces{Traces: bp.lastConsumer.(consumer.Traces), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}} + case config.MetricsDataType: + bp.lastConsumer = capMetrics{Metrics: bp.lastConsumer.(consumer.Metrics), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}} + case config.LogsDataType: + bp.lastConsumer = capLogs{Logs: bp.lastConsumer.(consumer.Logs), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}} + default: + return nil, fmt.Errorf("create cap consumer in pipeline %q, data type %q is not supported", pipelineID, pipelineID.Type()) + } + + // The data type of the pipeline defines what data type each receiver is expected to produce. + if _, ok := receiversConsumers[pipelineID.Type()]; !ok { + receiversConsumers[pipelineID.Type()] = make(map[config.ComponentID][]baseConsumer) + } + recvConsByID := receiversConsumers[pipelineID.Type()] + // Iterate over all Receivers for this pipeline and just append the lastConsumer as a consumer for the receiver. + for _, recvID := range pipeline.Receivers { + recvConsByID[recvID] = append(recvConsByID[recvID], bp.lastConsumer) + } + } + + // Now that we built the `receiversConsumers` map, we can build the receivers as well. + for pipelineID, pipeline := range cfg.Service.Pipelines { + // The data type of the pipeline defines what data type each receiver is expected to produce. + if _, ok := exps.allReceivers[pipelineID.Type()]; !ok { + exps.allReceivers[pipelineID.Type()] = make(map[config.ComponentID]component.Receiver) + } + recvByID := exps.allReceivers[pipelineID.Type()] + bp := exps.pipelines[pipelineID] + + // Iterate over all Receivers for this pipeline. + for i, recvID := range pipeline.Receivers { + // If already created a receiver for this [DataType, ComponentID] nothing to do. 
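+ // A receiver instance is shared by every pipeline of the same data type that lists it;
+ // it is created once and fans out to the first consumer of each of those pipelines,
+ // collected in receiversConsumers above.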
+ if exp, ok := recvByID[recvID]; ok { + bp.receivers[i] = builtComponent{id: recvID, comp: exp} + continue + } + + recv, err := buildReceiver(ctx, settings, buildInfo, cfg.Receivers, factories.Receivers, recvID, pipelineID, receiversConsumers[pipelineID.Type()][recvID]) + if err != nil { + return nil, err + } + + bp.receivers[i] = builtComponent{id: recvID, comp: recv} + recvByID[recvID] = recv + } + } + return exps, nil +} + +func buildExporter( + ctx context.Context, + settings component.TelemetrySettings, + buildInfo component.BuildInfo, + cfgs map[config.ComponentID]config.Exporter, + factories map[config.Type]component.ExporterFactory, + id config.ComponentID, + pipelineID config.ComponentID, +) (component.Exporter, error) { + cfg, existsCfg := cfgs[id] + if !existsCfg { + return nil, fmt.Errorf("exporter %q is not configured", id) + } + + factory, existsFactory := factories[id.Type()] + if !existsFactory { + return nil, fmt.Errorf("exporter factory not available for: %q", id) + } + + set := component.ExporterCreateSettings{ + TelemetrySettings: settings, + BuildInfo: buildInfo, + } + set.TelemetrySettings.Logger = exporterLogger(settings.Logger, id, pipelineID.Type()) + + exp, err := createExporter(ctx, set, cfg, id, pipelineID, factory) + if err != nil { + return nil, fmt.Errorf("failed to create %q exporter in pipeline %q: %w", id, pipelineID, err) + } + + return exp, nil +} + +func createExporter(ctx context.Context, set component.ExporterCreateSettings, cfg config.Exporter, id config.ComponentID, pipelineID config.ComponentID, factory component.ExporterFactory) (component.Exporter, error) { + switch pipelineID.Type() { + case config.TracesDataType: + return factory.CreateTracesExporter(ctx, set, cfg) + + case config.MetricsDataType: + return factory.CreateMetricsExporter(ctx, set, cfg) + + case config.LogsDataType: + return factory.CreateLogsExporter(ctx, set, cfg) + } + return nil, fmt.Errorf("error creating exporter %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type()) +} + +func buildFanOutExportersTracesConsumer(exporters []builtComponent) consumer.Traces { + consumers := make([]consumer.Traces, 0, len(exporters)) + for _, exp := range exporters { + consumers = append(consumers, exp.comp.(consumer.Traces)) + } + // Create a junction point that fans out to all exporters. + return fanoutconsumer.NewTraces(consumers) +} + +func buildFanOutExportersMetricsConsumer(exporters []builtComponent) consumer.Metrics { + consumers := make([]consumer.Metrics, 0, len(exporters)) + for _, exp := range exporters { + consumers = append(consumers, exp.comp.(consumer.Metrics)) + } + // Create a junction point that fans out to all exporters. + return fanoutconsumer.NewMetrics(consumers) +} + +func buildFanOutExportersLogsConsumer(exporters []builtComponent) consumer.Logs { + consumers := make([]consumer.Logs, 0, len(exporters)) + for _, exp := range exporters { + consumers = append(consumers, exp.comp.(consumer.Logs)) + } + // Create a junction point that fans out to all exporters. 
+ return fanoutconsumer.NewLogs(consumers) +} + +func exporterLogger(logger *zap.Logger, id config.ComponentID, dt config.DataType) *zap.Logger { + return logger.With( + zap.String(components.ZapKindKey, components.ZapKindExporter), + zap.String(components.ZapDataTypeKey, string(dt)), + zap.String(components.ZapNameKey, id.String())) +} + +func buildProcessor(ctx context.Context, + settings component.TelemetrySettings, + buildInfo component.BuildInfo, + cfgs map[config.ComponentID]config.Processor, + factories map[config.Type]component.ProcessorFactory, + id config.ComponentID, + pipelineID config.ComponentID, + next baseConsumer, +) (component.Processor, error) { + procCfg, existsCfg := cfgs[id] + if !existsCfg { + return nil, fmt.Errorf("processor %q is not configured", id) + } + + factory, existsFactory := factories[id.Type()] + if !existsFactory { + return nil, fmt.Errorf("processor factory not available for: %q", id) + } + + set := component.ProcessorCreateSettings{ + TelemetrySettings: settings, + BuildInfo: buildInfo, + } + set.TelemetrySettings.Logger = processorLogger(settings.Logger, id, pipelineID) + + proc, err := createProcessor(ctx, set, procCfg, id, pipelineID, next, factory) + if err != nil { + return nil, fmt.Errorf("failed to create %q processor in pipeline %q: %w", id, pipelineID, err) + } + return proc, nil +} + +func createProcessor(ctx context.Context, set component.ProcessorCreateSettings, cfg config.Processor, id config.ComponentID, pipelineID config.ComponentID, next baseConsumer, factory component.ProcessorFactory) (component.Processor, error) { + switch pipelineID.Type() { + case config.TracesDataType: + return factory.CreateTracesProcessor(ctx, set, cfg, next.(consumer.Traces)) + + case config.MetricsDataType: + return factory.CreateMetricsProcessor(ctx, set, cfg, next.(consumer.Metrics)) + + case config.LogsDataType: + return factory.CreateLogsProcessor(ctx, set, cfg, next.(consumer.Logs)) + } + return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type()) +} + +func processorLogger(logger *zap.Logger, procID config.ComponentID, pipelineID config.ComponentID) *zap.Logger { + return logger.With( + zap.String(components.ZapKindKey, components.ZapKindProcessor), + zap.String(components.ZapNameKey, procID.String()), + zap.String(components.ZapKindPipeline, pipelineID.String())) +} + +func buildReceiver(ctx context.Context, + settings component.TelemetrySettings, + buildInfo component.BuildInfo, + cfgs map[config.ComponentID]config.Receiver, + factories map[config.Type]component.ReceiverFactory, + id config.ComponentID, + pipelineID config.ComponentID, + nexts []baseConsumer, +) (component.Receiver, error) { + cfg, existsCfg := cfgs[id] + if !existsCfg { + return nil, fmt.Errorf("receiver %q is not configured", id) + } + + factory, existsFactory := factories[id.Type()] + if !existsFactory { + return nil, fmt.Errorf("receiver factory not available for: %q", id) + } + + set := component.ReceiverCreateSettings{ + TelemetrySettings: settings, + BuildInfo: buildInfo, + } + set.TelemetrySettings.Logger = receiverLogger(settings.Logger, id, pipelineID.Type()) + + recv, err := createReceiver(ctx, set, cfg, id, pipelineID, nexts, factory) + if err != nil { + return nil, fmt.Errorf("failed to create %q receiver in pipeline %q: %w", id, pipelineID, err) + } + + return recv, nil +} + +func createReceiver(ctx context.Context, set component.ReceiverCreateSettings, cfg config.Receiver, id config.ComponentID, 
pipelineID config.ComponentID, nexts []baseConsumer, factory component.ReceiverFactory) (component.Receiver, error) { + switch pipelineID.Type() { + case config.TracesDataType: + var consumers []consumer.Traces + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Traces)) + } + return factory.CreateTracesReceiver(ctx, set, cfg, fanoutconsumer.NewTraces(consumers)) + case config.MetricsDataType: + var consumers []consumer.Metrics + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Metrics)) + } + return factory.CreateMetricsReceiver(ctx, set, cfg, fanoutconsumer.NewMetrics(consumers)) + case config.LogsDataType: + var consumers []consumer.Logs + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Logs)) + } + return factory.CreateLogsReceiver(ctx, set, cfg, fanoutconsumer.NewLogs(consumers)) + } + return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type()) +} + +func receiverLogger(logger *zap.Logger, id config.ComponentID, dt config.DataType) *zap.Logger { + return logger.With( + zap.String(components.ZapKindKey, components.ZapKindReceiver), + zap.String(components.ZapNameKey, id.String()), + zap.String(components.ZapKindPipeline, string(dt))) +} + func (bps *Pipelines) getPipelinesSummaryTableData() zpages.SummaryPipelinesTableData { sumData := zpages.SummaryPipelinesTableData{} sumData.Rows = make([]zpages.SummaryPipelinesTableRowData, 0, len(bps.pipelines)) for c, p := range bps.pipelines { // TODO: Change the template to use ID. var recvs []string - for _, bRecv := range p.Config.Receivers { - recvs = append(recvs, bRecv.String()) + for _, bRecv := range p.receivers { + recvs = append(recvs, bRecv.id.String()) } var procs []string - for _, bProc := range p.Config.Processors { - procs = append(procs, bProc.String()) + for _, bProc := range p.processors { + procs = append(procs, bProc.id.String()) } var exps []string - for _, bExp := range p.Config.Exporters { - exps = append(exps, bExp.String()) + for _, bExp := range p.exporters { + exps = append(exps, bExp.id.String()) } row := zpages.SummaryPipelinesTableRowData{ FullName: c.String(), InputType: string(c.Type()), - MutatesData: p.MutatesData, + MutatesData: p.lastConsumer.Capabilities().MutatesData, Receivers: recvs, Processors: procs, Exporters: exps, diff --git a/service/internal/pipelines/pipelines_test.go b/service/internal/pipelines/pipelines_test.go index 855d21827c8..8e4a7ec2df8 100644 --- a/service/internal/pipelines/pipelines_test.go +++ b/service/internal/pipelines/pipelines_test.go @@ -16,6 +16,7 @@ package pipelines import ( "context" + "errors" "path/filepath" "testing" @@ -25,6 +26,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testcomponents" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/service/servicetest" @@ -111,32 +114,35 @@ func TestBuild(t *testing.T) { } // Verify processors created in the given order and started. 
- for i := range test.processorIDs { - traceProcessor := pipelines.pipelines[config.NewComponentID(config.TracesDataType)].Processors[i] - assert.True(t, traceProcessor.(*testcomponents.ExampleProcessor).Started) + for i, procID := range test.processorIDs { + traceProcessor := pipelines.pipelines[config.NewComponentID(config.TracesDataType)].processors[i] + assert.Equal(t, procID, traceProcessor.id) + assert.True(t, traceProcessor.comp.(*testcomponents.ExampleProcessor).Started) // Validate metrics. - metricsProcessor := pipelines.pipelines[config.NewComponentID(config.MetricsDataType)].Processors[i] - assert.True(t, metricsProcessor.(*testcomponents.ExampleProcessor).Started) + metricsProcessor := pipelines.pipelines[config.NewComponentID(config.MetricsDataType)].processors[i] + assert.Equal(t, procID, metricsProcessor.id) + assert.True(t, metricsProcessor.comp.(*testcomponents.ExampleProcessor).Started) // Validate logs. - logsProcessor := pipelines.pipelines[config.NewComponentID(config.LogsDataType)].Processors[i] - assert.True(t, logsProcessor.(*testcomponents.ExampleProcessor).Started) + logsProcessor := pipelines.pipelines[config.NewComponentID(config.LogsDataType)].processors[i] + assert.Equal(t, procID, logsProcessor.id) + assert.True(t, logsProcessor.comp.(*testcomponents.ExampleProcessor).Started) } // Verify receivers created, started and send data to confirm pipelines correctly connected. for _, recvID := range test.receiverIDs { - traceReceiver := pipelines.receivers[recvID].Receiver.(*testcomponents.ExampleReceiver) + traceReceiver := pipelines.allReceivers[config.TracesDataType][recvID].(*testcomponents.ExampleReceiver) assert.True(t, traceReceiver.Started) // Send traces. assert.NoError(t, traceReceiver.ConsumeTraces(context.Background(), testdata.GenerateTracesOneSpan())) - metricsReceiver := pipelines.receivers[recvID].Receiver.(*testcomponents.ExampleReceiver) + metricsReceiver := pipelines.allReceivers[config.MetricsDataType][recvID].(*testcomponents.ExampleReceiver) assert.True(t, metricsReceiver.Started) // Send metrics. assert.NoError(t, metricsReceiver.ConsumeMetrics(context.Background(), testdata.GenerateMetricsOneMetric())) - logsReceiver := pipelines.receivers[recvID].Receiver.(*testcomponents.ExampleReceiver) + logsReceiver := pipelines.allReceivers[config.LogsDataType][recvID].(*testcomponents.ExampleReceiver) assert.True(t, logsReceiver.Started) // Send logs. assert.NoError(t, logsReceiver.ConsumeLogs(context.Background(), testdata.GenerateLogsOneLogRecord())) @@ -146,28 +152,28 @@ func TestBuild(t *testing.T) { // Verify receivers shutdown. for _, recvID := range test.receiverIDs { - traceReceiver := pipelines.receivers[recvID].Receiver.(*testcomponents.ExampleReceiver) + traceReceiver := pipelines.allReceivers[config.TracesDataType][recvID].(*testcomponents.ExampleReceiver) assert.True(t, traceReceiver.Stopped) - metricsReceiver := pipelines.receivers[recvID].Receiver.(*testcomponents.ExampleReceiver) + metricsReceiver := pipelines.allReceivers[config.MetricsDataType][recvID].(*testcomponents.ExampleReceiver) assert.True(t, metricsReceiver.Stopped) - logsReceiver := pipelines.receivers[recvID].Receiver.(*testcomponents.ExampleReceiver) + logsReceiver := pipelines.allReceivers[config.LogsDataType][recvID].(*testcomponents.ExampleReceiver) assert.True(t, logsReceiver.Stopped) } // Verify processors shutdown. 
for i := range test.processorIDs { - traceProcessor := pipelines.pipelines[config.NewComponentID(config.TracesDataType)].Processors[i] - assert.True(t, traceProcessor.(*testcomponents.ExampleProcessor).Stopped) + traceProcessor := pipelines.pipelines[config.NewComponentID(config.TracesDataType)].processors[i] + assert.True(t, traceProcessor.comp.(*testcomponents.ExampleProcessor).Stopped) // Validate metrics. - metricsProcessor := pipelines.pipelines[config.NewComponentID(config.MetricsDataType)].Processors[i] - assert.True(t, metricsProcessor.(*testcomponents.ExampleProcessor).Stopped) + metricsProcessor := pipelines.pipelines[config.NewComponentID(config.MetricsDataType)].processors[i] + assert.True(t, metricsProcessor.comp.(*testcomponents.ExampleProcessor).Stopped) // Validate logs. - logsProcessor := pipelines.pipelines[config.NewComponentID(config.LogsDataType)].Processors[i] - assert.True(t, logsProcessor.(*testcomponents.ExampleProcessor).Stopped) + logsProcessor := pipelines.pipelines[config.NewComponentID(config.LogsDataType)].processors[i] + assert.True(t, logsProcessor.comp.(*testcomponents.ExampleProcessor).Stopped) } // Now verify that exporters received data, and are shutdown. @@ -194,7 +200,7 @@ func TestBuild(t *testing.T) { } } -func TestBuildExportersNotSupportedDataType(t *testing.T) { +func TestBuildErrors(t *testing.T) { nopReceiverFactory := componenttest.NewNopReceiverFactory() nopProcessorFactory := componenttest.NewNopProcessorFactory() nopExporterFactory := componenttest.NewNopExporterFactory() @@ -224,10 +230,6 @@ func TestBuildExportersNotSupportedDataType(t *testing.T) { for _, test := range tests { t.Run(test.configFile, func(t *testing.T) { - if test.configFile == "unknown_receiver_config.yaml" { - t.Skip("This is a small issue with current implementation which will be fixed in #5512." 
+ - "In real binary this will not be hit, since the validation of the config will catch this issue in advance.") - } factories := component.Factories{ Receivers: map[config.Type]component.ReceiverFactory{ nopReceiverFactory.Type(): nopReceiverFactory, @@ -261,6 +263,95 @@ func TestBuildExportersNotSupportedDataType(t *testing.T) { } } +func TestFailToStartAndShutdown(t *testing.T) { + errReceiverFactory := newErrReceiverFactory() + errProcessorFactory := newErrProcessorFactory() + errExporterFactory := newErrExporterFactory() + nopReceiverFactory := componenttest.NewNopReceiverFactory() + nopProcessorFactory := componenttest.NewNopProcessorFactory() + nopExporterFactory := componenttest.NewNopExporterFactory() + + factories := component.Factories{ + Receivers: map[config.Type]component.ReceiverFactory{ + nopReceiverFactory.Type(): nopReceiverFactory, + errReceiverFactory.Type(): errReceiverFactory, + }, + Processors: map[config.Type]component.ProcessorFactory{ + nopProcessorFactory.Type(): nopProcessorFactory, + errProcessorFactory.Type(): errProcessorFactory, + }, + Exporters: map[config.Type]component.ExporterFactory{ + nopExporterFactory.Type(): nopExporterFactory, + errExporterFactory.Type(): errExporterFactory, + }, + } + + cfg := &config.Config{ + Receivers: map[config.ComponentID]config.Receiver{ + config.NewComponentID(nopReceiverFactory.Type()): nopReceiverFactory.CreateDefaultConfig(), + config.NewComponentID(errReceiverFactory.Type()): errReceiverFactory.CreateDefaultConfig(), + }, + Exporters: map[config.ComponentID]config.Exporter{ + config.NewComponentID(nopExporterFactory.Type()): nopExporterFactory.CreateDefaultConfig(), + config.NewComponentID(errExporterFactory.Type()): errExporterFactory.CreateDefaultConfig(), + }, + Processors: map[config.ComponentID]config.Processor{ + config.NewComponentID(nopProcessorFactory.Type()): nopProcessorFactory.CreateDefaultConfig(), + config.NewComponentID(errProcessorFactory.Type()): errProcessorFactory.CreateDefaultConfig(), + }, + } + + for _, dt := range []config.DataType{config.TracesDataType, config.MetricsDataType, config.LogsDataType} { + t.Run(string(dt)+"/receiver", func(t *testing.T) { + cfg.Service = config.Service{ + Pipelines: map[config.ComponentID]*config.Pipeline{ + config.NewComponentID(dt): { + Receivers: []config.ComponentID{config.NewComponentID("nop"), config.NewComponentID("err")}, + Processors: []config.ComponentID{config.NewComponentID("nop")}, + Exporters: []config.ComponentID{config.NewComponentID("nop")}, + }, + }, + } + pipelines, err := Build(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories) + assert.NoError(t, err) + assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.ShutdownAll(context.Background())) + }) + + t.Run(string(dt)+"/processor", func(t *testing.T) { + cfg.Service = config.Service{ + Pipelines: map[config.ComponentID]*config.Pipeline{ + config.NewComponentID(dt): { + Receivers: []config.ComponentID{config.NewComponentID("nop")}, + Processors: []config.ComponentID{config.NewComponentID("nop"), config.NewComponentID("err")}, + Exporters: []config.ComponentID{config.NewComponentID("nop")}, + }, + }, + } + pipelines, err := Build(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories) + assert.NoError(t, err) + assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, 
pipelines.ShutdownAll(context.Background())) + }) + + t.Run(string(dt)+"/exporter", func(t *testing.T) { + cfg.Service = config.Service{ + Pipelines: map[config.ComponentID]*config.Pipeline{ + config.NewComponentID(dt): { + Receivers: []config.ComponentID{config.NewComponentID("nop")}, + Processors: []config.ComponentID{config.NewComponentID("nop")}, + Exporters: []config.ComponentID{config.NewComponentID("nop"), config.NewComponentID("err")}, + }, + }, + } + pipelines, err := Build(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories) + assert.NoError(t, err) + assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.ShutdownAll(context.Background())) + }) + } +} + func newBadReceiverFactory() component.ReceiverFactory { return component.NewReceiverFactory("bf", func() config.Receiver { return &struct { @@ -290,3 +381,79 @@ func newBadExporterFactory() component.ExporterFactory { } }) } + +func newErrReceiverFactory() component.ReceiverFactory { + return component.NewReceiverFactory("err", func() config.Receiver { + return &struct { + config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + }{ + ReceiverSettings: config.NewReceiverSettings(config.NewComponentID("bf")), + } + }, + component.WithTracesReceiver(func(context.Context, component.ReceiverCreateSettings, config.Receiver, consumer.Traces) (component.TracesReceiver, error) { + return &errComponent{}, nil + }), + component.WithLogsReceiver(func(context.Context, component.ReceiverCreateSettings, config.Receiver, consumer.Logs) (component.LogsReceiver, error) { + return &errComponent{}, nil + }), + component.WithMetricsReceiver(func(context.Context, component.ReceiverCreateSettings, config.Receiver, consumer.Metrics) (component.MetricsReceiver, error) { + return &errComponent{}, nil + }), + ) +} + +func newErrProcessorFactory() component.ProcessorFactory { + return component.NewProcessorFactory("err", func() config.Processor { + return &struct { + config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + }{ + ProcessorSettings: config.NewProcessorSettings(config.NewComponentID("bf")), + } + }, + component.WithTracesProcessor(func(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Traces) (component.TracesProcessor, error) { + return &errComponent{}, nil + }), + component.WithLogsProcessor(func(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Logs) (component.LogsProcessor, error) { + return &errComponent{}, nil + }), + component.WithMetricsProcessor(func(context.Context, component.ProcessorCreateSettings, config.Processor, consumer.Metrics) (component.MetricsProcessor, error) { + return &errComponent{}, nil + }), + ) +} + +func newErrExporterFactory() component.ExporterFactory { + return component.NewExporterFactory("err", func() config.Exporter { + return &struct { + config.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + }{ + ExporterSettings: config.NewExporterSettings(config.NewComponentID("bf")), + } + }, + component.WithTracesExporter(func(context.Context, component.ExporterCreateSettings, config.Exporter) (component.TracesExporter, error) { + return &errComponent{}, nil + }), + component.WithLogsExporter(func(context.Context, component.ExporterCreateSettings, config.Exporter) 
(component.LogsExporter, error) { + return &errComponent{}, nil + }), + component.WithMetricsExporter(func(context.Context, component.ExporterCreateSettings, config.Exporter) (component.MetricsExporter, error) { + return &errComponent{}, nil + }), + ) +} + +type errComponent struct { + consumertest.Consumer +} + +func (e errComponent) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func (e errComponent) Start(context.Context, component.Host) error { + return errors.New("my error") +} + +func (e errComponent) Shutdown(context.Context) error { + return errors.New("my error") +} diff --git a/service/service.go b/service/service.go index 9abf81a8f67..124f9d0e85e 100644 --- a/service/service.go +++ b/service/service.go @@ -81,7 +81,7 @@ func (srv *service) Start(ctx context.Context) error { } if err := srv.host.pipelines.StartAll(ctx, srv.host); err != nil { - return fmt.Errorf("cannot start exporters: %w", err) + return fmt.Errorf("cannot start pipelines: %w", err) } return srv.host.builtExtensions.NotifyPipelineReady() @@ -96,7 +96,7 @@ func (srv *service) Shutdown(ctx context.Context) error { } if err := srv.host.pipelines.ShutdownAll(ctx); err != nil { - errs = multierr.Append(errs, fmt.Errorf("failed to shutdown exporters: %w", err)) + errs = multierr.Append(errs, fmt.Errorf("failed to shutdown pipelines: %w", err)) } if err := srv.host.builtExtensions.ShutdownAll(ctx); err != nil {
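
A minimal sketch, not part of the patch, of how the refactored package is exercised end to end. It mirrors the new pipelines_test.go, assumes the nop factories from componenttest, and the helper name buildStartShutdown is hypothetical:

package pipelines

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/config"
)

// buildStartShutdown builds a single nop traces pipeline, starts it, and shuts it down.
func buildStartShutdown(ctx context.Context) error {
	nopRecv := componenttest.NewNopReceiverFactory()
	nopProc := componenttest.NewNopProcessorFactory()
	nopExp := componenttest.NewNopExporterFactory()

	factories := component.Factories{
		Receivers:  map[config.Type]component.ReceiverFactory{nopRecv.Type(): nopRecv},
		Processors: map[config.Type]component.ProcessorFactory{nopProc.Type(): nopProc},
		Exporters:  map[config.Type]component.ExporterFactory{nopExp.Type(): nopExp},
	}

	cfg := &config.Config{
		Receivers:  map[config.ComponentID]config.Receiver{config.NewComponentID("nop"): nopRecv.CreateDefaultConfig()},
		Processors: map[config.ComponentID]config.Processor{config.NewComponentID("nop"): nopProc.CreateDefaultConfig()},
		Exporters:  map[config.ComponentID]config.Exporter{config.NewComponentID("nop"): nopExp.CreateDefaultConfig()},
		Service: config.Service{
			Pipelines: map[config.ComponentID]*config.Pipeline{
				config.NewComponentID(config.TracesDataType): {
					Receivers:  []config.ComponentID{config.NewComponentID("nop")},
					Processors: []config.ComponentID{config.NewComponentID("nop")},
					Exporters:  []config.ComponentID{config.NewComponentID("nop")},
				},
			},
		},
	}

	// Build creates exporters first, chains processors backwards onto the exporter
	// fan-out, and finally creates receivers that fan out to every pipeline they feed.
	pls, err := Build(ctx, componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories)
	if err != nil {
		return err
	}
	if err := pls.StartAll(ctx, componenttest.NewNopHost()); err != nil {
		return err
	}
	return pls.ShutdownAll(ctx)
}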