Skip to content

Commit

Permalink
Expose OIDC audience of KafkaChannel in its status (#3622)
Browse files Browse the repository at this point in the history
* Provision .status.address.audience and .status.addresses[*].audience in KafkaChannel

* Add kafka Channel e2e test to check if audience is provisioned

* Run goimport

* Update deps
  • Loading branch information
creydr authored Jan 31, 2024
1 parent 02a879b commit e558308
Show file tree
Hide file tree
Showing 14 changed files with 477 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,28 @@ spec:
replyCACerts:
description: replyCACerts is the CA certs to trust for the reply.
type: string
replyAudience:
description: ReplyAudience is the OIDC audience for the replyUri.
type: string
subscriberUri:
description: SubscriberURI is the endpoint for the subscriber
type: string
subscriberCACerts:
description: SubscriberCACerts is the CA certs to trust for the subscriber.
type: string
subscriberAudience:
description: SubscriberAudience is the OIDC audience for the subscriberUri.
type: string
uid:
description: UID is used to understand the origin of the subscriber.
type: string
auth:
description: Auth provides the relevant information for OIDC authentication.
type: object
properties:
serviceAccountName:
description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication.
type: string
status:
description: Status represents the current state of the KafkaChannel. This data may be out of date.
type: object
Expand Down Expand Up @@ -239,6 +252,9 @@ spec:
type: string
deadLetterSinkCACerts:
type: string
deadLetterSinkAudience:
description: OIDC audience of the dead letter sink.
type: string
observedGeneration:
description: ObservedGeneration is the 'Generation' of the Service that was last processed by the controller.
type: integer
Expand All @@ -262,6 +278,13 @@ spec:
uid:
description: UID is used to understand the origin of the subscriber.
type: string
auth:
description: Auth provides the relevant information for OIDC authentication.
type: object
properties:
serviceAccountName:
description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication.
type: string
additionalPrinterColumns:
- name: Ready
type: string
Expand Down
23 changes: 13 additions & 10 deletions control-plane/pkg/receiver/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,32 @@ func Address(host string, object metav1.Object) *url.URL {
}

// HTTPAddress returns the addressable
func HTTPAddress(host string, object metav1.Object) duckv1.Addressable {
func HTTPAddress(host string, audience *string, object metav1.Object) duckv1.Addressable {
httpAddress := duckv1.Addressable{
Name: pointer.String("http"),
URL: apis.HTTP(host),
Name: pointer.String("http"),
URL: apis.HTTP(host),
Audience: audience,
}
httpAddress.URL.Path = fmt.Sprintf("/%s/%s", object.GetNamespace(), object.GetName())
return httpAddress
}

// HTTPAddress returns the addressable
func ChannelHTTPAddress(host string) duckv1.Addressable {
func ChannelHTTPAddress(host string, audience *string) duckv1.Addressable {
httpAddress := duckv1.Addressable{
Name: pointer.String("http"),
URL: apis.HTTP(host),
Name: pointer.String("http"),
URL: apis.HTTP(host),
Audience: audience,
}
return httpAddress
}

func HTTPSAddress(host string, object metav1.Object, caCerts *string) duckv1.Addressable {
func HTTPSAddress(host string, audience *string, object metav1.Object, caCerts *string) duckv1.Addressable {
httpsAddress := duckv1.Addressable{
Name: pointer.String("https"),
URL: apis.HTTPS(host),
CACerts: caCerts,
Name: pointer.String("https"),
URL: apis.HTTPS(host),
CACerts: caCerts,
Audience: audience,
}
httpsAddress.URL.Path = fmt.Sprintf("/%s/%s", object.GetNamespace(), object.GetName())
return httpsAddress
Expand Down
8 changes: 6 additions & 2 deletions control-plane/pkg/receiver/address_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ func TestHTTPSAddress(t *testing.T) {
Name: "ks",
},
}
httpsAddress := HTTPSAddress(host, ks, pointer.String(string(eventingtlstesting.CA)))
aud := "my-audience"
httpsAddress := HTTPSAddress(host, &aud, ks, pointer.String(string(eventingtlstesting.CA)))

require.Equal(t, httpsAddress.URL.Host, host)
require.Equal(t, httpsAddress.Audience, &aud)
require.Equal(t, httpsAddress.URL.Scheme, "https")
require.Contains(t, httpsAddress.URL.Path, ks.GetNamespace())
require.Contains(t, httpsAddress.URL.Path, ks.GetName())
Expand All @@ -69,9 +71,11 @@ func TestHTTPAddress(t *testing.T) {
Name: "ks",
},
}
httpAddress := HTTPAddress(host, ks)
aud := "my-audience"
httpAddress := HTTPAddress(host, &aud, ks)

require.Equal(t, host, httpAddress.URL.Host)
require.Equal(t, httpAddress.Audience, &aud)
require.Equal(t, httpAddress.URL.Scheme, "http")
require.Contains(t, httpAddress.URL.Path, ks.GetNamespace())
require.Contains(t, httpAddress.URL.Path, ks.GetName())
Expand Down
10 changes: 5 additions & 5 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
return err
}

httpAddress := receiver.HTTPAddress(ingressHost, broker)
httpsAddress := receiver.HTTPSAddress(ingressHost, broker, caCerts)
httpAddress := receiver.HTTPAddress(ingressHost, nil, broker)
httpsAddress := receiver.HTTPSAddress(ingressHost, nil, broker, caCerts)
addressableStatus.Address = &httpAddress
addressableStatus.Addresses = []duckv1.Addressable{httpAddress, httpsAddress}
} else if transportEncryptionFlags.IsStrictTransportEncryption() {
Expand All @@ -257,11 +257,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
return err
}

httpsAddress := receiver.HTTPSAddress(ingressHost, broker, caCerts)
httpsAddress := receiver.HTTPSAddress(ingressHost, nil, broker, caCerts)
addressableStatus.Address = &httpsAddress
addressableStatus.Addresses = []duckv1.Addressable{httpsAddress}
} else {
httpAddress := receiver.HTTPAddress(ingressHost, broker)
httpAddress := receiver.HTTPAddress(ingressHost, nil, broker)
addressableStatus.Address = &httpAddress
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
}
Expand Down Expand Up @@ -360,7 +360,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker)
// See (under discussions KIPs, unlikely to be accepted as they are):
// - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446
// - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update
address := receiver.HTTPAddress(ingressHost, broker)
address := receiver.HTTPAddress(ingressHost, nil, broker)
proberAddressable := prober.ProberAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &address,
Expand Down
30 changes: 21 additions & 9 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
"strconv"
"time"

