Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Summary callback changes #1768

Merged
merged 4 commits into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/loadimpact/k6/stats/influxdb"
"github.com/loadimpact/k6/stats/kafka"
"github.com/loadimpact/k6/stats/statsd"
"github.com/loadimpact/k6/ui"
)

// configFlagSet returns a FlagSet with the default run configuration flags.
Expand Down Expand Up @@ -220,7 +219,7 @@ func getConsolidatedConfig(fs afero.Fs, cliConf Config, runner lib.Runner) (conf
// for CLI flags in cmd.getOptions, in case other configuration sources
// (e.g. env vars) overrode our default value. This is not done in
// lib.Options.Validate to avoid circular imports.
if err = ui.ValidateSummary(conf.SummaryTrendStats); err != nil {
if _, err = stats.GetResolversForTrendColumns(conf.SummaryTrendStats); err != nil {
return conf, err
}

Expand Down Expand Up @@ -270,11 +269,15 @@ func validateConfig(conf Config, isExecutable func(string) bool) error {
}
}

return consolidateErrorMessage(errList, "There were problems with the specified script configuration:")
}

func consolidateErrorMessage(errList []error, title string) error {
if len(errList) == 0 {
return nil
}

errMsgParts := []string{"There were problems with the specified script configuration:"}
errMsgParts := []string{title}
for _, err := range errList {
errMsgParts = append(errMsgParts, fmt.Sprintf("\t- %s", err.Error()))
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/loadimpact/k6/lib/consts"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/ui"
)

var (
Expand Down Expand Up @@ -223,7 +222,7 @@ func getOptions(flags *pflag.FlagSet) (lib.Options, error) {
if errSts != nil {
return opts, errSts
}
if errSts = ui.ValidateSummary(trendStats); err != nil {
if _, errSts = stats.GetResolversForTrendColumns(trendStats); err != nil {
return opts, errSts
}
opts.SummaryTrendStats = trendStats
Expand Down
76 changes: 42 additions & 34 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/signal"
Expand All @@ -46,7 +48,6 @@ import (
"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/consts"
"github.com/loadimpact/k6/loader"
"github.com/loadimpact/k6/ui"
"github.com/loadimpact/k6/ui/pb"
)

Expand Down Expand Up @@ -120,7 +121,7 @@ a commandline interface for interacting with it.`,
return err
}

r, err := newRunner(logger, src, runType, filesystems, runtimeOptions)
initRunner, err := newRunner(logger, src, runType, filesystems, runtimeOptions)
if err != nil {
return err
}
Expand All @@ -131,18 +132,18 @@ a commandline interface for interacting with it.`,
if err != nil {
return err
}
conf, err := getConsolidatedConfig(afero.NewOsFs(), cliConf, r)
conf, err := getConsolidatedConfig(afero.NewOsFs(), cliConf, initRunner)
if err != nil {
return err
}

conf, cerr := deriveAndValidateConfig(conf, r.IsExecutable)
conf, cerr := deriveAndValidateConfig(conf, initRunner.IsExecutable)
if cerr != nil {
return ExitCode{error: cerr, Code: invalidConfigErrorCode}
}

// Write options back to the runner too.
if err = r.SetOptions(conf.Options); err != nil {
if err = initRunner.SetOptions(conf.Options); err != nil {
return err
}

Expand All @@ -165,7 +166,7 @@ a commandline interface for interacting with it.`,

// Create a local execution scheduler wrapping the runner.
logger.Debug("Initializing the execution scheduler...")
execScheduler, err := local.NewExecutionScheduler(r, logger)
execScheduler, err := local.NewExecutionScheduler(initRunner, logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -290,36 +291,18 @@ a commandline interface for interacting with it.`,
logger.Warn("No script iterations finished, consider making the test duration longer")
}

data := ui.SummaryData{
Metrics: engine.Metrics,
RootGroup: engine.ExecutionScheduler.GetRunner().GetDefaultGroup(),
Time: executionState.GetCurrentTestRunDuration(),
TimeUnit: conf.Options.SummaryTimeUnit.String,
}
// Print the end-of-test summary.
// Handle the end-of-test summary.
if !runtimeOptions.NoSummary.Bool {
fprintf(stdout, "\n")

s := ui.NewSummary(conf.SummaryTrendStats)
s.SummarizeMetrics(stdout, "", data)

fprintf(stdout, "\n")
}

if runtimeOptions.SummaryExport.ValueOrZero() != "" { //nolint:nestif
f, err := os.Create(runtimeOptions.SummaryExport.String)
summaryResult, err := initRunner.HandleSummary(globalCtx, &lib.Summary{
Metrics: engine.Metrics,
RootGroup: engine.ExecutionScheduler.GetRunner().GetDefaultGroup(),
TestRunDuration: executionState.GetCurrentTestRunDuration(),
})
if err == nil {
err = handleSummaryResult(afero.NewOsFs(), os.Stdout, os.Stderr, summaryResult)
}
if err != nil {
logger.WithError(err).Error("failed to create summary export file")
} else {
defer func() {
if err := f.Close(); err != nil {
logger.WithError(err).Error("failed to close summary export file")
}
}()
s := ui.NewSummary(conf.SummaryTrendStats)
if err := s.SummarizeMetricsJSON(f, data); err != nil {
logger.WithError(err).Error("failed to make summary export file")
}
logger.WithError(err).Error("failed to handle the end-of-test summary")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that in a sequential release we will fallback to a default representation or the dumping the lib.Summary as JSON to the disk?

Copy link
Member Author

@na-- na-- Jan 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure what you mean by "a sequential release", but we'll get an error here only if something goes majorly wrong... Both initRunner.HandleSummary() and handleSummaryResult() try to continue working even if there are errors. handleSummaryResult() tries to save all files, even if some of them fail (because of wrong names, lack of permissions, etc.). And initRunner.HandleSummary() has internal try / catch in the JS wrapper code: https://github.com/loadimpact/k6/blob/0838813db6edead0f6faa3cdaadf3b5eb070a1ab/js/summary.go#L97-L104

Though I should add tests for both of these things...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I missed this code at the time ...

This is probably my biggest issue reviewing the PR, the code is all over the place, although I don't know how to fix that if it's at all possible given what the code tries to do, so I don't have some concrete feedback. I hope that when we rewrite even more of the whole metric and threshold (which specifically are also over the place) this might get more streamlined

}
}

Expand Down Expand Up @@ -449,3 +432,28 @@ func detectType(data []byte) string {
}
return typeJS
}

func handleSummaryResult(fs afero.Fs, stdOut, stdErr io.Writer, result map[string]io.Reader) error {
var errs []error

getWriter := func(path string) (io.Writer, error) {
switch path {
case "stdout":
return stdOut, nil
case "stderr":
return stdErr, nil
default:
return fs.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
}
}

for path, value := range result {
if writer, err := getWriter(path); err != nil {
errs = append(errs, fmt.Errorf("could not open '%s': %w", path, err))
} else if n, err := io.Copy(writer, value); err != nil {
errs = append(errs, fmt.Errorf("error saving summary to '%s' after %d bytes: %w", path, n, err))
}
}

return consolidateErrorMessage(errs, "Could not save some summary information:")
}
20 changes: 20 additions & 0 deletions js/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
package common

import (
"bytes"
"fmt"
"io"

"github.com/dop251/goja"
)

Expand All @@ -36,3 +40,19 @@ func Throw(rt *goja.Runtime, err error) {
}
panic(rt.NewGoError(err))
}

// GetReader tries to return an io.Reader value from an exported goja value.
func GetReader(data interface{}) (io.Reader, error) {
switch r := data.(type) {
case string:
return bytes.NewBufferString(r), nil
case []byte:
return bytes.NewBuffer(r), nil
case io.Reader:
return r, nil
case goja.ArrayBuffer:
return bytes.NewBuffer(r.Bytes()), nil
default:
return nil, fmt.Errorf("invalid type %T, it needs to be a string, byte array or an ArrayBuffer", data)
}
}
90 changes: 79 additions & 11 deletions js/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/http/cookiejar"
Expand Down Expand Up @@ -244,10 +245,7 @@ func (r *Runner) newVU(id int64, samplesOut chan<- stats.SampleContainer) (*VU,
}

func (r *Runner) Setup(ctx context.Context, out chan<- stats.SampleContainer) error {
setupCtx, setupCancel := context.WithTimeout(
ctx,
time.Duration(r.Bundle.Options.SetupTimeout.Duration),
)
setupCtx, setupCancel := context.WithTimeout(ctx, r.getTimeoutFor(consts.SetupFn))
defer setupCancel()

v, err := r.runPart(setupCtx, out, consts.SetupFn, nil)
Expand Down Expand Up @@ -279,10 +277,7 @@ func (r *Runner) SetSetupData(data []byte) {
}

func (r *Runner) Teardown(ctx context.Context, out chan<- stats.SampleContainer) error {
teardownCtx, teardownCancel := context.WithTimeout(
ctx,
time.Duration(r.Bundle.Options.TeardownTimeout.Duration),
)
teardownCtx, teardownCancel := context.WithTimeout(ctx, r.getTimeoutFor(consts.TeardownFn))
defer teardownCancel()

var data interface{}
Expand Down Expand Up @@ -312,6 +307,77 @@ func (r *Runner) IsExecutable(name string) bool {
return exists
}

// HandleSummary calls the specified summary callback, if supplied.
func (r *Runner) HandleSummary(ctx context.Context, summary *lib.Summary) (map[string]io.Reader, error) {
summaryDataForJS := summarizeMetricsToObject(summary, r.Bundle.Options)

out := make(chan stats.SampleContainer, 100)
defer close(out)

go func() { // discard all metrics
for range out {
}
}()

vu, err := r.newVU(0, out)
if err != nil {
return nil, err
}

handleSummaryFn := goja.Undefined()
if exported := vu.Runtime.Get("exports").ToObject(vu.Runtime); exported != nil {
fn := exported.Get(consts.HandleSummaryFn)
if _, ok := goja.AssertFunction(fn); ok {
imiric marked this conversation as resolved.
Show resolved Hide resolved
handleSummaryFn = fn
} else if fn != nil && !goja.IsUndefined(fn) && !goja.IsNull(fn) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked and Get returns nil if it doesn't exist. Did you intend for exports.handleSummary = undefined; (or null) to be a valid thing that won't raise an error or were you over zealous in the checks?

I am not particularly certain it matters either way, to be honest. If someone has defined handleSummary it is probably better to use delete exports.handleSummary, on the other hand, I don't think we will be hitting it. So the question is mostly questing what was the idea behind it, not changing it one way or another

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

were you over zealous in the checks?

Likely that, and it makes sense to leave only the nil check 👍

return nil, fmt.Errorf("exported identfier %s must be a function", consts.HandleSummaryFn)
}
}
ctx = common.WithRuntime(ctx, vu.Runtime)
ctx = lib.WithState(ctx, vu.state)
ctx, cancel := context.WithTimeout(ctx, r.getTimeoutFor(consts.HandleSummaryFn))
defer cancel()
go func() {
<-ctx.Done()
vu.Runtime.Interrupt(context.Canceled)
}()
*vu.Context = ctx

handleSummaryWrapperRaw, err := vu.Runtime.RunString(summaryWrapperLambdaCode)
if err != nil {
return nil, fmt.Errorf("unexpected error while getting the summary wrapper: %w", err)
}
handleSummaryWrapper, ok := goja.AssertFunction(handleSummaryWrapperRaw)
if !ok {
return nil, fmt.Errorf("unexpected error did not get a callable summary wrapper")
}

wrapperArgs := []goja.Value{
handleSummaryFn,
vu.Runtime.ToValue(r.Bundle.RuntimeOptions.SummaryExport.String),
vu.Runtime.ToValue(summaryDataForJS),
vu.Runtime.ToValue(getOldTextSummaryFunc(summary, r.Bundle.Options)), // TODO: remove
}
rawResult, _, _, err := vu.runFn(ctx, false, handleSummaryWrapper, wrapperArgs...)

// TODO: refactor the whole JS runner to avoid copy-pasting these complicated bits...
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am pretty sure you can just use runPart for most of the above.
You will need to keep the check for whether exports.handleSummary is a function, but everything else seems to be done by runPart either way ... I think ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I guess you will need to move https://github.com/loadimpact/k6/blob/0838813db6edead0f6faa3cdaadf3b5eb070a1ab/js/summary.go#L95-L118 to golang code after the runPart

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I initially tried using runPart, but it was an even bigger mess. We can figure out some way to refactor the whole js.Runner internally in the future to reduce the boilerplate, but for now I don't want to mess with it more if I don't need to... 😅

// deadline is reached so we have timeouted but this might've not been registered correctly
if deadline, ok := ctx.Deadline(); ok && time.Now().After(deadline) {
// we could have an error that is not context.Canceled in which case we should return it instead
if err, ok := err.(*goja.InterruptedError); ok && rawResult != nil && err.Value() != context.Canceled {
// TODO: silence this error?
return nil, err
}
// otherwise we have timeouted
return nil, lib.NewTimeoutError(consts.HandleSummaryFn, r.getTimeoutFor(consts.HandleSummaryFn))
}

if err != nil {
return nil, fmt.Errorf("unexpected error while generating the summary: %w", err)
}
return getSummaryResult(rawResult)
}

func (r *Runner) SetOptions(opts lib.Options) error {
r.Bundle.Options = opts
r.RPSLimit = nil
Expand Down Expand Up @@ -434,19 +500,21 @@ func (r *Runner) runPart(ctx context.Context, out chan<- stats.SampleContainer,
return v, err
}
// otherwise we have timeouted
return v, lib.NewTimeoutError(name, r.timeoutErrorDuration(name))
return v, lib.NewTimeoutError(name, r.getTimeoutFor(name))
}
return v, err
}

// timeoutErrorDuration returns the timeout duration for given stage.
func (r *Runner) timeoutErrorDuration(stage string) time.Duration {
// getTimeoutFor returns the timeout duration for given special script function.
func (r *Runner) getTimeoutFor(stage string) time.Duration {
d := time.Duration(0)
switch stage {
case consts.SetupFn:
return time.Duration(r.Bundle.Options.SetupTimeout.Duration)
case consts.TeardownFn:
return time.Duration(r.Bundle.Options.TeardownTimeout.Duration)
case consts.HandleSummaryFn:
return 2 * time.Minute // TODO: make configurable
}
return d
}
Expand Down
Loading