Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metric AggregatorStore optimizations for sorting tags #2805

Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
* Fail-fast when using AddView with guaranteed conflict.
([2751](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2751))

* Performance Improvement: When emitting metrics, users are strongly advised to
utpilla marked this conversation as resolved.
Show resolved Hide resolved
provide tags with same Key order, to achieve maximum performance.
([2805](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2805/files))

## 1.2.0-rc1

Released 2021-Nov-29
Expand Down
158 changes: 99 additions & 59 deletions src/OpenTelemetry/Metrics/AggregatorStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@ namespace OpenTelemetry.Metrics
{
internal sealed class AggregatorStore
{
private static readonly ObjectArrayEqualityComparer ObjectArrayComparer = new ObjectArrayEqualityComparer();
private readonly object lockZeroTags = new object();
private readonly HashSet<string> tagKeysInteresting;
private readonly int tagsKeysInterestingCount;

// Two-Level lookup. TagKeys x [ TagValues x Metrics ]
private readonly ConcurrentDictionary<string[], ConcurrentDictionary<object[], int>> keyValue2MetricAggs =
new ConcurrentDictionary<string[], ConcurrentDictionary<object[], int>>(new StringArrayEqualityComparer());
private readonly ConcurrentDictionary<Tags, int> tagsToMetricPointIndexDictionary =
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
new ConcurrentDictionary<Tags, int>();

private readonly AggregationTemporality temporality;
private readonly string name;
Expand Down Expand Up @@ -178,44 +176,37 @@ private void InitializeZeroTagPointIfNotInitialized()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int length)
{
int aggregatorIndex;
string[] seqKey = null;
var givenTags = new Tags(tagKeys, tagValues);

// GetOrAdd by TagKeys at 1st Level of 2-level dictionary structure.
// Get back a Dictionary of [ Values x Metrics[] ].
if (!this.keyValue2MetricAggs.TryGetValue(tagKeys, out var value2metrics))
if (!this.tagsToMetricPointIndexDictionary.TryGetValue(givenTags, out var aggregatorIndex))
{
// Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage.
seqKey = new string[length];
tagKeys.CopyTo(seqKey, 0);

value2metrics = new ConcurrentDictionary<object[], int>(ObjectArrayComparer);
if (!this.keyValue2MetricAggs.TryAdd(seqKey, value2metrics))
if (length > 1)
{
this.keyValue2MetricAggs.TryGetValue(seqKey, out value2metrics);
}
}
// Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage.
// Create a new array for the sorted Tag keys.
var sortedTagKeys = new string[length];
utpilla marked this conversation as resolved.
Show resolved Hide resolved
tagKeys.CopyTo(sortedTagKeys, 0);

// GetOrAdd by TagValues at 2st Level of 2-level dictionary structure.
// Get back Metrics[].
if (!value2metrics.TryGetValue(tagValues, out aggregatorIndex))
{
aggregatorIndex = this.metricPointIndex;
if (aggregatorIndex >= this.maxMetricPoints)
{
// sorry! out of data points.
// TODO: Once we support cleanup of
// unused points (typically with delta)
// we can re-claim them here.
return -1;
}
// Create a new array for the sorted Tag values.
var sortedTagValues = new object[length];
tagValues.CopyTo(sortedTagValues, 0);

lock (value2metrics)
{
// check again after acquiring lock.
if (!value2metrics.TryGetValue(tagValues, out aggregatorIndex))
Array.Sort(sortedTagKeys, sortedTagValues);

var sortedTags = new Tags(sortedTagKeys, sortedTagValues);

if (!this.tagsToMetricPointIndexDictionary.TryGetValue(sortedTags, out aggregatorIndex))
{
aggregatorIndex = Interlocked.Increment(ref this.metricPointIndex);
// Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage.
var givenKeys = new string[length];
tagKeys.CopyTo(givenKeys, 0);

var givenValues = new object[length];
tagValues.CopyTo(givenValues, 0);

givenTags = new Tags(givenKeys, givenValues);

aggregatorIndex = this.metricPointIndex;
if (aggregatorIndex >= this.maxMetricPoints)
{
// sorry! out of data points.
Expand All @@ -225,24 +216,83 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng
return -1;
}

// Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage.
if (seqKey == null)
lock (this.tagsToMetricPointIndexDictionary)
utpilla marked this conversation as resolved.
Show resolved Hide resolved
{
seqKey = new string[length];
tagKeys.CopyTo(seqKey, 0);
// check again after acquiring lock.
if (!this.tagsToMetricPointIndexDictionary.TryGetValue(sortedTags, out aggregatorIndex))
{
aggregatorIndex = ++this.metricPointIndex;
if (aggregatorIndex >= this.maxMetricPoints)
{
// sorry! out of data points.
// TODO: Once we support cleanup of
// unused points (typically with delta)
// we can re-claim them here.
return -1;
}

ref var metricPoint = ref this.metricPoints[aggregatorIndex];
var dt = DateTimeOffset.UtcNow;
metricPoint = new MetricPoint(this.aggType, dt, sortedTags.Keys, sortedTags.Values, this.histogramBounds);

// Add to dictionary *after* initializing MetricPoint
// as other threads can start writing to the
// MetricPoint, if dictionary entry found.

// Add the sorted order along with the given order of tags
this.tagsToMetricPointIndexDictionary.TryAdd(sortedTags, aggregatorIndex);
this.tagsToMetricPointIndexDictionary.TryAdd(givenTags, aggregatorIndex);
}
}
}
}
else
{
// Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage.
var givenKeys = new string[length];
var givenValues = new object[length];

var seqVal = new object[length];
tagValues.CopyTo(seqVal, 0);
tagKeys.CopyTo(givenKeys, 0);
tagValues.CopyTo(givenValues, 0);

ref var metricPoint = ref this.metricPoints[aggregatorIndex];
var dt = DateTimeOffset.UtcNow;
metricPoint = new MetricPoint(this.aggType, dt, seqKey, seqVal, this.histogramBounds);
givenTags = new Tags(givenKeys, givenValues);

// Add to dictionary *after* initializing MetricPoint
// as other threads can start writing to the
// MetricPoint, if dictionary entry found.
value2metrics.TryAdd(seqVal, aggregatorIndex);
aggregatorIndex = this.metricPointIndex;
if (aggregatorIndex >= this.maxMetricPoints)
{
// sorry! out of data points.
// TODO: Once we support cleanup of
// unused points (typically with delta)
// we can re-claim them here.
return -1;
}

lock (this.tagsToMetricPointIndexDictionary)
{
// check again after acquiring lock.
if (!this.tagsToMetricPointIndexDictionary.TryGetValue(givenTags, out aggregatorIndex))
{
aggregatorIndex = ++this.metricPointIndex;
if (aggregatorIndex >= this.maxMetricPoints)
{
// sorry! out of data points.
// TODO: Once we support cleanup of
// unused points (typically with delta)
// we can re-claim them here.
return -1;
}

ref var metricPoint = ref this.metricPoints[aggregatorIndex];
var dt = DateTimeOffset.UtcNow;
metricPoint = new MetricPoint(this.aggType, dt, givenTags.Keys, givenTags.Values, this.histogramBounds);

// Add to dictionary *after* initializing MetricPoint
// as other threads can start writing to the
// MetricPoint, if dictionary entry found.

// givenTags will always be sorted when tags length == 1
this.tagsToMetricPointIndexDictionary.TryAdd(givenTags, aggregatorIndex);
}
}
}
}
Expand Down Expand Up @@ -355,11 +405,6 @@ private int FindMetricAggregatorsDefault(ReadOnlySpan<KeyValuePair<string, objec

storage.SplitToKeysAndValues(tags, tagLength, out var tagKeys, out var tagValues);

if (tagLength > 1)
{
Array.Sort(tagKeys, tagValues);
}

return this.LookupAggregatorStore(tagKeys, tagValues, tagLength);
}

Expand Down Expand Up @@ -388,11 +433,6 @@ private int FindMetricAggregatorsCustomTag(ReadOnlySpan<KeyValuePair<string, obj
return 0;
}

if (actualLength > 1)
{
Array.Sort(tagKeys, tagValues);
}

return this.LookupAggregatorStore(tagKeys, tagValues, actualLength);
}
}
Expand Down
68 changes: 0 additions & 68 deletions src/OpenTelemetry/Metrics/ObjectArrayEqualityComparer.cs

This file was deleted.

69 changes: 0 additions & 69 deletions src/OpenTelemetry/Metrics/StringArrayEqualityComparer.cs

This file was deleted.

Loading