Skip to content

Commit

Permalink
[chore] Reduce number of arguments pass around in service.pipelines
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Dec 14, 2022
1 parent 0578ca4 commit 512b743
Showing 1 changed file with 48 additions and 54 deletions.
102 changes: 48 additions & 54 deletions service/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,13 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines
continue
}

exp, err := buildExporter(ctx, set.Telemetry, set.BuildInfo, set.ExporterConfigs, set.ExporterFactories, expID, pipelineID)
cSet := exporter.CreateSettings{
ID: expID,
TelemetrySettings: set.Telemetry,
BuildInfo: set.BuildInfo,
}
cSet.TelemetrySettings.Logger = exporterLogger(set.Telemetry.Logger, expID, pipelineID.Type())
exp, err := buildExporter(ctx, cSet, set.ExporterConfigs, set.ExporterFactories, pipelineID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -271,7 +277,13 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines
for i := len(pipeline.Processors) - 1; i >= 0; i-- {
procID := pipeline.Processors[i]

proc, err := buildProcessor(ctx, set.Telemetry, set.BuildInfo, set.ProcessorConfigs, set.ProcessorFactories, procID, pipelineID, bp.lastConsumer)
cSet := processor.CreateSettings{
ID: procID,
TelemetrySettings: set.Telemetry,
BuildInfo: set.BuildInfo,
}
cSet.TelemetrySettings.Logger = processorLogger(set.Telemetry.Logger, procID, pipelineID)
proc, err := buildProcessor(ctx, cSet, set.ProcessorConfigs, set.ProcessorFactories, pipelineID, bp.lastConsumer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -322,7 +334,13 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines
continue
}

recv, err := buildReceiver(ctx, set.Telemetry, set.BuildInfo, set.ReceiverConfigs, set.ReceiverFactories, recvID, pipelineID, receiversConsumers[pipelineID.Type()][recvID])
cSet := receiver.CreateSettings{
ID: recvID,
TelemetrySettings: set.Telemetry,
BuildInfo: set.BuildInfo,
}
cSet.TelemetrySettings.Logger = receiverLogger(set.Telemetry.Logger, recvID, pipelineID.Type())
recv, err := buildReceiver(ctx, cSet, set.ReceiverConfigs, set.ReceiverFactories, pipelineID, receiversConsumers[pipelineID.Type()][recvID])
if err != nil {
return nil, err
}
Expand All @@ -336,40 +354,32 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines

func buildExporter(
ctx context.Context,
settings component.TelemetrySettings,
buildInfo component.BuildInfo,
set exporter.CreateSettings,
cfgs map[component.ID]component.Config,
factories map[component.Type]exporter.Factory,
id component.ID,
pipelineID component.ID,
) (component.Component, error) {
cfg, existsCfg := cfgs[id]
cfg, existsCfg := cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("exporter %q is not configured", id)
return nil, fmt.Errorf("exporter %q is not configured", set.ID)
}

factory, existsFactory := factories[id.Type()]
factory, existsFactory := factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("exporter factory not available for: %q", id)
return nil, fmt.Errorf("exporter factory not available for: %q", set.ID)
}

set := exporter.CreateSettings{
ID: id,
TelemetrySettings: settings,
BuildInfo: buildInfo,
}
set.TelemetrySettings.Logger = exporterLogger(settings.Logger, id, pipelineID.Type())
components.LogStabilityLevel(set.TelemetrySettings.Logger, getExporterStabilityLevel(factory, pipelineID.Type()))

exp, err := createExporter(ctx, set, cfg, id, pipelineID, factory)
exp, err := createExporter(ctx, set, cfg, pipelineID, factory)
if err != nil {
return nil, fmt.Errorf("failed to create %q exporter, in pipeline %q: %w", id, pipelineID, err)
return nil, fmt.Errorf("failed to create %q exporter, in pipeline %q: %w", set.ID, pipelineID, err)
}

return exp, nil
}

