Skip to content

Commit

Permalink
fix HTTP event structure
Browse files Browse the repository at this point in the history
  • Loading branch information
Maciej Winnicki committed Aug 11, 2017
1 parent e1afe94 commit 74b274b
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 45 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,15 @@ Creating HTTP subscription requires `method` and `path` properties. Those proper

Request: arbitrary payload, subscribed function receives an event in above schema. `data` field has following fields:

```json
```
{
...
"data": {
"headers": <request headers>,
"query": <request query params>,
"body": <request payload>
}
...
}
```

Expand Down
45 changes: 18 additions & 27 deletions router/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,55 +31,46 @@ const (
mimeOctetStrem = "application/octet-stream"
)

func transform(event string, r *http.Request) ([]byte, error) {
func fromRequest(r *http.Request) (*Schema, error) {
name := r.Header.Get("event")
if name == "" {
name = eventHTTP
}

mime := r.Header.Get("content-type")
if mime == "" {
mime = mimeOctetStrem
}

payload, err := ioutil.ReadAll(r.Body)
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, err
}

instance := &Schema{
Event: event,
event := &Schema{
Event: name,
ID: uuid.NewV4().String(),
ReceivedAt: uint64(time.Now().UnixNano() / int64(time.Millisecond)),
DataType: mime,
Data: payload,
Data: body,
}

if mime == mimeJSON && len(payload) > 0 {
err := json.Unmarshal(payload, &instance.Data)
if mime == mimeJSON && len(body) > 0 {
err := json.Unmarshal(body, &event.Data)
if err != nil {
return nil, err
}
}

return json.Marshal(instance)
}

func transformHTTP(r *http.Request) ([]byte, error) {
payload, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, err
}

instance := &HTTPSchema{
Headers: r.Header,
Query: r.URL.Query(),
Body: payload,
}

if r.Header.Get("content-type") == mimeJSON && len(payload) > 0 {
err := json.Unmarshal(payload, &instance.Body)
if err != nil {
return nil, err
if event.Event == eventHTTP {
event.Data = &HTTPSchema{
Headers: r.Header,
Query: r.URL.Query(),
Body: event.Data,
}
}

return json.Marshal(instance)
return event, nil
}

type event struct {
Expand Down
43 changes: 26 additions & 17 deletions router/router.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package router

import (
"encoding/json"
"errors"
"net/http"
"strings"
Expand Down Expand Up @@ -48,11 +49,16 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

eventName := r.Header.Get("event")
if eventName == "" {
router.handleHTTPEvent(w, r)
event, err := fromRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

if event.Event == eventHTTP {
router.handleHTTPEvent(event, w, r)
} else if r.Method == http.MethodPost && r.URL.Path == "/" {
router.handleEvent(eventName, w, r)
router.handleEvent(event, w, r)
}
}

Expand Down Expand Up @@ -139,6 +145,9 @@ const (
// eventInvoke is a special type of event for sync function invocation.
eventInvoke = "invoke"

// eventHTTP is a special type of event for sync http subscriptions.
eventHTTP = "http"

// headerFunctionID is a header name for specifing function id for sync invocation.
headerFunctionID = "function-id"
)
Expand All @@ -148,17 +157,17 @@ var (
errUnableToLookUpRegisteredFunction = errors.New("unable to look up registered function")
)

func (router *Router) handleHTTPEvent(w http.ResponseWriter, r *http.Request) {
httpevent, err := transformHTTP(r)
func (router *Router) handleHTTPEvent(event *Schema, w http.ResponseWriter, r *http.Request) {
payload, err := json.Marshal(event)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

router.log.Debug("Event received.", zap.String("event", string(httpevent)), zap.String("path", r.URL.EscapedPath()), zap.String("method", r.Method))
router.log.Debug("Event received.", zap.String("event", string(payload)), zap.String("path", r.URL.EscapedPath()), zap.String("method", r.Method))

endpointID := pubsub.NewEndpointID(strings.ToUpper(r.Method), r.URL.EscapedPath())
res, err := router.callEndpoint(endpointID, httpevent)
res, err := router.callEndpoint(endpointID, payload)
if err != nil {
router.log.Warn(`Handling "http" event failed.`, zap.String("path", r.URL.EscapedPath()), zap.String("method", r.Method), zap.Error(err))
if err == errUnableToLookUpBackingFunction {
Expand All @@ -177,21 +186,21 @@ func (router *Router) handleHTTPEvent(w http.ResponseWriter, r *http.Request) {
}
}

func (router *Router) handleEvent(eventName string, w http.ResponseWriter, r *http.Request) {
customevent, err := transform(eventName, r)
func (router *Router) handleEvent(instance *Schema, w http.ResponseWriter, r *http.Request) {
payload, err := json.Marshal(instance)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

router.log.Debug("Event received.", zap.String("event", string(customevent)))
router.log.Debug("Event received.", zap.String("event", string(payload)))

if eventName == eventInvoke {
if instance.Event == eventInvoke {
functionID := functions.FunctionID(r.Header.Get(headerFunctionID))
res, err := router.callFunction(functionID, customevent)
res, err := router.callFunction(functionID, payload)
if err != nil {
router.log.Warn("Function invocation failed.", zap.String("functionId", string(functionID)),
zap.String("event", string(customevent)), zap.Error(err))
zap.String("event", string(payload)), zap.Error(err))

if err == errUnableToLookUpRegisteredFunction {
http.Error(w, err.Error(), http.StatusNotFound)
Expand All @@ -202,7 +211,7 @@ func (router *Router) handleEvent(eventName string, w http.ResponseWriter, r *ht
}

router.log.Debug("Function invoked.", zap.String("functionId", string(functionID)),
zap.String("event", string(customevent)), zap.String("response", string(res)))
zap.String("event", string(payload)), zap.String("response", string(res)))

_, err = w.Write(res)
if err != nil {
Expand All @@ -211,8 +220,8 @@ func (router *Router) handleEvent(eventName string, w http.ResponseWriter, r *ht
}
} else {
router.processEvent(event{
topics: []pubsub.TopicID{pubsub.TopicID(eventName)},
payload: customevent,
topics: []pubsub.TopicID{pubsub.TopicID(instance.Event)},
payload: payload,
})

w.WriteHeader(http.StatusAccepted)
Expand Down

0 comments on commit 74b274b

Please sign in to comment.