Skip to content

Commit

Permalink
add support for HTTP response object. Closes #245 (#286)
Browse files Browse the repository at this point in the history
* add support for HTTP response object. Closes #245

* refactor handleSyncEvent
  • Loading branch information
mthenw authored Aug 15, 2017
1 parent 2fc77fb commit dec7b02
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 68 deletions.
24 changes: 17 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ All data that passes through the Event Gateway is formatted as an Event, based o
- `event` - `string` - the event name
- `id` - `string` - the event's instance universally unique ID (provided by the event gateway)
- `receivedAt` - `number` - the time (milliseconds) when the Event was received by the Event Gateway (provided by the event gateway)
- `data` - type depends on `encoding` - the event payload
- `data` - type depends on `dataType` - the event payload
- `dataType` - `string` - the mime type of `data` payload

Example:
Expand All @@ -268,25 +268,25 @@ Example:
}
```

### Event Data Types
#### Event Data Type

The MIME type of the data block can be specified using the Content-Type header. This allows the event gateway to understand how to deserialize the data block if it needs to. If not specified `application/octet-stream` type is assumed and no deserialization happens. In case of `application/json` encoding the event gateway passes deserialized JSON payload to target functions.
The MIME type of the data block can be specified using the `Content-Type` header (by default it's `application/octet-stream`). This allows the event gateway to understand how to deserialize the data block if it needs to. In case of `application/json` type the event gateway passes JSON payload to the target functions. In any other case the data block is Base64 encoded.

### Emit a Custom Event (Async Function Invocation)

`POST /` with `Event` header set to event name. Optionally `Content-Type: <MIME type>` header can be set to specify payload encoding.
`POST /` with `Event` header set to an event name. Optionally `Content-Type: <MIME type>` header can be set to specify payload encoding.

Request: arbitrary payload, subscribed function receives an event in above schema, where request payload is passed as `data` field

Response: `202 Accepted` in case of success

### Emit a HTTP Event
### Emit an HTTP Event

Creating HTTP subscription requires `method` and `path` properties. Those properties are used to emit HTTP event.
Creating HTTP subscription requires `method` and `path` properties. Those properties are used to listen for HTTP events.

`<method> /<path>`

Request: arbitrary payload, subscribed function receives an event in above schema. `data` field has following fields:
Request: arbitrary payload, subscribed function receives an event in above schema. `data` field has the following fields:

```
{
Expand All @@ -304,6 +304,16 @@ Request: arbitrary payload, subscribed function receives an event in above schem

Response: function response

### Respond to an HTTP Event

To respond to an HTTP event a function needs to return object with following fields:

- `statusCode` - `int` - response status code, default: 200
- `headers` - `object` - response headers
- `body` - `string` - response body

Currently, the event gateway supports only string responses.

### Invoking a Registered Function (Sync Function Invocation)

`POST /` with `Event` header set to `invoke` and `Function-ID` set to function ID.
Expand Down
7 changes: 7 additions & 0 deletions router/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ type HTTPEvent struct {
Method string `json:"method"`
}

// HTTPResponse is a response schema returned by subscribed function in case of HTTP event.
type HTTPResponse struct {
StatusCode int `json:"statusCode"`
Headers map[string]string `json:"headers"`
Body string `json:"body"`
}

const (
mimeJSON = "application/json"
mimeOctetStrem = "application/octet-stream"
Expand Down
79 changes: 60 additions & 19 deletions router/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,22 @@ import (
"github.com/serverless/event-gateway/internal/metrics"
"github.com/serverless/event-gateway/internal/sync"
"github.com/serverless/event-gateway/subscriptions"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

func TestMain(t *testing.T) {
etcd.Register()
}

func TestIntegrationSubscription(t *testing.T) {
func TestIntegration_Subscription(t *testing.T) {
logCfg := zap.NewDevelopmentConfig()
logCfg.DisableStacktrace = true
log, _ := logCfg.Build()

kv, shutdownGuard := newTestEtcd()

testAPIServer := newConfigAPIServer(kv, log)
defer testAPIServer.Close()

router, testRouterServer := newTestRouterServer(kv, log)
defer testRouterServer.Close()
router.StartWorkers()
Expand Down Expand Up @@ -100,23 +99,19 @@ func TestIntegrationSubscription(t *testing.T) {
shutdownGuard.ShutdownAndWait()
}

func TestIntegrationHTTPSubscription(t *testing.T) {
func TestIntegration_HTTPSubscription(t *testing.T) {
logCfg := zap.NewDevelopmentConfig()
logCfg.DisableStacktrace = true
log, _ := logCfg.Build()

kv, shutdownGuard := newTestEtcd()

testAPIServer := newConfigAPIServer(kv, log)
defer testAPIServer.Close()

router, testRouterServer := newTestRouterServer(kv, log)
defer testRouterServer.Close()

expected := "😸"

testTargetServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, expected)
fmt.Fprintf(w, "😸")
}))
defer testTargetServer.Close()

Expand All @@ -132,22 +127,68 @@ func TestIntegrationHTTPSubscription(t *testing.T) {
post(testAPIServer.URL+"/v1/subscriptions", subscriptions.Subscription{
FunctionID: functions.FunctionID("supersmileyfunction"),
Event: "http",
Method: "POST",
Method: "GET",
Path: "/smilez",
})

select {
case <-router.WaitForEndpoint(subscriptions.NewEndpointID("POST", "/smilez")):
case <-router.WaitForEndpoint(subscriptions.NewEndpointID("GET", "/smilez")):
case <-time.After(10 * time.Second):
panic("timed out waiting for endpoint to be configured!")
}

res := get(testRouterServer.URL + "/smilez")
_, _, body := get(testRouterServer.URL + "/smilez")
assert.Equal(t, "😸", body)

router.Drain()
shutdownGuard.ShutdownAndWait()
}

func TestIntegration_HTTPResponse(t *testing.T) {
logCfg := zap.NewDevelopmentConfig()
logCfg.DisableStacktrace = true
log, _ := logCfg.Build()

kv, shutdownGuard := newTestEtcd()

testAPIServer := newConfigAPIServer(kv, log)
defer testAPIServer.Close()

router, testRouterServer := newTestRouterServer(kv, log)
defer testRouterServer.Close()

testTargetServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, `{"statusCode":201,"headers":{"content-type":"text/html"},"body":"<head></head>"}`)
}))
defer testTargetServer.Close()

