Skip to content

Commit

Permalink
Merge pull request #241 from ulucinar/fix-192
Browse files Browse the repository at this point in the history
Explicitly Queue a Reconcile Request if a Shared Provider has Expired
  • Loading branch information
ulucinar authored Aug 1, 2023
2 parents 2ef67f5 + 7a9116f commit 06bdecc
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 43 deletions.
8 changes: 6 additions & 2 deletions pkg/controller/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ const (
errReconcileRequestFmt = "cannot request the reconciliation of the resource %s/%s after an async %s"
)

const (
rateLimiterCallback = "asyncCallback"
)

var _ CallbackProvider = &APICallbacks{}

// APISecretClient is a client for getting k8s secrets
Expand Down Expand Up @@ -116,11 +120,11 @@ func (ac *APICallbacks) callbackFn(name, op string, requeue bool) terraform.Call
case err != nil:
// TODO: use the errors.Join from
// github.com/crossplane/crossplane-runtime.
if ok := ac.eventHandler.RequestReconcile(name); !ok {
if ok := ac.eventHandler.RequestReconcile(rateLimiterCallback, name, nil); !ok {
return errors.Errorf(errReconcileRequestFmt, tr.GetObjectKind().GroupVersionKind().String(), name, op)
}
default:
ac.eventHandler.Forget(name)
ac.eventHandler.Forget(rateLimiterCallback, name)
}
}
return uErr
Expand Down
67 changes: 58 additions & 9 deletions pkg/controller/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"time"

tferrors "github.com/upbound/upjet/pkg/terraform/errors"

xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
Expand All @@ -17,6 +19,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/upbound/upjet/pkg/config"
"github.com/upbound/upjet/pkg/controller/handler"
"github.com/upbound/upjet/pkg/metrics"
"github.com/upbound/upjet/pkg/resource"
"github.com/upbound/upjet/pkg/resource/json"
Expand All @@ -34,10 +37,15 @@ const (
errStartAsyncDestroy = "cannot start async destroy"
errApply = "cannot apply"
errDestroy = "cannot destroy"
errScheduleProvider = "cannot schedule native Terraform provider process"
errScheduleProvider = "cannot schedule native Terraform provider process, please consider increasing its TTL with the --provider-ttl command-line option"
errUpdateAnnotations = "cannot update managed resource annotations"
)

const (
rateLimiterScheduler = "scheduler"
retryLimit = 20
)

// Option allows you to configure Connector.
type Option func(*Connector)

Expand All @@ -57,6 +65,14 @@ func WithLogger(l logging.Logger) Option {
}
}

// WithConnectorEventHandler configures the EventHandler so that
// the external clients can requeue reconciliation requests.
func WithConnectorEventHandler(e *handler.EventHandler) Option {
return func(c *Connector) {
c.eventHandler = e
}
}

