From 6d390354c890f3da29a1ac89478ea4934aa0c1c2 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Mon, 10 Jan 2022 19:26:46 -0800 Subject: [PATCH 1/8] AggregatorStore optimizations for sorting tag keys --- src/OpenTelemetry/Metrics/AggregatorStore.cs | 54 +++++++++++++------- 1 file changed, 35 insertions(+), 19 deletions(-) diff --git a/src/OpenTelemetry/Metrics/AggregatorStore.cs b/src/OpenTelemetry/Metrics/AggregatorStore.cs index a4b3df38aef..322726d57b2 100644 --- a/src/OpenTelemetry/Metrics/AggregatorStore.cs +++ b/src/OpenTelemetry/Metrics/AggregatorStore.cs @@ -26,13 +26,17 @@ namespace OpenTelemetry.Metrics internal sealed class AggregatorStore { private static readonly ObjectArrayEqualityComparer ObjectArrayComparer = new ObjectArrayEqualityComparer(); + private static readonly StringArrayEqualityComparer StringArrayComparer = new StringArrayEqualityComparer(); private readonly object lockZeroTags = new object(); private readonly HashSet tagKeysInteresting; private readonly int tagsKeysInterestingCount; + private readonly ConcurrentDictionary tagKeyCombinations = + new ConcurrentDictionary(StringArrayComparer); + // Two-Level lookup. TagKeys x [ TagValues x Metrics ] private readonly ConcurrentDictionary> keyValue2MetricAggs = - new ConcurrentDictionary>(new StringArrayEqualityComparer()); + new ConcurrentDictionary>(StringArrayComparer); private readonly AggregationTemporality temporality; private readonly string name; @@ -180,19 +184,41 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng { int aggregatorIndex; string[] seqKey = null; + string[] sortedTagKeys = null; + ConcurrentDictionary value2metrics = null; - // GetOrAdd by TagKeys at 1st Level of 2-level dictionary structure. - // Get back a Dictionary of [ Values x Metrics[] ]. - if (!this.keyValue2MetricAggs.TryGetValue(tagKeys, out var value2metrics)) + if (length > 1) { - // Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage. - seqKey = new string[length]; - tagKeys.CopyTo(seqKey, 0); + if (!this.tagKeyCombinations.TryGetValue(tagKeys, out sortedTagKeys)) + { + // Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage. + seqKey = new string[length]; + tagKeys.CopyTo(seqKey, 0); + + // Create a new array for the sorted Tag keys. + sortedTagKeys = new string[length]; + tagKeys.CopyTo(sortedTagKeys, 0); + + Array.Sort(sortedTagKeys, tagValues); + this.tagKeyCombinations.TryAdd(seqKey, sortedTagKeys); + } + } + else + { + // Create a new array for the sorted Tag keys. + sortedTagKeys = new string[length]; + tagKeys.CopyTo(sortedTagKeys, 0); + } + + // GetOrAdd by the sorted Tag keys at 1st Level of 2-level dictionary structure. + // Get back a Dictionary of [ Values x Metrics[] ]. + if (!this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics)) + { value2metrics = new ConcurrentDictionary(ObjectArrayComparer); - if (!this.keyValue2MetricAggs.TryAdd(seqKey, value2metrics)) + if (!this.keyValue2MetricAggs.TryAdd(sortedTagKeys, value2metrics)) { - this.keyValue2MetricAggs.TryGetValue(seqKey, out value2metrics); + this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics); } } @@ -355,11 +381,6 @@ private int FindMetricAggregatorsDefault(ReadOnlySpan 1) - { - Array.Sort(tagKeys, tagValues); - } - return this.LookupAggregatorStore(tagKeys, tagValues, tagLength); } @@ -388,11 +409,6 @@ private int FindMetricAggregatorsCustomTag(ReadOnlySpan 1) - { - Array.Sort(tagKeys, tagValues); - } - return this.LookupAggregatorStore(tagKeys, tagValues, actualLength); } } From 98396058cc26a83499b91241531bc527509dd911 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Mon, 10 Jan 2022 20:41:21 -0800 Subject: [PATCH 2/8] Code fix when tagKeysLength is less than one --- src/OpenTelemetry/Metrics/AggregatorStore.cs | 47 +++++++++++--------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/src/OpenTelemetry/Metrics/AggregatorStore.cs b/src/OpenTelemetry/Metrics/AggregatorStore.cs index 322726d57b2..0093a6239c8 100644 --- a/src/OpenTelemetry/Metrics/AggregatorStore.cs +++ b/src/OpenTelemetry/Metrics/AggregatorStore.cs @@ -185,10 +185,11 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng int aggregatorIndex; string[] seqKey = null; string[] sortedTagKeys = null; - ConcurrentDictionary value2metrics = null; + ConcurrentDictionary value2metrics; if (length > 1) { + // We only need to sort if there is more than one Tag Key. if (!this.tagKeyCombinations.TryGetValue(tagKeys, out sortedTagKeys)) { // Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage. @@ -203,22 +204,33 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng this.tagKeyCombinations.TryAdd(seqKey, sortedTagKeys); } + + // GetOrAdd by the sorted Tag keys at 1st Level of 2-level dictionary structure. + // Get back a Dictionary of [ Values x Metrics[] ]. + if (!this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics)) + { + value2metrics = new ConcurrentDictionary(ObjectArrayComparer); + if (!this.keyValue2MetricAggs.TryAdd(sortedTagKeys, value2metrics)) + { + this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics); + } + } } else { - // Create a new array for the sorted Tag keys. - sortedTagKeys = new string[length]; - tagKeys.CopyTo(sortedTagKeys, 0); - } - - // GetOrAdd by the sorted Tag keys at 1st Level of 2-level dictionary structure. - // Get back a Dictionary of [ Values x Metrics[] ]. - if (!this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics)) - { - value2metrics = new ConcurrentDictionary(ObjectArrayComparer); - if (!this.keyValue2MetricAggs.TryAdd(sortedTagKeys, value2metrics)) + // GetOrAdd by the sorted Tag keys at 1st Level of 2-level dictionary structure. + // Get back a Dictionary of [ Values x Metrics[] ]. + if (!this.keyValue2MetricAggs.TryGetValue(tagKeys, out value2metrics)) { - this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics); + // Create a new array for the sorted Tag keys. + sortedTagKeys = new string[length]; + tagKeys.CopyTo(sortedTagKeys, 0); + + value2metrics = new ConcurrentDictionary(ObjectArrayComparer); + if (!this.keyValue2MetricAggs.TryAdd(sortedTagKeys, value2metrics)) + { + this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics); + } } } @@ -251,19 +263,12 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng return -1; } - // Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage. - if (seqKey == null) - { - seqKey = new string[length]; - tagKeys.CopyTo(seqKey, 0); - } - var seqVal = new object[length]; tagValues.CopyTo(seqVal, 0); ref var metricPoint = ref this.metricPoints[aggregatorIndex]; var dt = DateTimeOffset.UtcNow; - metricPoint = new MetricPoint(this.aggType, dt, seqKey, seqVal, this.histogramBounds); + metricPoint = new MetricPoint(this.aggType, dt, sortedTagKeys, seqVal, this.histogramBounds); // Add to dictionary *after* initializing MetricPoint // as other threads can start writing to the From 18f660774993ba0889b810d44dc0288acfbb64a1 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Mon, 10 Jan 2022 21:30:16 -0800 Subject: [PATCH 3/8] Code changes to fix the issue when tag keys length <= 1 --- src/OpenTelemetry/Metrics/AggregatorStore.cs | 49 +++++++------------- 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/src/OpenTelemetry/Metrics/AggregatorStore.cs b/src/OpenTelemetry/Metrics/AggregatorStore.cs index 0093a6239c8..d53a0fdf92f 100644 --- a/src/OpenTelemetry/Metrics/AggregatorStore.cs +++ b/src/OpenTelemetry/Metrics/AggregatorStore.cs @@ -187,50 +187,37 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng string[] sortedTagKeys = null; ConcurrentDictionary value2metrics; - if (length > 1) + // We only need to sort if there is more than one Tag Key. + if (!this.tagKeyCombinations.TryGetValue(tagKeys, out sortedTagKeys)) { - // We only need to sort if there is more than one Tag Key. - if (!this.tagKeyCombinations.TryGetValue(tagKeys, out sortedTagKeys)) - { - // Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage. - seqKey = new string[length]; - tagKeys.CopyTo(seqKey, 0); + // Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage. + seqKey = new string[length]; + tagKeys.CopyTo(seqKey, 0); + if (length > 1) + { // Create a new array for the sorted Tag keys. sortedTagKeys = new string[length]; tagKeys.CopyTo(sortedTagKeys, 0); Array.Sort(sortedTagKeys, tagValues); - - this.tagKeyCombinations.TryAdd(seqKey, sortedTagKeys); } - - // GetOrAdd by the sorted Tag keys at 1st Level of 2-level dictionary structure. - // Get back a Dictionary of [ Values x Metrics[] ]. - if (!this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics)) + else { - value2metrics = new ConcurrentDictionary(ObjectArrayComparer); - if (!this.keyValue2MetricAggs.TryAdd(sortedTagKeys, value2metrics)) - { - this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics); - } + sortedTagKeys = seqKey; } + + this.tagKeyCombinations.TryAdd(seqKey, sortedTagKeys); } - else + + // GetOrAdd by the sorted Tag keys at 1st Level of 2-level dictionary structure. + // Get back a Dictionary of [ Values x Metrics[] ]. + if (!this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics)) { - // GetOrAdd by the sorted Tag keys at 1st Level of 2-level dictionary structure. - // Get back a Dictionary of [ Values x Metrics[] ]. - if (!this.keyValue2MetricAggs.TryGetValue(tagKeys, out value2metrics)) + value2metrics = new ConcurrentDictionary(ObjectArrayComparer); + if (!this.keyValue2MetricAggs.TryAdd(sortedTagKeys, value2metrics)) { - // Create a new array for the sorted Tag keys. - sortedTagKeys = new string[length]; - tagKeys.CopyTo(sortedTagKeys, 0); - - value2metrics = new ConcurrentDictionary(ObjectArrayComparer); - if (!this.keyValue2MetricAggs.TryAdd(sortedTagKeys, value2metrics)) - { - this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics); - } + this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics); } } From 1d72e8e5c8aa4568ab57e73294108f0c98a271ff Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Tue, 11 Jan 2022 14:34:35 -0800 Subject: [PATCH 4/8] Add unit test to ensure that dimensions can be provided in any order --- .../Metrics/MetricAPITest.cs | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs index d89b748d582..b2a15437e70 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs @@ -382,6 +382,78 @@ public void ObservableCounterAggregationTest(bool exportDelta) } } + [Theory] + [InlineData(false, false)] + [InlineData(false, true)] + [InlineData(true, false)] + [InlineData(true, true)] + public void DimensionsAreOrderInsensitive(bool exportDelta, bool hasView) + { + var exportedItems = new List(); + + using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{exportDelta}.{hasView}"); + var counterLong = meter.CreateCounter("Counter"); + var meterProviderBuilder = Sdk.CreateMeterProviderBuilder() + .AddMeter(meter.Name) + .AddReader(new BaseExportingMetricReader(new InMemoryExporter(exportedItems)) + { + Temporality = exportDelta ? AggregationTemporality.Delta : AggregationTemporality.Cumulative, + }); + + if (hasView) + { + meterProviderBuilder.AddView(instrumentName: "Counter", new MetricStreamConfiguration() { TagKeys = new string[] { "Key1", "Key2" } }); + } + + using var meterProvider = meterProviderBuilder.Build(); + + counterLong.Add(10, new("Key1", "Value1"), new("Key2", "Value2"), new("Key3", "Value3")); + counterLong.Add(10, new("Key1", "Value1"), new("Key3", "Value3"), new("Key2", "Value2")); + meterProvider.ForceFlush(MaxTimeToAllowForFlush); + long sumReceived = GetLongSum(exportedItems); + Assert.Equal(20, sumReceived); + + exportedItems.Clear(); + counterLong.Add(10, new("Key2", "Value2"), new("Key1", "Value1"), new("Key3", "Value3")); + counterLong.Add(10, new("Key2", "Value2"), new("Key3", "Value3"), new("Key1", "Value1")); + meterProvider.ForceFlush(MaxTimeToAllowForFlush); + sumReceived = GetLongSum(exportedItems); + if (exportDelta) + { + Assert.Equal(20, sumReceived); + } + else + { + Assert.Equal(40, sumReceived); + } + + exportedItems.Clear(); + meterProvider.ForceFlush(MaxTimeToAllowForFlush); + sumReceived = GetLongSum(exportedItems); + if (exportDelta) + { + Assert.Equal(0, sumReceived); + } + else + { + Assert.Equal(40, sumReceived); + } + + exportedItems.Clear(); + counterLong.Add(40, new("Key3", "Value3"), new("Key1", "Value1"), new("Key2", "Value2")); + counterLong.Add(20, new("Key3", "Value3"), new("Key2", "Value2"), new("Key1", "Value1")); + meterProvider.ForceFlush(MaxTimeToAllowForFlush); + sumReceived = GetLongSum(exportedItems); + if (exportDelta) + { + Assert.Equal(60, sumReceived); + } + else + { + Assert.Equal(100, sumReceived); + } + } + [Theory] [InlineData(AggregationTemporality.Cumulative)] [InlineData(AggregationTemporality.Delta)] From 4cd8314350267651afe61f30bf0ff927aecab548 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Wed, 12 Jan 2022 10:05:20 -0800 Subject: [PATCH 5/8] Add a dictionary for mapping sorted tag values --- src/OpenTelemetry/Metrics/AggregatorStore.cs | 50 +++++++++++--- .../Metrics/MetricAPITest.cs | 66 ++++++++++++++++++- 2 files changed, 104 insertions(+), 12 deletions(-) diff --git a/src/OpenTelemetry/Metrics/AggregatorStore.cs b/src/OpenTelemetry/Metrics/AggregatorStore.cs index d53a0fdf92f..d79a21def46 100644 --- a/src/OpenTelemetry/Metrics/AggregatorStore.cs +++ b/src/OpenTelemetry/Metrics/AggregatorStore.cs @@ -34,6 +34,9 @@ internal sealed class AggregatorStore private readonly ConcurrentDictionary tagKeyCombinations = new ConcurrentDictionary(StringArrayComparer); + private readonly ConcurrentDictionary tagValueCombinations = + new ConcurrentDictionary(ObjectArrayComparer); + // Two-Level lookup. TagKeys x [ TagValues x Metrics ] private readonly ConcurrentDictionary> keyValue2MetricAggs = new ConcurrentDictionary>(StringArrayComparer); @@ -183,30 +186,39 @@ private void InitializeZeroTagPointIfNotInitialized() private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int length) { int aggregatorIndex; - string[] seqKey = null; string[] sortedTagKeys = null; + object[] sortedTagValues = null; ConcurrentDictionary value2metrics; // We only need to sort if there is more than one Tag Key. if (!this.tagKeyCombinations.TryGetValue(tagKeys, out sortedTagKeys)) { // Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage. - seqKey = new string[length]; + var seqKey = new string[length]; tagKeys.CopyTo(seqKey, 0); + var seqValue = new object[length]; + tagValues.CopyTo(seqValue, 0); + if (length > 1) { // Create a new array for the sorted Tag keys. sortedTagKeys = new string[length]; tagKeys.CopyTo(sortedTagKeys, 0); - Array.Sort(sortedTagKeys, tagValues); + // Create a new array for the sorted Tag values. + sortedTagValues = new object[length]; + tagValues.CopyTo(sortedTagValues, 0); + + Array.Sort(sortedTagKeys, sortedTagValues); } else { sortedTagKeys = seqKey; + sortedTagValues = seqValue; } + this.tagValueCombinations.TryAdd(seqValue, sortedTagValues); this.tagKeyCombinations.TryAdd(seqKey, sortedTagKeys); } @@ -221,9 +233,30 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng } } + if (!this.tagValueCombinations.TryGetValue(tagValues, out sortedTagValues)) + { + var seqValue = new object[length]; + tagValues.CopyTo(seqValue, 0); + + if (length > 1) + { + // Create a new array for the sorted Tag values. + sortedTagValues = new object[length]; + tagValues.CopyTo(sortedTagValues, 0); + + Array.Sort(tagKeys, sortedTagValues); + } + else + { + sortedTagValues = seqValue; + } + + this.tagValueCombinations.TryAdd(seqValue, sortedTagValues); + } + // GetOrAdd by TagValues at 2st Level of 2-level dictionary structure. // Get back Metrics[]. - if (!value2metrics.TryGetValue(tagValues, out aggregatorIndex)) + if (!value2metrics.TryGetValue(sortedTagValues, out aggregatorIndex)) { aggregatorIndex = this.metricPointIndex; if (aggregatorIndex >= this.maxMetricPoints) @@ -238,7 +271,7 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng lock (value2metrics) { // check again after acquiring lock. - if (!value2metrics.TryGetValue(tagValues, out aggregatorIndex)) + if (!value2metrics.TryGetValue(sortedTagValues, out aggregatorIndex)) { aggregatorIndex = Interlocked.Increment(ref this.metricPointIndex); if (aggregatorIndex >= this.maxMetricPoints) @@ -250,17 +283,14 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng return -1; } - var seqVal = new object[length]; - tagValues.CopyTo(seqVal, 0); - ref var metricPoint = ref this.metricPoints[aggregatorIndex]; var dt = DateTimeOffset.UtcNow; - metricPoint = new MetricPoint(this.aggType, dt, sortedTagKeys, seqVal, this.histogramBounds); + metricPoint = new MetricPoint(this.aggType, dt, sortedTagKeys, sortedTagValues, this.histogramBounds); // Add to dictionary *after* initializing MetricPoint // as other threads can start writing to the // MetricPoint, if dictionary entry found. - value2metrics.TryAdd(seqVal, aggregatorIndex); + value2metrics.TryAdd(sortedTagValues, aggregatorIndex); } } } diff --git a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs index b2a15437e70..9b8f1868203 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs @@ -410,13 +410,39 @@ public void DimensionsAreOrderInsensitive(bool exportDelta, bool hasView) counterLong.Add(10, new("Key1", "Value1"), new("Key2", "Value2"), new("Key3", "Value3")); counterLong.Add(10, new("Key1", "Value1"), new("Key3", "Value3"), new("Key2", "Value2")); meterProvider.ForceFlush(MaxTimeToAllowForFlush); + + List> tags; + if (hasView) + { + tags = new List>() + { + new("Key1", "Value1"), + new("Key2", "Value2"), + }; + } + else + { + tags = new List>() + { + new("Key1", "Value1"), + new("Key2", "Value2"), + new("Key3", "Value3"), + }; + } + + Assert.True(OnlyOneMetricPointExists(exportedItems)); + CheckTagsForFirstMetricPoint(exportedItems, tags); long sumReceived = GetLongSum(exportedItems); Assert.Equal(20, sumReceived); exportedItems.Clear(); - counterLong.Add(10, new("Key2", "Value2"), new("Key1", "Value1"), new("Key3", "Value3")); + counterLong.Add(5, new("Key2", "Value2"), new("Key1", "Value1"), new("Key3", "Value3")); + counterLong.Add(5, new("Key2", "Value2"), new("Key1", "Value1"), new("Key3", "Value3")); counterLong.Add(10, new("Key2", "Value2"), new("Key3", "Value3"), new("Key1", "Value1")); meterProvider.ForceFlush(MaxTimeToAllowForFlush); + + Assert.True(OnlyOneMetricPointExists(exportedItems)); + CheckTagsForFirstMetricPoint(exportedItems, tags); sumReceived = GetLongSum(exportedItems); if (exportDelta) { @@ -436,6 +462,8 @@ public void DimensionsAreOrderInsensitive(bool exportDelta, bool hasView) } else { + Assert.True(OnlyOneMetricPointExists(exportedItems)); + CheckTagsForFirstMetricPoint(exportedItems, tags); Assert.Equal(40, sumReceived); } @@ -443,6 +471,9 @@ public void DimensionsAreOrderInsensitive(bool exportDelta, bool hasView) counterLong.Add(40, new("Key3", "Value3"), new("Key1", "Value1"), new("Key2", "Value2")); counterLong.Add(20, new("Key3", "Value3"), new("Key2", "Value2"), new("Key1", "Value1")); meterProvider.ForceFlush(MaxTimeToAllowForFlush); + + Assert.True(OnlyOneMetricPointExists(exportedItems)); + CheckTagsForFirstMetricPoint(exportedItems, tags); sumReceived = GetLongSum(exportedItems); if (exportDelta) { @@ -730,8 +761,39 @@ private static double GetDoubleSum(List metrics) return sum; } + private static bool OnlyOneMetricPointExists(List metrics) + { + int count = 0; + foreach (var metric in metrics) + { + foreach (ref readonly var metricPoint in metric.GetMetricPoints()) + { + count++; + } + } + + return count == 1; + } + + // Provide tags input sorted by Key + private static void CheckTagsForFirstMetricPoint(List metrics, List> tags) + { + var metric = metrics[0]; + var metricPointEnumerator = metric.GetMetricPoints().GetEnumerator(); + Assert.True(metricPointEnumerator.MoveNext()); + + int index = 0; + var metricPoint = metricPointEnumerator.Current; + foreach (var tag in metricPoint.Tags) + { + Assert.Equal(tags[index].Key, tag.Key); + Assert.Equal(tags[index].Value, tag.Value); + index++; + } + } + private static void CounterUpdateThread(object obj) - where T : struct, IComparable + where T : struct, IComparable { if (obj is not UpdateThreadArguments arguments) { From 956d8361b52a008574c82690b45a3faf6f47ee15 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Thu, 13 Jan 2022 22:24:34 -0800 Subject: [PATCH 6/8] Fix issue with different tag keys using the same values --- src/OpenTelemetry/Metrics/AggregatorStore.cs | 55 +++---- src/OpenTelemetry/Metrics/Tags.cs | 107 ++++++++++++++ .../Metrics/MetricAPITest.cs | 134 ------------------ 3 files changed, 124 insertions(+), 172 deletions(-) create mode 100644 src/OpenTelemetry/Metrics/Tags.cs diff --git a/src/OpenTelemetry/Metrics/AggregatorStore.cs b/src/OpenTelemetry/Metrics/AggregatorStore.cs index d79a21def46..64f9b9e4aec 100644 --- a/src/OpenTelemetry/Metrics/AggregatorStore.cs +++ b/src/OpenTelemetry/Metrics/AggregatorStore.cs @@ -31,11 +31,8 @@ internal sealed class AggregatorStore private readonly HashSet tagKeysInteresting; private readonly int tagsKeysInterestingCount; - private readonly ConcurrentDictionary tagKeyCombinations = - new ConcurrentDictionary(StringArrayComparer); - - private readonly ConcurrentDictionary tagValueCombinations = - new ConcurrentDictionary(ObjectArrayComparer); + private readonly ConcurrentDictionary sortedTagsDictionary = + new ConcurrentDictionary(); // Two-Level lookup. TagKeys x [ TagValues x Metrics ] private readonly ConcurrentDictionary> keyValue2MetricAggs = @@ -186,12 +183,12 @@ private void InitializeZeroTagPointIfNotInitialized() private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int length) { int aggregatorIndex; - string[] sortedTagKeys = null; - object[] sortedTagValues = null; ConcurrentDictionary value2metrics; + var givenTags = new Tags(tagKeys, tagValues); + // We only need to sort if there is more than one Tag Key. - if (!this.tagKeyCombinations.TryGetValue(tagKeys, out sortedTagKeys)) + if (!this.sortedTagsDictionary.TryGetValue(givenTags, out var sortedTags)) { // Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage. var seqKey = new string[length]; @@ -200,6 +197,9 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng var seqValue = new object[length]; tagValues.CopyTo(seqValue, 0); + string[] sortedTagKeys; + object[] sortedTagValues; + if (length > 1) { // Create a new array for the sorted Tag keys. @@ -218,45 +218,24 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng sortedTagValues = seqValue; } - this.tagValueCombinations.TryAdd(seqValue, sortedTagValues); - this.tagKeyCombinations.TryAdd(seqKey, sortedTagKeys); + sortedTags = new Tags(sortedTagKeys, sortedTagValues); + this.sortedTagsDictionary.TryAdd(givenTags, sortedTags); } // GetOrAdd by the sorted Tag keys at 1st Level of 2-level dictionary structure. // Get back a Dictionary of [ Values x Metrics[] ]. - if (!this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics)) + if (!this.keyValue2MetricAggs.TryGetValue(sortedTags.Keys, out value2metrics)) { value2metrics = new ConcurrentDictionary(ObjectArrayComparer); - if (!this.keyValue2MetricAggs.TryAdd(sortedTagKeys, value2metrics)) + if (!this.keyValue2MetricAggs.TryAdd(sortedTags.Keys, value2metrics)) { - this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics); + this.keyValue2MetricAggs.TryGetValue(sortedTags.Keys, out value2metrics); } } - if (!this.tagValueCombinations.TryGetValue(tagValues, out sortedTagValues)) - { - var seqValue = new object[length]; - tagValues.CopyTo(seqValue, 0); - - if (length > 1) - { - // Create a new array for the sorted Tag values. - sortedTagValues = new object[length]; - tagValues.CopyTo(sortedTagValues, 0); - - Array.Sort(tagKeys, sortedTagValues); - } - else - { - sortedTagValues = seqValue; - } - - this.tagValueCombinations.TryAdd(seqValue, sortedTagValues); - } - // GetOrAdd by TagValues at 2st Level of 2-level dictionary structure. // Get back Metrics[]. - if (!value2metrics.TryGetValue(sortedTagValues, out aggregatorIndex)) + if (!value2metrics.TryGetValue(sortedTags.Values, out aggregatorIndex)) { aggregatorIndex = this.metricPointIndex; if (aggregatorIndex >= this.maxMetricPoints) @@ -271,7 +250,7 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng lock (value2metrics) { // check again after acquiring lock. - if (!value2metrics.TryGetValue(sortedTagValues, out aggregatorIndex)) + if (!value2metrics.TryGetValue(sortedTags.Values, out aggregatorIndex)) { aggregatorIndex = Interlocked.Increment(ref this.metricPointIndex); if (aggregatorIndex >= this.maxMetricPoints) @@ -285,12 +264,12 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng ref var metricPoint = ref this.metricPoints[aggregatorIndex]; var dt = DateTimeOffset.UtcNow; - metricPoint = new MetricPoint(this.aggType, dt, sortedTagKeys, sortedTagValues, this.histogramBounds); + metricPoint = new MetricPoint(this.aggType, dt, sortedTags.Keys, sortedTags.Values, this.histogramBounds); // Add to dictionary *after* initializing MetricPoint // as other threads can start writing to the // MetricPoint, if dictionary entry found. - value2metrics.TryAdd(sortedTagValues, aggregatorIndex); + value2metrics.TryAdd(sortedTags.Values, aggregatorIndex); } } } diff --git a/src/OpenTelemetry/Metrics/Tags.cs b/src/OpenTelemetry/Metrics/Tags.cs new file mode 100644 index 00000000000..f560d7d657e --- /dev/null +++ b/src/OpenTelemetry/Metrics/Tags.cs @@ -0,0 +1,107 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; + +namespace OpenTelemetry.Metrics +{ + internal readonly struct Tags : IEquatable + { + public Tags(string[] keys, object[] values) + { + this.Keys = keys; + this.Values = values; + } + + public readonly string[] Keys { get; } + + public readonly object[] Values { get; } + + public static bool operator ==(Tags tag1, Tags tag2) => tag1.Equals(tag2); + + public static bool operator !=(Tags tag1, Tags tag2) => !tag1.Equals(tag2); + + public readonly override bool Equals(object obj) + { + if (!(obj is Tags)) + { + return false; + } + + var other = (Tags)obj; + return this.Equals(other); + } + + public readonly bool Equals(Tags other) + { + // Equality check for Keys + // Check if the two string[] are equal + var keysLength = this.Keys.Length; + + if (keysLength != other.Keys.Length) + { + return false; + } + + for (int i = 0; i < keysLength; i++) + { + if (!this.Keys[i].Equals(other.Keys[i], StringComparison.Ordinal)) + { + return false; + } + } + + // Equality check for Values + // Check if the two object[] are equal + var valuesLength = this.Values.Length; + + if (valuesLength != other.Values.Length) + { + return false; + } + + for (int i = 0; i < valuesLength; i++) + { + if (!this.Values[i].Equals(other.Values[i])) + { + return false; + } + } + + return true; + } + + public readonly override int GetHashCode() + { + int hash = 17; + + unchecked + { + for (int i = 0; i < this.Keys.Length; i++) + { + hash = (hash * 31) + this.Keys[i].GetHashCode(); + } + + for (int i = 0; i < this.Values.Length; i++) + { + hash = (hash * 31) + this.Values[i].GetHashCode(); + } + } + + return hash; + } + } +} diff --git a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs index ee0e35d97fe..d4d1e12d1ef 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs @@ -382,109 +382,6 @@ public void ObservableCounterAggregationTest(bool exportDelta) } } - [Theory] - [InlineData(false, false)] - [InlineData(false, true)] - [InlineData(true, false)] - [InlineData(true, true)] - public void DimensionsAreOrderInsensitive(bool exportDelta, bool hasView) - { - var exportedItems = new List(); - - using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{exportDelta}.{hasView}"); - var counterLong = meter.CreateCounter("Counter"); - var meterProviderBuilder = Sdk.CreateMeterProviderBuilder() - .AddMeter(meter.Name) - .AddReader(new BaseExportingMetricReader(new InMemoryExporter(exportedItems)) - { - Temporality = exportDelta ? AggregationTemporality.Delta : AggregationTemporality.Cumulative, - }); - - if (hasView) - { - meterProviderBuilder.AddView(instrumentName: "Counter", new MetricStreamConfiguration() { TagKeys = new string[] { "Key1", "Key2" } }); - } - - using var meterProvider = meterProviderBuilder.Build(); - - counterLong.Add(10, new("Key1", "Value1"), new("Key2", "Value2"), new("Key3", "Value3")); - counterLong.Add(10, new("Key1", "Value1"), new("Key3", "Value3"), new("Key2", "Value2")); - meterProvider.ForceFlush(MaxTimeToAllowForFlush); - - List> tags; - if (hasView) - { - tags = new List>() - { - new("Key1", "Value1"), - new("Key2", "Value2"), - }; - } - else - { - tags = new List>() - { - new("Key1", "Value1"), - new("Key2", "Value2"), - new("Key3", "Value3"), - }; - } - - Assert.True(OnlyOneMetricPointExists(exportedItems)); - CheckTagsForFirstMetricPoint(exportedItems, tags); - long sumReceived = GetLongSum(exportedItems); - Assert.Equal(20, sumReceived); - - exportedItems.Clear(); - counterLong.Add(5, new("Key2", "Value2"), new("Key1", "Value1"), new("Key3", "Value3")); - counterLong.Add(5, new("Key2", "Value2"), new("Key1", "Value1"), new("Key3", "Value3")); - counterLong.Add(10, new("Key2", "Value2"), new("Key3", "Value3"), new("Key1", "Value1")); - meterProvider.ForceFlush(MaxTimeToAllowForFlush); - - Assert.True(OnlyOneMetricPointExists(exportedItems)); - CheckTagsForFirstMetricPoint(exportedItems, tags); - sumReceived = GetLongSum(exportedItems); - if (exportDelta) - { - Assert.Equal(20, sumReceived); - } - else - { - Assert.Equal(40, sumReceived); - } - - exportedItems.Clear(); - meterProvider.ForceFlush(MaxTimeToAllowForFlush); - sumReceived = GetLongSum(exportedItems); - if (exportDelta) - { - Assert.Equal(0, sumReceived); - } - else - { - Assert.True(OnlyOneMetricPointExists(exportedItems)); - CheckTagsForFirstMetricPoint(exportedItems, tags); - Assert.Equal(40, sumReceived); - } - - exportedItems.Clear(); - counterLong.Add(40, new("Key3", "Value3"), new("Key1", "Value1"), new("Key2", "Value2")); - counterLong.Add(20, new("Key3", "Value3"), new("Key2", "Value2"), new("Key1", "Value1")); - meterProvider.ForceFlush(MaxTimeToAllowForFlush); - - Assert.True(OnlyOneMetricPointExists(exportedItems)); - CheckTagsForFirstMetricPoint(exportedItems, tags); - sumReceived = GetLongSum(exportedItems); - if (exportDelta) - { - Assert.Equal(60, sumReceived); - } - else - { - Assert.Equal(100, sumReceived); - } - } - [Theory] [InlineData(AggregationTemporality.Cumulative)] [InlineData(AggregationTemporality.Delta)] @@ -761,37 +658,6 @@ private static double GetDoubleSum(List metrics) return sum; } - private static bool OnlyOneMetricPointExists(List metrics) - { - int count = 0; - foreach (var metric in metrics) - { - foreach (ref readonly var metricPoint in metric.GetMetricPoints()) - { - count++; - } - } - - return count == 1; - } - - // Provide tags input sorted by Key - private static void CheckTagsForFirstMetricPoint(List metrics, List> tags) - { - var metric = metrics[0]; - var metricPointEnumerator = metric.GetMetricPoints().GetEnumerator(); - Assert.True(metricPointEnumerator.MoveNext()); - - int index = 0; - var metricPoint = metricPointEnumerator.Current; - foreach (var tag in metricPoint.Tags) - { - Assert.Equal(tags[index].Key, tag.Key); - Assert.Equal(tags[index].Value, tag.Value); - index++; - } - } - private static void CounterUpdateThread(object obj) where T : struct, IComparable { From 46602644a1ead784097670a4a1c5510bec110aab Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Thu, 13 Jan 2022 22:25:47 -0800 Subject: [PATCH 7/8] Remove changes to MetricAPITest --- test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs index d4d1e12d1ef..6656c5b267d 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs @@ -659,7 +659,7 @@ private static double GetDoubleSum(List metrics) } private static void CounterUpdateThread(object obj) - where T : struct, IComparable + where T : struct, IComparable { if (obj is not UpdateThreadArguments arguments) { From b62c0f9638b0ccfe061d7961ea2e86f32b22ba6a Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Fri, 21 Jan 2022 17:37:06 -0800 Subject: [PATCH 8/8] Fix issue with resuing tags --- src/OpenTelemetry/Metrics/AggregatorStore.cs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/OpenTelemetry/Metrics/AggregatorStore.cs b/src/OpenTelemetry/Metrics/AggregatorStore.cs index 64f9b9e4aec..5e2a9bcc0b3 100644 --- a/src/OpenTelemetry/Metrics/AggregatorStore.cs +++ b/src/OpenTelemetry/Metrics/AggregatorStore.cs @@ -191,11 +191,13 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng if (!this.sortedTagsDictionary.TryGetValue(givenTags, out var sortedTags)) { // Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage. - var seqKey = new string[length]; - tagKeys.CopyTo(seqKey, 0); + var givenKeys = new string[length]; + tagKeys.CopyTo(givenKeys, 0); - var seqValue = new object[length]; - tagValues.CopyTo(seqValue, 0); + var givenValues = new object[length]; + tagValues.CopyTo(givenValues, 0); + + givenTags = new Tags(givenKeys, givenValues); string[] sortedTagKeys; object[] sortedTagValues; @@ -214,8 +216,8 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng } else { - sortedTagKeys = seqKey; - sortedTagValues = seqValue; + sortedTagKeys = givenKeys; + sortedTagValues = givenValues; } sortedTags = new Tags(sortedTagKeys, sortedTagValues);