Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add terraform.ProviderScheduler #178

Merged
merged 4 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 63 additions & 8 deletions pkg/controller/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
xpresource "github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/pkg/errors"
Expand All @@ -32,6 +33,7 @@ const (
errApply = "cannot apply"
errDestroy = "cannot destroy"
errStatusUpdate = "cannot update status of custom resource"
errScheduleProvider = "cannot schedule native Terraform provider process"
)

// Option allows you to configure Connector.
Expand All @@ -46,13 +48,21 @@ func WithCallbackProvider(ac CallbackProvider) Option {
}
}

// WithLogger configures a logger for the Connector.
func WithLogger(l logging.Logger) Option {
return func(c *Connector) {
c.logger = l
}
}

// NewConnector returns a new Connector object.
func NewConnector(kube client.Client, ws Store, sf terraform.SetupFn, cfg *config.Resource, opts ...Option) *Connector {
c := &Connector{
kube: kube,
getTerraformSetup: sf,
store: ws,
config: cfg,
logger: logging.NewNopLogger(),
}
for _, f := range opts {
f(c)
Expand All @@ -68,6 +78,7 @@ type Connector struct {
getTerraformSetup terraform.SetupFn
config *config.Resource
callback CallbackProvider
logger logging.Logger
}

// Connect makes sure the underlying client is ready to issue requests to the
Expand All @@ -83,29 +94,61 @@ func (c *Connector) Connect(ctx context.Context, mg xpresource.Managed) (managed
return nil, errors.Wrap(err, errGetTerraformSetup)
}

tf, err := c.store.Workspace(ctx, &APISecretClient{kube: c.kube}, tr, ts, c.config)
ws, err := c.store.Workspace(ctx, &APISecretClient{kube: c.kube}, tr, ts, c.config)
if err != nil {
return nil, errors.Wrap(err, errGetWorkspace)
}

return &external{
workspace: tf,
config: c.config,
callback: c.callback,
workspace: ws,
config: c.config,
callback: c.callback,
providerScheduler: ts.Scheduler,
providerHandle: ws.ProviderHandle,
logger: c.logger.WithValues("uid", mg.GetUID()),
}, nil
}

type external struct {
workspace Workspace
config *config.Resource
callback CallbackProvider
workspace Workspace
config *config.Resource
callback CallbackProvider
providerScheduler terraform.ProviderScheduler
providerHandle terraform.ProviderHandle
logger logging.Logger
}

func (e *external) scheduleProvider() error {
if e.providerScheduler == nil || e.workspace == nil {
return nil
}
inuse, attachmentConfig, err := e.providerScheduler.Start(e.providerHandle)
if err != nil {
return errors.Wrap(err, errScheduleProvider)
}
if ps, ok := e.workspace.(ProviderSharer); ok {
ps.UseProvider(inuse, attachmentConfig)
}
return nil
}

func (e *external) stopProvider() {
if e.providerScheduler == nil {
return
}
if err := e.providerScheduler.Stop(e.providerHandle); err != nil {
e.logger.Info("ExternalClient failed to stop the native provider", "error", err)
}
}

func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed.ExternalObservation, error) { //nolint:gocyclo
// We skip the gocyclo check because most of the operations are straight-forward
// and serial.
// TODO(muvaf): Look for ways to reduce the cyclomatic complexity without
// increasing the difficulty of understanding the flow.
if err := e.scheduleProvider(); err != nil {
return managed.ExternalObservation{}, errors.Wrapf(err, "cannot schedule a native provider during observe: %s", mg.GetUID())
}
defer e.stopProvider()
tr, ok := mg.(resource.Terraformed)
if !ok {
return managed.ExternalObservation{}, errors.New(errUnexpectedObject)
Expand Down Expand Up @@ -220,6 +263,10 @@ func addTTR(mg xpresource.Managed) {
}

func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.ExternalCreation, error) {
if err := e.scheduleProvider(); err != nil {
return managed.ExternalCreation{}, errors.Wrapf(err, "cannot schedule a native provider during create: %s", mg.GetUID())
}
defer e.stopProvider()
if e.config.UseAsync {
return managed.ExternalCreation{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply)
}
Expand Down Expand Up @@ -247,6 +294,10 @@ func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.E
}

func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.ExternalUpdate, error) {
if err := e.scheduleProvider(); err != nil {
return managed.ExternalUpdate{}, errors.Wrapf(err, "cannot schedule a native provider during update: %s", mg.GetUID())
}
defer e.stopProvider()
if e.config.UseAsync {
return managed.ExternalUpdate{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply)
}
Expand All @@ -266,6 +317,10 @@ func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.E
}

