Skip to content

Commit

Permalink
Merge 89fa056 into 97d40e9
Browse files Browse the repository at this point in the history
  • Loading branch information
imiric authored Jun 23, 2023
2 parents 97d40e9 + 89fa056 commit 14d02c9
Show file tree
Hide file tree
Showing 19 changed files with 981 additions and 4 deletions.
38 changes: 38 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.k6.io/k6/cmd/state"
"go.k6.io/k6/errext"
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/event"
"go.k6.io/k6/execution"
"go.k6.io/k6/js/common"
"go.k6.io/k6/lib"
Expand All @@ -39,6 +40,12 @@ type cmdRun struct {
gs *state.GlobalState
}

// We use an excessively high timeout to wait for event processing to complete,
// since prematurely proceeding before it is done could create bigger problems.
// In practice, this effectively acts as no timeout, and the user will have to
// kill k6 if a hang happens, which is the behavior without events anyway.
const waitEventDoneTimeout = 30 * time.Minute

// TODO: split apart some more
//
//nolint:funlen,gocognit,gocyclo,cyclop
Expand Down Expand Up @@ -66,6 +73,25 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
// from sub-contexts while also attaching a reason for the abort.
runCtx, runAbort := execution.NewTestRunContext(lingerCtx, logger)

emitEvent := func(evt *event.Event) func() {
waitDone := c.gs.Events.Emit(evt)
return func() {
waitCtx, waitCancel := context.WithTimeout(globalCtx, waitEventDoneTimeout)
defer waitCancel()
if werr := waitDone(waitCtx); werr != nil {
logger.WithError(werr).Warn()
}
}
}

defer func() {
emitEvent(&event.Event{
Type: event.Exit,
Data: &event.ExitData{Error: err},
})()
c.gs.Events.UnsubscribeAll()
}()

test, err := loadAndConfigureTest(c.gs, cmd, args, getConfig)
if err != nil {
return err
Expand Down Expand Up @@ -153,6 +179,12 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}()
}

// TODO: Subscribe all initialization processes (outputs, VUs and executors)
// to the Init event. This would allow running them concurrently, and they
// could be synchronized by waiting for the event processing to complete.
// This could later be expanded to also initialize browser processes.
waitInitDone := emitEvent(&event.Event{Type: event.Init})

// Create and start the outputs. We do it quite early to get any output URLs
// or other details below. It also allows us to ensure when they have
// flushed their samples and when they have stopped in the defer statements.
Expand Down Expand Up @@ -300,10 +332,16 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}()
}

waitInitDone()

emitEvent(&event.Event{Type: event.TestStart})()

// Start the test! However, we won't immediately return if there was an
// error, we still have things to do.
err = execScheduler.Run(globalCtx, runCtx, samples)

defer emitEvent(&event.Event{Type: event.TestEnd})()

