diff --git a/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml b/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml index 1dceedae6e..f09016f336 100644 --- a/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml +++ b/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml @@ -147,15 +147,28 @@ spec: replyCACerts: description: replyCACerts is the CA certs to trust for the reply. type: string + replyAudience: + description: ReplyAudience is the OIDC audience for the replyUri. + type: string subscriberUri: description: SubscriberURI is the endpoint for the subscriber type: string subscriberCACerts: description: SubscriberCACerts is the CA certs to trust for the subscriber. type: string + subscriberAudience: + description: SubscriberAudience is the OIDC audience for the subscriberUri. + type: string uid: description: UID is used to understand the origin of the subscriber. type: string + auth: + description: Auth provides the relevant information for OIDC authentication. + type: object + properties: + serviceAccountName: + description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. + type: string status: description: Status represents the current state of the KafkaChannel. This data may be out of date. type: object @@ -239,6 +252,9 @@ spec: type: string deadLetterSinkCACerts: type: string + deadLetterSinkAudience: + description: OIDC audience of the dead letter sink. + type: string observedGeneration: description: ObservedGeneration is the 'Generation' of the Service that was last processed by the controller. type: integer @@ -262,6 +278,13 @@ spec: uid: description: UID is used to understand the origin of the subscriber. type: string + auth: + description: Auth provides the relevant information for OIDC authentication. + type: object + properties: + serviceAccountName: + description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. + type: string additionalPrinterColumns: - name: Ready type: string diff --git a/control-plane/pkg/receiver/address.go b/control-plane/pkg/receiver/address.go index 07deba0c16..9e57492b17 100644 --- a/control-plane/pkg/receiver/address.go +++ b/control-plane/pkg/receiver/address.go @@ -35,29 +35,32 @@ func Address(host string, object metav1.Object) *url.URL { } // HTTPAddress returns the addressable -func HTTPAddress(host string, object metav1.Object) duckv1.Addressable { +func HTTPAddress(host string, audience *string, object metav1.Object) duckv1.Addressable { httpAddress := duckv1.Addressable{ - Name: pointer.String("http"), - URL: apis.HTTP(host), + Name: pointer.String("http"), + URL: apis.HTTP(host), + Audience: audience, } httpAddress.URL.Path = fmt.Sprintf("/%s/%s", object.GetNamespace(), object.GetName()) return httpAddress } // HTTPAddress returns the addressable -func ChannelHTTPAddress(host string) duckv1.Addressable { +func ChannelHTTPAddress(host string, audience *string) duckv1.Addressable { httpAddress := duckv1.Addressable{ - Name: pointer.String("http"), - URL: apis.HTTP(host), + Name: pointer.String("http"), + URL: apis.HTTP(host), + Audience: audience, } return httpAddress } -func HTTPSAddress(host string, object metav1.Object, caCerts *string) duckv1.Addressable { +func HTTPSAddress(host string, audience *string, object metav1.Object, caCerts *string) duckv1.Addressable { httpsAddress := duckv1.Addressable{ - Name: pointer.String("https"), - URL: apis.HTTPS(host), - CACerts: caCerts, + Name: pointer.String("https"), + URL: apis.HTTPS(host), + CACerts: caCerts, + Audience: audience, } httpsAddress.URL.Path = fmt.Sprintf("/%s/%s", object.GetNamespace(), object.GetName()) return httpsAddress diff --git a/control-plane/pkg/receiver/address_test.go b/control-plane/pkg/receiver/address_test.go index cbbb3fe9f7..f17b8a0b8b 100644 --- a/control-plane/pkg/receiver/address_test.go +++ b/control-plane/pkg/receiver/address_test.go @@ -52,9 +52,11 @@ func TestHTTPSAddress(t *testing.T) { Name: "ks", }, } - httpsAddress := HTTPSAddress(host, ks, pointer.String(string(eventingtlstesting.CA))) + aud := "my-audience" + httpsAddress := HTTPSAddress(host, &aud, ks, pointer.String(string(eventingtlstesting.CA))) require.Equal(t, httpsAddress.URL.Host, host) + require.Equal(t, httpsAddress.Audience, &aud) require.Equal(t, httpsAddress.URL.Scheme, "https") require.Contains(t, httpsAddress.URL.Path, ks.GetNamespace()) require.Contains(t, httpsAddress.URL.Path, ks.GetName()) @@ -69,9 +71,11 @@ func TestHTTPAddress(t *testing.T) { Name: "ks", }, } - httpAddress := HTTPAddress(host, ks) + aud := "my-audience" + httpAddress := HTTPAddress(host, &aud, ks) require.Equal(t, host, httpAddress.URL.Host) + require.Equal(t, httpAddress.Audience, &aud) require.Equal(t, httpAddress.URL.Scheme, "http") require.Contains(t, httpAddress.URL.Path, ks.GetNamespace()) require.Contains(t, httpAddress.URL.Path, ks.GetName()) diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index c5538dd379..85baf16e36 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -247,8 +247,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) return err } - httpAddress := receiver.HTTPAddress(ingressHost, broker) - httpsAddress := receiver.HTTPSAddress(ingressHost, broker, caCerts) + httpAddress := receiver.HTTPAddress(ingressHost, nil, broker) + httpsAddress := receiver.HTTPSAddress(ingressHost, nil, broker, caCerts) addressableStatus.Address = &httpAddress addressableStatus.Addresses = []duckv1.Addressable{httpAddress, httpsAddress} } else if transportEncryptionFlags.IsStrictTransportEncryption() { @@ -257,11 +257,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) return err } - httpsAddress := receiver.HTTPSAddress(ingressHost, broker, caCerts) + httpsAddress := receiver.HTTPSAddress(ingressHost, nil, broker, caCerts) addressableStatus.Address = &httpsAddress addressableStatus.Addresses = []duckv1.Addressable{httpsAddress} } else { - httpAddress := receiver.HTTPAddress(ingressHost, broker) + httpAddress := receiver.HTTPAddress(ingressHost, nil, broker) addressableStatus.Address = &httpAddress addressableStatus.Addresses = []duckv1.Addressable{httpAddress} } @@ -360,7 +360,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker) // See (under discussions KIPs, unlikely to be accepted as they are): // - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446 // - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update - address := receiver.HTTPAddress(ingressHost, broker) + address := receiver.HTTPAddress(ingressHost, nil, broker) proberAddressable := prober.ProberAddressable{ AddressStatus: &duckv1.AddressStatus{ Address: &address, diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index 92030261c3..4207ba2afe 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -22,10 +22,13 @@ import ( "strconv" "time" + "k8s.io/utils/pointer" + "knative.dev/eventing/pkg/auth" + "knative.dev/pkg/logging" + "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/utils/pointer" "knative.dev/eventing/pkg/apis/feature" messaging "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/pkg/network" @@ -313,18 +316,27 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta return err } + featureFlags := feature.FromContext(ctx) + var audience *string + if featureFlags.IsOIDCAuthentication() { + audience = pointer.String(auth.GetAudience(messaging.SchemeGroupVersion.WithKind("KafkaChannel"), channel.ObjectMeta)) + logging.FromContext(ctx).Debugw("Setting the KafkaChannels audience", zap.String("audience", *audience)) + } else { + logging.FromContext(ctx).Debug("Clearing the KafkaChannels audience as OIDC is not enabled") + audience = nil + } + var addressableStatus duckv1.AddressStatus channelHttpsHost := network.GetServiceHostname(r.Env.IngressName, r.SystemNamespace) channelHttpHost := network.GetServiceHostname(channelService.Name, channel.Namespace) - transportEncryptionFlags := feature.FromContext(ctx) - if transportEncryptionFlags.IsPermissiveTransportEncryption() { + if featureFlags.IsPermissiveTransportEncryption() { caCerts, err := r.getCaCerts() if err != nil { return err } - httpAddress := receiver.ChannelHTTPAddress(channelHttpHost) - httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channel, caCerts) + httpAddress := receiver.ChannelHTTPAddress(channelHttpHost, audience) + httpsAddress := receiver.HTTPSAddress(channelHttpsHost, audience, channel, caCerts) // Permissive mode: // - status.address http address with path-based routing // - status.addresses: @@ -332,7 +344,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta // - http address with path-based routing addressableStatus.Addresses = []duckv1.Addressable{httpsAddress, httpAddress} addressableStatus.Address = &httpAddress - } else if transportEncryptionFlags.IsStrictTransportEncryption() { + } else if featureFlags.IsStrictTransportEncryption() { // Strict mode: (only https addresses) // - status.address https address with path-based routing // - status.addresses: @@ -342,11 +354,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta return err } - httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channel, caCerts) + httpsAddress := receiver.HTTPSAddress(channelHttpsHost, audience, channel, caCerts) addressableStatus.Addresses = []duckv1.Addressable{httpsAddress} addressableStatus.Address = &httpsAddress } else { - httpAddress := receiver.ChannelHTTPAddress(channelHttpHost) + httpAddress := receiver.ChannelHTTPAddress(channelHttpHost, audience) addressableStatus.Address = &httpAddress addressableStatus.Addresses = []duckv1.Addressable{httpAddress} } @@ -433,7 +445,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1 // See (under discussions KIPs, unlikely to be accepted as they are): // - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446 // - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update - address := receiver.HTTPAddress(r.IngressHost, channel) + address := receiver.HTTPAddress(r.IngressHost, nil, channel) proberAddressable := prober.ProberAddressable{ AddressStatus: &duckv1.AddressStatus{ Address: &address, diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index 9a4c901dcd..eda161a896 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -2005,6 +2005,91 @@ func TestReconcileKind(t *testing.T) { finalizerUpdatedEvent, }, }, + { + Name: "Reconciled normal - OIDC enabled", + Objects: []runtime.Object{ + NewChannel(), + NewService(), + NewPerChannelService(DefaultEnv), + ChannelReceiverPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + ChannelDispatcherPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + NewConfigMapWithTextData(system.Namespace(), DefaultEnv.GeneralConfigMapName, map[string]string{ + kafka.BootstrapServersConfigMapKey: ChannelBootstrapServers, + }), + }, + Key: testKey, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + }), + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ + Generation: 1, + Resources: []*contract.Resource{ + { + Uid: ChannelUUID, + Topics: []string{ChannelTopic()}, + BootstrapServers: ChannelBootstrapServers, + Reference: ChannelReference(), + Ingress: &contract.Ingress{ + Path: receiver.Path(ChannelNamespace, ChannelName), + Host: receiver.Host(ChannelNamespace, ChannelName), + }, + }, + }, + }), + ChannelReceiverPodUpdate(env.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + ChannelDispatcherPodUpdate(env.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + }, + SkipNamespaceValidation: true, // WantCreates compare the channel namespace with configmap namespace, so skip it + WantCreates: []runtime.Object{ + NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewChannel( + WithInitKafkaChannelConditions, + StatusConfigParsed, + StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), + StatusTopicReadyWithName(ChannelTopic()), + StatusDataPlaneAvailable, + ChannelAddressable(&env), + StatusProbeSucceeded, + WithChannelAddresses([]duckv1.Addressable{ + { + Name: pointer.String("http"), + URL: ChannelAddress(), + Audience: pointer.String(ChannelAudience), + }, + }), + WithChannelAddress(duckv1.Addressable{ + Name: pointer.String("http"), + URL: ChannelAddress(), + Audience: pointer.String(ChannelAudience), + }), + WithChannelAddessable(), + ), + }, + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + }, } useTable(t, table, env) diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index e682e6d110..95538cbc84 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -87,7 +87,12 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf logger := logging.FromContext(ctx) - featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + var globalResync func(obj interface{}) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + if globalResync != nil { + globalResync(nil) + } + }) featureStore.WatchConfigs(watcher) _, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx) @@ -136,7 +141,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker) - globalResync := func(_ interface{}) { + globalResync = func(_ interface{}) { impl.GlobalResync(channelInformer.Informer()) } diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2.go b/control-plane/pkg/reconciler/channel/v2/channelv2.go index 40c434f105..715a73de0d 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2.go @@ -24,6 +24,10 @@ import ( "strings" "time" + "k8s.io/utils/pointer" + "knative.dev/eventing/pkg/auth" + "knative.dev/pkg/logging" + "github.com/IBM/sarama" "go.uber.org/multierr" "go.uber.org/zap" @@ -34,7 +38,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/utils/pointer" "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/network" "knative.dev/pkg/resolver" @@ -278,18 +281,27 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta return err } + featureFlags := feature.FromContext(ctx) + var audience *string + if featureFlags.IsOIDCAuthentication() { + audience = pointer.String(auth.GetAudience(messaging.SchemeGroupVersion.WithKind("KafkaChannel"), channel.ObjectMeta)) + logging.FromContext(ctx).Debugw("Setting the KafkaChannels audience", zap.String("audience", *audience)) + } else { + logging.FromContext(ctx).Debug("Clearing the KafkaChannels audience as OIDC is not enabled") + audience = nil + } + var addressableStatus duckv1.AddressStatus channelHttpsHost := network.GetServiceHostname(r.Env.IngressName, r.SystemNamespace) channelHttpHost := network.GetServiceHostname(channelService.Name, channel.Namespace) - transportEncryptionFlags := feature.FromContext(ctx) - if transportEncryptionFlags.IsPermissiveTransportEncryption() { + if featureFlags.IsPermissiveTransportEncryption() { caCerts, err := r.getCaCerts() if err != nil { return err } - httpAddress := receiver.ChannelHTTPAddress(channelHttpHost) - httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channelService, caCerts) + httpAddress := receiver.ChannelHTTPAddress(channelHttpHost, audience) + httpsAddress := receiver.HTTPSAddress(channelHttpsHost, audience, channelService, caCerts) // Permissive mode: // - status.address http address with path-based routing // - status.addresses: @@ -297,7 +309,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta // - http address with path-based routing addressableStatus.Addresses = []duckv1.Addressable{httpsAddress, httpAddress} addressableStatus.Address = &httpAddress - } else if transportEncryptionFlags.IsStrictTransportEncryption() { + } else if featureFlags.IsStrictTransportEncryption() { // Strict mode: (only https addresses) // - status.address https address with path-based routing // - status.addresses: @@ -307,11 +319,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta return err } - httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channelService, caCerts) + httpsAddress := receiver.HTTPSAddress(channelHttpsHost, audience, channelService, caCerts) addressableStatus.Addresses = []duckv1.Addressable{httpsAddress} addressableStatus.Address = &httpsAddress } else { - httpAddress := receiver.ChannelHTTPAddress(channelHttpHost) + httpAddress := receiver.ChannelHTTPAddress(channelHttpHost, audience) addressableStatus.Address = &httpAddress addressableStatus.Addresses = []duckv1.Addressable{httpAddress} } @@ -421,7 +433,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1 // See (under discussions KIPs, unlikely to be accepted as they are): // - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446 // - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update - address := receiver.HTTPAddress(r.IngressHost, channel) + address := receiver.HTTPAddress(r.IngressHost, nil, channel) proberAddressable := prober.ProberAddressable{ AddressStatus: &duckv1.AddressStatus{ Address: &address, diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go index 84b7fbfac7..6a51b5b97f 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go @@ -1904,6 +1904,82 @@ func TestReconcileKind(t *testing.T) { finalizerUpdatedEvent, }, }, + + { + Name: "Reconciled normal - OIDC enabled", + Objects: []runtime.Object{ + NewChannel(), + NewConfigMapWithTextData(env.SystemNamespace, DefaultEnv.GeneralConfigMapName, map[string]string{ + kafka.BootstrapServersConfigMapKey: ChannelBootstrapServers, + }), + ChannelReceiverPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + }, + Key: testKey, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + }), + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ + Generation: 1, + Resources: []*contract.Resource{ + { + Uid: ChannelUUID, + Topics: []string{ChannelTopic()}, + BootstrapServers: ChannelBootstrapServers, + Reference: ChannelReference(), + Ingress: &contract.Ingress{ + Host: receiver.Host(ChannelNamespace, ChannelName), + }, + }, + }, + }), + ChannelReceiverPodUpdate(env.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + }, + SkipNamespaceValidation: true, // WantCreates compare the channel namespace with configmap namespace, so skip it + WantCreates: []runtime.Object{ + NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), + NewPerChannelService(&env), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewChannel( + WithInitKafkaChannelConditions, + StatusConfigParsed, + StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), + StatusTopicReadyWithName(ChannelTopic()), + ChannelAddressable(&env), + StatusProbeSucceeded, + StatusChannelSubscribers(), + WithChannelAddresses([]duckv1.Addressable{ + { + Name: pointer.String("http"), + URL: ChannelAddress(), + Audience: pointer.String(ChannelAudience), + }, + }), + WithChannelAddress(duckv1.Addressable{ + Name: pointer.String("http"), + URL: ChannelAddress(), + Audience: pointer.String(ChannelAudience), + }), + WithChannelAddessable(), + ), + }, + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + }, } table.Test(t, NewFactory(&env, func(ctx context.Context, listers *Listers, env *config.Env, row *TableRow) controller.Reconciler { diff --git a/control-plane/pkg/reconciler/sink/kafka_sink.go b/control-plane/pkg/reconciler/sink/kafka_sink.go index 91ce8242cd..213ada2bca 100644 --- a/control-plane/pkg/reconciler/sink/kafka_sink.go +++ b/control-plane/pkg/reconciler/sink/kafka_sink.go @@ -255,8 +255,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) return err } - httpAddress := receiver.HTTPAddress(r.IngressHost, ks) - httpsAddress := receiver.HTTPSAddress(r.IngressHost, ks, caCerts) + httpAddress := receiver.HTTPAddress(r.IngressHost, nil, ks) + httpsAddress := receiver.HTTPSAddress(r.IngressHost, nil, ks, caCerts) // Permissive mode: // - status.address http address with path-based routing // - status.addresses: @@ -273,14 +273,14 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) if err != nil { return err } - httpsAddress := receiver.HTTPSAddress(r.IngressHost, ks, caCerts) + httpsAddress := receiver.HTTPSAddress(r.IngressHost, nil, ks, caCerts) addressableStatus.Address = &httpsAddress addressableStatus.Addresses = []duckv1.Addressable{httpsAddress} } else { // Disabled mode: // Unchange - httpAddress := receiver.HTTPAddress(r.IngressHost, ks) + httpAddress := receiver.HTTPAddress(r.IngressHost, nil, ks) addressableStatus.Address = &httpAddress addressableStatus.Addresses = []duckv1.Addressable{httpAddress} @@ -357,7 +357,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, ks *eventing.KafkaSink) e // See (under discussions KIPs, unlikely to be accepted as they are): // - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446 // - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update - address := receiver.HTTPAddress(r.IngressHost, ks) + address := receiver.HTTPAddress(r.IngressHost, nil, ks) proberAddressable := prober.ProberAddressable{ AddressStatus: &duckv1.AddressStatus{ Address: &address, diff --git a/control-plane/pkg/reconciler/testing/objects_channel.go b/control-plane/pkg/reconciler/testing/objects_channel.go index b87873c204..76d3c5850c 100644 --- a/control-plane/pkg/reconciler/testing/objects_channel.go +++ b/control-plane/pkg/reconciler/testing/objects_channel.go @@ -52,6 +52,7 @@ const ( ChannelUUID = "c1234567-8901-2345-6789-123456789101" ChannelBootstrapServers = "kafka-1:9092,kafka-2:9093" ChannelServiceName = "kc-kn-channel" + ChannelAudience = "messaging.knative.dev/kafkachannel/" + ChannelNamespace + "/" + ChannelName Subscription1Name = "sub-1" Subscription2Name = "sub-2" diff --git a/test/e2e_new_channel/kafka_channel_test.go b/test/e2e_new_channel/kafka_channel_test.go index a3f01e16c4..5471791b66 100644 --- a/test/e2e_new_channel/kafka_channel_test.go +++ b/test/e2e_new_channel/kafka_channel_test.go @@ -23,6 +23,9 @@ import ( "testing" "time" + "knative.dev/eventing/test/rekt/features/channel" + "knative.dev/eventing/test/rekt/features/oidc" + "knative.dev/pkg/system" "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/feature" @@ -31,6 +34,7 @@ import ( "knative.dev/reconciler-test/pkg/state" "knative.dev/eventing-kafka-broker/test/rekt/features/kafkachannel" + kafkachannelresource "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkachannel" ) const ( @@ -69,3 +73,23 @@ func TestKafkaChannelReadiness(t *testing.T) { env.Test(ctx, t, f) } } + +func TestKafkaChannelOIDC(t *testing.T) { + // Run Test In Parallel With Others + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.WithPollTimings(3*time.Second, 120*time.Second), + environment.Managed(t), + ) + + name := feature.MakeRandomK8sName("kafkaChannel") + env.Prerequisite(ctx, t, channel.ImplGoesReady(name)) + + env.Test(ctx, t, oidc.AddressableHasAudiencePopulated(kafkachannelresource.GVR(), kafkachannelresource.GVK().Kind, name, env.Namespace())) + // when the KafkaChannel supports all the OIDC features, we can do `TestKafkaChannelOIDC = rekt.TestChannelImplSupportsOIDC` too +} diff --git a/vendor/knative.dev/eventing/test/rekt/features/oidc/addressable_oidc_conformance.go b/vendor/knative.dev/eventing/test/rekt/features/oidc/addressable_oidc_conformance.go new file mode 100644 index 0000000000..bee8f85adb --- /dev/null +++ b/vendor/knative.dev/eventing/test/rekt/features/oidc/addressable_oidc_conformance.go @@ -0,0 +1,189 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package oidc + +import ( + "fmt" + + "knative.dev/eventing/test/rekt/features/featureflags" + + "github.com/cloudevents/sdk-go/v2/test" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/eventing/pkg/auth" + "knative.dev/eventing/test/rekt/resources/addressable" + "knative.dev/reconciler-test/pkg/eventshub" + eventassert "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/k8s" +) + +func AddressableOIDCConformance(gvr schema.GroupVersionResource, kind, name, namespace string) *feature.FeatureSet { + fs := feature.FeatureSet{ + Name: fmt.Sprintf("%s handles requests with OIDC tokens correctly", kind), + Features: AddressableOIDCTokenConformance(gvr, kind, name).Features, + } + + fs.Features = append(fs.Features, + AddressableHasAudiencePopulated(gvr, kind, name, namespace), + ) + + return &fs +} + +func AddressableOIDCTokenConformance(gvr schema.GroupVersionResource, kind, name string) *feature.FeatureSet { + fs := feature.FeatureSet{ + Name: fmt.Sprintf("%s handles requests with OIDC tokens correctly", kind), + Features: []*feature.Feature{ + addressableRejectInvalidAudience(gvr, kind, name), + addressableRejectCorruptedSignature(gvr, kind, name), + addressableRejectExpiredToken(gvr, kind, name), + addressableAllowsValidRequest(gvr, kind, name), + }, + } + + return &fs +} + +func AddressableHasAudiencePopulated(gvr schema.GroupVersionResource, kind, name, namespace string) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("%s populates its .status.address.audience correctly", kind)) + + f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) + + f.Requirement(fmt.Sprintf("%s is ready", kind), k8s.IsReady(gvr, name)) + f.Requirement(fmt.Sprintf("%s is addressable", kind), k8s.IsAddressable(gvr, name)) + + expectedAudience := auth.GetAudience(gvr.GroupVersion().WithKind(kind), metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }) + + f.Alpha(kind).Must("have audience set", addressable.ValidateAddress(gvr, name, addressable.AssertAddressWithAudience(expectedAudience))) + + return f +} + +func addressableRejectInvalidAudience(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("%s reject event for wrong OIDC audience", kind)) + + f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + source := feature.MakeRandomK8sName("source") + + event := test.FullEvent() + + f.Requirement(fmt.Sprintf("%s is ready", kind), k8s.IsReady(gvr, name)) + f.Requirement(fmt.Sprintf("%s is addressable", kind), k8s.IsAddressable(gvr, name)) + + f.Requirement("install source", eventshub.Install( + source, + eventshub.StartSenderToResourceTLS(gvr, name, nil), + eventshub.OIDCInvalidAudience(), + eventshub.InputEvent(event), + )) + + f.Alpha(kind). + Must("event sent", eventassert.OnStore(source).MatchSentEvent(test.HasId(event.ID())).Exact(1)). + Must("get 401 on response", eventassert.OnStore(source).Match(eventassert.MatchStatusCode(401)).Exact(1)) + + return f +} + +func addressableRejectExpiredToken(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("%s reject event with expired OIDC token", kind)) + + f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + source := feature.MakeRandomK8sName("source") + + event := test.FullEvent() + + f.Requirement(fmt.Sprintf("%s is ready", kind), k8s.IsReady(gvr, name)) + f.Requirement(fmt.Sprintf("%s is addressable", kind), k8s.IsAddressable(gvr, name)) + + f.Requirement("install source", eventshub.Install( + source, + eventshub.StartSenderToResourceTLS(gvr, name, nil), + eventshub.OIDCExpiredToken(), + eventshub.InputEvent(event), + )) + + f.Alpha(kind). + Must("event sent", eventassert.OnStore(source).MatchSentEvent(test.HasId(event.ID())).Exact(1)). + Must("get 401 on response", eventassert.OnStore(source).Match(eventassert.MatchStatusCode(401)).Exact(1)) + + return f +} + +func addressableRejectCorruptedSignature(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("%s reject event with corrupted OIDC token signature", kind)) + + f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + source := feature.MakeRandomK8sName("source") + + event := test.FullEvent() + + f.Requirement(fmt.Sprintf("%s is ready", kind), k8s.IsReady(gvr, name)) + f.Requirement(fmt.Sprintf("%s is addressable", kind), k8s.IsAddressable(gvr, name)) + + f.Requirement("install source", eventshub.Install( + source, + eventshub.StartSenderToResourceTLS(gvr, name, nil), + eventshub.OIDCCorruptedSignature(), + eventshub.InputEvent(event), + )) + + f.Alpha(kind). + Must("event sent", eventassert.OnStore(source).MatchSentEvent(test.HasId(event.ID())).Exact(1)). + Must("get 401 on response", eventassert.OnStore(source).Match(eventassert.MatchStatusCode(401)).Exact(1)) + + return f +} + +func addressableAllowsValidRequest(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("%s handles event with valid OIDC token", kind)) + + f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + source := feature.MakeRandomK8sName("source") + + event := test.FullEvent() + + f.Requirement(fmt.Sprintf("%s is ready", kind), k8s.IsReady(gvr, name)) + f.Requirement(fmt.Sprintf("%s is addressable", kind), k8s.IsAddressable(gvr, name)) + + f.Requirement("install source", eventshub.Install( + source, + eventshub.StartSenderToResourceTLS(gvr, name, nil), + eventshub.InputEvent(event), + )) + + f.Alpha(kind). + Must("event sent", eventassert.OnStore(source).MatchSentEvent(test.HasId(event.ID())).Exact(1)). + Must("get 202 on response", eventassert.OnStore(source).Match(eventassert.MatchStatusCode(202)).Exact(1)) + + return f +} diff --git a/vendor/modules.txt b/vendor/modules.txt index b00c27d60a..0eed1c37b7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1459,6 +1459,7 @@ knative.dev/eventing/test/rekt/features/channel knative.dev/eventing/test/rekt/features/featureflags knative.dev/eventing/test/rekt/features/knconf knative.dev/eventing/test/rekt/features/new_trigger_filters +knative.dev/eventing/test/rekt/features/oidc knative.dev/eventing/test/rekt/features/source knative.dev/eventing/test/rekt/features/trigger knative.dev/eventing/test/rekt/resources/account_role