diff --git a/service/pipelines.go b/service/pipelines.go index 32e29309f6b..945908fc1cd 100644 --- a/service/pipelines.go +++ b/service/pipelines.go @@ -243,7 +243,13 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines continue } - exp, err := buildExporter(ctx, set.Telemetry, set.BuildInfo, set.ExporterConfigs, set.ExporterFactories, expID, pipelineID) + cSet := exporter.CreateSettings{ + ID: expID, + TelemetrySettings: set.Telemetry, + BuildInfo: set.BuildInfo, + } + cSet.TelemetrySettings.Logger = exporterLogger(set.Telemetry.Logger, expID, pipelineID.Type()) + exp, err := buildExporter(ctx, cSet, set.ExporterConfigs, set.ExporterFactories, pipelineID) if err != nil { return nil, err } @@ -271,7 +277,13 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines for i := len(pipeline.Processors) - 1; i >= 0; i-- { procID := pipeline.Processors[i] - proc, err := buildProcessor(ctx, set.Telemetry, set.BuildInfo, set.ProcessorConfigs, set.ProcessorFactories, procID, pipelineID, bp.lastConsumer) + cSet := processor.CreateSettings{ + ID: procID, + TelemetrySettings: set.Telemetry, + BuildInfo: set.BuildInfo, + } + cSet.TelemetrySettings.Logger = processorLogger(set.Telemetry.Logger, procID, pipelineID) + proc, err := buildProcessor(ctx, cSet, set.ProcessorConfigs, set.ProcessorFactories, pipelineID, bp.lastConsumer) if err != nil { return nil, err } @@ -322,7 +334,13 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines continue } - recv, err := buildReceiver(ctx, set.Telemetry, set.BuildInfo, set.ReceiverConfigs, set.ReceiverFactories, recvID, pipelineID, receiversConsumers[pipelineID.Type()][recvID]) + cSet := receiver.CreateSettings{ + ID: recvID, + TelemetrySettings: set.Telemetry, + BuildInfo: set.BuildInfo, + } + cSet.TelemetrySettings.Logger = receiverLogger(set.Telemetry.Logger, recvID, pipelineID.Type()) + recv, err := buildReceiver(ctx, cSet, set.ReceiverConfigs, set.ReceiverFactories, pipelineID, receiversConsumers[pipelineID.Type()][recvID]) if err != nil { return nil, err } @@ -336,40 +354,32 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines func buildExporter( ctx context.Context, - settings component.TelemetrySettings, - buildInfo component.BuildInfo, + set exporter.CreateSettings, cfgs map[component.ID]component.Config, factories map[component.Type]exporter.Factory, - id component.ID, pipelineID component.ID, ) (component.Component, error) { - cfg, existsCfg := cfgs[id] + cfg, existsCfg := cfgs[set.ID] if !existsCfg { - return nil, fmt.Errorf("exporter %q is not configured", id) + return nil, fmt.Errorf("exporter %q is not configured", set.ID) } - factory, existsFactory := factories[id.Type()] + factory, existsFactory := factories[set.ID.Type()] if !existsFactory { - return nil, fmt.Errorf("exporter factory not available for: %q", id) + return nil, fmt.Errorf("exporter factory not available for: %q", set.ID) } - set := exporter.CreateSettings{ - ID: id, - TelemetrySettings: settings, - BuildInfo: buildInfo, - } - set.TelemetrySettings.Logger = exporterLogger(settings.Logger, id, pipelineID.Type()) components.LogStabilityLevel(set.TelemetrySettings.Logger, getExporterStabilityLevel(factory, pipelineID.Type())) - exp, err := createExporter(ctx, set, cfg, id, pipelineID, factory) + exp, err := createExporter(ctx, set, cfg, pipelineID, factory) if err != nil { - return nil, fmt.Errorf("failed to create %q exporter, in pipeline %q: %w", id, pipelineID, err) + return nil, fmt.Errorf("failed to create %q exporter, in pipeline %q: %w", set.ID, pipelineID, err) } return exp, nil } -func createExporter(ctx context.Context, set exporter.CreateSettings, cfg component.Config, id component.ID, pipelineID component.ID, factory exporter.Factory) (component.Component, error) { +func createExporter(ctx context.Context, set exporter.CreateSettings, cfg component.Config, pipelineID component.ID, factory exporter.Factory) (component.Component, error) { switch pipelineID.Type() { case component.DataTypeTraces: return factory.CreateTracesExporter(ctx, set, cfg) @@ -380,7 +390,7 @@ func createExporter(ctx context.Context, set exporter.CreateSettings, cfg compon case component.DataTypeLogs: return factory.CreateLogsExporter(ctx, set, cfg) } - return nil, fmt.Errorf("error creating exporter %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type()) + return nil, fmt.Errorf("error creating exporter %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type()) } func buildFanOutExportersTracesConsumer(exporters []builtComponent) consumer.Traces { @@ -430,40 +440,32 @@ func getExporterStabilityLevel(factory exporter.Factory, dt component.DataType) } func buildProcessor(ctx context.Context, - settings component.TelemetrySettings, - buildInfo component.BuildInfo, + set processor.CreateSettings, cfgs map[component.ID]component.Config, factories map[component.Type]processor.Factory, - id component.ID, pipelineID component.ID, next baseConsumer, ) (component.Component, error) { - procCfg, existsCfg := cfgs[id] + procCfg, existsCfg := cfgs[set.ID] if !existsCfg { - return nil, fmt.Errorf("processor %q is not configured", id) + return nil, fmt.Errorf("processor %q is not configured", set.ID) } - factory, existsFactory := factories[id.Type()] + factory, existsFactory := factories[set.ID.Type()] if !existsFactory { - return nil, fmt.Errorf("processor factory not available for: %q", id) + return nil, fmt.Errorf("processor factory not available for: %q", set.ID) } - set := processor.CreateSettings{ - ID: id, - TelemetrySettings: settings, - BuildInfo: buildInfo, - } - set.TelemetrySettings.Logger = processorLogger(settings.Logger, id, pipelineID) components.LogStabilityLevel(set.TelemetrySettings.Logger, getProcessorStabilityLevel(factory, pipelineID.Type())) - proc, err := createProcessor(ctx, set, procCfg, id, pipelineID, next, factory) + proc, err := createProcessor(ctx, set, procCfg, pipelineID, next, factory) if err != nil { - return nil, fmt.Errorf("failed to create %q processor, in pipeline %q: %w", id, pipelineID, err) + return nil, fmt.Errorf("failed to create %q processor, in pipeline %q: %w", set.ID, pipelineID, err) } return proc, nil } -func createProcessor(ctx context.Context, set processor.CreateSettings, cfg component.Config, id component.ID, pipelineID component.ID, next baseConsumer, factory processor.Factory) (component.Component, error) { +func createProcessor(ctx context.Context, set processor.CreateSettings, cfg component.Config, pipelineID component.ID, next baseConsumer, factory processor.Factory) (component.Component, error) { switch pipelineID.Type() { case component.DataTypeTraces: return factory.CreateTracesProcessor(ctx, set, cfg, next.(consumer.Traces)) @@ -474,7 +476,7 @@ func createProcessor(ctx context.Context, set processor.CreateSettings, cfg comp case component.DataTypeLogs: return factory.CreateLogsProcessor(ctx, set, cfg, next.(consumer.Logs)) } - return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type()) + return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type()) } func processorLogger(logger *zap.Logger, procID component.ID, pipelineID component.ID) *zap.Logger { @@ -497,41 +499,33 @@ func getProcessorStabilityLevel(factory processor.Factory, dt component.DataType } func buildReceiver(ctx context.Context, - settings component.TelemetrySettings, - buildInfo component.BuildInfo, + set receiver.CreateSettings, cfgs map[component.ID]component.Config, factories map[component.Type]receiver.Factory, - id component.ID, pipelineID component.ID, nexts []baseConsumer, ) (component.Component, error) { - cfg, existsCfg := cfgs[id] + cfg, existsCfg := cfgs[set.ID] if !existsCfg { - return nil, fmt.Errorf("receiver %q is not configured", id) + return nil, fmt.Errorf("receiver %q is not configured", set.ID) } - factory, existsFactory := factories[id.Type()] + factory, existsFactory := factories[set.ID.Type()] if !existsFactory { - return nil, fmt.Errorf("receiver factory not available for: %q", id) + return nil, fmt.Errorf("receiver factory not available for: %q", set.ID) } - set := receiver.CreateSettings{ - ID: id, - TelemetrySettings: settings, - BuildInfo: buildInfo, - } - set.TelemetrySettings.Logger = receiverLogger(settings.Logger, id, pipelineID.Type()) components.LogStabilityLevel(set.TelemetrySettings.Logger, getReceiverStabilityLevel(factory, pipelineID.Type())) - recv, err := createReceiver(ctx, set, cfg, id, pipelineID, nexts, factory) + recv, err := createReceiver(ctx, set, cfg, pipelineID, nexts, factory) if err != nil { - return nil, fmt.Errorf("failed to create %q receiver, in pipeline %q: %w", id, pipelineID, err) + return nil, fmt.Errorf("failed to create %q receiver, in pipeline %q: %w", set.ID, pipelineID, err) } return recv, nil } -func createReceiver(ctx context.Context, set receiver.CreateSettings, cfg component.Config, id component.ID, pipelineID component.ID, nexts []baseConsumer, factory receiver.Factory) (component.Component, error) { +func createReceiver(ctx context.Context, set receiver.CreateSettings, cfg component.Config, pipelineID component.ID, nexts []baseConsumer, factory receiver.Factory) (component.Component, error) { switch pipelineID.Type() { case component.DataTypeTraces: var consumers []consumer.Traces @@ -552,7 +546,7 @@ func createReceiver(ctx context.Context, set receiver.CreateSettings, cfg compon } return factory.CreateLogsReceiver(ctx, set, cfg, fanoutconsumer.NewLogs(consumers)) } - return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type()) + return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type()) } func receiverLogger(logger *zap.Logger, id component.ID, dt component.DataType) *zap.Logger {