Skip to content

Commit

Permalink
Add support for executor-specific tags
Browse files Browse the repository at this point in the history
See #1300
  • Loading branch information
Ivan Mirić committed Apr 30, 2020
1 parent 4913ad0 commit 71a7180
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 25 deletions.
109 changes: 109 additions & 0 deletions core/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ import (
"github.com/loadimpact/k6/lib/executor"
"github.com/loadimpact/k6/lib/metrics"
"github.com/loadimpact/k6/lib/netext"
"github.com/loadimpact/k6/lib/netext/httpext"
"github.com/loadimpact/k6/lib/testutils"
"github.com/loadimpact/k6/lib/testutils/httpmultibin"
"github.com/loadimpact/k6/lib/testutils/minirunner"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/loader"
Expand Down Expand Up @@ -277,6 +279,113 @@ func TestExecutionSchedulerRunEnv(t *testing.T) {
}
}

func TestExecutionSchedulerRunCustomTags(t *testing.T) {
t.Parallel()
tb := httpmultibin.NewHTTPMultiBin(t)
defer tb.Cleanup()
sr := tb.Replacer.Replace

scriptTemplate := sr(`
import http from "k6/http";
export let options = {
execution: {
executor: {
type: "%s",
gracefulStop: "0.5s",
%s
}
}
}
export default function () {
http.get("HTTPBIN_IP_URL/");
}`)

executorConfigs := map[string]string{
"constant-arrival-rate": `
rate: 1,
timeUnit: "0.5s",
duration: "0.5s",
preAllocatedVUs: 1,
maxVUs: 2,`,
"constant-looping-vus": `
vus: 1,
duration: "0.5s",`,
"externally-controlled": `
vus: 1,
duration: "0.5s",`,
"per-vu-iterations": `
vus: 1,
iterations: 1,`,
"shared-iterations": `
vus: 1,
iterations: 1,`,
"variable-arrival-rate": `
startRate: 5,
timeUnit: "0.5s",
preAllocatedVUs: 1,
maxVUs: 2,
stages: [ { target: 10, duration: "1s" } ],`,
"variable-looping-vus": `
startVUs: 1,
stages: [ { target: 1, duration: "0.5s" } ],`,
}

testCases := []struct{ name, script string }{}

// Generate tests using custom tags
for ename, econf := range executorConfigs {
configWithCustomTag := econf + "tags: { customTag: 'value' }"
testCases = append(testCases, struct{ name, script string }{
ename, fmt.Sprintf(scriptTemplate, ename, configWithCustomTag)})
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
runner, err := js.New(&loader.SourceData{
URL: &url.URL{Path: "/script.js"},
Data: []byte(tc.script)},
nil, lib.RuntimeOptions{})
require.NoError(t, err)

logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := NewExecutionScheduler(runner, logger)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

done := make(chan struct{})
samples := make(chan stats.SampleContainer)
go func() {
assert.NoError(t, execScheduler.Init(ctx, samples))
assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
close(done)
}()
var gotTag bool
for {
select {
case sample := <-samples:
if trail, ok := sample.(*httpext.Trail); ok && !gotTag {
tags := trail.Tags.CloneTags()
if v, ok := tags["customTag"]; ok && v == "value" {
gotTag = true
}
}
case <-done:
if !gotTag {
assert.FailNow(t, "sample with tag wasn't received")
}
return
}
}
})
}
}

