Skip to content

Commit

Permalink
feat(controller): refactor reconcile loop, update status conditions w…
Browse files Browse the repository at this point in the history
…hen necessary

- Use new up-to-date check with generations instead of hashes.
- Add finalizer without returning early.
- Use status conditions to reflect more persistent messages, e.g.
  configuration errors or validation failures.
  • Loading branch information
tronghn committed Sep 5, 2024
1 parent c7946ae commit 2acefd3
Showing 1 changed file with 135 additions and 125 deletions.
260 changes: 135 additions & 125 deletions controllers/common/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ import (
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/go-jose/go-jose/v4"
"github.com/google/uuid"
naisiov1 "github.com/nais/liberator/pkg/apis/nais.io/v1"
libfinalizer "github.com/nais/liberator/pkg/finalizer"
"github.com/nais/liberator/pkg/kubernetes"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/nais/digdirator/pkg/clients"
"github.com/nais/digdirator/pkg/config"
Expand All @@ -29,8 +29,6 @@ import (
"github.com/nais/digdirator/pkg/metrics"
)

const RequeueInterval = 10 * time.Second

type Reconciler struct {
Client client.Client
Reader client.Reader
Expand Down Expand Up @@ -70,78 +68,108 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request, instance c
return ctrl.Result{}, client.IgnoreNotFound(err)
}

defer func() {
tx.Logger.Infof("finished processing request")
}()
if markedForDeletion(tx.Instance) {
return r.finalize(tx)
}

finalizer := r.finalizer(tx)
if !controllerutil.ContainsFinalizer(tx.Instance, FinalizerName) {
controllerutil.AddFinalizer(tx.Instance, FinalizerName)
if err := r.Client.Update(tx.Ctx, tx.Instance); err != nil {
return ctrl.Result{}, fmt.Errorf("registering finalizer: %w", err)
}
}

if libfinalizer.IsBeingDeleted(tx.Instance) {
return finalizer.Process()
if clients.IsUpToDate(tx.Instance) {
tx.Logger.Info("resource is up-to-date; skipping reconciliation")
return ctrl.Result{}, nil
}

if !libfinalizer.HasFinalizer(tx.Instance, FinalizerName) {
return finalizer.Register()
errs := make([]error, 0)

if err := r.process(tx); err != nil {
r.observeError(tx, err)
errs = append(errs, err)
}

if isUpToDate, err := clients.IsUpToDate(tx.Instance); isUpToDate {
if err != nil {
return ctrl.Result{}, err
}
tx.Logger.Info("object state already reconciled, nothing to do")
return ctrl.Result{}, nil
// object is overwritten with response from apiserver after Update, so status is unset
// preserve copy for update of status subresource later on
status := tx.Instance.GetStatus().DeepCopy()

// remove processed annotations from object
if err := r.Client.Update(tx.Ctx, tx.Instance); err != nil {
errs = append(errs, fmt.Errorf("updating object: %w", err))
}

err = r.process(tx)
if err != nil {
return r.handleError(tx, err)
// finally, set status
tx.Instance.SetStatus(*status)
if err := r.Client.Status().Update(tx.Ctx, tx.Instance); err != nil {
errs = append(errs, fmt.Errorf("updating status: %w", err))
}

return r.complete(tx)
// non-nil errors are retried with exponential backoff
// only applies if updates to the status subresource are ignored, otherwise an immediate reconcile is triggered
return ctrl.Result{}, errors.Join(errs...)
}

func (r *Reconciler) prepare(ctx context.Context, req ctrl.Request, instance clients.Instance) (*Transaction, error) {
instanceType := clients.GetInstanceType(instance)
correlationID := uuid.New().String()

logger := *log.WithFields(log.Fields{
if err := r.Reader.Get(ctx, req.NamespacedName, instance); err != nil {
return nil, err
}

status := instance.GetStatus()
status.CorrelationID = correlationID

fields := log.Fields{
"instance_type": instanceType,
"instance_name": req.Name,
"instance_namespace": req.Namespace,
"correlationID": correlationID,
})
"correlation_id": correlationID,
"client_id": status.ClientID,
"key_ids": strings.Join(status.KeyIDs, ", "),
}

if err := r.Reader.Get(ctx, req.NamespacedName, instance); err != nil {
return nil, err
// set status to Unknown if none available
if status.Conditions == nil || len(*status.Conditions) == 0 {
status.SetCondition(metav1.Condition{
Type: EventSynchronized,
Status: metav1.ConditionUnknown,
Reason: "Reconciling",
Message: "Starting reconciliation",
ObservedGeneration: instance.GetGeneration(),
})
}

err := r.Client.Status().Update(ctx, instance)
if err != nil {
return nil, fmt.Errorf("updating status: %w", err)
}
instance.GetStatus().SetCorrelationID(correlationID)

digdirClient, err := digdir.NewClient(r.HttpClient, r.Signer, r.Config, instance, r.ClientID)
if err != nil {
return nil, fmt.Errorf("creating Digdir client: %w", err)
}

logger.Infof("processing %s...", instanceType)

transaction := NewTransaction(
ctx,
instance,
&logger,
log.WithFields(fields),
&digdirClient,
r.Config,
)
return &transaction, nil
}

func (r *Reconciler) process(tx *Transaction) error {
instanceClient, err := r.createOrUpdateClient(tx)
registration, err := r.createOrUpdateClient(tx)
if err != nil {
return err
}

if len(tx.Instance.GetStatus().GetClientID()) == 0 && instanceClient != nil {
tx.Instance.GetStatus().SetClientID(instanceClient.ClientID)
}
status := tx.Instance.GetStatus()
status.ClientID = registration.ClientID

secretsClient := r.secrets(tx)
managedSecrets, err := secretsClient.GetManaged()
Expand All @@ -157,7 +185,7 @@ func (r *Reconciler) process(tx *Transaction) error {
return fmt.Errorf("generating jwk: %w", err)
}

if err := r.registerJwk(tx, *jwk, *managedSecrets, instanceClient.ClientID); err != nil {
if err := r.registerJwk(tx, *jwk, *managedSecrets, registration.ClientID); err != nil {
return err
}

Expand All @@ -178,9 +206,68 @@ func (r *Reconciler) process(tx *Transaction) error {
return err
}

hash, err := tx.Instance.Hash()
if err != nil {
return err
}

generation := tx.Instance.GetGeneration()
status.ObservedGeneration = ptr.To(generation)
status.SynchronizationHash = hash
status.SynchronizationSecretName = clients.GetSecretName(tx.Instance)
status.SetStateSynchronized()
status.SetCondition(metav1.Condition{
Type: EventSynchronized,
Status: metav1.ConditionTrue,
Reason: "Reconciling",
Message: "Resource is up-to-date with DigDir",
ObservedGeneration: generation,
})

a := tx.Instance.GetAnnotations()
delete(a, clients.AnnotationResynchronize)
delete(a, clients.AnnotationRotate)

r.reportEvent(tx, corev1.EventTypeNormal, EventSynchronized, "Resource is up-to-date")
tx.Logger.Info("successfully reconciled")
metrics.IncClientsProcessed(tx.Instance)

return nil
}

func (r *Reconciler) observeError(tx *Transaction, reconcileErr error) {
tx.Logger.Error(fmt.Errorf("processing resource: %w", reconcileErr))

setStatusCondition := func(message string) {
r.reportEvent(tx, corev1.EventTypeWarning, EventFailedSynchronization, message)
tx.Instance.GetStatus().SetCondition(metav1.Condition{
Type: EventSynchronized,
Status: metav1.ConditionFalse,
Reason: "Failing",
Message: message,
ObservedGeneration: tx.Instance.GetGeneration(),
})
}

setStatusCondition(reconcileErr.Error())

var digdirErr *digdir.Error
if errors.As(reconcileErr, &digdirErr) {
setStatusCondition(digdirErr.Message)

if errors.Is(reconcileErr, digdir.ClientError) {
// Client errors usually happen due to external state or configuration
// that needs to be resolved upstream. We retry after a longer interval than default.
//
// For example, a desired consumer scope may not exist nor be active,
// or the organization has not been granted access to the scope at the time of reconciliation.
metrics.IncClientsFailedInvalidConfig(tx.Instance)
}
}

metrics.IncClientsFailedProcessing(tx.Instance)
}

func (r *Reconciler) createOrUpdateClient(tx *Transaction) (*types.ClientRegistration, error) {
registration, err := tx.DigdirClient.GetRegistration(tx.Instance, tx.Ctx, tx.Config.ClusterName)
if err != nil {
Expand Down Expand Up @@ -244,21 +331,21 @@ func (r *Reconciler) createClient(tx *Transaction, payload types.ClientRegistrat
return nil, fmt.Errorf("registering client: %w", err)
}

tx.Logger = tx.Logger.WithField("ClientID", registrationResponse.ClientID)
tx.Logger = tx.Logger.WithField("client_id", registrationResponse.ClientID)
tx.Logger.Info("client registered")
return registrationResponse, nil
}

func (r *Reconciler) updateClient(tx *Transaction, payload types.ClientRegistration, clientID string) (*types.ClientRegistration, error) {
tx.Logger = tx.Logger.WithField("ClientID", clientID)
tx.Logger = tx.Logger.WithField("client_id", clientID)
tx.Logger.Debug("client already exists, updating...")

registrationResponse, err := tx.DigdirClient.Update(tx.Ctx, payload, clientID)
if err != nil {
return nil, fmt.Errorf("updating client: %w", err)
}

tx.Logger.WithField("ClientID", registrationResponse.ClientID).Info("client updated")
tx.Logger.Info("client updated")
return registrationResponse, err
}

Expand Down Expand Up @@ -288,10 +375,11 @@ func (r *Reconciler) filterConsumedScopes(tx *Transaction, client *naisiov1.Mask
}

if len(invalid) > 0 {
// TODO: this should be an error that is reflected as a status condition
msg := fmt.Sprintf("organization has no access to scopes: [%s]", strings.Join(invalid, ", "))
tx.Logger.Warnf("%s; skipping...", msg)
r.reportEvent(tx, corev1.EventTypeWarning, EventSkipped, msg)
return valid, &digdir.Error{
Err: digdir.ClientError,
Status: http.StatusText(http.StatusBadRequest),
Message: fmt.Sprintf("Organization has no access to scopes: [%s]", strings.Join(invalid, ", ")),
}
}

return valid, nil
Expand All @@ -311,92 +399,14 @@ func (r *Reconciler) registerJwk(tx *Transaction, jwk jose.JSONWebKey, managedSe
return fmt.Errorf("registering JWKS: %w", err)
}

tx.Instance.GetStatus().SetKeyIDs(crypto.KeyIDsFromJwks(&jwksResponse.JSONWebKeySet))
tx.Logger = tx.Logger.WithField("KeyIDs", strings.Join(tx.Instance.GetStatus().GetKeyIDs(), ", "))
tx.Instance.GetStatus().KeyIDs = crypto.KeyIDsFromJwks(&jwksResponse.JSONWebKeySet)
tx.Logger = tx.Logger.WithField("key_ids", strings.Join(tx.Instance.GetStatus().KeyIDs, ", "))
tx.Logger.Info("new JWKS for client registered")

return nil
}

func (r *Reconciler) handleError(tx *Transaction, err error) (ctrl.Result, error) {
tx.Logger.Error(fmt.Errorf("processing client: %w", err))
r.reportEvent(tx, corev1.EventTypeWarning, EventFailedSynchronization, "Failed to synchronize client")

var digdirErr *digdir.Error
requeue := true

if errors.As(err, &digdirErr) {
if errors.Is(err, digdir.ServerError) {
r.reportEvent(tx, corev1.EventTypeNormal, EventRetrying, digdirErr.Message)
} else if errors.Is(err, digdir.ClientError) {
r.reportEvent(tx, corev1.EventTypeWarning, EventSkipped, digdirErr.Message)
requeue = false
}
}

if requeue {
metrics.IncClientsFailedProcessing(tx.Instance)
tx.Logger.Infof("requeuing failed reconciliation after %s", RequeueInterval)
return ctrl.Result{RequeueAfter: RequeueInterval}, nil
}

return ctrl.Result{}, nil
}

func (r *Reconciler) complete(tx *Transaction) (ctrl.Result, error) {
tx.Logger.Debugf("updating status for %s", clients.GetInstanceType(tx.Instance))

hash, err := tx.Instance.Hash()
if err != nil {
return ctrl.Result{}, err
}
tx.Instance.GetStatus().SetHash(hash)

tx.Instance.GetStatus().SetStateSynchronized()
tx.Instance.GetStatus().SetSynchronizationSecretName(clients.GetSecretName(tx.Instance))

if err := r.updateInstance(tx.Ctx, tx.Instance, func(existing clients.Instance) error {
existing.SetStatus(*tx.Instance.GetStatus())
return r.Client.Status().Update(tx.Ctx, existing)
}); err != nil {
r.reportEvent(tx, corev1.EventTypeWarning, EventFailedStatusUpdate, "Failed to update status")
return ctrl.Result{}, fmt.Errorf("updating status subresource: %w", err)
}

tx.Logger.Info("status subresource successfully updated")

r.reportEvent(tx, corev1.EventTypeNormal, EventSynchronized, "Client is up-to-date")
tx.Logger.Info("successfully reconciled")
metrics.IncClientsProcessed(tx.Instance)

return ctrl.Result{}, nil
}

func (r *Reconciler) reportEvent(tx *Transaction, eventType, event, message string) {
tx.Instance.GetStatus().SetSynchronizationState(event)
tx.Instance.GetStatus().SetState(event)
r.Recorder.Event(tx.Instance, eventType, event, message)
}

var instancesync sync.Mutex

func (r *Reconciler) updateInstance(ctx context.Context, instance clients.Instance, updateFunc func(existing clients.Instance) error) error {
instancesync.Lock()
defer instancesync.Unlock()

existing := func(instance clients.Instance) clients.Instance {
switch instance.(type) {
case *naisiov1.IDPortenClient:
return &naisiov1.IDPortenClient{}
case *naisiov1.MaskinportenClient:
return &naisiov1.MaskinportenClient{}
}
return nil
}(instance)

err := r.Reader.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: instance.GetName()}, existing)
if err != nil {
return fmt.Errorf("get newest version of %s: %s", clients.GetInstanceType(instance), err)
}

return updateFunc(existing)
}

0 comments on commit 2acefd3

Please sign in to comment.