diff --git a/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml b/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml index f09016f336..6058c60cf7 100644 --- a/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml +++ b/control-plane/config/eventing-kafka-broker/100-channel/100-kafka-channel.yaml @@ -86,6 +86,9 @@ spec: type: string CACerts: type: string + audience: + description: Audience is the OIDC audience for the deadLetterSink. + type: string retry: description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink. type: integer @@ -132,6 +135,9 @@ spec: type: string CACerts: type: string + audience: + description: Audience is the OIDC audience for the deadLetterSink. + type: string retry: description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink. type: integer diff --git a/control-plane/config/eventing-kafka-broker/100-source/100-kafka-source.yaml b/control-plane/config/eventing-kafka-broker/100-source/100-kafka-source.yaml index ac3b38148a..ac5391ba80 100644 --- a/control-plane/config/eventing-kafka-broker/100-source/100-kafka-source.yaml +++ b/control-plane/config/eventing-kafka-broker/100-source/100-kafka-source.yaml @@ -87,9 +87,6 @@ spec: description: DeadLetterSink is the sink receiving event that could not be sent to a destination. type: object properties: - CACerts: - description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any. - type: string ref: description: Ref points to an Addressable. type: object @@ -118,6 +115,12 @@ spec: uri: description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref. type: string + CACerts: + description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any. + type: string + audience: + description: Audience is the OIDC audience for the deadLetterSink. + type: string retry: description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink. type: integer @@ -271,9 +274,6 @@ spec: description: Sink is a reference to an object that will resolve to a uri to use as the sink. type: object properties: - CACerts: - description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any. - type: string ref: description: Ref points to an Addressable. type: object @@ -302,6 +302,12 @@ spec: uri: description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref. type: string + CACerts: + description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any. + type: string + audience: + description: Audience is the OIDC audience for the sink. + type: string topics: description: Topic topics to consume messages from type: array @@ -392,6 +398,16 @@ spec: sinkUri: description: SinkURI is the current active sink URI that has been configured for the Source. type: string + sinkAudience: + description: SinkAudience is the OIDC audience of the sink. + type: string + auth: + description: Auth provides the relevant information for OIDC authentication. + type: object + properties: + serviceAccountName: + description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. + type: string subresources: status: {} scale: diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go index 15b19f8e22..6778c27e06 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go @@ -100,6 +100,10 @@ type ConsumerGroupSpec struct { // More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors // +optional Selector map[string]string `json:"selector,omitempty" protobuf:"bytes,2,rep,name=selector"` + + // OIDCServiceAccountName is the name of service account used for this components + // OIDC authentication. + OIDCServiceAccountName *string `json:"oidcServiceAccountName,omitempty"` } type ConsumerGroupStatus struct { @@ -120,6 +124,10 @@ type ConsumerGroupStatus struct { // +optional SubscriberCACerts *string `json:"subscriberCACerts,omitempty"` + // SubscriberAudience is the OIDC audience for the resolved URI + // +optional + SubscriberAudience *string `json:"subscriberAudience,omitempty"` + // DeliveryStatus contains a resolved URL to the dead letter sink address, and any other // resolved delivery options. eventingduckv1.DeliveryStatus `json:",inline"` diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go index f4bbce0c4f..1861099c23 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go @@ -103,6 +103,10 @@ type ConsumerSpec struct { // PodBind represents a reference to the pod in which the consumer should be placed. PodBind *PodBind `json:"podBind"` + + // OIDCServiceAccountName is the name of the generated service account + // used for this components OIDC authentication. + OIDCServiceAccountName *string `json:"oidcServiceAccountName,omitempty"` } type ReplyStrategy struct { @@ -208,6 +212,10 @@ type ConsumerStatus struct { // +optional SubscriberCACerts *string `json:"subscriberCACerts,omitempty"` + // SubscriberAudience is the OIDC audience for the resolved URI + // +optional + SubscriberAudience *string `json:"subscriberAudience,omitempty"` + // DeliveryStatus contains a resolved URL to the dead letter sink address, and any other // resolved delivery options. eventingduck.DeliveryStatus `json:",inline"` diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/zz_generated.deepcopy.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/zz_generated.deepcopy.go index 3b0ec4bd77..ba53fa9199 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/zz_generated.deepcopy.go @@ -221,6 +221,11 @@ func (in *ConsumerGroupSpec) DeepCopyInto(out *ConsumerGroupSpec) { (*out)[key] = val } } + if in.OIDCServiceAccountName != nil { + in, out := &in.OIDCServiceAccountName, &out.OIDCServiceAccountName + *out = new(string) + **out = **in + } return } @@ -249,6 +254,11 @@ func (in *ConsumerGroupStatus) DeepCopyInto(out *ConsumerGroupStatus) { *out = new(string) **out = **in } + if in.SubscriberAudience != nil { + in, out := &in.SubscriberAudience, &out.SubscriberAudience + *out = new(string) + **out = **in + } in.DeliveryStatus.DeepCopyInto(&out.DeliveryStatus) if in.Replicas != nil { in, out := &in.Replicas, &out.Replicas @@ -346,6 +356,11 @@ func (in *ConsumerSpec) DeepCopyInto(out *ConsumerSpec) { *out = new(PodBind) **out = **in } + if in.OIDCServiceAccountName != nil { + in, out := &in.OIDCServiceAccountName, &out.OIDCServiceAccountName + *out = new(string) + **out = **in + } return } @@ -373,6 +388,11 @@ func (in *ConsumerStatus) DeepCopyInto(out *ConsumerStatus) { *out = new(string) **out = **in } + if in.SubscriberAudience != nil { + in, out := &in.SubscriberAudience, &out.SubscriberAudience + *out = new(string) + **out = **in + } in.DeliveryStatus.DeepCopyInto(&out.DeliveryStatus) return } diff --git a/control-plane/pkg/apis/sources/v1beta1/kafka_lifecycle.go b/control-plane/pkg/apis/sources/v1beta1/kafka_lifecycle.go index 64028ec369..d8c966c2dd 100644 --- a/control-plane/pkg/apis/sources/v1beta1/kafka_lifecycle.go +++ b/control-plane/pkg/apis/sources/v1beta1/kafka_lifecycle.go @@ -95,6 +95,7 @@ func (s *KafkaSourceStatus) MarkSink(addr *duckv1.Addressable) { if addr.URL != nil && !addr.URL.IsEmpty() { s.SinkURI = addr.URL s.SinkCACerts = addr.CACerts + s.SinkAudience = addr.Audience KafkaSourceCondSet.Manage(s).MarkTrue(KafkaConditionSinkProvided) } else { KafkaSourceCondSet.Manage(s).MarkUnknown(KafkaConditionSinkProvided, "SinkEmpty", "Sink has resolved to empty.%s", "") diff --git a/control-plane/pkg/core/config/utils.go b/control-plane/pkg/core/config/utils.go index e7d8dff3cb..fa45f52966 100644 --- a/control-plane/pkg/core/config/utils.go +++ b/control-plane/pkg/core/config/utils.go @@ -77,6 +77,9 @@ func EgressConfigFromDelivery( if deadLetterSinkAddr.CACerts != nil { egressConfig.DeadLetterCACerts = *deadLetterSinkAddr.CACerts } + if deadLetterSinkAddr.Audience != nil { + egressConfig.DeadLetterAudience = *deadLetterSinkAddr.Audience + } } if delivery.Retry != nil { diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 85baf16e36..9cee1b17ef 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -641,6 +641,10 @@ func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, } } + if broker.Status.Address != nil && broker.Status.Address.Audience != nil { + resource.Ingress.Audience = *broker.Status.Address.Audience + } + egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, broker, broker.Spec.Delivery, r.DefaultBackoffDelayMs) if err != nil { return nil, err diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index 4207ba2afe..2d2cb918b2 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -618,6 +618,12 @@ func (r *Reconciler) getSubscriberConfig(ctx context.Context, channel *messaging if subscriber.SubscriberCACerts != nil && *subscriber.SubscriberCACerts != "" { egress.DestinationCACerts = *subscriber.SubscriberCACerts } + if subscriber.SubscriberAudience != nil && *subscriber.SubscriberAudience != "" { + egress.DestinationAudience = *subscriber.SubscriberAudience + } + if subscriber.Auth != nil && subscriber.Auth.ServiceAccountName != nil { + egress.OidcServiceAccountName = *subscriber.Auth.ServiceAccountName + } if subscriptionName != "" { egress.Reference = &contract.Reference{ @@ -634,6 +640,9 @@ func (r *Reconciler) getSubscriberConfig(ctx context.Context, channel *messaging if subscriber.ReplyCACerts != nil && *subscriber.ReplyCACerts != "" { egress.ReplyUrlCACerts = *subscriber.ReplyCACerts } + if subscriber.ReplyAudience != nil && *subscriber.ReplyAudience != "" { + egress.ReplyUrlAudience = *subscriber.ReplyAudience + } } subscriptionEgressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, channel, subscriber.Delivery, r.DefaultBackoffDelayMs) @@ -713,6 +722,10 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin } } + if channel.Status.Address != nil && channel.Status.Address.Audience != nil { + resource.Ingress.Audience = *channel.Status.Address.Audience + } + egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, channel, channel.Spec.Delivery, r.DefaultBackoffDelayMs) if err != nil { return nil, err diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2.go b/control-plane/pkg/reconciler/channel/v2/channelv2.go index 715a73de0d..8c2c7645f4 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2.go @@ -703,6 +703,10 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin } } + if channel.Status.Address != nil && channel.Status.Address.Audience != nil { + resource.Ingress.Audience = *channel.Status.Address.Audience + } + egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, channel, channel.Spec.Delivery, r.DefaultBackoffDelayMs) if err != nil { return nil, err diff --git a/control-plane/pkg/reconciler/consumer/consumer.go b/control-plane/pkg/reconciler/consumer/consumer.go index 684ed5485e..eda4cc81bc 100644 --- a/control-plane/pkg/reconciler/consumer/consumer.go +++ b/control-plane/pkg/reconciler/consumer/consumer.go @@ -141,6 +141,7 @@ func (r *Reconciler) reconcileContractEgress(ctx context.Context, c *kafkaintern } c.Status.SubscriberURI = destinationAddr.URL c.Status.SubscriberCACerts = destinationAddr.CACerts + c.Status.SubscriberAudience = destinationAddr.Audience egressConfig := &contract.EgressConfig{} if c.Spec.Delivery != nil { @@ -154,6 +155,9 @@ func (r *Reconciler) reconcileContractEgress(ctx context.Context, c *kafkaintern if egressConfig.DeadLetterCACerts != "" { c.Status.DeliveryStatus.DeadLetterSinkCACerts = pointer.String(egressConfig.DeadLetterCACerts) } + if egressConfig.DeadLetterAudience != "" { + c.Status.DeliveryStatus.DeadLetterSinkAudience = pointer.String(egressConfig.DeadLetterAudience) + } } egress := &contract.Egress{ @@ -176,11 +180,18 @@ func (r *Reconciler) reconcileContractEgress(ctx context.Context, c *kafkaintern if destinationAddr.CACerts != nil { egress.DestinationCACerts = *destinationAddr.CACerts } + if destinationAddr.Audience != nil { + egress.DestinationAudience = *destinationAddr.Audience + } if c.Spec.Configs.KeyType != nil { egress.KeyType = coreconfig.KeyTypeFromString(*c.Spec.Configs.KeyType) } + if c.Spec.OIDCServiceAccountName != nil { + egress.OidcServiceAccountName = *c.Spec.OIDCServiceAccountName + } + if err := r.reconcileReplyStrategy(ctx, c, egress); err != nil { return nil, fmt.Errorf("failed to reconcile reply strategy: %w", err) } @@ -290,6 +301,9 @@ func (r *Reconciler) reconcileReplyStrategy(ctx context.Context, c *kafkainterna if destination.CACerts != nil { egress.ReplyUrlCACerts = *destination.CACerts } + if destination.Audience != nil { + egress.ReplyUrlAudience = *destination.Audience + } return nil } if c.Spec.Reply.TopicReply != nil && c.Spec.Reply.TopicReply.Enabled { diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index 7aee0f6567..1776486a07 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -506,12 +506,18 @@ func (r *Reconciler) propagateStatus(ctx context.Context, cg *kafkainternals.Con if c.Status.SubscriberCACerts != nil { cg.Status.SubscriberCACerts = c.Status.SubscriberCACerts } + if c.Status.SubscriberAudience != nil { + cg.Status.SubscriberAudience = c.Status.SubscriberAudience + } if c.Status.DeliveryStatus.DeadLetterSinkURI != nil { cg.Status.DeliveryStatus.DeadLetterSinkURI = c.Status.DeadLetterSinkURI } if c.Status.DeliveryStatus.DeadLetterSinkCACerts != nil { cg.Status.DeliveryStatus.DeadLetterSinkCACerts = c.Status.DeadLetterSinkCACerts } + if c.Status.DeliveryStatus.DeadLetterSinkAudience != nil { + cg.Status.DeliveryStatus.DeadLetterSinkAudience = c.Status.DeadLetterSinkAudience + } } else if condition == nil { // Propagate only a single false condition cond := c.GetConditionSet().Manage(c.GetStatus()).GetTopLevelCondition() if cond.IsFalse() { @@ -530,6 +536,7 @@ func (r *Reconciler) propagateStatus(ctx context.Context, cg *kafkainternals.Con } cg.Status.SubscriberURI = subscriber.URL cg.Status.SubscriberCACerts = subscriber.CACerts + cg.Status.SubscriberAudience = subscriber.Audience } return condition, nil diff --git a/control-plane/pkg/reconciler/sink/kafka_sink.go b/control-plane/pkg/reconciler/sink/kafka_sink.go index 213ada2bca..a77d09b915 100644 --- a/control-plane/pkg/reconciler/sink/kafka_sink.go +++ b/control-plane/pkg/reconciler/sink/kafka_sink.go @@ -211,6 +211,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) }, } } + + if ks.Status.Address != nil && ks.Status.Address.Audience != nil { + sinkConfig.Ingress.Audience = *ks.Status.Address.Audience + } + statusConditionManager.ConfigResolved() sinkIndex := coreconfig.FindResource(ct, ks.UID) diff --git a/control-plane/pkg/reconciler/source/source.go b/control-plane/pkg/reconciler/source/source.go index 07d28fb71f..acd5387cdb 100644 --- a/control-plane/pkg/reconciler/source/source.go +++ b/control-plane/pkg/reconciler/source/source.go @@ -201,6 +201,10 @@ func (r Reconciler) reconcileConsumerGroup(ctx context.Context, ks *sources.Kafk expectedCg.Spec.Template.Spec.Configs.KeyType = &kt } + if ks.Status.Auth != nil { + expectedCg.Spec.Template.Spec.OIDCServiceAccountName = ks.Status.Auth.ServiceAccountName + } + // TODO: make keda annotation values configurable and maybe unexposed expectedCg.Annotations = keda.SetAutoscalingAnnotations(ks.Annotations) @@ -260,8 +264,9 @@ func propagateConsumerGroupStatus(cg *internalscg.ConsumerGroup, ks *sources.Kaf } } ks.Status.MarkSink(&duckv1.Addressable{ - URL: cg.Status.SubscriberURI, - CACerts: cg.Status.SubscriberCACerts, + URL: cg.Status.SubscriberURI, + CACerts: cg.Status.SubscriberCACerts, + Audience: cg.Status.SubscriberAudience, }) ks.Status.Placeable = cg.Status.Placeable if cg.Status.Replicas != nil { diff --git a/control-plane/pkg/reconciler/source/source_test.go b/control-plane/pkg/reconciler/source/source_test.go index badac46617..cfa162741e 100644 --- a/control-plane/pkg/reconciler/source/source_test.go +++ b/control-plane/pkg/reconciler/source/source_test.go @@ -1566,6 +1566,7 @@ func TestReconcileKind(t *testing.T) { ), ConsumerSubscriber(NewSourceSinkReference()), ConsumerReply(ConsumerNoReply()), + ConsumerOIDCServiceAccountName(makeKafkaSourceOIDCServiceAccount().Name), )), ConsumerGroupReplicas(1), ), diff --git a/control-plane/pkg/reconciler/testing/objects_consumer.go b/control-plane/pkg/reconciler/testing/objects_consumer.go index a4cebb4c3d..433295d315 100644 --- a/control-plane/pkg/reconciler/testing/objects_consumer.go +++ b/control-plane/pkg/reconciler/testing/objects_consumer.go @@ -200,6 +200,12 @@ func ConsumerTopics(topics ...string) ConsumerSpecOption { } } +func ConsumerOIDCServiceAccountName(sa string) ConsumerSpecOption { + return func(c *kafkainternals.ConsumerSpec) { + c.OIDCServiceAccountName = &sa + } +} + func ConsumerPlacement(pb kafkainternals.PodBind) ConsumerSpecOption { return func(c *kafkainternals.ConsumerSpec) { c.PodBind = &pb diff --git a/control-plane/pkg/reconciler/trigger/trigger.go b/control-plane/pkg/reconciler/trigger/trigger.go index fc32db6958..52eace88fa 100644 --- a/control-plane/pkg/reconciler/trigger/trigger.go +++ b/control-plane/pkg/reconciler/trigger/trigger.go @@ -339,6 +339,12 @@ func (r *Reconciler) reconcileTriggerEgress(ctx context.Context, broker *eventin if destination.CACerts != nil { egress.DestinationCACerts = *destination.CACerts } + if destination.Audience != nil { + egress.DestinationAudience = *destination.Audience + } + if trigger.Status.Auth != nil && trigger.Status.Auth.ServiceAccountName != nil { + egress.OidcServiceAccountName = *trigger.Status.Auth.ServiceAccountName + } newFiltersEnabled := func() bool { r.FlagsLock.RLock() diff --git a/control-plane/pkg/reconciler/trigger/trigger_test.go b/control-plane/pkg/reconciler/trigger/trigger_test.go index 95cb803ef1..d5ea8b66dd 100644 --- a/control-plane/pkg/reconciler/trigger/trigger_test.go +++ b/control-plane/pkg/reconciler/trigger/trigger_test.go @@ -2182,10 +2182,11 @@ func triggerReconciliation(t *testing.T, format string, env config.Env, useNewFi Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)}, Egresses: []*contract.Egress{ { - Destination: ServiceURL, - ConsumerGroup: triggerConsumerGroup, - Uid: TriggerUUID, - Reference: TriggerReference(), + Destination: ServiceURL, + ConsumerGroup: triggerConsumerGroup, + Uid: TriggerUUID, + Reference: TriggerReference(), + OidcServiceAccountName: makeTriggerOIDCServiceAccount().Name, }, }, }, @@ -2262,10 +2263,11 @@ func triggerReconciliation(t *testing.T, format string, env config.Env, useNewFi Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)}, Egresses: []*contract.Egress{ { - Destination: ServiceURL, - ConsumerGroup: triggerConsumerGroup, - Uid: TriggerUUID, - Reference: TriggerReference(), + Destination: ServiceURL, + ConsumerGroup: triggerConsumerGroup, + Uid: TriggerUUID, + Reference: TriggerReference(), + OidcServiceAccountName: makeTriggerOIDCServiceAccount().Name, }, }, }, diff --git a/control-plane/pkg/reconciler/trigger/v2/triggerv2.go b/control-plane/pkg/reconciler/trigger/v2/triggerv2.go index a5aab80fd4..dc049ed94c 100644 --- a/control-plane/pkg/reconciler/trigger/v2/triggerv2.go +++ b/control-plane/pkg/reconciler/trigger/v2/triggerv2.go @@ -229,6 +229,10 @@ func (r Reconciler) reconcileConsumerGroup(ctx context.Context, broker *eventing } } + if trigger.Status.Auth != nil { + expectedCg.Spec.OIDCServiceAccountName = trigger.Status.Auth.ServiceAccountName + } + cg, err := r.ConsumerGroupLister.ConsumerGroups(trigger.GetNamespace()).Get(groupId) //Get by consumer group name if err != nil && !apierrors.IsNotFound(err) { return nil, err