Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove invoke functionality #432

Merged
merged 2 commits into from
May 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 0 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,34 +153,6 @@ eventGateway.registerFunction({
```
</details>


#### Example: Function-To-Function call

<details open>
<summary>curl example</summary>

```bash
curl --request POST \
--url http://localhost:4000/ \
--header 'content-type: application/json' \
--header 'event: invoke' \
--header 'function-id: createUser' \
--data '{ "name": "Max" }'
```

</details>
<details>
<summary>Node.js SDK example</summary>

```javascript
const eventGateway = new EventGateway({ url: 'http://localhost' })
eventGateway.invoke({
functionId: 'createUser',
data: { name: 'Max' }
})
```
</details>

### Subscriptions

Lightweight pub/sub system. Allows functions to asynchronously receive custom events. Instead of rewriting your
Expand Down
29 changes: 0 additions & 29 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ the data block is base64 encoded.
* `params` - `object` - matched path parameters
* `body` - depends on `Content-Type` header - request payload

#### Invoke Event

`invoke` is a built-in event type allowing synchronous invocations. Function will react to this event only if there is a
subscription created beforehand.

### Emit a Custom Event

Creating a subscription requires `path` property (by default it's "/"). `path` indicates path under which you can push an
Expand Down Expand Up @@ -139,30 +134,6 @@ To respond to an HTTP event a function needs to return object with following fie

Currently, the event gateway supports only string responses.

### Invoking a Registered Function - Sync Function Invocation

**Endpoint**

`POST <Events API URL>/`

**Request Headers**

* `Event` - `string` - `"invoke"`
* `Function-ID` - `string` - required, ID of a function to call
* `Space` - `string` - space name, default: `default`

**Request**

arbitrary payload, invoked function receives an event in above schema, where request payload is passed as `data` field

**Response**

Status code:

* `200 OK` with payload returned by invoked function
* `404 Not Found` if there is no function registered or `invoke` subscription created for requested function
* `500 Internal Server Error` if the function invocation failed

### CORS

Events API supports CORS requests which means that any origin can emit a custom event. In case of `http` events CORS is
Expand Down
39 changes: 0 additions & 39 deletions docs/openapi/openapi-events-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,42 +38,3 @@ paths:
responses:
202:
description: "event accepted"

/{invokeEventSubscriptionPath}:
post:
summary: "Sync function invocation"
operationId: "InvokeFunction"
parameters:
- name: invokeEventSubscriptionPath
in: path
description: invoke event subscription path
required: true
schema:
type: string
- name: Event
in: header
description: "'invoke' event type"
required: true
schema:
type: string
default: invoke
- name: Function-ID
in: header
description: function identifier
required: true
schema:
type: string
- name: Space
in: header
description: space name
required: false
schema:
type: string
default: default
responses:
200:
description: "function invoked successfully, response body contains payload returned by the function"
404:
description: "no function registered or 'invoke' subscription created for requested function"
500:
description: "function invocation failed"
2 changes: 1 addition & 1 deletion docs/reliability-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Events are not durable

The event received by Event Gateway is stored only in memory, it's not persisted to disk before processing. This means that in case of hardware failure or software crash the event may not be delivered to the subscriber. For a synchronous subscription (`http` or `invoke` event) it can manifest as error message returned to the requester. For asynchronous custom event with multiple subscribers it means that the event may not be delivered to all of the subscribers.
The event received by Event Gateway is stored only in memory, it's not persisted to disk before processing. This means that in case of hardware failure or software crash the event may not be delivered to the subscriber. For a synchronous subscription (`http`) it can manifest as error message returned to the requester. For asynchronous custom event with multiple subscribers it means that the event may not be delivered to all of the subscribers.

## Events are delivered _at most once_

Expand Down
3 changes: 0 additions & 3 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ import (
type Type string

const (
// TypeInvoke is a special type of event for sync function invocation.
TypeInvoke = Type("invoke")

// TypeHTTP is a special type of event for sync http subscriptions.
TypeHTTP = Type("http")

Expand Down
25 changes: 0 additions & 25 deletions internal/cache/subscription_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@ type subscriptionCache struct {
eventToFunctions map[string]map[eventpkg.Type][]libkv.FunctionKey
// endpoints maps HTTP method to internal/pathtree. Tree struct which is used for resolving HTTP requests paths.
endpoints map[string]*pathtree.Node
// invokable stores functions that have invoke subscription
invokable map[string]map[libkv.FunctionKey]struct{}
log *zap.Logger
}

func newSubscriptionCache(log *zap.Logger) *subscriptionCache {
return &subscriptionCache{
eventToFunctions: map[string]map[eventpkg.Type][]libkv.FunctionKey{},
endpoints: map[string]*pathtree.Node{},
invokable: map[string]map[libkv.FunctionKey]struct{}{},
log: log,
}
}
Expand Down Expand Up @@ -56,15 +53,6 @@ func (c *subscriptionCache) Modified(k string, v []byte) {
if err != nil {
c.log.Error("Could not add path to the tree.", zap.Error(err), zap.String("path", s.Path), zap.String("method", s.Method))
}
} else if s.Event == eventpkg.TypeInvoke {
fnSet, exists := c.invokable[s.Path]
if exists {
fnSet[key] = struct{}{}
} else {
fnSet := map[libkv.FunctionKey]struct{}{}
fnSet[key] = struct{}{}
c.invokable[s.Path] = fnSet
}
} else {
c.createPath(s.Path)
ids, exists := c.eventToFunctions[s.Path][s.Event]
Expand All @@ -90,8 +78,6 @@ func (c *subscriptionCache) Deleted(k string, v []byte) {

if oldSub.Event == eventpkg.TypeHTTP {
c.deleteEndpoint(oldSub)
} else if oldSub.Event == eventpkg.TypeInvoke {
c.deleteInvokable(oldSub)
} else {
c.deleteSubscription(oldSub)
}
Expand All @@ -115,17 +101,6 @@ func (c *subscriptionCache) deleteEndpoint(sub subscription.Subscription) {
}
}

func (c *subscriptionCache) deleteInvokable(sub subscription.Subscription) {
fnSet, exists := c.invokable[sub.Path]
if exists {
delete(fnSet, libkv.FunctionKey{Space: sub.Space, ID: sub.FunctionID})

if len(fnSet) == 0 {
delete(c.invokable, sub.Path)
}
}
}

func (c *subscriptionCache) deleteSubscription(sub subscription.Subscription) {
ids, exists := c.eventToFunctions[sub.Path][sub.Event]
if exists {
Expand Down
22 changes: 0 additions & 22 deletions internal/cache/subscription_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,3 @@ func TestSubscriptionCacheModifiedEventsDeletedLast(t *testing.T) {

assert.Equal(t, []libkv.FunctionKey(nil), scache.eventToFunctions["/"]["test.event"])
}

func TestSubscriptionCacheModifiedInvokable(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "space": "space1", "event": "invoke", "functionId": "testfunc1", "path": "/"}`))
scache.Modified("testsub2", []byte(`{"subscriptionId":"testsub2", "space": "space1", "event": "invoke", "functionId": "testfunc2", "path": "/"}`))

