-
Notifications
You must be signed in to change notification settings - Fork 87
/
external.go
468 lines (428 loc) · 17 KB
/
external.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
/*
Copyright 2021 Upbound Inc.
*/
package controller
import (
"context"
"time"
tferrors "github.com/upbound/upjet/pkg/terraform/errors"
xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
xpresource "github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/upbound/upjet/pkg/config"
"github.com/upbound/upjet/pkg/controller/handler"
"github.com/upbound/upjet/pkg/metrics"
"github.com/upbound/upjet/pkg/resource"
"github.com/upbound/upjet/pkg/resource/json"
"github.com/upbound/upjet/pkg/terraform"
)
// Messages used to wrap errors returned by the Connector and the external
// client. The text is part of the user-visible error surfaced on the managed
// resource, so keep it stable.
const (
	errUnexpectedObject  = "the custom resource is not a Terraformed resource"
	errGetTerraformSetup = "cannot get terraform setup"
	errGetWorkspace      = "cannot get a terraform workspace for resource"
	errRefresh           = "cannot run refresh"
	errImport            = "cannot run import"
	errPlan              = "cannot run plan"
	errStartAsyncApply   = "cannot start async apply"
	errStartAsyncDestroy = "cannot start async destroy"
	errApply             = "cannot apply"
	errDestroy           = "cannot destroy"
	errScheduleProvider  = "cannot schedule native Terraform provider process, please consider increasing its TTL with the --provider-ttl command-line option"
	errUpdateAnnotations = "cannot update managed resource annotations"
)

