-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add SQS polling infrastructure (#187)
* sqs * mark false * remove unneeded import * first pass comments * remove header * no-op * move interface
- Loading branch information
1 parent
59ae206
commit 688a360
Showing
11 changed files
with
829 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 33 additions & 0 deletions
33
server/controllers/events/mocks/matchers/http_responsewriter.go
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
102 changes: 102 additions & 0 deletions
102
server/controllers/events/mocks/mock_vcs_post_handler.go
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package sqs | ||
|
||
import ( | ||
"bufio" | ||
"bytes" | ||
"github.com/aws/aws-sdk-go-v2/service/sqs/types" | ||
"github.com/pkg/errors" | ||
"github.com/uber-go/tally" | ||
"net/http" | ||
) | ||
|
||
//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_sqs_message_handler.go MessageProcessor | ||
type MessageProcessor interface { | ||
ProcessMessage(types.Message) error | ||
} | ||
|
||
//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_vcs_post_handler.go VCSPostHandler | ||
type VCSPostHandler interface { | ||
Post(w http.ResponseWriter, r *http.Request) | ||
} | ||
|
||
type VCSEventMessageProcessor struct { | ||
PostHandler VCSPostHandler | ||
} | ||
|
||
func (p *VCSEventMessageProcessor) ProcessMessage(msg types.Message) error { | ||
if msg.Body == nil { | ||
return errors.New("message received from sqs has no body") | ||
} | ||
|
||
buffer := bytes.NewBufferString(*msg.Body) | ||
buf := bufio.NewReader(buffer) | ||
req, err := http.ReadRequest(buf) | ||
if err != nil { | ||
return errors.Wrap(err, "reading bytes from sqs into http request") | ||
} | ||
|
||
// using a no-op writer since we shouldn't send response back in worker mode | ||
p.PostHandler.Post(&NoOpResponseWriter{}, req) | ||
return nil | ||
} | ||
|
||
type VCSEventMessageProcessorStats struct { | ||
Scope tally.Scope | ||
VCSEventMessageProcessor | ||
} | ||
|
||
func (s *VCSEventMessageProcessorStats) ProcessMessage(msg types.Message) error { | ||
successCount := s.Scope.Counter(Success) | ||
errorCount := s.Scope.Counter(Error) | ||
|
||
if err := s.VCSEventMessageProcessor.ProcessMessage(msg); err != nil { | ||
errorCount.Inc(1) | ||
return err | ||
} | ||
successCount.Inc(1) | ||
return nil | ||
} | ||
|
||
type NoOpResponseWriter struct{} | ||
|
||
func (n *NoOpResponseWriter) Header() http.Header { | ||
return nil | ||
} | ||
|
||
func (n *NoOpResponseWriter) Write([]byte) (int, error) { | ||
return 0, nil | ||
} | ||
|
||
func (n *NoOpResponseWriter) WriteHeader(statusCode int) {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
package sqs_test | ||
|
||
import ( | ||
"bytes" | ||
"github.com/aws/aws-sdk-go-v2/aws" | ||
"github.com/aws/aws-sdk-go-v2/service/sqs/types" | ||
. "github.com/petergtz/pegomock" | ||
controller_mocks "github.com/runatlantis/atlantis/server/controllers/events/mocks" | ||
"github.com/runatlantis/atlantis/server/controllers/events/mocks/matchers" | ||
"github.com/runatlantis/atlantis/server/lyft/aws/sqs" | ||
. "github.com/runatlantis/atlantis/testing" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/uber-go/tally" | ||
"net/http" | ||
"net/url" | ||
|
||
"testing" | ||
) | ||
|
||
func TestAtlantisMessageHandler_PostSuccess(t *testing.T) { | ||
RegisterMockTestingT(t) | ||
testScope := tally.NewTestScope("test", nil) | ||
req := createExampleRequest(t) | ||
mockPostHandler := controller_mocks.NewMockVCSPostHandler() | ||
handler := &sqs.VCSEventMessageProcessorStats{ | ||
VCSEventMessageProcessor: sqs.VCSEventMessageProcessor{ | ||
PostHandler: mockPostHandler, | ||
}, | ||
Scope: testScope, | ||
} | ||
|
||
err := handler.ProcessMessage(toSqsMessage(t, req)) | ||
assert.NoError(t, err) | ||
mockPostHandler.VerifyWasCalledOnce().Post(matchers.AnyHttpResponseWriter(), matchers.AnyPtrToHttpRequest()) | ||
Assert(t, testScope.Snapshot().Counters()["test.success+"].Value() == 1, "message handler was successful") | ||
} | ||
|
||
func TestAtlantisMessageHandler_Error(t *testing.T) { | ||
RegisterMockTestingT(t) | ||
testScope := tally.NewTestScope("test", nil) | ||
mockPostHandler := controller_mocks.NewMockVCSPostHandler() | ||
handler := &sqs.VCSEventMessageProcessorStats{ | ||
VCSEventMessageProcessor: sqs.VCSEventMessageProcessor{ | ||
PostHandler: mockPostHandler, | ||
}, | ||
Scope: testScope, | ||
} | ||
invalidMessage := types.Message{} | ||
err := handler.ProcessMessage(invalidMessage) | ||
assert.Error(t, err) | ||
mockPostHandler.VerifyWasCalled(Never()).Post(matchers.AnyHttpResponseWriter(), matchers.AnyPtrToHttpRequest()) | ||
Assert(t, testScope.Snapshot().Counters()["test.error+"].Value() == 1, "message handler was not successful") | ||
} | ||
|
||
func toSqsMessage(t *testing.T, req *http.Request) types.Message { | ||
buffer := bytes.NewBuffer([]byte{}) | ||
err := req.Write(buffer) | ||
assert.NoError(t, err) | ||
return types.Message{ | ||
Body: aws.String(string(buffer.Bytes())), | ||
} | ||
} | ||
|
||
func createExampleRequest(t *testing.T) *http.Request { | ||
url, err := url.Parse("http://www.atlantis.com") | ||
assert.NoError(t, err) | ||
req := &http.Request{ | ||
URL: url, | ||
} | ||
return req | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Oops, something went wrong.