Skip to content

Commit

Permalink
Setup Storage interface for log persistence (#186)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aayyush authored Feb 16, 2022
1 parent 6ef3407 commit 59ae206
Show file tree
Hide file tree
Showing 20 changed files with 1,224 additions and 319 deletions.
12 changes: 3 additions & 9 deletions server/controllers/websocket/mux.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package websocket

import (
"fmt"
"net/http"

"github.com/gorilla/websocket"
Expand All @@ -18,8 +17,6 @@ type PartitionKeyGenerator interface {
// and is responsible for registering/deregistering new buffers
type PartitionRegistry interface {
Register(key string, buffer chan string)
Deregister(key string, buffer chan string)
IsKeyExists(key string) bool
}

// Multiplexor is responsible for handling the data transfer between the storage layer
Expand Down Expand Up @@ -53,18 +50,15 @@ func (m *Multiplexor) Handle(w http.ResponseWriter, r *http.Request) error {
return errors.Wrapf(err, "generating partition key")
}

// check if the job ID exists before registering receiver
if !m.registry.IsKeyExists(key) {
return fmt.Errorf("invalid key: %s", key)
}

// Buffer size set to 1000 to ensure messages get queued.
// TODO: make buffer size configurable
buffer := make(chan string, 1000)

// Note: Here we register the key without checking if the job exists because
// if the job DNE, the job is marked complete and we close the ws conn immediately

// spinning up a goroutine for this since we are attempting to block on the read side.
go m.registry.Register(key, buffer)
defer m.registry.Deregister(key, buffer)

return errors.Wrapf(m.writer.Write(w, r, buffer), "writing to ws %s", key)
}
4 changes: 2 additions & 2 deletions server/core/terraform/async_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *AsyncClient) RunCommandAsyncWithInput(ctx models.ProjectCommandContext,
for s.Scan() {
message := s.Text()
outCh <- Line{Line: message}
c.projectCmdOutputHandler.Send(ctx, message, false)
c.projectCmdOutputHandler.Send(ctx, message)
}
wg.Done()
}()
Expand All @@ -102,7 +102,7 @@ func (c *AsyncClient) RunCommandAsyncWithInput(ctx models.ProjectCommandContext,
for s.Scan() {
message := s.Text()
outCh <- Line{Line: message}
c.projectCmdOutputHandler.Send(ctx, message, false)
c.projectCmdOutputHandler.Send(ctx, message)
}
wg.Done()
}()
Expand Down
97 changes: 97 additions & 0 deletions server/events/mocks/mock_job_closer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions server/events/project_command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,29 +125,30 @@ type JobURLSetter interface {
SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error
}

//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_job_message_sender.go JobMessageSender
//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_job_closer.go JobCloser

type JobMessageSender interface {
Send(ctx models.ProjectCommandContext, msg string, operationComplete bool)
// Job Closer closes a job by marking op complete and clearing up buffers if logs are successfully persisted
type JobCloser interface {
CloseJob(jobID string)
}

// ProjectOutputWrapper is a decorator that creates a new PR status check per project.
// The status contains a url that outputs current progress of the terraform plan/apply command.
type ProjectOutputWrapper struct {
ProjectCommandRunner
JobMessageSender JobMessageSender
JobURLSetter JobURLSetter
JobURLSetter JobURLSetter
JobCloser JobCloser
}

func (p *ProjectOutputWrapper) Plan(ctx models.ProjectCommandContext) models.ProjectResult {
result := p.updateProjectPRStatus(models.PlanCommand, ctx, p.ProjectCommandRunner.Plan)
p.JobMessageSender.Send(ctx, "", OperationComplete)
p.JobCloser.CloseJob(ctx.JobID)
return result
}

func (p *ProjectOutputWrapper) Apply(ctx models.ProjectCommandContext) models.ProjectResult {
result := p.updateProjectPRStatus(models.ApplyCommand, ctx, p.ProjectCommandRunner.Apply)
p.JobMessageSender.Send(ctx, "", OperationComplete)
p.JobCloser.CloseJob(ctx.JobID)
return result
}

Expand Down
4 changes: 2 additions & 2 deletions server/events/project_command_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,12 @@ func TestProjectOutputWrapper(t *testing.T) {
var expCommitStatus models.CommitStatus

mockJobURLSetter := eventmocks.NewMockJobURLSetter()
mockJobMessageSender := eventmocks.NewMockJobMessageSender()
mockJobCloser := eventmocks.NewMockJobCloser()
mockProjectCommandRunner := mocks.NewMockProjectCommandRunner()

runner := &events.ProjectOutputWrapper{
JobURLSetter: mockJobURLSetter,
JobMessageSender: mockJobMessageSender,
JobCloser: mockJobCloser,
ProjectCommandRunner: mockProjectCommandRunner,
}

Expand Down
8 changes: 5 additions & 3 deletions server/events/pull_closed_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/models/fixtures"
vcsmocks "github.com/runatlantis/atlantis/server/events/vcs/mocks"
jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks"
loggermocks "github.com/runatlantis/atlantis/server/logging/mocks"
. "github.com/runatlantis/atlantis/testing"
)
Expand Down Expand Up @@ -197,7 +198,8 @@ func TestCleanUpLogStreaming(t *testing.T) {

// Create Log streaming resources
prjCmdOutput := make(chan *jobs.ProjectCmdOutputLine)
prjCmdOutHandler := jobs.NewAsyncProjectCommandOutputHandler(prjCmdOutput, logger)
storageBackend := jobmocks.NewMockStorageBackend()
prjCmdOutHandler := jobs.NewAsyncProjectCommandOutputHandler(prjCmdOutput, logger, jobs.NewJobStore(storageBackend))
ctx := models.ProjectCommandContext{
BaseRepo: fixtures.GithubRepo,
Pull: fixtures.Pull,
Expand All @@ -206,7 +208,7 @@ func TestCleanUpLogStreaming(t *testing.T) {
}

go prjCmdOutHandler.Handle()
prjCmdOutHandler.Send(ctx, "Test Message", false)
prjCmdOutHandler.Send(ctx, "Test Message")

// Create boltdb and add pull request.
var lockBucket = "bucket"
Expand Down Expand Up @@ -280,7 +282,7 @@ func TestCleanUpLogStreaming(t *testing.T) {

// Assert log streaming resources are cleaned up.
dfPrjCmdOutputHandler := prjCmdOutHandler.(*jobs.AsyncProjectCommandOutputHandler)
assert.Empty(t, dfPrjCmdOutputHandler.GetProjectOutputBuffer(ctx.PullInfo()))
assert.Empty(t, dfPrjCmdOutputHandler.GetJob(ctx.PullInfo()).Output)
assert.Empty(t, dfPrjCmdOutputHandler.GetReceiverBufferForPull(ctx.PullInfo()))
})
}
147 changes: 147 additions & 0 deletions server/jobs/job_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package jobs

import (
"fmt"
"sync"

"github.com/pkg/errors"
)

// JobStatus describes the lifecycle state of a job's log output.
type JobStatus int

const (
	// Processing means the job is still running and output may be appended.
	Processing JobStatus = iota
	// Complete means the job has finished; its output is immutable.
	Complete
)

// Job holds the buffered log output of a single job together with its status.
type Job struct {
	// Output is the ordered sequence of log lines produced so far.
	Output []string
	// Status is Processing until the job is marked complete.
	Status JobStatus
}

//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_job_store.go JobStore

// JobStore tracks per-job log output in memory and persists it to a
// storage backend once the job is complete.
type JobStore interface {
	// Get returns the job from the in-memory buffer if available and,
	// if not, falls back to the storage backend.
	// Returns an empty Job with an error when the job is in neither place.
	Get(jobID string) (Job, error)

	// AppendOutput appends the given string to a job's output buffer.
	// Appending to a job that is already complete is an error.
	AppendOutput(jobID string, output string) error

	// SetJobCompleteStatus marks a job complete and triggers any associated
	// workflow, e.g. flushing the job's output to the storage backend.
	SetJobCompleteStatus(jobID string, status JobStatus) error

	// RemoveJob removes a job from the store, discarding any buffered output.
	RemoveJob(jobID string)
}

// NewJobStore returns a LayeredJobStore with an empty in-memory job map,
// backed by the given storage backend for persisted logs.
func NewJobStore(storageBackend StorageBackend) *LayeredJobStore {
	store := &LayeredJobStore{
		storageBackend: storageBackend,
		jobs:           make(map[string]*Job),
	}
	return store
}

// NewTestJobStore returns a LayeredJobStore pre-populated with the supplied
// jobs map. Intended for use in tests only.
func NewTestJobStore(storageBackend StorageBackend, jobs map[string]*Job) *LayeredJobStore {
	return &LayeredJobStore{
		storageBackend: storageBackend,
		jobs:           jobs,
	}
}

// LayeredJobStore is a job store with one or more layers of persistence —
// an in-memory map of live jobs fronting a storage backend in this case.
type LayeredJobStore struct {
	// jobs holds in-flight (and not-yet-persisted) jobs keyed by job ID.
	jobs map[string]*Job
	// storageBackend persists the output of completed jobs.
	storageBackend StorageBackend
	// lock guards all access to the jobs map.
	lock sync.RWMutex
}

// Get returns the job for jobID, preferring the in-memory copy and falling
// back to the storage backend when the job is no longer held in memory.
// Jobs read from the backend are returned with Status set to Complete so
// that the consumer knows no further output will arrive.
func (j *LayeredJobStore) Get(jobID string) (Job, error) {
	// Fast path: the job is still buffered in memory.
	if job, found := j.GetJobFromMemory(jobID); found {
		return job, nil
	}

	// Slow path: fetch the persisted logs from the storage backend.
	logs, err := j.storageBackend.Read(jobID)
	if err != nil {
		return Job{}, err
	}

	// A job read from the backend has already finished, so mark it
	// complete to allow the connection to be closed.
	return Job{
		Status: Complete,
		Output: logs,
	}, nil
}

// GetJobFromMemory returns a copy of the job held in the in-memory map and
// reports whether it was present. It never consults the storage backend;
// callers that also need persisted jobs should use Get.
func (j *LayeredJobStore) GetJobFromMemory(jobID string) (Job, bool) {
	j.lock.RLock()
	defer j.lock.RUnlock()

	// Single map lookup via the comma-ok idiom instead of the original
	// double lookup; the nil guard preserves the original behavior for a
	// (theoretical) nil entry.
	job, ok := j.jobs[jobID]
	if !ok || job == nil {
		return Job{}, false
	}
	return *job, true
}

// AppendOutput appends the given string to the job's output buffer, creating
// the job record on its first output line. It returns an error when the job
// has already been marked complete, since a completed job's output has been
// handed to the storage backend and must not change.
func (j *LayeredJobStore) AppendOutput(jobID string, output string) error {
	j.lock.Lock()
	defer j.lock.Unlock()

	// One map lookup instead of the original four; lazily create the job
	// if it does not exist yet.
	job, ok := j.jobs[jobID]
	if !ok || job == nil {
		job = &Job{}
		j.jobs[jobID] = job
	}

	if job.Status == Complete {
		return fmt.Errorf("cannot append to a complete job")
	}

	job.Output = append(job.Output, output)
	return nil
}

// RemoveJob drops the job's in-memory record, discarding any buffered
// output that has not been persisted.
func (j *LayeredJobStore) RemoveJob(jobID string) {
	j.lock.Lock()
	delete(j.jobs, jobID)
	j.lock.Unlock()
}

// SetJobCompleteStatus marks the job complete, persists its output to the
// storage backend, and drops the in-memory buffer once the backend reports
// a successful write. It errors if the job does not exist or is already
// complete.
//
// NOTE(review): the status argument is currently ignored — the job is always
// transitioned to Complete. Confirm whether any caller passes a different
// status before generalizing.
func (j *LayeredJobStore) SetJobCompleteStatus(jobID string, status JobStatus) error {
	j.lock.Lock()
	defer j.lock.Unlock()

	// Single map lookup replaces the original three lookups of the same key.
	job, ok := j.jobs[jobID]
	if !ok || job == nil {
		return fmt.Errorf("job: %s does not exist", jobID)
	}
	if job.Status == Complete {
		return fmt.Errorf("job: %s is already complete", jobID)
	}

	job.Status = Complete

	// Persist to the storage backend; persisted reports whether the
	// backend actually stored the job.
	persisted, err := j.storageBackend.Write(jobID, job.Output)
	if err != nil {
		return errors.Wrapf(err, "error persisting job: %s", jobID)
	}

	// Free the in-memory buffer only after a successful write so that
	// logs are never lost.
	if persisted {
		delete(j.jobs, jobID)
	}

	return nil
}
Loading

0 comments on commit 59ae206

Please sign in to comment.