diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index 8eadfc9952a..f9dc432ce49 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -14,6 +14,23 @@ The following settings can be optionally configured: - `otlp_proto`: the payload is serialized to `ExportTraceServiceRequest`. - `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`. - `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`. +- `authentication` + - `type` (default = none): The authentication type. Supported types are `plain_text`, `tls`, `kerberos`. + - `plain_text` + - `username`: The username to use. + - `password`: The password to use + - `tls` + - `ca_file` path to the CA cert. For a client this verifies the server certificate. Should + only be used if `insecure` is set to true. + - `cert_file` path to the TLS cert to use for TLS required connections. Should + only be used if `insecure` is set to true. + - `key_file` path to the TLS key to use for TLS required connections. Should + only be used if `insecure` is set to true. + - `insecure` (default = false): Disable verifying the server's certificate chain and host + name (`InsecureSkipVerify` in the tls config) + - `server_name_override`: ServerName indicates the name of the server requested by the client + in order to support virtual hosting. +>>>>>>> Add authentication support to kafka - `metadata` - `full` (default = true): Whether to maintain a full set of metadata. When disabled the client does not make the initial request to broker at the startup. diff --git a/exporter/kafkaexporter/authentication.go b/exporter/kafkaexporter/authentication.go new file mode 100644 index 00000000000..d9f4bc173ae --- /dev/null +++ b/exporter/kafkaexporter/authentication.go @@ -0,0 +1,74 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafkaexporter + +import ( + "fmt" + "strings" + + "github.com/Shopify/sarama" + + "go.opentelemetry.io/collector/config/configtls" +) + +// ConfigureAuthentication configures authentication in sarama.Config. +func ConfigureAuthentication(config Authentication, saramaConfig *sarama.Config) error { + switch AuthType(strings.TrimSpace(string(config.Type))) { + case AuthTypeNone, "": + return nil + case AuthTypeTLS: + return configureTLS(config.TLS, saramaConfig) + case AuthTypeKerberos: + configureKerberos(config.Kerberos, saramaConfig) + return nil + case AuthTypePlaintext: + configurePlaintext(config.PlainText, saramaConfig) + return nil + default: + return fmt.Errorf("unknown/unsupported authentication method %v to kafka cluster", config.Type) + } +} + +func configurePlaintext(config PlainTextConfig, saramaConfig *sarama.Config) { + saramaConfig.Net.SASL.Enable = true + saramaConfig.Net.SASL.User = config.Username + saramaConfig.Net.SASL.Password = config.Password +} + +func configureTLS(config configtls.TLSClientSetting, saramaConfig *sarama.Config) error { + tlsConfig, err := config.LoadTLSConfig() + if err != nil { + return fmt.Errorf("error loading tls config: %w", err) + } + saramaConfig.Net.TLS.Enable = true + saramaConfig.Net.TLS.Config = tlsConfig + return nil +} + +func configureKerberos(config KerberosConfig, saramaConfig *sarama.Config) { + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI + saramaConfig.Net.SASL.Enable = true + if config.UseKeyTab { + saramaConfig.Net.SASL.GSSAPI.KeyTabPath = config.KeyTabPath + saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH + } else { + saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH + saramaConfig.Net.SASL.GSSAPI.Password = config.Password + } + saramaConfig.Net.SASL.GSSAPI.KerberosConfigPath = config.ConfigPath + saramaConfig.Net.SASL.GSSAPI.Username = config.Username + saramaConfig.Net.SASL.GSSAPI.Realm = config.Realm + saramaConfig.Net.SASL.GSSAPI.ServiceName = config.ServiceName +} diff --git a/exporter/kafkaexporter/authentication_test.go b/exporter/kafkaexporter/authentication_test.go new file mode 100644 index 00000000000..190dd439c07 --- /dev/null +++ b/exporter/kafkaexporter/authentication_test.go @@ -0,0 +1,97 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafkaexporter + +import ( + "testing" + + "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/config/configtls" +) + +func TestAuthentication(t *testing.T) { + saramaPlaintext := &sarama.Config{} + saramaPlaintext.Net.SASL.Enable = true + saramaPlaintext.Net.SASL.User = "jdoe" + saramaPlaintext.Net.SASL.Password = "pass" + + saramaTLSCfg := &sarama.Config{} + saramaTLSCfg.Net.TLS.Enable = true + tlsClient := configtls.TLSClientSetting{} + tlscfg, err := tlsClient.LoadTLSConfig() + require.NoError(t, err) + saramaTLSCfg.Net.TLS.Config = tlscfg + + saramaKerberosCfg := &sarama.Config{} + saramaKerberosCfg.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI + saramaKerberosCfg.Net.SASL.Enable = true + saramaKerberosCfg.Net.SASL.GSSAPI.ServiceName = "foobar" + saramaKerberosCfg.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH + + saramaKerberosKeyTabCfg := &sarama.Config{} + saramaKerberosKeyTabCfg.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI + saramaKerberosKeyTabCfg.Net.SASL.Enable = true + saramaKerberosKeyTabCfg.Net.SASL.GSSAPI.KeyTabPath = "/path" + saramaKerberosKeyTabCfg.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH + + tests := []struct { + auth Authentication + saramaConfig *sarama.Config + err string + }{ + { + auth: Authentication{Type: AuthType("foo")}, + err: "unknown/unsupported authentication method", + }, + { + auth: Authentication{Type: AuthTypePlaintext, PlainText: PlainTextConfig{Username: "jdoe", Password: "pass"}}, + saramaConfig: saramaPlaintext, + }, + { + auth: Authentication{Type: AuthTypeTLS, TLS: configtls.TLSClientSetting{}}, + saramaConfig: saramaTLSCfg, + }, + { + auth: Authentication{Type: AuthTypeTLS, TLS: configtls.TLSClientSetting{ + TLSSetting: configtls.TLSSetting{CAFile: "/doesnotexists"}, + }}, + saramaConfig: saramaTLSCfg, + err: "failed to load TLS config", + }, + { + auth: Authentication{Type: AuthTypeKerberos, Kerberos: KerberosConfig{ServiceName: "foobar"}}, + saramaConfig: saramaKerberosCfg, + }, + { + auth: Authentication{Type: AuthTypeKerberos, Kerberos: KerberosConfig{UseKeyTab: true, KeyTabPath: "/path"}}, + saramaConfig: saramaKerberosKeyTabCfg, + }, + } + for _, test := range tests { + t.Run(string(test.auth.Type), func(t *testing.T) { + config := &sarama.Config{} + err := ConfigureAuthentication(test.auth, config) + if test.err != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), test.err) + } else { + assert.Equal(t, test.saramaConfig, config) + } + }) + } +} diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index 1b3124d6102..1af02ef43c7 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -18,6 +18,7 @@ import ( "time" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter/exporterhelper" ) @@ -41,8 +42,47 @@ type Config struct { // Client, and shared by the Producer/Consumer. Metadata Metadata `mapstructure:"metadata"` - // TODO authentication - // TODO batch settings + // Authentication defines used authentication mechanism. + Authentication Authentication `mapstructure:"authentication"` +} + +// AuthType defines authentication type +type AuthType string + +const ( + // No authentication + AuthTypeNone AuthType = "none" + // Plain text authentication + AuthTypePlaintext AuthType = "plain_text" + // TLS authentication + AuthTypeTLS AuthType = "tls" + // Kerberos authentication + AuthTypeKerberos AuthType = "kerberos" +) + +// Authentication defines authentication. +type Authentication struct { + Type AuthType `mapstructure:"type"` + PlainText PlainTextConfig `mapstructure:"plain_text"` + TLS configtls.TLSClientSetting `mapstructure:"tls"` + Kerberos KerberosConfig `mapstructure:"kerberos"` +} + +// PlainTextConfig defines plaintext authentication. +type PlainTextConfig struct { + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` +} + +// KerberosConfig defines kereros configuration. +type KerberosConfig struct { + ServiceName string `mapstructure:"service_name"` + Realm string `mapstructure:"realm"` + UseKeyTab bool `mapstructure:"use_keytab"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password" json:"-"` + ConfigPath string `mapstructure:"config_file"` + KeyTabPath string `mapstructure:"keytab_file"` } // Metadata defines configuration for retrieving metadata from the broker. diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index aee27033d45..20f524c8c8a 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -61,6 +61,13 @@ func TestLoadConfig(t *testing.T) { Topic: "spans", Encoding: "otlp_proto", Brokers: []string{"foo:123", "bar:456"}, + Authentication: Authentication{ + Type: "plain_text", + PlainText: PlainTextConfig{ + Username: "jdoe", + Password: "pass", + }, + }, Metadata: Metadata{ Full: false, Retry: MetadataRetry{ diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go index 788126bf987..0b63fd3f7d5 100644 --- a/exporter/kafkaexporter/factory.go +++ b/exporter/kafkaexporter/factory.go @@ -77,6 +77,9 @@ func createDefaultConfig() configmodels.Exporter { Brokers: []string{defaultBroker}, Topic: defaultTopic, Encoding: defaultEncoding, + Authentication: Authentication{ + Type: AuthTypeNone, + }, Metadata: Metadata{ Full: defaultMetadataFull, Retry: MetadataRetry{ diff --git a/exporter/kafkaexporter/factory_test.go b/exporter/kafkaexporter/factory_test.go index 9255dfb3eb6..88bbbae5a22 100644 --- a/exporter/kafkaexporter/factory_test.go +++ b/exporter/kafkaexporter/factory_test.go @@ -32,6 +32,7 @@ func TestCreateDefaultConfig(t *testing.T) { assert.NoError(t, configcheck.ValidateConfig(cfg)) assert.Equal(t, []string{defaultBroker}, cfg.Brokers) assert.Equal(t, defaultTopic, cfg.Topic) + assert.Equal(t, AuthTypeNone, cfg.Authentication.Type) } func TestCreateTracesExporter(t *testing.T) { diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index ce392fcfb06..0f6774974b9 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -61,6 +61,9 @@ func newExporter(config Config, params component.ExporterCreateParams, marshalle } c.Version = version } + if err := ConfigureAuthentication(config.Authentication, c); err != nil { + return nil, err + } producer, err := sarama.NewSyncProducer(config.Brokers, c) if err != nil { return nil, err diff --git a/exporter/kafkaexporter/kafka_exporter_test.go b/exporter/kafkaexporter/kafka_exporter_test.go index 3b27c98dbf1..a2d4702d48c 100644 --- a/exporter/kafkaexporter/kafka_exporter_test.go +++ b/exporter/kafkaexporter/kafka_exporter_test.go @@ -39,11 +39,26 @@ func TestNewExporter_err_version(t *testing.T) { func TestNewExporter_err_encoding(t *testing.T) { c := Config{Encoding: "foo"} - exp, err := newExporter(c, component.ExporterCreateParams{}, map[string]Marshaller{}) + exp, err := newExporter(c, component.ExporterCreateParams{}, defaultMarshallers()) assert.EqualError(t, err, errUnrecognizedEncoding.Error()) assert.Nil(t, exp) } +func TestNewExporter_err_auth_type(t *testing.T) { + c := Config{ + ProtocolVersion: "2.0.0", + Authentication: Authentication{Type: AuthType("foo")}, + Encoding: defaultEncoding, + Metadata: Metadata{ + Full: false, + }, + } + exp, err := newExporter(c, component.ExporterCreateParams{}, defaultMarshallers()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "unknown/unsupported authentication method") + assert.Nil(t, exp) +} + func TestTraceDataPusher(t *testing.T) { c := sarama.NewConfig() producer := mocks.NewSyncProducer(t, c) diff --git a/exporter/kafkaexporter/testdata/config.yaml b/exporter/kafkaexporter/testdata/config.yaml index 907d80f1926..943e785f17d 100644 --- a/exporter/kafkaexporter/testdata/config.yaml +++ b/exporter/kafkaexporter/testdata/config.yaml @@ -9,6 +9,11 @@ exporters: retry: max: 15 timeout: 10s + authentication: + type: plain_text + plain_text: + username: jdoe + password: pass sending_queue: enabled: true num_consumers: 2 diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index e25c7e4f761..e36fa672f26 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -17,6 +17,22 @@ The following settings can be optionally configured: - `zipkin_thrift`: the payload is deserialized into Zipkin Thrift spans. - `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from - `client_id` (default = otel-collector): The consumer client ID that receiver will use +- `authentication` + - `type` (default = none): The authentication type. Supported types are `plain_text`, `tls`, `kerberos`. + - `plain_text` + - `username`: The username to use. + - `password`: The password to use + - `tls` + - `ca_file` path to the CA cert. For a client this verifies the server certificate. Should + only be used if `insecure` is set to true. + - `cert_file` path to the TLS cert to use for TLS required connections. Should + only be used if `insecure` is set to true. + - `key_file` path to the TLS key to use for TLS required connections. Should + only be used if `insecure` is set to true. + - `insecure` (default = false): Disable verifying the server's certificate chain and host + name (`InsecureSkipVerify` in the tls config) + - `server_name_override`: ServerName indicates the name of the server requested by the client + in order to support virtual hosting. - `metadata` - `full` (default = true): Whether to maintain a full set of metadata. When disabled the client does not make the initial request to broker at the startup. diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index aa274b10fe4..84a41071545 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -39,5 +39,5 @@ type Config struct { // Client, and shared by the Producer/Consumer. Metadata kafkaexporter.Metadata `mapstructure:"metadata"` - // TODO authentication + Authentication kafkaexporter.Authentication `mapstructure:"authentication"` } diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index f627c8e393f..5bdd8c76929 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configtest" + "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter/kafkaexporter" ) @@ -49,6 +50,16 @@ func TestLoadConfig(t *testing.T) { Brokers: []string{"foo:123", "bar:456"}, ClientID: "otel-collector", GroupID: "otel-collector", + Authentication: kafkaexporter.Authentication{ + Type: kafkaexporter.AuthTypeTLS, + TLS: configtls.TLSClientSetting{ + TLSSetting: configtls.TLSSetting{ + CAFile: "ca.pem", + CertFile: "cert.pem", + KeyFile: "key.pem", + }, + }, + }, Metadata: kafkaexporter.Metadata{ Full: true, Retry: kafkaexporter.MetadataRetry{ diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index 7232c54c1c5..45525914d38 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -78,6 +78,9 @@ func createDefaultConfig() configmodels.Receiver { Brokers: []string{defaultBroker}, ClientID: defaultClientID, GroupID: defaultGroupID, + Authentication: kafkaexporter.Authentication{ + Type: kafkaexporter.AuthTypeNone, + }, Metadata: kafkaexporter.Metadata{ Full: defaultMetadataFull, Retry: kafkaexporter.MetadataRetry{ diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go index c094815bbb0..a4f80d866f1 100644 --- a/receiver/kafkareceiver/factory_test.go +++ b/receiver/kafkareceiver/factory_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/exporter/kafkaexporter" ) func TestCreateDefaultConfig(t *testing.T) { @@ -34,6 +35,7 @@ func TestCreateDefaultConfig(t *testing.T) { assert.Equal(t, defaultTopic, cfg.Topic) assert.Equal(t, defaultGroupID, cfg.GroupID) assert.Equal(t, defaultClientID, cfg.ClientID) + assert.Equal(t, kafkaexporter.AuthTypeNone, cfg.Authentication.Type) } func TestCreateTraceReceiver(t *testing.T) { diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 9ec72841bd5..99a66a95170 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/exporter/kafkaexporter" "go.opentelemetry.io/collector/obsreport" ) @@ -67,6 +68,9 @@ func newReceiver(config Config, params component.ReceiverCreateParams, unmarshal } c.Version = version } + if err := kafkaexporter.ConfigureAuthentication(config.Authentication, c); err != nil { + return nil, err + } client, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, c) if err != nil { return nil, err diff --git a/receiver/kafkareceiver/testdata/config.yaml b/receiver/kafkareceiver/testdata/config.yaml index 3b7983613f2..71352d21802 100644 --- a/receiver/kafkareceiver/testdata/config.yaml +++ b/receiver/kafkareceiver/testdata/config.yaml @@ -6,6 +6,12 @@ receivers: - "bar:456" client_id: otel-collector group_id: otel-collector + authentication: + type: tls + tls: + ca_file: ca.pem + cert_file: cert.pem + key_file: key.pem metadata: retry: max: 10