Skip to content

Commit

Permalink
fix: soft failure for invalid instruments (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
fracasula committed Oct 30, 2023
1 parent 45529c5 commit 0f91858
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 20 deletions.
53 changes: 33 additions & 20 deletions stats/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/spf13/cast"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats/internal/otel"
Expand All @@ -31,6 +32,7 @@ type otelStats struct {
resourceAttrs map[string]struct{}

meter metric.Meter
noopMeter metric.Meter
counters map[string]metric.Int64Counter
countersMu sync.Mutex
gauges map[string]*otelGauge
Expand Down Expand Up @@ -115,6 +117,7 @@ func (s *otelStats) Start(ctx context.Context, goFactory GoRoutineFactory) error
}

s.meter = mp.Meter(defaultMeterName)
s.noopMeter = noop.NewMeterProvider().Meter(defaultMeterName)
if s.otelConfig.enablePrometheusExporter && s.otelConfig.prometheusMetricsPort > 0 {
s.httpServerShutdownComplete = make(chan struct{})
s.httpServer = &http.Server{
Expand Down Expand Up @@ -270,23 +273,23 @@ func (s *otelStats) getMeasurement(name, statType string, tags Tags) Measurement

switch statType {
case CountType:
instr := buildOTelInstrument(s.meter, name, s.counters, &s.countersMu)
instr := buildOTelInstrument(s.meter, s.noopMeter, name, s.counters, &s.countersMu, s.logger)
return &otelCounter{counter: instr, otelMeasurement: om}
case GaugeType:
return s.getGauge(s.meter, name, om.attributes, newTags.String())
return s.getGauge(name, om.attributes, newTags.String())
case TimerType:
instr := buildOTelInstrument(s.meter, name, s.timers, &s.timersMu)
instr := buildOTelInstrument(s.meter, s.noopMeter, name, s.timers, &s.timersMu, s.logger)
return &otelTimer{timer: instr, otelMeasurement: om}
case HistogramType:
instr := buildOTelInstrument(s.meter, name, s.histograms, &s.histogramsMu)
instr := buildOTelInstrument(s.meter, s.noopMeter, name, s.histograms, &s.histogramsMu, s.logger)
return &otelHistogram{histogram: instr, otelMeasurement: om}
default:
panic(fmt.Errorf("unsupported measurement type %s", statType))
}
}

func (s *otelStats) getGauge(
meter metric.Meter, name string, attributes []attribute.KeyValue, tagsKey string,
name string, attributes []attribute.KeyValue, tagsKey string,
) *otelGauge {
var (
ok bool
Expand All @@ -304,31 +307,37 @@ func (s *otelStats) getGauge(
}

if !ok {
g, err := meter.Float64ObservableGauge(name)
if err != nil {
panic(fmt.Errorf("failed to create gauge %s: %w", name, err))
}
og = &otelGauge{otelMeasurement: &otelMeasurement{
genericMeasurement: genericMeasurement{statType: GaugeType},
attributes: attributes,
}}
_, err = meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
if value := og.getValue(); value != nil {
o.ObserveFloat64(g, cast.ToFloat64(value), metric.WithAttributes(attributes...))
}
return nil
}, g)

g, err := s.meter.Float64ObservableGauge(name)
if err != nil {
panic(fmt.Errorf("failed to register callback for gauge %s: %w", name, err))
s.logger.Warnf("failed to create gauge %s: %v", name, err)
g, _ = s.noopMeter.Float64ObservableGauge(name)
} else {
_, err = s.meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
if value := og.getValue(); value != nil {
o.ObserveFloat64(g, cast.ToFloat64(value), metric.WithAttributes(attributes...))
}
return nil
}, g)
if err != nil {
panic(fmt.Errorf("failed to register callback for gauge %s: %w", name, err))
}
}

s.gauges[mapKey] = og
}

return og
}

func buildOTelInstrument[T any](
meter metric.Meter, name string, m map[string]T, mu *sync.Mutex,
meter, noopMeter metric.Meter,
name string, m map[string]T, mu *sync.Mutex,
l logger.Logger,
) T {
var (
ok bool
Expand All @@ -348,14 +357,18 @@ func buildOTelInstrument[T any](
var value interface{}
switch any(m).(type) {
case map[string]metric.Int64Counter:
value, err = meter.Int64Counter(name)
if value, err = meter.Int64Counter(name); err != nil {
value, _ = noopMeter.Int64Counter(name)
}
case map[string]metric.Float64Histogram:
value, err = meter.Float64Histogram(name)
if value, err = meter.Float64Histogram(name); err != nil {
value, _ = noopMeter.Float64Histogram(name)
}
default:
panic(fmt.Errorf("unknown instrument type %T", instr))
}
if err != nil {
panic(fmt.Errorf("failed to create instrument %T(%s): %w", instr, name, err))
l.Warnf("failed to create instrument %T(%s): %v", instr, name, err)
}
instr = value.(T)
m[name] = instr
Expand Down
55 changes: 55 additions & 0 deletions stats/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelMetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -849,6 +850,53 @@ func TestPrometheusDuplicatedAttributes(t *testing.T) {
), metrics[metricName].Metric[0].Label, "Got %+v", metrics[metricName].Metric[0].Label)
}

func TestInvalidInstrument(t *testing.T) {
newStats := func(t *testing.T, match string) *otelStats {
ctrl := gomock.NewController(t)
l := mock_logger.NewMockLogger(ctrl)
l.EXPECT().Warnf(containsMatcher(match), gomock.Any()).Times(1)

enabled := atomic.Bool{}
enabled.Store(true)

return &otelStats{
config: statsConfig{enabled: &enabled},
meter: sdkmetric.NewMeterProvider().Meter(""),
noopMeter: noop.NewMeterProvider().Meter(""),
logger: l,
}
}

t.Run("counter", func(t *testing.T) {
s := newStats(t, "failed to create instrument")
require.NotPanics(t, func() {
m := s.getMeasurement("_#@!?", CountType, nil)
m.Increment()
})
})
t.Run("gauge", func(t *testing.T) {
s := newStats(t, "failed to create gauge")
require.NotPanics(t, func() {
m := s.getMeasurement("_#@!?", GaugeType, nil)
m.Gauge(123)
})
})
t.Run("timer", func(t *testing.T) {
s := newStats(t, "failed to create instrument")
require.NotPanics(t, func() {
m := s.getMeasurement("_#@!?", TimerType, nil)
m.SendTiming(123 * time.Millisecond)
})
})
t.Run("histogram", func(t *testing.T) {
s := newStats(t, "failed to create instrument")
require.NotPanics(t, func() {
m := s.getMeasurement("_#@!?", HistogramType, nil)
m.Observe(123)
})
})
}

func getDataPoint[T any](ctx context.Context, t *testing.T, rdr sdkmetric.Reader, name string, idx int) (zero T) {
t.Helper()
rm := metricdata.ResourceMetrics{}
Expand Down Expand Up @@ -952,3 +1000,10 @@ func (f loggerSpyFactory) NewLogger() logger.Logger { return f.spy }
func newLoggerSpyFactory(l logger.Logger) loggerFactory {
return &loggerSpyFactory{spy: l}
}

type containsMatcher string

func (m containsMatcher) String() string { return string(m) }
func (m containsMatcher) Matches(arg any) bool {
return strings.Contains(arg.(string), string(m))
}

0 comments on commit 0f91858

Please sign in to comment.