// NewConnector returns a new Connector object.
func NewConnector(kube client.Client, ws Store, sf terraform.SetupFn, cfg *config.Resource, opts ...Option) *Connector {
c := &Connector{
Expand All @@ -80,6 +96,7 @@ type Connector struct {
getTerraformSetup terraform.SetupFn
config *config.Resource
callback CallbackProvider
eventHandler *handler.EventHandler
logger logging.Logger
}

Expand All @@ -106,6 +123,7 @@ func (c *Connector) Connect(ctx context.Context, mg xpresource.Managed) (managed
callback: c.callback,
providerScheduler: ts.Scheduler,
providerHandle: ws.ProviderHandle,
eventHandler: c.eventHandler,
kube: c.kube,
logger: c.logger.WithValues("uid", mg.GetUID()),
}, nil
Expand All @@ -117,22 +135,31 @@ type external struct {
callback CallbackProvider
providerScheduler terraform.ProviderScheduler
providerHandle terraform.ProviderHandle
eventHandler *handler.EventHandler
kube client.Client
logger logging.Logger
}

func (e *external) scheduleProvider() error {
func (e *external) scheduleProvider(name string) (bool, error) {
if e.providerScheduler == nil || e.workspace == nil {
return nil
return false, nil
}
inuse, attachmentConfig, err := e.providerScheduler.Start(e.providerHandle)
if err != nil {
return errors.Wrap(err, errScheduleProvider)
retryLimit := retryLimit
if tferrors.IsRetryScheduleError(err) && (e.eventHandler != nil && e.eventHandler.RequestReconcile(rateLimiterScheduler, name, &retryLimit)) {
// the reconcile request has been requeued for a rate-limited retry
return true, nil
}
return false, errors.Wrap(err, errScheduleProvider)
}
if e.eventHandler != nil {
e.eventHandler.Forget(rateLimiterScheduler, name)
}
if ps, ok := e.workspace.(ProviderSharer); ok {
ps.UseProvider(inuse, attachmentConfig)
}
return nil
return false, nil
}

func (e *external) stopProvider() {
Expand All @@ -149,10 +176,20 @@ func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed.
// and serial.
// TODO(muvaf): Look for ways to reduce the cyclomatic complexity without
// increasing the difficulty of understanding the flow.
if err := e.scheduleProvider(); err != nil {
requeued, err := e.scheduleProvider(mg.GetName())
if err != nil {
return managed.ExternalObservation{}, errors.Wrapf(err, "cannot schedule a native provider during observe: %s", mg.GetUID())
}
if requeued {
// return a noop for Observe after requeuing the reconcile request
// for a retry.
return managed.ExternalObservation{
ResourceExists: true,
ResourceUpToDate: true,
}, nil
}
defer e.stopProvider()

tr, ok := mg.(resource.Terraformed)
if !ok {
return managed.ExternalObservation{}, errors.New(errUnexpectedObject)
Expand Down Expand Up @@ -305,9 +342,13 @@ func addTTR(mg xpresource.Managed) {
}

func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.ExternalCreation, error) {
if err := e.scheduleProvider(); err != nil {
requeued, err := e.scheduleProvider(mg.GetName())
if err != nil {
return managed.ExternalCreation{}, errors.Wrapf(err, "cannot schedule a native provider during create: %s", mg.GetUID())
}
if requeued {
return managed.ExternalCreation{}, nil
}
defer e.stopProvider()
if e.config.UseAsync {
return managed.ExternalCreation{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Create(mg.GetName())), errStartAsyncApply)
Expand Down Expand Up @@ -336,9 +377,13 @@ func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.E
}

func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.ExternalUpdate, error) {
if err := e.scheduleProvider(); err != nil {
requeued, err := e.scheduleProvider(mg.GetName())
if err != nil {
return managed.ExternalUpdate{}, errors.Wrapf(err, "cannot schedule a native provider during update: %s", mg.GetUID())
}
if requeued {
return managed.ExternalUpdate{}, nil
}
defer e.stopProvider()
if e.config.UseAsync {
return managed.ExternalUpdate{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Update(mg.GetName())), errStartAsyncApply)
Expand All @@ -359,9 +404,13 @@ func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.E
}

func (e *external) Delete(ctx context.Context, mg xpresource.Managed) error {
if err := e.scheduleProvider(); err != nil {
requeued, err := e.scheduleProvider(mg.GetName())
if err != nil {
return errors.Wrapf(err, "cannot schedule a native provider during delete: %s", mg.GetUID())
}
if requeued {
return nil
}
defer e.stopProvider()
if e.config.UseAsync {
return errors.Wrap(e.workspace.DestroyAsync(e.callback.Destroy(mg.GetName())), errStartAsyncDestroy)
Expand Down
36 changes: 25 additions & 11 deletions pkg/controller/handler/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,24 @@ import (
// EventHandler handles Kubernetes events by queueing reconcile requests for
// objects and allows upjet components to queue reconcile requests.
type EventHandler struct {
innerHandler handler.EventHandler
queue workqueue.RateLimitingInterface
rateLimiter workqueue.RateLimiter
mu *sync.Mutex
innerHandler handler.EventHandler
queue workqueue.RateLimitingInterface
rateLimiterMap map[string]workqueue.RateLimiter
mu *sync.RWMutex
}

// NewEventHandler initializes a new EventHandler instance.
func NewEventHandler() *EventHandler {
return &EventHandler{
innerHandler: &handler.EnqueueRequestForObject{},
mu: &sync.Mutex{},
rateLimiter: workqueue.DefaultControllerRateLimiter(),
innerHandler: &handler.EnqueueRequestForObject{},
mu: &sync.RWMutex{},
rateLimiterMap: make(map[string]workqueue.RateLimiter),
}
}

// RequestReconcile requeues a reconciliation request for the specified name.
// Returns true if the reconcile request was successfully queued.
func (e *EventHandler) RequestReconcile(name string) bool {
func (e *EventHandler) RequestReconcile(rateLimiterName, name string, failureLimit *int) bool {
e.mu.Lock()
defer e.mu.Unlock()
if e.queue == nil {
Expand All @@ -56,14 +56,28 @@ func (e *EventHandler) RequestReconcile(name string) bool {
Name: name,
},
}
e.queue.AddAfter(item, e.rateLimiter.When(item))
rateLimiter := e.rateLimiterMap[rateLimiterName]
if rateLimiter == nil {
rateLimiter = workqueue.DefaultControllerRateLimiter()
e.rateLimiterMap[rateLimiterName] = rateLimiter
}
if failureLimit != nil && rateLimiter.NumRequeues(item) > *failureLimit {
return false
}
e.queue.AddAfter(item, rateLimiter.When(item))
return true
}

// Forget indicates that the reconcile retries is finished for
// the specified name.
func (e *EventHandler) Forget(name string) {
e.rateLimiter.Forget(reconcile.Request{
func (e *EventHandler) Forget(rateLimiterName, name string) {
e.mu.RLock()
defer e.mu.RUnlock()
rateLimiter := e.rateLimiterMap[rateLimiterName]
if rateLimiter == nil {
return
}
rateLimiter.Forget(reconcile.Request{
NamespacedName: types.NamespacedName{
Name: name,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/templates/controller.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error {
ac := tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind), tjcontroller.WithEventHandler(o.EventHandler))
{{- end}}
opts := []managed.ReconcilerOption{
managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger),
managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger), tjcontroller.WithConnectorEventHandler(o.EventHandler),
{{- if .UseAsync }}
tjcontroller.WithCallbackProvider(ac),
{{- end}}
Expand Down
23 changes: 23 additions & 0 deletions pkg/terraform/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,26 @@ func IsPlanFailed(err error) bool {
r := &planFailed{}
return errors.As(err, &r)
}

type retrySchedule struct {
invocationCount int
ttl int
}

func NewRetryScheduleError(invocationCount, ttl int) error {
return &retrySchedule{
invocationCount: invocationCount,
ttl: ttl,
}
}

func (r *retrySchedule) Error() string {
return fmt.Sprintf("native provider reuse budget has been exceeded: invocationCount: %d, ttl: %d", r.invocationCount, r.ttl)
}

// IsRetryScheduleError returns whether the error is a retry error
// for the scheduler.
func IsRetryScheduleError(err error) bool {
r := &retrySchedule{}
return errors.As(err, &r)
}
24 changes: 12 additions & 12 deletions pkg/terraform/provider_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,46 +94,46 @@ type SharedProviderOption func(runner *SharedProvider)

// WithNativeProviderArgs are the arguments to be passed to the native provider
func WithNativeProviderArgs(args ...string) SharedProviderOption {
return func(sr *SharedProvider) {
sr.nativeProviderArgs = args
return func(sp *SharedProvider) {
sp.nativeProviderArgs = args
}
}

// WithNativeProviderExecutor sets the process executor to be used
func WithNativeProviderExecutor(e exec.Interface) SharedProviderOption {
return func(sr *SharedProvider) {
sr.executor = e
return func(sp *SharedProvider) {
sp.executor = e
}
}

// WithProtocolVersion sets the gRPC protocol version in use between
// the Terraform CLI and the native provider.
func WithProtocolVersion(protocolVersion int) SharedProviderOption {
return func(sr *SharedProvider) {
sr.protocolVersion = protocolVersion
return func(sp *SharedProvider) {
sp.protocolVersion = protocolVersion
}
}

// WithNativeProviderPath configures the Terraform provider executable path
// for the runner.
func WithNativeProviderPath(p string) SharedProviderOption {
return func(sr *SharedProvider) {
sr.nativeProviderPath = p
return func(sp *SharedProvider) {
sp.nativeProviderPath = p
}
}

// WithNativeProviderName configures the Terraform provider name
// for the runner.
func WithNativeProviderName(n string) SharedProviderOption {
return func(sr *SharedProvider) {
sr.nativeProviderName = n
return func(sp *SharedProvider) {
sp.nativeProviderName = n
}
}

// WithNativeProviderLogger configures the logger for the runner.
func WithNativeProviderLogger(logger logging.Logger) SharedProviderOption {
return func(sr *SharedProvider) {
sr.logger = logger
return func(sp *SharedProvider) {
sp.logger = logger
}
}

Expand Down
Loading

0 comments on commit 06bdecc

Please sign in to comment.