Skip to content

Commit

Permalink
Draft: receiver.Builder
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Dec 15, 2022
1 parent 512b743 commit 9e90c0b
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 286 deletions.
4 changes: 2 additions & 2 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/otelcol/internal/grpclog"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service"
)

Expand Down Expand Up @@ -158,8 +159,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {

col.service, err = service.New(ctx, service.Settings{
BuildInfo: col.set.BuildInfo,
ReceiverFactories: col.set.Factories.Receivers,
ReceiverConfigs: cfg.Receivers,
ReceiverBuilder: receiver.NewBuilder(cfg.Receivers, col.set.Factories.Receivers),
ProcessorFactories: col.set.Factories.Processors,
ProcessorConfigs: cfg.Processors,
ExporterFactories: col.set.Factories.Exporters,
Expand Down
75 changes: 75 additions & 0 deletions receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
)
Expand Down Expand Up @@ -195,3 +197,76 @@ func MakeFactoryMap(factories ...Factory) (map[component.Type]Factory, error) {
}
return fMap, nil
}

// Builder receiver is a helper struct that given a set of Configs and Factories helps with creating receivers.
type Builder struct {
cfgs map[component.ID]component.Config
factories map[component.Type]Factory
}

func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder {
return &Builder{cfgs: cfgs, factories: factories}
}

// CreateTraces creates a Traces receiver based on the settings and config.
func (b *Builder) CreateTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Traces, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("receiver %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.TracesReceiverStability())
return f.CreateTracesReceiver(ctx, set, cfg, next)
}

// CreateMetrics creates a Metrics receiver based on the settings and config.
func (b *Builder) CreateMetrics(ctx context.Context, set CreateSettings, next consumer.Metrics) (Metrics, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("receiver %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.MetricsReceiverStability())
return f.CreateMetricsReceiver(ctx, set, cfg, next)
}

// CreateLogs creates a Logs receiver based on the settings and config.
func (b *Builder) CreateLogs(ctx context.Context, set CreateSettings, next consumer.Logs) (Logs, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("receiver %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.LogsReceiverStability())
return f.CreateLogsReceiver(ctx, set, cfg, next)
}

func (b *Builder) Factory(componentType component.Type) component.Factory {
return b.factories[componentType]
}

// logStabilityLevel logs the stability level of a component. The log level is set to info for
// undefined, unmaintained, deprecated and development. The log level is set to debug
// for alpha, beta and stable.
func logStabilityLevel(logger *zap.Logger, sl component.StabilityLevel) {
if sl >= component.StabilityLevelAlpha {
logger.Debug(sl.LogMessage())
} else {
logger.Info(sl.LogMessage())
}
}
4 changes: 2 additions & 2 deletions service/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var _ component.Host = (*serviceHost)(nil)

type serviceHost struct {
asyncErrorChannel chan error
receiverFactories map[component.Type]receiver.Factory
receiverBuilder *receiver.Builder
processorFactories map[component.Type]processor.Factory
exporterFactories map[component.Type]exporter.Factory
extensionFactories map[component.Type]extension.Factory
Expand All @@ -48,7 +48,7 @@ func (host *serviceHost) ReportFatalError(err error) {
func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory {
switch kind {
case component.KindReceiver:
return host.receiverFactories[componentType]
return host.receiverBuilder.Factory(componentType)
case component.KindProcessor:
return host.processorFactories[componentType]
case component.KindExporter:
Expand Down
47 changes: 13 additions & 34 deletions service/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,7 @@ type pipelinesSettings struct {
Telemetry component.TelemetrySettings
BuildInfo component.BuildInfo

// ReceiverFactories maps receiver type names in the config to the respective receiver.Factory.
ReceiverFactories map[component.Type]receiver.Factory

// ReceiverConfigs is a map of component.ID to component.Config.
ReceiverConfigs map[component.ID]component.Config
ReceiverBuilder *receiver.Builder

// ProcessorFactories maps processor type names in the config to the respective component.ProcessorFactory.
ProcessorFactories map[component.Type]processor.Factory
Expand Down Expand Up @@ -340,7 +336,7 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines
BuildInfo: set.BuildInfo,
}
cSet.TelemetrySettings.Logger = receiverLogger(set.Telemetry.Logger, recvID, pipelineID.Type())
recv, err := buildReceiver(ctx, cSet, set.ReceiverConfigs, set.ReceiverFactories, pipelineID, receiversConsumers[pipelineID.Type()][recvID])
recv, err := buildReceiver(ctx, cSet, set.ReceiverBuilder, pipelineID, receiversConsumers[pipelineID.Type()][recvID])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -500,53 +496,36 @@ func getProcessorStabilityLevel(factory processor.Factory, dt component.DataType

func buildReceiver(ctx context.Context,
set receiver.CreateSettings,
cfgs map[component.ID]component.Config,
factories map[component.Type]receiver.Factory,
builder *receiver.Builder,
pipelineID component.ID,
nexts []baseConsumer,
) (component.Component, error) {
cfg, existsCfg := cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("receiver %q is not configured", set.ID)
}

factory, existsFactory := factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
}

components.LogStabilityLevel(set.TelemetrySettings.Logger, getReceiverStabilityLevel(factory, pipelineID.Type()))

recv, err := createReceiver(ctx, set, cfg, pipelineID, nexts, factory)
if err != nil {
return nil, fmt.Errorf("failed to create %q receiver, in pipeline %q: %w", set.ID, pipelineID, err)
}

return recv, nil
}

func createReceiver(ctx context.Context, set receiver.CreateSettings, cfg component.Config, pipelineID component.ID, nexts []baseConsumer, factory receiver.Factory) (component.Component, error) {
) (recv component.Component, err error) {
switch pipelineID.Type() {
case component.DataTypeTraces:
var consumers []consumer.Traces
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Traces))
}
return factory.CreateTracesReceiver(ctx, set, cfg, fanoutconsumer.NewTraces(consumers))
recv, err = builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers))
case component.DataTypeMetrics:
var consumers []consumer.Metrics
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Metrics))
}
return factory.CreateMetricsReceiver(ctx, set, cfg, fanoutconsumer.NewMetrics(consumers))
recv, err = builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers))
case component.DataTypeLogs:
var consumers []consumer.Logs
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Logs))
}
return factory.CreateLogsReceiver(ctx, set, cfg, fanoutconsumer.NewLogs(consumers))
recv, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers))
default:
return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type())
}
return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type())
if err != nil {
return nil, fmt.Errorf("failed to create %q receiver, in pipeline %q: %w", set.ID, pipelineID, err)
}
return recv, nil
}

func receiverLogger(logger *zap.Logger, id component.ID, dt component.DataType) *zap.Logger {
Expand Down
Loading

0 comments on commit 9e90c0b

Please sign in to comment.