Skip to content
This repository has been archived by the owner on Dec 15, 2022. It is now read-only.

Commit

Permalink
Implement shared gRPC server in terraform.WorkspaceStore
Browse files Browse the repository at this point in the history
Signed-off-by: Alper Rifat Ulucinar <ulucinar@users.noreply.github.com>
  • Loading branch information
ulucinar committed Mar 28, 2022
1 parent 8247a83 commit 3f3dce8
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 19 deletions.
13 changes: 0 additions & 13 deletions pkg/config/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ type Provider struct {
// resource name.
Resources map[string]*Resource

// SharedGRPC set to `true` to run the native Terraform plugin in
// shared gRPC mode. In shared gRPC mode, the Terraform CLI does not fork
// the binary plugin at each request but rather send requests to a shared
// instance forked once.
SharedGRPC bool

// resourceConfigurators is a map holding resource configurators where key
// is Terraform resource name.
resourceConfigurators map[string]ResourceConfiguratorChain
Expand Down Expand Up @@ -152,13 +146,6 @@ func WithDefaultResourceFn(f DefaultResourceFn) ProviderOption {
}
}

// WithSharedGRPC configures SharedGRPC for this Provider
func WithSharedGRPC(sharedGRPC bool) ProviderOption {
return func(p *Provider) {
p.SharedGRPC = sharedGRPC
}
}

// NewProviderWithSchema builds and returns a new Provider from provider
// tfjson schema, that is generated using Terraform CLI with:
// `terraform providers schema --json`
Expand Down
5 changes: 2 additions & 3 deletions pkg/terraform/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package terraform
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"

Expand Down Expand Up @@ -139,7 +138,7 @@ func (fp *FileProducer) WriteTFState(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "cannot marshal state object")
}
return errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "terraform.tfstate"), rawState, os.ModePerm), "cannot write tfstate file")
return errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "terraform.tfstate"), rawState, 0600), "cannot write tfstate file")
}

// WriteMainTF writes the content main configuration file that has the desired
Expand Down Expand Up @@ -194,5 +193,5 @@ func (fp *FileProducer) WriteMainTF() error {
if err != nil {
return errors.Wrap(err, "cannot marshal main hcl object")
}
return errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "main.tf.json"), rawMainTF, os.ModePerm), "cannot write tfstate file")
return errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "main.tf.json"), rawMainTF, 0600), "cannot write tfstate file")
}
95 changes: 95 additions & 0 deletions pkg/terraform/shared_grpc_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Copyright 2022 The Crossplane Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package terraform

import (
"bufio"
"os"
"regexp"
"time"

"github.com/pkg/errors"
)

const (
envReattachConfig = "TF_REATTACH_PROVIDERS"
regexReattachLine = envReattachConfig + `='(.*)'`
reattachTimeout = 1 * time.Minute
)

func (ws *WorkspaceStore) startSharedServer() error { //nolint:gocyclo
if len(ws.nativeProviderPath) == 0 {
return nil
}
log := ws.logger.WithValues("nativeProviderPath", ws.nativeProviderPath, "nativeProviderArgs", ws.nativeProviderArgs)
if ws.reattachConfig != "" {
log.Debug("Shared gRPC server is running...", "reattachConfig", ws.reattachConfig)
return nil
}
errCh := make(chan error, 1)
reattachCh := make(chan string, 1)
re, err := regexp.Compile(regexReattachLine)
if err != nil {
return errors.Wrap(err, "failed to compile regexp")
}

go func() {
defer close(errCh)
defer close(reattachCh)
defer func() {
ws.mu.Lock()
ws.reattachConfig = ""
ws.mu.Unlock()
}()
//#nosec G204 no user input
cmd := ws.executor.Command(ws.nativeProviderPath, ws.nativeProviderArgs...)
stdout, err := cmd.StdoutPipe()
if err != nil {
errCh <- err
return
}
if err := cmd.Start(); err != nil {
errCh <- err
return
}
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
t := scanner.Text()
matches := re.FindStringSubmatch(t)
if matches == nil {
continue
}
reattachCh <- matches[1]
break
}
if err := cmd.Wait(); err != nil {
log.Info("Native Terraform provider process error", "error", err)
errCh <- err
}
}()

select {
case reattachConfig := <-reattachCh:
ws.reattachConfig = reattachConfig
return errors.Wrapf(os.Setenv(envReattachConfig, ws.reattachConfig),
"could not set reattach config env variable %q to value: %s", envReattachConfig, ws.reattachConfig)
case err := <-errCh:
return err
case <-time.After(reattachTimeout):
return errors.Errorf("timed out after %v while waiting for the reattach configuration string", reattachTimeout)
}
}
28 changes: 25 additions & 3 deletions pkg/terraform/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,22 @@ func WithFs(fs afero.Fs) WorkspaceStoreOption {
}
}

// WithNativeProviderPath enables shared gRPC mode and configures the path
// of the Terraform native provider. When set, Terraform CLI does not fork
// the native plugin for each request but a shared server is used instead.
func WithNativeProviderPath(path string) WorkspaceStoreOption {
return func(ws *WorkspaceStore) {
ws.nativeProviderPath = path
}
}

// WithNativeProviderArgs are the arguments to be passed to the native provider
func WithNativeProviderArgs(args []string) WorkspaceStoreOption {
return func(ws *WorkspaceStore) {
ws.nativeProviderArgs = args
}
}

// NewWorkspaceStore returns a new WorkspaceStore.
func NewWorkspaceStore(l logging.Logger, opts ...WorkspaceStoreOption) *WorkspaceStore {
ws := &WorkspaceStore{
Expand All @@ -87,8 +103,11 @@ type WorkspaceStore struct {
// Since there can be multiple calls that add/remove values from the map at
// the same time, it has to be safe for concurrency since those operations
// cause rehashing in some cases.
store map[types.UID]*Workspace
logger logging.Logger
store map[types.UID]*Workspace
logger logging.Logger
nativeProviderPath string
nativeProviderArgs []string
reattachConfig string

mu sync.Mutex

Expand All @@ -99,7 +118,7 @@ type WorkspaceStore struct {
// Workspace makes sure the Terraform workspace for the given resource is ready
// to be used and returns the Workspace object configured to work in that
// workspace folder in the filesystem.
func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient, tr resource.Terraformed, ts Setup, cfg *config.Resource) (*Workspace, error) {
func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient, tr resource.Terraformed, ts Setup, cfg *config.Resource) (*Workspace, error) { //nolint:gocyclo
dir := filepath.Join(ws.fs.GetTempDir(""), string(tr.GetUID()))
if err := ws.fs.MkdirAll(dir, os.ModePerm); err != nil {
return nil, errors.Wrap(err, "cannot create directory for workspace")
Expand All @@ -122,6 +141,9 @@ func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient
}
l := ws.logger.WithValues("workspace", dir)
ws.mu.Lock()
if err := ws.startSharedServer(); err != nil {
return nil, err
}
w, ok := ws.store[tr.GetUID()]
if !ok {
ws.store[tr.GetUID()] = NewWorkspace(dir, WithLogger(l), WithExecutor(ws.executor))
Expand Down

0 comments on commit 3f3dce8

Please sign in to comment.