Reuse memory in metric pipelines (#3760)
* Have pipelines reuse memory

* truncate Metric slice

* Apply suggestions from code review

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Use rm pool on periodic shutdown.

* zero out RM on ctx error

* Update sdk/metric/pipeline.go

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Peter Liu <lpfvip2008@gmail.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Fix lint

---------

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
Co-authored-by: Peter Liu <lpfvip2008@gmail.com>
Co-authored-by: Tyler Yahn <codingalias@gmail.com>
4 people authored Mar 9, 2023
1 parent 7dc7b30 commit e463505
Showing 7 changed files with 99 additions and 57 deletions.
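The change is easiest to see outside the diff. Below is a minimal, self-contained sketch (not SDK code) of the pattern the commit adopts: produce fills a caller-supplied *metricdata.ResourceMetrics in place, and the periodic reader recycles that value through a sync.Pool instead of allocating a new one per collection. The produce and rmPool names mirror the diff; everything else is illustrative.

package main

import (
    "context"
    "fmt"
    "sync"

    "go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// rmPool recycles ResourceMetrics values between collections, as the
// periodic reader now does.
var rmPool = sync.Pool{
    New: func() interface{} { return &metricdata.ResourceMetrics{} },
}

// produce stands in for pipeline.produce: it writes into rm rather than
// returning a freshly allocated ResourceMetrics.
func produce(ctx context.Context, rm *metricdata.ResourceMetrics) error {
    if err := ctx.Err(); err != nil {
        // Zero out the reused value so stale data never leaks to the caller.
        rm.Resource = nil
        rm.ScopeMetrics = rm.ScopeMetrics[:0]
        return err
    }
    rm.ScopeMetrics = rm.ScopeMetrics[:0] // reuse the backing array
    return nil
}

func main() {
    rm := rmPool.Get().(*metricdata.ResourceMetrics)
    if err := produce(context.Background(), rm); err != nil {
        fmt.Println("collect failed:", err)
    }
    // The periodic reader would export *rm here before returning it.
    rmPool.Put(rm)
}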
24 changes: 24 additions & 0 deletions sdk/metric/internal/reuse_slice.go
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

// ReuseSlice returns a zeroed view of slice if its capacity is greater than or
// equal to n. Otherwise, it returns a new []T with capacity equal to n.
func ReuseSlice[T any](slice []T, n int) []T {
if cap(slice) >= n {
return slice[:n]
}
return make([]T, n)
}
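ReuseSlice lives in an internal package and cannot be imported by user code, so the standalone sketch below re-declares an equivalent function purely to show the reuse semantics: the existing backing array is kept whenever its capacity suffices, otherwise a new slice is allocated.

package main

import "fmt"

// reuseSlice mirrors internal.ReuseSlice, for illustration only.
func reuseSlice[T any](slice []T, n int) []T {
    if cap(slice) >= n {
        return slice[:n]
    }
    return make([]T, n)
}

func main() {
    buf := make([]int, 0, 8)

    a := reuseSlice(buf, 4) // cap 8 >= 4: same backing array, length 4
    fmt.Println(len(a), cap(a), &a[0] == &buf[:1][0]) // 4 8 true

    b := reuseSlice(buf, 16) // cap 8 < 16: fresh allocation of length 16
    fmt.Println(len(b), cap(b)) // 16 16
}

A reused view retains whatever elements were already in the backing array, which is why pipeline.produce below overwrites each retained entry and truncates to the number actually written.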
5 changes: 2 additions & 3 deletions sdk/metric/manual_reader.go
@@ -135,9 +135,8 @@ func (mr *manualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr
err := fmt.Errorf("manual reader: invalid producer: %T", p)
return err
}
- // TODO (#3047): When produce is updated to accept output as param, pass rm.
- rmTemp, err := ph.produce(ctx)
- *rm = rmTemp
+
+ err := ph.produce(ctx, rm)
if err != nil {
return err
}
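With produce now filling the caller's value, a user of the manual reader can hold one ResourceMetrics and collect into it repeatedly. A sketch of a typical caller, assuming a setup along these lines (meter and instrument registration omitted):

package main

import (
    "context"
    "log"

    sdkmetric "go.opentelemetry.io/otel/sdk/metric"
    "go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
    reader := sdkmetric.NewManualReader()
    _ = sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))

    ctx := context.Background()
    var rm metricdata.ResourceMetrics // allocated once, refilled on every Collect

    for i := 0; i < 3; i++ {
        if err := reader.Collect(ctx, &rm); err != nil {
            log.Fatal(err)
        }
        log.Printf("collection %d: %d scopes", i, len(rm.ScopeMetrics))
    }
}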
36 changes: 21 additions & 15 deletions sdk/metric/periodic_reader.go
@@ -120,6 +120,10 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),
+ rmPool: sync.Pool{
+ New: func() interface{} {
+ return &metricdata.ResourceMetrics{}
+ }},
}
r.externalProducers.Store([]Producer{})

@@ -147,6 +151,8 @@ type periodicReader struct {
done chan struct{}
cancel context.CancelFunc
shutdownOnce sync.Once
+
+ rmPool sync.Pool
}

// Compile time check the periodicReader implements Reader and is comparable.
@@ -214,11 +220,12 @@ func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregatio
// the SDK and exports it with r's exporter.
func (r *periodicReader) collectAndExport(ctx context.Context) error {
// TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
- rm := metricdata.ResourceMetrics{}
- err := r.Collect(ctx, &rm)
+ rm := r.rmPool.Get().(*metricdata.ResourceMetrics)
+ err := r.Collect(ctx, rm)
if err == nil {
- err = r.export(ctx, rm)
+ err = r.export(ctx, *rm)
}
+ r.rmPool.Put(rm)
return err
}

@@ -233,15 +240,13 @@ func (r *periodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMet
return errors.New("periodic reader: *metricdata.ResourceMetrics is nil")
}
// TODO (#3047): When collect is updated to accept output as param, pass rm.
- rmTemp, err := r.collect(ctx, r.sdkProducer.Load())
- *rm = rmTemp
- return err
+ return r.collect(ctx, r.sdkProducer.Load(), rm)
}

// collect unwraps p as a produceHolder and returns its produce results.
- func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata.ResourceMetrics, error) {
+ func (r *periodicReader) collect(ctx context.Context, p interface{}, rm *metricdata.ResourceMetrics) error {
if p == nil {
- return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
+ return ErrReaderNotRegistered
}

ph, ok := p.(produceHolder)
@@ -251,12 +256,12 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata
// happen, return an error instead of panicking so a users code does
// not halt in the processes.
err := fmt.Errorf("periodic reader: invalid producer: %T", p)
- return metricdata.ResourceMetrics{}, err
+ return err
}

- rm, err := ph.produce(ctx)
+ err := ph.produce(ctx, rm)
if err != nil {
- return metricdata.ResourceMetrics{}, err
+ return err
}
var errs []error
for _, producer := range r.externalProducers.Load().([]Producer) {
@@ -266,7 +271,7 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
- return rm, unifyErrors(errs)
+ return unifyErrors(errs)
}

// export exports metric data m using r's exporter.
@@ -313,11 +318,12 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {

if ph != nil { // Reader was registered.
// Flush pending telemetry.
- var m metricdata.ResourceMetrics
- m, err = r.collect(ctx, ph)
+ m := r.rmPool.Get().(*metricdata.ResourceMetrics)
+ err = r.collect(ctx, ph, m)
if err == nil {
- err = r.export(ctx, m)
+ err = r.export(ctx, *m)
}
+ r.rmPool.Put(m)
}

sErr := r.exporter.Shutdown(ctx)
45 changes: 25 additions & 20 deletions sdk/metric/pipeline.go
@@ -121,7 +121,7 @@ func (p *pipeline) addMultiCallback(c multiCallback) (unregister func()) {
// produce returns aggregated metrics from a single collection.
//
// This method is safe to call concurrently.
- func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, error) {
+ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error {
p.Lock()
defer p.Unlock()

@@ -132,7 +132,9 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
errs.append(err)
}
if err := ctx.Err(); err != nil {
- return metricdata.ResourceMetrics{}, err
+ rm.Resource = nil
+ rm.ScopeMetrics = rm.ScopeMetrics[:0]
+ return err
}
}
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
@@ -143,36 +145,39 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
}
if err := ctx.Err(); err != nil {
// This means the context expired before we finished running callbacks.
- return metricdata.ResourceMetrics{}, err
+ rm.Resource = nil
+ rm.ScopeMetrics = rm.ScopeMetrics[:0]
+ return err
}
}

- sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations))
+ rm.Resource = p.resource
+ rm.ScopeMetrics = internal.ReuseSlice(rm.ScopeMetrics, len(p.aggregations))
+
+ i := 0
for scope, instruments := range p.aggregations {
- metrics := make([]metricdata.Metrics, 0, len(instruments))
+ rm.ScopeMetrics[i].Metrics = internal.ReuseSlice(rm.ScopeMetrics[i].Metrics, len(instruments))
+ j := 0
for _, inst := range instruments {
data := inst.aggregator.Aggregation()
if data != nil {
- metrics = append(metrics, metricdata.Metrics{
- Name: inst.name,
- Description: inst.description,
- Unit: inst.unit,
- Data: data,
- })
+ rm.ScopeMetrics[i].Metrics[j].Name = inst.name
+ rm.ScopeMetrics[i].Metrics[j].Description = inst.description
+ rm.ScopeMetrics[i].Metrics[j].Unit = inst.unit
+ rm.ScopeMetrics[i].Metrics[j].Data = data
+ j++
}
}
- if len(metrics) > 0 {
- sm = append(sm, metricdata.ScopeMetrics{
- Scope: scope,
- Metrics: metrics,
- })
+ rm.ScopeMetrics[i].Metrics = rm.ScopeMetrics[i].Metrics[:j]
+ if len(rm.ScopeMetrics[i].Metrics) > 0 {
+ rm.ScopeMetrics[i].Scope = scope
+ i++
}
}

- return metricdata.ResourceMetrics{
- Resource: p.resource,
- ScopeMetrics: sm,
- }, errs.errorOrNil()
+ rm.ScopeMetrics = rm.ScopeMetrics[:i]
+
+ return errs.errorOrNil()
}

// inserter facilitates inserting of new instruments from a single scope into a
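The heart of the produce rewrite is a fill-then-truncate pattern: write results into the reused slice at explicit indices, skip entries with nothing to report, and cut the slice to the count actually written. A small standalone sketch of the same idea, with illustrative types that are not part of the SDK:

package main

import "fmt"

type scopeMetrics struct {
    scope   string
    metrics []int
}

// fill reuses dst's backing array when it is large enough, writes one entry
// per non-empty source, and truncates to the number written.
func fill(dst []scopeMetrics, src map[string][]int) []scopeMetrics {
    if cap(dst) >= len(src) {
        dst = dst[:len(src)]
    } else {
        dst = make([]scopeMetrics, len(src))
    }

    i := 0
    for scope, metrics := range src {
        if len(metrics) == 0 {
            continue // nothing to report for this scope
        }
        dst[i].scope = scope
        dst[i].metrics = metrics
        i++
    }
    return dst[:i]
}

func main() {
    var buf []scopeMetrics
    buf = fill(buf, map[string][]int{"a": {1}, "b": nil, "c": {2, 3}})
    fmt.Println(len(buf), cap(buf)) // 2 entries kept; spare capacity stays for the next call
}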
19 changes: 12 additions & 7 deletions sdk/metric/pipeline_test.go
@@ -42,7 +42,8 @@ func (testSumAggregator) Aggregation() metricdata.Aggregation {
func TestEmptyPipeline(t *testing.T) {
pipe := &pipeline{}

- output, err := pipe.produce(context.Background())
+ output := metricdata.ResourceMetrics{}
+ err := pipe.produce(context.Background(), &output)
require.NoError(t, err)
assert.Nil(t, output.Resource)
assert.Len(t, output.ScopeMetrics, 0)
@@ -56,7 +57,7 @@ func TestEmptyPipeline(t *testing.T) {
pipe.addMultiCallback(func(context.Context) error { return nil })
})

- output, err = pipe.produce(context.Background())
+ err = pipe.produce(context.Background(), &output)
require.NoError(t, err)
assert.Nil(t, output.Resource)
require.Len(t, output.ScopeMetrics, 1)
@@ -66,7 +67,8 @@ func TestEmptyPipeline(t *testing.T) {
func TestNewPipeline(t *testing.T) {
pipe := newPipeline(nil, nil, nil)

- output, err := pipe.produce(context.Background())
+ output := metricdata.ResourceMetrics{}
+ err := pipe.produce(context.Background(), &output)
require.NoError(t, err)
assert.Equal(t, resource.Empty(), output.Resource)
assert.Len(t, output.ScopeMetrics, 0)
@@ -80,7 +82,7 @@ func TestNewPipeline(t *testing.T) {
pipe.addMultiCallback(func(context.Context) error { return nil })
})

- output, err = pipe.produce(context.Background())
+ err = pipe.produce(context.Background(), &output)
require.NoError(t, err)
assert.Equal(t, resource.Empty(), output.Resource)
require.Len(t, output.ScopeMetrics, 1)
@@ -91,22 +93,24 @@ func TestPipelineUsesResource(t *testing.T) {
res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource"))
pipe := newPipeline(res, nil, nil)

- output, err := pipe.produce(context.Background())
+ output := metricdata.ResourceMetrics{}
+ err := pipe.produce(context.Background(), &output)
assert.NoError(t, err)
assert.Equal(t, res, output.Resource)
}

func TestPipelineConcurrency(t *testing.T) {
pipe := newPipeline(nil, nil, nil)
ctx := context.Background()
+ var output metricdata.ResourceMetrics

var wg sync.WaitGroup
const threads = 2
for i := 0; i < threads; i++ {
wg.Add(1)
go func() {
defer wg.Done()
- _, _ = pipe.produce(ctx)
+ _ = pipe.produce(ctx, &output)
}()

wg.Add(1)
@@ -167,7 +171,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
a.Aggregate(1, *attribute.EmptySet())
}

- out, err := test.pipe.produce(context.Background())
+ out := metricdata.ResourceMetrics{}
+ err = test.pipe.produce(context.Background(), &out)
require.NoError(t, err)
require.Len(t, out.ScopeMetrics, 1, "Aggregator not registered with pipeline")
sm := out.ScopeMetrics[0]
8 changes: 4 additions & 4 deletions sdk/metric/reader.go
@@ -99,7 +99,7 @@ type sdkProducer interface {
// produce returns aggregated metrics from a single collection.
//
// This method is safe to call concurrently.
- produce(context.Context) (metricdata.ResourceMetrics, error)
+ produce(context.Context, *metricdata.ResourceMetrics) error
}

// Producer produces metrics for a Reader from an external source.
@@ -113,15 +113,15 @@ type Producer interface {
// produceHolder is used as an atomic.Value to wrap the non-concrete producer
// type.
type produceHolder struct {
- produce func(context.Context) (metricdata.ResourceMetrics, error)
+ produce func(context.Context, *metricdata.ResourceMetrics) error
}

// shutdownProducer produces an ErrReaderShutdown error always.
type shutdownProducer struct{}

// produce returns an ErrReaderShutdown error.
- func (p shutdownProducer) produce(context.Context) (metricdata.ResourceMetrics, error) {
- return metricdata.ResourceMetrics{}, ErrReaderShutdown
+ func (p shutdownProducer) produce(context.Context, *metricdata.ResourceMetrics) error {
+ return ErrReaderShutdown
}

// TemporalitySelector selects the temporality to use based on the InstrumentKind.
19 changes: 11 additions & 8 deletions sdk/metric/reader_test.go
@@ -103,10 +103,11 @@ func (ts *readerTestSuite) TestMultipleForceFlush() {

func (ts *readerTestSuite) TestMultipleRegister() {
p0 := testSDKProducer{
- produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) {
+ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
// Differentiate this producer from the second by returning an
// error.
- return testResourceMetricsA, assert.AnError
+ *rm = testResourceMetricsA
+ return assert.AnError
},
}
p1 := testSDKProducer{}
@@ -144,8 +145,9 @@ func (ts *readerTestSuite) TestExternalProducerPartialSuccess() {

func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() {
ts.Reader.register(testSDKProducer{
- produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) {
- return metricdata.ResourceMetrics{}, assert.AnError
+ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
+ *rm = metricdata.ResourceMetrics{}
+ return assert.AnError
}})
ts.Reader.RegisterProducer(testExternalProducer{})

@@ -252,14 +254,15 @@ var testResourceMetricsAB = metricdata.ResourceMetrics{
}

type testSDKProducer struct {
- produceFunc func(context.Context) (metricdata.ResourceMetrics, error)
+ produceFunc func(context.Context, *metricdata.ResourceMetrics) error
}

- func (p testSDKProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) {
+ func (p testSDKProducer) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if p.produceFunc != nil {
- return p.produceFunc(ctx)
+ return p.produceFunc(ctx, rm)
}
- return testResourceMetricsA, nil
+ *rm = testResourceMetricsA
+ return nil
}

type testExternalProducer struct {