// Init has passed successfully, so unless disabled, make sure we send a
// usage report after the context is done.
if !conf.NoUsageReport.Bool {
Expand Down
3 changes: 3 additions & 0 deletions cmd/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/mattn/go-isatty"
"github.com/sirupsen/logrus"

"go.k6.io/k6/event"
"go.k6.io/k6/lib/fsext"
"go.k6.io/k6/ui/console"
)
Expand Down Expand Up @@ -39,6 +40,7 @@ type GlobalState struct {
BinaryName string
CmdArgs []string
Env map[string]string
Events *event.System

DefaultFlags, Flags GlobalFlags

Expand Down Expand Up @@ -110,6 +112,7 @@ func NewGlobalState(ctx context.Context) *GlobalState {
BinaryName: filepath.Base(binary),
CmdArgs: os.Args,
Env: env,
Events: event.NewEventSystem(100, logger),
DefaultFlags: defaultFlags,
Flags: getFlags(defaultFlags, env),
OutMutex: outMutex,
Expand Down
1 change: 1 addition & 0 deletions cmd/test_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func loadTest(gs *state.GlobalState, cmd *cobra.Command, args []string) (*loaded
RuntimeOptions: runtimeOptions,
Registry: registry,
BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry),
Events: gs.Events,
LookupEnv: func(key string) (string, bool) {
val, ok := gs.Env[key]
return val, ok
Expand Down
234 changes: 232 additions & 2 deletions cmd/tests/cmd_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
Expand All @@ -23,7 +24,10 @@ import (
"github.com/tidwall/gjson"
"go.k6.io/k6/cloudapi"
"go.k6.io/k6/cmd"
"go.k6.io/k6/cmd/tests/events"
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/event"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/lib/fsext"
"go.k6.io/k6/lib/testutils"
Expand All @@ -42,7 +46,6 @@ func TestVersion(t *testing.T) {
assert.Contains(t, stdout, runtime.Version())
assert.Contains(t, stdout, runtime.GOOS)
assert.Contains(t, stdout, runtime.GOARCH)
assert.NotContains(t, stdout[:len(stdout)-1], "\n")

assert.Empty(t, ts.Stderr.Bytes())
assert.Empty(t, ts.LoggerHook.Drain())
Expand Down Expand Up @@ -1534,7 +1537,7 @@ func TestMinIterationDuration(t *testing.T) {
elapsed := time.Since(start)
assert.Greater(t, elapsed, 7*time.Second, "expected more time to have passed because of minIterationDuration")
assert.Less(
t, elapsed, 14*time.Second,
t, elapsed, 15*time.Second,
"expected less time to have passed because minIterationDuration should not affect setup() and teardown() ",
)

Expand Down Expand Up @@ -1998,3 +2001,230 @@ func TestBadLogOutput(t *testing.T) {
})
}
}

// HACK: We need this so multiple tests can register differently named modules.
var uniqueModuleNumber uint64 //nolint:gochecknoglobals

// Tests that the appropriate events are emitted in the correct order.
func TestEventSystemOK(t *testing.T) {
t.Parallel()

ts := NewGlobalTestState(t)

moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1))
mod := events.New(event.GlobalEvents, event.VUEvents)
modules.Register(moduleName, mod)

ts.CmdArgs = []string{"k6", "--quiet", "run", "-"}
ts.Stdin = bytes.NewBuffer([]byte(fmt.Sprintf(`
import events from '%s';
import { sleep } from 'k6';
export let options = {
vus: 1,
iterations: 5,
}
export default function () { sleep(1); }
`, moduleName)))

cmd.ExecuteWithGlobalState(ts.GlobalState)

doneCh := make(chan struct{})
go func() {
mod.WG.Wait()
close(doneCh)
}()

select {
case <-doneCh:
case <-time.After(time.Second):
t.Fatal("timed out")
}

expLog := []string{
`got event Init with data '<nil>'`,
`got event TestStart with data '<nil>'`,
`got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:<nil>}'`,
`got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:<nil>}'`,
`got event IterStart with data '{Iteration:1 VUID:1 ScenarioName:default Error:<nil>}'`,
`got event IterEnd with data '{Iteration:1 VUID:1 ScenarioName:default Error:<nil>}'`,
`got event IterStart with data '{Iteration:2 VUID:1 ScenarioName:default Error:<nil>}'`,
`got event IterEnd with data '{Iteration:2 VUID:1 ScenarioName:default Error:<nil>}'`,
`got event IterStart with data '{Iteration:3 VUID:1 ScenarioName:default Error:<nil>}'`,
`got event IterEnd with data '{Iteration:3 VUID:1 ScenarioName:default Error:<nil>}'`,
`got event IterStart with data '{Iteration:4 VUID:1 ScenarioName:default Error:<nil>}'`,
`got event IterEnd with data '{Iteration:4 VUID:1 ScenarioName:default Error:<nil>}'`,
`got event TestEnd with data '<nil>'`,
`got event Exit with data '&{Error:<nil>}'`,
}
log := ts.LoggerHook.Lines()
assert.Equal(t, expLog, log)
}

// Check emitted events in the case of a script error.
func TestEventSystemError(t *testing.T) {
t.Parallel()

testCases := []struct {
name, script string
expLog []string
expExitCode exitcodes.ExitCode
}{
{
name: "abort",
script: `
import { test } from 'k6/execution';
export let options = {
vus: 1,
iterations: 5,
}
export default function () {
test.abort('oops!');
}
`, expLog: []string{
"got event Init with data '<nil>'",
"got event TestStart with data '<nil>'",
"got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:<nil>}'",
"got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:test aborted: oops! at file:///-:11:16(6)}'",
"got event TestEnd with data '<nil>'",
"got event Exit with data '&{Error:test aborted: oops! at file:///-:11:16(6)}'",
"test aborted: oops! at file:///-:11:16(6)",
},
expExitCode: exitcodes.ScriptAborted,
},
{
name: "init",
script: "undefinedVar",
expLog: []string{
"got event Exit with data '&{Error:could not initialize '-': could not load JS test " +
"'file:///-': ReferenceError: undefinedVar is not defined\n\tat file:///-:2:0(12)\n}'",
"ReferenceError: undefinedVar is not defined\n\tat file:///-:2:0(12)\n",
},
expExitCode: exitcodes.ScriptException,
},
{
name: "throw",
script: `
export let options = {
vus: 1,
iterations: 2,
}
export default function () {
throw new Error('oops!');
}
`, expLog: []string{
"got event Init with data '<nil>'",
"got event TestStart with data '<nil>'",
"got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:<nil>}'",
"got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:Error: oops!\n\tat file:///-:9:11(3)\n}'",
"Error: oops!\n\tat file:///-:9:11(3)\n",
"got event IterStart with data '{Iteration:1 VUID:1 ScenarioName:default Error:<nil>}'",
"got event IterEnd with data '{Iteration:1 VUID:1 ScenarioName:default Error:Error: oops!\n\tat file:///-:9:11(3)\n}'",
"Error: oops!\n\tat file:///-:9:11(3)\n",
"got event TestEnd with data '<nil>'",
"got event Exit with data '&{Error:<nil>}'",
},
expExitCode: 0,
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ts := NewGlobalTestState(t)

moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1))
mod := events.New(event.GlobalEvents, event.VUEvents)
modules.Register(moduleName, mod)

ts.CmdArgs = []string{"k6", "--quiet", "run", "-"}
ts.ExpectedExitCode = int(tc.expExitCode)
ts.Stdin = bytes.NewBuffer([]byte(fmt.Sprintf("import events from '%s';\n%s", moduleName, tc.script)))

cmd.ExecuteWithGlobalState(ts.GlobalState)

doneCh := make(chan struct{})
go func() {
mod.WG.Wait()
close(doneCh)
}()

select {
case <-doneCh:
case <-time.After(time.Second):
t.Fatal("timed out")
}

log := ts.LoggerHook.Lines()
assert.Equal(t, tc.expLog, log)
})
}
}

