Skip to content

Commit

Permalink
Add Kinesis provider. (#392)
Browse files Browse the repository at this point in the history
  • Loading branch information
mthenw committed Mar 19, 2018
1 parent 172a76e commit d48736a
Show file tree
Hide file tree
Showing 6 changed files with 1,254 additions and 0 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Discover and call serverless functions from anything that can reach the Event Ga
following function types:

* FaaS functions (AWS Lambda, Google Cloud Functions, Azure Functions, OpenWhisk Actions)
* Connectors (AWS Kinesis)
* HTTP endpoints/Webhook (e.g. POST http://example.com/function)

Function Discovery stores information about functions allowing the Event Gateway to call them as a reaction to received
Expand Down Expand Up @@ -448,6 +449,12 @@ JSON object:
* `awsSessionToken` - `string` - optional, AWS session token
* for HTTP function:
* `url` - `string` - required, the URL of an http or https remote endpoint
* for AWS Kinesis connector:
* `streamName` - `string` - required, AWS Kinesis Stream Name
* `region` - `string` - required, region name
* `awsAccessKeyId` - `string` - optional, AWS API key ID. By default credentials from the [environment](http://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html#specifying-credentials) are used.
* `awsSecretAccessKey` - `string` - optional, AWS API access key. By default credentials from the [environment](http://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html#specifying-credentials) are used.
* `awsSessionToken` - `string` - optional, AWS session token

**Response**

Expand Down
1 change: 1 addition & 0 deletions cmd/event-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/serverless/event-gateway/plugin"

// providers
_ "github.com/serverless/event-gateway/providers/awskinesis"
_ "github.com/serverless/event-gateway/providers/awslambda"
_ "github.com/serverless/event-gateway/providers/http"
)
Expand Down
106 changes: 106 additions & 0 deletions providers/awskinesis/awskinesis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package awskinesis

import (
"encoding/json"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
uuid "github.com/satori/go.uuid"
"github.com/serverless/event-gateway/function"
"go.uber.org/zap/zapcore"
validator "gopkg.in/go-playground/validator.v9"
)

// Type of provider.
const Type = function.ProviderType("awskinesis")

func init() {
function.RegisterProvider(Type, ProviderLoader{})
}

// AWSKinesis function implementation
type AWSKinesis struct {
Service kinesisiface.KinesisAPI `json:"-" validate:"-"`

StreamName string `json:"streamName" validate:"required"`
Region string `json:"region" validate:"required"`
AWSAccessKeyID string `json:"awsAccessKeyId,omitempty"`
AWSSecretAccessKey string `json:"awsSecretAccessKey,omitempty"`
AWSSessionToken string `json:"awsSessionToken,omitempty"`
}

// Call puts record into AWS Kinesis stream.
func (a AWSKinesis) Call(payload []byte) ([]byte, error) {
putRecordOutput, err := a.Service.PutRecord(&kinesis.PutRecordInput{
StreamName: &a.StreamName,
Data: payload,
PartitionKey: aws.String(uuid.NewV4().String()),
})
if err != nil {
if awserr, ok := err.(awserr.Error); ok {
return nil, &function.ErrFunctionCallFailed{Original: awserr}
}
}

return []byte(*putRecordOutput.SequenceNumber), err
}

// validate provider config.
func (a AWSKinesis) validate() error {
validate := validator.New()
err := validate.Struct(a)
if err != nil {
return err
}
return nil
}

// MarshalLogObject is a part of zapcore.ObjectMarshaler interface.
func (a AWSKinesis) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("streamName", a.StreamName)
enc.AddString("region", a.Region)
if a.AWSAccessKeyID != "" {
enc.AddString("awsAccessKeyId", "*****")
}
if a.AWSSecretAccessKey != "" {
enc.AddString("awsSecretAccessKey", "*****")
}
if a.AWSSessionToken != "" {
enc.AddString("awsSessionToken", "*****")
}
return nil
}

// ProviderLoader implementation
type ProviderLoader struct{}

// Load decode JSON data as Config and return initialized Provider instance.
func (p ProviderLoader) Load(data []byte) (function.Provider, error) {
provider := &AWSKinesis{}
err := json.Unmarshal(data, provider)
if err != nil {
return nil, &function.ErrFunctionValidation{Message: "Unable to load function provider config: " + err.Error()}
}

err = provider.validate()
if err != nil {
return nil, &function.ErrFunctionValidation{Message: "Missing required fields for AWS Kinesis function."}
}

config := aws.NewConfig().WithRegion(provider.Region)
if provider.AWSAccessKeyID != "" && provider.AWSSecretAccessKey != "" {
config = config.WithCredentials(credentials.NewStaticCredentials(provider.AWSAccessKeyID, provider.AWSSecretAccessKey, provider.AWSSessionToken))
}

awsSession, err := session.NewSession(config)
if err != nil {
return nil, &function.ErrFunctionValidation{Message: "Unable to create AWS Session: " + err.Error()}
}

provider.Service = kinesis.New(awsSession)
return provider, nil
}
60 changes: 60 additions & 0 deletions providers/awskinesis/awskinesis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package awskinesis_test

import (
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/golang/mock/gomock"
"github.com/serverless/event-gateway/function"
"github.com/serverless/event-gateway/providers/awskinesis"
"github.com/serverless/event-gateway/providers/awskinesis/mock"
"github.com/stretchr/testify/assert"
)

func TestLoad_MalformedJSON(t *testing.T) {
config := []byte(`{"streamName": "", "region": `)
loader := awskinesis.ProviderLoader{}

provider, err := loader.Load(config)

assert.Nil(t, provider)
assert.Equal(t, err, &function.ErrFunctionValidation{Message: "Unable to load function provider config: unexpected end of JSON input"})
}

func TestLoad_MissingStreamName(t *testing.T) {
config := []byte(`{"streamName": "", "region": "us-east-1"}`)
loader := awskinesis.ProviderLoader{}

provider, err := loader.Load(config)

assert.Nil(t, provider)
assert.Equal(t, err, &function.ErrFunctionValidation{Message: "Missing required fields for AWS Kinesis function."})
}

func TestLoad_MissingRegion(t *testing.T) {
config := []byte(`{"streamName": "test", "region": ""}`)
loader := awskinesis.ProviderLoader{}

provider, err := loader.Load(config)

assert.Nil(t, provider)
assert.Equal(t, err, &function.ErrFunctionValidation{Message: "Missing required fields for AWS Kinesis function."})
}

func TestCall(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
serviceMock := mock.NewMockKinesisAPI(mockCtrl)
serviceMock.EXPECT().PutRecord(gomock.Any()).Return(&kinesis.PutRecordOutput{SequenceNumber: aws.String("testseq")}, nil)
provider := awskinesis.AWSKinesis{
Service: serviceMock,
StreamName: "teststream",
Region: "us-east-1",
}

output, err := provider.Call([]byte("testpayload"))

assert.Nil(t, err)
assert.Equal(t, []byte("testseq"), output)
}
Loading

0 comments on commit d48736a

Please sign in to comment.