From 7a9116f274bcd85e5e15d7ceb44b9cd932e606f0 Mon Sep 17 00:00:00 2001
From: Alper Rifat Ulucinar
Date: Thu, 27 Jul 2023 03:02:11 +0300
Subject: [PATCH] Use an EventHandler with the controller.external to retry on
 scheduling errors

- The external client will requeue at most 20 times before reporting an error

Signed-off-by: Alper Rifat Ulucinar
---
 pkg/controller/api.go                     |  8 ++-
 pkg/controller/external.go                | 67 ++++++++++++++++++++---
 pkg/controller/handler/eventhandler.go    | 36 ++++++++----
 pkg/pipeline/templates/controller.go.tmpl |  2 +-
 pkg/terraform/errors/errors.go            | 23 ++++++++
 pkg/terraform/provider_runner.go          | 24 ++++----
 pkg/terraform/provider_scheduler.go       | 33 ++++++++---
 7 files changed, 150 insertions(+), 43 deletions(-)

diff --git a/pkg/controller/api.go b/pkg/controller/api.go
index 5f5d2c7a..da17a591 100644
--- a/pkg/controller/api.go
+++ b/pkg/controller/api.go
@@ -41,6 +41,10 @@ const (
 	errReconcileRequestFmt = "cannot request the reconciliation of the resource %s/%s after an async %s"
 )
 
+const (
+	rateLimiterCallback = "asyncCallback"
+)
+
 var _ CallbackProvider = &APICallbacks{}
 
 // APISecretClient is a client for getting k8s secrets
@@ -116,11 +120,11 @@ func (ac *APICallbacks) callbackFn(name, op string, requeue bool) terraform.Call
 		case err != nil:
 			// TODO: use the errors.Join from
 			// github.com/crossplane/crossplane-runtime.
-			if ok := ac.eventHandler.RequestReconcile(name); !ok {
+			if ok := ac.eventHandler.RequestReconcile(rateLimiterCallback, name, nil); !ok {
 				return errors.Errorf(errReconcileRequestFmt, tr.GetObjectKind().GroupVersionKind().String(), name, op)
 			}
 		default:
-			ac.eventHandler.Forget(name)
+			ac.eventHandler.Forget(rateLimiterCallback, name)
 		}
 	}
 	return uErr
diff --git a/pkg/controller/external.go b/pkg/controller/external.go
index 36131276..48db6153 100644
--- a/pkg/controller/external.go
+++ b/pkg/controller/external.go
@@ -8,6 +8,8 @@ import (
 	"context"
 	"time"
 
+	tferrors "github.com/upbound/upjet/pkg/terraform/errors"
+
 	xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
 	"github.com/crossplane/crossplane-runtime/pkg/logging"
 	"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
@@ -17,6 +19,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/upbound/upjet/pkg/config"
+	"github.com/upbound/upjet/pkg/controller/handler"
 	"github.com/upbound/upjet/pkg/metrics"
 	"github.com/upbound/upjet/pkg/resource"
 	"github.com/upbound/upjet/pkg/resource/json"
@@ -34,10 +37,15 @@ const (
 	errStartAsyncDestroy = "cannot start async destroy"
 	errApply             = "cannot apply"
 	errDestroy           = "cannot destroy"
-	errScheduleProvider  = "cannot schedule native Terraform provider process"
+	errScheduleProvider  = "cannot schedule native Terraform provider process; consider increasing its TTL with the --provider-ttl command-line option"
 	errUpdateAnnotations = "cannot update managed resource annotations"
 )
 
+const (
+	rateLimiterScheduler = "scheduler"
+	retryLimit           = 20
+)
+
 // Option allows you to configure Connector.
 type Option func(*Connector)
 
@@ -57,6 +65,14 @@ func WithLogger(l logging.Logger) Option {
 	}
 }
 
+// WithConnectorEventHandler configures the EventHandler so that
+// the external clients can requeue reconciliation requests.
+func WithConnectorEventHandler(e *handler.EventHandler) Option {
+	return func(c *Connector) {
+		c.eventHandler = e
+	}
+}
+
 // NewConnector returns a new Connector object.
 func NewConnector(kube client.Client, ws Store, sf terraform.SetupFn, cfg *config.Resource, opts ...Option) *Connector {
 	c := &Connector{
@@ -80,6 +96,7 @@ type Connector struct {
 	getTerraformSetup terraform.SetupFn
 	config            *config.Resource
 	callback          CallbackProvider
+	eventHandler      *handler.EventHandler
 	logger            logging.Logger
 }
 
@@ -106,6 +123,7 @@ func (c *Connector) Connect(ctx context.Context, mg xpresource.Managed) (managed
 		callback:          c.callback,
 		providerScheduler: ts.Scheduler,
 		providerHandle:    ws.ProviderHandle,
+		eventHandler:      c.eventHandler,
 		kube:              c.kube,
 		logger:            c.logger.WithValues("uid", mg.GetUID()),
 	}, nil
@@ -117,22 +135,31 @@ type external struct {
 	callback          CallbackProvider
 	providerScheduler terraform.ProviderScheduler
 	providerHandle    terraform.ProviderHandle
+	eventHandler      *handler.EventHandler
 	kube              client.Client
 	logger            logging.Logger
 }
 
-func (e *external) scheduleProvider() error {
+func (e *external) scheduleProvider(name string) (bool, error) {
 	if e.providerScheduler == nil || e.workspace == nil {
-		return nil
+		return false, nil
 	}
 	inuse, attachmentConfig, err := e.providerScheduler.Start(e.providerHandle)
 	if err != nil {
-		return errors.Wrap(err, errScheduleProvider)
+		retryLimit := retryLimit
+		if tferrors.IsRetryScheduleError(err) && (e.eventHandler != nil && e.eventHandler.RequestReconcile(rateLimiterScheduler, name, &retryLimit)) {
+			// The reconcile request has been requeued for a rate-limited retry.
+			return true, nil
+		}
+		return false, errors.Wrap(err, errScheduleProvider)
+	}
+	if e.eventHandler != nil {
+		e.eventHandler.Forget(rateLimiterScheduler, name)
 	}
 	if ps, ok := e.workspace.(ProviderSharer); ok {
 		ps.UseProvider(inuse, attachmentConfig)
 	}
-	return nil
+	return false, nil
 }
 
 func (e *external) stopProvider() {
@@ -149,10 +176,20 @@ func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed.
 	// and serial.
 	// TODO(muvaf): Look for ways to reduce the cyclomatic complexity without
 	// increasing the difficulty of understanding the flow.
-	if err := e.scheduleProvider(); err != nil {
+	requeued, err := e.scheduleProvider(mg.GetName())
+	if err != nil {
 		return managed.ExternalObservation{}, errors.Wrapf(err, "cannot schedule a native provider during observe: %s", mg.GetUID())
 	}
+	if requeued {
+		// Return a no-op observation after requeuing the reconcile
+		// request for a retry.
+		return managed.ExternalObservation{
+			ResourceExists:   true,
+			ResourceUpToDate: true,
+		}, nil
+	}
 	defer e.stopProvider()
+
 	tr, ok := mg.(resource.Terraformed)
 	if !ok {
 		return managed.ExternalObservation{}, errors.New(errUnexpectedObject)
@@ -305,9 +342,13 @@ func addTTR(mg xpresource.Managed) {
 }
 
 func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.ExternalCreation, error) {
-	if err := e.scheduleProvider(); err != nil {
+	requeued, err := e.scheduleProvider(mg.GetName())
+	if err != nil {
 		return managed.ExternalCreation{}, errors.Wrapf(err, "cannot schedule a native provider during create: %s", mg.GetUID())
 	}
+	if requeued {
+		return managed.ExternalCreation{}, nil
+	}
 	defer e.stopProvider()
 	if e.config.UseAsync {
 		return managed.ExternalCreation{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Create(mg.GetName())), errStartAsyncApply)
@@ -336,9 +377,13 @@ func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.E
 }
 
 func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.ExternalUpdate, error) {
-	if err := e.scheduleProvider(); err != nil {
+	requeued, err := e.scheduleProvider(mg.GetName())
+	if err != nil {
 		return managed.ExternalUpdate{}, errors.Wrapf(err, "cannot schedule a native provider during update: %s", mg.GetUID())
 	}
+	if requeued {
+		return managed.ExternalUpdate{}, nil
+	}
 	defer e.stopProvider()
 	if e.config.UseAsync {
 		return managed.ExternalUpdate{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Update(mg.GetName())), errStartAsyncApply)
@@ -359,9 +404,13 @@ func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.E
 }
 
 func (e *external) Delete(ctx context.Context, mg xpresource.Managed) error {
-	if err := e.scheduleProvider(); err != nil {
+	requeued, err := e.scheduleProvider(mg.GetName())
+	if err != nil {
 		return errors.Wrapf(err, "cannot schedule a native provider during delete: %s", mg.GetUID())
 	}
+	if requeued {
+		return nil
+	}
 	defer e.stopProvider()
 	if e.config.UseAsync {
 		return errors.Wrap(e.workspace.DestroyAsync(e.callback.Destroy(mg.GetName())), errStartAsyncDestroy)
diff --git a/pkg/controller/handler/eventhandler.go b/pkg/controller/handler/eventhandler.go
index 58f4f9fe..027d1798 100644
--- a/pkg/controller/handler/eventhandler.go
+++ b/pkg/controller/handler/eventhandler.go
@@ -28,24 +28,24 @@ import (
 // EventHandler handles Kubernetes events by queueing reconcile requests for
 // objects and allows upjet components to queue reconcile requests.
 type EventHandler struct {
-	innerHandler handler.EventHandler
-	queue        workqueue.RateLimitingInterface
-	rateLimiter  workqueue.RateLimiter
-	mu           *sync.Mutex
+	innerHandler   handler.EventHandler
+	queue          workqueue.RateLimitingInterface
+	rateLimiterMap map[string]workqueue.RateLimiter
+	mu             *sync.RWMutex
 }
 
 // NewEventHandler initializes a new EventHandler instance.
 func NewEventHandler() *EventHandler {
 	return &EventHandler{
-		innerHandler: &handler.EnqueueRequestForObject{},
-		mu:           &sync.Mutex{},
-		rateLimiter:  workqueue.DefaultControllerRateLimiter(),
+		innerHandler:   &handler.EnqueueRequestForObject{},
+		mu:             &sync.RWMutex{},
+		rateLimiterMap: make(map[string]workqueue.RateLimiter),
 	}
 }
 
 // RequestReconcile requeues a reconciliation request for the specified name.
 // Returns true if the reconcile request was successfully queued.
-func (e *EventHandler) RequestReconcile(name string) bool {
+func (e *EventHandler) RequestReconcile(rateLimiterName, name string, failureLimit *int) bool {
 	e.mu.Lock()
 	defer e.mu.Unlock()
 	if e.queue == nil {
@@ -56,14 +56,28 @@ func (e *EventHandler) RequestReconcile(name string) bool {
 			Name: name,
 		},
 	}
-	e.queue.AddAfter(item, e.rateLimiter.When(item))
+	rateLimiter := e.rateLimiterMap[rateLimiterName]
+	if rateLimiter == nil {
+		rateLimiter = workqueue.DefaultControllerRateLimiter()
+		e.rateLimiterMap[rateLimiterName] = rateLimiter
+	}
+	if failureLimit != nil && rateLimiter.NumRequeues(item) > *failureLimit {
+		return false
+	}
+	e.queue.AddAfter(item, rateLimiter.When(item))
 	return true
 }
 
 // Forget indicates that the reconcile retries is finished for
 // the specified name.
-func (e *EventHandler) Forget(name string) {
-	e.rateLimiter.Forget(reconcile.Request{
+func (e *EventHandler) Forget(rateLimiterName, name string) {
+	e.mu.RLock()
+	defer e.mu.RUnlock()
+	rateLimiter := e.rateLimiterMap[rateLimiterName]
+	if rateLimiter == nil {
+		return
+	}
+	rateLimiter.Forget(reconcile.Request{
 		NamespacedName: types.NamespacedName{
 			Name: name,
 		},
diff --git a/pkg/pipeline/templates/controller.go.tmpl b/pkg/pipeline/templates/controller.go.tmpl
index 2a2aa2f3..6042e8ba 100644
--- a/pkg/pipeline/templates/controller.go.tmpl
+++ b/pkg/pipeline/templates/controller.go.tmpl
@@ -39,7 +39,7 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error {
 	ac := tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind), tjcontroller.WithEventHandler(o.EventHandler))
 	{{- end}}
 	opts := []managed.ReconcilerOption{
-		managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger),
+		managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger), tjcontroller.WithConnectorEventHandler(o.EventHandler),
 		{{- if .UseAsync }}
 			tjcontroller.WithCallbackProvider(ac),
 		{{- end}}
diff --git a/pkg/terraform/errors/errors.go b/pkg/terraform/errors/errors.go
index af6d1396..dbe91342 100644
--- a/pkg/terraform/errors/errors.go
+++ b/pkg/terraform/errors/errors.go
@@ -161,3 +161,26 @@ func IsPlanFailed(err error) bool {
 	r := &planFailed{}
 	return errors.As(err, &r)
 }
+
+type retrySchedule struct {
+	invocationCount int
+	ttl             int
+}
+
+func NewRetryScheduleError(invocationCount, ttl int) error {
+	return &retrySchedule{
+		invocationCount: invocationCount,
+		ttl:             ttl,
+	}
+}
+
+func (r *retrySchedule) Error() string {
+	return fmt.Sprintf("native provider reuse budget has been exceeded: invocationCount: %d, ttl: %d", r.invocationCount, r.ttl)
+}
+
+// IsRetryScheduleError returns whether the error is a retry error
+// for the scheduler.
+func IsRetryScheduleError(err error) bool {
+	r := &retrySchedule{}
+	return errors.As(err, &r)
+}
diff --git a/pkg/terraform/provider_runner.go b/pkg/terraform/provider_runner.go
index d29446b4..101fe0f6 100644
--- a/pkg/terraform/provider_runner.go
+++ b/pkg/terraform/provider_runner.go
@@ -94,46 +94,46 @@ type SharedProviderOption func(runner *SharedProvider)
 
 // WithNativeProviderArgs are the arguments to be passed to the native provider
 func WithNativeProviderArgs(args ...string) SharedProviderOption {
-	return func(sr *SharedProvider) {
-		sr.nativeProviderArgs = args
+	return func(sp *SharedProvider) {
+		sp.nativeProviderArgs = args
 	}
 }
 
 // WithNativeProviderExecutor sets the process executor to be used
 func WithNativeProviderExecutor(e exec.Interface) SharedProviderOption {
-	return func(sr *SharedProvider) {
-		sr.executor = e
+	return func(sp *SharedProvider) {
+		sp.executor = e
 	}
 }
 
 // WithProtocolVersion sets the gRPC protocol version in use between
 // the Terraform CLI and the native provider.
 func WithProtocolVersion(protocolVersion int) SharedProviderOption {
-	return func(sr *SharedProvider) {
-		sr.protocolVersion = protocolVersion
+	return func(sp *SharedProvider) {
+		sp.protocolVersion = protocolVersion
 	}
 }
 
 // WithNativeProviderPath configures the Terraform provider executable path
 // for the runner.
 func WithNativeProviderPath(p string) SharedProviderOption {
-	return func(sr *SharedProvider) {
-		sr.nativeProviderPath = p
+	return func(sp *SharedProvider) {
+		sp.nativeProviderPath = p
 	}
 }
 
 // WithNativeProviderName configures the Terraform provider name
 // for the runner.
 func WithNativeProviderName(n string) SharedProviderOption {
-	return func(sr *SharedProvider) {
-		sr.nativeProviderName = n
+	return func(sp *SharedProvider) {
+		sp.nativeProviderName = n
 	}
 }
 
 // WithNativeProviderLogger configures the logger for the runner.
 func WithNativeProviderLogger(logger logging.Logger) SharedProviderOption {
-	return func(sr *SharedProvider) {
-		sr.logger = logger
+	return func(sp *SharedProvider) {
+		sp.logger = logger
 	}
 }
diff --git a/pkg/terraform/provider_scheduler.go b/pkg/terraform/provider_scheduler.go
index 0639b83b..91b4985c 100644
--- a/pkg/terraform/provider_scheduler.go
+++ b/pkg/terraform/provider_scheduler.go
@@ -19,6 +19,8 @@ import (
 
 	"github.com/crossplane/crossplane-runtime/pkg/logging"
 	"github.com/pkg/errors"
+
+	tferrors "github.com/upbound/upjet/pkg/terraform/errors"
 )
 
 // ProviderHandle represents native plugin (Terraform provider) process
@@ -122,16 +124,31 @@ type SharedProviderScheduler struct {
 	logger logging.Logger
 }
 
+// SharedProviderSchedulerOption represents an option to configure the
+// SharedProviderScheduler.
+type SharedProviderSchedulerOption func(scheduler *SharedProviderScheduler)
+
+// WithSharedProviderOptions configures the SharedProviderOptions to be
+// passed down to the managed SharedProviders.
+func WithSharedProviderOptions(opts ...SharedProviderOption) SharedProviderSchedulerOption {
+	return func(scheduler *SharedProviderScheduler) {
+		scheduler.runnerOpts = opts
+	}
+}
+
 // NewSharedProviderScheduler initializes a new SharedProviderScheduler
 // with the specified logger and options.
-func NewSharedProviderScheduler(l logging.Logger, ttl int, opts ...SharedProviderOption) *SharedProviderScheduler {
-	return &SharedProviderScheduler{
-		runnerOpts: opts,
-		mu:         &sync.Mutex{},
-		runners:    make(map[ProviderHandle]*schedulerEntry),
-		logger:     l,
-		ttl:        ttl,
+func NewSharedProviderScheduler(l logging.Logger, ttl int, opts ...SharedProviderSchedulerOption) *SharedProviderScheduler {
+	scheduler := &SharedProviderScheduler{
+		mu:      &sync.Mutex{},
+		runners: make(map[ProviderHandle]*schedulerEntry),
+		logger:  l,
+		ttl:     ttl,
+	}
+	for _, o := range opts {
+		o(scheduler)
 	}
+	return scheduler
 }
 
 func (s *SharedProviderScheduler) Start(h ProviderHandle) (InUse, string, error) {
@@ -144,7 +161,7 @@ func (s *SharedProviderScheduler) Start(h ProviderHandle) (InUse, string, error)
 	case r != nil && (r.invocationCount < s.ttl || r.inUse > 0):
 		if r.invocationCount > int(float64(s.ttl)*(1+ttlMargin)) {
 			logger.Debug("Reuse budget has been exceeded. Caller will need to retry.")
-			return nil, "", errors.Errorf("native provider reuse budget has been exceeded: invocationCount: %d, ttl: %d", r.invocationCount, s.ttl)
+			return nil, "", tferrors.NewRetryScheduleError(r.invocationCount, s.ttl)
		}
 		logger.Debug("Reusing the provider runner", "invocationCount", r.invocationCount)
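
The retry budget that RequestReconcile now enforces can be seen in isolation with the small, self-contained sketch below. It is illustrative only and not part of the patch: the resource name "example" and the standalone program are placeholders, while the workqueue calls it relies on (DefaultControllerRateLimiter, NumRequeues, When, Forget) are the same ones the event handler builds on.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func main() {
	// One named rate limiter, as stored in EventHandler.rateLimiterMap.
	limiter := workqueue.DefaultControllerRateLimiter()
	// Requests are keyed per managed resource name, as in RequestReconcile.
	item := reconcile.Request{NamespacedName: types.NamespacedName{Name: "example"}}
	failureLimit := 20 // mirrors the retryLimit constant passed by scheduleProvider

	for attempt := 1; ; attempt++ {
		// RequestReconcile refuses to requeue once the budget is exhausted,
		// so the scheduling error is surfaced to the caller instead.
		if limiter.NumRequeues(item) > failureLimit {
			fmt.Println("retry budget exhausted; the scheduling error is reported")
			break
		}
		// In the handler this backoff delay is passed to queue.AddAfter(item, ...).
		fmt.Printf("requeue #%d with backoff %s\n", attempt, limiter.When(item))
	}

	// A successful schedule calls Forget, resetting the per-item backoff.
	limiter.Forget(item)
}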
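
NewSharedProviderScheduler now takes SharedProviderSchedulerOption values, so call sites that previously passed SharedProviderOptions directly have to wrap them with WithSharedProviderOptions. Below is a minimal sketch of an updated call site, assuming a placeholder provider binary path, provider name, and TTL; the logger wiring through logging.NewLogrLogger and controller-runtime's ctrl.Log is one common choice, not something this patch prescribes.

package main

import (
	"github.com/crossplane/crossplane-runtime/pkg/logging"
	ctrl "sigs.k8s.io/controller-runtime"

	"github.com/upbound/upjet/pkg/terraform"
)

// newScheduler shows the post-patch constructor signature: the runner
// options are wrapped in WithSharedProviderOptions instead of being
// passed to NewSharedProviderScheduler directly.
func newScheduler(ttl int) *terraform.SharedProviderScheduler {
	logger := logging.NewLogrLogger(ctrl.Log.WithName("provider-scheduler"))
	return terraform.NewSharedProviderScheduler(logger, ttl,
		terraform.WithSharedProviderOptions(
			// Placeholder values; a real provider supplies its own binary
			// path and registry address.
			terraform.WithNativeProviderPath("/usr/local/bin/terraform-provider-example"),
			terraform.WithNativeProviderName("registry.terraform.io/example/example"),
		),
	)
}

func main() {
	_ = newScheduler(100) // e.g. a reuse budget of 100 invocations per provider process
}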