diff --git a/exporter/kafkaexporter/factory_test.go b/exporter/kafkaexporter/factory_test.go index fbf940d711dc..806e11c10546 100644 --- a/exporter/kafkaexporter/factory_test.go +++ b/exporter/kafkaexporter/factory_test.go @@ -16,158 +16,232 @@ package kafkaexporter import ( "context" - "reflect" + "errors" + "net" "testing" "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" ) -func TestCreateDefaultConfig(t *testing.T) { - cfg := createDefaultConfig().(*Config) - assert.NotNil(t, cfg, "failed to create default config") - assert.NoError(t, componenttest.CheckConfigStruct(cfg)) - assert.Equal(t, []string{defaultBroker}, cfg.Brokers) - assert.Equal(t, "", cfg.Topic) +// data is a simple means of allowing +// interchangeability between the +// different marshaller types +type data interface { + ptrace.Traces | plog.Logs | pmetric.Metrics } -func TestCreateAllExporter(t *testing.T) { - cfg0 := createDefaultConfig().(*Config) - cfg1 := createDefaultConfig().(*Config) - cfg2 := createDefaultConfig().(*Config) - - cfg0.Brokers = []string{"invalid:9092"} - cfg1.Brokers = []string{"invalid:9092"} - cfg2.Brokers = []string{"invalid:9092"} - - cfg0.ProtocolVersion = "2.0.0" - cfg1.ProtocolVersion = "2.0.0" - cfg2.ProtocolVersion = "2.0.0" - - // this disables contacting the broker so we can successfully create the exporter - cfg0.Metadata.Full = false - cfg1.Metadata.Full = false - cfg2.Metadata.Full = false - - cfgClone := *cfg0 // Clone the config - - f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers(), metricsMarshalers: metricsMarshalers(), logsMarshalers: logsMarshalers()} - r0, err := f.createTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg0) - require.NoError(t, err) - r1, err := f.createMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg1) - require.NoError(t, err) - r2, err := f.createLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg2) - require.NoError(t, err) - - // createTracesExporter should not mutate values - assert.True(t, reflect.DeepEqual(*cfg0, cfgClone), "config should not mutate") - assert.True(t, reflect.DeepEqual(*cfg1, cfgClone), "config should not mutate") - assert.True(t, reflect.DeepEqual(*cfg2, cfgClone), "config should not mutate") - assert.NotNil(t, r0) - assert.NotNil(t, r1) - assert.NotNil(t, r2) +type mockMarshaler[Data data] struct { + consume func(d Data, topic string) ([]*sarama.ProducerMessage, error) + encoding string } -func TestCreateTracesExporter(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Brokers = []string{"invalid:9092"} - cfg.ProtocolVersion = "2.0.0" - // this disables contacting the broker so we can successfully create the exporter - cfg.Metadata.Full = false - f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers()} - r, err := f.createTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg) - require.NoError(t, err) - assert.NotNil(t, r) -} +func (mm *mockMarshaler[Data]) Encoding() string { return mm.encoding } -func TestCreateMetricsExport(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Brokers = []string{"invalid:9092"} - cfg.ProtocolVersion = "2.0.0" - // this disables contacting the broker so we can successfully create the exporter - cfg.Metadata.Full = false - mf := kafkaExporterFactory{metricsMarshalers: metricsMarshalers()} - mr, err := mf.createMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg) - require.NoError(t, err) - assert.NotNil(t, mr) +func (mm *mockMarshaler[Data]) Marshal(d Data, topic string) ([]*sarama.ProducerMessage, error) { + if mm.consume != nil { + return mm.consume(d, topic) + } + return nil, errors.New("not implemented") } -func TestCreateLogsExport(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Brokers = []string{"invalid:9092"} - cfg.ProtocolVersion = "2.0.0" - // this disables contacting the broker so we can successfully create the exporter - cfg.Metadata.Full = false - mf := kafkaExporterFactory{logsMarshalers: logsMarshalers()} - mr, err := mf.createLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg) - require.NoError(t, err) - assert.NotNil(t, mr) +func newMockMarshaler[Data data](encoding string) *mockMarshaler[Data] { + return &mockMarshaler[Data]{encoding: encoding} } -func TestCreateTracesExporter_err(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Brokers = []string{"invalid:9092"} - cfg.ProtocolVersion = "2.0.0" - f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers()} - r, err := f.createTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg) - // no available broker - require.Error(t, err) - assert.Nil(t, r) +// applyConfigOption is used to modify values of the +// the default exporter config to make it easier to +// use the return in a test table set up +func applyConfigOption(option func(conf *Config)) *Config { + conf := createDefaultConfig().(*Config) + option(conf) + return conf } -func TestCreateMetricsExporter_err(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Brokers = []string{"invalid:9092"} - cfg.ProtocolVersion = "2.0.0" - mf := kafkaExporterFactory{metricsMarshalers: metricsMarshalers()} - mr, err := mf.createMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg) - require.Error(t, err) - assert.Nil(t, mr) -} - -func TestCreateLogsExporter_err(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Brokers = []string{"invalid:9092"} - cfg.ProtocolVersion = "2.0.0" - mf := kafkaExporterFactory{logsMarshalers: logsMarshalers()} - mr, err := mf.createLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg) - require.Error(t, err) - assert.Nil(t, mr) -} - -func TestWithMarshalers(t *testing.T) { - cm := &customMarshaler{} - f := NewFactory(WithTracesMarshalers(cm)) +func TestCreateDefaultConfig(t *testing.T) { cfg := createDefaultConfig().(*Config) - // disable contacting broker - cfg.Metadata.Full = false - - t.Run("custom_encoding", func(t *testing.T) { - cfg.Encoding = cm.Encoding() - exporter, err := f.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg) - require.NoError(t, err) - require.NotNil(t, exporter) - }) - t.Run("default_encoding", func(t *testing.T) { - cfg.Encoding = defaultEncoding - exporter, err := f.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg) - require.NoError(t, err) - assert.NotNil(t, exporter) - }) + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, componenttest.CheckConfigStruct(cfg)) + assert.Equal(t, []string{defaultBroker}, cfg.Brokers) + assert.Equal(t, "", cfg.Topic) } -type customMarshaler struct { +func TestCreateMetricExporter(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + conf *Config + err error + }{ + { + name: "valid config (no validating broker)", + conf: applyConfigOption(func(conf *Config) { + // this disables contacting the broker so + // we can successfully create the exporter + conf.Metadata.Full = false + conf.Brokers = []string{"invalid:9092"} + conf.ProtocolVersion = "2.0.0" + }), + err: nil, + }, + { + name: "invalid config (validating broker)", + conf: applyConfigOption(func(conf *Config) { + conf.Brokers = []string{"invalid:9092"} + conf.ProtocolVersion = "2.0.0" + }), + err: &net.DNSError{}, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + f := NewFactory() + exporter, err := f.CreateMetricsExporter( + context.Background(), + componenttest.NewNopExporterCreateSettings(), + tc.conf, + ) + if tc.err != nil { + assert.ErrorAs(t, err, &tc.err, "Must match the expected error") + assert.Nil(t, exporter, "Must return nil value for invalid exporter") + return + } + assert.NoError(t, err, "Must not error") + assert.NotNil(t, exporter, "Must return valid exporter when no error is returned") + }) + } } -var _ TracesMarshaler = (*customMarshaler)(nil) - -func (c customMarshaler) Marshal(_ ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) { - panic("implement me") +func TestCreateLogExporter(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + conf *Config + err error + }{ + { + name: "valid config (no validating broker)", + conf: applyConfigOption(func(conf *Config) { + // this disables contacting the broker so + // we can successfully create the exporter + conf.Metadata.Full = false + conf.Brokers = []string{"invalid:9092"} + conf.ProtocolVersion = "2.0.0" + }), + err: nil, + }, + { + name: "invalid config (validating broker)", + conf: applyConfigOption(func(conf *Config) { + conf.Brokers = []string{"invalid:9092"} + conf.ProtocolVersion = "2.0.0" + }), + err: &net.DNSError{}, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + f := NewFactory() + exporter, err := f.CreateLogsExporter( + context.Background(), + componenttest.NewNopExporterCreateSettings(), + tc.conf, + ) + if tc.err != nil { + assert.ErrorAs(t, err, &tc.err, "Must match the expected error") + assert.Nil(t, exporter, "Must return nil value for invalid exporter") + return + } + assert.NoError(t, err, "Must not error") + assert.NotNil(t, exporter, "Must return valid exporter when no error is returned") + }) + } } -func (c customMarshaler) Encoding() string { - return "custom" +func TestCreateTraceExporter(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + conf *Config + marshalers []TracesMarshaler + err error + }{ + { + name: "valid config (no validating brokers)", + conf: applyConfigOption(func(conf *Config) { + conf.Metadata.Full = false + conf.Brokers = []string{"invalid:9092"} + conf.ProtocolVersion = "2.0.0" + }), + marshalers: nil, + err: nil, + }, + { + name: "invalid config (validating brokers)", + conf: applyConfigOption(func(conf *Config) { + conf.Brokers = []string{"invalid:9092"} + conf.ProtocolVersion = "2.0.0" + }), + marshalers: nil, + err: &net.DNSError{}, + }, + { + name: "default_encoding", + conf: applyConfigOption(func(conf *Config) { + // Disabling broker check to ensure encoding work + conf.Metadata.Full = false + conf.Encoding = defaultEncoding + }), + marshalers: nil, + err: nil, + }, + { + name: "custom_encoding", + conf: applyConfigOption(func(conf *Config) { + // Disabling broker check to ensure encoding work + conf.Metadata.Full = false + conf.Encoding = "custom" + }), + marshalers: []TracesMarshaler{ + newMockMarshaler[ptrace.Traces]("custom"), + }, + err: nil, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + f := NewFactory(WithTracesMarshalers(tc.marshalers...)) + exporter, err := f.CreateTracesExporter( + context.Background(), + componenttest.NewNopExporterCreateSettings(), + tc.conf, + ) + if tc.err != nil { + assert.ErrorAs(t, err, &tc.err, "Must match the expected error") + assert.Nil(t, exporter, "Must return nil value for invalid exporter") + return + } + assert.NoError(t, err, "Must not error") + assert.NotNil(t, exporter, "Must return valid exporter when no error is returned") + }) + } }