Skip to content

Commit

Permalink
Drop NetTrail - emit iterations metrics in runner instead of dialer (#…
Browse files Browse the repository at this point in the history
…3908)

This is an artifact of both data sent/received and iterations being in
the same SampleContainer. So where one was emitted we needed to emit the
other.

That hasn't been a requirement for a while as evident by the dropping of
NetTrail with no breaking in outputs.

This also stops emitting `iteration_duration` in not default functions
(teardown, setup) as it already does for `iterations`.

Closes #1605
Part of #1250
  • Loading branch information
mstoykov authored Aug 22, 2024
1 parent 55eb8bc commit 93c7083
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 107 deletions.
85 changes: 63 additions & 22 deletions execution/scheduler_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ func TestSchedulerSystemTags(t *testing.T) {

expTrailPVUTags := expCommonTrailTags.With("scenario", "per_vu_test")
expTrailSITags := expCommonTrailTags.With("scenario", "shared_test")
expNetTrailPVUTags := testRunState.RunTags.With("group", "").With("scenario", "per_vu_test")
expNetTrailSITags := testRunState.RunTags.With("group", "").With("scenario", "shared_test")
expDataSentPVUTags := testRunState.RunTags.With("group", "").With("scenario", "per_vu_test")
expDataSentSITags := testRunState.RunTags.With("group", "").With("scenario", "shared_test")

var gotCorrectTags int
for {
Expand All @@ -375,9 +375,13 @@ func TestSchedulerSystemTags(t *testing.T) {
if s.Tags == expTrailPVUTags || s.Tags == expTrailSITags {
gotCorrectTags++
}
case *netext.NetTrail:
if s.Tags == expNetTrailPVUTags || s.Tags == expNetTrailSITags {
gotCorrectTags++
default:
for _, sample := range s.GetSamples() {
if sample.Metric.Name == metrics.DataSentName {
if sample.Tags == expDataSentPVUTags || sample.Tags == expDataSentSITags {
gotCorrectTags++
}
}
}
}

Expand Down Expand Up @@ -487,7 +491,7 @@ func TestSchedulerRunCustomTags(t *testing.T) {
defer stopEmission()
require.NoError(t, execScheduler.Run(ctx, ctx, samples))
}()
var gotTrailTag, gotNetTrailTag bool
var gotTrailTag, gotDataSentTag bool
for {
select {
case sample := <-samples:
Expand All @@ -497,14 +501,16 @@ func TestSchedulerRunCustomTags(t *testing.T) {
gotTrailTag = true
}
}
if netTrail, ok := sample.(*netext.NetTrail); ok && !gotNetTrailTag {
tags := netTrail.Tags.Map()
if v, ok := tags["customTag"]; ok && v == "value" {
gotNetTrailTag = true
for _, s := range sample.GetSamples() {
if s.Metric.Name == metrics.DataSentName && !gotDataSentTag {
tags := s.Tags.Map()
if v, ok := tags["customTag"]; ok && v == "value" {
gotDataSentTag = true
}
}
}
case <-done:
if !gotTrailTag || !gotNetTrailTag {
if !gotTrailTag || !gotDataSentTag {
assert.FailNow(t, "a sample with expected tag wasn't received")
}
return
Expand Down Expand Up @@ -692,11 +698,15 @@ func TestSchedulerRunCustomConfigNoCrossover(t *testing.T) {
gotSampleTags++
}
}
case *netext.NetTrail:
tags := s.Tags.Map()
for _, expTags := range expectedNetTrailTags {
if reflect.DeepEqual(expTags, tags) {
gotSampleTags++
case metrics.Samples:
for _, sample := range s.GetSamples() {
if sample.Metric.Name == metrics.DataSentName {
tags := sample.Tags.Map()
for _, expTags := range expectedNetTrailTags {
if reflect.DeepEqual(expTags, tags) {
gotSampleTags++
}
}
}
}
case metrics.ConnectedSamples:
Expand Down Expand Up @@ -1275,7 +1285,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) {
Value: expValue,
}
}
getDummyTrail := func(group string, emitIterations bool, addExpTags ...string) metrics.SampleContainer {
getNetworkSamples := func(group string, addExpTags ...string) metrics.SampleContainer {
expTags := []string{"group", group}
expTags = append(expTags, addExpTags...)
dialer := netext.NewDialer(
Expand All @@ -1284,25 +1294,56 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) {
)

ctm := metrics.TagsAndMeta{Tags: getTags(piState.Registry, expTags...)}
return dialer.GetTrail(time.Now(), time.Now(), true, emitIterations, ctm, piState.BuiltinMetrics)
return dialer.IOSamples(time.Now(), ctm, piState.BuiltinMetrics)
}

getIterationsSamples := func(group string, addExpTags ...string) metrics.SampleContainer {
expTags := []string{"group", group}
expTags = append(expTags, addExpTags...)
ctm := metrics.TagsAndMeta{Tags: getTags(piState.Registry, expTags...)}
startTime := time.Now()
endTime := time.Now()

return metrics.Samples([]metrics.Sample{
{
TimeSeries: metrics.TimeSeries{
Metric: piState.BuiltinMetrics.IterationDuration,
Tags: ctm.Tags,
},
Time: endTime,
Metadata: ctm.Metadata,
Value: metrics.D(endTime.Sub(startTime)),
},
{
TimeSeries: metrics.TimeSeries{
Metric: piState.BuiltinMetrics.Iterations,
Tags: ctm.Tags,
},
Time: endTime,
Metadata: ctm.Metadata,
Value: 1,
},
})
}

// Initially give a long time (5s) for the execScheduler to start
expectIn(0, 5000, getSample(1, testCounter, "group", "::setup", "place", "setupBeforeSleep"))
expectIn(900, 1100, getSample(2, testCounter, "group", "::setup", "place", "setupAfterSleep"))
expectIn(0, 100, getDummyTrail("::setup", false))
expectIn(0, 100, getNetworkSamples("::setup"))

expectIn(0, 100, getSample(5, testCounter, "group", "", "place", "defaultBeforeSleep", "scenario", "default"))
expectIn(900, 1100, getSample(6, testCounter, "group", "", "place", "defaultAfterSleep", "scenario", "default"))
expectIn(0, 100, getDummyTrail("", true, "scenario", "default"))
expectIn(0, 100, getNetworkSamples("", "scenario", "default"))
expectIn(0, 100, getIterationsSamples("", "scenario", "default"))

expectIn(0, 100, getSample(5, testCounter, "group", "", "place", "defaultBeforeSleep", "scenario", "default"))
expectIn(900, 1100, getSample(6, testCounter, "group", "", "place", "defaultAfterSleep", "scenario", "default"))
expectIn(0, 100, getDummyTrail("", true, "scenario", "default"))
expectIn(0, 100, getNetworkSamples("", "scenario", "default"))
expectIn(0, 100, getIterationsSamples("", "scenario", "default"))

expectIn(0, 1000, getSample(3, testCounter, "group", "::teardown", "place", "teardownBeforeSleep"))
expectIn(900, 1100, getSample(4, testCounter, "group", "::teardown", "place", "teardownAfterSleep"))
expectIn(0, 100, getDummyTrail("::teardown", false))
expectIn(0, 100, getNetworkSamples("::teardown"))

for {
select {
Expand Down
35 changes: 32 additions & 3 deletions js/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,15 +865,44 @@ func (u *VU) runFn(
u.Transport.CloseIdleConnections()
}

u.state.Samples <- u.Dialer.GetTrail(
startTime, endTime, isFullIteration,
isDefault, u.state.Tags.GetCurrentValues(), u.Runner.preInitState.BuiltinMetrics)
builtinMetrics := u.Runner.preInitState.BuiltinMetrics
ctm := u.state.Tags.GetCurrentValues()
u.state.Samples <- u.Dialer.IOSamples(endTime, ctm, builtinMetrics)

if isFullIteration && isDefault {
u.state.Samples <- iterationSamples(startTime, endTime, ctm, builtinMetrics)
}

v = unPromisify(v)

return v, isFullIteration, endTime.Sub(startTime), err
}

func iterationSamples(
startTime, endTime time.Time, ctm metrics.TagsAndMeta, builtinMetrics *metrics.BuiltinMetrics,
) metrics.Samples {
return metrics.Samples([]metrics.Sample{
{
TimeSeries: metrics.TimeSeries{
Metric: builtinMetrics.IterationDuration,
Tags: ctm.Tags,
},
Time: endTime,
Metadata: ctm.Metadata,
Value: metrics.D(endTime.Sub(startTime)),
},
{
TimeSeries: metrics.TimeSeries{
Metric: builtinMetrics.Iterations,
Tags: ctm.Tags,
},
Time: endTime,
Metadata: ctm.Metadata,
Value: 1,
},
})
}

func (u *ActiveVU) incrIteration() {
u.iteration++
u.state.Iteration = u.iteration
Expand Down
8 changes: 4 additions & 4 deletions js/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,10 +794,9 @@ func TestVUIntegrationMetrics(t *testing.T) {
require.NoError(t, err)
sampleCount := 0
builtinMetrics := r.preInitState.BuiltinMetrics
for i, sampleC := range metrics.GetBufferedSamples(samples) {
for j, s := range sampleC.GetSamples() {
sampleCount++
switch i + j {
for _, sampleC := range metrics.GetBufferedSamples(samples) {
for _, s := range sampleC.GetSamples() {
switch sampleCount {
case 0:
assert.Equal(t, 5.0, s.Value)
assert.Equal(t, "my_metric", s.Metric.Name)
Expand All @@ -814,6 +813,7 @@ func TestVUIntegrationMetrics(t *testing.T) {
assert.Same(t, builtinMetrics.Iterations, s.Metric, "`iterations` sample is after `iteration_duration`")
assert.Equal(t, float64(1), s.Value)
}
sampleCount++
}
}
assert.Equal(t, sampleCount, 5)
Expand Down
83 changes: 9 additions & 74 deletions lib/netext/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,20 @@ func (d *Dialer) DialContext(ctx context.Context, proto, addr string) (net.Conn,
return conn, err
}

// GetTrail creates a new NetTrail instance with the Dialer
// sent and received data metrics and the supplied times and tags.
// TODO: Refactor this according to
// https://github.com/k6io/k6/pull/1203#discussion_r337938370
func (d *Dialer) GetTrail(
startTime, endTime time.Time, fullIteration bool, emitIterations bool, ctm metrics.TagsAndMeta,
builtinMetrics *metrics.BuiltinMetrics,
) *NetTrail {
// IOSamples returns samples for data send and received since it last call and zeros out.
// It uses the provided time as the sample time and tags and builtinMetrics to build the samples.
func (d *Dialer) IOSamples(
sampleTime time.Time, ctm metrics.TagsAndMeta, builtinMetrics *metrics.BuiltinMetrics,
) metrics.SampleContainer {
bytesWritten := atomic.SwapInt64(&d.BytesWritten, 0)
bytesRead := atomic.SwapInt64(&d.BytesRead, 0)
samples := []metrics.Sample{
return metrics.Samples([]metrics.Sample{
{
TimeSeries: metrics.TimeSeries{
Metric: builtinMetrics.DataSent,
Tags: ctm.Tags,
},
Time: endTime,
Time: sampleTime,
Metadata: ctm.Metadata,
Value: float64(bytesWritten),
},
Expand All @@ -94,43 +91,11 @@ func (d *Dialer) GetTrail(
Metric: builtinMetrics.DataReceived,
Tags: ctm.Tags,
},
Time: endTime,
Time: sampleTime,
Metadata: ctm.Metadata,
Value: float64(bytesRead),
},
}
if fullIteration {
samples = append(samples, metrics.Sample{
TimeSeries: metrics.TimeSeries{
Metric: builtinMetrics.IterationDuration,
Tags: ctm.Tags,
},
Time: endTime,
Metadata: ctm.Metadata,
Value: metrics.D(endTime.Sub(startTime)),
})
if emitIterations {
samples = append(samples, metrics.Sample{
TimeSeries: metrics.TimeSeries{
Metric: builtinMetrics.Iterations,
Tags: ctm.Tags,
},
Time: endTime,
Metadata: ctm.Metadata,
Value: 1,
})
}
}

return &NetTrail{
BytesRead: bytesRead,
BytesWritten: bytesWritten,
FullIteration: fullIteration,
StartTime: startTime,
EndTime: endTime,
Tags: ctm.Tags,
Samples: samples,
}
})
}

func (d *Dialer) getDialAddr(addr string) (string, error) {
Expand Down Expand Up @@ -208,36 +173,6 @@ func (d *Dialer) getConfiguredHost(addr, host, port string) (*types.Host, error)
return nil, nil //nolint:nilnil
}

// NetTrail contains information about the exchanged data size and length of a
// series of connections from a particular netext.Dialer
type NetTrail struct {
BytesRead int64
BytesWritten int64
FullIteration bool
StartTime time.Time
EndTime time.Time
Tags *metrics.TagSet
Samples []metrics.Sample
}

// Ensure that interfaces are implemented correctly
var _ metrics.ConnectedSampleContainer = &NetTrail{}

// GetSamples implements the metrics.SampleContainer interface.
func (ntr *NetTrail) GetSamples() []metrics.Sample {
return ntr.Samples
}

// GetTags implements the metrics.ConnectedSampleContainer interface.
func (ntr *NetTrail) GetTags() *metrics.TagSet {
return ntr.Tags
}

// GetTime implements the metrics.ConnectedSampleContainer interface.
func (ntr *NetTrail) GetTime() time.Time {
return ntr.EndTime
}

// Conn wraps net.Conn and keeps track of sent and received data size
type Conn struct {
net.Conn
Expand Down
4 changes: 0 additions & 4 deletions output/cloud/insights/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/stretchr/testify/require"

"go.k6.io/k6/cloudapi/insights"
"go.k6.io/k6/lib/netext"
"go.k6.io/k6/lib/netext/httpext"
"go.k6.io/k6/metrics"
)
Expand Down Expand Up @@ -51,9 +50,6 @@ func Test_Collector_CollectRequestMetadatas_FiltersAndStoresHTTPTrailsAsRequestM
&httpext.Trail{
// HTTP trail without trace ID should be ignored
},
&netext.NetTrail{
// Net trail should be ignored
},
&httpext.Trail{
EndTime: time.Unix(20, 0),
Duration: time.Second,
Expand Down

0 comments on commit 93c7083

Please sign in to comment.