"k8s.io/utils/pointer"
"knative.dev/eventing/pkg/auth"
"knative.dev/pkg/logging"

"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"knative.dev/eventing/pkg/apis/feature"
messaging "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/pkg/network"
Expand Down Expand Up @@ -313,26 +316,35 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta
return err
}

featureFlags := feature.FromContext(ctx)
var audience *string
if featureFlags.IsOIDCAuthentication() {
audience = pointer.String(auth.GetAudience(messaging.SchemeGroupVersion.WithKind("KafkaChannel"), channel.ObjectMeta))
logging.FromContext(ctx).Debugw("Setting the KafkaChannels audience", zap.String("audience", *audience))
} else {
logging.FromContext(ctx).Debug("Clearing the KafkaChannels audience as OIDC is not enabled")
audience = nil
}

var addressableStatus duckv1.AddressStatus
channelHttpsHost := network.GetServiceHostname(r.Env.IngressName, r.SystemNamespace)
channelHttpHost := network.GetServiceHostname(channelService.Name, channel.Namespace)
transportEncryptionFlags := feature.FromContext(ctx)
if transportEncryptionFlags.IsPermissiveTransportEncryption() {
if featureFlags.IsPermissiveTransportEncryption() {
caCerts, err := r.getCaCerts()
if err != nil {
return err
}

httpAddress := receiver.ChannelHTTPAddress(channelHttpHost)
httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channel, caCerts)
httpAddress := receiver.ChannelHTTPAddress(channelHttpHost, audience)
httpsAddress := receiver.HTTPSAddress(channelHttpsHost, audience, channel, caCerts)
// Permissive mode:
// - status.address http address with path-based routing
// - status.addresses:
// - https address with path-based routing
// - http address with path-based routing
addressableStatus.Addresses = []duckv1.Addressable{httpsAddress, httpAddress}
addressableStatus.Address = &httpAddress
} else if transportEncryptionFlags.IsStrictTransportEncryption() {
} else if featureFlags.IsStrictTransportEncryption() {
// Strict mode: (only https addresses)
// - status.address https address with path-based routing
// - status.addresses:
Expand All @@ -342,11 +354,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta
return err
}

httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channel, caCerts)
httpsAddress := receiver.HTTPSAddress(channelHttpsHost, audience, channel, caCerts)
addressableStatus.Addresses = []duckv1.Addressable{httpsAddress}
addressableStatus.Address = &httpsAddress
} else {
httpAddress := receiver.ChannelHTTPAddress(channelHttpHost)
httpAddress := receiver.ChannelHTTPAddress(channelHttpHost, audience)
addressableStatus.Address = &httpAddress
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
}
Expand Down Expand Up @@ -433,7 +445,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1
// See (under discussions KIPs, unlikely to be accepted as they are):
// - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446
// - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update
address := receiver.HTTPAddress(r.IngressHost, channel)
address := receiver.HTTPAddress(r.IngressHost, nil, channel)
proberAddressable := prober.ProberAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &address,
Expand Down
85 changes: 85 additions & 0 deletions control-plane/pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2005,6 +2005,91 @@ func TestReconcileKind(t *testing.T) {
finalizerUpdatedEvent,
},
},
{
Name: "Reconciled normal - OIDC enabled",
Objects: []runtime.Object{
NewChannel(),
NewService(),
NewPerChannelService(DefaultEnv),
ChannelReceiverPod(env.SystemNamespace, map[string]string{
base.VolumeGenerationAnnotationKey: "0",
"annotation_to_preserve": "value_to_preserve",
}),
ChannelDispatcherPod(env.SystemNamespace, map[string]string{
base.VolumeGenerationAnnotationKey: "0",
"annotation_to_preserve": "value_to_preserve",
}),
NewConfigMapWithTextData(system.Namespace(), DefaultEnv.GeneralConfigMapName, map[string]string{
kafka.BootstrapServersConfigMapKey: ChannelBootstrapServers,
}),
},
Key: testKey,
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
}),
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{
Generation: 1,
Resources: []*contract.Resource{
{
Uid: ChannelUUID,
Topics: []string{ChannelTopic()},
BootstrapServers: ChannelBootstrapServers,
Reference: ChannelReference(),
Ingress: &contract.Ingress{
Path: receiver.Path(ChannelNamespace, ChannelName),
Host: receiver.Host(ChannelNamespace, ChannelName),
},
},
},
}),
ChannelReceiverPodUpdate(env.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
base.VolumeGenerationAnnotationKey: "1",
}),
ChannelDispatcherPodUpdate(env.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
base.VolumeGenerationAnnotationKey: "1",
}),
},
SkipNamespaceValidation: true, // WantCreates compare the channel namespace with configmap namespace, so skip it
WantCreates: []runtime.Object{
NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: NewChannel(
WithInitKafkaChannelConditions,
StatusConfigParsed,
StatusConfigMapUpdatedReady(&env),
WithChannelTopicStatusAnnotation(ChannelTopic()),
StatusTopicReadyWithName(ChannelTopic()),
StatusDataPlaneAvailable,
ChannelAddressable(&env),
StatusProbeSucceeded,
WithChannelAddresses([]duckv1.Addressable{
{
Name: pointer.String("http"),
URL: ChannelAddress(),
Audience: pointer.String(ChannelAudience),
},
}),
WithChannelAddress(duckv1.Addressable{
Name: pointer.String("http"),
URL: ChannelAddress(),
Audience: pointer.String(ChannelAudience),
}),
WithChannelAddessable(),
),
},
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(),
},
WantEvents: []string{
finalizerUpdatedEvent,
},
},
}

useTable(t, table, env)
Expand Down
9 changes: 7 additions & 2 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf

logger := logging.FromContext(ctx)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
var globalResync func(obj interface{})
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync(nil)
}
})
featureStore.WatchConfigs(watcher)

_, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx)
Expand Down Expand Up @@ -136,7 +141,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf

reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)

globalResync := func(_ interface{}) {
globalResync = func(_ interface{}) {
impl.GlobalResync(channelInformer.Informer())
}

Expand Down
Loading

0 comments on commit e558308

Please sign in to comment.