Skip to content

Commit

Permalink
pipeline/builder: add tests for capabilities wrappers (#5543)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Jun 19, 2022
1 parent 635e0ae commit c099f6a
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 30 deletions.
58 changes: 58 additions & 0 deletions service/internal/builder/capabilities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package builder // import "go.opentelemetry.io/collector/service/internal/builder"

import (
"go.opentelemetry.io/collector/consumer"
)

func wrapLogs(logs consumer.Logs, cap consumer.Capabilities) consumer.Logs {
return capLogs{Logs: logs, cap: cap}
}

type capLogs struct {
consumer.Logs
cap consumer.Capabilities
}

func (mts capLogs) Capabilities() consumer.Capabilities {
return mts.cap
}

func wrapMetrics(metrics consumer.Metrics, cap consumer.Capabilities) consumer.Metrics {
return capMetrics{Metrics: metrics, cap: cap}
}

type capMetrics struct {
consumer.Metrics
cap consumer.Capabilities
}

func (mts capMetrics) Capabilities() consumer.Capabilities {
return mts.cap
}

func wrapTraces(traces consumer.Traces, cap consumer.Capabilities) consumer.Traces {
return capTraces{Traces: traces, cap: cap}
}

type capTraces struct {
consumer.Traces
cap consumer.Capabilities
}

func (mts capTraces) Capabilities() consumer.Capabilities {
return mts.cap
}
63 changes: 63 additions & 0 deletions service/internal/builder/capabilities_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package builder

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
)

func TestWrapLogs(t *testing.T) {
sink := &consumertest.LogsSink{}
require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities())

wrap := wrapLogs(sink, consumer.Capabilities{MutatesData: true})
assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities())

assert.NoError(t, wrap.ConsumeLogs(context.Background(), testdata.GenerateLogsOneLogRecord()))
assert.Len(t, sink.AllLogs(), 1)
assert.Equal(t, testdata.GenerateLogsOneLogRecord(), sink.AllLogs()[0])
}

func TestWrapMetrics(t *testing.T) {
sink := &consumertest.MetricsSink{}
require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities())

wrap := wrapMetrics(sink, consumer.Capabilities{MutatesData: true})
assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities())

assert.NoError(t, wrap.ConsumeMetrics(context.Background(), testdata.GenerateMetricsOneMetric()))
assert.Len(t, sink.AllMetrics(), 1)
assert.Equal(t, testdata.GenerateMetricsOneMetric(), sink.AllMetrics()[0])
}

func TestWrapTraces(t *testing.T) {
sink := &consumertest.TracesSink{}
require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities())

wrap := wrapTraces(sink, consumer.Capabilities{MutatesData: true})
assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities())

assert.NoError(t, wrap.ConsumeTraces(context.Background(), testdata.GenerateTracesOneSpan()))
assert.Len(t, sink.AllTraces(), 1)
assert.Equal(t, testdata.GenerateTracesOneSpan(), sink.AllTraces()[0])
}
33 changes: 3 additions & 30 deletions service/internal/builder/pipelines_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,13 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineID config
// Because of this wrap the first consumer if any consumers in the pipeline
// mutate the data and the first says that it doesn't.
if tc != nil {
tc = capabilitiesTraces{Traces: tc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}}
tc = wrapTraces(tc, consumer.Capabilities{MutatesData: mutatesConsumedData})
}
if mc != nil {
mc = capabilitiesMetrics{Metrics: mc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}}
mc = wrapMetrics(mc, consumer.Capabilities{MutatesData: mutatesConsumedData})
}
if lc != nil {
lc = capabilitiesLogs{Logs: lc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}}
lc = wrapLogs(lc, consumer.Capabilities{MutatesData: mutatesConsumedData})
}
bp := &builtPipeline{
logger: pipelineLogger,
Expand Down Expand Up @@ -281,30 +281,3 @@ func (pb *pipelinesBuilder) buildFanoutExportersLogsConsumer(exporterIDs []confi
// Create a junction point that fans out to all exporters.
return fanoutconsumer.NewLogs(exporters)
}

type capabilitiesLogs struct {
consumer.Logs
capabilities consumer.Capabilities
}

func (mts capabilitiesLogs) Capabilities() consumer.Capabilities {
return mts.capabilities
}

type capabilitiesMetrics struct {
consumer.Metrics
capabilities consumer.Capabilities
}

func (mts capabilitiesMetrics) Capabilities() consumer.Capabilities {
return mts.capabilities
}

type capabilitiesTraces struct {
consumer.Traces
capabilities consumer.Capabilities
}

func (mts capabilitiesTraces) Capabilities() consumer.Capabilities {
return mts.capabilities
}

0 comments on commit c099f6a

Please sign in to comment.