diff --git a/.travis.yml b/.travis.yml index af8a158..6a70387 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,8 +5,7 @@ cache: directories: - vendor install: -- go get -u github.com/hashicorp/go-plugin -- go get -u github.com/hashicorp/go-hclog +- go get -u github.com/hashicorp/{go-plugin,go-hclog} - go get -u golang.org/x/net/{context,http2,trace} - go get -u github.com/golang/dep/cmd/dep - dep ensure diff --git a/cmd/event-gateway/main.go b/cmd/event-gateway/main.go index 4c97c0b..f20f725 100644 --- a/cmd/event-gateway/main.go +++ b/cmd/event-gateway/main.go @@ -44,6 +44,7 @@ func main() { eventsPort := flag.Uint("events-port", 4000, "Port to serve events API on.") eventsTLSCrt := flag.String("events-tls-cert", "", "Path to events API TLS certificate file.") eventsTLSKey := flag.String("events-tls-key", "", "Path to events API TLS key file.") + workersNumber := flag.Uint("workers", 100, "Number of workers processing incoming events.") plugins := paths{} flag.Var(&plugins, "plugin", "Path to a plugin to load.") flag.Parse() @@ -83,7 +84,7 @@ func main() { } targetCache := cache.NewTarget("/serverless-event-gateway", kv, log) - router := router.New(targetCache, pluginManager, log) + router := router.New(*workersNumber, targetCache, pluginManager, log) router.StartWorkers() api.StartEventsAPI(httpapi.Config{ diff --git a/router/integration_test.go b/router/integration_test.go index 9a8b4fe..127f61a 100644 --- a/router/integration_test.go +++ b/router/integration_test.go @@ -195,7 +195,7 @@ func get(url string) (int, http.Header, string) { func newTestRouterServer(kvstore store.Store, log *zap.Logger) (*Router, *httptest.Server) { targetCache := cache.NewTarget("/serverless-event-gateway", kvstore, log) - router := New(targetCache, plugin.NewManager([]string{}, log), log) + router := New(10, targetCache, plugin.NewManager([]string{}, log), log) return router, httptest.NewServer(router) } diff --git a/router/router.go b/router/router.go index 34eb6dd..e47adf2 100644 --- a/router/router.go +++ b/router/router.go @@ -25,7 +25,7 @@ type Router struct { targetCache Targeter plugins *plugin.Manager log *zap.Logger - workerNumber uint + workersNumber uint drain chan struct{} drainWaitGroup sync.WaitGroup active bool @@ -33,14 +33,14 @@ type Router struct { } // New instantiates a new Router -func New(targetCache Targeter, plugins *plugin.Manager, log *zap.Logger) *Router { +func New(workersNumber uint, targetCache Targeter, plugins *plugin.Manager, log *zap.Logger) *Router { return &Router{ - targetCache: targetCache, - plugins: plugins, - log: log, - workerNumber: 20, - drain: make(chan struct{}), - backlog: nil, + targetCache: targetCache, + plugins: plugins, + log: log, + workersNumber: workersNumber, + drain: make(chan struct{}), + backlog: nil, } } @@ -101,10 +101,11 @@ func (router *Router) StartWorkers() { router.active = true if router.backlog == nil { - router.backlog = make(chan backlogEvent, router.workerNumber*2) + router.backlog = make(chan backlogEvent, router.workersNumber*2) } - for i := 0; i < int(router.workerNumber); i++ { + router.log.Debug("Starting processing workers.", zap.Uint("workers", router.workersNumber)) + for i := 0; i < int(router.workersNumber); i++ { router.drainWaitGroup.Add(1) go router.loop() } diff --git a/router/router_test.go b/router/router_test.go index 5e0f291..4aa37b3 100644 --- a/router/router_test.go +++ b/router/router_test.go @@ -144,7 +144,7 @@ func TestRouterServeHTTP_AllowCORSPreflightForCustomEvents(t *testing.T) { func testrouter(target Targeter) *Router { log := zap.NewNop() plugins := plugin.NewManager([]string{}, log) - router := New(target, plugins, log) + router := New(10, target, plugins, log) router.StartWorkers() return router }