Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change backingchannel to return a combinedchannelable #3088

Merged
merged 12 commits into from
May 8, 2020
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
k8s.io/utils v0.0.0-20200124190032-861946025e34
knative.dev/pkg v0.0.0-20200507011344-2581370e4a37
knative.dev/test-infra v0.0.0-20200506231144-c8dd15bb7f0b

sigs.k8s.io/yaml v1.2.0
)

Expand Down
42 changes: 38 additions & 4 deletions pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/eventing/pkg/apis/messaging"
"knative.dev/eventing/pkg/apis/messaging/v1beta1"
channelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1beta1/channel"
listers "knative.dev/eventing/pkg/client/listers/messaging/v1beta1"
Expand Down Expand Up @@ -90,13 +92,45 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, c *v1beta1.Channel) pkgr
}

c.Status.Channel = &backingChannelObjRef
c.Status.PropagateStatuses(&backingChannel.Status)
bCS := r.getChannelableStatus(ctx, &backingChannel.Status, backingChannel.Annotations)
c.Status.PropagateStatuses(bCS)

return newReconciledNormal(c.Namespace, c.Name)
}

func (r *Reconciler) getChannelableStatus(ctx context.Context, bc *duckv1alpha1.ChannelableCombinedStatus, cAnnotations map[string]string) *duckv1beta1.ChannelableStatus {

channelableStatus := &duckv1beta1.ChannelableStatus{}
if bc.AddressStatus.Address != nil {
channelableStatus.AddressStatus.Address = &duckv1.Addressable{}
bc.AddressStatus.Address.ConvertTo(ctx, channelableStatus.AddressStatus.Address)
}
channelableStatus.Status = bc.Status
if cAnnotations != nil {
if cAnnotations[messaging.SubscribableDuckVersionAnnotation] == "v1beta1" {
if len(bc.SubscribableStatus.Subscribers) > 0 {
channelableStatus.SubscribableStatus.Subscribers = bc.SubscribableStatus.Subscribers
}
} else { //v1alpha1
if bc.SubscribableTypeStatus.SubscribableStatus != nil &&
len(bc.SubscribableTypeStatus.SubscribableStatus.Subscribers) > 0 {
channelableStatus.SubscribableStatus.Subscribers = make([]duckv1beta1.SubscriberStatus, len(bc.SubscribableTypeStatus.SubscribableStatus.Subscribers))
for i, ss := range bc.SubscribableTypeStatus.SubscribableStatus.Subscribers {
channelableStatus.SubscribableStatus.Subscribers[i] = duckv1beta1.SubscriberStatus{
UID: ss.UID,
ObservedGeneration: ss.ObservedGeneration,
Ready: ss.Ready,
Message: ss.Message,
}
}
}
}
}
return channelableStatus
}

// reconcileBackingChannel reconciles Channel's 'c' underlying CRD channel.
func (r *Reconciler) reconcileBackingChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, c *v1beta1.Channel, backingChannelObjRef duckv1.KReference) (*duckv1beta1.Channelable, error) {
func (r *Reconciler) reconcileBackingChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, c *v1beta1.Channel, backingChannelObjRef duckv1.KReference) (*duckv1alpha1.ChannelableCombined, error) {
lister, err := r.channelableTracker.ListerForKReference(backingChannelObjRef)
lberk marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
logging.FromContext(ctx).Error("Error getting lister for Channel", zap.Any("backingChannel", backingChannelObjRef), zap.Error(err))
Expand All @@ -118,7 +152,7 @@ func (r *Reconciler) reconcileBackingChannel(ctx context.Context, channelResourc
return nil, err
}
logging.FromContext(ctx).Debug("Created backing Channel", zap.Any("backingChannel", newBackingChannel))
channelable := &duckv1beta1.Channelable{}
channelable := &duckv1alpha1.ChannelableCombined{}
err = duckapis.FromUnstructured(created, channelable)
if err != nil {
logging.FromContext(ctx).Error("Failed to convert to Channelable Object", zap.Any("backingChannel", backingChannelObjRef), zap.Any("createdChannel", created), zap.Error(err))
Expand All @@ -131,7 +165,7 @@ func (r *Reconciler) reconcileBackingChannel(ctx context.Context, channelResourc
return nil, err
}
logging.FromContext(ctx).Debug("Found backing Channel", zap.Any("backingChannel", backingChannelObjRef))
channelable, ok := backingChannel.(*duckv1beta1.Channelable)
channelable, ok := backingChannel.(*duckv1alpha1.ChannelableCombined)
lberk marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
logging.FromContext(ctx).Error("Failed to convert to Channelable Object", zap.Any("backingChannel", backingChannel), zap.Error(err))
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/client/injection/ducks/duck/v1beta1/channelable"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined"
channelinformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/channel"
channelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1beta1/channel"
"knative.dev/eventing/pkg/duck"
Expand All @@ -44,7 +44,7 @@ func NewController(
}
impl := channelreconciler.NewImpl(ctx, r)

r.channelableTracker = duck.NewListableTracker(ctx, channelable.Get, impl.EnqueueKey, controller.GetTrackerLease(ctx))
r.channelableTracker = duck.NewListableTracker(ctx, channelablecombined.Get, impl.EnqueueKey, controller.GetTrackerLease(ctx))

logging.FromContext(ctx).Info("Setting up event handlers")

Expand Down