Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for HTTP response object. Closes #245 #286

Merged
merged 2 commits into from
Aug 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ All data that passes through the Event Gateway is formatted as an Event, based o
- `event` - `string` - the event name
- `id` - `string` - the event's instance universally unique ID (provided by the event gateway)
- `receivedAt` - `number` - the time (milliseconds) when the Event was received by the Event Gateway (provided by the event gateway)
- `data` - type depends on `encoding` - the event payload
- `data` - type depends on `dataType` - the event payload
- `dataType` - `string` - the mime type of `data` payload

Example:
Expand All @@ -268,25 +268,25 @@ Example:
}
```

### Event Data Types
#### Event Data Type

The MIME type of the data block can be specified using the Content-Type header. This allows the event gateway to understand how to deserialize the data block if it needs to. If not specified `application/octet-stream` type is assumed and no deserialization happens. In case of `application/json` encoding the event gateway passes deserialized JSON payload to target functions.
The MIME type of the data block can be specified using the `Content-Type` header (by default it's `application/octet-stream`). This allows the event gateway to understand how to deserialize the data block if it needs to. In case of `application/json` type the event gateway passes JSON payload to the target functions. In any other case the data block is Base64 encoded.

### Emit a Custom Event (Async Function Invocation)

`POST /` with `Event` header set to event name. Optionally `Content-Type: <MIME type>` header can be set to specify payload encoding.
`POST /` with `Event` header set to an event name. Optionally `Content-Type: <MIME type>` header can be set to specify payload encoding.

Request: arbitrary payload, subscribed function receives an event in above schema, where request payload is passed as `data` field

Response: `202 Accepted` in case of success

### Emit a HTTP Event
### Emit an HTTP Event

Creating HTTP subscription requires `method` and `path` properties. Those properties are used to emit HTTP event.
Creating HTTP subscription requires `method` and `path` properties. Those properties are used to listen for HTTP events.

`<method> /<path>`

Request: arbitrary payload, subscribed function receives an event in above schema. `data` field has following fields:
Request: arbitrary payload, subscribed function receives an event in above schema. `data` field has the following fields:

```
{
Expand All @@ -304,6 +304,16 @@ Request: arbitrary payload, subscribed function receives an event in above schem

Response: function response

### Respond to an HTTP Event

To respond to an HTTP event a function needs to return object with following fields:

- `statusCode` - `int` - response status code, default: 200
- `headers` - `object` - response headers
- `body` - `string` - response body

Currently, the event gateway supports only string responses.

### Invoking a Registered Function (Sync Function Invocation)

`POST /` with `Event` header set to `invoke` and `Function-ID` set to function ID.
Expand Down
7 changes: 7 additions & 0 deletions router/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ type HTTPEvent struct {
Method string `json:"method"`
}

// HTTPResponse is a response schema returned by subscribed function in case of HTTP event.
type HTTPResponse struct {
StatusCode int `json:"statusCode"`
Headers map[string]string `json:"headers"`
Body string `json:"body"`
}

const (
mimeJSON = "application/json"
mimeOctetStrem = "application/octet-stream"
Expand Down
79 changes: 60 additions & 19 deletions router/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,22 @@ import (
"github.com/serverless/event-gateway/internal/metrics"
"github.com/serverless/event-gateway/internal/sync"
"github.com/serverless/event-gateway/subscriptions"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

func TestMain(t *testing.T) {
etcd.Register()
}

func TestIntegrationSubscription(t *testing.T) {
func TestIntegration_Subscription(t *testing.T) {
logCfg := zap.NewDevelopmentConfig()
logCfg.DisableStacktrace = true
log, _ := logCfg.Build()

kv, shutdownGuard := newTestEtcd()

testAPIServer := newConfigAPIServer(kv, log)
defer testAPIServer.Close()

router, testRouterServer := newTestRouterServer(kv, log)
defer testRouterServer.Close()
router.StartWorkers()
Expand Down Expand Up @@ -100,23 +99,19 @@ func TestIntegrationSubscription(t *testing.T) {
shutdownGuard.ShutdownAndWait()
}

func TestIntegrationHTTPSubscription(t *testing.T) {
func TestIntegration_HTTPSubscription(t *testing.T) {
logCfg := zap.NewDevelopmentConfig()
logCfg.DisableStacktrace = true
log, _ := logCfg.Build()

kv, shutdownGuard := newTestEtcd()

testAPIServer := newConfigAPIServer(kv, log)
defer testAPIServer.Close()

router, testRouterServer := newTestRouterServer(kv, log)
defer testRouterServer.Close()

expected := "😸"

testTargetServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, expected)
fmt.Fprintf(w, "😸")
}))
defer testTargetServer.Close()

Expand All @@ -132,22 +127,68 @@ func TestIntegrationHTTPSubscription(t *testing.T) {
post(testAPIServer.URL+"/v1/subscriptions", subscriptions.Subscription{
FunctionID: functions.FunctionID("supersmileyfunction"),
Event: "http",
Method: "POST",
Method: "GET",
Path: "/smilez",
})

select {
case <-router.WaitForEndpoint(subscriptions.NewEndpointID("POST", "/smilez")):
case <-router.WaitForEndpoint(subscriptions.NewEndpointID("GET", "/smilez")):
case <-time.After(10 * time.Second):
panic("timed out waiting for endpoint to be configured!")
}

res := get(testRouterServer.URL + "/smilez")
_, _, body := get(testRouterServer.URL + "/smilez")
assert.Equal(t, "😸", body)

router.Drain()
shutdownGuard.ShutdownAndWait()
}

func TestIntegration_HTTPResponse(t *testing.T) {
logCfg := zap.NewDevelopmentConfig()
logCfg.DisableStacktrace = true
log, _ := logCfg.Build()

kv, shutdownGuard := newTestEtcd()

testAPIServer := newConfigAPIServer(kv, log)
defer testAPIServer.Close()

router, testRouterServer := newTestRouterServer(kv, log)
defer testRouterServer.Close()

testTargetServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, `{"statusCode":201,"headers":{"content-type":"text/html"},"body":"<head></head>"}`)
}))
defer testTargetServer.Close()

