Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse memory in metric pipelines #3760

Merged
merged 11 commits into from
Mar 9, 2023
24 changes: 24 additions & 0 deletions sdk/metric/internal/reuse_slice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

// ReuseSlice returns a zeroed view of slice if its capacity is greater than or
// equal to n. Otherwise, it returns a new []T with capacity equal to n.
func ReuseSlice[T any](slice []T, n int) []T {
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
if cap(slice) >= n {
return slice[:n]
}
return make([]T, n)
}
5 changes: 2 additions & 3 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,8 @@ func (mr *manualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr
err := fmt.Errorf("manual reader: invalid producer: %T", p)
return err
}
// TODO (#3047): When produce is updated to accept output as param, pass rm.
rmTemp, err := ph.produce(ctx)
*rm = rmTemp

err := ph.produce(ctx, rm)
if err != nil {
return err
}
Expand Down
36 changes: 21 additions & 15 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),
rmPool: sync.Pool{
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
New: func() interface{} {
return &metricdata.ResourceMetrics{}
}},
}
r.externalProducers.Store([]Producer{})

Expand Down Expand Up @@ -147,6 +151,8 @@ type periodicReader struct {
done chan struct{}
cancel context.CancelFunc
shutdownOnce sync.Once

rmPool sync.Pool
}

// Compile time check the periodicReader implements Reader and is comparable.
Expand Down Expand Up @@ -214,11 +220,12 @@ func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregatio
// the SDK and exports it with r's exporter.
func (r *periodicReader) collectAndExport(ctx context.Context) error {
// TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
rm := metricdata.ResourceMetrics{}
err := r.Collect(ctx, &rm)
rm := r.rmPool.Get().(*metricdata.ResourceMetrics)
err := r.Collect(ctx, rm)
if err == nil {
err = r.export(ctx, rm)
err = r.export(ctx, *rm)
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
}
r.rmPool.Put(rm)
return err
}

Expand All @@ -233,15 +240,13 @@ func (r *periodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMet
return errors.New("periodic reader: *metricdata.ResourceMetrics is nil")
}
// TODO (#3047): When collect is updated to accept output as param, pass rm.
rmTemp, err := r.collect(ctx, r.sdkProducer.Load())
*rm = rmTemp
return err
return r.collect(ctx, r.sdkProducer.Load(), rm)
}

// collect unwraps p as a produceHolder and returns its produce results.
func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata.ResourceMetrics, error) {
func (r *periodicReader) collect(ctx context.Context, p interface{}, rm *metricdata.ResourceMetrics) error {
if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
return ErrReaderNotRegistered
}

ph, ok := p.(produceHolder)
Expand All @@ -251,12 +256,12 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata
// happen, return an error instead of panicking so a users code does
// not halt in the processes.
err := fmt.Errorf("periodic reader: invalid producer: %T", p)
return metricdata.ResourceMetrics{}, err
return err
}

rm, err := ph.produce(ctx)
err := ph.produce(ctx, rm)
if err != nil {
return metricdata.ResourceMetrics{}, err
return err
}
var errs []error
for _, producer := range r.externalProducers.Load().([]Producer) {
Expand All @@ -266,7 +271,7 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, unifyErrors(errs)
return unifyErrors(errs)
}

// export exports metric data m using r's exporter.
Expand Down Expand Up @@ -313,11 +318,12 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {

if ph != nil { // Reader was registered.
// Flush pending telemetry.
var m metricdata.ResourceMetrics
m, err = r.collect(ctx, ph)
m := r.rmPool.Get().(*metricdata.ResourceMetrics)
err = r.collect(ctx, ph, m)
if err == nil {
err = r.export(ctx, m)
err = r.export(ctx, *m)
}
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
r.rmPool.Put(m)
}

sErr := r.exporter.Shutdown(ctx)
Expand Down
45 changes: 25 additions & 20 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (p *pipeline) addMultiCallback(c multiCallback) (unregister func()) {
// produce returns aggregated metrics from a single collection.
//
// This method is safe to call concurrently.
func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, error) {
func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error {
p.Lock()
defer p.Unlock()

Expand All @@ -132,7 +132,9 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
errs.append(err)
}
if err := ctx.Err(); err != nil {
return metricdata.ResourceMetrics{}, err
rm.Resource = nil
rm.ScopeMetrics = rm.ScopeMetrics[:0]
return err
}
}
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
Expand All @@ -143,36 +145,39 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
}
if err := ctx.Err(); err != nil {
// This means the context expired before we finished running callbacks.
return metricdata.ResourceMetrics{}, err
rm.Resource = nil
rm.ScopeMetrics = rm.ScopeMetrics[:0]
return err
}
}

sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations))
rm.Resource = p.resource
rm.ScopeMetrics = internal.ReuseSlice(rm.ScopeMetrics, len(p.aggregations))

i := 0
for scope, instruments := range p.aggregations {
metrics := make([]metricdata.Metrics, 0, len(instruments))
rm.ScopeMetrics[i].Metrics = internal.ReuseSlice(rm.ScopeMetrics[i].Metrics, len(instruments))
j := 0
for _, inst := range instruments {
data := inst.aggregator.Aggregation()
if data != nil {
metrics = append(metrics, metricdata.Metrics{
Name: inst.name,
Description: inst.description,
Unit: inst.unit,
Data: data,
})
rm.ScopeMetrics[i].Metrics[j].Name = inst.name
rm.ScopeMetrics[i].Metrics[j].Description = inst.description
rm.ScopeMetrics[i].Metrics[j].Unit = inst.unit
rm.ScopeMetrics[i].Metrics[j].Data = data
j++
}
}
if len(metrics) > 0 {
sm = append(sm, metricdata.ScopeMetrics{
Scope: scope,
Metrics: metrics,
})
rm.ScopeMetrics[i].Metrics = rm.ScopeMetrics[i].Metrics[:j]
if len(rm.ScopeMetrics[i].Metrics) > 0 {
rm.ScopeMetrics[i].Scope = scope
i++
}
}

return metricdata.ResourceMetrics{
Resource: p.resource,
ScopeMetrics: sm,
}, errs.errorOrNil()
rm.ScopeMetrics = rm.ScopeMetrics[:i]

return errs.errorOrNil()
}

