Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose OIDC audience of KafkaChannel in its status #3622

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,28 @@ spec:
replyCACerts:
description: replyCACerts is the CA certs to trust for the reply.
type: string
replyAudience:
description: ReplyAudience is the OIDC audience for the replyUri.
type: string
subscriberUri:
description: SubscriberURI is the endpoint for the subscriber
type: string
subscriberCACerts:
description: SubscriberCACerts is the CA certs to trust for the subscriber.
type: string
subscriberAudience:
description: SubscriberAudience is the OIDC audience for the subscriberUri.
type: string
uid:
description: UID is used to understand the origin of the subscriber.
type: string
auth:
description: Auth provides the relevant information for OIDC authentication.
type: object
properties:
serviceAccountName:
description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication.
type: string
status:
description: Status represents the current state of the KafkaChannel. This data may be out of date.
type: object
Expand Down Expand Up @@ -239,6 +252,9 @@ spec:
type: string
deadLetterSinkCACerts:
type: string
deadLetterSinkAudience:
description: OIDC audience of the dead letter sink.
type: string
observedGeneration:
description: ObservedGeneration is the 'Generation' of the Service that was last processed by the controller.
type: integer
Expand All @@ -262,6 +278,13 @@ spec:
uid:
description: UID is used to understand the origin of the subscriber.
type: string
auth:
description: Auth provides the relevant information for OIDC authentication.
type: object
properties:
serviceAccountName:
description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication.
type: string
additionalPrinterColumns:
- name: Ready
type: string
Expand Down
23 changes: 13 additions & 10 deletions control-plane/pkg/receiver/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,32 @@
}

// HTTPAddress returns the addressable
func HTTPAddress(host string, object metav1.Object) duckv1.Addressable {
func HTTPAddress(host string, audience *string, object metav1.Object) duckv1.Addressable {
httpAddress := duckv1.Addressable{
Name: pointer.String("http"),
URL: apis.HTTP(host),
Name: pointer.String("http"),
URL: apis.HTTP(host),
Audience: audience,
}
httpAddress.URL.Path = fmt.Sprintf("/%s/%s", object.GetNamespace(), object.GetName())
return httpAddress
}

// HTTPAddress returns the addressable
func ChannelHTTPAddress(host string) duckv1.Addressable {
func ChannelHTTPAddress(host string, audience *string) duckv1.Addressable {

Check warning on line 49 in control-plane/pkg/receiver/address.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/receiver/address.go#L49

Added line #L49 was not covered by tests
httpAddress := duckv1.Addressable{
Name: pointer.String("http"),
URL: apis.HTTP(host),
Name: pointer.String("http"),
URL: apis.HTTP(host),
Audience: audience,

Check warning on line 53 in control-plane/pkg/receiver/address.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/receiver/address.go#L51-L53

Added lines #L51 - L53 were not covered by tests
}
return httpAddress
}

func HTTPSAddress(host string, object metav1.Object, caCerts *string) duckv1.Addressable {
func HTTPSAddress(host string, audience *string, object metav1.Object, caCerts *string) duckv1.Addressable {
httpsAddress := duckv1.Addressable{
Name: pointer.String("https"),
URL: apis.HTTPS(host),
CACerts: caCerts,
Name: pointer.String("https"),
URL: apis.HTTPS(host),
CACerts: caCerts,
Audience: audience,
}
httpsAddress.URL.Path = fmt.Sprintf("/%s/%s", object.GetNamespace(), object.GetName())
return httpsAddress
Expand Down
8 changes: 6 additions & 2 deletions control-plane/pkg/receiver/address_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ func TestHTTPSAddress(t *testing.T) {
Name: "ks",
},
}
httpsAddress := HTTPSAddress(host, ks, pointer.String(string(eventingtlstesting.CA)))
aud := "my-audience"
httpsAddress := HTTPSAddress(host, &aud, ks, pointer.String(string(eventingtlstesting.CA)))

require.Equal(t, httpsAddress.URL.Host, host)
require.Equal(t, httpsAddress.Audience, &aud)
require.Equal(t, httpsAddress.URL.Scheme, "https")
require.Contains(t, httpsAddress.URL.Path, ks.GetNamespace())
require.Contains(t, httpsAddress.URL.Path, ks.GetName())
Expand All @@ -69,9 +71,11 @@ func TestHTTPAddress(t *testing.T) {
Name: "ks",
},
}
httpAddress := HTTPAddress(host, ks)
aud := "my-audience"
httpAddress := HTTPAddress(host, &aud, ks)

require.Equal(t, host, httpAddress.URL.Host)
require.Equal(t, httpAddress.Audience, &aud)
require.Equal(t, httpAddress.URL.Scheme, "http")
require.Contains(t, httpAddress.URL.Path, ks.GetNamespace())
require.Contains(t, httpAddress.URL.Path, ks.GetName())
Expand Down
10 changes: 5 additions & 5 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
return err
}

httpAddress := receiver.HTTPAddress(ingressHost, broker)
httpsAddress := receiver.HTTPSAddress(ingressHost, broker, caCerts)
httpAddress := receiver.HTTPAddress(ingressHost, nil, broker)
httpsAddress := receiver.HTTPSAddress(ingressHost, nil, broker, caCerts)
addressableStatus.Address = &httpAddress
addressableStatus.Addresses = []duckv1.Addressable{httpAddress, httpsAddress}
} else if transportEncryptionFlags.IsStrictTransportEncryption() {
Expand All @@ -257,11 +257,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
return err
}