_, exists := scache.invokable["/"][libkv.FunctionKey{Space: "space1", ID: function.ID("testfunc1")}]
assert.Equal(t, true, exists)
_, exists = scache.invokable["/"][libkv.FunctionKey{Space: "space1", ID: function.ID("testfunc2")}]
assert.Equal(t, true, exists)
}

func TestSubscriptionCacheModifiedInvokableDeleted(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "space": "space1", "event": "invoke", "functionId": "testfunc1", "path": "/"}`))
scache.Deleted("testsub1", []byte(`{"subscriptionId":"testsub1", "space": "space1", "event": "invoke", "functionId": "testfunc1", "path": "/"}`))

_, exists := scache.invokable["/"][libkv.FunctionKey{Space: "space1", ID: function.ID("testfunc1")}]
assert.Equal(t, false, exists)
}
9 changes: 0 additions & 9 deletions internal/cache/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,6 @@ func (tc *Target) HTTPBackingFunction(method, path string) (string, *function.ID
return root.Resolve(path)
}

// InvokableFunction returns function ID for handling invoke sync event.
func (tc *Target) InvokableFunction(path string, space string, id function.ID) bool {
tc.subscriptionCache.RLock()
defer tc.subscriptionCache.RUnlock()

_, exists := tc.subscriptionCache.invokable[path][libkv.FunctionKey{Space: space, ID: id}]
return exists
}

// Function takes a function ID and returns a deserialized instance of that function, if it exists
func (tc *Target) Function(space string, id function.ID) *function.Function {
tc.functionCache.RLock()
Expand Down
28 changes: 0 additions & 28 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,34 +290,6 @@ func (router *Router) handleHTTPEvent(event *eventpkg.Event, w http.ResponseWrit
metricEventsProcessed.WithLabelValues(space, "http").Inc()
}

func (router *Router) handleInvokeEvent(space string, functionID function.ID, path string, event *eventpkg.Event, w http.ResponseWriter) {
encoder := json.NewEncoder(w)

if !router.targetCache.InvokableFunction(path, space, functionID) {
w.WriteHeader(http.StatusNotFound)
w.Header().Set("Content-Type", "application/json")
encoder.Encode(&httpapi.Response{Errors: []httpapi.Error{{Message: "function or subscription not found"}}})
return
}

resp, err := router.callFunction(space, functionID, *event)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Header().Set("Content-Type", "application/json")
message := determineErrorMessage(err)
encoder.Encode(&httpapi.Response{Errors: []httpapi.Error{{Message: message}}})
return
}

_, err = w.Write(resp)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Header().Set("Content-Type", "application/json")
encoder.Encode(&httpapi.Response{Errors: []httpapi.Error{{Message: err.Error()}}})
return
}
}

func determineErrorMessage(err error) string {
message := "Function call failed. Please check logs."
if accessError, ok := err.(*function.ErrFunctionAccessDenied); ok {
Expand Down
1 change: 0 additions & 1 deletion router/targeter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
// Targeter is an interface for retrieving cached configuration for driving performance-sensitive routing decisions.
type Targeter interface {
HTTPBackingFunction(method, path string) (string, *function.ID, pathtree.Params, *subscription.CORS)
InvokableFunction(path string, space string, id function.ID) bool
Function(space string, id function.ID) *function.Function
SubscribersOfEvent(path string, eventType event.Type) []FunctionInfo
}
Expand Down