From 739e49d66413a13ffa9a234c621bd05bb1bacc89 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Thu, 22 Aug 2024 12:11:05 +0300 Subject: [PATCH 1/3] Drop NetTrail - no longer needed --- execution/scheduler_ext_test.go | 34 ++++++++++++++-------- lib/netext/dialer.go | 42 ++------------------------- output/cloud/insights/collect_test.go | 4 --- 3 files changed, 24 insertions(+), 56 deletions(-) diff --git a/execution/scheduler_ext_test.go b/execution/scheduler_ext_test.go index e175f3db526..611239fe30f 100644 --- a/execution/scheduler_ext_test.go +++ b/execution/scheduler_ext_test.go @@ -375,9 +375,13 @@ func TestSchedulerSystemTags(t *testing.T) { if s.Tags == expTrailPVUTags || s.Tags == expTrailSITags { gotCorrectTags++ } - case *netext.NetTrail: - if s.Tags == expNetTrailPVUTags || s.Tags == expNetTrailSITags { - gotCorrectTags++ + default: + for _, sample := range s.GetSamples() { + if sample.Metric.Name == metrics.DataSentName { + if sample.Tags == expNetTrailPVUTags || sample.Tags == expNetTrailSITags { + gotCorrectTags++ + } + } } } @@ -497,10 +501,12 @@ func TestSchedulerRunCustomTags(t *testing.T) { gotTrailTag = true } } - if netTrail, ok := sample.(*netext.NetTrail); ok && !gotNetTrailTag { - tags := netTrail.Tags.Map() - if v, ok := tags["customTag"]; ok && v == "value" { - gotNetTrailTag = true + for _, s := range sample.GetSamples() { + if s.Metric.Name == metrics.DataSentName && !gotNetTrailTag { + tags := s.Tags.Map() + if v, ok := tags["customTag"]; ok && v == "value" { + gotNetTrailTag = true + } } } case <-done: @@ -692,11 +698,15 @@ func TestSchedulerRunCustomConfigNoCrossover(t *testing.T) { gotSampleTags++ } } - case *netext.NetTrail: - tags := s.Tags.Map() - for _, expTags := range expectedNetTrailTags { - if reflect.DeepEqual(expTags, tags) { - gotSampleTags++ + case metrics.Samples: + for _, sample := range s.GetSamples() { + if sample.Metric.Name == metrics.DataSentName { + tags := sample.Tags.Map() + for _, expTags := range expectedNetTrailTags { + if reflect.DeepEqual(expTags, tags) { + gotSampleTags++ + } + } } } case metrics.ConnectedSamples: diff --git a/lib/netext/dialer.go b/lib/netext/dialer.go index e9db26fb2b6..0d0cf84f60d 100644 --- a/lib/netext/dialer.go +++ b/lib/netext/dialer.go @@ -76,7 +76,7 @@ func (d *Dialer) DialContext(ctx context.Context, proto, addr string) (net.Conn, func (d *Dialer) GetTrail( startTime, endTime time.Time, fullIteration bool, emitIterations bool, ctm metrics.TagsAndMeta, builtinMetrics *metrics.BuiltinMetrics, -) *NetTrail { +) metrics.SampleContainer { bytesWritten := atomic.SwapInt64(&d.BytesWritten, 0) bytesRead := atomic.SwapInt64(&d.BytesRead, 0) samples := []metrics.Sample{ @@ -122,15 +122,7 @@ func (d *Dialer) GetTrail( } } - return &NetTrail{ - BytesRead: bytesRead, - BytesWritten: bytesWritten, - FullIteration: fullIteration, - StartTime: startTime, - EndTime: endTime, - Tags: ctm.Tags, - Samples: samples, - } + return metrics.Samples(samples) } func (d *Dialer) getDialAddr(addr string) (string, error) { @@ -208,36 +200,6 @@ func (d *Dialer) getConfiguredHost(addr, host, port string) (*types.Host, error) return nil, nil //nolint:nilnil } -// NetTrail contains information about the exchanged data size and length of a -// series of connections from a particular netext.Dialer -type NetTrail struct { - BytesRead int64 - BytesWritten int64 - FullIteration bool - StartTime time.Time - EndTime time.Time - Tags *metrics.TagSet - Samples []metrics.Sample -} - -// Ensure that interfaces are implemented correctly -var _ metrics.ConnectedSampleContainer = &NetTrail{} - -// GetSamples implements the metrics.SampleContainer interface. -func (ntr *NetTrail) GetSamples() []metrics.Sample { - return ntr.Samples -} - -// GetTags implements the metrics.ConnectedSampleContainer interface. -func (ntr *NetTrail) GetTags() *metrics.TagSet { - return ntr.Tags -} - -// GetTime implements the metrics.ConnectedSampleContainer interface. -func (ntr *NetTrail) GetTime() time.Time { - return ntr.EndTime -} - // Conn wraps net.Conn and keeps track of sent and received data size type Conn struct { net.Conn diff --git a/output/cloud/insights/collect_test.go b/output/cloud/insights/collect_test.go index aeca952b9bc..6350dc8eb76 100644 --- a/output/cloud/insights/collect_test.go +++ b/output/cloud/insights/collect_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/require" "go.k6.io/k6/cloudapi/insights" - "go.k6.io/k6/lib/netext" "go.k6.io/k6/lib/netext/httpext" "go.k6.io/k6/metrics" ) @@ -51,9 +50,6 @@ func Test_Collector_CollectRequestMetadatas_FiltersAndStoresHTTPTrailsAsRequestM &httpext.Trail{ // HTTP trail without trace ID should be ignored }, - &netext.NetTrail{ - // Net trail should be ignored - }, &httpext.Trail{ EndTime: time.Unix(20, 0), Duration: time.Second, From c7599e4b0f385c14a621a08f903a47e67064446e Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Thu, 22 Aug 2024 12:58:36 +0300 Subject: [PATCH 2/3] js: stop emitting iterations data in the dialer This is an artifact of both data sent/received and iterations being in the same SampleContainer. So where one was emitted we needed to emit the other. That hasn't been a requirement for a while as evident by the dropping of NetTrail with no breaking in outputs. This also stops emitting `iteration_duration` in not default functions (teardown, setup) as it already does for `iterations`. Closes #1605 Part of #1250 --- execution/scheduler_ext_test.go | 43 ++++++++++++++++++++++++++++----- js/runner.go | 35 ++++++++++++++++++++++++--- js/runner_test.go | 8 +++--- lib/netext/dialer.go | 37 ++++------------------------ 4 files changed, 78 insertions(+), 45 deletions(-) diff --git a/execution/scheduler_ext_test.go b/execution/scheduler_ext_test.go index 611239fe30f..b9f5aeaf34e 100644 --- a/execution/scheduler_ext_test.go +++ b/execution/scheduler_ext_test.go @@ -1285,7 +1285,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { Value: expValue, } } - getDummyTrail := func(group string, emitIterations bool, addExpTags ...string) metrics.SampleContainer { + getNetworkSamples := func(group string, addExpTags ...string) metrics.SampleContainer { expTags := []string{"group", group} expTags = append(expTags, addExpTags...) dialer := netext.NewDialer( @@ -1294,25 +1294,56 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { ) ctm := metrics.TagsAndMeta{Tags: getTags(piState.Registry, expTags...)} - return dialer.GetTrail(time.Now(), time.Now(), true, emitIterations, ctm, piState.BuiltinMetrics) + return dialer.Sample(time.Now(), ctm, piState.BuiltinMetrics) + } + + getIterationsSamples := func(group string, addExpTags ...string) metrics.SampleContainer { + expTags := []string{"group", group} + expTags = append(expTags, addExpTags...) + ctm := metrics.TagsAndMeta{Tags: getTags(piState.Registry, expTags...)} + startTime := time.Now() + endTime := time.Now() + + return metrics.Samples([]metrics.Sample{ + { + TimeSeries: metrics.TimeSeries{ + Metric: piState.BuiltinMetrics.IterationDuration, + Tags: ctm.Tags, + }, + Time: endTime, + Metadata: ctm.Metadata, + Value: metrics.D(endTime.Sub(startTime)), + }, + { + TimeSeries: metrics.TimeSeries{ + Metric: piState.BuiltinMetrics.Iterations, + Tags: ctm.Tags, + }, + Time: endTime, + Metadata: ctm.Metadata, + Value: 1, + }, + }) } // Initially give a long time (5s) for the execScheduler to start expectIn(0, 5000, getSample(1, testCounter, "group", "::setup", "place", "setupBeforeSleep")) expectIn(900, 1100, getSample(2, testCounter, "group", "::setup", "place", "setupAfterSleep")) - expectIn(0, 100, getDummyTrail("::setup", false)) + expectIn(0, 100, getNetworkSamples("::setup")) expectIn(0, 100, getSample(5, testCounter, "group", "", "place", "defaultBeforeSleep", "scenario", "default")) expectIn(900, 1100, getSample(6, testCounter, "group", "", "place", "defaultAfterSleep", "scenario", "default")) - expectIn(0, 100, getDummyTrail("", true, "scenario", "default")) + expectIn(0, 100, getNetworkSamples("", "scenario", "default")) + expectIn(0, 100, getIterationsSamples("", "scenario", "default")) expectIn(0, 100, getSample(5, testCounter, "group", "", "place", "defaultBeforeSleep", "scenario", "default")) expectIn(900, 1100, getSample(6, testCounter, "group", "", "place", "defaultAfterSleep", "scenario", "default")) - expectIn(0, 100, getDummyTrail("", true, "scenario", "default")) + expectIn(0, 100, getNetworkSamples("", "scenario", "default")) + expectIn(0, 100, getIterationsSamples("", "scenario", "default")) expectIn(0, 1000, getSample(3, testCounter, "group", "::teardown", "place", "teardownBeforeSleep")) expectIn(900, 1100, getSample(4, testCounter, "group", "::teardown", "place", "teardownAfterSleep")) - expectIn(0, 100, getDummyTrail("::teardown", false)) + expectIn(0, 100, getNetworkSamples("::teardown")) for { select { diff --git a/js/runner.go b/js/runner.go index a3741fc7f86..c4b8d229011 100644 --- a/js/runner.go +++ b/js/runner.go @@ -865,15 +865,44 @@ func (u *VU) runFn( u.Transport.CloseIdleConnections() } - u.state.Samples <- u.Dialer.GetTrail( - startTime, endTime, isFullIteration, - isDefault, u.state.Tags.GetCurrentValues(), u.Runner.preInitState.BuiltinMetrics) + builtinMetrics := u.Runner.preInitState.BuiltinMetrics + ctm := u.state.Tags.GetCurrentValues() + u.state.Samples <- u.Dialer.Sample(endTime, ctm, builtinMetrics) + + if isFullIteration && isDefault { + u.state.Samples <- iterationSamples(startTime, endTime, ctm, builtinMetrics) + } v = unPromisify(v) return v, isFullIteration, endTime.Sub(startTime), err } +func iterationSamples( + startTime, endTime time.Time, ctm metrics.TagsAndMeta, builtinMetrics *metrics.BuiltinMetrics, +) metrics.Samples { + return metrics.Samples([]metrics.Sample{ + { + TimeSeries: metrics.TimeSeries{ + Metric: builtinMetrics.IterationDuration, + Tags: ctm.Tags, + }, + Time: endTime, + Metadata: ctm.Metadata, + Value: metrics.D(endTime.Sub(startTime)), + }, + { + TimeSeries: metrics.TimeSeries{ + Metric: builtinMetrics.Iterations, + Tags: ctm.Tags, + }, + Time: endTime, + Metadata: ctm.Metadata, + Value: 1, + }, + }) +} + func (u *ActiveVU) incrIteration() { u.iteration++ u.state.Iteration = u.iteration diff --git a/js/runner_test.go b/js/runner_test.go index c0ca818086c..0808547a42f 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -794,10 +794,9 @@ func TestVUIntegrationMetrics(t *testing.T) { require.NoError(t, err) sampleCount := 0 builtinMetrics := r.preInitState.BuiltinMetrics - for i, sampleC := range metrics.GetBufferedSamples(samples) { - for j, s := range sampleC.GetSamples() { - sampleCount++ - switch i + j { + for _, sampleC := range metrics.GetBufferedSamples(samples) { + for _, s := range sampleC.GetSamples() { + switch sampleCount { case 0: assert.Equal(t, 5.0, s.Value) assert.Equal(t, "my_metric", s.Metric.Name) @@ -814,6 +813,7 @@ func TestVUIntegrationMetrics(t *testing.T) { assert.Same(t, builtinMetrics.Iterations, s.Metric, "`iterations` sample is after `iteration_duration`") assert.Equal(t, float64(1), s.Value) } + sampleCount++ } } assert.Equal(t, sampleCount, 5) diff --git a/lib/netext/dialer.go b/lib/netext/dialer.go index 0d0cf84f60d..4480e9fad64 100644 --- a/lib/netext/dialer.go +++ b/lib/netext/dialer.go @@ -69,17 +69,14 @@ func (d *Dialer) DialContext(ctx context.Context, proto, addr string) (net.Conn, return conn, err } -// GetTrail creates a new NetTrail instance with the Dialer +// Sample creates a new NetTrail instance with the Dialer // sent and received data metrics and the supplied times and tags. -// TODO: Refactor this according to -// https://github.com/k6io/k6/pull/1203#discussion_r337938370 -func (d *Dialer) GetTrail( - startTime, endTime time.Time, fullIteration bool, emitIterations bool, ctm metrics.TagsAndMeta, - builtinMetrics *metrics.BuiltinMetrics, +func (d *Dialer) Sample( + endTime time.Time, ctm metrics.TagsAndMeta, builtinMetrics *metrics.BuiltinMetrics, ) metrics.SampleContainer { bytesWritten := atomic.SwapInt64(&d.BytesWritten, 0) bytesRead := atomic.SwapInt64(&d.BytesRead, 0) - samples := []metrics.Sample{ + return metrics.Samples([]metrics.Sample{ { TimeSeries: metrics.TimeSeries{ Metric: builtinMetrics.DataSent, @@ -98,31 +95,7 @@ func (d *Dialer) GetTrail( Metadata: ctm.Metadata, Value: float64(bytesRead), }, - } - if fullIteration { - samples = append(samples, metrics.Sample{ - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.IterationDuration, - Tags: ctm.Tags, - }, - Time: endTime, - Metadata: ctm.Metadata, - Value: metrics.D(endTime.Sub(startTime)), - }) - if emitIterations { - samples = append(samples, metrics.Sample{ - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.Iterations, - Tags: ctm.Tags, - }, - Time: endTime, - Metadata: ctm.Metadata, - Value: 1, - }) - } - } - - return metrics.Samples(samples) + }) } func (d *Dialer) getDialAddr(addr string) (string, error) { From cb63950a687fc613cabae765f5257d03eb3d7917 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Thu, 22 Aug 2024 15:32:14 +0300 Subject: [PATCH 3/3] fixup! js: stop emitting iterations data in the dialer --- execution/scheduler_ext_test.go | 16 ++++++++-------- js/runner.go | 2 +- lib/netext/dialer.go | 12 ++++++------ 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/execution/scheduler_ext_test.go b/execution/scheduler_ext_test.go index b9f5aeaf34e..3fbffdf0ed9 100644 --- a/execution/scheduler_ext_test.go +++ b/execution/scheduler_ext_test.go @@ -363,8 +363,8 @@ func TestSchedulerSystemTags(t *testing.T) { expTrailPVUTags := expCommonTrailTags.With("scenario", "per_vu_test") expTrailSITags := expCommonTrailTags.With("scenario", "shared_test") - expNetTrailPVUTags := testRunState.RunTags.With("group", "").With("scenario", "per_vu_test") - expNetTrailSITags := testRunState.RunTags.With("group", "").With("scenario", "shared_test") + expDataSentPVUTags := testRunState.RunTags.With("group", "").With("scenario", "per_vu_test") + expDataSentSITags := testRunState.RunTags.With("group", "").With("scenario", "shared_test") var gotCorrectTags int for { @@ -378,7 +378,7 @@ func TestSchedulerSystemTags(t *testing.T) { default: for _, sample := range s.GetSamples() { if sample.Metric.Name == metrics.DataSentName { - if sample.Tags == expNetTrailPVUTags || sample.Tags == expNetTrailSITags { + if sample.Tags == expDataSentPVUTags || sample.Tags == expDataSentSITags { gotCorrectTags++ } } @@ -491,7 +491,7 @@ func TestSchedulerRunCustomTags(t *testing.T) { defer stopEmission() require.NoError(t, execScheduler.Run(ctx, ctx, samples)) }() - var gotTrailTag, gotNetTrailTag bool + var gotTrailTag, gotDataSentTag bool for { select { case sample := <-samples: @@ -502,15 +502,15 @@ func TestSchedulerRunCustomTags(t *testing.T) { } } for _, s := range sample.GetSamples() { - if s.Metric.Name == metrics.DataSentName && !gotNetTrailTag { + if s.Metric.Name == metrics.DataSentName && !gotDataSentTag { tags := s.Tags.Map() if v, ok := tags["customTag"]; ok && v == "value" { - gotNetTrailTag = true + gotDataSentTag = true } } } case <-done: - if !gotTrailTag || !gotNetTrailTag { + if !gotTrailTag || !gotDataSentTag { assert.FailNow(t, "a sample with expected tag wasn't received") } return @@ -1294,7 +1294,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { ) ctm := metrics.TagsAndMeta{Tags: getTags(piState.Registry, expTags...)} - return dialer.Sample(time.Now(), ctm, piState.BuiltinMetrics) + return dialer.IOSamples(time.Now(), ctm, piState.BuiltinMetrics) } getIterationsSamples := func(group string, addExpTags ...string) metrics.SampleContainer { diff --git a/js/runner.go b/js/runner.go index c4b8d229011..56dae12768a 100644 --- a/js/runner.go +++ b/js/runner.go @@ -867,7 +867,7 @@ func (u *VU) runFn( builtinMetrics := u.Runner.preInitState.BuiltinMetrics ctm := u.state.Tags.GetCurrentValues() - u.state.Samples <- u.Dialer.Sample(endTime, ctm, builtinMetrics) + u.state.Samples <- u.Dialer.IOSamples(endTime, ctm, builtinMetrics) if isFullIteration && isDefault { u.state.Samples <- iterationSamples(startTime, endTime, ctm, builtinMetrics) diff --git a/lib/netext/dialer.go b/lib/netext/dialer.go index 4480e9fad64..0faaef9fb63 100644 --- a/lib/netext/dialer.go +++ b/lib/netext/dialer.go @@ -69,10 +69,10 @@ func (d *Dialer) DialContext(ctx context.Context, proto, addr string) (net.Conn, return conn, err } -// Sample creates a new NetTrail instance with the Dialer -// sent and received data metrics and the supplied times and tags. -func (d *Dialer) Sample( - endTime time.Time, ctm metrics.TagsAndMeta, builtinMetrics *metrics.BuiltinMetrics, +// IOSamples returns samples for data send and received since it last call and zeros out. +// It uses the provided time as the sample time and tags and builtinMetrics to build the samples. +func (d *Dialer) IOSamples( + sampleTime time.Time, ctm metrics.TagsAndMeta, builtinMetrics *metrics.BuiltinMetrics, ) metrics.SampleContainer { bytesWritten := atomic.SwapInt64(&d.BytesWritten, 0) bytesRead := atomic.SwapInt64(&d.BytesRead, 0) @@ -82,7 +82,7 @@ func (d *Dialer) Sample( Metric: builtinMetrics.DataSent, Tags: ctm.Tags, }, - Time: endTime, + Time: sampleTime, Metadata: ctm.Metadata, Value: float64(bytesWritten), }, @@ -91,7 +91,7 @@ func (d *Dialer) Sample( Metric: builtinMetrics.DataReceived, Tags: ctm.Tags, }, - Time: endTime, + Time: sampleTime, Metadata: ctm.Metadata, Value: float64(bytesRead), },