if res != expected {
panic("returned value was not \"" + expected + "\", unexpected value: \"" + res + "\"")
post(testAPIServer.URL+"/v1/functions",
functions.Function{
ID: functions.FunctionID("httpresponse"),
Provider: &functions.Provider{
Type: functions.HTTPEndpoint,
URL: testTargetServer.URL,
},
})

post(testAPIServer.URL+"/v1/subscriptions", subscriptions.Subscription{
FunctionID: functions.FunctionID("httpresponse"),
Event: "http",
Method: "GET",
Path: "/httpresponse",
})

select {
case <-router.WaitForEndpoint(subscriptions.NewEndpointID("GET", "/httpresponse")):
case <-time.After(10 * time.Second):
panic("timed out waiting for endpoint to be configured!")
}

statusCode, headers, body := get(testRouterServer.URL + "/httpresponse")
assert.Equal(t, statusCode, 201)
assert.Equal(t, headers.Get("content-type"), "text/html")
assert.Equal(t, body, "<head></head>")

router.Drain()
shutdownGuard.ShutdownAndWait()
}
Expand All @@ -174,9 +215,9 @@ func emit(url, topic string, body []byte) {
defer resp.Body.Close()
}

func post(url string, thing interface{}) ([]byte, error) {
func post(url string, payload interface{}) ([]byte, error) {
reqBytes := &bytes.Buffer{}
json.NewEncoder(reqBytes).Encode(thing)
json.NewEncoder(reqBytes).Encode(payload)

resp, err := http.Post(url, "application/json", reqBytes)
if err != nil {
Expand All @@ -187,8 +228,8 @@ func post(url string, thing interface{}) ([]byte, error) {
return ioutil.ReadAll(resp.Body)
}

func get(url string) string {
res, err := http.Post(url, "application/json", nil)
func get(url string) (int, http.Header, string) {
res, err := http.Get(url)
if err != nil {
panic(err)
}
Expand All @@ -198,7 +239,7 @@ func get(url string) string {
panic(err)
}

return string(body)
return res.StatusCode, res.Header, string(body)
}

func newTestRouterServer(kvstore store.Store, log *zap.Logger) (*Router, *httptest.Server) {
Expand Down
92 changes: 50 additions & 42 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,17 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

payload, err := json.Marshal(event)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

if event.Event == eventHTTP || event.Event == eventInvoke {
router.handleSyncEvent(event, w, r)
router.handleSyncEvent(event.Event, payload, w, r)
} else if r.Method == http.MethodPost && r.URL.Path == "/" {
router.handleAsyncEvent(event, w, r)
router.enqueueWork(subscriptions.TopicID(event.Event), payload)
w.WriteHeader(http.StatusAccepted)
}
}

Expand Down Expand Up @@ -159,20 +166,15 @@ var (
errUnableToLookUpRegisteredFunction = errors.New("unable to look up registered function")
)

func (router *Router) handleSyncEvent(event *Event, w http.ResponseWriter, r *http.Request) {
payload, err := json.Marshal(event)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

func (router *Router) handleSyncEvent(name string, payload []byte, w http.ResponseWriter, r *http.Request) {
router.log.Debug("Event received.", zap.String("event", string(payload)))

var resp []byte
var functionID functions.FunctionID
if event.Event == eventInvoke {

if name == eventInvoke {
functionID = functions.FunctionID(r.Header.Get(headerFunctionID))
} else if event.Event == eventHTTP {
} else if name == eventHTTP {
endpointID := subscriptions.NewEndpointID(strings.ToUpper(r.Method), r.URL.EscapedPath())
backingFunction := router.targetCache.BackingFunction(endpointID)
if backingFunction == nil {
Expand All @@ -183,10 +185,9 @@ func (router *Router) handleSyncEvent(event *Event, w http.ResponseWriter, r *ht
functionID = *backingFunction
}

router.log.Debug("Function triggered.",
zap.String("functionId", string(functionID)), zap.String("event", string(payload)))
router.log.Debug("Function triggered.", zap.String("functionId", string(functionID)), zap.String("event", string(payload)))

resp, err = router.callFunction(functionID, payload)
resp, err := router.callFunction(functionID, payload)
if err != nil {
router.log.Warn("Function invocation failed.",
zap.String("functionId", string(functionID)), zap.String("event", string(payload)), zap.Error(err))
Expand All @@ -195,16 +196,8 @@ func (router *Router) handleSyncEvent(event *Event, w http.ResponseWriter, r *ht
http.Error(w, err.Error(), http.StatusNotFound)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}

if _, ok := err.(*functions.ErrFunctionCallFailedProviderError); ok {
internal := NewEvent(internalFunctionProviderError, mimeJSON, struct {
FunctionID string `json:"functionId"`
}{string(functionID)})
payload, err = json.Marshal(internal)
if err == nil {
router.enqueueWork(subscriptions.TopicID(internal.Event), payload)
}
router.emitFunctionProviderErrorEvent(functionID, payload, err)
}

return
Expand All @@ -214,22 +207,39 @@ func (router *Router) handleSyncEvent(event *Event, w http.ResponseWriter, r *ht
zap.String("functionId", string(functionID)), zap.String("event", string(payload)),
zap.String("response", string(resp)))

if name == eventHTTP {
httpResponse := &HTTPResponse{StatusCode: http.StatusOK}
err = json.Unmarshal(resp, httpResponse)
if err == nil {
for key, value := range httpResponse.Headers {
w.Header().Set(key, value)
}

w.WriteHeader(httpResponse.StatusCode)

resp = []byte(httpResponse.Body)
}
}

_, err = w.Write(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

func (router *Router) handleAsyncEvent(instance *Event, w http.ResponseWriter, r *http.Request) {
payload, err := json.Marshal(instance)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
func (router *Router) enqueueWork(topic subscriptions.TopicID, payload []byte) {
router.log.Debug("Event received.", zap.String("event", string(payload)))

router.enqueueWork(subscriptions.TopicID(instance.Event), payload)
w.WriteHeader(http.StatusAccepted)
select {
case router.work <- event{
topic: topic,
payload: payload,
}:
default:
// We could not submit any work, this is NOT good but we will sacrifice consistency for availability for now.
router.dropMetric.Inc()
}
}

// callFunction looks up a function and calls it.
Expand Down Expand Up @@ -325,17 +335,15 @@ func (router *Router) processEvent(e event) {
}
}

func (router *Router) enqueueWork(topic subscriptions.TopicID, payload []byte) {
router.log.Debug("Event received.", zap.String("event", string(payload)))

select {
case router.work <- event{
topic: topic,
payload: payload,
}:
default:
// We could not submit any work, this is NOT good but we will sacrifice consistency for availability for now.
router.dropMetric.Inc()
func (router *Router) emitFunctionProviderErrorEvent(functionID functions.FunctionID, payload []byte, err error) {
if _, ok := err.(*functions.ErrFunctionCallFailedProviderError); ok {
internal := NewEvent(internalFunctionProviderError, mimeJSON, struct {
FunctionID string `json:"functionId"`
}{string(functionID)})
payload, err = json.Marshal(internal)
if err == nil {
router.enqueueWork(subscriptions.TopicID(internal.Event), payload)
}
}
}

Expand Down