diff --git a/sdk/metric/internal/reuse_slice.go b/sdk/metric/internal/reuse_slice.go new file mode 100644 index 00000000000..9695492b0d1 --- /dev/null +++ b/sdk/metric/internal/reuse_slice.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +// ReuseSlice returns a zeroed view of slice if its capacity is greater than or +// equal to n. Otherwise, it returns a new []T with capacity equal to n. +func ReuseSlice[T any](slice []T, n int) []T { + if cap(slice) >= n { + return slice[:n] + } + return make([]T, n) +} diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index f9b405915fc..cc1072ce7ea 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -135,9 +135,8 @@ func (mr *manualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr err := fmt.Errorf("manual reader: invalid producer: %T", p) return err } - // TODO (#3047): When produce is updated to accept output as param, pass rm. - rmTemp, err := ph.produce(ctx) - *rm = rmTemp + + err := ph.produce(ctx, rm) if err != nil { return err } diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 3ba93293bf7..5ae185f09e9 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -120,6 +120,10 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade flushCh: make(chan chan error), cancel: cancel, done: make(chan struct{}), + rmPool: sync.Pool{ + New: func() interface{} { + return &metricdata.ResourceMetrics{} + }}, } r.externalProducers.Store([]Producer{}) @@ -147,6 +151,8 @@ type periodicReader struct { done chan struct{} cancel context.CancelFunc shutdownOnce sync.Once + + rmPool sync.Pool } // Compile time check the periodicReader implements Reader and is comparable. @@ -214,11 +220,12 @@ func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregatio // the SDK and exports it with r's exporter. func (r *periodicReader) collectAndExport(ctx context.Context) error { // TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect. - rm := metricdata.ResourceMetrics{} - err := r.Collect(ctx, &rm) + rm := r.rmPool.Get().(*metricdata.ResourceMetrics) + err := r.Collect(ctx, rm) if err == nil { - err = r.export(ctx, rm) + err = r.export(ctx, *rm) } + r.rmPool.Put(rm) return err } @@ -233,15 +240,13 @@ func (r *periodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMet return errors.New("periodic reader: *metricdata.ResourceMetrics is nil") } // TODO (#3047): When collect is updated to accept output as param, pass rm. - rmTemp, err := r.collect(ctx, r.sdkProducer.Load()) - *rm = rmTemp - return err + return r.collect(ctx, r.sdkProducer.Load(), rm) } // collect unwraps p as a produceHolder and returns its produce results. -func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata.ResourceMetrics, error) { +func (r *periodicReader) collect(ctx context.Context, p interface{}, rm *metricdata.ResourceMetrics) error { if p == nil { - return metricdata.ResourceMetrics{}, ErrReaderNotRegistered + return ErrReaderNotRegistered } ph, ok := p.(produceHolder) @@ -251,12 +256,12 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata // happen, return an error instead of panicking so a users code does // not halt in the processes. err := fmt.Errorf("periodic reader: invalid producer: %T", p) - return metricdata.ResourceMetrics{}, err + return err } - rm, err := ph.produce(ctx) + err := ph.produce(ctx, rm) if err != nil { - return metricdata.ResourceMetrics{}, err + return err } var errs []error for _, producer := range r.externalProducers.Load().([]Producer) { @@ -266,7 +271,7 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata } rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) } - return rm, unifyErrors(errs) + return unifyErrors(errs) } // export exports metric data m using r's exporter. @@ -313,11 +318,12 @@ func (r *periodicReader) Shutdown(ctx context.Context) error { if ph != nil { // Reader was registered. // Flush pending telemetry. - var m metricdata.ResourceMetrics - m, err = r.collect(ctx, ph) + m := r.rmPool.Get().(*metricdata.ResourceMetrics) + err = r.collect(ctx, ph, m) if err == nil { - err = r.export(ctx, m) + err = r.export(ctx, *m) } + r.rmPool.Put(m) } sErr := r.exporter.Shutdown(ctx) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 47a44fda71c..4cad3698dd0 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -121,7 +121,7 @@ func (p *pipeline) addMultiCallback(c multiCallback) (unregister func()) { // produce returns aggregated metrics from a single collection. // // This method is safe to call concurrently. -func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { +func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error { p.Lock() defer p.Unlock() @@ -132,7 +132,9 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err errs.append(err) } if err := ctx.Err(); err != nil { - return metricdata.ResourceMetrics{}, err + rm.Resource = nil + rm.ScopeMetrics = rm.ScopeMetrics[:0] + return err } } for e := p.multiCallbacks.Front(); e != nil; e = e.Next() { @@ -143,36 +145,39 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err } if err := ctx.Err(); err != nil { // This means the context expired before we finished running callbacks. - return metricdata.ResourceMetrics{}, err + rm.Resource = nil + rm.ScopeMetrics = rm.ScopeMetrics[:0] + return err } } - sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations)) + rm.Resource = p.resource + rm.ScopeMetrics = internal.ReuseSlice(rm.ScopeMetrics, len(p.aggregations)) + + i := 0 for scope, instruments := range p.aggregations { - metrics := make([]metricdata.Metrics, 0, len(instruments)) + rm.ScopeMetrics[i].Metrics = internal.ReuseSlice(rm.ScopeMetrics[i].Metrics, len(instruments)) + j := 0 for _, inst := range instruments { data := inst.aggregator.Aggregation() if data != nil { - metrics = append(metrics, metricdata.Metrics{ - Name: inst.name, - Description: inst.description, - Unit: inst.unit, - Data: data, - }) + rm.ScopeMetrics[i].Metrics[j].Name = inst.name + rm.ScopeMetrics[i].Metrics[j].Description = inst.description + rm.ScopeMetrics[i].Metrics[j].Unit = inst.unit + rm.ScopeMetrics[i].Metrics[j].Data = data + j++ } } - if len(metrics) > 0 { - sm = append(sm, metricdata.ScopeMetrics{ - Scope: scope, - Metrics: metrics, - }) + rm.ScopeMetrics[i].Metrics = rm.ScopeMetrics[i].Metrics[:j] + if len(rm.ScopeMetrics[i].Metrics) > 0 { + rm.ScopeMetrics[i].Scope = scope + i++ } } - return metricdata.ResourceMetrics{ - Resource: p.resource, - ScopeMetrics: sm, - }, errs.errorOrNil() + rm.ScopeMetrics = rm.ScopeMetrics[:i] + + return errs.errorOrNil() } // inserter facilitates inserting of new instruments from a single scope into a diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index c7e8d31f5ed..581ab595776 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -42,7 +42,8 @@ func (testSumAggregator) Aggregation() metricdata.Aggregation { func TestEmptyPipeline(t *testing.T) { pipe := &pipeline{} - output, err := pipe.produce(context.Background()) + output := metricdata.ResourceMetrics{} + err := pipe.produce(context.Background(), &output) require.NoError(t, err) assert.Nil(t, output.Resource) assert.Len(t, output.ScopeMetrics, 0) @@ -56,7 +57,7 @@ func TestEmptyPipeline(t *testing.T) { pipe.addMultiCallback(func(context.Context) error { return nil }) }) - output, err = pipe.produce(context.Background()) + err = pipe.produce(context.Background(), &output) require.NoError(t, err) assert.Nil(t, output.Resource) require.Len(t, output.ScopeMetrics, 1) @@ -66,7 +67,8 @@ func TestEmptyPipeline(t *testing.T) { func TestNewPipeline(t *testing.T) { pipe := newPipeline(nil, nil, nil) - output, err := pipe.produce(context.Background()) + output := metricdata.ResourceMetrics{} + err := pipe.produce(context.Background(), &output) require.NoError(t, err) assert.Equal(t, resource.Empty(), output.Resource) assert.Len(t, output.ScopeMetrics, 0) @@ -80,7 +82,7 @@ func TestNewPipeline(t *testing.T) { pipe.addMultiCallback(func(context.Context) error { return nil }) }) - output, err = pipe.produce(context.Background()) + err = pipe.produce(context.Background(), &output) require.NoError(t, err) assert.Equal(t, resource.Empty(), output.Resource) require.Len(t, output.ScopeMetrics, 1) @@ -91,7 +93,8 @@ func TestPipelineUsesResource(t *testing.T) { res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource")) pipe := newPipeline(res, nil, nil) - output, err := pipe.produce(context.Background()) + output := metricdata.ResourceMetrics{} + err := pipe.produce(context.Background(), &output) assert.NoError(t, err) assert.Equal(t, res, output.Resource) } @@ -99,6 +102,7 @@ func TestPipelineUsesResource(t *testing.T) { func TestPipelineConcurrency(t *testing.T) { pipe := newPipeline(nil, nil, nil) ctx := context.Background() + var output metricdata.ResourceMetrics var wg sync.WaitGroup const threads = 2 @@ -106,7 +110,7 @@ func TestPipelineConcurrency(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, _ = pipe.produce(ctx) + _ = pipe.produce(ctx, &output) }() wg.Add(1) @@ -167,7 +171,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { a.Aggregate(1, *attribute.EmptySet()) } - out, err := test.pipe.produce(context.Background()) + out := metricdata.ResourceMetrics{} + err = test.pipe.produce(context.Background(), &out) require.NoError(t, err) require.Len(t, out.ScopeMetrics, 1, "Aggregator not registered with pipeline") sm := out.ScopeMetrics[0] diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index 9d6972b53d8..3950ea6ecef 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -99,7 +99,7 @@ type sdkProducer interface { // produce returns aggregated metrics from a single collection. // // This method is safe to call concurrently. - produce(context.Context) (metricdata.ResourceMetrics, error) + produce(context.Context, *metricdata.ResourceMetrics) error } // Producer produces metrics for a Reader from an external source. @@ -113,15 +113,15 @@ type Producer interface { // produceHolder is used as an atomic.Value to wrap the non-concrete producer // type. type produceHolder struct { - produce func(context.Context) (metricdata.ResourceMetrics, error) + produce func(context.Context, *metricdata.ResourceMetrics) error } // shutdownProducer produces an ErrReaderShutdown error always. type shutdownProducer struct{} // produce returns an ErrReaderShutdown error. -func (p shutdownProducer) produce(context.Context) (metricdata.ResourceMetrics, error) { - return metricdata.ResourceMetrics{}, ErrReaderShutdown +func (p shutdownProducer) produce(context.Context, *metricdata.ResourceMetrics) error { + return ErrReaderShutdown } // TemporalitySelector selects the temporality to use based on the InstrumentKind. diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 85ddca62288..6c523cdf7a3 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -103,10 +103,11 @@ func (ts *readerTestSuite) TestMultipleForceFlush() { func (ts *readerTestSuite) TestMultipleRegister() { p0 := testSDKProducer{ - produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) { + produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { // Differentiate this producer from the second by returning an // error. - return testResourceMetricsA, assert.AnError + *rm = testResourceMetricsA + return assert.AnError }, } p1 := testSDKProducer{} @@ -144,8 +145,9 @@ func (ts *readerTestSuite) TestExternalProducerPartialSuccess() { func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() { ts.Reader.register(testSDKProducer{ - produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) { - return metricdata.ResourceMetrics{}, assert.AnError + produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { + *rm = metricdata.ResourceMetrics{} + return assert.AnError }}) ts.Reader.RegisterProducer(testExternalProducer{}) @@ -252,14 +254,15 @@ var testResourceMetricsAB = metricdata.ResourceMetrics{ } type testSDKProducer struct { - produceFunc func(context.Context) (metricdata.ResourceMetrics, error) + produceFunc func(context.Context, *metricdata.ResourceMetrics) error } -func (p testSDKProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { +func (p testSDKProducer) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error { if p.produceFunc != nil { - return p.produceFunc(ctx) + return p.produceFunc(ctx, rm) } - return testResourceMetricsA, nil + *rm = testResourceMetricsA + return nil } type testExternalProducer struct {