Skip to content

Commit

Permalink
Add support for non-default function execution
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivan Mirić committed Apr 17, 2020
1 parent 1c2e79c commit 27c1a38
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 74 deletions.
35 changes: 24 additions & 11 deletions core/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,30 @@ func (e *ExecutionScheduler) initVUsConcurrently(
func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- stats.SampleContainer) error {
logger := e.logger.WithField("phase", "local-execution-scheduler-init")

// Initialize each executor and do some basic validation.
e.state.SetExecutionStatus(lib.ExecutionStatusInitExecutors)
logger.Debugf("Start initializing executors...")
errMsg := "error while initializing executor %s: %s"
exports := e.runner.GetExports()
for _, exec := range e.executors {
executorConfig := exec.GetConfig()
execFn := executorConfig.GetExec().ValueOrZero()
execName := executorConfig.GetName()

if execFn == "" {
execFn = "default"
}
if _, ok := exports[execFn]; !ok {
return fmt.Errorf(errMsg, execName,
fmt.Sprintf("function '%s' not found in exports", execFn))
}
if err := exec.Init(ctx); err != nil {
return fmt.Errorf(errMsg, executorConfig.GetName(), err)
}
logger.Debugf("Initialized executor %s", executorConfig.GetName())
}

logger.Debugf("Finished initializing executors, start initializing VUs...")
vusToInitialize := lib.GetMaxPlannedVUs(e.executionPlan)
logger.WithFields(logrus.Fields{
"neededVUs": vusToInitialize,
Expand Down Expand Up @@ -265,17 +289,6 @@ func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- stats.S
return e.initVU(samplesOut, logger)
})

e.state.SetExecutionStatus(lib.ExecutionStatusInitExecutors)
logger.Debugf("Finished initializing needed VUs, start initializing executors...")
for _, exec := range e.executors {
executorConfig := exec.GetConfig()

if err := exec.Init(ctx); err != nil {
return fmt.Errorf("error while initializing executor %s: %s", executorConfig.GetName(), err)
}
logger.Debugf("Initialized executor %s", executorConfig.GetName())
}

e.state.SetExecutionStatus(lib.ExecutionStatusInitDone)
logger.Debugf("Initialization completed")
return nil
Expand Down
130 changes: 76 additions & 54 deletions js/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ import (
"github.com/loadimpact/k6/lib/consts"

"github.com/dop251/goja"
"github.com/pkg/errors"
"github.com/spf13/afero"

"github.com/loadimpact/k6/js/common"
"github.com/loadimpact/k6/js/compiler"
jslib "github.com/loadimpact/k6/js/lib"
"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/loader"
"github.com/pkg/errors"
"github.com/spf13/afero"
)

// A Bundle is a self-contained bundle of scripts and resources.
Expand All @@ -44,7 +45,9 @@ type Bundle struct {
Filename *url.URL
Source string
Program *goja.Program
Options lib.Options
// exported functions, for validation only
Exports map[string]struct{}
Options lib.Options

BaseInitContext *InitContext

Expand All @@ -56,7 +59,8 @@ type Bundle struct {
type BundleInstance struct {
Runtime *goja.Runtime
Context *context.Context
Default goja.Callable
// exported functions, ready for execution
Exports map[string]goja.Callable
}

// NewBundle creates a new bundle from a source file and a filesystem.
Expand All @@ -79,6 +83,7 @@ func NewBundle(src *loader.SourceData, filesystems map[string]afero.Fs, rtOpts l
Filename: src.URL,
Source: code,
Program: pgm,
Exports: make(map[string]struct{}),
BaseInitContext: NewInitContext(rt, c, compatMode, new(context.Context),
filesystems, loader.Dir(src.URL)),
Env: rtOpts.Env,
Expand All @@ -88,44 +93,9 @@ func NewBundle(src *loader.SourceData, filesystems map[string]afero.Fs, rtOpts l
return nil, err
}

// Grab exports.
exportsV := rt.Get("exports")
if goja.IsNull(exportsV) || goja.IsUndefined(exportsV) {
return nil, errors.New("exports must be an object")
}
exports := exportsV.ToObject(rt)

// Validate the default function.
def := exports.Get("default")
if def == nil || goja.IsNull(def) || goja.IsUndefined(def) {
return nil, errors.New("script must export a default function")
}
if _, ok := goja.AssertFunction(def); !ok {
return nil, errors.New("default export must be a function")
}

// Extract/validate other exports.
for _, k := range exports.Keys() {
v := exports.Get(k)
switch k {
case "default": // Already checked above.
case "options":
data, err := json.Marshal(v.Export())
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &bundle.Options); err != nil {
return nil, err
}
case "setup":
if _, ok := goja.AssertFunction(v); !ok {
return nil, errors.New("exported 'setup' must be a function")
}
case "teardown":
if _, ok := goja.AssertFunction(v); !ok {
return nil, errors.New("exported 'teardown' must be a function")
}
}
err = bundle.getExports(rt)
if err != nil {
return nil, err
}

return &bundle, nil
Expand Down Expand Up @@ -153,8 +123,8 @@ func NewBundleFromArchive(arc *lib.Archive, rtOpts lib.RuntimeOptions) (*Bundle,
if err != nil {
return nil, err
}

initctx := NewInitContext(goja.New(), c, compatMode,
rt := goja.New()
initctx := NewInitContext(rt, c, compatMode,
new(context.Context), arc.Filesystems, arc.PwdURL)

env := arc.Env
Expand All @@ -170,14 +140,22 @@ func NewBundleFromArchive(arc *lib.Archive, rtOpts lib.RuntimeOptions) (*Bundle,
Filename: arc.FilenameURL,
Source: string(arc.Data),
Program: pgm,
Exports: make(map[string]struct{}),
Options: arc.Options,
BaseInitContext: initctx,
Env: env,
CompatibilityMode: compatMode,
}
if err := bundle.instantiate(bundle.BaseInitContext.runtime, bundle.BaseInitContext); err != nil {

if err = bundle.instantiate(rt, bundle.BaseInitContext); err != nil {
return nil, err
}

err = bundle.getExports(rt)
if err != nil {
return nil, err
}

return bundle, nil
}

Expand All @@ -202,6 +180,47 @@ func (b *Bundle) makeArchive() *lib.Archive {
return arc
}

// getExports validates and extracts exported objects
func (b *Bundle) getExports(rt *goja.Runtime) error {
exportsV := rt.Get("exports")
if goja.IsNull(exportsV) || goja.IsUndefined(exportsV) {
return errors.New("exports must be an object")
}
exports := exportsV.ToObject(rt)

for _, k := range exports.Keys() {
v := exports.Get(k)
switch k {
case "options":
data, err := json.Marshal(v.Export())
if err != nil {
return err
}
if err := json.Unmarshal(data, &b.Options); err != nil {
return err
}
case "setup":
if _, ok := goja.AssertFunction(v); !ok {
return errors.New("exported 'setup' must be a function")
}
case "teardown":
if _, ok := goja.AssertFunction(v); !ok {
return errors.New("exported 'teardown' must be a function")
}
default:
if _, ok := goja.AssertFunction(v); ok {
b.Exports[k] = struct{}{}
}
}
}

if len(b.Exports) == 0 {
return errors.New("no exported functions in script")
}

return nil
}

// Instantiate creates a new runtime from this bundle.
func (b *Bundle) Instantiate() (bi *BundleInstance, instErr error) {
// TODO: actually use a real context here, so that the instantiation can be killed
Expand All @@ -216,11 +235,18 @@ func (b *Bundle) Instantiate() (bi *BundleInstance, instErr error) {
return nil, err
}

// Grab the default function; type is already checked in NewBundle().
bi = &BundleInstance{
Runtime: rt,
Context: ctxPtr,
Exports: make(map[string]goja.Callable),
}

// Grab any exported functions that could be executed. These were
// already pre-validated in NewBundle(), just get them here.
exports := rt.Get("exports").ToObject(rt)
def, ok := goja.AssertFunction(exports.Get("default"))
if !ok || def == nil {
panic("exported default is not a function")
for k := range b.Exports {
fn, _ := goja.AssertFunction(exports.Get(k))
bi.Exports[k] = fn
}

jsOptions := rt.Get("options")
Expand All @@ -237,11 +263,7 @@ func (b *Bundle) Instantiate() (bi *BundleInstance, instErr error) {
}
})

return &BundleInstance{
Runtime: rt,
Context: ctxPtr,
Default: def,
}, instErr
return bi, instErr
}

// Instantiates the bundle into an existing runtime. Not public because it also messes with a bunch
Expand Down
26 changes: 22 additions & 4 deletions js/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/cookiejar"
Expand Down Expand Up @@ -271,6 +272,12 @@ func (r *Runner) GetDefaultGroup() *lib.Group {
return r.defaultGroup
}

// GetExports returns the names of exported functions in the script
// (excluding setup() and teardown()) that can be used for execution.
func (r *Runner) GetExports() map[string]struct{} {
return r.Bundle.Exports
}

func (r *Runner) GetOptions() lib.Options {
return r.Bundle.Options
}
Expand Down Expand Up @@ -385,7 +392,10 @@ type ActiveVU struct {
// Activate the VU so it will be able to run code
func (u *VU) Activate(params *lib.VUActivationParams) lib.ActiveVU {
u.Runtime.ClearInterrupt()
// u.Env = params.Env

if params.Exec == "" {
params.Exec = "default"
}

go func() {
<-params.RunContext.Done()
Expand All @@ -398,7 +408,7 @@ func (u *VU) Activate(params *lib.VUActivationParams) lib.ActiveVU {
return &ActiveVU{u, params}
}

// RunOnce runs the default function once.
// RunOnce runs the configured Exec function once.
func (u *ActiveVU) RunOnce() error {
u.runMutex.Lock()
defer u.runMutex.Unlock()
Expand All @@ -417,8 +427,16 @@ func (u *ActiveVU) RunOnce() error {
}
}

// Call the default function.
_, isFullIteration, totalTime, err := u.runFn(u.RunContext, u.Runner.defaultGroup, true, u.Default, u.setupData)
fn, ok := u.Exports[u.Exec]
if !ok {
// Shouldn't happen; this is validated in ExecutionScheduler.Init()
panic(fmt.Sprintf("function '%s' not found in exports", u.Exec))
}

// Call the exported function.
_, isFullIteration, totalTime, err := u.runFn(
u.RunContext, u.Runner.defaultGroup, true, fn, u.setupData,
)

// If MinIterationDuration is specified and the iteration wasn't cancelled
// and was less than it, sleep for the remainder
Expand Down
18 changes: 13 additions & 5 deletions lib/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import (

// ActiveVU represents an actively running virtual user.
type ActiveVU interface {
// Runs the VU once. The only way to interrupt the execution is to cancel
// the context given to InitializedVU.Activate()
// Run the configured exported function in the VU once. The only
// way to interrupt the execution is to cancel the context given
// to InitializedVU.Activate()
RunOnce() error
}

Expand All @@ -47,9 +48,9 @@ type InitializedVU interface {
type VUActivationParams struct {
RunContext context.Context
DeactivateCallback func()
// Env map[string]string
// Tags map[string]string
// Exec null.String
Env map[string]string
Tags map[string]string
Exec string
}

// A Runner is a factory for VUs. It should precompute as much as possible upon
Expand Down Expand Up @@ -90,4 +91,11 @@ type Runner interface {
// values and write it back to the runner.
GetOptions() Options
SetOptions(opts Options) error

// GetExports returns the names of exported functions in the script
// (excluding setup() and teardown()) that can be used for execution.
// This is a bit janky, but it's needed for validation during
// ExecutionScheduler.Init(). The empty struct is to avoid a
// circular dep or make lib depend on goja :-/
GetExports() map[string]struct{}
}
6 changes: 6 additions & 0 deletions lib/testutils/minirunner/minirunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ func (r MiniRunner) GetDefaultGroup() *lib.Group {
return r.Group
}

// GetExports satisfies lib.Runner, but is a no-op for MiniRunner since
// it doesn't deal with JS.
func (r MiniRunner) GetExports() map[string]struct{} {
return make(map[string]struct{})
}

// GetOptions returns the supplied options struct.
func (r MiniRunner) GetOptions() lib.Options {
return r.Options
Expand Down

0 comments on commit 27c1a38

Please sign in to comment.