Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event system for JS modules #3112

Merged
merged 12 commits into from
Jun 30, 2023
37 changes: 37 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.k6.io/k6/cmd/state"
"go.k6.io/k6/errext"
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/event"
"go.k6.io/k6/execution"
"go.k6.io/k6/js/common"
"go.k6.io/k6/lib"
Expand All @@ -39,6 +40,12 @@ type cmdRun struct {
gs *state.GlobalState
}

// We use an excessively high timeout to wait for event processing to complete,
// since prematurely proceeding before it is done could create bigger problems.
// In practice, this effectively acts as no timeout, and the user will have to
// kill k6 if a hang happens, which is the behavior without events anyway.
//
// The value is used as the context.WithTimeout duration for every wait
// function produced by the emitEvent helper in cmdRun.run.
const waitEventDoneTimeout = 30 * time.Minute

// TODO: split apart some more
//
//nolint:funlen,gocognit,gocyclo,cyclop
Expand Down Expand Up @@ -66,6 +73,26 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
// from sub-contexts while also attaching a reason for the abort.
runCtx, runAbort := execution.NewTestRunContext(lingerCtx, logger)

// emitEvent emits evt on the global event system right away and returns a
// function that waits on the result of that emission. The wait uses a context
// derived from globalCtx and limited to waitEventDoneTimeout; if waiting
// returns an error it is only logged as a warning, never propagated.
emitEvent := func(evt *event.Event) func() {
	waitDone := c.gs.Events.Emit(evt)

	return func() {
		ctx, cancel := context.WithTimeout(globalCtx, waitEventDoneTimeout)
		defer cancel()

		werr := waitDone(ctx)
		if werr == nil {
			return
		}
		logger.WithError(werr).Warn()
	}
}
Comment on lines +76 to +85
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This construct repeats with basically the context and the timeout repeated.

I am not certain if it will be better to

  1. not have it, but make waitDone (returned by emit) take a timeout instead of a context 🤷, which IMO will make this nicer to write by hand.
  2. move it to events package and add a "few" arguments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It did previously receive a time.Duration, but we decided to replace it with a context because it avoided embedding one in the System struct, and passing one avoids using a separate timer (time.After()). A context is also safer since it can be based on a parent context, which ensures it can exit early. And passing both a context and a duration seems redundant.

So I see this as a helper function specific to each caller. It doesn't make sense to have it part of the event package, as the logic here could be different. It's only similar in both cmd and js because we essentially want to treat it in the same way. But what if the caller wants to log an Error instead, or fail a test, or whatever?


// On the way out of run, emit the Exit event carrying the value of the named
// result err at that point, wait for it, and only then drop all subscriptions.
defer func() {
	exitEvent := &event.Event{
		Type: event.Exit,
		Data: &event.ExitData{Error: err},
	}
	emitEvent(exitEvent)()
	c.gs.Events.UnsubscribeAll()
}()

test, err := loadAndConfigureTest(c.gs, cmd, args, getConfig)
if err != nil {
return err
Expand Down Expand Up @@ -153,6 +180,8 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}()
}

waitInitDone := emitEvent(&event.Event{Type: event.Init})

// Create and start the outputs. We do it quite early to get any output URLs
// or other details below. It also allows us to ensure when they have
// flushed their samples and when they have stopped in the defer statements.
Expand Down Expand Up @@ -300,10 +329,18 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}()
}

waitInitDone()

waitTestStartDone := emitEvent(&event.Event{Type: event.TestStart})
waitTestStartDone()

// Start the test! However, we won't immediately return if there was an
// error, we still have things to do.
err = execScheduler.Run(globalCtx, runCtx, samples)

waitTestEndDone := emitEvent(&event.Event{Type: event.TestEnd})
defer waitTestEndDone()

