diff --git a/kafka/client.go b/kafka/client.go index 0873930fbf..7799f9fedf 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -33,6 +33,7 @@ type AdapterSASL struct { Enable bool `envconfig:"KAFKA_NET_SASL_ENABLE" required:"false"` User string `envconfig:"KAFKA_NET_SASL_USER" required:"false"` Password string `envconfig:"KAFKA_NET_SASL_PASSWORD" required:"false"` + Type string `envconfig:"KAFKA_NET_SASL_TYPE" required:"false"` } type AdapterTLS struct { @@ -65,8 +66,22 @@ func NewConfig(ctx context.Context) ([]string, *sarama.Config, error) { if env.Net.SASL.Enable { cfg.Net.SASL.Enable = true + cfg.Net.SASL.Handshake = true cfg.Net.SASL.User = env.Net.SASL.User cfg.Net.SASL.Password = env.Net.SASL.Password + + // We default to plain sasl type + cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext + + if env.Net.SASL.Type == sarama.SASLTypeSCRAMSHA256 { + cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} } + cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + } + + if env.Net.SASL.Type == sarama.SASLTypeSCRAMSHA512 { + cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } + cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + } } if env.Net.TLS.Enable { diff --git a/kafka/client_test.go b/kafka/client_test.go index 7360377cfa..7782d369db 100644 --- a/kafka/client_test.go +++ b/kafka/client_test.go @@ -333,16 +333,160 @@ func generateCert(t *testing.T) (string, string) { } func TestNewConfig(t *testing.T) { - ctx := context.Background() - // Increasing coverage - _ = os.Setenv("KAFKA_BOOTSTRAP_SERVERS", "my-cluster-kafka-bootstrap.my-kafka-namespace:9092") + defaultBootstrapServer := "my-cluster-kafka-bootstrap.my-kafka-namespace:9092" + defaultSASLUser := "secret-user" + defaultSASLPassword := "super-seekrit-password" + testCases := map[string]struct { + env map[string]string + enabledTLS bool + enabledSASL bool + wantErr bool + saslMechanism string + bootstrapServer string + saslUser string + saslPassword string + }{ + "Just bootstrap Server": { + env: map[string]string{ + "KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer, + }, + bootstrapServer: defaultBootstrapServer, + }, + "Incorrect bootstrap Server": { + env: map[string]string{ + "KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer, + }, + bootstrapServer: "ImADoctorNotABootstrapServerJim!", + wantErr: true, + }, + /* + TODO + "Multiple bootstrap servers": { + env: map[string]string{ - servers, config, err := NewConfig(ctx) + }, - require.NoError(t, err) - require.NotNil(t, config) - require.Equal(t, []string{"my-cluster-kafka-bootstrap.my-kafka-namespace:9092"}, servers) + },*/ + "No Auth": { + env: map[string]string{ + "KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer, + }, + enabledTLS: false, + enabledSASL: false, + bootstrapServer: defaultBootstrapServer, + }, + "Defaulting to SASL-Plain Auth (none specified)": { + env: map[string]string{ + "KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer, + "KAFKA_NET_SASL_ENABLE": "true", + }, + enabledSASL: true, + saslMechanism: sarama.SASLTypePlaintext, + bootstrapServer: defaultBootstrapServer, + }, + "Only SASL-PLAIN Auth": { + env: map[string]string{ + "KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer, + "KAFKA_NET_SASL_ENABLE": "true", + "KAFKA_NET_SASL_USER": defaultSASLUser, + "KAFKA_NET_SASL_PASSWORD": defaultSASLPassword, + }, + enabledSASL: true, + saslUser: defaultSASLUser, + saslPassword: defaultSASLPassword, + saslMechanism: sarama.SASLTypePlaintext, + bootstrapServer: defaultBootstrapServer, + }, + "SASL-PLAIN Auth, Forgot User": { + env: map[string]string{ + "KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer, + "KAFKA_NET_SASL_ENABLE": "true", + "KAFKA_NET_SASL_PASSWORD": defaultSASLPassword, + }, + enabledSASL: true, + wantErr: true, + saslUser: defaultSASLUser, + saslPassword: defaultSASLPassword, + saslMechanism: sarama.SASLTypePlaintext, + bootstrapServer: defaultBootstrapServer, + }, + "SASL-PLAIN Auth, Forgot Password": { + env: map[string]string{ + "KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer, + "KAFKA_NET_SASL_ENABLE": "true", + "KAFKA_NET_SASL_USER": defaultSASLUser, + }, + enabledSASL: true, + wantErr: true, + saslUser: defaultSASLUser, + saslPassword: defaultSASLPassword, + saslMechanism: sarama.SASLTypePlaintext, + bootstrapServer: defaultBootstrapServer, + }, + "Only SASL-SCRAM-SHA-256 Auth": { + env: map[string]string{ + "KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer, + "KAFKA_NET_SASL_ENABLE": "true", + "KAFKA_NET_SASL_USER": defaultSASLUser, + "KAFKA_NET_SASL_PASSWORD": defaultSASLPassword, + "KAFKA_NET_SASL_TYPE": sarama.SASLTypeSCRAMSHA256, + }, + enabledSASL: true, + saslUser: defaultSASLUser, + saslPassword: defaultSASLPassword, + saslMechanism: sarama.SASLTypeSCRAMSHA256, + bootstrapServer: defaultBootstrapServer, + }, + "Only SASL-SCRAM-SHA-512 Auth": { + env: map[string]string{ + "KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer, + "KAFKA_NET_SASL_ENABLE": "true", + "KAFKA_NET_SASL_USER": defaultSASLUser, + "KAFKA_NET_SASL_PASSWORD": defaultSASLPassword, + "KAFKA_NET_SASL_TYPE": sarama.SASLTypeSCRAMSHA512, + }, + enabledSASL: true, + saslUser: defaultSASLUser, + saslPassword: defaultSASLPassword, + saslMechanism: sarama.SASLTypeSCRAMSHA512, + bootstrapServer: defaultBootstrapServer, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + for k, v := range tc.env { + _ = os.Setenv(k, v) + } + servers, config, err := NewConfig(context.Background()) + if err != nil && tc.wantErr != true { + t.Fatal(err) + } + if servers[0] != tc.bootstrapServer && tc.wantErr != true { + t.Fatalf("Incorrect bootstrapServers, got: %s vs want: %s", servers[0], tc.bootstrapServer) + } + if tc.enabledSASL { + if tc.saslMechanism != string(config.Net.SASL.Mechanism) { + t.Fatalf("Incorrect SASL mechanism, got: %s vs want: %s", string(config.Net.SASL.Mechanism), tc.saslMechanism) + } + + if config.Net.SASL.Enable != true { + t.Fatal("Incorrect SASL Configuration (not enabled)") + } + + if config.Net.SASL.User != tc.saslUser && !tc.wantErr { + t.Fatalf("Incorrect SASL User, got: %s vs want: %s", config.Net.SASL.User, tc.saslUser) + } + if config.Net.SASL.Password != tc.saslPassword && !tc.wantErr { + t.Fatalf("Incorrect SASL Password, got: %s vs want: %s", config.Net.SASL.Password, tc.saslPassword) + } + } + require.NotNil(t, config) + for k := range tc.env { + _ = os.Unsetenv(k) + } + }) + } } func TestAdminClient(t *testing.T) { diff --git a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion.go b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion.go index 7124084324..0a2cee0e20 100644 --- a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion.go +++ b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion.go @@ -83,6 +83,9 @@ func (source *KafkaAuthSpec) ConvertTo(_ context.Context, obj apis.Convertible) }, Password: bindingsv1beta1.SecretValueFromSource{ SecretKeyRef: source.Net.SASL.Password.SecretKeyRef}, + Type: bindingsv1beta1.SecretValueFromSource{ + SecretKeyRef: source.Net.SASL.Type.SecretKeyRef, + }, }, TLS: bindingsv1beta1.KafkaTLSSpec{ Enable: source.Net.TLS.Enable, @@ -117,6 +120,9 @@ func (sink *KafkaAuthSpec) ConvertFrom(_ context.Context, obj apis.Convertible) }, Password: SecretValueFromSource{ SecretKeyRef: source.Net.SASL.Password.SecretKeyRef}, + Type: SecretValueFromSource{ + SecretKeyRef: source.Net.SASL.Type.SecretKeyRef, + }, }, TLS: KafkaTLSSpec{ Enable: source.Net.TLS.Enable, diff --git a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion_test.go b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion_test.go index f74bf62ec7..ac87a324d2 100644 --- a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion_test.go +++ b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion_test.go @@ -107,6 +107,15 @@ func TestKafkaBindingConversionRoundTripV1alpha1(t *testing.T) { Optional: pointer.BoolPtr(true), }, }, + Type: SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "sasl-type-secret-local-obj-ref", + }, + Key: "sasl-type-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, }, TLS: KafkaTLSSpec{ Enable: true, @@ -248,6 +257,15 @@ func TestKafkaBindingConversionRoundTripV1beta1(t *testing.T) { Optional: pointer.BoolPtr(true), }, }, + Type: v1beta1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "sasl-type-secret-local-obj-ref", + }, + Key: "sasl-type-secret-key", + Optional: pointer.BoolPtr(true), + }, + }, }, TLS: v1beta1.KafkaTLSSpec{ Enable: false, diff --git a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle.go b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle.go index e51887fe0f..1c50bb4318 100644 --- a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle.go +++ b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle.go @@ -102,6 +102,11 @@ func (kfb *KafkaBinding) Do(ctx context.Context, ps *duckv1.WithPod) { ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: kfb.Spec.Net.SASL.Password.SecretKeyRef, }, + }, corev1.EnvVar{ + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: kfb.Spec.Net.SASL.Type.SecretKeyRef, + }, }) } if kfb.Spec.Net.TLS.Enable { @@ -147,6 +152,11 @@ func (kfb *KafkaBinding) Do(ctx context.Context, ps *duckv1.WithPod) { ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: kfb.Spec.Net.SASL.Password.SecretKeyRef, }, + }, corev1.EnvVar{ + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: kfb.Spec.Net.SASL.Type.SecretKeyRef, + }, }) } if kfb.Spec.Net.TLS.Enable { @@ -184,7 +194,7 @@ func (kfb *KafkaBinding) Undo(ctx context.Context, ps *duckv1.WithPod) { for j, ev := range c.Env { switch ev.Name { case "KAFKA_NET_TLS_ENABLE", "KAFKA_NET_TLS_CERT", "KAFKA_NET_TLS_KEY", "KAFKA_NET_TLS_CA_CERT", - "KAFKA_NET_SASL_ENABLE", "KAFKA_NET_SASL_USER", "KAFKA_NET_SASL_PASSWORD", + "KAFKA_NET_SASL_ENABLE", "KAFKA_NET_SASL_USER", "KAFKA_NET_SASL_PASSWORD", "KAFKA_NET_SASL_TYPE", "KAFKA_BOOTSTRAP_SERVERS": continue @@ -203,7 +213,7 @@ func (kfb *KafkaBinding) Undo(ctx context.Context, ps *duckv1.WithPod) { for j, ev := range c.Env { switch ev.Name { case "KAFKA_NET_TLS_ENABLE", "KAFKA_NET_TLS_CERT", "KAFKA_NET_TLS_KEY", "KAFKA_NET_TLS_CA_CERT", - "KAFKA_NET_SASL_ENABLE", "KAFKA_NET_SASL_USER", "KAFKA_NET_SASL_PASSWORD", + "KAFKA_NET_SASL_ENABLE", "KAFKA_NET_SASL_USER", "KAFKA_NET_SASL_PASSWORD", "KAFKA_NET_SASL_TYPE", "KAFKA_BOOTSTRAP_SERVERS": continue default: diff --git a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle_test.go b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle_test.go index b7d957a45b..ba19014ee9 100644 --- a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle_test.go +++ b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle_test.go @@ -144,6 +144,16 @@ func TestKafkaBindingUndo(t *testing.T) { Key: corev1.BasicAuthPasswordKey, }, }, + }, { + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + Key: "saslType", + }, + }, }}, }}, }, @@ -273,6 +283,14 @@ func TestKafkaBindingDoSASL(t *testing.T) { Key: corev1.BasicAuthPasswordKey, }, }, + Type: SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + Key: "saslType", + }, + }, }, }, }, @@ -318,6 +336,16 @@ func TestKafkaBindingDoSASL(t *testing.T) { Key: corev1.BasicAuthPasswordKey, }, }, + }, { + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + Key: "saslType", + }, + }, }}, }}, }, @@ -357,6 +385,16 @@ func TestKafkaBindingDoSASL(t *testing.T) { Key: corev1.BasicAuthPasswordKey, }, }, + }, { + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + Key: "saslType", + }, + }, }}, }}, }, @@ -410,6 +448,16 @@ func TestKafkaBindingDoSASL(t *testing.T) { Key: corev1.BasicAuthPasswordKey, }, }, + }, { + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + Key: "saslType", + }, + }, }}, }}, }, @@ -437,6 +485,9 @@ func TestKafkaBindingDoSASL(t *testing.T) { }, { Name: "KAFKA_NET_SASL_PASSWORD", Value: "bad", + }, { + Name: "KAFKA_NET_SASL_TYPE", + Value: "bad", }}, }}, }, @@ -476,6 +527,16 @@ func TestKafkaBindingDoSASL(t *testing.T) { Key: corev1.BasicAuthPasswordKey, }, }, + }, { + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + Key: "saslType", + }, + }, }}, }}, }, diff --git a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_types.go b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_types.go index c5d35f5a8e..37b44bd34b 100644 --- a/kafka/source/pkg/apis/bindings/v1alpha1/kafka_types.go +++ b/kafka/source/pkg/apis/bindings/v1alpha1/kafka_types.go @@ -56,6 +56,10 @@ type KafkaSASLSpec struct { // Password is the Kubernetes secret containing the SASL password. // +optional Password SecretValueFromSource `json:"password,omitempty"` + + // Type of saslType, defaults to plain (vs SCRAM-SHA-512 or SCRAM-SHA-256) + // +optional + Type SecretValueFromSource `json:"type,omitempty"` } type KafkaTLSSpec struct { diff --git a/kafka/source/pkg/apis/bindings/v1alpha1/zz_generated.deepcopy.go b/kafka/source/pkg/apis/bindings/v1alpha1/zz_generated.deepcopy.go index 660dce5da1..956bc59098 100644 --- a/kafka/source/pkg/apis/bindings/v1alpha1/zz_generated.deepcopy.go +++ b/kafka/source/pkg/apis/bindings/v1alpha1/zz_generated.deepcopy.go @@ -166,6 +166,7 @@ func (in *KafkaSASLSpec) DeepCopyInto(out *KafkaSASLSpec) { *out = *in in.User.DeepCopyInto(&out.User) in.Password.DeepCopyInto(&out.Password) + in.Type.DeepCopyInto(&out.Type) return } diff --git a/kafka/source/pkg/apis/bindings/v1beta1/kafka_lifecycle.go b/kafka/source/pkg/apis/bindings/v1beta1/kafka_lifecycle.go index 2232f255b9..6e27e7fd47 100644 --- a/kafka/source/pkg/apis/bindings/v1beta1/kafka_lifecycle.go +++ b/kafka/source/pkg/apis/bindings/v1beta1/kafka_lifecycle.go @@ -102,6 +102,11 @@ func (kfb *KafkaBinding) Do(ctx context.Context, ps *duckv1.WithPod) { ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: kfb.Spec.Net.SASL.Password.SecretKeyRef, }, + }, corev1.EnvVar{ + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: kfb.Spec.Net.SASL.Type.SecretKeyRef, + }, }) } if kfb.Spec.Net.TLS.Enable { @@ -147,6 +152,11 @@ func (kfb *KafkaBinding) Do(ctx context.Context, ps *duckv1.WithPod) { ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: kfb.Spec.Net.SASL.Password.SecretKeyRef, }, + }, corev1.EnvVar{ + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: kfb.Spec.Net.SASL.Type.SecretKeyRef, + }, }) } if kfb.Spec.Net.TLS.Enable { @@ -184,7 +194,7 @@ func (kfb *KafkaBinding) Undo(ctx context.Context, ps *duckv1.WithPod) { for j, ev := range c.Env { switch ev.Name { case "KAFKA_NET_TLS_ENABLE", "KAFKA_NET_TLS_CERT", "KAFKA_NET_TLS_KEY", "KAFKA_NET_TLS_CA_CERT", - "KAFKA_NET_SASL_ENABLE", "KAFKA_NET_SASL_USER", "KAFKA_NET_SASL_PASSWORD", + "KAFKA_NET_SASL_ENABLE", "KAFKA_NET_SASL_USER", "KAFKA_NET_SASL_PASSWORD", "KAFKA_NET_SASL_TYPE", "KAFKA_BOOTSTRAP_SERVERS": continue @@ -203,7 +213,7 @@ func (kfb *KafkaBinding) Undo(ctx context.Context, ps *duckv1.WithPod) { for j, ev := range c.Env { switch ev.Name { case "KAFKA_NET_TLS_ENABLE", "KAFKA_NET_TLS_CERT", "KAFKA_NET_TLS_KEY", "KAFKA_NET_TLS_CA_CERT", - "KAFKA_NET_SASL_ENABLE", "KAFKA_NET_SASL_USER", "KAFKA_NET_SASL_PASSWORD", + "KAFKA_NET_SASL_ENABLE", "KAFKA_NET_SASL_USER", "KAFKA_NET_SASL_PASSWORD", "KAFKA_NET_SASL_TYPE", "KAFKA_BOOTSTRAP_SERVERS": continue default: diff --git a/kafka/source/pkg/apis/bindings/v1beta1/kafka_lifecycle_test.go b/kafka/source/pkg/apis/bindings/v1beta1/kafka_lifecycle_test.go index e3ddc01dc1..db5e9c920c 100644 --- a/kafka/source/pkg/apis/bindings/v1beta1/kafka_lifecycle_test.go +++ b/kafka/source/pkg/apis/bindings/v1beta1/kafka_lifecycle_test.go @@ -144,6 +144,16 @@ func TestKafkaBindingUndo(t *testing.T) { Key: corev1.BasicAuthPasswordKey, }, }, + }, { + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + Key: "saslType", + }, + }, }}, }}, }, @@ -273,6 +283,14 @@ func TestKafkaBindingDoSASL(t *testing.T) { Key: corev1.BasicAuthPasswordKey, }, }, + Type: SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + Key: "saslType", + }, + }, }, }, }, @@ -318,6 +336,16 @@ func TestKafkaBindingDoSASL(t *testing.T) { Key: corev1.BasicAuthPasswordKey, }, }, + }, { + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + Key: "saslType", + }, + }, }}, }}, }, @@ -357,6 +385,16 @@ func TestKafkaBindingDoSASL(t *testing.T) { Key: corev1.BasicAuthPasswordKey, }, }, + }, { + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + Key: "saslType", + }, + }, }}, }}, }, @@ -410,6 +448,16 @@ func TestKafkaBindingDoSASL(t *testing.T) { Key: corev1.BasicAuthPasswordKey, }, }, + }, { + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + Key: "saslType", + }, + }, }}, }}, }, @@ -437,6 +485,9 @@ func TestKafkaBindingDoSASL(t *testing.T) { }, { Name: "KAFKA_NET_SASL_PASSWORD", Value: "bad", + }, { + Name: "KAFKA_NET_SASL_TYPE", + Value: "bad", }}, }}, }, @@ -476,6 +527,16 @@ func TestKafkaBindingDoSASL(t *testing.T) { Key: corev1.BasicAuthPasswordKey, }, }, + }, { + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + Key: "saslType", + }, + }, }}, }}, }, diff --git a/kafka/source/pkg/apis/bindings/v1beta1/kafka_types.go b/kafka/source/pkg/apis/bindings/v1beta1/kafka_types.go index 43b05b26dc..11b3289ed1 100644 --- a/kafka/source/pkg/apis/bindings/v1beta1/kafka_types.go +++ b/kafka/source/pkg/apis/bindings/v1beta1/kafka_types.go @@ -56,6 +56,10 @@ type KafkaSASLSpec struct { // Password is the Kubernetes secret containing the SASL password. // +optional Password SecretValueFromSource `json:"password,omitempty"` + + // Type of saslType, defaults to plain (vs SCRAM-SHA-512 or SCRAM-SHA-256) + // +optional + Type SecretValueFromSource `json:"type,omitempty"` } type KafkaTLSSpec struct { diff --git a/kafka/source/pkg/apis/bindings/v1beta1/zz_generated.deepcopy.go b/kafka/source/pkg/apis/bindings/v1beta1/zz_generated.deepcopy.go index 5a114cd7ec..bbc7a01957 100644 --- a/kafka/source/pkg/apis/bindings/v1beta1/zz_generated.deepcopy.go +++ b/kafka/source/pkg/apis/bindings/v1beta1/zz_generated.deepcopy.go @@ -166,6 +166,7 @@ func (in *KafkaSASLSpec) DeepCopyInto(out *KafkaSASLSpec) { *out = *in in.User.DeepCopyInto(&out.User) in.Password.DeepCopyInto(&out.Password) + in.Type.DeepCopyInto(&out.Type) return } diff --git a/kafka/source/pkg/reconciler/source/resources/receive_adapter.go b/kafka/source/pkg/reconciler/source/resources/receive_adapter.go index d68abc3238..4c6330290c 100644 --- a/kafka/source/pkg/reconciler/source/resources/receive_adapter.go +++ b/kafka/source/pkg/reconciler/source/resources/receive_adapter.go @@ -75,6 +75,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { env = appendEnvFromSecretKeyRef(env, "KAFKA_NET_SASL_USER", args.Source.Spec.Net.SASL.User.SecretKeyRef) env = appendEnvFromSecretKeyRef(env, "KAFKA_NET_SASL_PASSWORD", args.Source.Spec.Net.SASL.Password.SecretKeyRef) + env = appendEnvFromSecretKeyRef(env, "KAFKA_NET_SASL_TYPE", args.Source.Spec.Net.SASL.Type.SecretKeyRef) env = appendEnvFromSecretKeyRef(env, "KAFKA_NET_TLS_CERT", args.Source.Spec.Net.TLS.Cert.SecretKeyRef) env = appendEnvFromSecretKeyRef(env, "KAFKA_NET_TLS_KEY", args.Source.Spec.Net.TLS.Key.SecretKeyRef) env = appendEnvFromSecretKeyRef(env, "KAFKA_NET_TLS_CA_CERT", args.Source.Spec.Net.TLS.CACert.SecretKeyRef) diff --git a/kafka/source/pkg/reconciler/source/resources/receive_adapter_test.go b/kafka/source/pkg/reconciler/source/resources/receive_adapter_test.go index b940a3843d..52fa9ea029 100644 --- a/kafka/source/pkg/reconciler/source/resources/receive_adapter_test.go +++ b/kafka/source/pkg/reconciler/source/resources/receive_adapter_test.go @@ -59,6 +59,14 @@ func TestMakeReceiveAdapter(t *testing.T) { Key: "password", }, }, + Type: bindingsv1beta1.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "the-sasltype-secret", + }, + Key: "saslType", + }, + }, }, TLS: bindingsv1beta1.KafkaTLSSpec{ Enable: true, @@ -191,6 +199,17 @@ func TestMakeReceiveAdapter(t *testing.T) { }, }, }, + { + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "the-saslType-secret", + }, + Key: "saslType", + }, + }, + }, { Name: "KAFKA_NET_TLS_CERT", ValueFrom: &corev1.EnvVarSource{ @@ -411,6 +430,17 @@ func TestMakeReceiveAdapterNoNet(t *testing.T) { }, }, }, + { + Name: "KAFKA_NET_SASL_TYPE", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "the-saslType-secret", + }, + Key: "saslType", + }, + }, + }, { Name: "KAFKA_NET_TLS_CERT", ValueFrom: &corev1.EnvVarSource{ diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index 2d8443df7f..aa1a5171fa 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -282,10 +282,10 @@ function test_channel_sasl() { initialize $@ --skip-istio-addon -test_channel_plain || fail_test create_tls_secrets -test_channel_tls || fail_test create_sasl_secrets +test_channel_plain || fail_test +test_channel_tls || fail_test test_channel_sasl || fail_test # If you wish to use this script just as test setup, *without* teardown, just uncomment this line and comment all go_test_e2e commands diff --git a/test/e2e/kafka_binding_test.go b/test/e2e/kafka_binding_test.go index d1a6034da7..b4a6817662 100644 --- a/test/e2e/kafka_binding_test.go +++ b/test/e2e/kafka_binding_test.go @@ -53,13 +53,13 @@ func testKafkaBinding(t *testing.T, version string, messageKey string, messageHe switch version { case "v1alpha1": contribtestlib.CreateKafkaSourceV1Alpha1OrFail(client, contribresources.KafkaSourceV1Alpha1( - kafkaBootstrapUrl, + kafkaBootstrapUrlPlain, kafkaTopicName, resources.ServiceRef(loggerPodName), )) case "v1beta1": contribtestlib.CreateKafkaSourceV1Beta1OrFail(client, contribresources.KafkaSourceV1Beta1( - kafkaBootstrapUrl, + kafkaBootstrapUrlPlain, kafkaTopicName, resources.ServiceRef(loggerPodName), )) @@ -74,7 +74,7 @@ func testKafkaBinding(t *testing.T, version string, messageKey string, messageHe switch version { case "v1alpha1": contribtestlib.CreateKafkaBindingV1Alpha1OrFail(client, contribresources.KafkaBindingV1Alpha1( - kafkaBootstrapUrl, + kafkaBootstrapUrlPlain, &tracker.Reference{ APIVersion: "batch/v1", Kind: "Job", @@ -85,7 +85,7 @@ func testKafkaBinding(t *testing.T, version string, messageKey string, messageHe )) case "v1beta1": contribtestlib.CreateKafkaBindingV1Beta1OrFail(client, contribresources.KafkaBindingV1Beta1( - kafkaBootstrapUrl, + kafkaBootstrapUrlPlain, &tracker.Reference{ APIVersion: "batch/v1", Kind: "Job", diff --git a/test/e2e/kafka_source_reconciler_test.go b/test/e2e/kafka_source_reconciler_test.go index ab7a75fce2..12e9a272f5 100644 --- a/test/e2e/kafka_source_reconciler_test.go +++ b/test/e2e/kafka_source_reconciler_test.go @@ -112,7 +112,7 @@ func createKafkaSourceWithSinkMissing(c *testlib.Client) { helpers.MustCreateTopic(c, kafkaClusterName, kafkaClusterNamespace, rtKafkaTopicName) contribtestlib.CreateKafkaSourceV1Beta1OrFail(c, contribresources.KafkaSourceV1Beta1( - kafkaBootstrapUrl, + kafkaBootstrapUrlPlain, rtKafkaTopicName, pkgTest.CoreV1ObjectReference(resources.InMemoryChannelKind, resources.MessagingAPIVersion, rtChannelName), contribresources.WithNameV1Beta1(rtKafkaSourceName), diff --git a/test/e2e/kafka_source_test.go b/test/e2e/kafka_source_test.go index 0d356a20af..c2ba5c6715 100644 --- a/test/e2e/kafka_source_test.go +++ b/test/e2e/kafka_source_test.go @@ -30,6 +30,9 @@ import ( . "github.com/cloudevents/sdk-go/v2/test" cetypes "github.com/cloudevents/sdk-go/v2/types" "github.com/google/uuid" + corev1 "k8s.io/api/core/v1" + + "knative.dev/eventing/pkg/utils" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" @@ -42,26 +45,168 @@ import ( ) const ( - kafkaBootstrapUrl = "my-cluster-kafka-bootstrap.kafka.svc:9092" - kafkaClusterName = "my-cluster" - kafkaClusterNamespace = "kafka" + kafkaBootstrapUrlPlain = "my-cluster-kafka-bootstrap.kafka.svc:9092" + kafkaBootstrapUrlTLS = "my-cluster-kafka-bootstrap.kafka.svc:9093" + kafkaBootstrapUrlSASL = "my-cluster-kafka-bootstrap.kafka.svc:9094" + kafkaClusterName = "my-cluster" + kafkaClusterNamespace = "kafka" + kafkaClusterSASLUsername = "my-sasl-user" + kafkaClusterTLSUsername = "my-tls-user" + + kafkaSASLSecret = "strimzi-sasl-secret" + kafkaTLSSecret = "strimzi-tls-secret" ) -func testKafkaSource(t *testing.T, name string, version string, messageKey string, messageHeaders map[string]string, messagePayload string, matcherGen func(cloudEventsSourceName, cloudEventsEventType string) EventMatcher) { +type authSetup struct { + bootStrapServer string + SASLEnabled bool + TLSEnabled bool +} + +func withAuthEnablementV1Beta1(auth authSetup) contribresources.KafkaSourceV1Beta1Option { + // We test with sasl512 and enable tls with it, so check tls first + if auth.TLSEnabled == true { + return func(ks *sourcesv1beta1.KafkaSource) { + ks.Spec.KafkaAuthSpec.Net.TLS.Enable = true + ks.Spec.KafkaAuthSpec.Net.TLS.CACert.SecretKeyRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: kafkaTLSSecret, + }, + Key: "ca.crt", + } + ks.Spec.KafkaAuthSpec.Net.TLS.Cert.SecretKeyRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: kafkaTLSSecret, + }, + Key: "user.crt", + } + ks.Spec.KafkaAuthSpec.Net.TLS.Key.SecretKeyRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: kafkaTLSSecret, + }, + Key: "user.key", + } + } + } + if auth.SASLEnabled == true { + return func(ks *sourcesv1beta1.KafkaSource) { + ks.Spec.KafkaAuthSpec.Net.SASL.Enable = true + ks.Spec.KafkaAuthSpec.Net.SASL.User.SecretKeyRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: kafkaSASLSecret, + }, + Key: "user", + } + ks.Spec.KafkaAuthSpec.Net.SASL.Password.SecretKeyRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: kafkaSASLSecret, + }, + Key: "password", + } + ks.Spec.KafkaAuthSpec.Net.SASL.Type.SecretKeyRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: kafkaSASLSecret, + }, + Key: "saslType", + } + ks.Spec.KafkaAuthSpec.Net.TLS.Enable = true + ks.Spec.KafkaAuthSpec.Net.TLS.CACert.SecretKeyRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: kafkaSASLSecret, + }, + Key: "ca.crt", + } + } + } + return func(ks *sourcesv1beta1.KafkaSource) {} +} + +func withAuthEnablementV1Alpha1(auth authSetup) contribresources.KafkaSourceV1Alpha1Option { + // We test with sasl512 and enable tls with it, so check tls first + if auth.TLSEnabled == true { + return func(ks *sourcesv1alpha1.KafkaSource) { + ks.Spec.KafkaAuthSpec.Net.TLS.Enable = true + ks.Spec.KafkaAuthSpec.Net.TLS.CACert.SecretKeyRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: kafkaTLSSecret, + }, + Key: "ca.crt", + } + ks.Spec.KafkaAuthSpec.Net.TLS.Cert.SecretKeyRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: kafkaTLSSecret, + }, + Key: "user.crt", + } + ks.Spec.KafkaAuthSpec.Net.TLS.Key.SecretKeyRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: kafkaTLSSecret, + }, + Key: "user.key", + } + } + } + if auth.SASLEnabled == true { + return func(ks *sourcesv1alpha1.KafkaSource) { + ks.Spec.KafkaAuthSpec.Net.SASL.Enable = true + ks.Spec.KafkaAuthSpec.Net.SASL.User.SecretKeyRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: kafkaSASLSecret, + }, + Key: "user", + } + ks.Spec.KafkaAuthSpec.Net.SASL.Password.SecretKeyRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: kafkaSASLSecret, + }, + Key: "password", + } + ks.Spec.KafkaAuthSpec.Net.SASL.Type.SecretKeyRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: kafkaSASLSecret, + }, + Key: "saslType", + } + ks.Spec.KafkaAuthSpec.Net.TLS.Enable = true + ks.Spec.KafkaAuthSpec.Net.TLS.CACert.SecretKeyRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: kafkaSASLSecret, + }, + Key: "ca.crt", + } + } + } + return func(ks *sourcesv1alpha1.KafkaSource) {} +} +func testKafkaSource(t *testing.T, name string, version string, messageKey string, messageHeaders map[string]string, messagePayload string, matcherGen func(cloudEventsSourceName, cloudEventsEventType string) EventMatcher, auth authSetup) { name = fmt.Sprintf("%s-%s", name, version) var ( kafkaTopicName = uuid.New().String() consumerGroup = uuid.New().String() - recordEventPodName = "e2e-kafka-recordevent-" + strings.ReplaceAll(name, "_", "-") + recordEventPodName = "e2e-kafka-r-" + strings.ReplaceAll(name, "_", "-") kafkaSourceName = "e2e-kafka-source-" + strings.ReplaceAll(name, "_", "-") ) client := testlib.Setup(t, true) defer testlib.TearDown(client) + if auth.SASLEnabled { + _, err := utils.CopySecret(client.Kube.Kube.CoreV1(), "knative-eventing", kafkaSASLSecret, client.Namespace, "default") + if err != nil { + t.Fatalf("could not copy SASL secret(%s): %v", kafkaSASLSecret, err) + } + } + if auth.TLSEnabled { + _, err := utils.CopySecret(client.Kube.Kube.CoreV1(), "knative-eventing", kafkaTLSSecret, client.Namespace, "default") + if err != nil { + t.Fatalf("could not copy secret(%s): %v", kafkaTLSSecret, err) + } + } helpers.MustCreateTopic(client, kafkaClusterName, kafkaClusterNamespace, kafkaTopicName) - + if len(recordEventPodName) > 63 { + recordEventPodName = recordEventPodName[:63] + } eventTracker, _ := recordevents.StartEventRecordOrFail(context.Background(), client, recordEventPodName) var ( @@ -73,21 +218,23 @@ func testKafkaSource(t *testing.T, name string, version string, messageKey strin switch version { case "v1alpha1": contribtestlib.CreateKafkaSourceV1Alpha1OrFail(client, contribresources.KafkaSourceV1Alpha1( - kafkaBootstrapUrl, + auth.bootStrapServer, kafkaTopicName, resources.ServiceRef(recordEventPodName), contribresources.WithNameV1Alpha1(kafkaSourceName), contribresources.WithConsumerGroupV1Alpha1(consumerGroup), + withAuthEnablementV1Alpha1(auth), )) cloudEventsSourceName = sourcesv1alpha1.KafkaEventSource(client.Namespace, kafkaSourceName, kafkaTopicName) cloudEventsEventType = sourcesv1alpha1.KafkaEventType case "v1beta1": contribtestlib.CreateKafkaSourceV1Beta1OrFail(client, contribresources.KafkaSourceV1Beta1( - kafkaBootstrapUrl, + auth.bootStrapServer, kafkaTopicName, resources.ServiceRef(recordEventPodName), contribresources.WithNameV1Beta1(kafkaSourceName), contribresources.WithConsumerGroupV1Beta1(consumerGroup), + withAuthEnablementV1Beta1(auth), )) cloudEventsSourceName = sourcesv1beta1.KafkaEventSource(client.Namespace, kafkaSourceName, kafkaTopicName) cloudEventsEventType = sourcesv1beta1.KafkaEventType @@ -97,7 +244,7 @@ func testKafkaSource(t *testing.T, name string, version string, messageKey strin client.WaitForAllTestResourcesReadyOrFail(context.Background()) - helpers.MustPublishKafkaMessage(client, kafkaBootstrapUrl, kafkaTopicName, messageKey, messageHeaders, messagePayload) + helpers.MustPublishKafkaMessage(client, kafkaBootstrapUrlPlain, kafkaTopicName, messageKey, messageHeaders, messagePayload) eventTracker.AssertExact(1, recordevents.MatchEvent(matcherGen(cloudEventsSourceName, cloudEventsEventType))) } @@ -105,6 +252,31 @@ func testKafkaSource(t *testing.T, name string, version string, messageKey strin func TestKafkaSource(t *testing.T) { time, _ := cetypes.ParseTime("2018-04-05T17:31:00Z") + auths := map[string]struct { + auth authSetup + }{ + "plain": { + auth: authSetup{ + bootStrapServer: kafkaBootstrapUrlPlain, + SASLEnabled: false, + TLSEnabled: false, + }, + }, + "s512": { + auth: authSetup{ + bootStrapServer: kafkaBootstrapUrlSASL, + SASLEnabled: true, + TLSEnabled: false, + }, + }, + "tls": { + auth: authSetup{ + bootStrapServer: kafkaBootstrapUrlTLS, + SASLEnabled: false, + TLSEnabled: true, + }, + }, + } tests := map[string]struct { messageKey string messageHeaders map[string]string @@ -139,7 +311,7 @@ func TestKafkaSource(t *testing.T) { ) }, }, - "no_event_no_content_type_no_key": { + "no_event_content_type_or_key": { messagePayload: `{"value":5}`, matcherGen: func(cloudEventsSourceName, cloudEventsEventType string) EventMatcher { return AllOf( @@ -229,14 +401,17 @@ func TestKafkaSource(t *testing.T) { }, }, } - for name, test := range tests { - test := test - t.Run(name+"-v1alpha1", func(t *testing.T) { - testKafkaSource(t, name, "v1alpha1", test.messageKey, test.messageHeaders, test.messagePayload, test.matcherGen) - }) - t.Run(name+"-v1beta1", func(t *testing.T) { - testKafkaSource(t, name, "v1beta1", test.messageKey, test.messageHeaders, test.messagePayload, test.matcherGen) - }) + for authName, auth := range auths { + for name, test := range tests { + test := test + name := name + "_" + authName + t.Run(name+"-v1alpha1", func(t *testing.T) { + testKafkaSource(t, name, "v1alpha1", test.messageKey, test.messageHeaders, test.messagePayload, test.matcherGen, auth.auth) + }) + t.Run(name+"-v1beta1", func(t *testing.T) { + testKafkaSource(t, name, "v1beta1", test.messageKey, test.messageHeaders, test.messagePayload, test.matcherGen, auth.auth) + }) + } } }