Skip to content

Commit

Permalink
Add temporal prototype and readme. (#190)
Browse files Browse the repository at this point in the history
* Add temporal prototype and readme.

* Delete unnecessary file.

* Add support for sessions and configurable data dirs.
  • Loading branch information
nishkrishnan committed Feb 24, 2022
1 parent 7e2e205 commit a6b6443
Show file tree
Hide file tree
Showing 10 changed files with 814 additions and 15 deletions.
26 changes: 19 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,18 @@ require (
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
golang.org/x/net v0.0.0-20211109214657-ef0fda0de508 // indirect
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 // indirect
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
golang.org/x/sys v0.0.0-20211110154304-99a53858aa08 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
golang.org/x/tools v0.1.2 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/api v0.44.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
google.golang.org/grpc v1.38.0 // indirect
google.golang.org/protobuf v1.26.0 // indirect
google.golang.org/genproto v0.0.0-20211104193956-4c6863e31247 // indirect
google.golang.org/grpc v1.42.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/go-playground/validator.v9 v9.31.0
gopkg.in/ini.v1 v1.62.0 // indirect
Expand All @@ -134,6 +134,8 @@ require (
github.com/uber-go/tally v3.4.3+incompatible
)

require go.temporal.io/sdk v1.13.0

require (
github.com/aws/aws-sdk-go-v2 v1.13.0
github.com/aws/aws-sdk-go-v2/config v1.13.1
Expand All @@ -151,5 +153,15 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.9.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.14.0 // indirect
github.com/aws/smithy-go v1.10.0 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogo/status v1.1.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/onsi/ginkgo v1.14.0 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/stretchr/objx v0.3.0 // indirect
go.temporal.io/api v1.6.1-0.20211110205628-60c98e9cbfe2 // indirect
)
65 changes: 57 additions & 8 deletions go.sum

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/runatlantis/atlantis/cmd"
"github.com/runatlantis/atlantis/server/logging"
temporalCmd "github.com/runatlantis/atlantis/server/lyft/temporal/cmd"
"github.com/spf13/viper"
)

Expand All @@ -43,6 +44,8 @@ func main() {
}
version := &cmd.VersionCmd{AtlantisVersion: atlantisVersion}
testdrive := &cmd.TestdriveCmd{}
cmd.RootCmd.AddCommand(temporalCmd.NewWorkerCmd(temporalCmd.WorkerConfig{}))
cmd.RootCmd.AddCommand(temporalCmd.NewServerCmd())
cmd.RootCmd.AddCommand(server.Init())
cmd.RootCmd.AddCommand(version.Init())
cmd.RootCmd.AddCommand(testdrive.Init())
Expand Down
61 changes: 61 additions & 0 deletions server/lyft/temporal/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Deployment Prototype via Temporal

This workflow emulates a typical deployment system where there exists a deployment queue for a single repository and each revision that needs to be deployed in order. The workflow iterates through that queue serially and runs the dry run steps (terraform init, plan). Approval is required to run the terraform apply operation.

terraform data (plan files/archives) are kept locally and worker sessions are used to ensure that terraform activities within a given workflow are run on the same worker and therefore access the same data directory.

## Setup

In order to run this workflow a temporal cluster needs to be running. I usually just run a local version of this:

```
git clone git@github.com/danielhochman/docker-compose
cd docker-compose
docker-compose up -d
```

Next we'll want to start the worker:

```
go run main.go worker --ghuser nishkrishnan --ghtoken <GITHUB_ACCESS_TOKEN>
```

Finally we'll want to start the application server which is responsible for translating api requests to workflow executions/signals:

```
go run main.go application-server
```

Now we are ready to start making requests to the server.

## Request Types

Deploy/Queue a revision

```
curl -H 'Content-Type: application/json' -d '{
"Repo": {
"Owner": "<OWNER>",
"Name" : "<REPO>"
},
"Branch": "<BRANCH>",
"Revision" : "<SHA>"
}' localhost:9000/api/deploy
```

Approve a deployment

```
curl -H 'Content-Type: application/json' -d '{
"User": "<USER>",
"Status": 0,
"RunID": "<RUN_ID>",
"WorkflowID" : "<SHA>"
}' localhost:9000/api/plan_review
```

Note: run id can be found by looking at the worker logs, in an ideal world this info is relayed back to the client.




80 changes: 80 additions & 0 deletions server/lyft/temporal/activities/clone.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package activities

import (
"context"
"fmt"
"os"
"os/exec"
"strings"

"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/events/models"
"go.temporal.io/sdk/activity"
)

type CloneActivityRequest struct {
Repo models.Repo
Branch string
Dir string
Revision string
}

type CloneActivityResponse struct {
Dir string
}

func Clone(ctx context.Context, request CloneActivityRequest) (CloneActivityResponse, error) {
log := activity.GetLogger(ctx)

cloneDir := request.Dir
headRepo := request.Repo

err := os.RemoveAll(cloneDir)
if err != nil {
return CloneActivityResponse{}, errors.Wrapf(err, "deleting dir %q before cloning", cloneDir)
}

// Create the directory and parents if necessary.
log.Info("creating dir %q", cloneDir)
if err := os.MkdirAll(cloneDir, 0700); err != nil {
return CloneActivityResponse{}, errors.Wrap(err, "creating new workspace")
}

headCloneURL := headRepo.CloneURL

var cmds = [][]string{
{
"git", "clone", "--branch", request.Branch, "--single-branch", headCloneURL, cloneDir,
},
{
"git", "checkout", request.Revision,
},
}

for _, args := range cmds {
cmd := exec.Command(args[0], args[1:]...) // nolint: gosec
cmd.Dir = cloneDir
// The git merge command requires these env vars are set.
cmd.Env = append(os.Environ(), []string{
"EMAIL=atlantis@runatlantis.io",
"GIT_AUTHOR_NAME=atlantis",
"GIT_COMMITTER_NAME=atlantis",
}...)

cmdStr := sanitizeGitCredentials(strings.Join(cmd.Args, " "), headRepo)
output, err := cmd.CombinedOutput()
sanitizedOutput := sanitizeGitCredentials(string(output), headRepo)
if err != nil {
sanitizedErrMsg := sanitizeGitCredentials(err.Error(), headRepo)
return CloneActivityResponse{}, fmt.Errorf("running %s: %s: %s", cmdStr, sanitizedOutput, sanitizedErrMsg)
}
log.Debug("ran: %s. Output: %s", cmdStr, strings.TrimSuffix(sanitizedOutput, "\n"))
}
return CloneActivityResponse{Dir: cloneDir}, nil
}

// sanitizeGitCredentials replaces any git clone urls that contain credentials
// in s with the sanitized versions.
func sanitizeGitCredentials(s string, head models.Repo) string {
return strings.Replace(s, head.CloneURL, head.SanitizedCloneURL, -1)
}
97 changes: 97 additions & 0 deletions server/lyft/temporal/activities/terraform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package activities

import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"

"github.com/pkg/errors"
)