httpsAddress := receiver.HTTPSAddress(ingressHost, broker, caCerts)
httpsAddress := receiver.HTTPSAddress(ingressHost, nil, broker, caCerts)
addressableStatus.Address = &httpsAddress
addressableStatus.Addresses = []duckv1.Addressable{httpsAddress}
} else {
httpAddress := receiver.HTTPAddress(ingressHost, broker)
httpAddress := receiver.HTTPAddress(ingressHost, nil, broker)
addressableStatus.Address = &httpAddress
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
}
Expand Down Expand Up @@ -360,7 +360,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker)
// See (under discussions KIPs, unlikely to be accepted as they are):
// - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446
// - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update
address := receiver.HTTPAddress(ingressHost, broker)
address := receiver.HTTPAddress(ingressHost, nil, broker)
proberAddressable := prober.ProberAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &address,
Expand Down
30 changes: 21 additions & 9 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
"strconv"
"time"

"k8s.io/utils/pointer"
"knative.dev/eventing/pkg/auth"
"knative.dev/pkg/logging"

"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"knative.dev/eventing/pkg/apis/feature"
messaging "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/pkg/network"
Expand Down Expand Up @@ -313,26 +316,35 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta
return err
}

featureFlags := feature.FromContext(ctx)
var audience *string
if featureFlags.IsOIDCAuthentication() {
audience = pointer.String(auth.GetAudience(messaging.SchemeGroupVersion.WithKind("KafkaChannel"), channel.ObjectMeta))
logging.FromContext(ctx).Debugw("Setting the KafkaChannels audience", zap.String("audience", *audience))
} else {
logging.FromContext(ctx).Debug("Clearing the KafkaChannels audience as OIDC is not enabled")
audience = nil
}

var addressableStatus duckv1.AddressStatus
channelHttpsHost := network.GetServiceHostname(r.Env.IngressName, r.SystemNamespace)
channelHttpHost := network.GetServiceHostname(channelService.Name, channel.Namespace)
transportEncryptionFlags := feature.FromContext(ctx)
if transportEncryptionFlags.IsPermissiveTransportEncryption() {
if featureFlags.IsPermissiveTransportEncryption() {
caCerts, err := r.getCaCerts()
if err != nil {
return err
}

httpAddress := receiver.ChannelHTTPAddress(channelHttpHost)
httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channel, caCerts)
httpAddress := receiver.ChannelHTTPAddress(channelHttpHost, audience)
httpsAddress := receiver.HTTPSAddress(channelHttpsHost, audience, channel, caCerts)
// Permissive mode:
// - status.address http address with path-based routing
// - status.addresses:
// - https address with path-based routing
// - http address with path-based routing
addressableStatus.Addresses = []duckv1.Addressable{httpsAddress, httpAddress}
addressableStatus.Address = &httpAddress
} else if transportEncryptionFlags.IsStrictTransportEncryption() {
} else if featureFlags.IsStrictTransportEncryption() {
// Strict mode: (only https addresses)
// - status.address https address with path-based routing
// - status.addresses:
Expand All @@ -342,11 +354,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta
return err
}

