Skip to content

Commit

Permalink
sdk/metric: Fix observable not registered error when the asynchronous…
Browse files Browse the repository at this point in the history
… instrument has a drop aggregation (#4772)

* Fix observable instrument not registered on drop aggregation

* Add TestObservableDropAggregation

* Add testcase for dropping unregistered observable

* Update CHANGELOG

* Add observable name const + suggestions

* Add suggestions

* Only error if the instrument is not dropped

* Decrease indentation

* Revert "Decrease indentation"

This reverts commit 9e7e772.

---------

Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
  • Loading branch information
scorpionknifes and hanyuancheung authored Jan 11, 2024
1 parent 01472db commit 7fa7d1b
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Fix `Parse` in `go.opentelemetry.io/otel/baggage` to validate member value before percent-decoding. (#4755)
- Fix whitespace encoding of `Member.String` in `go.opentelemetry.io/otel/baggage`. (#4756)
- Fix observable not registered error when the asynchronous instrument has a drop aggregation in `go.opentelemetry.io/otel/sdk/metric`. (#4772)
- Fix baggage item key so that it is not canonicalized in `go.opentelemetry.io/otel/bridge/opentracing`. (#4776)
- Fix `go.opentelemetry.io/otel/bridge/opentracing` to properly handle baggage values that requires escaping during propagation. (#4804)

Expand Down
5 changes: 3 additions & 2 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,9 @@ type observable[N int64 | float64] struct {
metric.Observable
observablID[N]

meter *meter
measures measures[N]
meter *meter
measures measures[N]
dropAggregation bool
}

func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string) *observable[N] {
Expand Down
31 changes: 19 additions & 12 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"

"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

Expand Down Expand Up @@ -117,6 +118,7 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
Expand Down Expand Up @@ -233,6 +235,7 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
Expand Down Expand Up @@ -437,12 +440,14 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ...
}

if _, registered := r.float64[oImpl.observablID]; !registered {
global.Error(errUnregObserver, "failed to record",
"name", oImpl.name,
"description", oImpl.description,
"unit", oImpl.unit,
"number", fmt.Sprintf("%T", float64(0)),
)
if !oImpl.dropAggregation {
global.Error(errUnregObserver, "failed to record",
"name", oImpl.name,
"description", oImpl.description,
"unit", oImpl.unit,
"number", fmt.Sprintf("%T", float64(0)),
)
}
return
}
c := metric.NewObserveConfig(opts)
Expand Down Expand Up @@ -470,12 +475,14 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric
}

if _, registered := r.int64[oImpl.observablID]; !registered {
global.Error(errUnregObserver, "failed to record",
"name", oImpl.name,
"description", oImpl.description,
"unit", oImpl.unit,
"number", fmt.Sprintf("%T", int64(0)),
)
if !oImpl.dropAggregation {
global.Error(errUnregObserver, "failed to record",
"name", oImpl.name,
"description", oImpl.description,
"unit", oImpl.unit,
"number", fmt.Sprintf("%T", int64(0)),
)
}
return
}
c := metric.NewObserveConfig(opts)
Expand Down
208 changes: 208 additions & 0 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package metric

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"testing"

"github.com/go-logr/logr"
"github.com/go-logr/logr/funcr"
"github.com/go-logr/logr/testr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -2064,3 +2066,209 @@ func TestHistogramBucketPrecedenceOrdering(t *testing.T) {
})
}
}

