Skip to content
This repository has been archived by the owner on Dec 15, 2022. It is now read-only.

Add shared gRPC mode to run the native provider #267

Merged
merged 6 commits into from
Apr 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/terraform/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package terraform
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"

Expand Down Expand Up @@ -139,7 +138,7 @@ func (fp *FileProducer) WriteTFState(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "cannot marshal state object")
}
return errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "terraform.tfstate"), rawState, os.ModePerm), "cannot write tfstate file")
return errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "terraform.tfstate"), rawState, 0600), "cannot write tfstate file")
}

// WriteMainTF writes the content main configuration file that has the desired
Expand Down Expand Up @@ -194,5 +193,5 @@ func (fp *FileProducer) WriteMainTF() error {
if err != nil {
return errors.Wrap(err, "cannot marshal main hcl object")
}
return errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "main.tf.json"), rawMainTF, os.ModePerm), "cannot write tfstate file")
return errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "main.tf.json"), rawMainTF, 0600), "cannot write tfstate file")
}
170 changes: 170 additions & 0 deletions pkg/terraform/provider_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
Copyright 2022 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package terraform

import (
"bufio"
"regexp"
"sync"
"time"

"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/utils/exec"
)

const (
// error messages
errFmtTimeout = "timed out after %v while waiting for the reattach configuration string"

// an example value would be: '{"registry.terraform.io/hashicorp/aws": {"Protocol": "grpc", "ProtocolVersion":5, "Pid":... "Addr":{"Network": "unix","String": "..."}}}'
envReattachConfig = "TF_REATTACH_PROVIDERS"
regexReattachLine = envReattachConfig + `='(.*)'`
reattachTimeout = 1 * time.Minute
)

// ProviderRunner is the interface for running
// Terraform native provider processes in the shared
// gRPC server mode
type ProviderRunner interface {
Start() (string, error)
}

// NoOpProviderRunner is a no-op ProviderRunner
type NoOpProviderRunner struct{}
muvaf marked this conversation as resolved.
Show resolved Hide resolved

// NewNoOpProviderRunner constructs a new NoOpProviderRunner
func NewNoOpProviderRunner() NoOpProviderRunner {
return NoOpProviderRunner{}
}

// Start takes no action
func (NoOpProviderRunner) Start() (string, error) {
return "", nil
}

// SharedProvider runs the configured native provider plugin
// using the supplied command-line args
type SharedProvider struct {
nativeProviderPath string
nativeProviderArgs []string
reattachConfig string
logger logging.Logger
executor exec.Interface
clock clock.Clock
mu *sync.Mutex
}

// SharedGRPCRunnerOption lets you configure the shared gRPC runner.
type SharedGRPCRunnerOption func(runner *SharedProvider)

// WithNativeProviderArgs are the arguments to be passed to the native provider
func WithNativeProviderArgs(args ...string) SharedGRPCRunnerOption {
return func(sr *SharedProvider) {
sr.nativeProviderArgs = args
}
}

// WithNativeProviderExecutor sets the process executor to be used
func WithNativeProviderExecutor(e exec.Interface) SharedGRPCRunnerOption {
return func(sr *SharedProvider) {
sr.executor = e
}
}

// NewSharedProvider instantiates a SharedProvider with an
// OS executor using the supplied logger
func NewSharedProvider(l logging.Logger, nativeProviderPath string, opts ...SharedGRPCRunnerOption) *SharedProvider {
sr := &SharedProvider{
logger: l,
nativeProviderPath: nativeProviderPath,
executor: exec.New(),
clock: clock.RealClock{},
mu: &sync.Mutex{},
}
for _, o := range opts {
o(sr)
}
return sr
}

// Start starts a shared gRPC server if not already running
// A logger, native provider's path and command-line arguments to be
// passed to it must have been properly configured.
// Returns any errors encountered and the reattachment configuration for
// the native provider.
func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo
sr.mu.Lock()
defer sr.mu.Unlock()
log := sr.logger.WithValues("nativeProviderPath", sr.nativeProviderPath, "nativeProviderArgs", sr.nativeProviderArgs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: we can do this initialization of log in NewSharedProvider so that it's done only once.

if sr.reattachConfig != "" {
log.Debug("Shared gRPC server is running...", "reattachConfig", sr.reattachConfig)
return sr.reattachConfig, nil
}
errCh := make(chan error, 1)
reattachCh := make(chan string, 1)
re, err := regexp.Compile(regexReattachLine)
if err != nil {
return "", errors.Wrap(err, "failed to compile regexp")
}

go func() {
defer close(errCh)
defer close(reattachCh)
defer func() {
sr.mu.Lock()
sr.reattachConfig = ""
sr.mu.Unlock()
}()
//#nosec G204 no user input
cmd := sr.executor.Command(sr.nativeProviderPath, sr.nativeProviderArgs...)
stdout, err := cmd.StdoutPipe()
if err != nil {
errCh <- err
return
}
if err := cmd.Start(); err != nil {
errCh <- err
return
}
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
t := scanner.Text()
matches := re.FindStringSubmatch(t)
if matches == nil {
continue
}
reattachCh <- matches[1]
break
}
if err := cmd.Wait(); err != nil {
log.Info("Native Terraform provider process error", "error", err)
errCh <- err
}
}()