// Init has passed successfully, so unless disabled, make sure we send a
// usage report after the context is done.
if !conf.NoUsageReport.Bool {
Expand Down
3 changes: 3 additions & 0 deletions cmd/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/mattn/go-isatty"
"github.com/sirupsen/logrus"

"go.k6.io/k6/event"
"go.k6.io/k6/lib/fsext"
"go.k6.io/k6/ui/console"
)
Expand Down Expand Up @@ -39,6 +40,7 @@ type GlobalState struct {
BinaryName string
CmdArgs []string
Env map[string]string
Events *event.System

DefaultFlags, Flags GlobalFlags

Expand Down Expand Up @@ -110,6 +112,7 @@ func NewGlobalState(ctx context.Context) *GlobalState {
BinaryName: filepath.Base(binary),
CmdArgs: os.Args,
Env: env,
Events: event.NewEventSystem(100, logger),
DefaultFlags: defaultFlags,
Flags: getFlags(defaultFlags, env),
OutMutex: outMutex,
Expand Down
1 change: 1 addition & 0 deletions cmd/test_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func loadTest(gs *state.GlobalState, cmd *cobra.Command, args []string) (*loaded
RuntimeOptions: runtimeOptions,
Registry: registry,
BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry),
Events: gs.Events,
LookupEnv: func(key string) (string, bool) {
val, ok := gs.Env[key]
return val, ok
Expand Down
234 changes: 232 additions & 2 deletions cmd/tests/cmd_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
Expand All @@ -23,7 +24,10 @@ import (
"github.com/tidwall/gjson"
"go.k6.io/k6/cloudapi"
"go.k6.io/k6/cmd"
"go.k6.io/k6/cmd/tests/events"
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/event"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/lib/fsext"
"go.k6.io/k6/lib/testutils"
Expand All @@ -42,7 +46,6 @@ func TestVersion(t *testing.T) {
assert.Contains(t, stdout, runtime.Version())
assert.Contains(t, stdout, runtime.GOOS)
assert.Contains(t, stdout, runtime.GOARCH)
assert.NotContains(t, stdout[:len(stdout)-1], "\n")

assert.Empty(t, ts.Stderr.Bytes())
assert.Empty(t, ts.LoggerHook.Drain())
Expand Down Expand Up @@ -1534,7 +1537,7 @@ func TestMinIterationDuration(t *testing.T) {
elapsed := time.Since(start)
assert.Greater(t, elapsed, 7*time.Second, "expected more time to have passed because of minIterationDuration")
assert.Less(
t, elapsed, 14*time.Second,
t, elapsed, 15*time.Second,
Copy link
Contributor Author

@imiric imiric Jun 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI waiting on IterStart and IterEnd does have a non-negligible impact on iteration duration, even if the events aren't being handled. I've seen this test fail at least once after this change with the duration being ~14.1s.

Maybe we can further optimize the waiting function, but ideally, we shouldn't introduce waits in such a hot path, since this impacts the overall performance of k6. So I would suggest we start looking into ways to do the browser initialization on the Init event, and shutdown on TestEnd/Exit during this cycle.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it! Thanks for the heads up @imiric 👍 I don't think this will fit in this cycle, but I will bring it up again for discussion, as this will imply a change in our current model on how we handle browser lifecycle.

Copy link
Contributor

@ankur22 ankur22 Jun 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering whether it would be possible to negate the time that k6 is waiting on IterStart and IterEnd from the iteration duration or related metrics as a short term solution? Or is there a bigger impact than just increasing the iteration duration?

I like the long term goal with working with Init, and TestEnd/Exit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ankur22 That sounds way too hacky for my taste 😅 It would effectively be faking metrics to compensate for the fact that k6 is now taking longer to run, but it wouldn't solve the actual problem of slower execution.

Besides, as you can see from this test, the issue is not with the value of the iteration duration metric, but with the elapsed time of the test. The metrics are unaffected by this, since we measure the actual iteration time in the VU.runFn method. But overall, the iteration rate would be slower, which might add up to considerably lower throughput after this change. We should benchmark it to be sure, but I think this is a blocker for merging this, and we should prioritize that change in how k6-browser handles browser processes.

Copy link
Contributor

@ankur22 ankur22 Jun 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But overall, the iteration rate would be slower, which might add up to considerably lower throughput after this change.

👍

We should benchmark it to be sure, but I think this is a blocker for merging this, and we should prioritize that change in how k6-browser handles browser processes.

Is the benchmarking the blocker or this issue with the current implementation? If the current implementation is the blocker then that will affect the release, and we need to work on it straight away 😬

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@imiric I have the same opinion as you 👍

Copy link
Contributor Author

@imiric imiric Jun 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some benchmarks in 89fa056. See the results here.

In summary:

  • There's a very minor difference in performance between master (97d40e9) and this branch (d5897fa) when a module that uses events isn't imported; 17.32ms ± 1% vs 17.34ms ± 1%. But according to benchstat it's not statistically significant, so it might not be concerning, and the TestMinIterationDuration failure I experienced was probably a fluke(?).

    There is a minor overhead in allocations and memory usage, but it's not too bad.

  • Understandably, there is a substantial impact to performance when a module that does use events is imported. The benchmarks aren't directly comparable, but it's around +~68%.

I didn't benchmark importing the events module, but not subscribing to Iter* events. Would this be interesting?

The more realistic test would be running k6-benchmarks, but that would take more time, and I'm not sure if we need it right now.

So I'm not sure what to do with these results. Importing the browser module would have an impact on performance anyway, so whatever the overhead is from event processing is minor in comparison to everything else.

But at the same time, ActiveVU.RunOnce() is a very sensitive place to add any delays in, and I'm hesitant about adding even event emission without waiting. So I still lean towards k6-browser moving to doing initialization on the Init event and shutdown on TestEnd/Exit rather than on the Iter* events. This would be the desired end-goal anyway, and we wouldn't run risks of impacting k6 performance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some benchmarks in 89fa056. See the results here.

Thanks for this @imiric !

I didn't benchmark importing the events module, but not subscribing to Iter* events. Would this be interesting?

If I'm understanding this correctly, I would say no, as, as you said, if a module is using the events, the impact of the overhead of the events system implementation is probably negligible in comparison with the actual work that is done as a reaction to the events. I think the critical point was understanding the impact of the event system implementation for a test that does not use the event system at all. Still, the benchmarks are done with a void iteration, which makes sense to isolate and understand the impact of the event system, but if we consider that usually an iteration in k6 would include at least 1 HTTP request, which would imply ~100-200ms, the impact in that context is "reduced". I understand this is what benchstat indicates with "not statistically significant".

All in all, I think waiting or not for the IterStart and IterEnd events, or even supporting these events at all, is a k6-core decision which should consider also the usage of these events for other extensions or for the JS API also, and how should it behave in these cases.
From the k6 browser side we will try to analyze what would be the effort to move the browser initialization to the Init event, but this will require proper testing and analysis also on the performance when being shared by multiple VUs, on sharing the WS connection or not, etc.

Copy link
Contributor

@mstoykov mstoykov Jun 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the slow reply - I didn't manage to get to writing this last week :(.

Let's start with some presumptions/ideas about this PR and change at this point in time.

We are doing this almost entirely to support browser in their need for functionality within k6. It happens that this somewhat aligns with a more generic feature we want that isn't (well) fleshed out - events.
The PR while based on the idea is not really expected to be usable for other extensions or internal features apart from browser. Having these expectations raises (IMO) the complexity quite a lot.
This also means that IMO this change is supposed to test how this will work in particular case while supporting it for the time being.
That is to say that we don't need it to be perfect and we will likely change it.

Browser has stipulated they need the following things to be possible:

  1. to be signalled when an iteration ends, so they can disconnect/stop browser.
    Also, they would like to:
  2. Know when to start/connect to a browser.
  3. When k6 ends execution, so they can make certain they disconnect/stop all the browser if any left.

A point here is that starting as many browsers as needed for the test at the beginning depends on the understanding of "how many" are needed. Which requires some calculations as scenarios might not overlap and as such k6 will reuse VUs between them and in theory browser can be reused as well. For some cases these calculations and internal implementation might be easy (a scenario starting 5 minutes after another stops and both using 10 browsers). But for others this will be a bit harder - 2 ramping VUs scenarios that have differing peaks - one going 0 to 5 every minute, one going 5 to 0 every minute.

To be perfectly honest at this point I expect that the latter example would not even work all that well now that k6 execution is a lot more complex. Even before event loop waiting we in practice could've ended up waiting on a websocket (for example) connection to send end message throwing the things out of order.

A browser implementation will also need to continuously "keep tabs" on whether a browser is in use in this case - effectively needing at least iterEnd. As otherwise ran iteration can end and the browser module can still think that a browser instance is assigned to a VU, but it might never start.

IMO browser team should decide what they need and how it will work, and we should go with that, unless it is detrimental for tests not using browser. Again my expectation is that this will change at least once more trying to make stuff work and the internal implementations will likely change even more overtime.

Understandably, there is a substantial impact to performance when a module that does use events is imported. The benchmarks aren't directly comparable, but it's around +~68%.

68% on 17.32ms is still around ~11ms which while not 0 is ... likely not noticeable.

Some potential workarounds that can be used:
If browser team is okay with iterStart not being used but instead starting/connect to browser on first calls. On these first calls the VU can subscribe to iterEnd to stop the browser. This way only in those cases we will have the needed calls.

If we have scenarioStart/scenarioEnd - again only on those events different VUs can subscribe to iterStart/iterEnd for the duration of the scenario.

Both of the above can be done after an initial implementation as part of an optimization round.

cc @imiric @ka3de IMO it is better to continue as is with grafana/xk6-browser#944 and to see what problems will arise from what could be considered a PoC (but full) solution given the current plan.

This way we have a point to fallback and something that works.

After that we can iterate on it if needed/ we have time.

I will write a different comment on some ... points that I feel haven't been touched on.
edit: my other comment

Copy link
Contributor Author

@imiric imiric Jun 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mstoykov:

The PR while based on the idea is not really expected to be usable for other extensions or internal features apart from browser. Having this expectations raises (IMO) the complexity quite a lot.

There are no such expectations and k6-browser is our main focus, but at the same time we should a) ensure that whatever is done here doesn't negatively impact current k6 functionality, and b) leave an open-ended design that could be extended if needed, and doesn't couple us to specific k6 internals. This last part is why I don't want to rely on the JS event loop.

That is to say that we don't need it to be perfect and we will likely change it.

Agreed. What we need to decide here is if the performance impact is a concern or not, and if k6-browser should change the way they manage browser processes in the short-term, i.e. before this PR is merged. In the long-term we agree that we shouldn't use Iter* events for these management tasks, but whether this will happen on Init/Exit or in new scenario-specific events, is up for debate.

I agree with you and @ka3de that these benchmark results are not concerning in the grand scheme of things, so let's proceed with the current approach, and we can optimize it later.

"expected less time to have passed because minIterationDuration should not affect setup() and teardown() ",
)

Expand Down Expand Up @@ -1998,3 +2001,230 @@ func TestBadLogOutput(t *testing.T) {
})
}
}

// HACK: We need this so multiple tests can register differently named modules.
// Each test and benchmark increments it with atomic.AddUint64 and embeds the
// result in the module name it passes to modules.Register.
var uniqueModuleNumber uint64 //nolint:gochecknoglobals

// TestEventSystemOK runs a short, successful script that imports the test
// events module and checks that the logged events match the expected sequence
// exactly, in order.
func TestEventSystemOK(t *testing.T) {
	t.Parallel()

	ts := NewGlobalTestState(t)

	// Register the events module under a name that is unique to this test.
	seq := atomic.AddUint64(&uniqueModuleNumber, 1)
	moduleName := fmt.Sprintf("k6/x/testevents-%d", seq)
	mod := events.New(event.GlobalEvents, event.VUEvents)
	modules.Register(moduleName, mod)

	script := fmt.Sprintf(`
import events from '%s';
import { sleep } from 'k6';

export let options = {
vus: 1,
iterations: 5,
}

export default function () { sleep(1); }
`, moduleName)

	ts.CmdArgs = []string{"k6", "--quiet", "run", "-"}
	ts.Stdin = bytes.NewBufferString(script)

	cmd.ExecuteWithGlobalState(ts.GlobalState)

	// After k6 returns, wait at most one second on the module's WaitGroup.
	finished := make(chan struct{})
	go func() {
		mod.WG.Wait()
		close(finished)
	}()

	select {
	case <-time.After(time.Second):
		t.Fatal("timed out")
	case <-finished:
	}

	wantLog := []string{
		`got event Init with data '<nil>'`,
		`got event TestStart with data '<nil>'`,
		`got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:<nil>}'`,
		`got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:<nil>}'`,
		`got event IterStart with data '{Iteration:1 VUID:1 ScenarioName:default Error:<nil>}'`,
		`got event IterEnd with data '{Iteration:1 VUID:1 ScenarioName:default Error:<nil>}'`,
		`got event IterStart with data '{Iteration:2 VUID:1 ScenarioName:default Error:<nil>}'`,
		`got event IterEnd with data '{Iteration:2 VUID:1 ScenarioName:default Error:<nil>}'`,
		`got event IterStart with data '{Iteration:3 VUID:1 ScenarioName:default Error:<nil>}'`,
		`got event IterEnd with data '{Iteration:3 VUID:1 ScenarioName:default Error:<nil>}'`,
		`got event IterStart with data '{Iteration:4 VUID:1 ScenarioName:default Error:<nil>}'`,
		`got event IterEnd with data '{Iteration:4 VUID:1 ScenarioName:default Error:<nil>}'`,
		`got event TestEnd with data '<nil>'`,
		`got event Exit with data '&{Error:<nil>}'`,
	}
	assert.Equal(t, wantLog, ts.LoggerHook.Lines())
}

// TestEventSystemError checks the emitted events in the case of a script
// error. It covers three cases: a test abort, an exception while loading the
// script, and an exception thrown from the default function.
//
// Each script is run with an import line for the test events module prepended,
// so the source positions in the expected log lines are one line further down
// than they appear in the script literals below.
func TestEventSystemError(t *testing.T) {
t.Parallel()

// name is the subtest name, script the JS source appended after the import
// line, expLog the exact expected log lines, and expExitCode the expected
// process exit code.
testCases := []struct {
name, script string
expLog []string
expExitCode exitcodes.ExitCode
}{
{
// test.abort() in the first iteration: expects a single IterStart/IterEnd
// pair, with the abort error attached to both the IterEnd and Exit events.
name: "abort",
script: `
import { test } from 'k6/execution';

export let options = {
vus: 1,
iterations: 5,
}

export default function () {
test.abort('oops!');
}
`, expLog: []string{
"got event Init with data '<nil>'",
"got event TestStart with data '<nil>'",
"got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:<nil>}'",
"got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:test aborted: oops! at file:///-:11:16(6)}'",
"got event TestEnd with data '<nil>'",
"got event Exit with data '&{Error:test aborted: oops! at file:///-:11:16(6)}'",
"test aborted: oops! at file:///-:11:16(6)",
},
expExitCode: exitcodes.ScriptAborted,
},
{
// The script fails to load: only the Exit event is expected, carrying the
// load error.
name: "init",
script: "undefinedVar",
expLog: []string{
"got event Exit with data '&{Error:could not initialize '-': could not load JS test " +
"'file:///-': ReferenceError: undefinedVar is not defined\n\tat file:///-:2:0(12)\n}'",
"ReferenceError: undefinedVar is not defined\n\tat file:///-:2:0(12)\n",
},
expExitCode: exitcodes.ScriptException,
},
{
// An exception thrown in every iteration: each IterEnd carries the error,
// but the Exit event has a nil error and the expected exit code is 0.
name: "throw",
script: `
export let options = {
vus: 1,
iterations: 2,
}

export default function () {
throw new Error('oops!');
}
`, expLog: []string{
"got event Init with data '<nil>'",
"got event TestStart with data '<nil>'",
"got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:<nil>}'",
"got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:Error: oops!\n\tat file:///-:9:11(3)\n}'",
"Error: oops!\n\tat file:///-:9:11(3)\n",
"got event IterStart with data '{Iteration:1 VUID:1 ScenarioName:default Error:<nil>}'",
"got event IterEnd with data '{Iteration:1 VUID:1 ScenarioName:default Error:Error: oops!\n\tat file:///-:9:11(3)\n}'",
"Error: oops!\n\tat file:///-:9:11(3)\n",
"got event TestEnd with data '<nil>'",
"got event Exit with data '&{Error:<nil>}'",
},
expExitCode: 0,
},
}

for _, tc := range testCases {
// Copy the range variable so the parallel subtest closure below gets its own.
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ts := NewGlobalTestState(t)

// Register the events module under a name that is unique to this subtest.
moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1))
mod := events.New(event.GlobalEvents, event.VUEvents)
modules.Register(moduleName, mod)

ts.CmdArgs = []string{"k6", "--quiet", "run", "-"}
ts.ExpectedExitCode = int(tc.expExitCode)
// Prepend the module import; this shifts every script line down by one.
ts.Stdin = bytes.NewBuffer([]byte(fmt.Sprintf("import events from '%s';\n%s", moduleName, tc.script)))

cmd.ExecuteWithGlobalState(ts.GlobalState)

// After k6 returns, wait at most one second on the module's WaitGroup.
doneCh := make(chan struct{})
go func() {
mod.WG.Wait()
close(doneCh)
}()

select {
case <-doneCh:
case <-time.After(time.Second):
t.Fatal("timed out")
}

log := ts.LoggerHook.Lines()
assert.Equal(t, tc.expLog, log)
})
}
}

