Skip to content

Commit

Permalink
Add opt-in support for metric overflow attribute (#4737)
Browse files Browse the repository at this point in the history
  • Loading branch information
utpilla authored Aug 9, 2023
1 parent e227d0f commit 10a8989
Show file tree
Hide file tree
Showing 12 changed files with 651 additions and 32 deletions.
11 changes: 11 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

## Unreleased

* **Experimental Feature** Added an opt-in feature to aggregate any metric
measurements that were dropped due to reaching the [max MetricPoints
limit](https://github.com/open-telemetry/opentelemetry-dotnet/tree/core-1.6.0-alpha.1/docs/metrics/customizing-the-sdk).
When this feature is enabled, SDK would aggregate such measurements using a
reserved MetricPoint with a single tag with key as `otel.metric.overflow` and
value as `true`. The feature is turned-off by default. You can enable it by
setting the environment variable
`OTEL_DOTNET_EXPERIMENTAL_METRICS_EMIT_OVERFLOW_ATTRIBUTE` to `true` before
setting up the `MeterProvider`.
([#4737](https://github.com/open-telemetry/opentelemetry-dotnet/pull/4737))

## 1.6.0-alpha.1

Released 2023-Jul-12
Expand Down
93 changes: 80 additions & 13 deletions src/OpenTelemetry/Metrics/AggregatorStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ namespace OpenTelemetry.Metrics;

internal sealed class AggregatorStore
{
private static readonly string MetricPointCapHitFixMessage = "Modify instrumentation to reduce the number of unique key/value pair combinations. Or use Views to drop unwanted tags. Or use MeterProviderBuilder.SetMaxMetricPointsPerMetricStream to set higher limit.";
private static readonly string MetricPointCapHitFixMessage = "Consider opting in for the experimental SDK feature to emit all the throttled metrics under the overflow attribute by setting env variable OTEL_DOTNET_EXPERIMENTAL_METRICS_EMIT_OVERFLOW_ATTRIBUTE = true. You could also modify instrumentation to reduce the number of unique key/value pair combinations. Or use Views to drop unwanted tags. Or use MeterProviderBuilder.SetMaxMetricPointsPerMetricStream to set higher limit.";
private static readonly Comparison<KeyValuePair<string, object>> DimensionComparisonDelegate = (x, y) => x.Key.CompareTo(y.Key);

private readonly object lockZeroTags = new();
private readonly object lockOverflowTag = new();
private readonly HashSet<string> tagKeysInteresting;
private readonly int tagsKeysInterestingCount;

Expand All @@ -43,17 +45,21 @@ internal sealed class AggregatorStore
private readonly UpdateLongDelegate updateLongCallback;
private readonly UpdateDoubleDelegate updateDoubleCallback;
private readonly int maxMetricPoints;
private readonly bool emitOverflowAttribute;
private readonly ExemplarFilter exemplarFilter;

private int metricPointIndex = 0;
private int batchSize = 0;
private int metricCapHitMessageLogged;
private bool zeroTagMetricPointInitialized;
private bool overflowTagMetricPointInitialized;

internal AggregatorStore(
MetricStreamIdentity metricStreamIdentity,
AggregationType aggType,
AggregationTemporality temporality,
int maxMetricPoints,
bool emitOverflowAttribute,
ExemplarFilter exemplarFilter = null)
{
this.name = metricStreamIdentity.InstrumentName;
Expand Down Expand Up @@ -81,6 +87,15 @@ internal AggregatorStore(
this.tagKeysInteresting = hs;
this.tagsKeysInterestingCount = hs.Count;
}

this.emitOverflowAttribute = emitOverflowAttribute;

if (emitOverflowAttribute)
{
// Setting metricPointIndex to 1 as we would reserve the metricPoints[1] for overflow attribute.
// Newer attributes should be added starting at the index: 2
this.metricPointIndex = 1;
}
}

private delegate void UpdateLongDelegate(long value, ReadOnlySpan<KeyValuePair<string, object>> tags);
Expand Down Expand Up @@ -197,6 +212,22 @@ private void InitializeZeroTagPointIfNotInitialized()
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void InitializeOverflowTagPointIfNotInitialized()
{
if (!this.overflowTagMetricPointInitialized)
{
lock (this.lockOverflowTag)
{
if (!this.overflowTagMetricPointInitialized)
{
this.metricPoints[1] = new MetricPoint(this, this.aggType, new KeyValuePair<string, object>[] { new("otel.metric.overflow", true) }, this.histogramBounds, this.exponentialHistogramMaxSize, this.exponentialHistogramMaxScale);
this.overflowTagMetricPointInitialized = true;
}
}
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private int LookupAggregatorStore(KeyValuePair<string, object>[] tagKeysAndValues, int length)
{
Expand Down Expand Up @@ -329,12 +360,21 @@ private void UpdateLong(long value, ReadOnlySpan<KeyValuePair<string, object>> t
var index = this.FindMetricAggregatorsDefault(tags);
if (index < 0)
{
if (Interlocked.CompareExchange(ref this.metricCapHitMessageLogged, 1, 0) == 0)
if (this.emitOverflowAttribute)
{
OpenTelemetrySdkEventSource.Log.MeasurementDropped(this.name, this.metricPointCapHitMessage, MetricPointCapHitFixMessage);
this.InitializeOverflowTagPointIfNotInitialized();
this.metricPoints[1].Update(value);
return;
}
else
{
if (Interlocked.CompareExchange(ref this.metricCapHitMessageLogged, 1, 0) == 0)
{
OpenTelemetrySdkEventSource.Log.MeasurementDropped(this.name, this.metricPointCapHitMessage, MetricPointCapHitFixMessage);
}

return;
return;
}
}

// TODO: can special case built-in filters to be bit faster.
Expand All @@ -361,12 +401,21 @@ private void UpdateLongCustomTags(long value, ReadOnlySpan<KeyValuePair<string,
var index = this.FindMetricAggregatorsCustomTag(tags);
if (index < 0)
{
if (Interlocked.CompareExchange(ref this.metricCapHitMessageLogged, 1, 0) == 0)
if (this.emitOverflowAttribute)
{
OpenTelemetrySdkEventSource.Log.MeasurementDropped(this.name, this.metricPointCapHitMessage, MetricPointCapHitFixMessage);
this.InitializeOverflowTagPointIfNotInitialized();
this.metricPoints[1].Update(value);
return;
}
else
{
if (Interlocked.CompareExchange(ref this.metricCapHitMessageLogged, 1, 0) == 0)
{
OpenTelemetrySdkEventSource.Log.MeasurementDropped(this.name, this.metricPointCapHitMessage, MetricPointCapHitFixMessage);
}

return;
return;
}
}

// TODO: can special case built-in filters to be bit faster.
Expand All @@ -393,12 +442,21 @@ private void UpdateDouble(double value, ReadOnlySpan<KeyValuePair<string, object
var index = this.FindMetricAggregatorsDefault(tags);
if (index < 0)
{
if (Interlocked.CompareExchange(ref this.metricCapHitMessageLogged, 1, 0) == 0)
if (this.emitOverflowAttribute)
{
OpenTelemetrySdkEventSource.Log.MeasurementDropped(this.name, this.metricPointCapHitMessage, MetricPointCapHitFixMessage);
this.InitializeOverflowTagPointIfNotInitialized();
this.metricPoints[1].Update(value);
return;
}
else
{
if (Interlocked.CompareExchange(ref this.metricCapHitMessageLogged, 1, 0) == 0)
{
OpenTelemetrySdkEventSource.Log.MeasurementDropped(this.name, this.metricPointCapHitMessage, MetricPointCapHitFixMessage);
}

return;
return;
}
}

// TODO: can special case built-in filters to be bit faster.
Expand All @@ -425,12 +483,21 @@ private void UpdateDoubleCustomTags(double value, ReadOnlySpan<KeyValuePair<stri
var index = this.FindMetricAggregatorsCustomTag(tags);
if (index < 0)
{
if (Interlocked.CompareExchange(ref this.metricCapHitMessageLogged, 1, 0) == 0)
if (this.emitOverflowAttribute)
{
OpenTelemetrySdkEventSource.Log.MeasurementDropped(this.name, this.metricPointCapHitMessage, MetricPointCapHitFixMessage);
this.InitializeOverflowTagPointIfNotInitialized();
this.metricPoints[1].Update(value);
return;
}
else
{
if (Interlocked.CompareExchange(ref this.metricCapHitMessageLogged, 1, 0) == 0)
{
OpenTelemetrySdkEventSource.Log.MeasurementDropped(this.name, this.metricPointCapHitMessage, MetricPointCapHitFixMessage);
}

return;
return;
}
}

// TODO: can special case built-in filters to be bit faster.
Expand Down
8 changes: 7 additions & 1 deletion src/OpenTelemetry/Metrics/MeterProviderSdk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Text;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry.Internal;
using OpenTelemetry.Resources;
Expand All @@ -32,6 +33,8 @@ internal sealed class MeterProviderSdk : MeterProvider
internal int ShutdownCount;
internal bool Disposed;

private const string EmitOverFlowAttributeConfigKey = "OTEL_DOTNET_EXPERIMENTAL_METRICS_EMIT_OVERFLOW_ATTRIBUTE";

private readonly List<object> instrumentations = new();
private readonly List<Func<Instrument, MetricStreamConfiguration?>> viewConfigs;
private readonly object collectLock = new();
Expand All @@ -48,6 +51,9 @@ internal MeterProviderSdk(
var state = serviceProvider!.GetRequiredService<MeterProviderBuilderSdk>();
state.RegisterProvider(this);

var config = serviceProvider!.GetRequiredService<IConfiguration>();
_ = config.TryGetBoolValue(EmitOverFlowAttributeConfigKey, out bool isEmitOverflowAttributeKeySet);

this.ServiceProvider = serviceProvider!;

if (ownsServiceProvider)
Expand Down Expand Up @@ -79,7 +85,7 @@ internal MeterProviderSdk(

reader.SetParentProvider(this);
reader.SetMaxMetricStreams(state.MaxMetricStreams);
reader.SetMaxMetricPointsPerMetricStream(state.MaxMetricPointsPerMetricStream);
reader.SetMaxMetricPointsPerMetricStream(state.MaxMetricPointsPerMetricStream, isEmitOverflowAttributeKeySet);
reader.SetExemplarFilter(state.ExemplarFilter);

if (this.reader == null)
Expand Down
3 changes: 2 additions & 1 deletion src/OpenTelemetry/Metrics/Metric.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ internal Metric(
MetricStreamIdentity instrumentIdentity,
AggregationTemporality temporality,
int maxMetricPointsPerMetricStream,
bool emitOverflowAttribute,
ExemplarFilter exemplarFilter = null)
{
this.InstrumentIdentity = instrumentIdentity;
Expand Down Expand Up @@ -141,7 +142,7 @@ internal Metric(
throw new NotSupportedException($"Unsupported Instrument Type: {instrumentIdentity.InstrumentType.FullName}");
}

this.aggStore = new AggregatorStore(instrumentIdentity, aggType, temporality, maxMetricPointsPerMetricStream, exemplarFilter);
this.aggStore = new AggregatorStore(instrumentIdentity, aggType, temporality, maxMetricPointsPerMetricStream, emitOverflowAttribute, exemplarFilter);
this.Temporality = temporality;
this.InstrumentDisposed = false;
}
Expand Down
16 changes: 13 additions & 3 deletions src/OpenTelemetry/Metrics/MetricReaderExt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public abstract partial class MetricReader
private Metric[] metrics;
private Metric[] metricsCurrentBatch;
private int metricIndex = -1;
private bool emitOverflowAttribute;

private ExemplarFilter exemplarFilter;

Expand Down Expand Up @@ -71,7 +72,7 @@ internal Metric AddMetricWithNoViews(Instrument instrument)
Metric metric = null;
try
{
metric = new Metric(metricStreamIdentity, this.GetAggregationTemporality(metricStreamIdentity.InstrumentType), this.maxMetricPointsPerMetricStream, exemplarFilter: this.exemplarFilter);
metric = new Metric(metricStreamIdentity, this.GetAggregationTemporality(metricStreamIdentity.InstrumentType), this.maxMetricPointsPerMetricStream, this.emitOverflowAttribute, this.exemplarFilter);
}
catch (NotSupportedException nse)
{
Expand Down Expand Up @@ -156,7 +157,7 @@ internal List<Metric> AddMetricsListWithViews(Instrument instrument, List<Metric
}
else
{
Metric metric = new(metricStreamIdentity, this.GetAggregationTemporality(metricStreamIdentity.InstrumentType), this.maxMetricPointsPerMetricStream, this.exemplarFilter);
Metric metric = new(metricStreamIdentity, this.GetAggregationTemporality(metricStreamIdentity.InstrumentType), this.maxMetricPointsPerMetricStream, this.emitOverflowAttribute, this.exemplarFilter);

this.instrumentIdentityToMetric[metricStreamIdentity] = metric;
this.metrics[index] = metric;
Expand Down Expand Up @@ -230,9 +231,18 @@ internal void SetExemplarFilter(ExemplarFilter exemplarFilter)
this.exemplarFilter = exemplarFilter;
}

internal void SetMaxMetricPointsPerMetricStream(int maxMetricPointsPerMetricStream)
internal void SetMaxMetricPointsPerMetricStream(int maxMetricPointsPerMetricStream, bool isEmitOverflowAttributeKeySet)
{
this.maxMetricPointsPerMetricStream = maxMetricPointsPerMetricStream;

if (isEmitOverflowAttributeKeySet)
{
// We need at least two metric points. One is reserved for zero tags and the other one for overflow attribute
if (maxMetricPointsPerMetricStream > 1)
{
this.emitOverflowAttribute = true;
}
}
}

private Batch<Metric> GetMetricsBatch()
Expand Down
20 changes: 20 additions & 0 deletions src/Shared/Options/ConfigurationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,26 @@ public static bool TryGetIntValue(
return true;
}

public static bool TryGetBoolValue(
this IConfiguration configuration,
string key,
out bool value)
{
if (!configuration.TryGetStringValue(key, out var stringValue))
{
value = default;
return false;
}

if (!bool.TryParse(stringValue, out value))
{
LogInvalidEnvironmentVariable?.Invoke(key, stringValue!);
return false;
}

return true;
}

public static bool TryGetValue<T>(
this IConfiguration configuration,
string key,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// <copyright file="AggregatorTest.cs" company="OpenTelemetry Authors">
// <copyright file="AggregatorTestsBase.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -19,13 +19,27 @@

namespace OpenTelemetry.Metrics.Tests;

public class AggregatorTest
#pragma warning disable SA1402

public abstract class AggregatorTestsBase
{
private static readonly Meter Meter = new("testMeter");
private static readonly Instrument Instrument = Meter.CreateHistogram<long>("testInstrument");
private static readonly ExplicitBucketHistogramConfiguration HistogramConfiguration = new() { Boundaries = Metric.DefaultHistogramBounds };
private static readonly MetricStreamIdentity MetricStreamIdentity = new(Instrument, HistogramConfiguration);
private readonly AggregatorStore aggregatorStore = new(MetricStreamIdentity, AggregationType.HistogramWithBuckets, AggregationTemporality.Cumulative, 1024);

private readonly bool emitOverflowAttribute;
private readonly AggregatorStore aggregatorStore;

protected AggregatorTestsBase(bool emitOverflowAttribute)
{
if (emitOverflowAttribute)
{
this.emitOverflowAttribute = emitOverflowAttribute;
}

this.aggregatorStore = new(MetricStreamIdentity, AggregationType.HistogramWithBuckets, AggregationTemporality.Cumulative, 1024, emitOverflowAttribute);
}

[Fact]
public void HistogramDistributeToAllBucketsDefault()
Expand Down Expand Up @@ -284,6 +298,7 @@ internal void ExponentialHistogramTests(AggregationType aggregationType, Aggrega
aggregationType,
aggregationTemporality,
maxMetricPoints: 1024,
this.emitOverflowAttribute,
exemplarsEnabled ? new AlwaysOnExemplarFilter() : null);

var expectedHistogram = new Base2ExponentialBucketHistogram();
Expand Down Expand Up @@ -391,7 +406,8 @@ internal void ExponentialMaxScaleConfigWorks(int? maxScale)
metricStreamIdentity,
AggregationType.Base2ExponentialHistogram,
AggregationTemporality.Cumulative,
maxMetricPoints: 1024);
maxMetricPoints: 1024,
this.emitOverflowAttribute);

aggregatorStore.Update(10, Array.Empty<KeyValuePair<string, object>>());

Expand Down Expand Up @@ -463,3 +479,19 @@ private class ThreadArguments
public double SumOfDelta;
}
}

public class AggregatorTests : AggregatorTestsBase
{
public AggregatorTests()
: base(false)
{
}
}

public class AggregatorTestsWithOverflowAttribute : AggregatorTestsBase
{
public AggregatorTestsWithOverflowAttribute()
: base(true)
{
}
}
Loading

0 comments on commit 10a8989

Please sign in to comment.