// inserter facilitates inserting of new instruments from a single scope into a
Expand Down
19 changes: 12 additions & 7 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func (testSumAggregator) Aggregation() metricdata.Aggregation {
func TestEmptyPipeline(t *testing.T) {
pipe := &pipeline{}

output, err := pipe.produce(context.Background())
output := metricdata.ResourceMetrics{}
err := pipe.produce(context.Background(), &output)
require.NoError(t, err)
assert.Nil(t, output.Resource)
assert.Len(t, output.ScopeMetrics, 0)
Expand All @@ -56,7 +57,7 @@ func TestEmptyPipeline(t *testing.T) {
pipe.addMultiCallback(func(context.Context) error { return nil })
})

output, err = pipe.produce(context.Background())
err = pipe.produce(context.Background(), &output)
require.NoError(t, err)
assert.Nil(t, output.Resource)
require.Len(t, output.ScopeMetrics, 1)
Expand All @@ -66,7 +67,8 @@ func TestEmptyPipeline(t *testing.T) {
func TestNewPipeline(t *testing.T) {
pipe := newPipeline(nil, nil, nil)

output, err := pipe.produce(context.Background())
output := metricdata.ResourceMetrics{}
err := pipe.produce(context.Background(), &output)
require.NoError(t, err)
assert.Equal(t, resource.Empty(), output.Resource)
assert.Len(t, output.ScopeMetrics, 0)
Expand All @@ -80,7 +82,7 @@ func TestNewPipeline(t *testing.T) {
pipe.addMultiCallback(func(context.Context) error { return nil })
})

output, err = pipe.produce(context.Background())
err = pipe.produce(context.Background(), &output)
require.NoError(t, err)
assert.Equal(t, resource.Empty(), output.Resource)
require.Len(t, output.ScopeMetrics, 1)
Expand All @@ -91,22 +93,24 @@ func TestPipelineUsesResource(t *testing.T) {
res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource"))
pipe := newPipeline(res, nil, nil)

output, err := pipe.produce(context.Background())
output := metricdata.ResourceMetrics{}
err := pipe.produce(context.Background(), &output)
assert.NoError(t, err)
assert.Equal(t, res, output.Resource)
}

func TestPipelineConcurrency(t *testing.T) {
pipe := newPipeline(nil, nil, nil)
ctx := context.Background()
var output metricdata.ResourceMetrics

var wg sync.WaitGroup
const threads = 2
for i := 0; i < threads; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, _ = pipe.produce(ctx)
_ = pipe.produce(ctx, &output)
}()

wg.Add(1)
Expand Down Expand Up @@ -167,7 +171,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
a.Aggregate(1, *attribute.EmptySet())
}

out, err := test.pipe.produce(context.Background())
out := metricdata.ResourceMetrics{}
err = test.pipe.produce(context.Background(), &out)
require.NoError(t, err)
require.Len(t, out.ScopeMetrics, 1, "Aggregator not registered with pipeline")
sm := out.ScopeMetrics[0]
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type sdkProducer interface {
// produce returns aggregated metrics from a single collection.
//
// This method is safe to call concurrently.
produce(context.Context) (metricdata.ResourceMetrics, error)
produce(context.Context, *metricdata.ResourceMetrics) error
}

// Producer produces metrics for a Reader from an external source.
Expand All @@ -113,15 +113,15 @@ type Producer interface {
// produceHolder is used as an atomic.Value to wrap the non-concrete producer
// type.
type produceHolder struct {
produce func(context.Context) (metricdata.ResourceMetrics, error)
produce func(context.Context, *metricdata.ResourceMetrics) error
}

// shutdownProducer produces an ErrReaderShutdown error always.
type shutdownProducer struct{}

// produce returns an ErrReaderShutdown error.
func (p shutdownProducer) produce(context.Context) (metricdata.ResourceMetrics, error) {
return metricdata.ResourceMetrics{}, ErrReaderShutdown
func (p shutdownProducer) produce(context.Context, *metricdata.ResourceMetrics) error {
return ErrReaderShutdown
}

// TemporalitySelector selects the temporality to use based on the InstrumentKind.
Expand Down
19 changes: 11 additions & 8 deletions sdk/metric/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ func (ts *readerTestSuite) TestMultipleForceFlush() {

func (ts *readerTestSuite) TestMultipleRegister() {
p0 := testSDKProducer{
produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) {
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
// Differentiate this producer from the second by returning an
// error.
return testResourceMetricsA, assert.AnError
*rm = testResourceMetricsA
return assert.AnError
},
}
p1 := testSDKProducer{}
Expand Down Expand Up @@ -144,8 +145,9 @@ func (ts *readerTestSuite) TestExternalProducerPartialSuccess() {

func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() {
ts.Reader.register(testSDKProducer{
produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) {
return metricdata.ResourceMetrics{}, assert.AnError
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
*rm = metricdata.ResourceMetrics{}
return assert.AnError
}})
ts.Reader.RegisterProducer(testExternalProducer{})

Expand Down Expand Up @@ -252,14 +254,15 @@ var testResourceMetricsAB = metricdata.ResourceMetrics{
}

type testSDKProducer struct {
produceFunc func(context.Context) (metricdata.ResourceMetrics, error)
produceFunc func(context.Context, *metricdata.ResourceMetrics) error
}

func (p testSDKProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) {
func (p testSDKProducer) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if p.produceFunc != nil {
return p.produceFunc(ctx)
return p.produceFunc(ctx, rm)
}
return testResourceMetricsA, nil
*rm = testResourceMetricsA
return nil
}

type testExternalProducer struct {
Expand Down