func createExporter(ctx context.Context, set exporter.CreateSettings, cfg component.Config, id component.ID, pipelineID component.ID, factory exporter.Factory) (component.Component, error) {
func createExporter(ctx context.Context, set exporter.CreateSettings, cfg component.Config, pipelineID component.ID, factory exporter.Factory) (component.Component, error) {
switch pipelineID.Type() {
case component.DataTypeTraces:
return factory.CreateTracesExporter(ctx, set, cfg)
Expand All @@ -380,7 +390,7 @@ func createExporter(ctx context.Context, set exporter.CreateSettings, cfg compon
case component.DataTypeLogs:
return factory.CreateLogsExporter(ctx, set, cfg)
}
return nil, fmt.Errorf("error creating exporter %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type())
return nil, fmt.Errorf("error creating exporter %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type())
}

func buildFanOutExportersTracesConsumer(exporters []builtComponent) consumer.Traces {
Expand Down Expand Up @@ -430,40 +440,32 @@ func getExporterStabilityLevel(factory exporter.Factory, dt component.DataType)
}

func buildProcessor(ctx context.Context,
settings component.TelemetrySettings,
buildInfo component.BuildInfo,
set processor.CreateSettings,
cfgs map[component.ID]component.Config,
factories map[component.Type]processor.Factory,
id component.ID,
pipelineID component.ID,
next baseConsumer,
) (component.Component, error) {
procCfg, existsCfg := cfgs[id]
procCfg, existsCfg := cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("processor %q is not configured", id)
return nil, fmt.Errorf("processor %q is not configured", set.ID)
}

factory, existsFactory := factories[id.Type()]
factory, existsFactory := factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("processor factory not available for: %q", id)
return nil, fmt.Errorf("processor factory not available for: %q", set.ID)
}

set := processor.CreateSettings{
ID: id,
TelemetrySettings: settings,
BuildInfo: buildInfo,
}
set.TelemetrySettings.Logger = processorLogger(settings.Logger, id, pipelineID)
components.LogStabilityLevel(set.TelemetrySettings.Logger, getProcessorStabilityLevel(factory, pipelineID.Type()))

proc, err := createProcessor(ctx, set, procCfg, id, pipelineID, next, factory)
proc, err := createProcessor(ctx, set, procCfg, pipelineID, next, factory)
if err != nil {
return nil, fmt.Errorf("failed to create %q processor, in pipeline %q: %w", id, pipelineID, err)
return nil, fmt.Errorf("failed to create %q processor, in pipeline %q: %w", set.ID, pipelineID, err)
}
return proc, nil
}

func createProcessor(ctx context.Context, set processor.CreateSettings, cfg component.Config, id component.ID, pipelineID component.ID, next baseConsumer, factory processor.Factory) (component.Component, error) {
func createProcessor(ctx context.Context, set processor.CreateSettings, cfg component.Config, pipelineID component.ID, next baseConsumer, factory processor.Factory) (component.Component, error) {
switch pipelineID.Type() {
case component.DataTypeTraces:
return factory.CreateTracesProcessor(ctx, set, cfg, next.(consumer.Traces))
Expand All @@ -474,7 +476,7 @@ func createProcessor(ctx context.Context, set processor.CreateSettings, cfg comp
case component.DataTypeLogs:
return factory.CreateLogsProcessor(ctx, set, cfg, next.(consumer.Logs))
}
return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type())
return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type())
}

func processorLogger(logger *zap.Logger, procID component.ID, pipelineID component.ID) *zap.Logger {
Expand All @@ -497,41 +499,33 @@ func getProcessorStabilityLevel(factory processor.Factory, dt component.DataType
}

func buildReceiver(ctx context.Context,
settings component.TelemetrySettings,
buildInfo component.BuildInfo,
set receiver.CreateSettings,
cfgs map[component.ID]component.Config,
factories map[component.Type]receiver.Factory,
id component.ID,
pipelineID component.ID,
nexts []baseConsumer,
) (component.Component, error) {
cfg, existsCfg := cfgs[id]
cfg, existsCfg := cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("receiver %q is not configured", id)
return nil, fmt.Errorf("receiver %q is not configured", set.ID)
}

factory, existsFactory := factories[id.Type()]
factory, existsFactory := factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("receiver factory not available for: %q", id)
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
}

set := receiver.CreateSettings{
ID: id,
TelemetrySettings: settings,
BuildInfo: buildInfo,
}
set.TelemetrySettings.Logger = receiverLogger(settings.Logger, id, pipelineID.Type())
components.LogStabilityLevel(set.TelemetrySettings.Logger, getReceiverStabilityLevel(factory, pipelineID.Type()))

recv, err := createReceiver(ctx, set, cfg, id, pipelineID, nexts, factory)
recv, err := createReceiver(ctx, set, cfg, pipelineID, nexts, factory)
if err != nil {
return nil, fmt.Errorf("failed to create %q receiver, in pipeline %q: %w", id, pipelineID, err)
return nil, fmt.Errorf("failed to create %q receiver, in pipeline %q: %w", set.ID, pipelineID, err)
}

return recv, nil
}

func createReceiver(ctx context.Context, set receiver.CreateSettings, cfg component.Config, id component.ID, pipelineID component.ID, nexts []baseConsumer, factory receiver.Factory) (component.Component, error) {
func createReceiver(ctx context.Context, set receiver.CreateSettings, cfg component.Config, pipelineID component.ID, nexts []baseConsumer, factory receiver.Factory) (component.Component, error) {
switch pipelineID.Type() {
case component.DataTypeTraces:
var consumers []consumer.Traces
Expand All @@ -552,7 +546,7 @@ func createReceiver(ctx context.Context, set receiver.CreateSettings, cfg compon
}
return factory.CreateLogsReceiver(ctx, set, cfg, fanoutconsumer.NewLogs(consumers))
}
return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type())
return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type())
}

func receiverLogger(logger *zap.Logger, id component.ID, dt component.DataType) *zap.Logger {
Expand Down

0 comments on commit 512b743

Please sign in to comment.