Skip to content

Commit

Permalink
[exporter/signalfx] Fix correlation timeout bug (#9101)
Browse files Browse the repository at this point in the history
* Fix SignalFx Exporter correlation timeout bug

The SignalFx Exporter supports correlations between metrics
and traces. This means that when a trace occurs from a given
host, with dimensions like service and environment, these
dimensions can be used to query metrics for these traces.
These correlations are supposed to timeout after the
stale_service_timeout amount of time passes, but they
weren't. This fix makes sure correlations timeout so that
metrics aren't linked with stale trace data.

* Update CHANGELOG

* Add doc file for timeutils package

* Changes requested in PR

- Add public type comment
- Move implementing checker right under struct declaration
- Add spaces between functions
- Fix failing PR checks: Delay in tests failed in automation,
add license block in test file.

* Address racing test bug

* Fix tests that had a race in them

GitHub automation was failing unit tests because of a race
detected. The solution was to add a mutex to checking and
setting the variable in question.

* Address lint and unit test failure

* Fix linting failure and test timing

Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com>
  • Loading branch information
crobert-1 and dmitryax authored Apr 12, 2022
1 parent 891b4b4 commit d3508a8
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
- `hostmetricsreceiver`: Use cpu times for time delta in cpu.utilization calculation (#8857)
- `dynatraceexporter`: Remove overly verbose stacktrace from certain logs (#8989)
- `googlecloudexporter`: fix the `exporter.googlecloud.OTLPDirect` fature-gate, which was not applied when the flag was provided (#9116)
- `signalfxexporter`: Fix bug to enable timeouts for correlating traces and metrics (#9101)
- `windowsperfcountersreceiver`: fix exported values being integers instead of doubles (#9138)

### 🚩 Deprecations 🚩
Expand Down
5 changes: 4 additions & 1 deletion exporter/signalfxexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ require (
go.uber.org/zap v1.21.0
)

require gopkg.in/yaml.v2 v2.4.0
require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.48.0
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
Expand Down
15 changes: 13 additions & 2 deletions exporter/signalfxexporter/internal/correlation/correlation.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/timeutils"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
)

Expand All @@ -36,6 +37,7 @@ type Tracker struct {
cfg *Config
params component.ExporterCreateSettings
traceTracker *tracetracker.ActiveServiceTracker
pTicker timeutils.TTicker
correlation *correlationContext
accessToken string
}
Expand Down Expand Up @@ -118,6 +120,9 @@ func (cor *Tracker) AddSpans(ctx context.Context, traces pdata.Traces) error {
nil,
cor.cfg.SyncAttributes)

cor.pTicker = &timeutils.PolicyTicker{OnTickFunc: cor.traceTracker.Purge}
cor.pTicker.Start(cor.cfg.StaleServiceTimeout)

cor.correlation.Start()
})

Expand All @@ -140,8 +145,14 @@ func (cor *Tracker) Start(_ context.Context, host component.Host) (err error) {

// Shutdown correlation tracking.
func (cor *Tracker) Shutdown(_ context.Context) error {
if cor != nil && cor.correlation != nil {
cor.correlation.cancel()
if cor != nil {
if cor.correlation != nil {
cor.correlation.cancel()
}

if cor.pTicker != nil {
cor.pTicker.Stop()
}
}
return nil
}
19 changes: 19 additions & 0 deletions internal/coreinternal/timeutils/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package idutils provides a set of helper functions to convert ids.
//
// Functions in big_endian_converter.go help converting uint64 ids to TraceID
// and SpanID using big endian, and vice versa.
package timeutils // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/timeutils"
62 changes: 62 additions & 0 deletions internal/coreinternal/timeutils/ticker_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package timeutils // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/timeutils"

import "time"

// TTicker interface allows easier testing of Ticker related functionality
type TTicker interface {
// start sets the frequency of the Ticker and starts the periodic calls to OnTick.
Start(d time.Duration)
// OnTick is called when the Ticker fires.
OnTick()
// Stop firing the Ticker.
Stop()
}

// Implements TTicker and abstracts underlying time ticker's functionality to make usage
// simpler.
type PolicyTicker struct {
Ticker *time.Ticker
OnTickFunc func()
StopCh chan struct{}
}

// Ensure PolicyTicker implements TTicker interface
var _ TTicker = (*PolicyTicker)(nil)

func (pt *PolicyTicker) Start(d time.Duration) {
pt.Ticker = time.NewTicker(d)
pt.StopCh = make(chan struct{})
go func() {
for {
select {
case <-pt.Ticker.C:
pt.OnTick()
case <-pt.StopCh:
return
}
}
}()
}

func (pt *PolicyTicker) OnTick() {
pt.OnTickFunc()
}

func (pt *PolicyTicker) Stop() {
close(pt.StopCh)
pt.Ticker.Stop()
}
88 changes: 88 additions & 0 deletions internal/coreinternal/timeutils/ticker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package timeutils

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

type testObj struct {
mu sync.Mutex
foo int
}

func (to *testObj) IncrementFoo() {
to.mu.Lock()
defer to.mu.Unlock()
to.foo++
}

func checkFooValue(to *testObj, expectedValue int) bool {
to.mu.Lock()
defer to.mu.Unlock()
return to.foo == expectedValue
}

func TestPolicyTickerFails(t *testing.T) {
to := &testObj{foo: 0}
pTicker := &PolicyTicker{OnTickFunc: to.IncrementFoo}

// Tickers with a duration <= 0 should panic
assert.Panics(t, func() { pTicker.Start(time.Duration(0)) })
assert.Panics(t, func() { pTicker.Start(time.Duration(-1)) })
}

func TestPolicyTickerStart(t *testing.T) {
to := &testObj{foo: 0}
pTicker := &PolicyTicker{OnTickFunc: to.IncrementFoo}

// Make sure no ticks occur when we immediately stop the ticker
time.Sleep(100 * time.Millisecond)
assert.True(t, checkFooValue(to, 0), "Expected value: %d, actual: %d", 0, to.foo)
pTicker.Start(1 * time.Second)
pTicker.Stop()
assert.True(t, checkFooValue(to, 0), "Expected value: %d, actual: %d", 0, to.foo)
}

func TestPolicyTickerSucceeds(t *testing.T) {
// Start the ticker, make sure variable is incremented properly,
// also make sure stop works as expected.
to := &testObj{foo: 0}
pTicker := &PolicyTicker{OnTickFunc: to.IncrementFoo}

expectedTicks := 4
defaultDuration := 500 * time.Millisecond
// Extra padding reduces the chance of a race happening here
padDuration := 200 * time.Millisecond
testSleepDuration := time.Duration(expectedTicks)*defaultDuration + padDuration

pTicker.Start(defaultDuration)
time.Sleep(testSleepDuration)
assert.True(t, checkFooValue(to, expectedTicks), "Expected value: %d, actual: %d", expectedTicks, to.foo)

pTicker.Stop()
// Since these tests are time sensitive they can be flaky. By getting the count
// after stopping, we can still test to make sure it's no longer being incremented,
// without requiring there by no OnTick calls between the sleep call and stopping.
time.Sleep(defaultDuration)
expectedTicks = to.foo

time.Sleep(testSleepDuration)
assert.True(t, checkFooValue(to, expectedTicks), "Expected value: %d, actual: %d", expectedTicks, to.foo)
}
10 changes: 5 additions & 5 deletions processor/tailsamplingprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.17
require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/google/uuid v1.3.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.48.0
github.com/stretchr/testify v1.7.1
go.opencensus.io v0.23.0
go.opentelemetry.io/collector v0.48.0
Expand All @@ -15,23 +16,22 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/knadh/koanf v1.4.0 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.6.1 // indirect
github.com/spf13/cast v1.4.1 // indirect
go.opentelemetry.io/otel v1.6.3 // indirect
go.opentelemetry.io/otel/metric v0.29.0 // indirect
go.opentelemetry.io/otel/trace v1.6.3 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal
7 changes: 0 additions & 7 deletions processor/tailsamplingprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d3508a8

Please sign in to comment.