Skip to content

Commit

Permalink
Draft: receiver.Builder
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Dec 15, 2022
1 parent eb5f336 commit 2325358
Show file tree
Hide file tree
Showing 9 changed files with 354 additions and 298 deletions.
4 changes: 2 additions & 2 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/otelcol/internal/grpclog"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service"
)

Expand Down Expand Up @@ -158,8 +159,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {

col.service, err = service.New(ctx, service.Settings{
BuildInfo: col.set.BuildInfo,
ReceiverFactories: col.set.Factories.Receivers,
ReceiverConfigs: cfg.Receivers,
ReceiverBuilder: receiver.NewBuilder(cfg.Receivers, col.set.Factories.Receivers),
ProcessorFactories: col.set.Factories.Processors,
ProcessorConfigs: cfg.Processors,
ExporterFactories: col.set.Factories.Exporters,
Expand Down
76 changes: 76 additions & 0 deletions receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
)
Expand Down Expand Up @@ -195,3 +197,77 @@ func MakeFactoryMap(factories ...Factory) (map[component.Type]Factory, error) {
}
return fMap, nil
}

// Builder receiver is a helper struct that given a set of Configs and Factories helps with creating receivers.
type Builder struct {
cfgs map[component.ID]component.Config
factories map[component.Type]Factory
}

// NewBuilder creates a new receiver.Builder to help with creating components form a set of configs and factories.
func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder {
return &Builder{cfgs: cfgs, factories: factories}
}

// CreateTraces creates a Traces receiver based on the settings and config.
func (b *Builder) CreateTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Traces, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("receiver %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.TracesReceiverStability())
return f.CreateTracesReceiver(ctx, set, cfg, next)
}

// CreateMetrics creates a Metrics receiver based on the settings and config.
func (b *Builder) CreateMetrics(ctx context.Context, set CreateSettings, next consumer.Metrics) (Metrics, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("receiver %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.MetricsReceiverStability())
return f.CreateMetricsReceiver(ctx, set, cfg, next)
}

// CreateLogs creates a Logs receiver based on the settings and config.
func (b *Builder) CreateLogs(ctx context.Context, set CreateSettings, next consumer.Logs) (Logs, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("receiver %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.LogsReceiverStability())
return f.CreateLogsReceiver(ctx, set, cfg, next)
}

func (b *Builder) Factory(componentType component.Type) component.Factory {
return b.factories[componentType]
}

// logStabilityLevel logs the stability level of a component. The log level is set to info for
// undefined, unmaintained, deprecated and development. The log level is set to debug
// for alpha, beta and stable.
func logStabilityLevel(logger *zap.Logger, sl component.StabilityLevel) {
if sl >= component.StabilityLevelAlpha {
logger.Debug(sl.LogMessage())
} else {
logger.Info(sl.LogMessage())
}
}
10 changes: 10 additions & 0 deletions receiver/receivertest/nop_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"go.opentelemetry.io/collector/receiver"
)

const typeStr = "nop"

// NewNopCreateSettings returns a new nop settings for Create* functions.
func NewNopCreateSettings() receiver.CreateSettings {
return receiver.CreateSettings{
Expand All @@ -30,3 +32,11 @@ func NewNopCreateSettings() receiver.CreateSettings {

// NewNopFactory returns a receiver.Factory that constructs nop receivers.
var NewNopFactory = componenttest.NewNopReceiverFactory //nolint:staticcheck

// NewNopBuilder returns a receiver.Builder that constructs nop receivers.
func NewNopBuilder() *receiver.Builder {
nopFactory := NewNopFactory()
return receiver.NewBuilder(
map[component.ID]component.Config{component.NewID(typeStr): nopFactory.CreateDefaultConfig()},
map[component.Type]receiver.Factory{typeStr: nopFactory})
}
28 changes: 28 additions & 0 deletions receiver/receivertest/nop_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,31 @@ func TestNewNopFactory(t *testing.T) {
assert.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, logs.Shutdown(context.Background()))
}

func TestNewNopBuilder(t *testing.T) {
builder := NewNopBuilder()
require.NotNil(t, builder)

factory := NewNopFactory()
cfg := factory.CreateDefaultConfig()
set := NewNopCreateSettings()
set.ID = component.NewID(typeStr)

traces, err := factory.CreateTracesReceiver(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
bTraces, err := builder.CreateTraces(context.Background(), set, consumertest.NewNop())
require.NoError(t, err)
assert.IsType(t, traces, bTraces)

metrics, err := factory.CreateMetricsReceiver(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
bMetrics, err := builder.CreateMetrics(context.Background(), set, consumertest.NewNop())
require.NoError(t, err)
assert.IsType(t, metrics, bMetrics)

logs, err := factory.CreateLogsReceiver(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
bLogs, err := builder.CreateLogs(context.Background(), set, consumertest.NewNop())
require.NoError(t, err)
assert.IsType(t, logs, bLogs)
}
4 changes: 2 additions & 2 deletions service/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var _ component.Host = (*serviceHost)(nil)

type serviceHost struct {
asyncErrorChannel chan error
receiverFactories map[component.Type]receiver.Factory
receiverBuilder *receiver.Builder
processorFactories map[component.Type]processor.Factory
exporterFactories map[component.Type]exporter.Factory
extensionFactories map[component.Type]extension.Factory
Expand All @@ -48,7 +48,7 @@ func (host *serviceHost) ReportFatalError(err error) {
func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory {
switch kind {
case component.KindReceiver:
return host.receiverFactories[componentType]
return host.receiverBuilder.Factory(componentType)
case component.KindProcessor:
return host.processorFactories[componentType]
case component.KindExporter:
Expand Down
59 changes: 13 additions & 46 deletions service/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,7 @@ type pipelinesSettings struct {
Telemetry component.TelemetrySettings
BuildInfo component.BuildInfo

// ReceiverFactories maps receiver type names in the config to the respective receiver.Factory.
ReceiverFactories map[component.Type]receiver.Factory

// ReceiverConfigs is a map of component.ID to component.Config.
ReceiverConfigs map[component.ID]component.Config
ReceiverBuilder *receiver.Builder

// ProcessorFactories maps processor type names in the config to the respective component.ProcessorFactory.
ProcessorFactories map[component.Type]processor.Factory
Expand Down Expand Up @@ -340,7 +336,7 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines
BuildInfo: set.BuildInfo,
}
cSet.TelemetrySettings.Logger = receiverLogger(set.Telemetry.Logger, recvID, pipelineID.Type())
recv, err := buildReceiver(ctx, cSet, set.ReceiverConfigs, set.ReceiverFactories, pipelineID, receiversConsumers[pipelineID.Type()][recvID])
recv, err := buildReceiver(ctx, cSet, set.ReceiverBuilder, pipelineID, receiversConsumers[pipelineID.Type()][recvID])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -500,53 +496,36 @@ func getProcessorStabilityLevel(factory processor.Factory, dt component.DataType

func buildReceiver(ctx context.Context,
set receiver.CreateSettings,
cfgs map[component.ID]component.Config,
factories map[component.Type]receiver.Factory,
builder *receiver.Builder,
pipelineID component.ID,
nexts []baseConsumer,
) (component.Component, error) {
cfg, existsCfg := cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("receiver %q is not configured", set.ID)
}

factory, existsFactory := factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
}

components.LogStabilityLevel(set.TelemetrySettings.Logger, getReceiverStabilityLevel(factory, pipelineID.Type()))

recv, err := createReceiver(ctx, set, cfg, pipelineID, nexts, factory)
if err != nil {
return nil, fmt.Errorf("failed to create %q receiver, in pipeline %q: %w", set.ID, pipelineID, err)
}

return recv, nil
}

func createReceiver(ctx context.Context, set receiver.CreateSettings, cfg component.Config, pipelineID component.ID, nexts []baseConsumer, factory receiver.Factory) (component.Component, error) {
) (recv component.Component, err error) {
switch pipelineID.Type() {
case component.DataTypeTraces:
var consumers []consumer.Traces
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Traces))
}
return factory.CreateTracesReceiver(ctx, set, cfg, fanoutconsumer.NewTraces(consumers))
recv, err = builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers))
case component.DataTypeMetrics:
var consumers []consumer.Metrics
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Metrics))
}
return factory.CreateMetricsReceiver(ctx, set, cfg, fanoutconsumer.NewMetrics(consumers))
recv, err = builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers))
case component.DataTypeLogs:
var consumers []consumer.Logs
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Logs))
}
return factory.CreateLogsReceiver(ctx, set, cfg, fanoutconsumer.NewLogs(consumers))
recv, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers))
default:
return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type())
}
if err != nil {
return nil, fmt.Errorf("failed to create %q receiver, in pipeline %q: %w", set.ID, pipelineID, err)
}
return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type())
return recv, nil
}

func receiverLogger(logger *zap.Logger, id component.ID, dt component.DataType) *zap.Logger {
Expand All @@ -556,18 +535,6 @@ func receiverLogger(logger *zap.Logger, id component.ID, dt component.DataType)
zap.String(components.ZapKindPipeline, string(dt)))
}

func getReceiverStabilityLevel(factory receiver.Factory, dt component.DataType) component.StabilityLevel {
switch dt {
case component.DataTypeTraces:
return factory.TracesReceiverStability()
case component.DataTypeMetrics:
return factory.MetricsReceiverStability()
case component.DataTypeLogs:
return factory.LogsReceiverStability()
}
return component.StabilityLevelUndefined
}

func (bps *builtPipelines) getPipelinesSummaryTableData() zpages.SummaryPipelinesTableData {
sumData := zpages.SummaryPipelinesTableData{}
sumData.Rows = make([]zpages.SummaryPipelinesTableRowData, 0, len(bps.pipelines))
Expand Down
Loading

0 comments on commit 2325358

Please sign in to comment.