httpsAddress := receiver.HTTPSAddress(channelHttpsHost, channel, caCerts)
httpsAddress := receiver.HTTPSAddress(channelHttpsHost, audience, channel, caCerts)
addressableStatus.Addresses = []duckv1.Addressable{httpsAddress}
addressableStatus.Address = &httpsAddress
} else {
httpAddress := receiver.ChannelHTTPAddress(channelHttpHost)
httpAddress := receiver.ChannelHTTPAddress(channelHttpHost, audience)
addressableStatus.Address = &httpAddress
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
}
Expand Down Expand Up @@ -433,7 +445,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1
// See (under discussions KIPs, unlikely to be accepted as they are):
// - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446
// - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update
address := receiver.HTTPAddress(r.IngressHost, channel)
address := receiver.HTTPAddress(r.IngressHost, nil, channel)
proberAddressable := prober.ProberAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &address,
Expand Down
85 changes: 85 additions & 0 deletions control-plane/pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2005,6 +2005,91 @@ func TestReconcileKind(t *testing.T) {
finalizerUpdatedEvent,
},
},
{
Name: "Reconciled normal - OIDC enabled",
Objects: []runtime.Object{
NewChannel(),
NewService(),
NewPerChannelService(DefaultEnv),
ChannelReceiverPod(env.SystemNamespace, map[string]string{
base.VolumeGenerationAnnotationKey: "0",
"annotation_to_preserve": "value_to_preserve",
}),
ChannelDispatcherPod(env.SystemNamespace, map[string]string{
base.VolumeGenerationAnnotationKey: "0",
"annotation_to_preserve": "value_to_preserve",
}),
NewConfigMapWithTextData(system.Namespace(), DefaultEnv.GeneralConfigMapName, map[string]string{
kafka.BootstrapServersConfigMapKey: ChannelBootstrapServers,
}),
},
Key: testKey,
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
}),
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{
Generation: 1,
Resources: []*contract.Resource{
{
Uid: ChannelUUID,
Topics: []string{ChannelTopic()},
BootstrapServers: ChannelBootstrapServers,
Reference: ChannelReference(),
Ingress: &contract.Ingress{
Path: receiver.Path(ChannelNamespace, ChannelName),
Host: receiver.Host(ChannelNamespace, ChannelName),
},
},
},
}),
ChannelReceiverPodUpdate(env.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
base.VolumeGenerationAnnotationKey: "1",
}),
ChannelDispatcherPodUpdate(env.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
base.VolumeGenerationAnnotationKey: "1",
}),
},
SkipNamespaceValidation: true, // WantCreates compare the channel namespace with configmap namespace, so skip it
WantCreates: []runtime.Object{
NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: NewChannel(
WithInitKafkaChannelConditions,
StatusConfigParsed,
StatusConfigMapUpdatedReady(&env),
WithChannelTopicStatusAnnotation(ChannelTopic()),
StatusTopicReadyWithName(ChannelTopic()),
StatusDataPlaneAvailable,
ChannelAddressable(&env),
StatusProbeSucceeded,
WithChannelAddresses([]duckv1.Addressable{
{
Name: pointer.String("http"),
URL: ChannelAddress(),
Audience: pointer.String(ChannelAudience),
},
}),
WithChannelAddress(duckv1.Addressable{
Name: pointer.String("http"),
URL: ChannelAddress(),
Audience: pointer.String(ChannelAudience),
}),
WithChannelAddessable(),
),
},
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(),
},
WantEvents: []string{
finalizerUpdatedEvent,
},
},
}

useTable(t, table, env)
Expand Down
9 changes: 7 additions & 2 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@

logger := logging.FromContext(ctx)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
var globalResync func(obj interface{})
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync(nil)
}

Check warning on line 94 in control-plane/pkg/reconciler/channel/controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/channel/controller.go#L93-L94

Added lines #L93 - L94 were not covered by tests
})
featureStore.WatchConfigs(watcher)

_, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx)
Expand Down Expand Up @@ -136,7 +141,7 @@

reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)

globalResync := func(_ interface{}) {
globalResync = func(_ interface{}) {
impl.GlobalResync(channelInformer.Informer())
}

Expand Down
Loading
Loading