func BenchmarkRun(b *testing.B) {
b.StopTimer()

for i := 0; i < b.N; i++ {
ts := NewGlobalTestState(b)

ts.CmdArgs = []string{"k6", "--quiet", "run", "-"}
ts.Stdin = bytes.NewBuffer([]byte(`
export let options = {
vus: 10,
iterations: 100,
}
export default function () {}
`))
ts.ExpectedExitCode = 0

b.StartTimer()
cmd.ExecuteWithGlobalState(ts.GlobalState)
b.StopTimer()
}
}

func BenchmarkRunEvents(b *testing.B) {
b.StopTimer()

for i := 0; i < b.N; i++ {
ts := NewGlobalTestState(b)

moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1))
mod := events.New(event.GlobalEvents, event.VUEvents)
modules.Register(moduleName, mod)

ts.CmdArgs = []string{"k6", "--quiet", "run", "-"}
ts.Stdin = bytes.NewBuffer([]byte(fmt.Sprintf(`
import events from '%s';
export let options = {
vus: 10,
iterations: 100,
}
export default function () {}
`, moduleName)))
ts.ExpectedExitCode = 0

b.StartTimer()
cmd.ExecuteWithGlobalState(ts.GlobalState)
b.StopTimer()

doneCh := make(chan struct{})
go func() {
mod.WG.Wait()
close(doneCh)
}()

select {
case <-doneCh:
case <-time.After(time.Second):
b.Fatal("timed out")
}
}
}
Loading

0 comments on commit 14d02c9

Please sign in to comment.