diff --git a/service/internal/builder/capabilities.go b/service/internal/builder/capabilities.go new file mode 100644 index 00000000000..b2caefc38e4 --- /dev/null +++ b/service/internal/builder/capabilities.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder // import "go.opentelemetry.io/collector/service/internal/builder" + +import ( + "go.opentelemetry.io/collector/consumer" +) + +func wrapLogs(logs consumer.Logs, cap consumer.Capabilities) consumer.Logs { + return capLogs{Logs: logs, cap: cap} +} + +type capLogs struct { + consumer.Logs + cap consumer.Capabilities +} + +func (mts capLogs) Capabilities() consumer.Capabilities { + return mts.cap +} + +func wrapMetrics(metrics consumer.Metrics, cap consumer.Capabilities) consumer.Metrics { + return capMetrics{Metrics: metrics, cap: cap} +} + +type capMetrics struct { + consumer.Metrics + cap consumer.Capabilities +} + +func (mts capMetrics) Capabilities() consumer.Capabilities { + return mts.cap +} + +func wrapTraces(traces consumer.Traces, cap consumer.Capabilities) consumer.Traces { + return capTraces{Traces: traces, cap: cap} +} + +type capTraces struct { + consumer.Traces + cap consumer.Capabilities +} + +func (mts capTraces) Capabilities() consumer.Capabilities { + return mts.cap +} diff --git a/service/internal/builder/capabilities_test.go b/service/internal/builder/capabilities_test.go new file mode 100644 index 00000000000..9ce53faef49 --- /dev/null +++ b/service/internal/builder/capabilities_test.go @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/internal/testdata" +) + +func TestWrapLogs(t *testing.T) { + sink := &consumertest.LogsSink{} + require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities()) + + wrap := wrapLogs(sink, consumer.Capabilities{MutatesData: true}) + assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities()) + + assert.NoError(t, wrap.ConsumeLogs(context.Background(), testdata.GenerateLogsOneLogRecord())) + assert.Len(t, sink.AllLogs(), 1) + assert.Equal(t, testdata.GenerateLogsOneLogRecord(), sink.AllLogs()[0]) +} + +func TestWrapMetrics(t *testing.T) { + sink := &consumertest.MetricsSink{} + require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities()) + + wrap := wrapMetrics(sink, consumer.Capabilities{MutatesData: true}) + assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities()) + + assert.NoError(t, wrap.ConsumeMetrics(context.Background(), testdata.GenerateMetricsOneMetric())) + assert.Len(t, sink.AllMetrics(), 1) + assert.Equal(t, testdata.GenerateMetricsOneMetric(), sink.AllMetrics()[0]) +} + +func TestWrapTraces(t *testing.T) { + sink := &consumertest.TracesSink{} + require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities()) + + wrap := wrapTraces(sink, consumer.Capabilities{MutatesData: true}) + assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities()) + + assert.NoError(t, wrap.ConsumeTraces(context.Background(), testdata.GenerateTracesOneSpan())) + assert.Len(t, sink.AllTraces(), 1) + assert.Equal(t, testdata.GenerateTracesOneSpan(), sink.AllTraces()[0]) +} diff --git a/service/internal/builder/pipelines_builder.go b/service/internal/builder/pipelines_builder.go index 3b006c6c3d9..014bfbb7ad1 100644 --- a/service/internal/builder/pipelines_builder.go +++ b/service/internal/builder/pipelines_builder.go @@ -228,13 +228,13 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineID config // Because of this wrap the first consumer if any consumers in the pipeline // mutate the data and the first says that it doesn't. if tc != nil { - tc = capabilitiesTraces{Traces: tc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}} + tc = wrapTraces(tc, consumer.Capabilities{MutatesData: mutatesConsumedData}) } if mc != nil { - mc = capabilitiesMetrics{Metrics: mc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}} + mc = wrapMetrics(mc, consumer.Capabilities{MutatesData: mutatesConsumedData}) } if lc != nil { - lc = capabilitiesLogs{Logs: lc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}} + lc = wrapLogs(lc, consumer.Capabilities{MutatesData: mutatesConsumedData}) } bp := &builtPipeline{ logger: pipelineLogger, @@ -281,30 +281,3 @@ func (pb *pipelinesBuilder) buildFanoutExportersLogsConsumer(exporterIDs []confi // Create a junction point that fans out to all exporters. return fanoutconsumer.NewLogs(exporters) } - -type capabilitiesLogs struct { - consumer.Logs - capabilities consumer.Capabilities -} - -func (mts capabilitiesLogs) Capabilities() consumer.Capabilities { - return mts.capabilities -} - -type capabilitiesMetrics struct { - consumer.Metrics - capabilities consumer.Capabilities -} - -func (mts capabilitiesMetrics) Capabilities() consumer.Capabilities { - return mts.capabilities -} - -type capabilitiesTraces struct { - consumer.Traces - capabilities consumer.Capabilities -} - -func (mts capabilitiesTraces) Capabilities() consumer.Capabilities { - return mts.capabilities -}