Skip to content

Commit

Permalink
Provision contract with OIDC information (knative-extensions#3646)
Browse files Browse the repository at this point in the history
* Provision contract with OIDC information

* Add DLS audience in KafkaChannel CRD

* Update KafkaSource to expose its sinks audience in status

* Update Trigger test to include OIDC SA in contract

* Propagate KafkaSources OIDC serviceAccountName to consumer and consumergroup

* Propagate triggerv2s serviceAccountName to consumergroup

* Fix unit test
  • Loading branch information
creydr committed Feb 4, 2024
1 parent 2d5745e commit f78191c
Show file tree
Hide file tree
Showing 19 changed files with 149 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ spec:
type: string
CACerts:
type: string
audience:
description: Audience is the OIDC audience for the deadLetterSink.
type: string
retry:
description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink.
type: integer
Expand Down Expand Up @@ -132,6 +135,9 @@ spec:
type: string
CACerts:
type: string
audience:
description: Audience is the OIDC audience for the deadLetterSink.
type: string
retry:
description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink.
type: integer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ spec:
description: DeadLetterSink is the sink receiving event that could not be sent to a destination.
type: object
properties:
CACerts:
description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any.
type: string
ref:
description: Ref points to an Addressable.
type: object
Expand Down Expand Up @@ -118,6 +115,12 @@ spec:
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
type: string
CACerts:
description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any.
type: string
audience:
description: Audience is the OIDC audience for the deadLetterSink.
type: string
retry:
description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink.
type: integer
Expand Down Expand Up @@ -271,9 +274,6 @@ spec:
description: Sink is a reference to an object that will resolve to a uri to use as the sink.
type: object
properties:
CACerts:
description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any.
type: string
ref:
description: Ref points to an Addressable.
type: object
Expand Down Expand Up @@ -302,6 +302,12 @@ spec:
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
type: string
CACerts:
description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any.
type: string
audience:
description: Audience is the OIDC audience for the sink.
type: string
topics:
description: Topic topics to consume messages from
type: array
Expand Down Expand Up @@ -392,6 +398,16 @@ spec:
sinkUri:
description: SinkURI is the current active sink URI that has been configured for the Source.
type: string
sinkAudience:
description: SinkAudience is the OIDC audience of the sink.
type: string
auth:
description: Auth provides the relevant information for OIDC authentication.
type: object
properties:
serviceAccountName:
description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication.
type: string
subresources:
status: {}
scale:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ type ConsumerGroupSpec struct {
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
// +optional
Selector map[string]string `json:"selector,omitempty" protobuf:"bytes,2,rep,name=selector"`

// OIDCServiceAccountName is the name of service account used for this components
// OIDC authentication.
OIDCServiceAccountName *string `json:"oidcServiceAccountName,omitempty"`
}

type ConsumerGroupStatus struct {
Expand All @@ -124,6 +128,10 @@ type ConsumerGroupStatus struct {
// +optional
SubscriberCACerts *string `json:"subscriberCACerts,omitempty"`

// SubscriberAudience is the OIDC audience for the resolved URI
// +optional
SubscriberAudience *string `json:"subscriberAudience,omitempty"`

// DeliveryStatus contains a resolved URL to the dead letter sink address, and any other
// resolved delivery options.
eventingduckv1.DeliveryStatus `json:",inline"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ type ConsumerSpec struct {

// PodBind represents a reference to the pod in which the consumer should be placed.
PodBind *PodBind `json:"podBind"`

// OIDCServiceAccountName is the name of the generated service account
// used for this components OIDC authentication.
OIDCServiceAccountName *string `json:"oidcServiceAccountName,omitempty"`
}

type ReplyStrategy struct {
Expand Down Expand Up @@ -208,6 +212,10 @@ type ConsumerStatus struct {
// +optional
SubscriberCACerts *string `json:"subscriberCACerts,omitempty"`

// SubscriberAudience is the OIDC audience for the resolved URI
// +optional
SubscriberAudience *string `json:"subscriberAudience,omitempty"`

// DeliveryStatus contains a resolved URL to the dead letter sink address, and any other
// resolved delivery options.
eventingduck.DeliveryStatus `json:",inline"`
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions control-plane/pkg/apis/sources/v1beta1/kafka_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (s *KafkaSourceStatus) MarkSink(addr *duckv1.Addressable) {
if addr.URL != nil && !addr.URL.IsEmpty() {
s.SinkURI = addr.URL
s.SinkCACerts = addr.CACerts
s.SinkAudience = addr.Audience
KafkaSourceCondSet.Manage(s).MarkTrue(KafkaConditionSinkProvided)
} else {
KafkaSourceCondSet.Manage(s).MarkUnknown(KafkaConditionSinkProvided, "SinkEmpty", "Sink has resolved to empty.%s", "")
Expand Down
3 changes: 3 additions & 0 deletions control-plane/pkg/core/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func EgressConfigFromDelivery(
if deadLetterSinkAddr.CACerts != nil {
egressConfig.DeadLetterCACerts = *deadLetterSinkAddr.CACerts
}
if deadLetterSinkAddr.Audience != nil {
egressConfig.DeadLetterAudience = *deadLetterSinkAddr.Audience
}
}

if delivery.Retry != nil {
Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,10 @@ func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string,
}
}

if broker.Status.Address != nil && broker.Status.Address.Audience != nil {
resource.Ingress.Audience = *broker.Status.Address.Audience
}

egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, broker, broker.Spec.Delivery, r.DefaultBackoffDelayMs)
if err != nil {
return nil, err
Expand Down
13 changes: 13 additions & 0 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,12 @@ func (r *Reconciler) getSubscriberConfig(ctx context.Context, channel *messaging
if subscriber.SubscriberCACerts != nil && *subscriber.SubscriberCACerts != "" {
egress.DestinationCACerts = *subscriber.SubscriberCACerts
}
if subscriber.SubscriberAudience != nil && *subscriber.SubscriberAudience != "" {
egress.DestinationAudience = *subscriber.SubscriberAudience
}
if subscriber.Auth != nil && subscriber.Auth.ServiceAccountName != nil {
egress.OidcServiceAccountName = *subscriber.Auth.ServiceAccountName
}

if subscriptionName != "" {
egress.Reference = &contract.Reference{
Expand All @@ -634,6 +640,9 @@ func (r *Reconciler) getSubscriberConfig(ctx context.Context, channel *messaging
if subscriber.ReplyCACerts != nil && *subscriber.ReplyCACerts != "" {
egress.ReplyUrlCACerts = *subscriber.ReplyCACerts
}
if subscriber.ReplyAudience != nil && *subscriber.ReplyAudience != "" {
egress.ReplyUrlAudience = *subscriber.ReplyAudience
}
}

subscriptionEgressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, channel, subscriber.Delivery, r.DefaultBackoffDelayMs)
Expand Down Expand Up @@ -713,6 +722,10 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin
}
}

if channel.Status.Address != nil && channel.Status.Address.Audience != nil {
resource.Ingress.Audience = *channel.Status.Address.Audience
}

egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, channel, channel.Spec.Delivery, r.DefaultBackoffDelayMs)
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/channel/v2/channelv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,10 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin
}
}

if channel.Status.Address != nil && channel.Status.Address.Audience != nil {
resource.Ingress.Audience = *channel.Status.Address.Audience
}

egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, channel, channel.Spec.Delivery, r.DefaultBackoffDelayMs)
if err != nil {
return nil, err
Expand Down
14 changes: 14 additions & 0 deletions control-plane/pkg/reconciler/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (r *Reconciler) reconcileContractEgress(ctx context.Context, c *kafkaintern
}
c.Status.SubscriberURI = destinationAddr.URL
c.Status.SubscriberCACerts = destinationAddr.CACerts
c.Status.SubscriberAudience = destinationAddr.Audience

egressConfig := &contract.EgressConfig{}
if c.Spec.Delivery != nil {
Expand All @@ -158,6 +159,9 @@ func (r *Reconciler) reconcileContractEgress(ctx context.Context, c *kafkaintern
if egressConfig.DeadLetterCACerts != "" {
c.Status.DeliveryStatus.DeadLetterSinkCACerts = pointer.String(egressConfig.DeadLetterCACerts)
}
if egressConfig.DeadLetterAudience != "" {
c.Status.DeliveryStatus.DeadLetterSinkAudience = pointer.String(egressConfig.DeadLetterAudience)
}
}

egress := &contract.Egress{
Expand All @@ -180,11 +184,18 @@ func (r *Reconciler) reconcileContractEgress(ctx context.Context, c *kafkaintern
if destinationAddr.CACerts != nil {
egress.DestinationCACerts = *destinationAddr.CACerts
}
if destinationAddr.Audience != nil {
egress.DestinationAudience = *destinationAddr.Audience
}

if c.Spec.Configs.KeyType != nil {
egress.KeyType = coreconfig.KeyTypeFromString(*c.Spec.Configs.KeyType)
}

if c.Spec.OIDCServiceAccountName != nil {
egress.OidcServiceAccountName = *c.Spec.OIDCServiceAccountName
}

if err := r.reconcileReplyStrategy(ctx, c, egress); err != nil {
return nil, fmt.Errorf("failed to reconcile reply strategy: %w", err)
}
Expand Down Expand Up @@ -294,6 +305,9 @@ func (r *Reconciler) reconcileReplyStrategy(ctx context.Context, c *kafkainterna
if destination.CACerts != nil {
egress.ReplyUrlCACerts = *destination.CACerts
}
if destination.Audience != nil {
egress.ReplyUrlAudience = *destination.Audience
}
return nil
}
if c.Spec.Reply.TopicReply != nil && c.Spec.Reply.TopicReply.Enabled {
Expand Down
7 changes: 7 additions & 0 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,12 +500,18 @@ func (r *Reconciler) propagateStatus(ctx context.Context, cg *kafkainternals.Con
if c.Status.SubscriberCACerts != nil {
cg.Status.SubscriberCACerts = c.Status.SubscriberCACerts
}
if c.Status.SubscriberAudience != nil {
cg.Status.SubscriberAudience = c.Status.SubscriberAudience
}
if c.Status.DeliveryStatus.DeadLetterSinkURI != nil {
cg.Status.DeliveryStatus.DeadLetterSinkURI = c.Status.DeadLetterSinkURI
}
if c.Status.DeliveryStatus.DeadLetterSinkCACerts != nil {
cg.Status.DeliveryStatus.DeadLetterSinkCACerts = c.Status.DeadLetterSinkCACerts
}
if c.Status.DeliveryStatus.DeadLetterSinkAudience != nil {
cg.Status.DeliveryStatus.DeadLetterSinkAudience = c.Status.DeadLetterSinkAudience
}
} else if condition == nil { // Propagate only a single false condition
cond := c.GetConditionSet().Manage(c.GetStatus()).GetTopLevelCondition()
if cond.IsFalse() {
Expand All @@ -524,6 +530,7 @@ func (r *Reconciler) propagateStatus(ctx context.Context, cg *kafkainternals.Con
}
cg.Status.SubscriberURI = subscriber.URL
cg.Status.SubscriberCACerts = subscriber.CACerts
cg.Status.SubscriberAudience = subscriber.Audience
}

return condition, nil
Expand Down
5 changes: 5 additions & 0 deletions control-plane/pkg/reconciler/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
},
}
}

if ks.Status.Address != nil && ks.Status.Address.Audience != nil {
sinkConfig.Ingress.Audience = *ks.Status.Address.Audience
}

statusConditionManager.ConfigResolved()

sinkIndex := coreconfig.FindResource(ct, ks.UID)
Expand Down
9 changes: 7 additions & 2 deletions control-plane/pkg/reconciler/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ func (r Reconciler) reconcileConsumerGroup(ctx context.Context, ks *sources.Kafk
expectedCg.Spec.Template.Spec.Configs.KeyType = &kt
}

if ks.Status.Auth != nil {
expectedCg.Spec.Template.Spec.OIDCServiceAccountName = ks.Status.Auth.ServiceAccountName
}

// TODO: make keda annotation values configurable and maybe unexposed
expectedCg.Annotations = keda.SetAutoscalingAnnotations(ks.Annotations)

Expand Down Expand Up @@ -260,8 +264,9 @@ func propagateConsumerGroupStatus(cg *internalscg.ConsumerGroup, ks *sources.Kaf
}
}
ks.Status.MarkSink(&duckv1.Addressable{
URL: cg.Status.SubscriberURI,
CACerts: cg.Status.SubscriberCACerts,
URL: cg.Status.SubscriberURI,
CACerts: cg.Status.SubscriberCACerts,
Audience: cg.Status.SubscriberAudience,
})
ks.Status.Placeable = cg.Status.Placeable
if cg.Status.Replicas != nil {
Expand Down
1 change: 1 addition & 0 deletions control-plane/pkg/reconciler/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1566,6 +1566,7 @@ func TestReconcileKind(t *testing.T) {
),
ConsumerSubscriber(NewSourceSinkReference()),
ConsumerReply(ConsumerNoReply()),
ConsumerOIDCServiceAccountName(makeKafkaSourceOIDCServiceAccount().Name),
)),
ConsumerGroupReplicas(1),
),
Expand Down
6 changes: 6 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ func ConsumerTopics(topics ...string) ConsumerSpecOption {
}
}

func ConsumerOIDCServiceAccountName(sa string) ConsumerSpecOption {
return func(c *kafkainternals.ConsumerSpec) {
c.OIDCServiceAccountName = &sa
}
}

func ConsumerPlacement(pb kafkainternals.PodBind) ConsumerSpecOption {
return func(c *kafkainternals.ConsumerSpec) {
c.PodBind = &pb
Expand Down
6 changes: 6 additions & 0 deletions control-plane/pkg/reconciler/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,12 @@ func (r *Reconciler) reconcileTriggerEgress(ctx context.Context, broker *eventin
if destination.CACerts != nil {
egress.DestinationCACerts = *destination.CACerts
}
if destination.Audience != nil {
egress.DestinationAudience = *destination.Audience
}
if trigger.Status.Auth != nil && trigger.Status.Auth.ServiceAccountName != nil {
egress.OidcServiceAccountName = *trigger.Status.Auth.ServiceAccountName
}

newFiltersEnabled := func() bool {
r.FlagsLock.RLock()
Expand Down
Loading

0 comments on commit f78191c

Please sign in to comment.