-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Publish system events on space prefixed path for hosted version #485
Changes from all commits
0a1dc69
b477847
01e01ba
178604e
3cbdaa6
4f1d3ef
5770b28
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
// +build !hosted | ||
|
||
package router | ||
|
||
func extractPath(host, path string) string { | ||
return path | ||
} | ||
|
||
func systemPathFromSpace(space string) string { | ||
return basePath | ||
} | ||
|
||
func systemPathFromPath(path string) string { | ||
return basePath | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
// +build hosted | ||
|
||
package router | ||
|
||
import ( | ||
"strings" | ||
) | ||
|
||
// extractPath extracts path from hosted EG host name (<space>.eventgateway([a-z-]*)?.io|slsgateway.com) | ||
func extractPath(host, path string) string { | ||
subdomain := strings.Split(host, ".")[0] | ||
return basePath + subdomain + path | ||
} | ||
|
||
func systemPathFromSpace(space string) string { | ||
return basePath + space + "/" | ||
} | ||
|
||
// systemPathFromPath constructs path from path on which event was emitted. Helpful for "event.received" system event. | ||
func systemPathFromPath(path string) string { | ||
return basePath + strings.Split(path, "/")[1] + "/" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,8 +4,6 @@ import ( | |
"encoding/json" | ||
"errors" | ||
"net/http" | ||
"regexp" | ||
"strings" | ||
"sync" | ||
|
||
"github.com/jinzhu/copier" | ||
|
@@ -22,6 +20,7 @@ import ( | |
|
||
const ( | ||
mimeJSON = "application/json" | ||
basePath = "/" | ||
) | ||
|
||
// Router calls a target function when an endpoint is hit, and handles pubsub message delivery. | ||
|
@@ -163,8 +162,6 @@ func (router *Router) Drain() { | |
router.Unlock() | ||
} | ||
|
||
const hostedDomain = "(eventgateway([a-z-]*)?.io|slsgateway.com)" | ||
|
||
var ( | ||
errUnableToLookUpRegisteredFunction = errors.New("unable to look up registered function") | ||
) | ||
|
@@ -242,7 +239,7 @@ func (router *Router) httpRequestHandler(space string, backingFunction function. | |
// handleAsyncSubscriptions fetched events subscribers, runs authorization and enqueues event in the queue | ||
func (router *Router) handleAsyncSubscriptions(method, path string, event eventpkg.Event, r *http.Request) { | ||
if event.IsSystem() { | ||
router.log.Debug("System event received.", zap.Object("event", event)) | ||
router.log.Debug("System event received.", zap.String("path", path), zap.Object("event", event)) | ||
} | ||
|
||
subscribers := router.targetCache.AsyncSubscribers(method, path, event.EventType) | ||
|
@@ -458,7 +455,7 @@ func (router *Router) emitSystemEventReceived(path string, event eventpkg.Event, | |
mimeJSON, | ||
eventpkg.SystemEventReceivedData{Path: path, Event: event, Headers: ihttp.FlattenHeader(header)}, | ||
) | ||
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil) | ||
router.handleAsyncSubscriptions(http.MethodPost, systemPathFromPath(path), *system, nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this would be more idiomatic to use something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason for helper function is that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Understood, that makes a lot of sense |
||
return router.plugins.React(system) | ||
} | ||
|
||
|
@@ -468,7 +465,7 @@ func (router *Router) emitSystemFunctionInvoking(space string, functionID functi | |
mimeJSON, | ||
eventpkg.SystemFunctionInvokingData{Space: space, FunctionID: functionID, Event: event}, | ||
) | ||
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil) | ||
router.handleAsyncSubscriptions(http.MethodPost, systemPathFromSpace(space), *system, nil) | ||
|
||
metricEventsReceived.WithLabelValues(space, string(eventpkg.SystemFunctionInvokingType)).Inc() | ||
|
||
|
@@ -480,7 +477,7 @@ func (router *Router) emitSystemFunctionInvoked(space string, functionID functio | |
eventpkg.SystemFunctionInvokedType, | ||
mimeJSON, | ||
eventpkg.SystemFunctionInvokedData{Space: space, FunctionID: functionID, Event: event, Result: result}) | ||
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil) | ||
router.handleAsyncSubscriptions(http.MethodPost, systemPathFromSpace(space), *system, nil) | ||
|
||
metricEventsReceived.WithLabelValues(space, string(eventpkg.SystemFunctionInvokedType)).Inc() | ||
|
||
|
@@ -493,7 +490,7 @@ func (router *Router) emitSystemFunctionInvocationFailed(space string, functionI | |
eventpkg.SystemFunctionInvocationFailedType, | ||
mimeJSON, | ||
eventpkg.SystemFunctionInvocationFailedData{Space: space, FunctionID: functionID, Event: event, Error: err}) | ||
router.handleAsyncSubscriptions(http.MethodPost, "/", *system, nil) | ||
router.handleAsyncSubscriptions(http.MethodPost, systemPathFromSpace(space), *system, nil) | ||
|
||
metricEventsReceived.WithLabelValues(space, string(eventpkg.SystemFunctionInvocationFailedType)).Inc() | ||
} | ||
|
@@ -532,16 +529,6 @@ func determineErrorMessage(err error) string { | |
return message | ||
} | ||
|
||
func extractPath(host, path string) string { | ||
extracted := path | ||
rxp, _ := regexp.Compile(hostedDomain) | ||
if rxp.MatchString(host) { | ||
subdomain := strings.Split(host, ".")[0] | ||
extracted = "/" + subdomain + path | ||
} | ||
return extracted | ||
} | ||
|
||
type backlogEvent struct { | ||
space string | ||
functionID function.ID | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
// +build hosted | ||
|
||
package router_test | ||
|
||
import ( | ||
"net/http" | ||
"net/http/httptest" | ||
"testing" | ||
|
||
"github.com/golang/mock/gomock" | ||
"github.com/serverless/event-gateway/event" | ||
"github.com/serverless/event-gateway/plugin" | ||
"github.com/serverless/event-gateway/router" | ||
"github.com/serverless/event-gateway/router/mock" | ||
"go.uber.org/zap" | ||
) | ||
|
||
func TestHostedRouterServeHTTP(t *testing.T) { | ||
ctrl := gomock.NewController(t) | ||
defer ctrl.Finish() | ||
target := mock.NewMockTargeter(ctrl) | ||
|
||
t.Run("emit system event 'event received' on path prefixed with space", func(t *testing.T) { | ||
target.EXPECT().CORS(gomock.Any(), gomock.Any()).Return(nil) | ||
target.EXPECT().SyncSubscriber(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) | ||
target.EXPECT().AsyncSubscribers(gomock.Any(), gomock.Any(), event.TypeName("http.request")).Return([]router.AsyncSubscriber{}) | ||
|
||
target.EXPECT().AsyncSubscribers(http.MethodPost, "/custom/", event.SystemEventReceivedType).Return([]router.AsyncSubscriber{}) | ||
|
||
router := setupTestRouter(target) | ||
req, _ := http.NewRequest(http.MethodGet, "https://custom.slsgateway.com/foo/bar", nil) | ||
recorder := httptest.NewRecorder() | ||
router.ServeHTTP(recorder, req) | ||
}) | ||
|
||
t.Run("extract path from hosted domain", func(t *testing.T) { | ||
target.EXPECT().CORS(gomock.Any(), gomock.Any()).Return(nil) | ||
target.EXPECT().AsyncSubscribers(gomock.Any(), gomock.Any(), event.SystemEventReceivedType).Return([]router.AsyncSubscriber{}) | ||
|
||
target.EXPECT().SyncSubscriber(http.MethodGet, "/custom/test", event.TypeName("http.request")).Return(nil) | ||
target.EXPECT().AsyncSubscribers(http.MethodGet, "/custom/test", event.TypeName("http.request")).Return([]router.AsyncSubscriber{}) | ||
|
||
router := setupTestRouter(target) | ||
req, _ := http.NewRequest(http.MethodGet, "https://custom.slsgateway.com/test", nil) | ||
recorder := httptest.NewRecorder() | ||
router.ServeHTTP(recorder, req) | ||
}) | ||
} | ||
|
||
func setupTestRouter(target router.Targeter) *router.Router { | ||
log := zap.NewNop() | ||
plugins := plugin.NewManager([]string{}, log) | ||
router := router.New(10, 10, target, plugins, log) | ||
router.StartWorkers() | ||
return router | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we put the
coverprofile
straight tocoverage.txt
?