Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reconcile eventing with v1 resources, generate v1 dependent resources #3587

Merged
merged 1 commit into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions pkg/apis/duck/v1/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2020 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package v1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: "duck.knative.dev", Version: "v1"}

// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}

var (
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
)

// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&Channelable{},
&ChannelableList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
51 changes: 51 additions & 0 deletions pkg/apis/duck/v1/register_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1

import (
"testing"

"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// Kind takes an unqualified resource and returns a Group qualified GroupKind
func TestKind(t *testing.T) {
want := schema.GroupKind{
Group: "duck.knative.dev",
Kind: "kind",
}

got := Kind("kind")

if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("unexpected resource (-want, +got) = %v", diff)
}
}

// TestKnownTypes makes sure that expected types get added.
func TestKnownTypes(t *testing.T) {
scheme := runtime.NewScheme()
addKnownTypes(scheme)
types := scheme.KnownTypes(SchemeGroupVersion)

for _, name := range []string{
"Channelable",
"ChannelableList",
} {
if _, ok := types[name]; !ok {
t.Errorf("Did not find %q as registered type", name)
}
}
}
41 changes: 20 additions & 21 deletions pkg/reconciler/mtbroker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ import (
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"

duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/eventing"
"knative.dev/eventing/pkg/apis/eventing/v1beta1"
messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
clientset "knative.dev/eventing/pkg/client/clientset/versioned"
brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1beta1/broker"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1beta1"
eventingv1beta1listers "knative.dev/eventing/pkg/client/listers/eventing/v1beta1"
messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1beta1"
brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"
messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1"
"knative.dev/eventing/pkg/duck"
"knative.dev/eventing/pkg/reconciler/mtbroker/resources"
"knative.dev/eventing/pkg/reconciler/names"
Expand All @@ -68,7 +67,7 @@ type Reconciler struct {
brokerLister eventinglisters.BrokerLister
endpointsLister corev1listers.EndpointsLister
subscriptionLister messaginglisters.SubscriptionLister
triggerLister eventingv1beta1listers.TriggerLister
triggerLister eventinglisters.TriggerLister

channelableTracker duck.ListableTracker

Expand All @@ -88,7 +87,7 @@ type Reconciler struct {
var _ brokerreconciler.Interface = (*Reconciler)(nil)
var _ brokerreconciler.Finalizer = (*Reconciler)(nil)

var brokerGVK = v1beta1.SchemeGroupVersion.WithKind("Broker")
var brokerGVK = eventingv1.SchemeGroupVersion.WithKind("Broker")

// ReconcilerArgs are the arguments needed to create a broker.Reconciler.
type ReconcilerArgs struct {
Expand All @@ -102,7 +101,7 @@ func newReconciledNormal(namespace, name string) pkgreconciler.Event {
return pkgreconciler.NewEvent(corev1.EventTypeNormal, brokerReconciled, "Broker reconciled: \"%s/%s\"", namespace, name)
}

func (r *Reconciler) ReconcileKind(ctx context.Context, b *v1beta1.Broker) pkgreconciler.Event {
func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pkgreconciler.Event {
triggerChan, err := r.reconcileKind(ctx, b)
if err != nil {
logging.FromContext(ctx).Errorw("Problem reconciling broker", zap.Error(err))
Expand All @@ -125,7 +124,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *v1beta1.Broker) pkgre
return err
}

func (r *Reconciler) reconcileKind(ctx context.Context, b *v1beta1.Broker) (*corev1.ObjectReference, pkgreconciler.Event) {
func (r *Reconciler) reconcileKind(ctx context.Context, b *eventingv1.Broker) (*corev1.ObjectReference, pkgreconciler.Event) {
logging.FromContext(ctx).Infow("Reconciling", zap.Any("Broker", b))

// 1. Trigger Channel is created for all events. Triggers will Subscribe to this Channel.
Expand Down Expand Up @@ -163,7 +162,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *v1beta1.Broker) (*cor
return &chanMan.ref, nil
}

channelStatus := &duckv1beta1.ChannelableStatus{AddressStatus: pkgduckv1.AddressStatus{Address: &pkgduckv1.Addressable{URL: triggerChan.Status.Address.URL}}}
channelStatus := &duckv1.ChannelableStatus{AddressStatus: pkgduckv1.AddressStatus{Address: &pkgduckv1.Addressable{URL: triggerChan.Status.Address.URL}}}
b.Status.PropagateTriggerChannelReadiness(channelStatus)

filterEndpoints, err := r.endpointsLister.Endpoints(system.Namespace()).Get(names.BrokerFilterName)
Expand Down Expand Up @@ -198,16 +197,16 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *v1beta1.Broker) (*cor
type channelTemplate struct {
ref corev1.ObjectReference
inf dynamic.ResourceInterface
template messagingv1beta1.ChannelTemplateSpec
template messagingv1.ChannelTemplateSpec
}

func (r *Reconciler) getChannelTemplate(ctx context.Context, b *v1beta1.Broker) (*channelTemplate, error) {
func (r *Reconciler) getChannelTemplate(ctx context.Context, b *eventingv1.Broker) (*channelTemplate, error) {
triggerChannelName := resources.BrokerChannelName(b.Name, "trigger")
ref := corev1.ObjectReference{
Name: triggerChannelName,
Namespace: b.Namespace,
}
var template *messagingv1beta1.ChannelTemplateSpec
var template *messagingv1.ChannelTemplateSpec

if b.Spec.Config != nil {
if b.Spec.Config.Kind == "ConfigMap" && b.Spec.Config.APIVersion == "v1" {
Expand Down Expand Up @@ -259,15 +258,15 @@ func (r *Reconciler) getChannelTemplate(ctx context.Context, b *v1beta1.Broker)
}, nil
}

func (r *Reconciler) FinalizeKind(ctx context.Context, b *v1beta1.Broker) pkgreconciler.Event {
func (r *Reconciler) FinalizeKind(ctx context.Context, b *eventingv1.Broker) pkgreconciler.Event {
if err := r.propagateBrokerStatusToTriggers(ctx, b.Namespace, b.Name, nil); err != nil {
return fmt.Errorf("Trigger reconcile failed: %v", err)
}
return newReconciledNormal(b.Namespace, b.Name)
}

// reconcileChannel reconciles Broker's 'b' underlying channel.
func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, channelObjRef corev1.ObjectReference, newChannel *unstructured.Unstructured, b *v1beta1.Broker) (*duckv1beta1.Channelable, error) {
func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, channelObjRef corev1.ObjectReference, newChannel *unstructured.Unstructured, b *eventingv1.Broker) (*duckv1.Channelable, error) {
lister, err := r.channelableTracker.ListerFor(channelObjRef)
if err != nil {
logging.FromContext(ctx).Errorw(fmt.Sprintf("Error getting lister for Channel: %s/%s", channelObjRef.Namespace, channelObjRef.Name), zap.Error(err))
Expand All @@ -284,7 +283,7 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf
return nil, err
}
logging.FromContext(ctx).Info(fmt.Sprintf("Created Channel: %s/%s", channelObjRef.Namespace, channelObjRef.Name), zap.Any("NewChannel", newChannel))
channelable := &duckv1beta1.Channelable{}
channelable := &duckv1.Channelable{}
err = duckapis.FromUnstructured(created, channelable)
if err != nil {
logging.FromContext(ctx).Errorw(fmt.Sprintf("Failed to convert to Channelable Object: %s/%s", channelObjRef.Namespace, channelObjRef.Name), zap.Any("createdChannel", created), zap.Error(err))
Expand All @@ -297,7 +296,7 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf
return nil, err
}
logging.FromContext(ctx).Debugw(fmt.Sprintf("Found Channel: %s/%s", channelObjRef.Namespace, channelObjRef.Name))
channelable, ok := c.(*duckv1beta1.Channelable)
channelable, ok := c.(*duckv1.Channelable)
if !ok {
logging.FromContext(ctx).Errorw(fmt.Sprintf("Failed to convert to Channelable Object: %s/%s", channelObjRef.Namespace, channelObjRef.Name), zap.Error(err))
return nil, err
Expand All @@ -315,7 +314,7 @@ func TriggerChannelLabels(brokerName string) map[string]string {
}

// reconcileTriggers reconciles the Triggers that are pointed to this broker
func (r *Reconciler) reconcileTriggers(ctx context.Context, b *v1beta1.Broker, triggerChan *corev1.ObjectReference) error {
func (r *Reconciler) reconcileTriggers(ctx context.Context, b *eventingv1.Broker, triggerChan *corev1.ObjectReference) error {
triggers, err := r.triggerLister.Triggers(b.Namespace).List(labels.Everything())
if err != nil {
return err
Expand All @@ -340,7 +339,7 @@ func (r *Reconciler) reconcileTriggers(ctx context.Context, b *v1beta1.Broker, t
return nil
}

func (r *Reconciler) propagateBrokerStatusToTriggers(ctx context.Context, namespace, name string, bs *v1beta1.BrokerStatus) error {
func (r *Reconciler) propagateBrokerStatusToTriggers(ctx context.Context, namespace, name string, bs *eventingv1.BrokerStatus) error {
triggers, err := r.triggerLister.Triggers(namespace).List(labels.Everything())
if err != nil {
return err
Expand Down
Loading