From 77716ed01850df848d958be96c5268661de59b70 Mon Sep 17 00:00:00 2001 From: Maciej Winnicki Date: Tue, 30 Jan 2018 13:18:27 +0100 Subject: [PATCH] add subscription support for invoke event --- README.md | 3 +- internal/cache/subscription_cache.go | 77 ++++++++++++++++------- internal/cache/subscription_cache_test.go | 32 ++++++++-- internal/cache/target.go | 9 +++ router/mock/targetcache.go | 10 +++ router/router.go | 12 +++- router/router_test.go | 1 + router/targeter.go | 1 + 8 files changed, 114 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 6964b3d..5e22318 100644 --- a/README.md +++ b/README.md @@ -281,7 +281,8 @@ the data block is base64 encoded. #### Invoke Event -`invoke` is a built-in type of event allowing to call functions synchronously. +`invoke` is a built-in event type allowing synchronous invocations. Function will react to this event only if there is a +subscription created beforehand. ### Emit a Custom Event diff --git a/internal/cache/subscription_cache.go b/internal/cache/subscription_cache.go index 4f4cdfb..33ab462 100644 --- a/internal/cache/subscription_cache.go +++ b/internal/cache/subscription_cache.go @@ -17,6 +17,8 @@ type subscriptionCache struct { eventToFunctions map[string]map[event.Type][]functions.FunctionID // endpoints maps HTTP method to internal/pathtree. Tree struct which is used for resolving HTTP requests paths. endpoints map[string]*pathtree.Node + // invokable stores functions that have invoke subscription + invokable map[string]map[functions.FunctionID]struct{} log *zap.Logger } @@ -24,6 +26,7 @@ func newSubscriptionCache(log *zap.Logger) *subscriptionCache { return &subscriptionCache{ eventToFunctions: map[string]map[event.Type][]functions.FunctionID{}, endpoints: map[string]*pathtree.Node{}, + invokable: map[string]map[functions.FunctionID]struct{}{}, log: log, } } @@ -50,6 +53,15 @@ func (c *subscriptionCache) Modified(k string, v []byte) { if err != nil { c.log.Error("Could not add path to the tree.", zap.Error(err), zap.String("path", s.Path), zap.String("method", s.Method)) } + } else if s.Event == event.TypeInvoke { + fnSet, exists := c.invokable[s.Path] + if exists { + fnSet[s.FunctionID] = struct{}{} + } else { + fnSet := map[functions.FunctionID]struct{}{} + fnSet[s.FunctionID] = struct{}{} + c.invokable[s.Path] = fnSet + } } else { c.createPath(s.Path) ids, exists := c.eventToFunctions[s.Path][s.Event] @@ -75,29 +87,11 @@ func (c *subscriptionCache) Deleted(k string, v []byte) { } if oldSub.Event == event.TypeHTTP { - root := c.endpoints[oldSub.Method] - if root == nil { - return - } - err := root.DeleteRoute(oldSub.Path) - if err != nil { - c.log.Error("Could not delete path from the tree.", zap.Error(err), zap.String("path", oldSub.Path), zap.String("method", oldSub.Method)) - } + c.deleteEndpoint(oldSub) + } else if oldSub.Event == event.TypeInvoke { + c.deleteInvokable(oldSub) } else { - ids, exists := c.eventToFunctions[oldSub.Path][oldSub.Event] - if exists { - for i, id := range ids { - if id == oldSub.FunctionID { - ids = append(ids[:i], ids[i+1:]...) - break - } - } - c.eventToFunctions[oldSub.Path][oldSub.Event] = ids - - if len(ids) == 0 { - delete(c.eventToFunctions[oldSub.Path], oldSub.Event) - } - } + c.deleteSubscription(oldSub) } } @@ -107,3 +101,42 @@ func (c *subscriptionCache) createPath(path string) { c.eventToFunctions[path] = map[event.Type][]functions.FunctionID{} } } + +func (c *subscriptionCache) deleteEndpoint(sub subscriptions.Subscription) { + root := c.endpoints[sub.Method] + if root == nil { + return + } + err := root.DeleteRoute(sub.Path) + if err != nil { + c.log.Error("Could not delete path from the tree.", zap.Error(err), zap.String("path", sub.Path), zap.String("method", sub.Method)) + } +} + +func (c *subscriptionCache) deleteInvokable(sub subscriptions.Subscription) { + fnSet, exists := c.invokable[sub.Path] + if exists { + delete(fnSet, sub.FunctionID) + + if len(fnSet) == 0 { + delete(c.invokable, sub.Path) + } + } +} + +func (c *subscriptionCache) deleteSubscription(sub subscriptions.Subscription) { + ids, exists := c.eventToFunctions[sub.Path][sub.Event] + if exists { + for i, id := range ids { + if id == sub.FunctionID { + ids = append(ids[:i], ids[i+1:]...) + break + } + } + c.eventToFunctions[sub.Path][sub.Event] = ids + + if len(ids) == 0 { + delete(c.eventToFunctions[sub.Path], sub.Event) + } + } +} diff --git a/internal/cache/subscription_cache_test.go b/internal/cache/subscription_cache_test.go index caa28eb..2dc4133 100644 --- a/internal/cache/subscription_cache_test.go +++ b/internal/cache/subscription_cache_test.go @@ -10,7 +10,7 @@ import ( "go.uber.org/zap" ) -func TestSubscriptionCacheModified(t *testing.T) { +func TestSubscriptionCacheModifiedEvents(t *testing.T) { scache := newSubscriptionCache(zap.NewNop()) scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "test.event", "functionId": "testfunc1", "path": "/"}`)) @@ -51,7 +51,7 @@ func TestSubscriptionCacheModifiedCORSConfiguration(t *testing.T) { assert.Equal(t, &cors.CORS{Origins: []string{"http://example.com"}}, corsConfig) } -func TestSubscriptionCacheModified_WrongPayload(t *testing.T) { +func TestSubscriptionCacheModifiedEventsWrongPayload(t *testing.T) { scache := newSubscriptionCache(zap.NewNop()) scache.Modified("testsub", []byte(`not json`)) @@ -59,7 +59,7 @@ func TestSubscriptionCacheModified_WrongPayload(t *testing.T) { assert.Equal(t, []functions.FunctionID(nil), scache.eventToFunctions["/"]["test.event"]) } -func TestSubscriptionCacheModifiedDeleted(t *testing.T) { +func TestSubscriptionCacheModifiedEventsDeleted(t *testing.T) { scache := newSubscriptionCache(zap.NewNop()) scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "test.event", "functionId": "testfunc1", "path": "/"}`)) @@ -69,7 +69,7 @@ func TestSubscriptionCacheModifiedDeleted(t *testing.T) { assert.Equal(t, []functions.FunctionID{functions.FunctionID("testfunc2")}, scache.eventToFunctions["/"]["test.event"]) } -func TestSubscriptionCacheModifiedDeletedHTTPSubscription(t *testing.T) { +func TestSubscriptionCacheModifiedHTTPSubscriptionDeleted(t *testing.T) { scache := newSubscriptionCache(zap.NewNop()) scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "http", "functionId": "testfunc1", "path": "/", "method": "GET"}`)) @@ -79,7 +79,7 @@ func TestSubscriptionCacheModifiedDeletedHTTPSubscription(t *testing.T) { assert.Nil(t, id) } -func TestSubscriptionCacheModifiedDeletedLast(t *testing.T) { +func TestSubscriptionCacheModifiedEventsDeletedLast(t *testing.T) { scache := newSubscriptionCache(zap.NewNop()) scache.Modified("testsub", []byte(`{"subscriptionId":"testsub", "event": "test.event", "functionId": "testfunc", "path": "/"}`)) @@ -87,3 +87,25 @@ func TestSubscriptionCacheModifiedDeletedLast(t *testing.T) { assert.Equal(t, []functions.FunctionID(nil), scache.eventToFunctions["/"]["test.event"]) } + +func TestSubscriptionCacheModifiedInvokable(t *testing.T) { + scache := newSubscriptionCache(zap.NewNop()) + + scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "invoke", "functionId": "testfunc1", "path": "/"}`)) + scache.Modified("testsub2", []byte(`{"subscriptionId":"testsub2", "event": "invoke", "functionId": "testfunc2", "path": "/"}`)) + + _, exists := scache.invokable["/"][functions.FunctionID("testfunc1")] + assert.Equal(t, true, exists) + _, exists = scache.invokable["/"][functions.FunctionID("testfunc2")] + assert.Equal(t, true, exists) +} + +func TestSubscriptionCacheModifiedInvokableDeleted(t *testing.T) { + scache := newSubscriptionCache(zap.NewNop()) + + scache.Modified("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "invoke", "functionId": "testfunc1", "path": "/"}`)) + scache.Deleted("testsub1", []byte(`{"subscriptionId":"testsub1", "event": "invoke", "functionId": "testfunc1", "path": "/"}`)) + + _, exists := scache.invokable["/"][functions.FunctionID("testfunc1")] + assert.Equal(t, false, exists) +} diff --git a/internal/cache/target.go b/internal/cache/target.go index 2d18739..7675ca9 100644 --- a/internal/cache/target.go +++ b/internal/cache/target.go @@ -35,6 +35,15 @@ func (tc *Target) HTTPBackingFunction(method, path string) (*functions.FunctionI return root.Resolve(path) } +// InvokableFunction returns function ID for handling invoke sync event. +func (tc *Target) InvokableFunction(path string, functionID functions.FunctionID) bool { + tc.subscriptionCache.RLock() + defer tc.subscriptionCache.RUnlock() + + _, exists := tc.subscriptionCache.invokable[path][functionID] + return exists +} + // Function takes a function ID and returns a deserialized instance of that function, if it exists func (tc *Target) Function(functionID functions.FunctionID) *functions.Function { tc.functionCache.RLock() diff --git a/router/mock/targetcache.go b/router/mock/targetcache.go index b5ab9d7..4131272 100644 --- a/router/mock/targetcache.go +++ b/router/mock/targetcache.go @@ -54,6 +54,16 @@ func (_mr *_MockTargeterRecorder) HTTPBackingFunction(arg0, arg1 interface{}) *g return _mr.mock.ctrl.RecordCall(_mr.mock, "HTTPBackingFunction", arg0, arg1) } +func (_m *MockTargeter) InvokableFunction(_param0 string, _param1 functions.FunctionID) bool { + ret := _m.ctrl.Call(_m, "InvokableFunction", _param0, _param1) + ret0, _ := ret[0].(bool) + return ret0 +} + +func (_mr *_MockTargeterRecorder) InvokableFunction(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "InvokableFunction", arg0, arg1) +} + func (_m *MockTargeter) SubscribersOfEvent(_param0 string, _param1 event.Type) []functions.FunctionID { ret := _m.ctrl.Call(_m, "SubscribersOfEvent", _param0, _param1) ret0, _ := ret[0].([]functions.FunctionID) diff --git a/router/router.go b/router/router.go index b5231f4..aaa2f65 100644 --- a/router/router.go +++ b/router/router.go @@ -80,8 +80,7 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if event.Type == eventpkg.TypeInvoke { - routerEventsSyncReceived.Inc() - router.handleInvokeEvent(event, w, r) + router.handleInvokeEvent(path, event, w, r) } else if !event.IsSystem() { router.enqueueWork(path, event) w.WriteHeader(http.StatusAccepted) @@ -306,8 +305,15 @@ func (router *Router) handleHTTPEvent(event *eventpkg.Event, w http.ResponseWrit } } -func (router *Router) handleInvokeEvent(event *eventpkg.Event, w http.ResponseWriter, r *http.Request) { +func (router *Router) handleInvokeEvent(path string, event *eventpkg.Event, w http.ResponseWriter, r *http.Request) { + routerEventsSyncReceived.Inc() + functionID := functions.FunctionID(r.Header.Get(headerFunctionID)) + if !router.targetCache.InvokableFunction(path, functionID) { + http.Error(w, "function or subscription not found", http.StatusNotFound) + return + } + resp, err := router.callFunction(functionID, *event) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/router/router_test.go b/router/router_test.go index aba4e1b..1ef2683 100644 --- a/router/router_test.go +++ b/router/router_test.go @@ -54,6 +54,7 @@ func TestRouterServeHTTP_InvokeEventFunctionNotFound(t *testing.T) { defer ctrl.Finish() target := mock.NewMockTargeter(ctrl) target.EXPECT().Function(functions.FunctionID("testfunc")).Return(nil).MaxTimes(1) + target.EXPECT().InvokableFunction("/", functions.FunctionID("testfunc")).Return(true).MaxTimes(1) target.EXPECT().SubscribersOfEvent("/", event.SystemEventReceivedType).Return([]functions.FunctionID{}).MaxTimes(1) router := testrouter(target) diff --git a/router/targeter.go b/router/targeter.go index cd1a51c..cd323f2 100644 --- a/router/targeter.go +++ b/router/targeter.go @@ -10,6 +10,7 @@ import ( // Targeter is an interface for retrieving cached configuration for driving performance-sensitive routing decisions. type Targeter interface { HTTPBackingFunction(method, path string) (*functions.FunctionID, pathtree.Params, *cors.CORS) + InvokableFunction(path string, functionID functions.FunctionID) bool Function(functionID functions.FunctionID) *functions.Function SubscribersOfEvent(path string, eventType event.Type) []functions.FunctionID }