Skip to content

Commit

Permalink
keep topic field and set it as deprecated
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum committed Aug 19, 2024
1 parent d9cf33e commit ed98bc7
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .chloggen/kafka-receiver-topic-per-signal.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkareceiver
Expand Down
2 changes: 2 additions & 0 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ The following settings can be optionally configured:

- `brokers` (default = localhost:9092): The list of kafka brokers
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup
- `topic` The name of the kafka topic to read from. Only one telemetry type may be used for a given topic.
**Warning: this setting is deprecated in favor of the `*_topic` settings to allow configuring one topic per signal.**
- `traces_topic` (default = otlp_spans): The name of the kafka topic to read traces from.
- `metrics_topic` (default = otlp_metrics): The name of the kafka topic to read metrics from.
- `logs_topic` (default = otlp_logs): The name of the kafka topic to read logs from.
Expand Down
5 changes: 5 additions & 0 deletions receiver/kafkareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"

import (
"fmt"
"time"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -53,6 +54,7 @@ type Config struct {
// Heartbeat interval for the Kafka consumer
HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"`
// The name of the kafka topic to consume from (default "otlp_spans" for traces, "otlp_metrics" for metrics, "otlp_logs" for logs)
// Deprecated: use instead "traces_topic", "metrics_topic" and "logs_topic".
Topic string `mapstructure:"topic"`
// The name of the kafka topic to consume traces from (default "otlp_spans")
TracesTopic string `mapstructure:"traces_topic"`
Expand Down Expand Up @@ -95,5 +97,8 @@ var _ component.Config = (*Config)(nil)

// Validate checks the receiver configuration is valid
func (cfg *Config) Validate() error {
if cfg.Topic != "" && (cfg.LogsTopic != "" || cfg.MetricsTopic != "" || cfg.TracesTopic != "") {
return fmt.Errorf("setting 'topic' and 'logs_topic'|'metrics_topic'|'traces_topic' is not allowed")
}
return nil
}
71 changes: 71 additions & 0 deletions receiver/kafkareceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,69 @@ func TestLoadConfig(t *testing.T) {
expected component.Config
expectedErr error
}{
{
id: component.NewIDWithName(metadata.Type, "deprecated"),
expected: &Config{
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
ResolveCanonicalBootstrapServersOnly: true,
ClientID: "otel-collector",
GroupID: "otel-collector",
InitialOffset: "latest",
Authentication: kafka.Authentication{
TLS: &configtls.ClientConfig{
Config: configtls.Config{
CAFile: "ca.pem",
CertFile: "cert.pem",
KeyFile: "key.pem",
},
},
},
Metadata: kafkaexporter.Metadata{
Full: true,
Retry: kafkaexporter.MetadataRetry{
Max: 10,
Backoff: time.Second * 5,
},
},
AutoCommit: AutoCommit{
Enable: true,
Interval: 1 * time.Second,
},
},
},
{
id: component.NewIDWithName(metadata.Type, "deprecated_logs"),
expected: &Config{
Topic: "logs",
Encoding: "direct",
Brokers: []string{"coffee:123", "foobar:456"},
ClientID: "otel-collector",
GroupID: "otel-collector",
InitialOffset: "earliest",
Authentication: kafka.Authentication{
TLS: &configtls.ClientConfig{
Config: configtls.Config{
CAFile: "ca.pem",
CertFile: "cert.pem",
KeyFile: "key.pem",
},
},
},
Metadata: kafkaexporter.Metadata{
Full: true,
Retry: kafkaexporter.MetadataRetry{
Max: 10,
Backoff: time.Second * 5,
},
},
AutoCommit: AutoCommit{
Enable: true,
Interval: 1 * time.Second,
},
},
},
{
id: component.NewIDWithName(metadata.Type, ""),
expected: &Config{
Expand Down Expand Up @@ -115,3 +178,11 @@ func TestLoadConfig(t *testing.T) {
})
}
}

func TestLoadConfigError(t *testing.T) {
config := &Config{
Topic: "spans",
TracesTopic: "spans1",
}
assert.ErrorContains(t, component.ValidateConfig(config), "setting 'topic' and 'logs_topic'|'metrics_topic'|'traces_topic' is not allowed")
}
15 changes: 12 additions & 3 deletions receiver/kafkareceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ func (f *kafkaReceiverFactory) createTracesReceiver(
}

oCfg := *(cfg.(*Config))
if oCfg.TracesTopic == "" {
if oCfg.Topic != "" {
set.Logger.Warn("'topic' is deprecated and will be removed in a future release. Please remove it from the configuration.")
oCfg.TracesTopic = oCfg.Topic
} else if oCfg.TracesTopic == "" {
oCfg.TracesTopic = defaultTracesTopic
}
unmarshaler := f.tracesUnmarshalers[oCfg.Encoding]
Expand All @@ -166,7 +169,10 @@ func (f *kafkaReceiverFactory) createMetricsReceiver(
}

oCfg := *(cfg.(*Config))
if oCfg.MetricsTopic == "" {
if oCfg.Topic != "" {
set.Logger.Warn("'topic' is deprecated and will be removed in a future release. Please remove it from the configuration.")
oCfg.MetricsTopic = oCfg.Topic
} else if oCfg.MetricsTopic == "" {
oCfg.MetricsTopic = defaultMetricsTopic
}
unmarshaler := f.metricsUnmarshalers[oCfg.Encoding]
Expand All @@ -192,7 +198,10 @@ func (f *kafkaReceiverFactory) createLogsReceiver(
}

oCfg := *(cfg.(*Config))
if oCfg.LogsTopic == "" {
if oCfg.Topic != "" {
set.Logger.Warn("'topic' is deprecated and will be removed in a future release. Please remove it from the configuration.")
oCfg.LogsTopic = oCfg.Topic
} else if oCfg.LogsTopic == "" {
oCfg.LogsTopic = defaultLogsTopic
}
unmarshaler, err := getLogsUnmarshaler(oCfg.Encoding, f.logsUnmarshalers)
Expand Down
35 changes: 35 additions & 0 deletions receiver/kafkareceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,38 @@
kafka/deprecated:
topic: spans
brokers:
- "foo:123"
- "bar:456"
resolve_canonical_bootstrap_servers_only: true
client_id: otel-collector
group_id: otel-collector
auth:
tls:
ca_file: ca.pem
cert_file: cert.pem
key_file: key.pem
metadata:
retry:
max: 10
backoff: 5s
kafka/deprecated_logs:
topic: logs
encoding: direct
brokers:
- "coffee:123"
- "foobar:456"
client_id: otel-collector
group_id: otel-collector
initial_offset: earliest
auth:
tls:
ca_file: ca.pem
cert_file: cert.pem
key_file: key.pem
metadata:
retry:
max: 10
backoff: 5s
kafka:
traces_topic: spans
metrics_topic: metrics
Expand Down

0 comments on commit ed98bc7

Please sign in to comment.