Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kafka Exporter] refactor tests #16265

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 202 additions & 128 deletions exporter/kafkaexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,158 +16,232 @@ package kafkaexporter

import (
"context"
"reflect"
"errors"
"net"
"testing"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig().(*Config)
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
assert.Equal(t, []string{defaultBroker}, cfg.Brokers)
assert.Equal(t, "", cfg.Topic)
// data is a simple means of allowing
// interchangeability between the
// different marshaller types
type data interface {
ptrace.Traces | plog.Logs | pmetric.Metrics
}

func TestCreateAllExporter(t *testing.T) {
cfg0 := createDefaultConfig().(*Config)
cfg1 := createDefaultConfig().(*Config)
cfg2 := createDefaultConfig().(*Config)

cfg0.Brokers = []string{"invalid:9092"}
cfg1.Brokers = []string{"invalid:9092"}
cfg2.Brokers = []string{"invalid:9092"}

cfg0.ProtocolVersion = "2.0.0"
cfg1.ProtocolVersion = "2.0.0"
cfg2.ProtocolVersion = "2.0.0"

// this disables contacting the broker so we can successfully create the exporter
cfg0.Metadata.Full = false
cfg1.Metadata.Full = false
cfg2.Metadata.Full = false

cfgClone := *cfg0 // Clone the config

f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers(), metricsMarshalers: metricsMarshalers(), logsMarshalers: logsMarshalers()}
r0, err := f.createTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg0)
require.NoError(t, err)
r1, err := f.createMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg1)
require.NoError(t, err)
r2, err := f.createLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg2)
require.NoError(t, err)

// createTracesExporter should not mutate values
assert.True(t, reflect.DeepEqual(*cfg0, cfgClone), "config should not mutate")
assert.True(t, reflect.DeepEqual(*cfg1, cfgClone), "config should not mutate")
assert.True(t, reflect.DeepEqual(*cfg2, cfgClone), "config should not mutate")
assert.NotNil(t, r0)
assert.NotNil(t, r1)
assert.NotNil(t, r2)
type mockMarshaler[Data data] struct {
consume func(d Data, topic string) ([]*sarama.ProducerMessage, error)
encoding string
}

func TestCreateTracesExporter(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
// this disables contacting the broker so we can successfully create the exporter
cfg.Metadata.Full = false
f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers()}
r, err := f.createTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NotNil(t, r)
}
func (mm *mockMarshaler[Data]) Encoding() string { return mm.encoding }

func TestCreateMetricsExport(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
// this disables contacting the broker so we can successfully create the exporter
cfg.Metadata.Full = false
mf := kafkaExporterFactory{metricsMarshalers: metricsMarshalers()}
mr, err := mf.createMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NotNil(t, mr)
func (mm *mockMarshaler[Data]) Marshal(d Data, topic string) ([]*sarama.ProducerMessage, error) {
if mm.consume != nil {
return mm.consume(d, topic)
}
return nil, errors.New("not implemented")
}

func TestCreateLogsExport(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
// this disables contacting the broker so we can successfully create the exporter
cfg.Metadata.Full = false
mf := kafkaExporterFactory{logsMarshalers: logsMarshalers()}
mr, err := mf.createLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NotNil(t, mr)
func newMockMarshaler[Data data](encoding string) *mockMarshaler[Data] {
return &mockMarshaler[Data]{encoding: encoding}
}

func TestCreateTracesExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers()}
r, err := f.createTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
// no available broker
require.Error(t, err)
assert.Nil(t, r)
// applyConfigOption is used to modify values of the
// the default exporter config to make it easier to
// use the return in a test table set up
func applyConfigOption(option func(conf *Config)) *Config {
conf := createDefaultConfig().(*Config)
option(conf)
return conf
}

func TestCreateMetricsExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
mf := kafkaExporterFactory{metricsMarshalers: metricsMarshalers()}
mr, err := mf.createMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.Error(t, err)
assert.Nil(t, mr)
}

func TestCreateLogsExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
mf := kafkaExporterFactory{logsMarshalers: logsMarshalers()}
mr, err := mf.createLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.Error(t, err)
assert.Nil(t, mr)
}