func (e *external) Delete(ctx context.Context, mg xpresource.Managed) error {
if err := e.scheduleProvider(); err != nil {
return errors.Wrapf(err, "cannot schedule a native provider during delete: %s", mg.GetUID())
}
defer e.stopProvider()
if e.config.UseAsync {
return errors.Wrap(e.workspace.DestroyAsync(e.callback.Destroy(mg.GetName())), errStartAsyncDestroy)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
"github.com/upbound/upjet/pkg/terraform"
)

const (
testPath = "test/path"
)

var (
errBoom = errors.New("boom")
exampleState = &json.StateV4{
Expand Down Expand Up @@ -154,7 +158,7 @@ func TestConnect(t *testing.T) {
},
store: StoreFns{
WorkspaceFn: func(_ context.Context, _ resource.SecretClient, _ resource.Terraformed, _ terraform.Setup, _ *config.Resource) (*terraform.Workspace, error) {
return nil, nil
return terraform.NewWorkspace(testPath), nil
},
},
},
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ type Workspace interface {
Plan(context.Context) (terraform.PlanResult, error)
}

// ProviderSharer shares a native provider process with the receiver.
type ProviderSharer interface {
UseProvider(inuse terraform.InUse, attachmentConfig string)
}

// Store is where we can get access to the Terraform workspace of given resource.
type Store interface {
Workspace(ctx context.Context, c resource.SecretClient, tr resource.Terraformed, ts terraform.Setup, cfg *config.Resource) (*terraform.Workspace, error)
Expand Down
22 changes: 0 additions & 22 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,6 @@ const (
promSysResource = "resource"
)

// ExecMode is the Terraform CLI execution mode label
type ExecMode int

const (
// ModeSync represents the synchronous execution mode
ModeSync ExecMode = iota
// ModeASync represents the asynchronous execution mode
ModeASync
)

// String converts an execMode to string
func (em ExecMode) String() string {
switch em {
case ModeSync:
return "sync"
case ModeASync:
return "async"
default:
return "unknown"
}
}

var (
// CLITime is the Terraform CLI execution times histogram.
CLITime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/templates/controller.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error {
}
r := managed.NewReconciler(mgr,
xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind),
managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"],
managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger),
{{- if .UseAsync }}
tjcontroller.WithCallbackProvider(tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind))),
{{- end}}
Expand Down
10 changes: 7 additions & 3 deletions pkg/terraform/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type FileProducer struct {

// WriteMainTF writes the content main configuration file that has the desired
// state configuration for Terraform.
func (fp *FileProducer) WriteMainTF() error {
func (fp *FileProducer) WriteMainTF() (ProviderHandle, error) {
// If the resource is in a deletion process, we need to remove the deletion
// protection.
fp.parameters["lifecycle"] = map[string]bool{
Expand Down Expand Up @@ -129,9 +129,13 @@ func (fp *FileProducer) WriteMainTF() error {
}
rawMainTF, err := json.JSParser.Marshal(m)
if err != nil {
return errors.Wrap(err, "cannot marshal main hcl object")
return InvalidProviderHandle, errors.Wrap(err, "cannot marshal main hcl object")
}
return errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "main.tf.json"), rawMainTF, 0600), errWriteMainTFFile)
h, err := fp.Setup.Configuration.ToProviderHandle()
if err != nil {
return InvalidProviderHandle, errors.Wrap(err, "cannot get scheduler handle")
}
return h, errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "main.tf.json"), rawMainTF, 0600), errWriteMainTFFile)
}

// EnsureTFState writes the Terraform state that should exist in the filesystem
Expand Down
2 changes: 1 addition & 1 deletion pkg/terraform/files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func TestWriteMainTF(t *testing.T) {
if err != nil {
t.Errorf("cannot initialize a file producer: %s", err.Error())
}
err = fp.WriteMainTF()
_, err = fp.WriteMainTF()
if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" {
t.Errorf("\n%s\nWriteMainTF(...): -want error, +got error:\n%s", tc.reason, diff)
}
Expand Down
Loading