diff --git a/cmd/run.go b/cmd/run.go index fda928a5531..cd69ed043a7 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -23,6 +23,7 @@ import ( "go.k6.io/k6/cmd/state" "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/event" "go.k6.io/k6/execution" "go.k6.io/k6/js/common" "go.k6.io/k6/lib" @@ -39,6 +40,12 @@ type cmdRun struct { gs *state.GlobalState } +// We use an excessively high timeout to wait for event processing to complete, +// since prematurely proceeding before it is done could create bigger problems. +// In practice, this effectively acts as no timeout, and the user will have to +// kill k6 if a hang happens, which is the behavior without events anyway. +const waitEventDoneTimeout = 30 * time.Minute + // TODO: split apart some more // //nolint:funlen,gocognit,gocyclo,cyclop @@ -66,6 +73,26 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { // from sub-contexts while also attaching a reason for the abort. runCtx, runAbort := execution.NewTestRunContext(lingerCtx, logger) + emitEvent := func(evt *event.Event) func() { + waitDone := c.gs.Events.Emit(evt) + return func() { + waitCtx, waitCancel := context.WithTimeout(globalCtx, waitEventDoneTimeout) + defer waitCancel() + if werr := waitDone(waitCtx); werr != nil { + logger.WithError(werr).Warn() + } + } + } + + defer func() { + waitExitDone := emitEvent(&event.Event{ + Type: event.Exit, + Data: &event.ExitData{Error: err}, + }) + waitExitDone() + c.gs.Events.UnsubscribeAll() + }() + test, err := loadAndConfigureTest(c.gs, cmd, args, getConfig) if err != nil { return err @@ -153,6 +180,8 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { }() } + waitInitDone := emitEvent(&event.Event{Type: event.Init}) + // Create and start the outputs. We do it quite early to get any output URLs // or other details below. It also allows us to ensure when they have // flushed their samples and when they have stopped in the defer statements. @@ -300,10 +329,18 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { }() } + waitInitDone() + + waitTestStartDone := emitEvent(&event.Event{Type: event.TestStart}) + waitTestStartDone() + // Start the test! However, we won't immediately return if there was an // error, we still have things to do. err = execScheduler.Run(globalCtx, runCtx, samples) + waitTestEndDone := emitEvent(&event.Event{Type: event.TestEnd}) + defer waitTestEndDone() + // Init has passed successfully, so unless disabled, make sure we send a // usage report after the context is done. if !conf.NoUsageReport.Bool { diff --git a/cmd/state/state.go b/cmd/state/state.go index f0595d7770e..3facd99d019 100644 --- a/cmd/state/state.go +++ b/cmd/state/state.go @@ -12,6 +12,7 @@ import ( "github.com/mattn/go-isatty" "github.com/sirupsen/logrus" + "go.k6.io/k6/event" "go.k6.io/k6/lib/fsext" "go.k6.io/k6/ui/console" ) @@ -39,6 +40,7 @@ type GlobalState struct { BinaryName string CmdArgs []string Env map[string]string + Events *event.System DefaultFlags, Flags GlobalFlags @@ -110,6 +112,7 @@ func NewGlobalState(ctx context.Context) *GlobalState { BinaryName: filepath.Base(binary), CmdArgs: os.Args, Env: env, + Events: event.NewEventSystem(100, logger), DefaultFlags: defaultFlags, Flags: getFlags(defaultFlags, env), OutMutex: outMutex, diff --git a/cmd/test_load.go b/cmd/test_load.go index bf7378dda74..6dd1d6b6665 100644 --- a/cmd/test_load.go +++ b/cmd/test_load.go @@ -70,6 +70,7 @@ func loadTest(gs *state.GlobalState, cmd *cobra.Command, args []string) (*loaded RuntimeOptions: runtimeOptions, Registry: registry, BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry), + Events: gs.Events, LookupEnv: func(key string) (string, bool) { val, ok := gs.Env[key] return val, ok diff --git a/cmd/tests/cmd_run_test.go b/cmd/tests/cmd_run_test.go index ae11a58311e..0df7021af5d 100644 --- a/cmd/tests/cmd_run_test.go +++ b/cmd/tests/cmd_run_test.go @@ -13,6 +13,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "syscall" "testing" "time" @@ -23,7 +24,10 @@ import ( "github.com/tidwall/gjson" "go.k6.io/k6/cloudapi" "go.k6.io/k6/cmd" + "go.k6.io/k6/cmd/tests/events" "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/event" + "go.k6.io/k6/js/modules" "go.k6.io/k6/lib/consts" "go.k6.io/k6/lib/fsext" "go.k6.io/k6/lib/testutils" @@ -42,7 +46,6 @@ func TestVersion(t *testing.T) { assert.Contains(t, stdout, runtime.Version()) assert.Contains(t, stdout, runtime.GOOS) assert.Contains(t, stdout, runtime.GOARCH) - assert.NotContains(t, stdout[:len(stdout)-1], "\n") assert.Empty(t, ts.Stderr.Bytes()) assert.Empty(t, ts.LoggerHook.Drain()) @@ -1534,7 +1537,7 @@ func TestMinIterationDuration(t *testing.T) { elapsed := time.Since(start) assert.Greater(t, elapsed, 7*time.Second, "expected more time to have passed because of minIterationDuration") assert.Less( - t, elapsed, 14*time.Second, + t, elapsed, 15*time.Second, "expected less time to have passed because minIterationDuration should not affect setup() and teardown() ", ) @@ -1998,3 +2001,230 @@ func TestBadLogOutput(t *testing.T) { }) } } + +// HACK: We need this so multiple tests can register differently named modules. +var uniqueModuleNumber uint64 //nolint:gochecknoglobals + +// Tests that the appropriate events are emitted in the correct order. +func TestEventSystemOK(t *testing.T) { + t.Parallel() + + ts := NewGlobalTestState(t) + + moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1)) + mod := events.New(event.GlobalEvents, event.VUEvents) + modules.Register(moduleName, mod) + + ts.CmdArgs = []string{"k6", "--quiet", "run", "-"} + ts.Stdin = bytes.NewBuffer([]byte(fmt.Sprintf(` + import events from '%s'; + import { sleep } from 'k6'; + + export let options = { + vus: 1, + iterations: 5, + } + + export default function () { sleep(1); } + `, moduleName))) + + cmd.ExecuteWithGlobalState(ts.GlobalState) + + doneCh := make(chan struct{}) + go func() { + mod.WG.Wait() + close(doneCh) + }() + + select { + case <-doneCh: + case <-time.After(time.Second): + t.Fatal("timed out") + } + + expLog := []string{ + `got event Init with data ''`, + `got event TestStart with data ''`, + `got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:}'`, + `got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:}'`, + `got event IterStart with data '{Iteration:1 VUID:1 ScenarioName:default Error:}'`, + `got event IterEnd with data '{Iteration:1 VUID:1 ScenarioName:default Error:}'`, + `got event IterStart with data '{Iteration:2 VUID:1 ScenarioName:default Error:}'`, + `got event IterEnd with data '{Iteration:2 VUID:1 ScenarioName:default Error:}'`, + `got event IterStart with data '{Iteration:3 VUID:1 ScenarioName:default Error:}'`, + `got event IterEnd with data '{Iteration:3 VUID:1 ScenarioName:default Error:}'`, + `got event IterStart with data '{Iteration:4 VUID:1 ScenarioName:default Error:}'`, + `got event IterEnd with data '{Iteration:4 VUID:1 ScenarioName:default Error:}'`, + `got event TestEnd with data ''`, + `got event Exit with data '&{Error:}'`, + } + log := ts.LoggerHook.Lines() + assert.Equal(t, expLog, log) +} + +// Check emitted events in the case of a script error. +func TestEventSystemError(t *testing.T) { + t.Parallel() + + testCases := []struct { + name, script string + expLog []string + expExitCode exitcodes.ExitCode + }{ + { + name: "abort", + script: ` + import { test } from 'k6/execution'; + + export let options = { + vus: 1, + iterations: 5, + } + + export default function () { + test.abort('oops!'); + } + `, expLog: []string{ + "got event Init with data ''", + "got event TestStart with data ''", + "got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:}'", + "got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:test aborted: oops! at file:///-:11:16(6)}'", + "got event TestEnd with data ''", + "got event Exit with data '&{Error:test aborted: oops! at file:///-:11:16(6)}'", + "test aborted: oops! at file:///-:11:16(6)", + }, + expExitCode: exitcodes.ScriptAborted, + }, + { + name: "init", + script: "undefinedVar", + expLog: []string{ + "got event Exit with data '&{Error:could not initialize '-': could not load JS test " + + "'file:///-': ReferenceError: undefinedVar is not defined\n\tat file:///-:2:0(12)\n}'", + "ReferenceError: undefinedVar is not defined\n\tat file:///-:2:0(12)\n", + }, + expExitCode: exitcodes.ScriptException, + }, + { + name: "throw", + script: ` + export let options = { + vus: 1, + iterations: 2, + } + + export default function () { + throw new Error('oops!'); + } + `, expLog: []string{ + "got event Init with data ''", + "got event TestStart with data ''", + "got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:}'", + "got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:Error: oops!\n\tat file:///-:9:11(3)\n}'", + "Error: oops!\n\tat file:///-:9:11(3)\n", + "got event IterStart with data '{Iteration:1 VUID:1 ScenarioName:default Error:}'", + "got event IterEnd with data '{Iteration:1 VUID:1 ScenarioName:default Error:Error: oops!\n\tat file:///-:9:11(3)\n}'", + "Error: oops!\n\tat file:///-:9:11(3)\n", + "got event TestEnd with data ''", + "got event Exit with data '&{Error:}'", + }, + expExitCode: 0, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + ts := NewGlobalTestState(t) + + moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1)) + mod := events.New(event.GlobalEvents, event.VUEvents) + modules.Register(moduleName, mod) + + ts.CmdArgs = []string{"k6", "--quiet", "run", "-"} + ts.ExpectedExitCode = int(tc.expExitCode) + ts.Stdin = bytes.NewBuffer([]byte(fmt.Sprintf("import events from '%s';\n%s", moduleName, tc.script))) + + cmd.ExecuteWithGlobalState(ts.GlobalState) + + doneCh := make(chan struct{}) + go func() { + mod.WG.Wait() + close(doneCh) + }() + + select { + case <-doneCh: + case <-time.After(time.Second): + t.Fatal("timed out") + } + + log := ts.LoggerHook.Lines() + assert.Equal(t, tc.expLog, log) + }) + } +} + +func BenchmarkRun(b *testing.B) { + b.StopTimer() + + for i := 0; i < b.N; i++ { + ts := NewGlobalTestState(b) + + ts.CmdArgs = []string{"k6", "--quiet", "run", "-"} + ts.Stdin = bytes.NewBuffer([]byte(` + export let options = { + vus: 10, + iterations: 100, + } + + export default function () {} + `)) + ts.ExpectedExitCode = 0 + + b.StartTimer() + cmd.ExecuteWithGlobalState(ts.GlobalState) + b.StopTimer() + } +} + +func BenchmarkRunEvents(b *testing.B) { + b.StopTimer() + + for i := 0; i < b.N; i++ { + ts := NewGlobalTestState(b) + + moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1)) + mod := events.New(event.GlobalEvents, event.VUEvents) + modules.Register(moduleName, mod) + + ts.CmdArgs = []string{"k6", "--quiet", "run", "-"} + ts.Stdin = bytes.NewBuffer([]byte(fmt.Sprintf(` + import events from '%s'; + export let options = { + vus: 10, + iterations: 100, + } + + export default function () {} + `, moduleName))) + ts.ExpectedExitCode = 0 + + b.StartTimer() + cmd.ExecuteWithGlobalState(ts.GlobalState) + b.StopTimer() + + doneCh := make(chan struct{}) + go func() { + mod.WG.Wait() + close(doneCh) + }() + + select { + case <-doneCh: + case <-time.After(time.Second): + b.Fatal("timed out") + } + } +} diff --git a/cmd/tests/events/events.go b/cmd/tests/events/events.go new file mode 100644 index 00000000000..8db59e1ca8f --- /dev/null +++ b/cmd/tests/events/events.go @@ -0,0 +1,106 @@ +// Package events is used for testing the event functionality. +package events + +import ( + "sync" + + "go.k6.io/k6/event" + "go.k6.io/k6/js/modules" +) + +// RootModule is the global module instance that will create module +// instances for each VU. +type RootModule struct { + initOnce sync.Once + globalEvents, vuEvents []event.Type + // Used by the test function to wait for all event handler goroutines to exit, + // to avoid dangling goroutines. + WG sync.WaitGroup + // Closed by the global event handler once the Exit event is received, and + // used as a signal for VU event handlers to also exit. + exit chan struct{} +} + +// Events represents an instance of the events module. +type Events struct{} + +var ( + _ modules.Module = &RootModule{} + _ modules.Instance = &Events{} +) + +// New returns a pointer to a new RootModule instance. +func New(globalEvents, vuEvents []event.Type) *RootModule { + return &RootModule{ + initOnce: sync.Once{}, + exit: make(chan struct{}), + globalEvents: globalEvents, + vuEvents: vuEvents, + } +} + +// NewModuleInstance implements the modules.Module interface to return +// a new instance for each VU. +func (rm *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { + rm.initOnce.Do(func() { + sid, evtCh := vu.Events().Global.Subscribe(rm.globalEvents...) + logger := vu.InitEnv().Logger + rm.WG.Add(1) + go func() { + defer func() { + close(rm.exit) + rm.WG.Done() + }() + for { + select { + case evt, ok := <-evtCh: + if !ok { + return + } + logger.Infof("got event %s with data '%+v'", evt.Type, evt.Data) + evt.Done() + if evt.Type == event.Exit { + vu.Events().Global.Unsubscribe(sid) + } + case <-vu.Context().Done(): + return + } + } + }() + }) + + if len(rm.vuEvents) > 0 { + // NOTE: It would be an improvement to only subscribe to events in VUs + // that will actually run the VU function (VU IDs > 0), and not in the + // throwaway VUs used for setup/teardown. But since there's no direct + // access to the VU ID at this point (it would involve getting it from + // vu.Runtime()), we subscribe in all VUs, and all event handler + // goroutines would exit normally once rm.exit is closed. + sid, evtCh := vu.Events().Local.Subscribe(rm.vuEvents...) + logger := vu.InitEnv().Logger + rm.WG.Add(1) + go func() { + defer rm.WG.Done() + for { + select { + case evt, ok := <-evtCh: + if !ok { + return + } + logger.Infof("got event %s with data '%+v'", evt.Type, evt.Data) + evt.Done() + case <-rm.exit: + vu.Events().Local.Unsubscribe(sid) + return + } + } + }() + } + + return &Events{} +} + +// Exports returns the exports of the k6 module. +func (e *Events) Exports() modules.Exports { + return modules.Exports{Default: e} +} diff --git a/cmd/tests/test_state.go b/cmd/tests/test_state.go index 54e5be13a0e..1459ed97b7a 100644 --- a/cmd/tests/test_state.go +++ b/cmd/tests/test_state.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.k6.io/k6/cmd/state" + "go.k6.io/k6/event" "go.k6.io/k6/lib/fsext" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/ui/console" @@ -90,6 +91,7 @@ func NewGlobalTestState(tb testing.TB) *GlobalTestState { BinaryName: "k6", CmdArgs: []string{}, Env: map[string]string{"K6_NO_USAGE_REPORT": "true"}, + Events: event.NewEventSystem(100, logger), DefaultFlags: defaultFlags, Flags: defaultFlags, OutMutex: outMutex, diff --git a/event/doc.go b/event/doc.go new file mode 100644 index 00000000000..7a5c3c04505 --- /dev/null +++ b/event/doc.go @@ -0,0 +1,3 @@ +// Package event contains the event system used to notify external components of +// various internal events during test execution. +package event diff --git a/event/event.go b/event/event.go new file mode 100644 index 00000000000..0fe6a21ab4a --- /dev/null +++ b/event/event.go @@ -0,0 +1,10 @@ +package event + +// Event is the emitted object sent to all subscribers of its type. +// The subscriber should call its Done method when finished processing +// to notify the emitter, though this is not required for all events. +type Event struct { + Type Type + Data any + Done func() +} diff --git a/event/system.go b/event/system.go new file mode 100644 index 00000000000..416f8ac7742 --- /dev/null +++ b/event/system.go @@ -0,0 +1,170 @@ +package event + +import ( + "context" + "fmt" + "sync" + + "github.com/sirupsen/logrus" +) + +// Subscriber is a limited interface of System that only allows subscribing and +// unsubscribing. +type Subscriber interface { + Subscribe(events ...Type) (subID uint64, eventsCh <-chan *Event) + Unsubscribe(subID uint64) +} + +// System keeps track of subscribers, and allows subscribing to and emitting +// events. +type System struct { + subMx sync.RWMutex + subIDCount uint64 + subscribers map[Type]map[uint64]chan *Event + eventBuffer int + logger logrus.FieldLogger +} + +// NewEventSystem returns a new System. +// eventBuffer determines the size of the Event channel buffer. Events might be +// dropped if this buffer is full and there are no event listeners, or if events +// are emitted very quickly and the event handler goroutine is busy. It is +// recommended to handle events in a separate goroutine to not block the +// listener goroutine. +func NewEventSystem(eventBuffer int, logger logrus.FieldLogger) *System { + return &System{ + subscribers: make(map[Type]map[uint64]chan *Event), + eventBuffer: eventBuffer, + logger: logger, + } +} + +// Subscribe to one or more events. It returns a subscriber ID that can be +// used to unsubscribe, and an Event channel to receive events. +// It panics if events is empty. +func (s *System) Subscribe(events ...Type) (subID uint64, eventsCh <-chan *Event) { + if len(events) == 0 { + panic("must subscribe to at least 1 event type") + } + + s.subMx.Lock() + defer s.subMx.Unlock() + s.subIDCount++ + subID = s.subIDCount + + evtCh := make(chan *Event, s.eventBuffer) + for _, evt := range events { + if s.subscribers[evt] == nil { + s.subscribers[evt] = make(map[uint64]chan *Event) + } + s.subscribers[evt][subID] = evtCh + } + + s.logger.WithFields(logrus.Fields{ + "subscriptionID": subID, + "events": events, + }).Debug("Created event subscription") + + return subID, evtCh +} + +// Emit the event to all subscribers of its type. +// It returns a function that can be optionally used to wait for all subscribers +// to process the event (by signalling via the Done method). +func (s *System) Emit(event *Event) (wait func(context.Context) error) { + s.subMx.RLock() + defer s.subMx.RUnlock() + totalSubs := len(s.subscribers[event.Type]) + if totalSubs == 0 { + return func(context.Context) error { return nil } + } + + if event.Done == nil { + event.Done = func() {} + } + origDoneFn := event.Done + doneCh := make(chan struct{}, s.eventBuffer) + doneFn := func() { + origDoneFn() + select { + case doneCh <- struct{}{}: + default: + } + } + event.Done = doneFn + + for _, evtCh := range s.subscribers[event.Type] { + select { + case evtCh <- event: + default: + } + } + + s.logger.WithFields(logrus.Fields{ + "subscribers": totalSubs, + "event": event.Type, + }).Trace("Emitted event") + + return func(ctx context.Context) error { + var doneCount int + for { + if doneCount == totalSubs { + close(doneCh) + return nil + } + select { + case <-doneCh: + doneCount++ + case <-ctx.Done(): + return fmt.Errorf("context is done before all '%s' events were processed", event.Type) + } + } + } +} + +// Unsubscribe closes the Event channel and removes the subscription with ID +// subID. +func (s *System) Unsubscribe(subID uint64) { + s.subMx.Lock() + defer s.subMx.Unlock() + var seen bool + for _, sub := range s.subscribers { + if evtCh, ok := sub[subID]; ok { + if !seen { + close(evtCh) + } + delete(sub, subID) + seen = true + } + } + + if seen { + s.logger.WithFields(logrus.Fields{ + "subscriptionID": subID, + }).Debug("Removed event subscription") + } +} + +// UnsubscribeAll closes all event channels and removes all subscriptions. +func (s *System) UnsubscribeAll() { + s.subMx.Lock() + defer s.subMx.Unlock() + + seenSubs := make(map[uint64]struct{}) + for _, sub := range s.subscribers { + for subID, evtCh := range sub { + if _, ok := seenSubs[subID]; !ok { + close(evtCh) + seenSubs[subID] = struct{}{} + } + } + } + + if len(seenSubs) > 0 { + s.logger.WithFields(logrus.Fields{ + "subscriptions": len(seenSubs), + }).Debug("Removed all event subscriptions") + } + + s.subscribers = make(map[Type]map[uint64]chan *Event) +} diff --git a/event/system_test.go b/event/system_test.go new file mode 100644 index 00000000000..33aed5de008 --- /dev/null +++ b/event/system_test.go @@ -0,0 +1,247 @@ +package event + +import ( + "context" + "errors" + "io" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventSystem(t *testing.T) { + t.Parallel() + t.Run("subscribe", func(t *testing.T) { + t.Parallel() + logger := logrus.New() + logger.SetOutput(io.Discard) + es := NewEventSystem(10, logger) + + require.Len(t, es.subscribers, 0) + + s1id, s1ch := es.Subscribe(Init) + + assert.Equal(t, uint64(1), s1id) + assert.NotNil(t, s1ch) + assert.Len(t, es.subscribers, 1) + assert.Len(t, es.subscribers[Init], 1) + assert.Equal(t, (<-chan *Event)(es.subscribers[Init][s1id]), s1ch) + + s2id, s2ch := es.Subscribe(Init, TestStart) + + assert.Equal(t, uint64(2), s2id) + assert.NotNil(t, s2ch) + assert.Len(t, es.subscribers, 2) + assert.Len(t, es.subscribers[Init], 2) + assert.Len(t, es.subscribers[TestStart], 1) + assert.Equal(t, (<-chan *Event)(es.subscribers[Init][s2id]), s2ch) + assert.Equal(t, (<-chan *Event)(es.subscribers[TestStart][s2id]), s2ch) + }) + + t.Run("subscribe/panic", func(t *testing.T) { + t.Parallel() + logger := logrus.New() + logger.SetOutput(io.Discard) + es := NewEventSystem(10, logger) + assert.PanicsWithValue(t, "must subscribe to at least 1 event type", func() { + es.Subscribe() + }) + }) + + t.Run("emit_and_process", func(t *testing.T) { + t.Parallel() + testTimeout := 5 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + logger := logrus.New() + logger.SetOutput(io.Discard) + es := NewEventSystem(10, logger) + + s1id, s1ch := es.Subscribe(Init, Exit) + s2id, s2ch := es.Subscribe(Init, TestStart, TestEnd, Exit) + + type result struct { + sid uint64 + events []*Event + err error + } + resultCh := make(chan result, 2) + go func() { + s1result, err := processEvents(ctx, es, s1id, s1ch) + resultCh <- result{s1id, s1result, err} + }() + + go func() { + s2result, err := processEvents(ctx, es, s2id, s2ch) + resultCh <- result{s2id, s2result, err} + }() + + var ( + doneMx sync.RWMutex + processed = make(map[Type]int) + emitEvents = []Type{Init, TestStart, IterStart, IterEnd, TestEnd, Exit} + data int + ) + for _, et := range emitEvents { + et := et + evt := &Event{Type: et, Data: data, Done: func() { + doneMx.Lock() + processed[et]++ + doneMx.Unlock() + }} + es.Emit(evt) + data++ + } + + for i := 0; i < 2; i++ { + select { + case result := <-resultCh: + require.NoError(t, result.err) + switch result.sid { + case s1id: + require.Len(t, result.events, 2) + assert.Equal(t, Init, result.events[0].Type) + assert.Equal(t, 0, result.events[0].Data) + assert.Equal(t, Exit, result.events[1].Type) + assert.Equal(t, 5, result.events[1].Data) + case s2id: + require.Len(t, result.events, 4) + assert.Equal(t, Init, result.events[0].Type) + assert.Equal(t, 0, result.events[0].Data) + assert.Equal(t, TestStart, result.events[1].Type) + assert.Equal(t, 1, result.events[1].Data) + assert.Equal(t, TestEnd, result.events[2].Type) + assert.Equal(t, 4, result.events[2].Data) + assert.Equal(t, Exit, result.events[3].Type) + assert.Equal(t, 5, result.events[3].Data) + } + case <-ctx.Done(): + t.Fatalf("test timed out after %s", testTimeout) + } + } + + expProcessed := map[Type]int{ + Init: 2, + TestStart: 1, + TestEnd: 1, + Exit: 2, + } + assert.Equal(t, expProcessed, processed) + }) + + t.Run("emit_and_wait/ok", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + logger := logrus.New() + logger.SetOutput(io.Discard) + es := NewEventSystem(100, logger) + + var ( + wg sync.WaitGroup + numSubs = 100 + ) + for i := 0; i < numSubs; i++ { + sid, evtCh := es.Subscribe(Exit) + wg.Add(1) + go func() { + defer wg.Done() + _, err := processEvents(ctx, es, sid, evtCh) + require.NoError(t, err) + }() + } + + var done uint32 + wait := es.Emit(&Event{Type: Exit, Done: func() { + atomic.AddUint32(&done, 1) + }}) + waitCtx, waitCancel := context.WithTimeout(ctx, time.Second) + defer waitCancel() + err := wait(waitCtx) + require.NoError(t, err) + assert.Equal(t, uint32(numSubs), done) + + wg.Wait() + }) + + t.Run("emit_and_wait/error", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + logger := logrus.New() + logger.SetOutput(io.Discard) + es := NewEventSystem(10, logger) + + sid, evtCh := es.Subscribe(Exit) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, err := processEvents(ctx, es, sid, evtCh) + assert.NoError(t, err) + }() + + wait := es.Emit(&Event{Type: Exit, Done: func() { + time.Sleep(200 * time.Millisecond) + }}) + waitCtx, waitCancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer waitCancel() + err := wait(waitCtx) + assert.EqualError(t, err, "context is done before all 'Exit' events were processed") + + wg.Wait() + }) + + t.Run("unsubscribe", func(t *testing.T) { + t.Parallel() + logger := logrus.New() + logger.SetOutput(io.Discard) + es := NewEventSystem(10, logger) + + require.Len(t, es.subscribers, 0) + + var ( + numSubs = 5 + subs = make([]uint64, numSubs) + ) + for i := 0; i < numSubs; i++ { + sid, _ := es.Subscribe(Init) + subs[i] = sid + } + + require.Len(t, es.subscribers[Init], numSubs) + + es.Unsubscribe(subs[0]) + assert.Len(t, es.subscribers[Init], numSubs-1) + es.Unsubscribe(subs[0]) // second unsubscribe does nothing + assert.Len(t, es.subscribers[Init], numSubs-1) + + es.UnsubscribeAll() + assert.Len(t, es.subscribers[Init], 0) + }) +} + +func processEvents(ctx context.Context, es *System, sid uint64, evtCh <-chan *Event) ([]*Event, error) { + result := make([]*Event, 0) + + for { + select { + case evt, ok := <-evtCh: + if !ok { + return result, nil + } + result = append(result, evt) + evt.Done() + if evt.Type == Exit { + es.Unsubscribe(sid) + } + case <-ctx.Done(): + return nil, errors.New("test timed out") + } + } +} diff --git a/event/type.go b/event/type.go new file mode 100644 index 00000000000..7e02f3e04d3 --- /dev/null +++ b/event/type.go @@ -0,0 +1,43 @@ +package event + +// Type represents the different event types emitted by k6. +// +//go:generate enumer -type=Type -trimprefix Type -output type_gen.go +type Type uint8 + +const ( + // Init is emitted when k6 starts initializing outputs, VUs and executors. + Init Type = iota + 1 + // TestStart is emitted when the execution scheduler starts running the test. + TestStart + // TestEnd is emitted when the test execution ends. + TestEnd + // IterStart is emitted when a VU starts an iteration. + IterStart + // IterEnd is emitted when a VU ends an iteration. + IterEnd + // Exit is emitted when the k6 process is about to exit. + Exit +) + +//nolint:gochecknoglobals +var ( + // GlobalEvents are emitted once per test run. + GlobalEvents = []Type{Init, TestStart, TestEnd, Exit} + // VUEvents are emitted multiple times per each VU. + VUEvents = []Type{IterStart, IterEnd} +) + +// ExitData is the data sent in the Exit event. Error is the error returned by +// the run command. +type ExitData struct { + Error error +} + +// IterData is the data sent in the IterStart and IterEnd events. +type IterData struct { + Iteration int64 + VUID uint64 + ScenarioName string + Error error +} diff --git a/event/type_gen.go b/event/type_gen.go new file mode 100644 index 00000000000..21b2662ba3b --- /dev/null +++ b/event/type_gen.go @@ -0,0 +1,54 @@ +// Code generated by "enumer -type=Type -trimprefix Type -output type_gen.go"; DO NOT EDIT. + +package event + +import ( + "fmt" +) + +const _TypeName = "InitTestStartTestEndIterStartIterEndExit" + +var _TypeIndex = [...]uint8{0, 4, 13, 20, 29, 36, 40} + +func (i Type) String() string { + i -= 1 + if i >= Type(len(_TypeIndex)-1) { + return fmt.Sprintf("Type(%d)", i+1) + } + return _TypeName[_TypeIndex[i]:_TypeIndex[i+1]] +} + +var _TypeValues = []Type{1, 2, 3, 4, 5, 6} + +var _TypeNameToValueMap = map[string]Type{ + _TypeName[0:4]: 1, + _TypeName[4:13]: 2, + _TypeName[13:20]: 3, + _TypeName[20:29]: 4, + _TypeName[29:36]: 5, + _TypeName[36:40]: 6, +} + +// TypeString retrieves an enum value from the enum constants string name. +// Throws an error if the param is not part of the enum. +func TypeString(s string) (Type, error) { + if val, ok := _TypeNameToValueMap[s]; ok { + return val, nil + } + return 0, fmt.Errorf("%s does not belong to Type values", s) +} + +// TypeValues returns all values of the enum +func TypeValues() []Type { + return _TypeValues +} + +// IsAType returns "true" if the value is listed in the enum definition. "false" otherwise +func (i Type) IsAType() bool { + for _, v := range _TypeValues { + if i == v { + return true + } + } + return false +} diff --git a/js/bundle.go b/js/bundle.go index 3b7c9a46542..0e384dc1161 100644 --- a/js/bundle.go +++ b/js/bundle.go @@ -15,6 +15,7 @@ import ( "github.com/sirupsen/logrus" "gopkg.in/guregu/null.v3" + "go.k6.io/k6/event" "go.k6.io/k6/js/common" "go.k6.io/k6/js/compiler" "go.k6.io/k6/js/eventloop" @@ -112,7 +113,14 @@ func newBundle( // Instantiate the bundle into a new VM using a bound init context. This uses a context with a // runtime, but no state, to allow module-provided types to function within the init context. // TODO use a real context - vuImpl := &moduleVUImpl{ctx: context.Background(), runtime: goja.New()} + vuImpl := &moduleVUImpl{ + ctx: context.Background(), + runtime: goja.New(), + events: events{ + global: piState.Events, + local: event.NewEventSystem(100, piState.Logger), + }, + } vuImpl.eventLoop = eventloop.New(vuImpl) exports, err := bundle.instantiate(vuImpl, 0) if err != nil { @@ -220,7 +228,14 @@ func (b *Bundle) populateExports(updateOptions bool, exports *goja.Object) error func (b *Bundle) Instantiate(ctx context.Context, vuID uint64) (*BundleInstance, error) { // Instantiate the bundle into a new VM using a bound init context. This uses a context with a // runtime, but no state, to allow module-provided types to function within the init context. - vuImpl := &moduleVUImpl{ctx: ctx, runtime: goja.New()} + vuImpl := &moduleVUImpl{ + ctx: ctx, + runtime: goja.New(), + events: events{ + global: b.preInitState.Events, + local: event.NewEventSystem(100, b.preInitState.Logger), + }, + } vuImpl.eventLoop = eventloop.New(vuImpl) exports, err := b.instantiate(vuImpl, vuID) if err != nil { diff --git a/js/common/event.go b/js/common/event.go new file mode 100644 index 00000000000..d561c09b812 --- /dev/null +++ b/js/common/event.go @@ -0,0 +1,9 @@ +package common + +import "go.k6.io/k6/event" + +// Events are the event subscriber interfaces for the global event system, and +// the local (per-VU) event system. +type Events struct { + Global, Local event.Subscriber +} diff --git a/js/modules/modules.go b/js/modules/modules.go index 148d899b0f5..5296573f8cd 100644 --- a/js/modules/modules.go +++ b/js/modules/modules.go @@ -42,6 +42,14 @@ type VU interface { // Context return the context.Context about the current VU Context() context.Context + // Events allows subscribing to global k6 execution events, such as Init and + // Exit, and to local (per-VU) events, such as IterStart and IterEnd. + // NOTE: This API is EXPERIMENTAL and may be changed, renamed or + // completely removed in a later k6 release. + // FIXME: Subscribing to global events shouldn't be part of this VU (local) + // interface. + Events() common.Events + // InitEnv returns common.InitEnvironment instance if present InitEnv() *common.InitEnvironment @@ -55,9 +63,6 @@ type VU interface { // on the event loop *at a later point in time*. See the documentation for // `EventLoop.RegisterCallback()` in the `k6/js/eventloop` Go module for // the very important details on its usage and restrictions. - // - // Notice: This API is EXPERIMENTAL and may be changed, renamed or - // completely removed in a later k6 release. RegisterCallback() (enqueueCallback func(func() error)) // sealing field will help probably with pointing users that they just need to embed this in their Instance diff --git a/js/modules_vu.go b/js/modules_vu.go index aa2e3858fd0..ff0a42768dc 100644 --- a/js/modules_vu.go +++ b/js/modules_vu.go @@ -4,23 +4,33 @@ import ( "context" "github.com/dop251/goja" + "go.k6.io/k6/event" "go.k6.io/k6/js/common" "go.k6.io/k6/js/eventloop" "go.k6.io/k6/lib" ) +type events struct { + global, local *event.System +} + type moduleVUImpl struct { ctx context.Context initEnv *common.InitEnvironment state *lib.State runtime *goja.Runtime eventLoop *eventloop.EventLoop + events events } func (m *moduleVUImpl) Context() context.Context { return m.ctx } +func (m *moduleVUImpl) Events() common.Events { + return common.Events{Global: m.events.global, Local: m.events.local} +} + func (m *moduleVUImpl) InitEnv() *common.InitEnvironment { return m.initEnv } diff --git a/js/modulestest/modulestest.go b/js/modulestest/modulestest.go index 0b7cd96462c..6106da9878f 100644 --- a/js/modulestest/modulestest.go +++ b/js/modulestest/modulestest.go @@ -15,6 +15,7 @@ var _ modules.VU = &VU{} type VU struct { CtxField context.Context InitEnvField *common.InitEnvironment + EventsField common.Events StateField *lib.State RuntimeField *goja.Runtime RegisterCallbackField func() func(f func() error) @@ -25,6 +26,11 @@ func (m *VU) Context() context.Context { return m.CtxField } +// Events returns internally set field to conform to modules.VU interface +func (m *VU) Events() common.Events { + return m.EventsField +} + // InitEnv returns internally set field to conform to modules.VU interface func (m *VU) InitEnv() *common.InitEnvironment { m.checkIntegrity() diff --git a/js/runner.go b/js/runner.go index f604c4e4f89..977bd798017 100644 --- a/js/runner.go +++ b/js/runner.go @@ -23,6 +23,7 @@ import ( "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/event" "go.k6.io/k6/js/common" "go.k6.io/k6/js/eventloop" "go.k6.io/k6/lib" @@ -765,6 +766,24 @@ func (u *ActiveVU) RunOnce() error { ctx, cancel := context.WithCancel(u.RunContext) defer cancel() u.moduleVUImpl.ctx = ctx + + eventIterData := event.IterData{ + Iteration: u.iteration, + VUID: u.ID, + ScenarioName: u.scenarioName, + } + + emitAndWaitEvent := func(evt *event.Event) { + waitDone := u.moduleVUImpl.events.local.Emit(evt) + waitCtx, waitCancel := context.WithTimeout(u.RunContext, 30*time.Minute) + defer waitCancel() + if werr := waitDone(waitCtx); werr != nil { + u.state.Logger.WithError(werr).Warn() + } + } + + emitAndWaitEvent(&event.Event{Type: event.IterStart, Data: eventIterData}) + // Call the exported function. _, isFullIteration, totalTime, err := u.runFn(ctx, true, fn, cancel, u.setupData) if err != nil { @@ -775,8 +794,11 @@ func (u *ActiveVU) RunOnce() error { err = v } } + eventIterData.Error = err } + emitAndWaitEvent(&event.Event{Type: event.IterEnd, Data: eventIterData}) + // If MinIterationDuration is specified and the iteration wasn't canceled // and was less than it, sleep for the remainder if isFullIteration && u.Runner.Bundle.Options.MinIterationDuration.Valid { diff --git a/lib/test_state.go b/lib/test_state.go index 9fd5552839d..370b12103cc 100644 --- a/lib/test_state.go +++ b/lib/test_state.go @@ -4,6 +4,7 @@ import ( "io" "github.com/sirupsen/logrus" + "go.k6.io/k6/event" "go.k6.io/k6/metrics" ) @@ -13,6 +14,7 @@ type TestPreInitState struct { RuntimeOptions RuntimeOptions Registry *metrics.Registry BuiltinMetrics *metrics.BuiltinMetrics + Events *event.System KeyLogger io.Writer LookupEnv func(key string) (val string, ok bool) Logger logrus.FieldLogger