func TestWithMarshalers(t *testing.T) {
cm := &customMarshaler{}
f := NewFactory(WithTracesMarshalers(cm))
func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false

t.Run("custom_encoding", func(t *testing.T) {
cfg.Encoding = cm.Encoding()
exporter, err := f.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
require.NotNil(t, exporter)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = defaultEncoding
exporter, err := f.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NotNil(t, exporter)
})
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
assert.Equal(t, []string{defaultBroker}, cfg.Brokers)
assert.Equal(t, "", cfg.Topic)
}

type customMarshaler struct {
func TestCreateMetricExporter(t *testing.T) {
t.Parallel()

tests := []struct {
name string
conf *Config
err error
}{
{
name: "valid config (no validating broker)",
conf: applyConfigOption(func(conf *Config) {
// this disables contacting the broker so
// we can successfully create the exporter
conf.Metadata.Full = false
conf.Brokers = []string{"invalid:9092"}
conf.ProtocolVersion = "2.0.0"
}),
err: nil,
},
{
name: "invalid config (validating broker)",
conf: applyConfigOption(func(conf *Config) {
conf.Brokers = []string{"invalid:9092"}
conf.ProtocolVersion = "2.0.0"
}),
err: &net.DNSError{},
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

f := NewFactory()
exporter, err := f.CreateMetricsExporter(
context.Background(),
componenttest.NewNopExporterCreateSettings(),
tc.conf,
)
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
assert.NotNil(t, exporter, "Must return valid exporter when no error is returned")
})
}
}

var _ TracesMarshaler = (*customMarshaler)(nil)

func (c customMarshaler) Marshal(_ ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
panic("implement me")
func TestCreateLogExporter(t *testing.T) {
t.Parallel()

tests := []struct {
name string
conf *Config
err error
}{
{
name: "valid config (no validating broker)",
conf: applyConfigOption(func(conf *Config) {
// this disables contacting the broker so
// we can successfully create the exporter
conf.Metadata.Full = false
conf.Brokers = []string{"invalid:9092"}
conf.ProtocolVersion = "2.0.0"
}),
err: nil,
},
{
name: "invalid config (validating broker)",
conf: applyConfigOption(func(conf *Config) {
conf.Brokers = []string{"invalid:9092"}
conf.ProtocolVersion = "2.0.0"
}),
err: &net.DNSError{},
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

f := NewFactory()
exporter, err := f.CreateLogsExporter(
context.Background(),
componenttest.NewNopExporterCreateSettings(),
tc.conf,
)
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
assert.NotNil(t, exporter, "Must return valid exporter when no error is returned")
})
}
}

func (c customMarshaler) Encoding() string {
return "custom"
func TestCreateTraceExporter(t *testing.T) {
t.Parallel()

tests := []struct {
name string
conf *Config
marshalers []TracesMarshaler
err error
}{
{
name: "valid config (no validating brokers)",
conf: applyConfigOption(func(conf *Config) {
conf.Metadata.Full = false
conf.Brokers = []string{"invalid:9092"}
conf.ProtocolVersion = "2.0.0"
}),
marshalers: nil,
err: nil,
},
{
name: "invalid config (validating brokers)",
conf: applyConfigOption(func(conf *Config) {
conf.Brokers = []string{"invalid:9092"}
conf.ProtocolVersion = "2.0.0"
}),
marshalers: nil,
err: &net.DNSError{},
},
{
name: "default_encoding",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure encoding work
conf.Metadata.Full = false
conf.Encoding = defaultEncoding
}),
marshalers: nil,
err: nil,
},
{
name: "custom_encoding",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure encoding work
conf.Metadata.Full = false
conf.Encoding = "custom"
}),
marshalers: []TracesMarshaler{
newMockMarshaler[ptrace.Traces]("custom"),
},
err: nil,
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

f := NewFactory(WithTracesMarshalers(tc.marshalers...))
exporter, err := f.CreateTracesExporter(
context.Background(),
componenttest.NewNopExporterCreateSettings(),
tc.conf,
)
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
assert.NotNil(t, exporter, "Must return valid exporter when no error is returned")
})
}
}