func TestObservableDropAggregation(t *testing.T) {
const (
intPrefix = "observable.int64."
intCntName = "observable.int64.counter"
intUDCntName = "observable.int64.up.down.counter"
intGaugeName = "observable.int64.gauge"
floatPrefix = "observable.float64."
floatCntName = "observable.float64.counter"
floatUDCntName = "observable.float64.up.down.counter"
floatGaugeName = "observable.float64.gauge"
unregPrefix = "unregistered.observable."
unregIntCntName = "unregistered.observable.int64.counter"
unregFloatCntName = "unregistered.observable.float64.counter"
)

type log struct {
name string
number string
}

testcases := []struct {
name string
views []View
wantObservables []string
wantUnregLogs []log
}{
{
name: "default",
views: nil,
wantObservables: []string{
intCntName, intUDCntName, intGaugeName,
floatCntName, floatUDCntName, floatGaugeName,
},
wantUnregLogs: []log{
{
name: unregIntCntName,
number: "int64",
},
{
name: unregFloatCntName,
number: "float64",
},
},
},
{
name: "drop all metrics",
views: []View{
func(i Instrument) (Stream, bool) {
return Stream{Aggregation: AggregationDrop{}}, true
},
},
wantObservables: nil,
wantUnregLogs: nil,
},
{
name: "drop float64 observable",
views: []View{
func(i Instrument) (Stream, bool) {
if strings.HasPrefix(i.Name, floatPrefix) {
return Stream{Aggregation: AggregationDrop{}}, true
}
return Stream{}, false
},
},
wantObservables: []string{
intCntName, intUDCntName, intGaugeName,
},
wantUnregLogs: []log{
{
name: unregIntCntName,
number: "int64",
},
{
name: unregFloatCntName,
number: "float64",
},
},
},
{
name: "drop int64 observable",
views: []View{
func(i Instrument) (Stream, bool) {
if strings.HasPrefix(i.Name, intPrefix) {
return Stream{Aggregation: AggregationDrop{}}, true
}
return Stream{}, false
},
},
wantObservables: []string{
floatCntName, floatUDCntName, floatGaugeName,
},
wantUnregLogs: []log{
{
name: unregIntCntName,
number: "int64",
},
{
name: unregFloatCntName,
number: "float64",
},
},
},
{
name: "drop unregistered observable",
views: []View{
func(i Instrument) (Stream, bool) {
if strings.HasPrefix(i.Name, unregPrefix) {
return Stream{Aggregation: AggregationDrop{}}, true
}
return Stream{}, false
},
},
wantObservables: []string{
intCntName, intUDCntName, intGaugeName,
floatCntName, floatUDCntName, floatGaugeName,
},
wantUnregLogs: nil,
},
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
var unregLogs []log
otel.SetLogger(
funcr.NewJSON(
func(obj string) {
var entry map[string]interface{}
_ = json.Unmarshal([]byte(obj), &entry)

// All unregistered observables should log `errUnregObserver` error.
// A observable with drop aggregation is also unregistered,
// however this is expected and should not log an error.
assert.Equal(t, errUnregObserver.Error(), entry["error"])

unregLogs = append(unregLogs, log{
name: fmt.Sprintf("%v", entry["name"]),
number: fmt.Sprintf("%v", entry["number"]),
})
},
funcr.Options{Verbosity: 0},
),
)
defer otel.SetLogger(logr.Discard())

reader := NewManualReader()
meter := NewMeterProvider(WithView(tt.views...), WithReader(reader)).Meter("TestObservableDropAggregation")

intCnt, err := meter.Int64ObservableCounter(intCntName)
require.NoError(t, err)
intUDCnt, err := meter.Int64ObservableUpDownCounter(intUDCntName)
require.NoError(t, err)
intGaugeCnt, err := meter.Int64ObservableGauge(intGaugeName)
require.NoError(t, err)

floatCnt, err := meter.Float64ObservableCounter(floatCntName)
require.NoError(t, err)
floatUDCnt, err := meter.Float64ObservableUpDownCounter(floatUDCntName)
require.NoError(t, err)
floatGaugeCnt, err := meter.Float64ObservableGauge(floatGaugeName)
require.NoError(t, err)

unregIntCnt, err := meter.Int64ObservableCounter(unregIntCntName)
require.NoError(t, err)
unregFloatCnt, err := meter.Float64ObservableCounter(unregFloatCntName)
require.NoError(t, err)

_, err = meter.RegisterCallback(
func(ctx context.Context, obs metric.Observer) error {
obs.ObserveInt64(intCnt, 1)
obs.ObserveInt64(intUDCnt, 1)
obs.ObserveInt64(intGaugeCnt, 1)
obs.ObserveFloat64(floatCnt, 1)
obs.ObserveFloat64(floatUDCnt, 1)
obs.ObserveFloat64(floatGaugeCnt, 1)
// We deliberately call observe to unregistered observables
obs.ObserveInt64(unregIntCnt, 1)
obs.ObserveFloat64(unregFloatCnt, 1)

return nil
},
intCnt, intUDCnt, intGaugeCnt,
floatCnt, floatUDCnt, floatGaugeCnt,
// We deliberately do not register `unregIntCnt` and `unregFloatCnt`
// to test that `errUnregObserver` is logged when observed by callback.
)
require.NoError(t, err)

var rm metricdata.ResourceMetrics
err = reader.Collect(context.Background(), &rm)
require.NoError(t, err)

if len(tt.wantObservables) == 0 {
require.Len(t, rm.ScopeMetrics, 0)
return
}

require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, len(tt.wantObservables))

for i, m := range rm.ScopeMetrics[0].Metrics {
assert.Equal(t, tt.wantObservables[i], m.Name)
}
assert.Equal(t, tt.wantUnregLogs, unregLogs)
})
}
}

0 comments on commit 7fa7d1b

Please sign in to comment.