Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add subscription support for invoke event #355

Merged
merged 1 commit into from
Jan 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ the data block is base64 encoded.

#### Invoke Event

`invoke` is a built-in type of event allowing to call functions synchronously.
`invoke` is a built-in event type allowing synchronous invocations. Function will react to this event only if there is a
subscription created beforehand.

### Emit a Custom Event

Expand Down
77 changes: 55 additions & 22 deletions internal/cache/subscription_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ type subscriptionCache struct {
eventToFunctions map[string]map[event.Type][]functions.FunctionID
// endpoints maps HTTP method to internal/pathtree. Tree struct which is used for resolving HTTP requests paths.
endpoints map[string]*pathtree.Node
// invokable stores functions that have invoke subscription
invokable map[string]map[functions.FunctionID]struct{}
log *zap.Logger
}

func newSubscriptionCache(log *zap.Logger) *subscriptionCache {
return &subscriptionCache{
eventToFunctions: map[string]map[event.Type][]functions.FunctionID{},
endpoints: map[string]*pathtree.Node{},
invokable: map[string]map[functions.FunctionID]struct{}{},
log: log,
}
}
Expand All @@ -50,6 +53,15 @@ func (c *subscriptionCache) Modified(k string, v []byte) {
if err != nil {
c.log.Error("Could not add path to the tree.", zap.Error(err), zap.String("path", s.Path), zap.String("method", s.Method))
}
} else if s.Event == event.TypeInvoke {
fnSet, exists := c.invokable[s.Path]
if exists {
fnSet[s.FunctionID] = struct{}{}
} else {
fnSet := map[functions.FunctionID]struct{}{}
fnSet[s.FunctionID] = struct{}{}
c.invokable[s.Path] = fnSet
}
} else {
c.createPath(s.Path)
ids, exists := c.eventToFunctions[s.Path][s.Event]
Expand All @@ -75,29 +87,11 @@ func (c *subscriptionCache) Deleted(k string, v []byte) {
}

if oldSub.Event == event.TypeHTTP {
root := c.endpoints[oldSub.Method]
if root == nil {
return
}
err := root.DeleteRoute(oldSub.Path)
if err != nil {
c.log.Error("Could not delete path from the tree.", zap.Error(err), zap.String("path", oldSub.Path), zap.String("method", oldSub.Method))
}
c.deleteEndpoint(oldSub)
} else if oldSub.Event == event.TypeInvoke {
c.deleteInvokable(oldSub)
} else {
ids, exists := c.eventToFunctions[oldSub.Path][oldSub.Event]
if exists {
for i, id := range ids {
if id == oldSub.FunctionID {
ids = append(ids[:i], ids[i+1:]...)
break
}
}
c.eventToFunctions[oldSub.Path][oldSub.Event] = ids

if len(ids) == 0 {
delete(c.eventToFunctions[oldSub.Path], oldSub.Event)
}
}
c.deleteSubscription(oldSub)
}
}

Expand All @@ -107,3 +101,42 @@ func (c *subscriptionCache) createPath(path string) {
c.eventToFunctions[path] = map[event.Type][]functions.FunctionID{}
}
}

func (c *subscriptionCache) deleteEndpoint(sub subscriptions.Subscription) {
root := c.endpoints[sub.Method]
if root == nil {
return
}
err := root.DeleteRoute(sub.Path)
if err != nil {
c.log.Error("Could not delete path from the tree.", zap.Error(err), zap.String("path", sub.Path), zap.String("method", sub.Method))
}
}

func (c *subscriptionCache) deleteInvokable(sub subscriptions.Subscription) {
fnSet, exists := c.invokable[sub.Path]
if exists {
delete(fnSet, sub.FunctionID)

if len(fnSet) == 0 {
delete(c.invokable, sub.Path)
}
}
}

func (c *subscriptionCache) deleteSubscription(sub subscriptions.Subscription) {
ids, exists := c.eventToFunctions[sub.Path][sub.Event]
if exists {
for i, id := range ids {
if id == sub.FunctionID {
ids = append(ids[:i], ids[i+1:]...)
break
}
}
c.eventToFunctions[sub.Path][sub.Event] = ids

if len(ids) == 0 {
delete(c.eventToFunctions[sub.Path], sub.Event)
}
}
}
32 changes: 27 additions & 5 deletions internal/cache/subscription_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"go.uber.org/zap"
)

func TestSubscriptionCacheModified(t *testing.T) {
func TestSubscriptionCacheModifiedEvents(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "test.event", "functionId": "testfunc1", "path": "/"}`))
Expand Down Expand Up @@ -51,15 +51,15 @@ func TestSubscriptionCacheModifiedCORSConfiguration(t *testing.T) {
assert.Equal(t, &cors.CORS{Origins: []string{"http://example.com"}}, corsConfig)
}

func TestSubscriptionCacheModified_WrongPayload(t *testing.T) {
func TestSubscriptionCacheModifiedEventsWrongPayload(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub", []byte(`not json`))

assert.Equal(t, []functions.FunctionID(nil), scache.eventToFunctions["/"]["test.event"])
}

func TestSubscriptionCacheModifiedDeleted(t *testing.T) {
func TestSubscriptionCacheModifiedEventsDeleted(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "test.event", "functionId": "testfunc1", "path": "/"}`))
Expand All @@ -69,7 +69,7 @@ func TestSubscriptionCacheModifiedDeleted(t *testing.T) {
assert.Equal(t, []functions.FunctionID{functions.FunctionID("testfunc2")}, scache.eventToFunctions["/"]["test.event"])
}

