diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fc7aa395a8..862048c31f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Fix `WithEndpointURL` to always use a secure connection when an https URL is passed in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#5944) - Fix `WithEndpointURL` to always use a secure connection when an https URL is passed in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#5944) - Fix `WithEndpointURL` to always use a secure connection when an https URL is passed in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#5944) +- Fix incorrect metrics generated from callbacks when multiple readers are used in `go.opentelemetry.io/otel/sdk/metric`. (#5900) ### Changed diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 8c3ae7caabf..823cdf2c62f 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -150,6 +150,11 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6 continue } inst.appendMeasures(in) + + // Add the measures to the pipeline. It is required to maintain + // measures per pipeline to avoid calling the measure that + // is not part of the pipeline. + insert.pipeline.addInt64Measure(inst.observableID, in) for _, cback := range callbacks { inst := int64Observer{measures: in} fn := cback @@ -309,6 +314,11 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl continue } inst.appendMeasures(in) + + // Add the measures to the pipeline. It is required to maintain + // measures per pipeline to avoid calling the measure that + // is not part of the pipeline. + insert.pipeline.addFloat64Measure(inst.observableID, in) for _, cback := range callbacks { inst := float64Observer{measures: in} fn := cback @@ -441,8 +451,8 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) return noopRegister{}, nil } - reg := newObserver() var err error + validInstruments := make([]metric.Observable, 0, len(insts)) for _, inst := range insts { switch o := inst.(type) { case int64Observable: @@ -452,7 +462,8 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) } continue } - reg.registerInt64(o.observableID) + + validInstruments = append(validInstruments, inst) case float64Observable: if e := o.registerable(m); e != nil { if !errors.Is(e, errEmptyAgg) { @@ -460,41 +471,55 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) } continue } - reg.registerFloat64(o.observableID) + + validInstruments = append(validInstruments, inst) default: // Instrument external to the SDK. return nil, fmt.Errorf("invalid observable: from different implementation") } } - if reg.len() == 0 { + if len(validInstruments) == 0 { // All insts use drop aggregation or are invalid. return noopRegister{}, err } - // Some or all instruments were valid. - cback := func(ctx context.Context) error { return f(ctx, reg) } - return m.pipes.registerMultiCallback(cback), err + unregs := make([]func(), len(m.pipes)) + for ix, pipe := range m.pipes { + reg := newObserver(pipe) + for _, inst := range validInstruments { + switch o := inst.(type) { + case int64Observable: + reg.registerInt64(o.observableID) + case float64Observable: + reg.registerFloat64(o.observableID) + } + } + + // Some or all instruments were valid. + cBack := func(ctx context.Context) error { return f(ctx, reg) } + unregs[ix] = pipe.addMultiCallback(cBack) + } + + return unregisterFuncs{f: unregs}, err } type observer struct { embedded.Observer + pipe *pipeline float64 map[observableID[float64]]struct{} int64 map[observableID[int64]]struct{} } -func newObserver() observer { +func newObserver(p *pipeline) observer { return observer{ + pipe: p, float64: make(map[observableID[float64]]struct{}), int64: make(map[observableID[int64]]struct{}), } } -func (r observer) len() int { - return len(r.float64) + len(r.int64) -} - func (r observer) registerFloat64(id observableID[float64]) { r.float64[id] = struct{}{} } @@ -530,7 +555,12 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ... return } c := metric.NewObserveConfig(opts) - oImpl.observe(v, c.Attributes()) + // Access to r.pipe.float64Measure is already guarded by a lock in pipeline.produce. + // TODO (#5946): Refactor pipeline and observable measures. + measures := r.pipe.float64Measures[oImpl.observableID] + for _, m := range measures { + m(context.Background(), v, c.Attributes()) + } } func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric.ObserveOption) { @@ -555,7 +585,12 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric return } c := metric.NewObserveConfig(opts) - oImpl.observe(v, c.Attributes()) + // Access to r.pipe.int64Measures is already guarded b a lock in pipeline.produce. + // TODO (#5946): Refactor pipeline and observable measures. + measures := r.pipe.int64Measures[oImpl.observableID] + for _, m := range measures { + m(context.Background(), v, c.Attributes()) + } } type noopRegister struct{ embedded.Registration } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 7e85bd2dc98..c504984df80 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -12,7 +12,6 @@ import ( "sync/atomic" "go.opentelemetry.io/otel/internal/global" - "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/embedded" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/exemplar" @@ -43,10 +42,12 @@ func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFi res = resource.Empty() } return &pipeline{ - resource: res, - reader: reader, - views: views, - exemplarFilter: exemplarFilter, + resource: res, + reader: reader, + views: views, + int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{}, + float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{}, + exemplarFilter: exemplarFilter, // aggregations is lazy allocated when needed. } } @@ -64,10 +65,26 @@ type pipeline struct { views []View sync.Mutex - aggregations map[instrumentation.Scope][]instrumentSync - callbacks []func(context.Context) error - multiCallbacks list.List - exemplarFilter exemplar.Filter + int64Measures map[observableID[int64]][]aggregate.Measure[int64] + float64Measures map[observableID[float64]][]aggregate.Measure[float64] + aggregations map[instrumentation.Scope][]instrumentSync + callbacks []func(context.Context) error + multiCallbacks list.List + exemplarFilter exemplar.Filter +} + +// addInt64Measure adds a new int64 measure to the pipeline for each observer. +func (p *pipeline) addInt64Measure(id observableID[int64], m []aggregate.Measure[int64]) { + p.Lock() + defer p.Unlock() + p.int64Measures[id] = m +} + +// addFloat64Measure adds a new float64 measure to the pipeline for each observer. +func (p *pipeline) addFloat64Measure(id observableID[float64], m []aggregate.Measure[float64]) { + p.Lock() + defer p.Unlock() + p.float64Measures[id] = m } // addSync adds the instrumentSync to pipeline p with scope. This method is not @@ -574,14 +591,6 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View, exempl return pipes } -func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration { - unregs := make([]func(), len(p)) - for i, pipe := range p { - unregs[i] = pipe.addMultiCallback(c) - } - return unregisterFuncs{f: unregs} -} - type unregisterFuncs struct { embedded.Registration f []func() diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 75ccf30c5d7..6e822b1d74f 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -11,6 +11,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "testing" "github.com/go-logr/logr" @@ -24,6 +25,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/exemplar" + "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/otel/sdk/resource" @@ -101,6 +103,21 @@ func TestPipelineConcurrentSafe(t *testing.T) { defer wg.Done() pipe.addMultiCallback(func(context.Context) error { return nil }) }() + + wg.Add(1) + go func() { + defer wg.Done() + b := aggregate.Builder[int64]{ + Temporality: metricdata.CumulativeTemporality, + ReservoirFunc: nil, + AggregationLimit: 0, + } + var oID observableID[int64] + m, _ := b.PrecomputedSum(false) + measures := []aggregate.Measure[int64]{} + measures = append(measures, m) + pipe.addInt64Measure(oID, measures) + }() } wg.Wait() } @@ -518,3 +535,81 @@ func TestExemplars(t *testing.T) { check(t, r, 2, 2, 2) }) } + +func TestAddingAndObservingMeasureConcurrentSafe(t *testing.T) { + r1 := NewManualReader() + r2 := NewManualReader() + + mp := NewMeterProvider(WithReader(r1), WithReader(r2)) + m := mp.Meter("test") + + oc1, err := m.Int64ObservableCounter("int64-observable-counter") + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + _, err := m.Int64ObservableCounter("int64-observable-counter-2") + require.NoError(t, err) + }() + + wg.Add(1) + go func() { + defer wg.Done() + _, err := m.RegisterCallback( + func(_ context.Context, o metric.Observer) error { + o.ObserveInt64(oc1, 2) + return nil + }, oc1) + require.NoError(t, err) + }() + + wg.Add(1) + go func() { + defer wg.Done() + _ = mp.pipes[0].produce(context.Background(), &metricdata.ResourceMetrics{}) + }() + + wg.Add(1) + go func() { + defer wg.Done() + _ = mp.pipes[1].produce(context.Background(), &metricdata.ResourceMetrics{}) + }() + + wg.Wait() +} + +func TestPipelineWithMultipleReaders(t *testing.T) { + r1 := NewManualReader() + r2 := NewManualReader() + mp := NewMeterProvider(WithReader(r1), WithReader(r2)) + m := mp.Meter("test") + var val atomic.Int64 + oc, err := m.Int64ObservableCounter("int64-observable-counter") + require.NoError(t, err) + reg, err := m.RegisterCallback( + // SDK calls this function when collecting data. + func(_ context.Context, o metric.Observer) error { + o.ObserveInt64(oc, val.Load()) + return nil + }, oc) + require.NoError(t, err) + t.Cleanup(func() { assert.NoError(t, reg.Unregister()) }) + ctx := context.Background() + rm := new(metricdata.ResourceMetrics) + val.Add(1) + err = r1.Collect(ctx, rm) + require.NoError(t, err) + if assert.Len(t, rm.ScopeMetrics, 1) && + assert.Len(t, rm.ScopeMetrics[0].Metrics, 1) { + assert.Equal(t, int64(1), rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value) + } + val.Add(1) + err = r2.Collect(ctx, rm) + require.NoError(t, err) + if assert.Len(t, rm.ScopeMetrics, 1) && + assert.Len(t, rm.ScopeMetrics[0].Metrics, 1) { + assert.Equal(t, int64(2), rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value) + } +}