// Settings used when a native provider cannot be scheduled and the reconcile
// request is requeued through the event handler.
const (
	// rateLimiterScheduler names the rate limiter passed to
	// EventHandler.RequestReconcile and EventHandler.Forget.
	rateLimiterScheduler = "scheduler"
	// retryLimit is handed (by address) to EventHandler.RequestReconcile;
	// presumably the maximum number of rate-limited retries — confirm against
	// the handler package.
	retryLimit = 20
)
// Option is a functional option for NewConnector; each Option is applied, in
// the order given, to the freshly built Connector.
type Option func(c *Connector)
// WithCallbackProvider returns an Option that installs ac as the Connector's
// CallbackProvider. For resources configured with UseAsync, the external
// client calls the async variants of the Terraform workspace functions and
// hands them callbacks from ac, which run once those operations complete.
func WithCallbackProvider(ac CallbackProvider) Option {
	setCallback := func(conn *Connector) {
		conn.callback = ac
	}
	return setCallback
}
// WithLogger returns an Option that replaces the Connector's default no-op
// logger with l. External clients created by Connect derive their loggers
// from it.
func WithLogger(l logging.Logger) Option {
	setLogger := func(conn *Connector) {
		conn.logger = l
	}
	return setLogger
}
// WithConnectorEventHandler returns an Option that sets the EventHandler the
// external clients use to requeue reconciliation requests (for example after
// a retryable provider-scheduling error).
func WithConnectorEventHandler(e *handler.EventHandler) Option {
	setEventHandler := func(conn *Connector) {
		conn.eventHandler = e
	}
	return setEventHandler
}
// NewConnector returns a Connector that uses kube, the workspace Store ws and
// the Terraform setup function sf to build external clients for resources
// described by cfg. Logging is a no-op unless WithLogger is supplied. The opts
// are applied in order, so a later option overrides an earlier one that sets
// the same field.
func NewConnector(kube client.Client, ws Store, sf terraform.SetupFn, cfg *config.Resource, opts ...Option) *Connector {
	conn := &Connector{
		kube:              kube,
		store:             ws,
		getTerraformSetup: sf,
		config:            cfg,
		logger:            logging.NewNopLogger(),
	}
	for i := range opts {
		opts[i](conn)
	}
	return conn
}
// Connector initializes the external client with credentials and other configuration
// parameters.
type Connector struct {
	// kube is passed to getTerraformSetup and APISecretClient, and is handed
	// to every external client built by Connect.
	kube client.Client
	// store hands out the Terraform workspace for a given resource.
	store Store
	// getTerraformSetup returns the terraform setup for a managed resource;
	// its Scheduler becomes the external client's provider scheduler.
	getTerraformSetup terraform.SetupFn
	// config is the upjet configuration of the managed resource kind.
	config *config.Resource
	// callback is set by WithCallbackProvider; it is used by the external
	// client for async (UseAsync) apply/destroy operations.
	callback CallbackProvider
	// eventHandler is optional (WithConnectorEventHandler); when set it is
	// used to requeue reconcile requests.
	eventHandler *handler.EventHandler
	// logger defaults to a no-op logger in NewConnector.
	logger logging.Logger
}
// Connect builds the managed.ExternalClient for mg. It resolves the Terraform
// setup for the resource and obtains the resource's workspace from the store.
// It then returns an external client wired with that workspace, the setup's
// provider scheduler and a logger scoped to the resource's UID, name and GVK.
//
// It returns an error if mg is not a resource.Terraformed, or if the Terraform
// setup or the workspace cannot be obtained.
func (c *Connector) Connect(ctx context.Context, mg xpresource.Managed) (managed.ExternalClient, error) {
	tr, isTerraformed := mg.(resource.Terraformed)
	if !isTerraformed {
		return nil, errors.New(errUnexpectedObject)
	}

	setup, err := c.getTerraformSetup(ctx, c.kube, mg)
	if err != nil {
		return nil, errors.Wrap(err, errGetTerraformSetup)
	}

	secrets := &APISecretClient{kube: c.kube}
	ws, err := c.store.Workspace(ctx, secrets, tr, setup, c.config)
	if err != nil {
		return nil, errors.Wrap(err, errGetWorkspace)
	}

	gvk := mg.GetObjectKind().GroupVersionKind()
	scopedLogger := c.logger.WithValues("uid", mg.GetUID(), "name", mg.GetName(), "gvk", gvk.String())

	ext := &external{
		workspace:         ws,
		config:            c.config,
		callback:          c.callback,
		providerScheduler: setup.Scheduler,
		providerHandle:    ws.ProviderHandle,
		eventHandler:      c.eventHandler,
		kube:              c.kube,
		logger:            scopedLogger,
	}
	return ext, nil
}
// external is the managed.ExternalClient built by Connector.Connect for a
// single managed resource. It runs Terraform operations through that
// resource's workspace.
type external struct {
	// workspace runs refresh/import/plan/apply/destroy for the resource.
	workspace Workspace
	// config is the upjet configuration of the resource kind; its UseAsync
	// field selects the async apply/destroy variants.
	config *config.Resource
	// callback supplies the callbacks passed to async workspace operations.
	callback CallbackProvider
	// providerScheduler starts/stops native provider processes; when nil,
	// provider scheduling is skipped entirely.
	providerScheduler terraform.ProviderScheduler
	// providerHandle is passed to providerScheduler.Start/Stop; Connect takes
	// it from the workspace's ProviderHandle.
	providerHandle terraform.ProviderHandle
	// eventHandler, when non-nil, requeues reconcile requests after retryable
	// scheduling errors.
	eventHandler *handler.EventHandler
	// kube is used by Observe to persist annotation updates directly.
	kube client.Client
	// logger is scoped by Connect to the resource's UID, name and GVK.
	logger logging.Logger
}
// scheduleProvider acquires a native Terraform provider process for the
// resource called name.
//
// On success it clears the scheduler rate-limiter history for name (when an
// event handler is configured). If the workspace implements ProviderSharer,
// it is handed the in-use flag and attachment config returned by Start.
//
// It returns (true, nil) when Start failed with a retryable scheduling error
// and the reconcile request was requeued through the event handler. The
// callers in this file then skip the operation without deferring
// stopProvider. Any other Start failure is returned wrapped with
// errScheduleProvider. When no scheduler or no workspace is configured,
// nothing is started and (false, nil) is returned.
func (e *external) scheduleProvider(name string) (bool, error) {
	if e.providerScheduler == nil || e.workspace == nil {
		return false, nil
	}

	inUse, attachment, startErr := e.providerScheduler.Start(e.providerHandle)
	if startErr != nil {
		// RequestReconcile takes the limit by address, so copy the constant
		// into an addressable local.
		limit := retryLimit
		requeued := tferrors.IsRetryScheduleError(startErr) &&
			e.eventHandler != nil &&
			e.eventHandler.RequestReconcile(rateLimiterScheduler, name, &limit)
		if requeued {
			// the reconcile request has been requeued for a rate-limited retry
			return true, nil
		}
		return false, errors.Wrap(startErr, errScheduleProvider)
	}

	if e.eventHandler != nil {
		e.eventHandler.Forget(rateLimiterScheduler, name)
	}
	if sharer, canShare := e.workspace.(ProviderSharer); canShare {
		sharer.UseProvider(inUse, attachment)
	}
	return false, nil
}
// stopProvider releases the native provider process acquired by a preceding
// successful scheduleProvider call.
//
// It is a no-op under exactly the conditions in which scheduleProvider does
// not start a provider (no scheduler or no workspace). This guarantees that
// Stop is never issued for a handle that was never started.
//
// Stop errors are logged rather than returned, because stopProvider runs from
// a deferred call after the operation's result has already been decided.
func (e *external) stopProvider() {
	// Mirror scheduleProvider's guard: if it skipped Start, skip Stop too.
	if e.providerScheduler == nil || e.workspace == nil {
		return
	}
	if err := e.providerScheduler.Stop(e.providerHandle); err != nil {
		e.logger.Info("ExternalClient failed to stop the native provider", "error", err)
	}
}
// Observe reports the state of the external resource to the managed
// reconciler.
//
// It first schedules a native Terraform provider process. If the reconcile
// request is instead requeued for a rate-limited retry, it returns an
// observation claiming the resource exists and is up to date (a no-op
// observation for this pass). When the management policies include none of
// Create, Update or All, the resource is imported (see Import) instead of
// refreshed.
//
// Once a refresh finds an existing resource with no async operation in
// progress, Observe:
//   - stores the Terraform state as the resource's observation,
//   - refreshes the critical annotations,
//   - computes connection details, and
//   - late-initializes spec parameters if the policies allow LateInitialize.
//
// It then returns early, in order of priority, for pending annotation
// updates, a Ready condition that is not yet Available, or late-initialized
// parameters. Only when none of these applies does it run a Terraform plan to
// decide ResourceUpToDate.
func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed.ExternalObservation, error) { //nolint:gocyclo
	// We skip the gocyclo check because most of the operations are straight-forward
	// and serial.
	// TODO(muvaf): Look for ways to reduce the cyclomatic complexity without
	// increasing the difficulty of understanding the flow.
	requeued, err := e.scheduleProvider(mg.GetName())
	if err != nil {
		return managed.ExternalObservation{}, errors.Wrapf(err, "cannot schedule a native provider during observe: %s", mg.GetUID())
	}
	if requeued {
		// return a noop for Observe after requeuing the reconcile request
		// for a retry.
		return managed.ExternalObservation{
			ResourceExists:   true,
			ResourceUpToDate: true,
		}, nil
	}
	// Release the provider process on every return path below.
	defer e.stopProvider()

	tr, ok := mg.(resource.Terraformed)
	if !ok {
		return managed.ExternalObservation{}, errors.New(errUnexpectedObject)
	}

	policySet := sets.New[xpv1.ManagementAction](tr.GetManagementPolicies()...)

	// Note(turkenh): We don't need to check if the management policies are
	// enabled or not because the crossplane-runtime's managed reconciler already
	// does that for us. In other words, if the management policies are set
	// without management policies being enabled, the managed
	// reconciler will error out before reaching this point.
	// https://github.com/crossplane/crossplane-runtime/pull/384/files#diff-97300a2543f95f5a2ada3560bf47dd7334e237e27976574d15d1cddef2e66c01R696
	// Note (lsviben) We are only using import instead of refresh if the
	// management policies do not contain create or update as they need the
	// required fields to be set, which is not the case for import.
	if !policySet.HasAny(xpv1.ManagementActionCreate, xpv1.ManagementActionUpdate, xpv1.ManagementActionAll) {
		return e.Import(ctx, tr)
	}

	res, err := e.workspace.Refresh(ctx)
	if err != nil {
		return managed.ExternalObservation{}, errors.Wrap(err, errRefresh)
	}

	switch {
	case res.ASyncInProgress:
		// An async operation is still running: report the resource as
		// existing and up to date so that no further operation is triggered
		// while it is ongoing.
		mg.SetConditions(resource.AsyncOperationOngoingCondition())
		return managed.ExternalObservation{
			ResourceExists:   true,
			ResourceUpToDate: true,
		}, nil
	case !res.Exists:
		// The refresh found no external resource.
		return managed.ExternalObservation{
			ResourceExists: false,
		}, nil
	}

	// There might be a case where async operation is finished and the status
	// update marking it as finished didn't go through. At this point, we are
	// sure that there is no ongoing operation.
	if e.config.UseAsync {
		tr.SetConditions(resource.AsyncOperationFinishedCondition())
	}

	// No operation was in progress, our observation completed successfully, and
	// we have an observation to consume.
	tfstate := map[string]any{}
	if err := json.JSParser.Unmarshal(res.State.GetAttributes(), &tfstate); err != nil {
		return managed.ExternalObservation{}, errors.Wrap(err, "cannot unmarshal state attributes")
	}
	if err := tr.SetObservation(tfstate); err != nil {
		return managed.ExternalObservation{}, errors.Wrap(err, "cannot set observation")
	}

	// NOTE(lsviben) although the annotations were supposed to be set and the
	// managed resource updated during the Create step, we are checking and
	// updating the annotations here due to the fact that in most cases, the
	// Create step is done asynchronously and the managed resource is not
	// updated with the annotations. That is why below we are prioritizing the
	// annotations update before anything else. We are setting lateInitialized
	// to true so that the reconciler updates the managed resource. This
	// behavior conflicts with management policies in which LateInitialize is
	// turned off. To circumvent this, we are checking if the management policy
	// does not contain LateInitialize and if it does not, we are updating the
	// annotations manually.
	annotationsUpdated, err := resource.SetCriticalAnnotations(tr, e.config, tfstate, string(res.State.GetPrivateRaw()))
	if err != nil {
		return managed.ExternalObservation{}, errors.Wrap(err, "cannot set critical annotations")
	}
	policyHasLateInit := policySet.HasAny(xpv1.ManagementActionLateInitialize, xpv1.ManagementActionAll)
	if annotationsUpdated && !policyHasLateInit {
		if err := e.kube.Update(ctx, mg); err != nil {
			return managed.ExternalObservation{}, errors.Wrap(err, errUpdateAnnotations)
		}
		// The annotations are persisted now; don't ask the reconciler to do it.
		annotationsUpdated = false
	}

	conn, err := resource.GetConnectionDetails(tfstate, tr, e.config)
	if err != nil {
		return managed.ExternalObservation{}, errors.Wrap(err, "cannot get connection details")
	}

	var lateInitedParams bool
	if policyHasLateInit {
		lateInitedParams, err = tr.LateInitialize(res.State.GetAttributes())
		if err != nil {
			return managed.ExternalObservation{}, errors.Wrap(err, "cannot late initialize parameters")
		}
	}

	// Whether the Ready condition is already Available, evaluated before this
	// call possibly sets it below.
	markedAvailable := tr.GetCondition(xpv1.TypeReady).Equal(xpv1.Available())

	// In the following switch block, before running a relatively costly
	// Terraform apply and that may fail before critical annotations are
	// updated, or late-initialized configuration is written to main.tf.json,
	// we try to perform the following in the given order:
	// 1. Update critical annotations if they have changed
	// 2. Update status
	// 3. Update spec with late-initialized fields
	// We prioritize critical annotation updates most not to lose certain info
	// (like the Cloud provider generated ID) before anything else. Then we
	// prioritize status updates over late-initialization spec updates to
	// mark the resource as available as soon as possible because a spec
	// update due to late-initialized fields will void the status update.
	switch {
	// we prioritize critical annotation updates over status updates
	case annotationsUpdated:
		return managed.ExternalObservation{
			ResourceExists:          true,
			ResourceUpToDate:        true,
			ConnectionDetails:       conn,
			ResourceLateInitialized: true,
		}, nil
	// we prioritize status updates over late-init'ed spec updates
	case !markedAvailable:
		addTTR(tr)
		tr.SetConditions(xpv1.Available())
		return managed.ExternalObservation{
			ResourceExists:    true,
			ResourceUpToDate:  true,
			ConnectionDetails: conn,
		}, nil
	// with the least priority wrt critical annotation updates and status updates
	// we allow a late-initialization before the Workspace.Plan call
	case lateInitedParams:
		return managed.ExternalObservation{
			ResourceExists:          true,
			ResourceUpToDate:        true,
			ConnectionDetails:       conn,
			ResourceLateInitialized: true,
		}, nil
	// nothing else is pending: run a Workspace.Plan to decide whether the
	// external resource is up to date with the desired configuration
	default:
		plan, err := e.workspace.Plan(ctx)
		if err != nil {
			return managed.ExternalObservation{}, errors.Wrap(err, errPlan)
		}
		resource.SetUpToDateCondition(mg, plan.UpToDate)
		return managed.ExternalObservation{
			ResourceExists:    true,
			ResourceUpToDate:  plan.UpToDate,
			ConnectionDetails: conn,
		}, nil
	}
}
// addTTR records the time-to-ready of mg in the TTR metric. The value is the
// time elapsed since the object's creation timestamp, in seconds, labeled
// with the object's group, version and kind. Observe calls it when it sets
// the Available condition on a resource that was not yet marked Available.
func addTTR(mg xpresource.Managed) {
	gvk := mg.GetObjectKind().GroupVersionKind()
	observer := metrics.TTRMeasurements.WithLabelValues(gvk.Group, gvk.Version, gvk.Kind)
	observer.Observe(time.Since(mg.GetCreationTimestamp().Time).Seconds())
}
// Create provisions the external resource by applying the Terraform
// workspace.
//
// If the reconcile request was requeued while scheduling a native provider,
// it returns an empty creation and no error. For async resources
// (config.UseAsync), it only starts an async apply, with the callback from
// the CallbackProvider.
//
// Otherwise it applies synchronously, derives the connection details from the
// resulting state, and sets the critical annotations on the resource. Any
// annotation error is returned wrapped, alongside the connection details.
func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.ExternalCreation, error) {
	requeued, err := e.scheduleProvider(mg.GetName())
	switch {
	case err != nil:
		return managed.ExternalCreation{}, errors.Wrapf(err, "cannot schedule a native provider during create: %s", mg.GetUID())
	case requeued:
		return managed.ExternalCreation{}, nil
	}
	defer e.stopProvider()

	if e.config.UseAsync {
		startErr := e.workspace.ApplyAsync(e.callback.Create(mg.GetName()))
		return managed.ExternalCreation{}, errors.Wrap(startErr, errStartAsyncApply)
	}

	tr, isTerraformed := mg.(resource.Terraformed)
	if !isTerraformed {
		return managed.ExternalCreation{}, errors.New(errUnexpectedObject)
	}

	applied, err := e.workspace.Apply(ctx)
	if err != nil {
		return managed.ExternalCreation{}, errors.Wrap(err, errApply)
	}

	state := map[string]any{}
	if err := json.JSParser.Unmarshal(applied.State.GetAttributes(), &state); err != nil {
		return managed.ExternalCreation{}, errors.Wrap(err, "cannot unmarshal state attributes")
	}

	conn, err := resource.GetConnectionDetails(state, tr, e.config)
	if err != nil {
		return managed.ExternalCreation{}, errors.Wrap(err, "cannot get connection details")
	}

	// NOTE(muvaf): Only spec and metadata changes are saved after Create call.
	_, annotateErr := resource.SetCriticalAnnotations(tr, e.config, state, string(applied.State.GetPrivateRaw()))
	return managed.ExternalCreation{ConnectionDetails: conn}, errors.Wrap(annotateErr, "cannot set critical annotations")
}
// Update brings the external resource in line with the desired configuration
// by applying the Terraform workspace.
//
// If the reconcile request was requeued while scheduling a native provider,
// it returns an empty update and no error. For async resources
// (config.UseAsync), it only starts an async apply, with the update callback
// from the CallbackProvider. Otherwise it applies synchronously and stores the
// resulting state attributes as the resource's observation.
func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.ExternalUpdate, error) {
	requeued, err := e.scheduleProvider(mg.GetName())
	switch {
	case err != nil:
		return managed.ExternalUpdate{}, errors.Wrapf(err, "cannot schedule a native provider during update: %s", mg.GetUID())
	case requeued:
		return managed.ExternalUpdate{}, nil
	}
	defer e.stopProvider()

	if e.config.UseAsync {
		startErr := e.workspace.ApplyAsync(e.callback.Update(mg.GetName()))
		return managed.ExternalUpdate{}, errors.Wrap(startErr, errStartAsyncApply)
	}

	tr, isTerraformed := mg.(resource.Terraformed)
	if !isTerraformed {
		return managed.ExternalUpdate{}, errors.New(errUnexpectedObject)
	}

	applied, err := e.workspace.Apply(ctx)
	if err != nil {
		return managed.ExternalUpdate{}, errors.Wrap(err, errApply)
	}

	observed := map[string]any{}
	if err := json.JSParser.Unmarshal(applied.State.GetAttributes(), &observed); err != nil {
		return managed.ExternalUpdate{}, errors.Wrap(err, "cannot unmarshal state attributes")
	}
	setErr := tr.SetObservation(observed)
	return managed.ExternalUpdate{}, errors.Wrap(setErr, "cannot set observation")
}
// Delete destroys the external resource through the Terraform workspace:
// synchronously by default, or by starting an async destroy (with the destroy
// callback from the CallbackProvider) when config.UseAsync is set. If the
// reconcile request was requeued while scheduling a native provider, it does
// nothing and returns nil.
func (e *external) Delete(ctx context.Context, mg xpresource.Managed) error {
	requeued, err := e.scheduleProvider(mg.GetName())
	switch {
	case err != nil:
		return errors.Wrapf(err, "cannot schedule a native provider during delete: %s", mg.GetUID())
	case requeued:
		return nil
	}
	defer e.stopProvider()

	if !e.config.UseAsync {
		return errors.Wrap(e.workspace.Destroy(ctx), errDestroy)
	}
	startErr := e.workspace.DestroyAsync(e.callback.Destroy(mg.GetName()))
	return errors.Wrap(startErr, errStartAsyncDestroy)
}
// Import observes tr by importing the existing external resource into the
// Terraform workspace instead of refreshing it. Observe uses it when the
// management policies include none of Create, Update or All.
//
// While an async operation is in progress, it reports the resource as
// existing and up to date. When the resource does not exist, it reports
// ResourceExists=false. Otherwise it stores the imported state as tr's
// observation, derives connection details, and marks tr Available.
func (e *external) Import(ctx context.Context, tr resource.Terraformed) (managed.ExternalObservation, error) {
	imported, err := e.workspace.Import(ctx, tr)
	if err != nil {
		return managed.ExternalObservation{}, errors.Wrap(err, errImport)
	}

	switch {
	case imported.ASyncInProgress:
		// We normally don't expect apply/destroy to be in progress when the
		// management policy is set to "ObserveOnly". However, this could happen
		// if the policy is changed to "ObserveOnly" while an async operation is
		// in progress. In that case, we want to wait for the operation to finish
		// before we start observing.
		tr.SetConditions(resource.AsyncOperationOngoingCondition())
		return managed.ExternalObservation{
			ResourceExists:   true,
			ResourceUpToDate: true,
		}, nil
	case !imported.Exists:
		// If the resource doesn't exist, we don't need to do anything else.
		// We report it to the managed reconciler as a non-existent resource and
		// it will take care of reporting it to the user as an error case for
		// observe-only policy.
		return managed.ExternalObservation{
			ResourceExists: false,
		}, nil
	}

	// No operation was in progress, our observation completed successfully, and
	// we have an observation to consume.
	state := map[string]any{}
	if err := json.JSParser.Unmarshal(imported.State.GetAttributes(), &state); err != nil {
		return managed.ExternalObservation{}, errors.Wrap(err, "cannot unmarshal state attributes")
	}
	if err := tr.SetObservation(state); err != nil {
		return managed.ExternalObservation{}, errors.Wrap(err, "cannot set observation")
	}

	conn, err := resource.GetConnectionDetails(state, tr, e.config)
	if err != nil {
		return managed.ExternalObservation{}, errors.Wrap(err, "cannot get connection details")
	}

	tr.SetConditions(xpv1.Available())
	observation := managed.ExternalObservation{
		ResourceExists:    true,
		ResourceUpToDate:  true,
		ConnectionDetails: conn,
	}
	return observation, nil
}