// BenchmarkRun measures a complete `k6 run` of a script with an empty default
// function (10 VUs, 100 iterations) that imports no test events module. The
// timer only runs around ExecuteWithGlobalState, so per-iteration setup of the
// test state is not measured.
func BenchmarkRun(b *testing.B) {
	const script = `
export let options = {
vus: 10,
iterations: 100,
}

export default function () {}
`

	b.StopTimer()

	for iter := 0; iter < b.N; iter++ {
		ts := NewGlobalTestState(b)
		ts.CmdArgs = []string{"k6", "--quiet", "run", "-"}
		ts.Stdin = bytes.NewBufferString(script)
		ts.ExpectedExitCode = 0

		b.StartTimer()
		cmd.ExecuteWithGlobalState(ts.GlobalState)
		b.StopTimer()
	}
}

// BenchmarkRunEvents is the counterpart of BenchmarkRun for a script that
// imports the test events module (created for both global and VU events).
// Only ExecuteWithGlobalState is timed; module registration and the wait on
// the module's WaitGroup afterwards happen with the timer stopped.
func BenchmarkRunEvents(b *testing.B) {
	const scriptTmpl = `
import events from '%s';
export let options = {
vus: 10,
iterations: 100,
}

export default function () {}
`

	b.StopTimer()

	for iter := 0; iter < b.N; iter++ {
		ts := NewGlobalTestState(b)

		// Every iteration registers the module under a fresh, unique name.
		seq := atomic.AddUint64(&uniqueModuleNumber, 1)
		moduleName := fmt.Sprintf("k6/x/testevents-%d", seq)
		mod := events.New(event.GlobalEvents, event.VUEvents)
		modules.Register(moduleName, mod)

		ts.CmdArgs = []string{"k6", "--quiet", "run", "-"}
		ts.Stdin = bytes.NewBufferString(fmt.Sprintf(scriptTmpl, moduleName))
		ts.ExpectedExitCode = 0

		b.StartTimer()
		cmd.ExecuteWithGlobalState(ts.GlobalState)
		b.StopTimer()

		// Wait at most one second on the module's WaitGroup before moving on.
		finished := make(chan struct{})
		go func() {
			mod.WG.Wait()
			close(finished)
		}()

		select {
		case <-time.After(time.Second):
			b.Fatal("timed out")
		case <-finished:
		}
	}
}
Loading