Skip to content

Commit

Permalink
Add SQSProvider (#399)
Browse files Browse the repository at this point in the history
* Add SQSProvider

* Update OpenAPI
  • Loading branch information
alexdebrie committed Mar 25, 2018
1 parent 4937d70 commit f673e72
Show file tree
Hide file tree
Showing 7 changed files with 1,067 additions and 6 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Discover and call serverless functions from anything that can reach the Event Ga
following function types:

* FaaS functions (AWS Lambda, Google Cloud Functions, Azure Functions, OpenWhisk Actions)
* Connectors (AWS Kinesis, AWS Kinesis Firehose)
* Connectors (AWS Kinesis, AWS Kinesis Firehose, AWS SQS)
* HTTP endpoints/Webhook (e.g. POST http://example.com/function)

Function Discovery stores information about functions allowing the Event Gateway to call them as a reaction to received
Expand Down Expand Up @@ -461,6 +461,12 @@ JSON object:
* `awsAccessKeyId` - `string` - optional, AWS API key ID. By default credentials from the [environment](http://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html#specifying-credentials) are used.
* `awsSecretAccessKey` - `string` - optional, AWS API access key. By default credentials from the [environment](http://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html#specifying-credentials) are used.
* `awsSessionToken` - `string` - optional, AWS session token
* for AWS SQS connector:
* `queueUrl` - `string` - required, AWS SQS Queue URL
* `region` - `string` - required, region name
* `awsAccessKeyId` - `string` - optional, AWS API key ID. By default credentials from the [environment](http://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html#specifying-credentials) are used.
* `awsSecretAccessKey` - `string` - optional, AWS API access key. By default credentials from the [environment](http://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html#specifying-credentials) are used.
* `awsSessionToken` - `string` - optional, AWS session token

**Response**

Expand Down
1 change: 1 addition & 0 deletions cmd/event-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
_ "github.com/serverless/event-gateway/providers/awsfirehose"
_ "github.com/serverless/event-gateway/providers/awskinesis"
_ "github.com/serverless/event-gateway/providers/awslambda"
_ "github.com/serverless/event-gateway/providers/awssqs"
_ "github.com/serverless/event-gateway/providers/http"
)

Expand Down
46 changes: 41 additions & 5 deletions docs/openapi/openapi-config-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,19 @@ components:
type: string
description: "function provider"
enum:
- awslambda
- awsfirehose
- awskinesis
- awslambda
- awssqs
- http
Provider:
type: object
description: "function provider configuration"
oneOf:
- $ref: '#/components/schemas/AWSFirehose'
- $ref: '#/components/schemas/AWSKinesis'
- $ref: '#/components/schemas/AWSLambda'
- $ref: '#/components/schemas/AWSSQS'
- $ref: '#/components/schemas/HTTP'
Function:
type: object
Expand Down Expand Up @@ -265,11 +269,11 @@ components:
type: array
items:
$ref: '#/components/schemas/Subscription'
AWSLambda:
AWSFirehose:
type: object
properties:
arn:
$ref: '#/components/schemas/ARN'
deliveryStreamName:
$ref: '#/components/schemas/DeliveryStreamName'
region:
$ref: '#/components/schemas/Region'
awsAccessKeyId:
Expand All @@ -291,6 +295,32 @@ components:
$ref: '#/components/schemas/AWSSecretAccessKey'
awsSessionToken:
$ref: '#/components/schemas/AWSSessionToken'
AWSLambda:
type: object
properties:
arn:
$ref: '#/components/schemas/ARN'
region:
$ref: '#/components/schemas/Region'
awsAccessKeyId:
$ref: '#/components/schemas/AWSAccessKeyId'
awsSecretAccessKey:
$ref: '#/components/schemas/AWSSecretAccessKey'
awsSessionToken:
$ref: '#/components/schemas/AWSSessionToken'
AWSSQS:
type: object
properties:
queueUrl:
$ref: '#/components/schemas/QueueURL'
region:
$ref: '#/components/schemas/Region'
awsAccessKeyId:
$ref: '#/components/schemas/AWSAccessKeyId'
awsSecretAccessKey:
$ref: '#/components/schemas/AWSSecretAccessKey'
awsSessionToken:
$ref: '#/components/schemas/AWSSessionToken'
HTTP:
type: object
properties:
Expand All @@ -314,6 +344,12 @@ components:
StreamName:
type: string
description: "AWS Kinesis stream name"
DeliveryStreamName:
type: string
description: "AWS Firehose delivery stream name"
QueueURL:
type: string
description: "AWS SQS Queue URL"
URL:
type: string
format: url
Expand Down Expand Up @@ -456,4 +492,4 @@ components:
content:
application/json:
schema:
$ref: '#/components/schemas/Errors'
$ref: '#/components/schemas/Errors'
106 changes: 106 additions & 0 deletions providers/awssqs/awssqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package awssqs

import (
"encoding/json"
"errors"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/serverless/event-gateway/function"
"go.uber.org/zap/zapcore"
validator "gopkg.in/go-playground/validator.v9"
)

// Type of provider.
const Type = function.ProviderType("awssqs")

func init() {
function.RegisterProvider(Type, ProviderLoader{})
}

// AWSSQS function implementation
type AWSSQS struct {
Service sqsiface.SQSAPI`json:"-" validate:"-"`

QueueURL string `json:"queueUrl" validate:"required"`
Region string `json:"region" validate:"required"`
AWSAccessKeyID string `json:"awsAccessKeyId,omitempty"`
AWSSecretAccessKey string `json:"awsSecretAccessKey,omitempty"`
AWSSessionToken string `json:"awsSessionToken,omitempty"`
}

// Call sends message to AWS SQS Queue
func (a AWSSQS) Call(payload []byte) ([]byte, error) {
body := string(payload)
sendMessageOutput, err := a.Service.SendMessage(&sqs.SendMessageInput{
QueueUrl: &a.QueueURL,
MessageBody: &body,
})
if err != nil {
if awserr, ok := err.(awserr.Error); ok {
return nil, &function.ErrFunctionCallFailed{Original: awserr}
}
}

return []byte(*sendMessageOutput.MessageId), err
}

// validate provider config.
func (a AWSSQS) validate() error {
validate := validator.New()
err := validate.Struct(a)
if err != nil {
return err
}
return nil
}

// MarshalLogObject is a part of zapcore.ObjectMarshaler interface.
func (a AWSSQS) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("queueUrl", a.QueueURL)
enc.AddString("region", a.Region)
if a.AWSAccessKeyID != "" {
enc.AddString("awsAccessKeyId", "*****")
}
if a.AWSSecretAccessKey != "" {
enc.AddString("awsSecretAccessKey", "*****")
}
if a.AWSSessionToken != "" {
enc.AddString("awsSessionToken", "*****")
}
return nil
}

// ProviderLoader implementation
type ProviderLoader struct{}

// Load decode JSON data as Config and return initialized Provider instance.
func (p ProviderLoader) Load(data []byte) (function.Provider, error) {
provider := &AWSSQS{}
err := json.Unmarshal(data, provider)
if err != nil {
return nil, errors.New("unable to load function provider config: " + err.Error())
}

err = provider.validate()
if err != nil {
return nil, errors.New("missing required fields for AWS SQS function")
}

config := aws.NewConfig().WithRegion(provider.Region)
if provider.AWSAccessKeyID != "" && provider.AWSSecretAccessKey != "" {
config = config.WithCredentials(credentials.NewStaticCredentials(provider.AWSAccessKeyID, provider.AWSSecretAccessKey, provider.AWSSessionToken))
}

awsSession, err := session.NewSession(config)
if err != nil {
return nil, errors.New("unable to create AWS Session: " + err.Error())
}

provider.Service = sqs.New(awsSession)
return provider, nil
}
125 changes: 125 additions & 0 deletions providers/awssqs/awssqs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package awssqs_test

import (
"errors"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/golang/mock/gomock"
"github.com/serverless/event-gateway/function"
"github.com/serverless/event-gateway/providers/awssqs"
"github.com/serverless/event-gateway/providers/awssqs/mock"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zapcore"
)

func TestLoad(t *testing.T) {
for _, testCase := range loadTests {
config := []byte(testCase.config)
loader := awssqs.ProviderLoader{}

_, err := loader.Load(config)

assert.Equal(t, testCase.expectedError, err)
}
}

func TestCall(t *testing.T) {
for _, testCase := range callTests {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
serviceMock := mock.NewMockSQSAPI(mockCtrl)
serviceMock.EXPECT().SendMessage(gomock.Any()).Return(testCase.sendMessageResult, testCase.sendMessageError)

provider := awssqs.AWSSQS{
Service: serviceMock,
QueueURL: "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue",
Region: "us-east-1",
}

output, err := provider.Call([]byte("testpayload"))

assert.Equal(t, testCase.expectedResult, output)
assert.Equal(t, testCase.expectedError, err)
}
}

func TestMarshalLogObject(t *testing.T) {
for _, testCase := range logTests {
enc := zapcore.NewMapObjectEncoder()

testCase.provider.MarshalLogObject(enc)

assert.Equal(t, testCase.expectedFields, enc.Fields)
}
}

var loadTests = []struct {
config string
expectedError error
}{
{
`{"queueUrl": "", "region": `,
errors.New("unable to load function provider config: unexpected end of JSON input"),
},
{
`{"queueUrl": "", "region": "us-east-1"}`,
errors.New("missing required fields for AWS SQS function"),
},
{
`{"queueUrl": "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue", "region": ""}`,
errors.New("missing required fields for AWS SQS function"),
},
}

var callTests = []struct {
sendMessageResult *sqs.SendMessageOutput
sendMessageError error
expectedResult []byte
expectedError error
}{
{
&sqs.SendMessageOutput{MessageId: aws.String("testid")},
nil,
[]byte("testid"),
nil,
},
{
nil,
awserr.New("", "", nil),
[]byte(nil),
&function.ErrFunctionCallFailed{Original: awserr.New("", "", nil)},
},
}

var logTests = []struct {
provider function.Provider
expectedFields map[string]interface{}
}{
{
awssqs.AWSSQS{
QueueURL : "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue",
Region: "us-east-1",
},
map[string]interface{}{
"queueUrl": "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue",
"region": "us-east-1",
},
},
{
awssqs.AWSSQS{
AWSAccessKeyID: "id",
AWSSecretAccessKey: "key",
AWSSessionToken: "token",
},
map[string]interface{}{
"queueUrl": "",
"region": "",
"awsAccessKeyId": "*****",
"awsSecretAccessKey": "*****",
"awsSessionToken": "*****",
},
},
}
3 changes: 3 additions & 0 deletions providers/awssqs/mock/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//go:generate mockgen -package mock -destination sqsiface.go github.com/aws/aws-sdk-go/service/sqs/sqsiface SQSAPI

package mock
Loading

0 comments on commit f673e72

Please sign in to comment.