if res != expected {
panic("returned value was not \"" + expected + "\", unexpected value: \"" + res + "\"")
post(testAPIServer.URL+"/v1/functions",
functions.Function{
ID: functions.FunctionID("httpresponse"),
Provider: &functions.Provider{
Type: functions.HTTPEndpoint,
URL: testTargetServer.URL,
},
})

post(testAPIServer.URL+"/v1/subscriptions", subscriptions.Subscription{
FunctionID: functions.FunctionID("httpresponse"),
Event: "http",
Method: "GET",
Path: "/httpresponse",
})

select {
case <-router.WaitForEndpoint(subscriptions.NewEndpointID("GET", "/httpresponse")):
case <-time.After(10 * time.Second):
panic("timed out waiting for endpoint to be configured!")
}

statusCode, headers, body := get(testRouterServer.URL + "/httpresponse")
assert.Equal(t, statusCode, 201)
assert.Equal(t, headers.Get("content-type"), "text/html")
assert.Equal(t, body, "<head></head>")

router.Drain()
shutdownGuard.ShutdownAndWait()
}
Expand All @@ -174,9 +215,9 @@ func emit(url, topic string, body []byte) {
defer resp.Body.Close()
}

func post(url string, thing interface{}) ([]byte, error) {
func post(url string, payload interface{}) ([]byte, error) {
reqBytes := &bytes.Buffer{}
json.NewEncoder(reqBytes).Encode(thing)
json.NewEncoder(reqBytes).Encode(payload)

resp, err := http.Post(url, "application/json", reqBytes)
if err != nil {
Expand All @@ -187,8 +228,8 @@ func post(url string, thing interface{}) ([]byte, error) {
return ioutil.ReadAll(resp.Body)
}

func get(url string) string {
res, err := http.Post(url, "application/json", nil)
func get(url string) (int, http.Header, string) {
res, err := http.Get(url)
if err != nil {
panic(err)
}
Expand All @@ -198,7 +239,7 @@ func get(url string) string {
panic(err)
}

return string(body)
return res.StatusCode, res.Header, string(body)
}

func newTestRouterServer(kvstore store.Store, log *zap.Logger) (*Router, *httptest.Server) {
Expand Down
92 changes: 50 additions & 42 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,17 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

payload, err := json.Marshal(event)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

if event.Event == eventHTTP || event.Event == eventInvoke {
router.handleSyncEvent(event, w, r)
router.handleSyncEvent(event.Event, payload, w, r)
} else if r.Method == http.MethodPost && r.URL.Path == "/" {
router.handleAsyncEvent(event, w, r)
router.enqueueWork(subscriptions.TopicID(event.Event), payload)
w.WriteHeader(http.StatusAccepted)
}
}

Expand Down Expand Up @@ -159,20 +166,15 @@ var (
errUnableToLookUpRegisteredFunction = errors.New("unable to look up registered function")
)

func (router *Router) handleSyncEvent(event *Event, w http.ResponseWriter, r *http.Request) {
payload, err := json.Marshal(event)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

func (router *Router) handleSyncEvent(name string, payload []byte, w http.ResponseWriter, r *http.Request) {
router.log.Debug("Event received.", zap.String("event", string(payload)))

var resp []byte
var functionID functions.FunctionID
if event.Event == eventInvoke {

if name == eventInvoke {
functionID = functions.FunctionID(r.Header.Get(headerFunctionID))
} else if event.Event == eventHTTP {
} else if name == eventHTTP {
endpointID := subscriptions.NewEndpointID(strings.ToUpper(r.Method), r.URL.EscapedPath())
backingFunction := router.targetCache.BackingFunction(endpointID)
if backingFunction == nil {
Expand All @@ -183,10 +185,9 @@ func (router *Router) handleSyncEvent(event *Event, w http.ResponseWriter, r *ht
functionID = *backingFunction
}

router.log.Debug("Function triggered.",
zap.String("functionId", string(functionID)), zap.String("event", string(payload)))
router.log.Debug("Function triggered.", zap.String("functionId", string(functionID)), zap.String("event", string(payload)))

resp, err = router.callFunction(functionID, payload)
resp, err := router.callFunction(functionID, payload)
if err != nil {
router.log.Warn("Function invocation failed.",
zap.String("functionId", string(functionID)), zap.String("event", string(payload)), zap.Error(err))
Expand All @@ -195,16 +196,8 @@ func (router *Router) handleSyncEvent(event *Event, w http.ResponseWriter, r *ht
http.Error(w, err.Error(), http.StatusNotFound)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}

if _, ok := err.(*functions.ErrFunctionCallFailedProviderError); ok {
internal := NewEvent(internalFunctionProviderError, mimeJSON, struct {
FunctionID string `json:"functionId"`
}{string(functionID)})
payload, err = json.Marshal(internal)
if err == nil {
router.enqueueWork(subscriptions.TopicID(internal.Event), payload)
}
router.emitFunctionProviderErrorEvent(functionID, payload, err)
}

return
Expand All @@ -214,22 +207,39 @@ func (router *Router) handleSyncEvent(event *Event, w http.ResponseWriter, r *ht
zap.String("functionId", string(functionID)), zap.String("event", string(payload)),
zap.String("response", string(resp)))

if name == eventHTTP {
httpResponse := &HTTPResponse{StatusCode: http.StatusOK}
err = json.Unmarshal(resp, httpResponse)
if err == nil {
for key, value := range httpResponse.Headers {
w.Header().Set(key, value)
}

w.WriteHeader(httpResponse.StatusCode)

resp = []byte(httpResponse.Body)
}
}

_, err = w.Write(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

func (router *Router) handleAsyncEvent(instance *Event, w http.ResponseWriter, r *http.Request) {
payload, err := json.Marshal(instance)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
func (router *Router) enqueueWork(topic subscriptions.TopicID, payload []byte) {
router.log.Debug("Event received.", zap.String("event", string(payload)))

router.enqueueWork(subscriptions.TopicID(instance.Event), payload)
w.WriteHeader(http.StatusAccepted)
select {
case router.work <- event{
topic: topic,
payload: payload,
}:
default:
// We could not submit any work, this is NOT good but we will sacrifice consistency for availability for now.
router.dropMetric.Inc()
}
}

// callFunction looks up a function and calls it.
Expand Down Expand Up @@ -325,17 +335,15 @@ func (router *Router) processEvent(e event) {
}
}

func (router *Router) enqueueWork(topic subscriptions.TopicID, payload []byte) {
router.log.Debug("Event received.", zap.String("event", string(payload)))

select {
case router.work <- event{
topic: topic,
payload: payload,
}:
default:
// We could not submit any work, this is NOT good but we will sacrifice consistency for availability for now.
router.dropMetric.Inc()
func (router *Router) emitFunctionProviderErrorEvent(functionID functions.FunctionID, payload []byte, err error) {
if _, ok := err.(*functions.ErrFunctionCallFailedProviderError); ok {
internal := NewEvent(internalFunctionProviderError, mimeJSON, struct {
FunctionID string `json:"functionId"`
}{string(functionID)})
payload, err = json.Marshal(internal)
if err == nil {
router.enqueueWork(subscriptions.TopicID(internal.Event), payload)
}
}
}

Expand Down

0 comments on commit dec7b02

Please sign in to comment.