Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable broker chaos, add debug logging for tests for MT Broker. #3599

Merged
merged 10 commits into from
Jul 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/mtchannel_broker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"knative.dev/pkg/signals"

"knative.dev/eventing/pkg/reconciler/mtbroker"
mttrigger "knative.dev/eventing/pkg/reconciler/mtbroker/trigger"
)

func main() {
Expand All @@ -37,5 +38,5 @@ func main() {
cfg := sharedmain.ParseAndGetConfigOrDie()
cfg.QPS = float32(*clientQPS)
cfg.Burst = *clientBurst
sharedmain.MainWithConfig(signals.NewContext(), "mt-broker-controller", cfg, mtbroker.NewController)
sharedmain.MainWithConfig(signals.NewContext(), "mt-broker-controller", cfg, mtbroker.NewController, mttrigger.NewController)
}
15 changes: 15 additions & 0 deletions pkg/apis/eventing/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,21 @@ const (
// BrokerChannelAddressStatusAnnotationKey is the broker status
// annotation key used to specify the address of its channel.
BrokerChannelAddressStatusAnnotationKey = "knative.dev/channelAddress"

// BrokerChannelAPIVersionStatusAnnotationKey is the broker status
// annotation key used to specify the APIVersion of the channel for
// the triggers to subscribe to.
BrokerChannelAPIVersionStatusAnnotationKey = "knative.dev/channelAPIVersion"

// BrokerChannelKindStatusAnnotationKey is the broker status
// annotation key used to specify the Kind of the channel for
// the triggers to subscribe to.
BrokerChannelKindStatusAnnotationKey = "knative.dev/channelKind"

// BrokerChannelNameStatusAnnotationKey is the broker status
// annotation key used to specify the name of the channel for
// the triggers to subscribe to.
BrokerChannelNameStatusAnnotationKey = "knative.dev/channelName"
)

var (
Expand Down
126 changes: 11 additions & 115 deletions pkg/reconciler/mtbroker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@ import (
"context"
"errors"
"fmt"
"reflect"
"time"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/dynamic"
corev1listers "k8s.io/client-go/listers/core/v1"

Expand All @@ -47,7 +44,6 @@ import (
"knative.dev/pkg/apis"
duckapis "knative.dev/pkg/apis/duck"
pkgduckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"
Expand Down Expand Up @@ -79,7 +75,6 @@ type Reconciler struct {

// Check that our Reconciler implements Interface
var _ brokerreconciler.Interface = (*Reconciler)(nil)
var _ brokerreconciler.Finalizer = (*Reconciler)(nil)

var brokerGVK = eventingv1.SchemeGroupVersion.WithKind("Broker")

Expand All @@ -92,71 +87,51 @@ type ReconcilerArgs struct {
}

func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pkgreconciler.Event {
triggerChan, err := r.reconcileKind(ctx, b)
if err != nil {
logging.FromContext(ctx).Errorw("Problem reconciling broker", zap.Error(err))
}

if b.Status.IsReady() {
// So, at this point the Broker is ready and everything should be solid
// for the triggers to act upon, so reconcile them.
te := r.reconcileTriggers(ctx, b, triggerChan)
if te != nil {
logging.FromContext(ctx).Errorw("Problem reconciling triggers", zap.Error(te))
return fmt.Errorf("failed to reconcile triggers: %v", te)
}
} else {
// Broker is not ready, but propagate it's status to my triggers.
if te := r.propagateBrokerStatusToTriggers(ctx, b.Namespace, b.Name, &b.Status); te != nil {
return fmt.Errorf("Trigger reconcile failed: %v", te)
}
}
return err
}

func (r *Reconciler) reconcileKind(ctx context.Context, b *eventingv1.Broker) (*corev1.ObjectReference, pkgreconciler.Event) {
logging.FromContext(ctx).Infow("Reconciling", zap.Any("Broker", b))

// 1. Trigger Channel is created for all events. Triggers will Subscribe to this Channel.
// 2. Check that Filter / Ingress deployment (shared within cluster are there)
chanMan, err := r.getChannelTemplate(ctx, b)
if err != nil {
b.Status.MarkTriggerChannelFailed("ChannelTemplateFailed", "Error on setting up the ChannelTemplate: %s", err)
return nil, err
return err
}

logging.FromContext(ctx).Infow("Reconciling the trigger channel")
c, err := resources.NewChannel("trigger", b, &chanMan.template, TriggerChannelLabels(b.Name))
if err != nil {
logging.FromContext(ctx).Errorw(fmt.Sprintf("Failed to create Trigger Channel object: %s/%s", chanMan.ref.Namespace, chanMan.ref.Name), zap.Error(err))
return nil, err
return err
}

triggerChan, err := r.reconcileChannel(ctx, chanMan.inf, chanMan.ref, c, b)
if err != nil {
logging.FromContext(ctx).Errorw("Problem reconciling the trigger channel", zap.Error(err))
b.Status.MarkTriggerChannelFailed("ChannelFailure", "%v", err)
return nil, fmt.Errorf("Failed to reconcile trigger channel: %v", err)
return fmt.Errorf("Failed to reconcile trigger channel: %v", err)
}

if triggerChan.Status.Address == nil {
logging.FromContext(ctx).Debugw("Trigger Channel does not have an address", zap.Any("triggerChan", triggerChan))
b.Status.MarkTriggerChannelFailed("NoAddress", "Channel does not have an address.")
// Ok to return nil for error here, once channel address becomes available, this will get requeued.
return &chanMan.ref, nil
return nil
}
if url := triggerChan.Status.Address.URL; url == nil || url.Host == "" {
logging.FromContext(ctx).Debugw("Trigger Channel does not have an address", zap.Any("triggerChan", triggerChan))
b.Status.MarkTriggerChannelFailed("NoAddress", "Channel does not have an address.")
// Ok to return nil for error here, once channel address becomes available, this will get requeued.
return &chanMan.ref, nil
return nil
}

// Attach the channel address as a status annotation.
if b.Status.Annotations == nil {
b.Status.Annotations = make(map[string]string, 1)
}
b.Status.Annotations[eventing.BrokerChannelAddressStatusAnnotationKey] = triggerChan.Status.Address.URL.String()
b.Status.Annotations[eventing.BrokerChannelKindStatusAnnotationKey] = chanMan.ref.Kind
b.Status.Annotations[eventing.BrokerChannelAPIVersionStatusAnnotationKey] = chanMan.ref.APIVersion
b.Status.Annotations[eventing.BrokerChannelNameStatusAnnotationKey] = chanMan.ref.Name

channelStatus := &duckv1.ChannelableStatus{AddressStatus: pkgduckv1.AddressStatus{Address: &pkgduckv1.Addressable{URL: triggerChan.Status.Address.URL}}}
b.Status.PropagateTriggerChannelReadiness(channelStatus)
Expand All @@ -165,15 +140,15 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *eventingv1.Broker) (*
if err != nil {
logging.FromContext(ctx).Errorw("Problem getting endpoints for filter", zap.String("namespace", system.Namespace()), zap.Error(err))
b.Status.MarkFilterFailed("ServiceFailure", "%v", err)
return nil, err
return err
}
b.Status.PropagateFilterAvailability(filterEndpoints)

ingressEndpoints, err := r.endpointsLister.Endpoints(system.Namespace()).Get(names.BrokerIngressName)
if err != nil {
logging.FromContext(ctx).Errorw("Problem getting endpoints for ingress", zap.String("namespace", system.Namespace()), zap.Error(err))
b.Status.MarkIngressFailed("ServiceFailure", "%v", err)
return nil, err
return err
}
b.Status.PropagateIngressAvailability(ingressEndpoints)

Expand All @@ -187,7 +162,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, b *eventingv1.Broker) (*

// So, at this point the Broker is ready and everything should be solid
// for the triggers to act upon.
return &chanMan.ref, nil
return nil
}

type channelTemplate struct {
Expand Down Expand Up @@ -255,13 +230,6 @@ func (r *Reconciler) getChannelTemplate(ctx context.Context, b *eventingv1.Broke
}, nil
}

func (r *Reconciler) FinalizeKind(ctx context.Context, b *eventingv1.Broker) pkgreconciler.Event {
n3wscott marked this conversation as resolved.
Show resolved Hide resolved
if err := r.propagateBrokerStatusToTriggers(ctx, b.Namespace, b.Name, nil); err != nil {
return fmt.Errorf("Trigger reconcile failed: %v", err)
}
return nil
}

// reconcileChannel reconciles Broker's 'b' underlying channel.
func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, channelObjRef corev1.ObjectReference, newChannel *unstructured.Unstructured, b *eventingv1.Broker) (*duckv1.Channelable, error) {
lister, err := r.channelableTracker.ListerFor(channelObjRef)
Expand Down Expand Up @@ -309,75 +277,3 @@ func TriggerChannelLabels(brokerName string) map[string]string {
"eventing.knative.dev/brokerEverything": "true",
}
}

// reconcileTriggers reconciles the Triggers that are pointed to this broker
func (r *Reconciler) reconcileTriggers(ctx context.Context, b *eventingv1.Broker, triggerChan *corev1.ObjectReference) error {
recorder := controller.GetEventRecorder(ctx)
triggers, err := r.triggerLister.Triggers(b.Namespace).List(labels.Everything())
if err != nil {
return err
}
for _, t := range triggers {
if t.Spec.Broker == b.Name {
trigger := t.DeepCopy()
tErr := r.reconcileTrigger(ctx, b, trigger, triggerChan)
if tErr != nil {
logging.FromContext(ctx).Errorw("Reconciling trigger failed:", zap.String("name", t.Name), zap.Error(err))
recorder.Eventf(trigger, corev1.EventTypeWarning, triggerReconcileFailed, "Trigger reconcile failed: %v", tErr)
}
trigger.Status.ObservedGeneration = t.Generation
if _, updateStatusErr := r.updateTriggerStatus(ctx, trigger); updateStatusErr != nil {
logging.FromContext(ctx).Errorw("Failed to update Trigger status", zap.Error(updateStatusErr))
recorder.Eventf(trigger, corev1.EventTypeWarning, triggerUpdateStatusFailed, "Failed to update Trigger's status: %v", updateStatusErr)
}
}
}
return nil
}

func (r *Reconciler) propagateBrokerStatusToTriggers(ctx context.Context, namespace, name string, bs *eventingv1.BrokerStatus) error {
recorder := controller.GetEventRecorder(ctx)
triggers, err := r.triggerLister.Triggers(namespace).List(labels.Everything())
if err != nil {
return err
}
for _, t := range triggers {
if t.Spec.Broker == name {
// Don't modify informers copy
trigger := t.DeepCopy()
trigger.Status.InitializeConditions()
if bs == nil {
trigger.Status.MarkBrokerFailed("BrokerDoesNotExist", "Broker %q does not exist", name)
} else {
trigger.Status.PropagateBrokerCondition(bs.GetTopLevelCondition())
}
if _, updateStatusErr := r.updateTriggerStatus(ctx, trigger); updateStatusErr != nil {
logging.FromContext(ctx).Errorw("Failed to update Trigger status", zap.Error(updateStatusErr))
recorder.Eventf(trigger, corev1.EventTypeWarning, triggerUpdateStatusFailed, "Failed to update Trigger's status: %v", updateStatusErr)
return updateStatusErr
}
}
}
return nil
}

// lifted from pkg/reconciler/reconcile_common for now...
// groomConditionsTransitionTime ensures that the LastTransitionTime only advances for resources
// where the condition has changed during reconciliation. This also ensures that all advanced
// conditions share the same timestamp.
func groomConditionsTransitionTime(resource, oldResource pkgduckv1.KRShaped) {
now := apis.VolatileTime{Inner: metav1.NewTime(time.Now())}
sts := resource.GetStatus()
for i := range sts.Conditions {
cond := &sts.Conditions[i]

if oldCond := oldResource.GetStatus().GetCondition(cond.Type); oldCond != nil {
cond.LastTransitionTime = oldCond.LastTransitionTime
if reflect.DeepEqual(cond, oldCond) {
continue
}
}

cond.LastTransitionTime = now
}
}
Loading