From c26893a069db5d6d61c8f08f7858eb49cd60c6c8 Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Tue, 30 Apr 2024 10:52:35 -0700 Subject: [PATCH 1/4] [exporter/signalfx] Refactor exporter's context lifecycle --- .chloggen/goleak_signalfxexp.yaml | 27 +++++++++++++++ exporter/signalfxexporter/exporter.go | 19 +++++++---- exporter/signalfxexporter/exporter_test.go | 15 ++++---- .../generated_package_test.go | 6 ++-- .../internal/dimensions/dimclient.go | 34 +++++++++++-------- .../internal/dimensions/dimclient_test.go | 20 ++++++----- .../internal/dimensions/package_test.go | 14 ++++++++ .../internal/dimensions/requests.go | 22 +++++++----- .../internal/translation/converter.go | 12 +++++++ .../internal/translation/delta_translator.go | 13 ++++++- .../internal/translation/package_test.go | 14 ++++++++ .../internal/translation/translator.go | 12 +++++++ .../internal/utils/package_test.go | 14 ++++++++ exporter/signalfxexporter/metadata.yaml | 2 -- .../generated_package_test.go | 6 ++-- receiver/signalfxreceiver/go.mod | 1 + receiver/signalfxreceiver/metadata.yaml | 2 -- receiver/signalfxreceiver/receiver_test.go | 7 ++-- 18 files changed, 183 insertions(+), 57 deletions(-) create mode 100644 .chloggen/goleak_signalfxexp.yaml create mode 100644 exporter/signalfxexporter/internal/dimensions/package_test.go create mode 100644 exporter/signalfxexporter/internal/translation/package_test.go create mode 100644 exporter/signalfxexporter/internal/utils/package_test.go diff --git a/.chloggen/goleak_signalfxexp.yaml b/.chloggen/goleak_signalfxexp.yaml new file mode 100644 index 000000000000..ca89b6c87c91 --- /dev/null +++ b/.chloggen/goleak_signalfxexp.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: signalfxexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix memory leak by re-organizing the exporter's functionality lifecycle + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/signalfxexporter/exporter.go b/exporter/signalfxexporter/exporter.go index ce53b2d2e9cd..0d46578c7c24 100644 --- a/exporter/signalfxexporter/exporter.go +++ b/exporter/signalfxexporter/exporter.go @@ -60,7 +60,6 @@ type signalfxExporter struct { hostMetadataSyncer *hostmetadata.Syncer converter *translation.MetricsConverter dimClient *dimensions.DimensionClient - cancelFn func() } // newSignalFxExporter returns a new SignalFx exporter. @@ -100,6 +99,9 @@ func newSignalFxExporter( } func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err error) { + if se.converter != nil { + se.converter.Start() + } ingestURL, err := se.config.getIngestURL() if err != nil { return err @@ -129,8 +131,6 @@ func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err if err != nil { return fmt.Errorf("could not load API TLS config: %w", err) } - cancellable, cancelFn := context.WithCancel(ctx) - se.cancelFn = cancelFn apiURL, err := se.config.getAPIURL() if err != nil { @@ -138,7 +138,6 @@ func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err } dimClient := dimensions.NewDimensionClient( - cancellable, dimensions.DimensionClientOptions{ Token: se.config.AccessToken, APIURL: apiURL, @@ -156,7 +155,9 @@ func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err IdleConnTimeout: se.config.DimensionClient.IdleConnTimeout, Timeout: se.config.DimensionClient.Timeout, }) - dimClient.Start() + // Use background context to conform to spec's requirements for a component's + // long-running background processes. + dimClient.Start(context.Background()) var hms *hostmetadata.Syncer if se.config.SyncHostMetadata { @@ -235,8 +236,12 @@ func (se *signalfxExporter) pushLogs(ctx context.Context, ld plog.Logs) error { } func (se *signalfxExporter) shutdown(_ context.Context) error { - if se.cancelFn != nil { - se.cancelFn() + if se.dimClient != nil { + se.dimClient.Shutdown() + } + + if se.converter != nil { + se.converter.Shutdown() } return nil } diff --git a/exporter/signalfxexporter/exporter_test.go b/exporter/signalfxexporter/exporter_test.go index 093f9fbb8222..7e6b53e709eb 100644 --- a/exporter/signalfxexporter/exporter_test.go +++ b/exporter/signalfxexporter/exporter_test.go @@ -1169,7 +1169,6 @@ func TestConsumeMetadata(t *testing.T) { logger := zap.NewNop() dimClient := dimensions.NewDimensionClient( - context.Background(), dimensions.DimensionClientOptions{ Token: "foo", APIURL: serverURL, @@ -1180,7 +1179,7 @@ func TestConsumeMetadata(t *testing.T) { MetricsConverter: *converter, ExcludeProperties: tt.excludeProperties, }) - dimClient.Start() + dimClient.Start(context.Background()) se := &signalfxExporter{ dimClient: dimClient, @@ -1203,6 +1202,10 @@ func TestConsumeMetadata(t *testing.T) { case <-c: // wait 500ms longer than send delay case <-time.After(tt.sendDelay + 500*time.Millisecond): + // If no updates are supposed to be sent, the server doesn't update dimensions, and + // doesn't call Done. This is correct behavior, so the test needs to account for it here, + // or a goroutine will be leaked. + defer wg.Done() require.True(t, tt.shouldNotSendUpdate, "timeout waiting for response") } @@ -1331,6 +1334,7 @@ func TestTLSExporterInit(t *testing.T) { sfx, err := newSignalFxExporter(tt.config, exportertest.NewNopCreateSettings()) assert.NoError(t, err) err = sfx.start(context.Background(), componenttest.NewNopHost()) + defer func() { require.NoError(t, sfx.shutdown(context.Background())) }() if tt.wantErr { require.Error(t, err) if tt.wantErrMessage != "" { @@ -1402,6 +1406,7 @@ func TestTLSIngestConnection(t *testing.T) { assert.NoError(t, err) err = sfx.start(context.Background(), componenttest.NewNopHost()) assert.NoError(t, err) + defer func() { assert.NoError(t, sfx.shutdown(context.Background())) }() _, err = sfx.pushMetricsData(context.Background(), metricsPayload) if tt.wantErr { @@ -1526,10 +1531,7 @@ func TestTLSAPIConnection(t *testing.T) { require.NoError(t, err) serverURL, err := url.Parse(tt.config.APIURL) assert.NoError(t, err) - cancellable, cancelFn := context.WithCancel(context.Background()) - defer cancelFn() dimClient := dimensions.NewDimensionClient( - cancellable, dimensions.DimensionClientOptions{ Token: "", APIURL: serverURL, @@ -1540,7 +1542,8 @@ func TestTLSAPIConnection(t *testing.T) { MetricsConverter: *converter, APITLSConfig: apiTLSCfg, }) - dimClient.Start() + dimClient.Start(context.Background()) + defer func() { dimClient.Shutdown() }() se := &signalfxExporter{ dimClient: dimClient, diff --git a/exporter/signalfxexporter/generated_package_test.go b/exporter/signalfxexporter/generated_package_test.go index 914fdb307101..05bce43252c3 100644 --- a/exporter/signalfxexporter/generated_package_test.go +++ b/exporter/signalfxexporter/generated_package_test.go @@ -3,11 +3,11 @@ package signalfxexporter import ( - "os" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { - // skipping goleak test as per metadata.yml configuration - os.Exit(m.Run()) + goleak.VerifyTestMain(m) } diff --git a/exporter/signalfxexporter/internal/dimensions/dimclient.go b/exporter/signalfxexporter/internal/dimensions/dimclient.go index 98ca7c6b102c..46658a2383a9 100644 --- a/exporter/signalfxexporter/internal/dimensions/dimclient.go +++ b/exporter/signalfxexporter/internal/dimensions/dimclient.go @@ -32,7 +32,7 @@ import ( // updates are currently not done by this port. type DimensionClient struct { sync.RWMutex - ctx context.Context + cancel context.CancelFunc Token configopaque.String APIURL *url.URL client *http.Client @@ -84,7 +84,7 @@ type DimensionClientOptions struct { } // NewDimensionClient returns a new client -func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *DimensionClient { +func NewDimensionClient(options DimensionClientOptions) *DimensionClient { client := &http.Client{ Timeout: options.Timeout, Transport: &http.Transport{ @@ -102,10 +102,9 @@ func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *Di TLSClientConfig: options.APITLSConfig, }, } - sender := NewReqSender(ctx, client, 20, map[string]string{"client": "dimension"}) + sender := NewReqSender(client, 20, map[string]string{"client": "dimension"}) return &DimensionClient{ - ctx: ctx, Token: options.Token, APIURL: options.APIURL, sendDelay: options.SendDelay, @@ -122,8 +121,15 @@ func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *Di } // Start the client's processing queue -func (dc *DimensionClient) Start() { - go dc.processQueue() +func (dc *DimensionClient) Start(ctx context.Context) { + ctx, dc.cancel = context.WithCancel(ctx) + go dc.processQueue(ctx) +} + +func (dc *DimensionClient) Shutdown() { + if dc.cancel != nil { + dc.cancel() + } } // acceptDimension to be sent to the API. This will return fairly quickly and @@ -185,10 +191,10 @@ func mergeTags(tagSets ...map[string]bool) map[string]bool { return out } -func (dc *DimensionClient) processQueue() { +func (dc *DimensionClient) processQueue(ctx context.Context) { for { select { - case <-dc.ctx.Done(): + case <-ctx.Done(): return case delayedDimUpdate := <-dc.delayedQueue: now := dc.now() @@ -201,7 +207,7 @@ func (dc *DimensionClient) processQueue() { delete(dc.delayedSet, delayedDimUpdate.Key()) dc.Unlock() - if err := dc.handleDimensionUpdate(delayedDimUpdate.DimensionUpdate); err != nil { + if err := dc.handleDimensionUpdate(ctx, delayedDimUpdate.DimensionUpdate); err != nil { dc.logger.Error( "Could not send dimension update", zap.Error(err), @@ -213,13 +219,13 @@ func (dc *DimensionClient) processQueue() { } // handleDimensionUpdate will set custom properties on a specific dimension value. -func (dc *DimensionClient) handleDimensionUpdate(dimUpdate *DimensionUpdate) error { +func (dc *DimensionClient) handleDimensionUpdate(ctx context.Context, dimUpdate *DimensionUpdate) error { var ( req *http.Request err error ) - req, err = dc.makePatchRequest(dimUpdate) + req, err = dc.makePatchRequest(ctx, dimUpdate) if err != nil { return err @@ -276,7 +282,7 @@ func (dc *DimensionClient) handleDimensionUpdate(dimUpdate *DimensionUpdate) err } }))) - dc.requestSender.Send(req) + dc.requestSender.Send(ctx, req) return nil } @@ -290,7 +296,7 @@ func (dc *DimensionClient) makeDimURL(key, value string) (*url.URL, error) { return url, nil } -func (dc *DimensionClient) makePatchRequest(dim *DimensionUpdate) (*http.Request, error) { +func (dc *DimensionClient) makePatchRequest(ctx context.Context, dim *DimensionUpdate) (*http.Request, error) { var ( tagsToAdd []string tagsToRemove []string @@ -319,7 +325,7 @@ func (dc *DimensionClient) makePatchRequest(dim *DimensionUpdate) (*http.Request } req, err := http.NewRequestWithContext( - context.Background(), + ctx, "PATCH", strings.TrimRight(url.String(), "/")+"/_/sfxagent", bytes.NewReader(json)) diff --git a/exporter/signalfxexporter/internal/dimensions/dimclient_test.go b/exporter/signalfxexporter/internal/dimensions/dimclient_test.go index 83db79effe35..8c32d7038fe1 100644 --- a/exporter/signalfxexporter/internal/dimensions/dimclient_test.go +++ b/exporter/signalfxexporter/internal/dimensions/dimclient_test.go @@ -102,14 +102,15 @@ func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Int32, context.Can server.Close() }() - client := NewDimensionClient(ctx, DimensionClientOptions{ - APIURL: serverURL, - LogUpdates: true, - Logger: zap.NewNop(), - SendDelay: time.Second, - MaxBuffered: 10, - }) - client.Start() + client := NewDimensionClient( + DimensionClientOptions{ + APIURL: serverURL, + LogUpdates: true, + Logger: zap.NewNop(), + SendDelay: time.Second, + MaxBuffered: 10, + }) + client.Start(ctx) return client, dimCh, forcedResp, cancel } @@ -117,6 +118,7 @@ func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Int32, context.Can func TestDimensionClient(t *testing.T) { client, dimCh, forcedResp, cancel := setup(t) defer cancel() + defer client.Shutdown() t.Run("send dimension update with properties and tags", func(t *testing.T) { require.NoError(t, client.acceptDimension(&DimensionUpdate{ @@ -310,6 +312,7 @@ func TestDimensionClient(t *testing.T) { func TestFlappyUpdates(t *testing.T) { client, dimCh, _, cancel := setup(t) defer cancel() + defer client.Shutdown() // Do some flappy updates for i := 0; i < 5; i++ { @@ -348,6 +351,7 @@ func TestFlappyUpdates(t *testing.T) { func TestInvalidUpdatesNotSent(t *testing.T) { client, dimCh, _, cancel := setup(t) defer cancel() + defer client.Shutdown() require.NoError(t, client.acceptDimension(&DimensionUpdate{ Name: "host", Value: "", diff --git a/exporter/signalfxexporter/internal/dimensions/package_test.go b/exporter/signalfxexporter/internal/dimensions/package_test.go new file mode 100644 index 000000000000..5b69b4ac3787 --- /dev/null +++ b/exporter/signalfxexporter/internal/dimensions/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package dimensions + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/exporter/signalfxexporter/internal/dimensions/requests.go b/exporter/signalfxexporter/internal/dimensions/requests.go index ed957ddd014e..f328474404d5 100644 --- a/exporter/signalfxexporter/internal/dimensions/requests.go +++ b/exporter/signalfxexporter/internal/dimensions/requests.go @@ -20,12 +20,11 @@ type ReqSender struct { client *http.Client requests chan *http.Request workerCount uint - ctx context.Context additionalDimensions map[string]string runningWorkers *atomic.Int64 } -func NewReqSender(ctx context.Context, client *http.Client, +func NewReqSender(client *http.Client, workerCount uint, diagnosticDimensions map[string]string) *ReqSender { return &ReqSender{ client: client, @@ -33,36 +32,41 @@ func NewReqSender(ctx context.Context, client *http.Client, // Unbuffered so that it blocks clients requests: make(chan *http.Request), workerCount: workerCount, - ctx: ctx, runningWorkers: &atomic.Int64{}, } } // Send sends the request. Not thread-safe. -func (rs *ReqSender) Send(req *http.Request) { +func (rs *ReqSender) Send(ctx context.Context, req *http.Request) { // Slight optimization to avoid spinning up unnecessary workers if there // aren't ever that many dim updates. Once workers start, they remain for the // duration of the agent. select { + case <-ctx.Done(): + return case rs.requests <- req: return default: if rs.runningWorkers.Load() < int64(rs.workerCount) { - go rs.processRequests() + go rs.processRequests(ctx) } - // Block until we can get through a request - rs.requests <- req + // Block until we can get through a request, unless context has been cancelled. + select { + case <-ctx.Done(): + return + case rs.requests <- req: + } } } -func (rs *ReqSender) processRequests() { +func (rs *ReqSender) processRequests(ctx context.Context) { rs.runningWorkers.Add(1) defer rs.runningWorkers.Add(-1) for { select { - case <-rs.ctx.Done(): + case <-ctx.Done(): return case req := <-rs.requests: if err := rs.sendRequest(req); err != nil { diff --git a/exporter/signalfxexporter/internal/translation/converter.go b/exporter/signalfxexporter/internal/translation/converter.go index 9a63c8815e12..ffc5e91e1d18 100644 --- a/exporter/signalfxexporter/internal/translation/converter.go +++ b/exporter/signalfxexporter/internal/translation/converter.go @@ -65,6 +65,12 @@ func NewMetricsConverter( }, nil } +func (c *MetricsConverter) Start() { + if c.metricTranslator != nil { + c.metricTranslator.Start() + } +} + // MetricsToSignalFxV2 converts the passed in MetricsData to SFx datapoints // and if processHistograms is set, histogram metrics are not converted to SFx format. // It returns those datapoints and the number of time series that had to be @@ -161,6 +167,12 @@ func (c *MetricsConverter) ConvertDimension(dim string) string { return filterKeyChars(res, c.datapointValidator.nonAlphanumericDimChars) } +func (c *MetricsConverter) Shutdown() { + if c.metricTranslator != nil { + c.metricTranslator.Shutdown() + } +} + // Values obtained from https://dev.splunk.com/observability/docs/datamodel/ingest#Criteria-for-metric-and-dimension-names-and-values const ( maxMetricNameLength = 256 diff --git a/exporter/signalfxexporter/internal/translation/delta_translator.go b/exporter/signalfxexporter/internal/translation/delta_translator.go index 26f19c06a419..f7d1912681b5 100644 --- a/exporter/signalfxexporter/internal/translation/delta_translator.go +++ b/exporter/signalfxexporter/internal/translation/delta_translator.go @@ -20,10 +20,15 @@ func newDeltaTranslator(ttl int64, done chan struct{}) *deltaTranslator { sweepIntervalSeconds = 1 } m := ttlmap.New(sweepIntervalSeconds, ttl, done) - m.Start() return &deltaTranslator{prevPts: m} } +func (t *deltaTranslator) start() { + if t.prevPts != nil { + t.prevPts.Start() + } +} + func (t *deltaTranslator) translate(pts []*sfxpb.DataPoint, tr Rule) []*sfxpb.DataPoint { for _, currPt := range pts { deltaMetricName, ok := tr.Mapping[currPt.Metric] @@ -64,6 +69,12 @@ func (t *deltaTranslator) deltaPt(deltaMetricName string, currPt *sfxpb.DataPoin return deltaPt } +func (t *deltaTranslator) shutdown() { + if t.prevPts != nil { + t.prevPts.Shutdown() + } +} + func doubleDeltaPt(currPt *sfxpb.DataPoint, prevPt *sfxpb.DataPoint, deltaMetricName string) *sfxpb.DataPoint { delta := *currPt.Value.DoubleValue - *prevPt.Value.DoubleValue if delta < 0 { diff --git a/exporter/signalfxexporter/internal/translation/package_test.go b/exporter/signalfxexporter/internal/translation/package_test.go new file mode 100644 index 000000000000..82f4ca268e8f --- /dev/null +++ b/exporter/signalfxexporter/internal/translation/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translation + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/exporter/signalfxexporter/internal/translation/translator.go b/exporter/signalfxexporter/internal/translation/translator.go index b899de0dc86e..573691adce86 100644 --- a/exporter/signalfxexporter/internal/translation/translator.go +++ b/exporter/signalfxexporter/internal/translation/translator.go @@ -391,6 +391,12 @@ func getMetricNamesAsSlice(metricName string, metricNames map[string]bool) []str return out } +func (mp *MetricTranslator) Start() { + if mp.deltaTranslator != nil { + mp.deltaTranslator.start() + } +} + // TranslateDataPoints transforms datapoints to a format compatible with signalfx backend // sfxDataPoints represents one metric converted to signalfx protobuf datapoints func (mp *MetricTranslator) TranslateDataPoints(logger *zap.Logger, sfxDataPoints []*sfxpb.DataPoint) []*sfxpb.DataPoint { @@ -540,6 +546,12 @@ func (mp *MetricTranslator) TranslateDataPoints(logger *zap.Logger, sfxDataPoint return processedDataPoints } +func (mp *MetricTranslator) Shutdown() { + if mp.deltaTranslator != nil { + mp.deltaTranslator.shutdown() + } +} + func calcNewMetricInputPairs(processedDataPoints []*sfxpb.DataPoint, tr Rule) [][2]*sfxpb.DataPoint { var operand1Pts, operand2Pts []*sfxpb.DataPoint for _, dp := range processedDataPoints { diff --git a/exporter/signalfxexporter/internal/utils/package_test.go b/exporter/signalfxexporter/internal/utils/package_test.go new file mode 100644 index 000000000000..832862afb888 --- /dev/null +++ b/exporter/signalfxexporter/internal/utils/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package utils + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/exporter/signalfxexporter/metadata.yaml b/exporter/signalfxexporter/metadata.yaml index fbc5bb90b455..7e11fe3224ed 100644 --- a/exporter/signalfxexporter/metadata.yaml +++ b/exporter/signalfxexporter/metadata.yaml @@ -18,5 +18,3 @@ tests: retry_on_failure: enabled: false expect_consumer_error: true - goleak: - skip: true diff --git a/receiver/signalfxreceiver/generated_package_test.go b/receiver/signalfxreceiver/generated_package_test.go index c0e3fc300353..224abed22636 100644 --- a/receiver/signalfxreceiver/generated_package_test.go +++ b/receiver/signalfxreceiver/generated_package_test.go @@ -3,11 +3,11 @@ package signalfxreceiver import ( - "os" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { - // skipping goleak test as per metadata.yml configuration - os.Exit(m.Run()) + goleak.VerifyTestMain(m) } diff --git a/receiver/signalfxreceiver/go.mod b/receiver/signalfxreceiver/go.mod index f8d9813da503..33fe6d322b4c 100644 --- a/receiver/signalfxreceiver/go.mod +++ b/receiver/signalfxreceiver/go.mod @@ -21,6 +21,7 @@ require ( go.opentelemetry.io/collector/receiver v0.100.1-0.20240517133416-ede9e304314d go.opentelemetry.io/otel/metric v1.26.0 go.opentelemetry.io/otel/trace v1.26.0 + go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 ) diff --git a/receiver/signalfxreceiver/metadata.yaml b/receiver/signalfxreceiver/metadata.yaml index 35daf291146f..85b0dbc7a894 100644 --- a/receiver/signalfxreceiver/metadata.yaml +++ b/receiver/signalfxreceiver/metadata.yaml @@ -11,5 +11,3 @@ status: emeritus: tests: - goleak: - skip: true diff --git a/receiver/signalfxreceiver/receiver_test.go b/receiver/signalfxreceiver/receiver_test.go index a91117f88776..008704dd8385 100644 --- a/receiver/signalfxreceiver/receiver_test.go +++ b/receiver/signalfxreceiver/receiver_test.go @@ -38,7 +38,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) -func Test_signalfxeceiver_New(t *testing.T) { +func Test_signalfxreceiver_New(t *testing.T) { defaultConfig := createDefaultConfig().(*Config) type args struct { config Config @@ -77,11 +77,14 @@ func Test_signalfxeceiver_New(t *testing.T) { } err = got.Start(context.Background(), componenttest.NewNopHost()) assert.Equal(t, tt.wantStartErr, err) + if err == nil { + assert.NoError(t, got.Shutdown(context.Background())) + } }) } } -func Test_signalfxeceiver_EndToEnd(t *testing.T) { +func Test_signalfxreceiver_EndToEnd(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) cfg := createDefaultConfig().(*Config) cfg.Endpoint = addr From ed173191a4763601fd786d44a1e93a8508ed833e Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Tue, 30 Apr 2024 11:26:11 -0700 Subject: [PATCH 2/4] Update .chloggen/goleak_signalfxexp.yaml --- .chloggen/goleak_signalfxexp.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/goleak_signalfxexp.yaml b/.chloggen/goleak_signalfxexp.yaml index ca89b6c87c91..6cb03bd5b113 100644 --- a/.chloggen/goleak_signalfxexp.yaml +++ b/.chloggen/goleak_signalfxexp.yaml @@ -10,7 +10,7 @@ component: signalfxexporter note: Fix memory leak by re-organizing the exporter's functionality lifecycle # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: [] +issues: [32781] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. From dfbe5e9af6399ac517dc4b0ed2391d82463e9766 Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Mon, 6 May 2024 13:49:20 -0700 Subject: [PATCH 3/4] Change requested in PR - Don't pass in context background, just use it in start. --- exporter/signalfxexporter/exporter.go | 4 +--- exporter/signalfxexporter/exporter_test.go | 4 ++-- .../signalfxexporter/internal/dimensions/dimclient.go | 8 ++++++-- .../internal/dimensions/dimclient_test.go | 2 +- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/exporter/signalfxexporter/exporter.go b/exporter/signalfxexporter/exporter.go index 0d46578c7c24..f42833724588 100644 --- a/exporter/signalfxexporter/exporter.go +++ b/exporter/signalfxexporter/exporter.go @@ -155,9 +155,7 @@ func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err IdleConnTimeout: se.config.DimensionClient.IdleConnTimeout, Timeout: se.config.DimensionClient.Timeout, }) - // Use background context to conform to spec's requirements for a component's - // long-running background processes. - dimClient.Start(context.Background()) + dimClient.Start() var hms *hostmetadata.Syncer if se.config.SyncHostMetadata { diff --git a/exporter/signalfxexporter/exporter_test.go b/exporter/signalfxexporter/exporter_test.go index 7e6b53e709eb..31baee421775 100644 --- a/exporter/signalfxexporter/exporter_test.go +++ b/exporter/signalfxexporter/exporter_test.go @@ -1179,7 +1179,7 @@ func TestConsumeMetadata(t *testing.T) { MetricsConverter: *converter, ExcludeProperties: tt.excludeProperties, }) - dimClient.Start(context.Background()) + dimClient.Start() se := &signalfxExporter{ dimClient: dimClient, @@ -1542,7 +1542,7 @@ func TestTLSAPIConnection(t *testing.T) { MetricsConverter: *converter, APITLSConfig: apiTLSCfg, }) - dimClient.Start(context.Background()) + dimClient.Start() defer func() { dimClient.Shutdown() }() se := &signalfxExporter{ diff --git a/exporter/signalfxexporter/internal/dimensions/dimclient.go b/exporter/signalfxexporter/internal/dimensions/dimclient.go index 46658a2383a9..62366b433331 100644 --- a/exporter/signalfxexporter/internal/dimensions/dimclient.go +++ b/exporter/signalfxexporter/internal/dimensions/dimclient.go @@ -121,8 +121,12 @@ func NewDimensionClient(options DimensionClientOptions) *DimensionClient { } // Start the client's processing queue -func (dc *DimensionClient) Start(ctx context.Context) { - ctx, dc.cancel = context.WithCancel(ctx) +func (dc *DimensionClient) Start() { + var ctx context.Context + // The dimension client is started during the exporter's startup functionality. + // The collector spec states that for long-running operations, components should + // use the background context, rather than the passed in context. + ctx, dc.cancel = context.WithCancel(context.Background()) go dc.processQueue(ctx) } diff --git a/exporter/signalfxexporter/internal/dimensions/dimclient_test.go b/exporter/signalfxexporter/internal/dimensions/dimclient_test.go index 8c32d7038fe1..24abea02039f 100644 --- a/exporter/signalfxexporter/internal/dimensions/dimclient_test.go +++ b/exporter/signalfxexporter/internal/dimensions/dimclient_test.go @@ -110,7 +110,7 @@ func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Int32, context.Can SendDelay: time.Second, MaxBuffered: 10, }) - client.Start(ctx) + client.Start() return client, dimCh, forcedResp, cancel } From 3360bb60c754720147d44346e91d3bdc7a026a6d Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Mon, 12 Aug 2024 10:22:42 -0700 Subject: [PATCH 4/4] make gotidy --- receiver/signalfxreceiver/go.mod | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/receiver/signalfxreceiver/go.mod b/receiver/signalfxreceiver/go.mod index 0d945185c63d..de3a52fae7c5 100644 --- a/receiver/signalfxreceiver/go.mod +++ b/receiver/signalfxreceiver/go.mod @@ -11,15 +11,6 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/signalfx v0.106.1 github.com/signalfx/com_signalfx_metrics_protobuf v0.0.3 github.com/stretchr/testify v1.9.0 - go.opentelemetry.io/collector/component v0.106.0 - go.opentelemetry.io/collector/config/confighttp v0.106.0 - go.opentelemetry.io/collector/config/configtls v1.12.0 - go.opentelemetry.io/collector/confmap v0.106.0 - go.opentelemetry.io/collector/consumer v0.106.0 - go.opentelemetry.io/collector/consumer/consumertest v0.106.0 - go.opentelemetry.io/collector/exporter v0.106.0 - go.opentelemetry.io/collector/pdata v1.12.0 - go.opentelemetry.io/collector/receiver v0.106.0 go.opentelemetry.io/collector/component v0.106.2-0.20240809191011-ef07ea073562 go.opentelemetry.io/collector/config/confighttp v0.106.2-0.20240809191011-ef07ea073562 go.opentelemetry.io/collector/config/configtls v1.12.1-0.20240809191011-ef07ea073562 @@ -29,8 +20,6 @@ require ( go.opentelemetry.io/collector/exporter v0.106.2-0.20240809191011-ef07ea073562 go.opentelemetry.io/collector/pdata v1.12.1-0.20240809191011-ef07ea073562 go.opentelemetry.io/collector/receiver v0.106.2-0.20240809191011-ef07ea073562 - go.opentelemetry.io/otel/metric v1.28.0 - go.opentelemetry.io/otel/trace v1.28.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 )