Skip to content

Commit

Permalink
[kafka Exporter] refactor tests (#16265)
Browse files Browse the repository at this point in the history
These tests needed a little love to make it easier to contribute to the
component

Co-authored-by: Alex Boten <alex@boten.ca>
  • Loading branch information
MovieStoreGuy and codeboten authored Nov 18, 2022
1 parent 1374862 commit 24ac3f8
Showing 1 changed file with 202 additions and 128 deletions.
330 changes: 202 additions & 128 deletions exporter/kafkaexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,158 +16,232 @@ package kafkaexporter

import (
"context"
"reflect"
"errors"
"net"
"testing"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig().(*Config)
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
assert.Equal(t, []string{defaultBroker}, cfg.Brokers)
assert.Equal(t, "", cfg.Topic)
// data is a simple means of allowing
// interchangeability between the
// different marshaller types
type data interface {
ptrace.Traces | plog.Logs | pmetric.Metrics
}

func TestCreateAllExporter(t *testing.T) {
cfg0 := createDefaultConfig().(*Config)
cfg1 := createDefaultConfig().(*Config)
cfg2 := createDefaultConfig().(*Config)

cfg0.Brokers = []string{"invalid:9092"}
cfg1.Brokers = []string{"invalid:9092"}
cfg2.Brokers = []string{"invalid:9092"}

cfg0.ProtocolVersion = "2.0.0"
cfg1.ProtocolVersion = "2.0.0"
cfg2.ProtocolVersion = "2.0.0"

// this disables contacting the broker so we can successfully create the exporter
cfg0.Metadata.Full = false
cfg1.Metadata.Full = false
cfg2.Metadata.Full = false

cfgClone := *cfg0 // Clone the config

f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers(), metricsMarshalers: metricsMarshalers(), logsMarshalers: logsMarshalers()}
r0, err := f.createTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg0)
require.NoError(t, err)
r1, err := f.createMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg1)
require.NoError(t, err)
r2, err := f.createLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg2)
require.NoError(t, err)

// createTracesExporter should not mutate values
assert.True(t, reflect.DeepEqual(*cfg0, cfgClone), "config should not mutate")
assert.True(t, reflect.DeepEqual(*cfg1, cfgClone), "config should not mutate")
assert.True(t, reflect.DeepEqual(*cfg2, cfgClone), "config should not mutate")
assert.NotNil(t, r0)
assert.NotNil(t, r1)
assert.NotNil(t, r2)
type mockMarshaler[Data data] struct {
consume func(d Data, topic string) ([]*sarama.ProducerMessage, error)
encoding string
}

func TestCreateTracesExporter(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
// this disables contacting the broker so we can successfully create the exporter
cfg.Metadata.Full = false
f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers()}
r, err := f.createTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NotNil(t, r)
}
func (mm *mockMarshaler[Data]) Encoding() string { return mm.encoding }

func TestCreateMetricsExport(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
// this disables contacting the broker so we can successfully create the exporter
cfg.Metadata.Full = false
mf := kafkaExporterFactory{metricsMarshalers: metricsMarshalers()}
mr, err := mf.createMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NotNil(t, mr)
func (mm *mockMarshaler[Data]) Marshal(d Data, topic string) ([]*sarama.ProducerMessage, error) {
if mm.consume != nil {
return mm.consume(d, topic)
}
return nil, errors.New("not implemented")
}

func TestCreateLogsExport(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
// this disables contacting the broker so we can successfully create the exporter
cfg.Metadata.Full = false
mf := kafkaExporterFactory{logsMarshalers: logsMarshalers()}
mr, err := mf.createLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NotNil(t, mr)
func newMockMarshaler[Data data](encoding string) *mockMarshaler[Data] {
return &mockMarshaler[Data]{encoding: encoding}
}

func TestCreateTracesExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers()}
r, err := f.createTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
// no available broker
require.Error(t, err)
assert.Nil(t, r)
// applyConfigOption is used to modify values of the
// the default exporter config to make it easier to
// use the return in a test table set up
func applyConfigOption(option func(conf *Config)) *Config {
conf := createDefaultConfig().(*Config)
option(conf)
return conf
}

func TestCreateMetricsExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
mf := kafkaExporterFactory{metricsMarshalers: metricsMarshalers()}
mr, err := mf.createMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.Error(t, err)
assert.Nil(t, mr)
}

func TestCreateLogsExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
mf := kafkaExporterFactory{logsMarshalers: logsMarshalers()}
mr, err := mf.createLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.Error(t, err)
assert.Nil(t, mr)
}

func TestWithMarshalers(t *testing.T) {
cm := &customMarshaler{}
f := NewFactory(WithTracesMarshalers(cm))
func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false

t.Run("custom_encoding", func(t *testing.T) {
cfg.Encoding = cm.Encoding()
exporter, err := f.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
require.NotNil(t, exporter)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = defaultEncoding
exporter, err := f.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NotNil(t, exporter)
})
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
assert.Equal(t, []string{defaultBroker}, cfg.Brokers)
assert.Equal(t, "", cfg.Topic)
}

type customMarshaler struct {
func TestCreateMetricExporter(t *testing.T) {
t.Parallel()

tests := []struct {
name string
conf *Config
err error
}{
{
name: "valid config (no validating broker)",
conf: applyConfigOption(func(conf *Config) {
// this disables contacting the broker so
// we can successfully create the exporter
conf.Metadata.Full = false
conf.Brokers = []string{"invalid:9092"}
conf.ProtocolVersion = "2.0.0"
}),
err: nil,
},
{
name: "invalid config (validating broker)",
conf: applyConfigOption(func(conf *Config) {
conf.Brokers = []string{"invalid:9092"}
conf.ProtocolVersion = "2.0.0"
}),
err: &net.DNSError{},
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

f := NewFactory()
exporter, err := f.CreateMetricsExporter(
context.Background(),
componenttest.NewNopExporterCreateSettings(),
tc.conf,
)
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
assert.NotNil(t, exporter, "Must return valid exporter when no error is returned")
})
}
}

var _ TracesMarshaler = (*customMarshaler)(nil)

func (c customMarshaler) Marshal(_ ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
panic("implement me")
func TestCreateLogExporter(t *testing.T) {
t.Parallel()

tests := []struct {
name string
conf *Config
err error
}{
{
name: "valid config (no validating broker)",
conf: applyConfigOption(func(conf *Config) {
// this disables contacting the broker so
// we can successfully create the exporter
conf.Metadata.Full = false
conf.Brokers = []string{"invalid:9092"}
conf.ProtocolVersion = "2.0.0"
}),
err: nil,
},
{
name: "invalid config (validating broker)",
conf: applyConfigOption(func(conf *Config) {
conf.Brokers = []string{"invalid:9092"}
conf.ProtocolVersion = "2.0.0"
}),
err: &net.DNSError{},
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

f := NewFactory()
exporter, err := f.CreateLogsExporter(
context.Background(),
componenttest.NewNopExporterCreateSettings(),
tc.conf,
)
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
assert.NotNil(t, exporter, "Must return valid exporter when no error is returned")
})
}
}

func (c customMarshaler) Encoding() string {
return "custom"
func TestCreateTraceExporter(t *testing.T) {
t.Parallel()

tests := []struct {
name string
conf *Config
marshalers []TracesMarshaler
err error
}{
{
name: "valid config (no validating brokers)",
conf: applyConfigOption(func(conf *Config) {
conf.Metadata.Full = false
conf.Brokers = []string{"invalid:9092"}
conf.ProtocolVersion = "2.0.0"
}),
marshalers: nil,
err: nil,
},
{
name: "invalid config (validating brokers)",
conf: applyConfigOption(func(conf *Config) {
conf.Brokers = []string{"invalid:9092"}
conf.ProtocolVersion = "2.0.0"
}),
marshalers: nil,
err: &net.DNSError{},
},
{
name: "default_encoding",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure encoding work
conf.Metadata.Full = false
conf.Encoding = defaultEncoding
}),
marshalers: nil,
err: nil,
},
{
name: "custom_encoding",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure encoding work
conf.Metadata.Full = false
conf.Encoding = "custom"
}),
marshalers: []TracesMarshaler{
newMockMarshaler[ptrace.Traces]("custom"),
},
err: nil,
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

f := NewFactory(WithTracesMarshalers(tc.marshalers...))
exporter, err := f.CreateTracesExporter(
context.Background(),
componenttest.NewNopExporterCreateSettings(),
tc.conf,
)
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
assert.NotNil(t, exporter, "Must return valid exporter when no error is returned")
})
}
}

0 comments on commit 24ac3f8

Please sign in to comment.