func TestExecutionSchedulerSetupTeardownRun(t *testing.T) {
t.Parallel()
t.Run("Normal", func(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions js/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,14 @@ func (u *VU) Activate(params *lib.VUActivationParams) lib.ActiveVU {
params.Exec = "default"
}

if len(params.Tags) > 0 {
tags := u.Runner.Bundle.Options.RunTags.CloneTags()
for k, v := range params.Tags {
tags[k] = v
}
u.Runner.Bundle.Options.RunTags = stats.IntoSampleTags(&tags)
}

avu := &ActiveVU{
VU: u,
VUActivationParams: params,
Expand Down
11 changes: 9 additions & 2 deletions lib/executor/base_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (
"strings"
"time"

"github.com/loadimpact/k6/lib/types"
null "gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib/types"
)

// DefaultGracefulStopValue is the graceful top value for all executors, unless
Expand All @@ -46,8 +47,9 @@ type BaseConfig struct {
GracefulStop types.NullDuration `json:"gracefulStop"`
Env map[string]string `json:"env"`
Exec null.String `json:"exec"` // function name, externally validated
Tags map[string]string `json:"tags"`

// TODO: future extensions like tags, distribution, others?
// TODO: future extensions like distribution, others?
}

// NewBaseConfig returns a default base config with the default values
Expand Down Expand Up @@ -122,6 +124,11 @@ func (bc BaseConfig) GetExec() null.String {
return bc.Exec
}

// GetTags returns any custom tags configured for the executor.
func (bc BaseConfig) GetTags() map[string]string {
return bc.Tags
}

// IsDistributable returns true since by default all executors could be run in
// a distributed manner.
func (bc BaseConfig) IsDistributable() bool {
Expand Down
7 changes: 5 additions & 2 deletions lib/executor/constant_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,17 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
}
}()

execFn := car.GetConfig().GetExec().ValueOrZero()
env := car.GetConfig().GetEnv()
conf := car.GetConfig()
execFn := conf.GetExec().ValueOrZero()
env := conf.GetEnv()
tags := conf.GetTags()
activateVU := func(initVU lib.InitializedVU) lib.ActiveVU {
activeVUsWg.Add(1)
activeVU := initVU.Activate(&lib.VUActivationParams{
RunContext: maxDurationCtx,
Exec: execFn,
Env: env,
Tags: tags,
DeactivateCallback: func() {
car.executionState.ReturnVU(initVU, true)
activeVUsWg.Done()
Expand Down
7 changes: 5 additions & 2 deletions lib/executor/constant_looping_vus.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,10 @@ func (clv ConstantLoopingVUs) Run(ctx context.Context, out chan<- stats.SampleCo
regDurationDone := regDurationCtx.Done()
runIteration := getIterationRunner(clv.executionState, clv.logger)

execFn := clv.GetConfig().GetExec().ValueOrZero()
env := clv.GetConfig().GetEnv()
conf := clv.GetConfig()
execFn := conf.GetExec().ValueOrZero()
env := conf.GetEnv()
tags := conf.GetTags()
handleVU := func(initVU lib.InitializedVU) {
ctx, cancel := context.WithCancel(maxDurationCtx)
defer cancel()
Expand All @@ -186,6 +188,7 @@ func (clv ConstantLoopingVUs) Run(ctx context.Context, out chan<- stats.SampleCo
RunContext: ctx,
Exec: execFn,
Env: env,
Tags: tags,
DeactivateCallback: func() {
clv.executionState.ReturnVU(initVU, true)
activeVUs.Done()
Expand Down
18 changes: 11 additions & 7 deletions lib/executor/externally_controlled.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ type manualVUHandle struct {
func newManualVUHandle(
parentCtx context.Context, state *lib.ExecutionState,
localActiveVUsCount *int64, initVU lib.InitializedVU,
exec string, env map[string]string, logger *logrus.Entry,
exec string, env map[string]string, tags map[string]string,
logger *logrus.Entry,
) *manualVUHandle {
wg := sync.WaitGroup{}
getVU := func() (lib.InitializedVU, error) {
Expand All @@ -358,7 +359,7 @@ func newManualVUHandle(
}
ctx, cancel := context.WithCancel(parentCtx)
return &manualVUHandle{
vuHandle: newStoppedVUHandle(ctx, getVU, returnVU, exec, env, logger),
vuHandle: newStoppedVUHandle(ctx, getVU, returnVU, exec, env, tags, logger),
initVU: initVU,
wg: &wg,
cancelVU: cancel,
Expand All @@ -378,7 +379,7 @@ type externallyControlledRunState struct {
vuHandles []*manualVUHandle // handles for manipulating and tracking all of the VUs
currentlyPaused bool // whether the executor is currently paused
exec string
env map[string]string
env, tags map[string]string

runIteration func(context.Context, lib.ActiveVU) // a helper closure function that runs a single iteration
}
Expand All @@ -395,7 +396,7 @@ func (rs *externallyControlledRunState) retrieveStartMaxVUs() error {
}
vuHandle := newManualVUHandle(
rs.ctx, rs.executor.executionState, rs.activeVUsCount,
initVU, rs.exec, rs.env, rs.executor.logger.WithField("vuNum", i),
initVU, rs.exec, rs.env, rs.tags, rs.executor.logger.WithField("vuNum", i),
)
go vuHandle.runLoopsIfPossible(rs.runIteration)
rs.vuHandles[i] = vuHandle
Expand Down Expand Up @@ -454,7 +455,7 @@ func (rs *externallyControlledRunState) handleConfigChange(oldCfg, newCfg Extern
}
vuHandle := newManualVUHandle(
rs.ctx, executionState, rs.activeVUsCount, initVU, rs.exec,
rs.env, rs.executor.logger.WithField("vuNum", i),
rs.env, rs.tags, rs.executor.logger.WithField("vuNum", i),
)
go vuHandle.runLoopsIfPossible(rs.runIteration)
rs.vuHandles = append(rs.vuHandles, vuHandle)
Expand Down Expand Up @@ -516,14 +517,17 @@ func (mex *ExternallyControlled) Run(parentCtx context.Context, out chan<- stats
logrus.Fields{"type": externallyControlledType, "duration": duration},
).Debug("Starting executor run...")

execFn := mex.GetConfig().GetExec().ValueOrZero()
env := mex.GetConfig().GetEnv()
conf := mex.GetConfig()
execFn := conf.GetExec().ValueOrZero()
env := conf.GetEnv()
tags := conf.GetTags()
startMaxVUs := mex.executionState.Options.ExecutionSegment.Scale(mex.config.MaxVUs.Int64)
runState := &externallyControlledRunState{
ctx: ctx,
executor: mex,
exec: execFn,
env: env,
tags: tags,
startMaxVUs: startMaxVUs,
duration: duration,
vuHandles: make([]*manualVUHandle, startMaxVUs),
Expand Down
7 changes: 5 additions & 2 deletions lib/executor/per_vu_iterations.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,10 @@ func (pvi PerVUIterations) Run(ctx context.Context, out chan<- stats.SampleConta
regDurationDone := regDurationCtx.Done()
runIteration := getIterationRunner(pvi.executionState, pvi.logger)

execFn := pvi.GetConfig().GetExec().ValueOrZero()
env := pvi.GetConfig().GetEnv()
conf := pvi.GetConfig()
execFn := conf.GetExec().ValueOrZero()
env := conf.GetEnv()
tags := conf.GetTags()
handleVU := func(initVU lib.InitializedVU) {
ctx, cancel := context.WithCancel(maxDurationCtx)
defer cancel()
Expand All @@ -206,6 +208,7 @@ func (pvi PerVUIterations) Run(ctx context.Context, out chan<- stats.SampleConta
RunContext: ctx,
Exec: execFn,
Env: env,
Tags: tags,
DeactivateCallback: func() {
pvi.executionState.ReturnVU(initVU, true)
activeVUs.Done()
Expand Down
7 changes: 5 additions & 2 deletions lib/executor/shared_iterations.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,10 @@ func (si SharedIterations) Run(ctx context.Context, out chan<- stats.SampleConta

attemptedIters := new(uint64)

execFn := si.GetConfig().GetExec().ValueOrZero()
env := si.GetConfig().GetEnv()
conf := si.GetConfig()
execFn := conf.GetExec().ValueOrZero()
env := conf.GetEnv()
tags := conf.GetTags()
handleVU := func(initVU lib.InitializedVU) {
ctx, cancel := context.WithCancel(maxDurationCtx)
defer cancel()
Expand All @@ -226,6 +228,7 @@ func (si SharedIterations) Run(ctx context.Context, out chan<- stats.SampleConta
RunContext: ctx,
Exec: execFn,
Env: env,
Tags: tags,
DeactivateCallback: func() {
si.executionState.ReturnVU(initVU, true)
activeVUs.Done()
Expand Down
7 changes: 5 additions & 2 deletions lib/executor/variable_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,17 @@ func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.Sample
}
}()

execFn := varr.GetConfig().GetExec().ValueOrZero()
env := varr.GetConfig().GetEnv()
conf := varr.GetConfig()
execFn := conf.GetExec().ValueOrZero()
env := conf.GetEnv()
tags := conf.GetTags()
activateVU := func(initVU lib.InitializedVU) lib.ActiveVU {
activeVUsWg.Add(1)
activeVU := initVU.Activate(&lib.VUActivationParams{
RunContext: maxDurationCtx,
Exec: execFn,
Env: env,
Tags: tags,
DeactivateCallback: func() {
varr.executionState.ReturnVU(initVU, true)
activeVUsWg.Done()
Expand Down
8 changes: 5 additions & 3 deletions lib/executor/variable_looping_vus.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,13 +595,15 @@ func (vlv VariableLoopingVUs) Run(ctx context.Context, out chan<- stats.SampleCo
activeVUs.Done()
}

execFn := vlv.GetConfig().GetExec().ValueOrZero()
env := vlv.GetConfig().GetEnv()
conf := vlv.GetConfig()
execFn := conf.GetExec().ValueOrZero()
env := conf.GetEnv()
tags := conf.GetTags()
vuHandles := make([]*vuHandle, maxVUs)
for i := uint64(0); i < maxVUs; i++ {
vuHandle := newStoppedVUHandle(
maxDurationCtx, getVU, returnVU, execFn, env,
vlv.logger.WithField("vuNum", i))
tags, vlv.logger.WithField("vuNum", i))
go vuHandle.runLoopsIfPossible(runIteration)
vuHandles[i] = vuHandle
}
Expand Down
5 changes: 4 additions & 1 deletion lib/executor/vu_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type vuHandle struct {
returnVU func(lib.InitializedVU)
exec string
env map[string]string
tags map[string]string

canStartIter chan struct{}

Expand All @@ -52,7 +53,7 @@ type vuHandle struct {
func newStoppedVUHandle(
parentCtx context.Context, getVU func() (lib.InitializedVU, error),
returnVU func(lib.InitializedVU), exec string, env map[string]string,
logger *logrus.Entry,
tags map[string]string, logger *logrus.Entry,
) *vuHandle {
lock := &sync.RWMutex{}
ctx, cancel := context.WithCancel(parentCtx)
Expand All @@ -63,6 +64,7 @@ func newStoppedVUHandle(
returnVU: returnVU,
exec: exec,
env: env,
tags: tags,

canStartIter: make(chan struct{}),

Expand Down Expand Up @@ -166,6 +168,7 @@ mainLoop:
Exec: vh.exec,
RunContext: ctx,
Env: vh.env,
Tags: vh.tags,
DeactivateCallback: deactivateVU,
})
}
Expand Down
Loading

0 comments on commit 71a7180

Please sign in to comment.