Skip to content

Commit

Permalink
add flag for configuring backlog lenght (#349)
Browse files Browse the repository at this point in the history
* add flag for configuring backlog lenght

* fix comment

* typo
  • Loading branch information
mthenw authored Nov 22, 2017
1 parent 52b52ff commit 0ac2643
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 6 deletions.
3 changes: 2 additions & 1 deletion cmd/event-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func main() {
eventsTLSCrt := flag.String("events-tls-cert", "", "Path to events API TLS certificate file.")
eventsTLSKey := flag.String("events-tls-key", "", "Path to events API TLS key file.")
workersNumber := flag.Uint("workers", 100, "Number of workers processing incoming events.")
workersBacklog := flag.Uint("workers-backlog", 200, "Length of workers backlog. Maximum number of events that wait for processing.")
plugins := paths{}
flag.Var(&plugins, "plugin", "Path to a plugin to load.")
flag.Parse()
Expand Down Expand Up @@ -84,7 +85,7 @@ func main() {
}

targetCache := cache.NewTarget("/serverless-event-gateway", kv, log)
router := router.New(*workersNumber, targetCache, pluginManager, log)
router := router.New(*workersNumber, *workersBacklog, targetCache, pluginManager, log)
router.StartWorkers()

api.StartEventsAPI(httpapi.Config{
Expand Down
2 changes: 1 addition & 1 deletion router/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func get(url string) (int, http.Header, string) {
func newTestRouterServer(kvstore store.Store, log *zap.Logger) (*Router, *httptest.Server) {
targetCache := cache.NewTarget("/serverless-event-gateway", kvstore, log)

router := New(10, targetCache, plugin.NewManager([]string{}, log), log)
router := New(10, 10, targetCache, plugin.NewManager([]string{}, log), log)
return router, httptest.NewServer(router)
}

Expand Down
8 changes: 5 additions & 3 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,21 @@ type Router struct {
plugins *plugin.Manager
log *zap.Logger
workersNumber uint
backlogLength uint
drain chan struct{}
drainWaitGroup sync.WaitGroup
active bool
backlog chan backlogEvent
}

// New instantiates a new Router
func New(workersNumber uint, targetCache Targeter, plugins *plugin.Manager, log *zap.Logger) *Router {
func New(workersNumber uint, backlogLength uint, targetCache Targeter, plugins *plugin.Manager, log *zap.Logger) *Router {
return &Router{
targetCache: targetCache,
plugins: plugins,
log: log,
workersNumber: workersNumber,
backlogLength: backlogLength,
drain: make(chan struct{}),
backlog: nil,
}
Expand Down Expand Up @@ -101,10 +103,10 @@ func (router *Router) StartWorkers() {
router.active = true

if router.backlog == nil {
router.backlog = make(chan backlogEvent, router.workersNumber*2)
router.backlog = make(chan backlogEvent, router.backlogLength)
}

router.log.Debug("Starting processing workers.", zap.Uint("workers", router.workersNumber))
router.log.Debug("Starting processing workers.", zap.Uint("workers", router.workersNumber), zap.Uint("backlog", router.backlogLength))
for i := 0; i < int(router.workersNumber); i++ {
router.drainWaitGroup.Add(1)
go router.loop()
Expand Down
2 changes: 1 addition & 1 deletion router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestRouterServeHTTP_AllowCORSPreflightForCustomEvents(t *testing.T) {
func testrouter(target Targeter) *Router {
log := zap.NewNop()
plugins := plugin.NewManager([]string{}, log)
router := New(10, target, plugins, log)
router := New(10, 10, target, plugins, log)
router.StartWorkers()
return router
}

0 comments on commit 0ac2643

Please sign in to comment.