select {
case reattachConfig := <-reattachCh:
sr.reattachConfig = reattachConfig
return sr.reattachConfig, nil
case err := <-errCh:
return "", err
case <-sr.clock.After(reattachTimeout):
return "", errors.Errorf(errFmtTimeout, reattachTimeout)
}
}
189 changes: 189 additions & 0 deletions pkg/terraform/provider_runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
Copyright 2022 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package terraform

import (
"io"
"reflect"
"strings"
"sync"
"testing"
"time"

"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/test"
"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/utils/exec"
testingexec "k8s.io/utils/exec/testing"
)

func TestStartSharedServer(t *testing.T) {
testPath := "path"
testArgs := []string{"arg1", "arg2"}
testReattachConfig1 := `TF_REATTACH_PROVIDERS='test1'`
testReattachConfig2 := `TF_REATTACH_PROVIDERS='test2'`
testErr := errors.New("boom")
type args struct {
runner ProviderRunner
}
type want struct {
reattachConfig string
err error
}
tests := map[string]struct {
args args
want want
}{
"NotConfiguredNoOp": {
args: args{
runner: NewNoOpProviderRunner(),
},
},
"SuccessfullyStarted": {
args: args{
runner: NewSharedProvider(logging.NewNopLogger(), testPath, WithNativeProviderArgs(testArgs...),
WithNativeProviderExecutor(newExecutorWithStoutPipe(testReattachConfig1, nil))),
},
want: want{
reattachConfig: "test1",
},
},
"AlreadyRunning": {
args: args{
runner: &SharedProvider{
nativeProviderPath: testPath,
reattachConfig: "test1",
logger: logging.NewNopLogger(),
executor: newExecutorWithStoutPipe(testReattachConfig2, nil),
mu: &sync.Mutex{},
},
},
want: want{
reattachConfig: "test1",
},
},
"NativeProviderError": {
args: args{
runner: NewSharedProvider(logging.NewNopLogger(), testPath,
WithNativeProviderExecutor(newExecutorWithStoutPipe(testReattachConfig1, testErr))),
},
want: want{
err: testErr,
},
},
"NativeProviderTimeout": {
args: args{
runner: &SharedProvider{
nativeProviderPath: testPath,
logger: logging.NewNopLogger(),
executor: newExecutorWithStoutPipe("invalid", nil),
mu: &sync.Mutex{},
clock: &fakeClock{},
},
},
want: want{
err: errors.Errorf(errFmtTimeout, reattachTimeout),
},
},
}
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
reattachConfig, err := tt.args.runner.Start()
if diff := cmp.Diff(tt.want.err, err, test.EquateErrors()); diff != "" {
t.Errorf("\n%s\nStartSharedServer(): -want error, +got error:\n%s", name, diff)
}
if err != nil {
return
}
if diff := cmp.Diff(reattachConfig, tt.want.reattachConfig); diff != "" {
t.Errorf("\n%s\nStartSharedServer(): -want reattachConfig, +got reattachConfig:\n%s", name, diff)
}
})
}
}

type fakeClock struct {
clock.FakeClock
}

func (f *fakeClock) After(d time.Duration) <-chan time.Time {
defer func() {
f.Step(reattachTimeout)
}()
return f.FakeClock.After(d)
}

func newExecutorWithStoutPipe(reattachConfig string, err error) exec.Interface {
return &testingexec.FakeExec{
CommandScript: []testingexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd {
return &testingexec.FakeCmd{
StdoutPipeResponse: testingexec.FakeStdIOPipeResponse{
ReadCloser: io.NopCloser(strings.NewReader(reattachConfig)),
Error: err,
},
}
},
},
}
}

func TestWithNativeProviderArgs(t *testing.T) {
tests := map[string]struct {
args []string
want []string
}{
"NotConfigured": {},
"Configured": {
args: []string{"a", "b", "c"},
want: []string{"a", "b", "c"},
},
}
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
sr := &SharedProvider{}
WithNativeProviderArgs(tt.args...)(sr)
if !reflect.DeepEqual(sr.nativeProviderArgs, tt.want) {
t.Errorf("WithNativeProviderArgs(tt.args) = %v, want %v", sr.nativeProviderArgs, tt.want)
}
})
}
}

func TestWithNativeProviderExecutor(t *testing.T) {
tests := map[string]struct {
executor exec.Interface
want exec.Interface
}{
"NotConfigured": {},
"Configured": {
executor: &testingexec.FakeExec{},
want: &testingexec.FakeExec{},
},
}
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
sr := &SharedProvider{}
WithNativeProviderExecutor(tt.executor)(sr)
if !reflect.DeepEqual(sr.executor, tt.want) {
t.Errorf("WithNativeProviderExecutor(tt.executor) = %v, want %v", sr.executor, tt.want)
}
})
}
}
Loading