func TestSubscriptionCacheModifiedDeletedHTTPSubscription(t *testing.T) {
func TestSubscriptionCacheModifiedHTTPSubscriptionDeleted(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "http", "functionId": "testfunc1", "path": "/", "method": "GET"}`))
Expand All @@ -79,11 +79,33 @@ func TestSubscriptionCacheModifiedDeletedHTTPSubscription(t *testing.T) {
assert.Nil(t, id)
}

func TestSubscriptionCacheModifiedDeletedLast(t *testing.T) {
func TestSubscriptionCacheModifiedEventsDeletedLast(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub", []byte(`{"subscriptionId":"testsub", "event": "test.event", "functionId": "testfunc", "path": "/"}`))
scache.Deleted("testsub", []byte(`{"subscriptionId":"testsub", "event": "test.event", "functionId": "testfunc", "path": "/"}`))

assert.Equal(t, []functions.FunctionID(nil), scache.eventToFunctions["/"]["test.event"])
}

func TestSubscriptionCacheModifiedInvokable(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "invoke", "functionId": "testfunc1", "path": "/"}`))
scache.Modified("testsub2", []byte(`{"subscriptionId":"testsub2", "event": "invoke", "functionId": "testfunc2", "path": "/"}`))

_, exists := scache.invokable["/"][functions.FunctionID("testfunc1")]
assert.Equal(t, true, exists)
_, exists = scache.invokable["/"][functions.FunctionID("testfunc2")]
assert.Equal(t, true, exists)
}

func TestSubscriptionCacheModifiedInvokableDeleted(t *testing.T) {
scache := newSubscriptionCache(zap.NewNop())

scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "invoke", "functionId": "testfunc1", "path": "/"}`))
scache.Deleted("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "invoke", "functionId": "testfunc1", "path": "/"}`))

_, exists := scache.invokable["/"][functions.FunctionID("testfunc1")]
assert.Equal(t, false, exists)
}
9 changes: 9 additions & 0 deletions internal/cache/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ func (tc *Target) HTTPBackingFunction(method, path string) (*functions.FunctionI
return root.Resolve(path)
}

// InvokableFunction returns function ID for handling invoke sync event.
func (tc *Target) InvokableFunction(path string, functionID functions.FunctionID) bool {
tc.subscriptionCache.RLock()
defer tc.subscriptionCache.RUnlock()

_, exists := tc.subscriptionCache.invokable[path][functionID]
return exists
}

// Function takes a function ID and returns a deserialized instance of that function, if it exists
func (tc *Target) Function(functionID functions.FunctionID) *functions.Function {
tc.functionCache.RLock()
Expand Down
10 changes: 10 additions & 0 deletions router/mock/targetcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ func (_mr *_MockTargeterRecorder) HTTPBackingFunction(arg0, arg1 interface{}) *g
return _mr.mock.ctrl.RecordCall(_mr.mock, "HTTPBackingFunction", arg0, arg1)
}

func (_m *MockTargeter) InvokableFunction(_param0 string, _param1 functions.FunctionID) bool {
ret := _m.ctrl.Call(_m, "InvokableFunction", _param0, _param1)
ret0, _ := ret[0].(bool)
return ret0
}

func (_mr *_MockTargeterRecorder) InvokableFunction(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "InvokableFunction", arg0, arg1)
}

func (_m *MockTargeter) SubscribersOfEvent(_param0 string, _param1 event.Type) []functions.FunctionID {
ret := _m.ctrl.Call(_m, "SubscribersOfEvent", _param0, _param1)
ret0, _ := ret[0].([]functions.FunctionID)
Expand Down
12 changes: 9 additions & 3 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

if event.Type == eventpkg.TypeInvoke {
routerEventsSyncReceived.Inc()
router.handleInvokeEvent(event, w, r)
router.handleInvokeEvent(path, event, w, r)
} else if !event.IsSystem() {
router.enqueueWork(path, event)
w.WriteHeader(http.StatusAccepted)
Expand Down Expand Up @@ -306,8 +305,15 @@ func (router *Router) handleHTTPEvent(event *eventpkg.Event, w http.ResponseWrit
}
}

func (router *Router) handleInvokeEvent(event *eventpkg.Event, w http.ResponseWriter, r *http.Request) {
func (router *Router) handleInvokeEvent(path string, event *eventpkg.Event, w http.ResponseWriter, r *http.Request) {
routerEventsSyncReceived.Inc()

functionID := functions.FunctionID(r.Header.Get(headerFunctionID))
if !router.targetCache.InvokableFunction(path, functionID) {
http.Error(w, "function or subscription not found", http.StatusNotFound)
return
}

resp, err := router.callFunction(functionID, *event)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
1 change: 1 addition & 0 deletions router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestRouterServeHTTP_InvokeEventFunctionNotFound(t *testing.T) {
defer ctrl.Finish()
target := mock.NewMockTargeter(ctrl)
target.EXPECT().Function(functions.FunctionID("testfunc")).Return(nil).MaxTimes(1)
target.EXPECT().InvokableFunction("/", functions.FunctionID("testfunc")).Return(true).MaxTimes(1)
target.EXPECT().SubscribersOfEvent("/", event.SystemEventReceivedType).Return([]functions.FunctionID{}).MaxTimes(1)
router := testrouter(target)

Expand Down
1 change: 1 addition & 0 deletions router/targeter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
// Targeter is an interface for retrieving cached configuration for driving performance-sensitive routing decisions.
type Targeter interface {
HTTPBackingFunction(method, path string) (*functions.FunctionID, pathtree.Params, *cors.CORS)
InvokableFunction(path string, functionID functions.FunctionID) bool
Function(functionID functions.FunctionID) *functions.Function
SubscribersOfEvent(path string, eventType event.Type) []functions.FunctionID
}