type InitRequest struct {
RootDir string
}

type InitResponse struct {
Output string
}

type PlanRequest struct {
RootDir string
}

type PlanResponse struct {
Output string
Planfile string
}

type ApplyRequest struct {
RootDir string
Planfile string
}

type ApplyResponse struct {
Output string
}

func Init(ctx context.Context, request InitRequest) (InitResponse, error) {
output, err := terraform(request.RootDir, "init", "-input=false")

// output using fmt instead of logger to get pretty formatting.
// this is probably susceptible to duplication though in the event of replays
// this is only for prototype purposes anyways.
fmt.Println(string(output))

if err != nil {
return InitResponse{}, errors.Wrap(err, "running terraform init")
}

return InitResponse{Output: string(output)}, nil
}

func Plan(ctx context.Context, request PlanRequest) (PlanResponse, error) {
planFile := "plan.tfplan"

output, err := terraform(request.RootDir, "plan", "-input=false", "-refresh", "-out", fmt.Sprintf("%q", planFile))

// output using fmt instead of logger to get pretty formatting.
// this is probably susceptible to duplication though in the event of replays
// this is only for prototype purposes anyways.
fmt.Println(string(output))

if err != nil {
return PlanResponse{}, errors.Wrap(err, "running terraform plan")
}

return PlanResponse{
Output: string(output),
Planfile: planFile,
}, nil
}

func Apply(ctx context.Context, request ApplyRequest) (ApplyResponse, error) {
output, err := terraform(request.RootDir, "apply", "-input=false", filepath.Join(request.RootDir, request.Planfile))

// output using fmt instead of logger to get pretty formatting.
// this is probably susceptible to duplication though in the event of replays
// this is only for prototype purposes anyways.
fmt.Println(string(output))

if err != nil {
return ApplyResponse{}, errors.Wrap(err, "running terraform apply")
}

return ApplyResponse{Output: string(output)}, nil
}

func terraform(dir string, args ...string) ([]byte, error) {
tfCmd := fmt.Sprintf("terraform %s", strings.Join(args, " "))
cmd := exec.Command("sh", "-c", tfCmd)
cmd.Dir = dir
cmd.Env = os.Environ()

return cmd.CombinedOutput()
}
60 changes: 60 additions & 0 deletions server/lyft/temporal/activities/vcs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package activities

import (
"context"
"strings"

"github.com/google/go-github/v31/github"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/events"
"github.com/runatlantis/atlantis/server/events/models"
)

type VCSClientWrapper struct {
client *github.Client
eventParser *events.EventParser
}

func NewVCSClientWrapper(githubUser string, githubToken string) *VCSClientWrapper {
eventParser := &events.EventParser{
GithubUser: githubUser,
GithubToken: githubToken,
}

transport := &github.BasicAuthTransport{
Username: strings.TrimSpace(githubUser),
Password: strings.TrimSpace(githubToken),
}

client := github.NewClient(transport.Client())

return &VCSClientWrapper{
client: client,
eventParser: eventParser,
}
}

type GetRepositoryRequest struct {
Owner string
Repo string
}

type GetRepositoryResponse struct {
Repo models.Repo
}

func (r *VCSClientWrapper) GetRepository(ctx context.Context, request GetRepositoryRequest) (GetRepositoryResponse, error) {
rawRepo, _, err := r.client.Repositories.Get(ctx, request.Owner, request.Repo)

if err != nil {
return GetRepositoryResponse{}, errors.Wrapf(err, "getting github repo %s/%s", request.Owner, request.Repo)
}

repository, err := r.eventParser.ParseGithubRepo(rawRepo)

if err != nil {
return GetRepositoryResponse{}, errors.Wrapf(err, "parsing github repo %s from response", *rawRepo.Name)
}

return GetRepositoryResponse{Repo: repository}, err
}
Loading

0 comments on commit a6b6443

Please sign in to comment.