Skip to content

Commit

Permalink
Metrics Aggregation - Improve throughput by 10x (#1833)
Browse files Browse the repository at this point in the history
Co-authored-by: Zhongyang Wu <zhongyang.wu@outlook.com>
  • Loading branch information
cijothomas and TommyCpp authored May 29, 2024
1 parent 6ee5579 commit 0f6de5a
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 28 deletions.
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
- Removed `XrayIdGenerator`, which was marked deprecated since 0.21.3. Use
[`opentelemetry-aws`](https://crates.io/crates/opentelemetry-aws), version
0.10.0 or newer.
- Performance Improvement - Counter/UpDownCounter instruments internally use
`RwLock` instead of `Mutex` to reduce contention.

- **Breaking** [1726](https://github.com/open-telemetry/opentelemetry-rust/pull/1726)
Update `LogProcessor::emit() method to take mutable reference to LogData. This is breaking
Expand Down
64 changes: 37 additions & 27 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::vec;
use std::{
collections::{hash_map::Entry, HashMap},
sync::Mutex,
collections::HashMap,
sync::{Mutex, RwLock},
time::SystemTime,
};

Expand All @@ -18,7 +18,7 @@ use super::{

/// The storage for sums.
struct ValueMap<T: Number<T>> {
values: Mutex<HashMap<AttributeSet, T>>,
values: RwLock<HashMap<AttributeSet, T::AtomicTracker>>,
has_no_value_attribute_value: AtomicBool,
no_attribute_value: T::AtomicTracker,
}
Expand All @@ -32,7 +32,7 @@ impl<T: Number<T>> Default for ValueMap<T> {
impl<T: Number<T>> ValueMap<T> {
fn new() -> Self {
ValueMap {
values: Mutex::new(HashMap::new()),
values: RwLock::new(HashMap::new()),
has_no_value_attribute_value: AtomicBool::new(false),
no_attribute_value: T::new_atomic_tracker(),
}
Expand All @@ -45,21 +45,31 @@ impl<T: Number<T>> ValueMap<T> {
self.no_attribute_value.add(measurement);
self.has_no_value_attribute_value
.store(true, Ordering::Release);
} else if let Ok(mut values) = self.values.lock() {
let size = values.len();
match values.entry(attrs) {
Entry::Occupied(mut occupied_entry) => {
let sum = occupied_entry.get_mut();
*sum += measurement;
}
Entry::Vacant(vacant_entry) => {
if is_under_cardinality_limit(size) {
vacant_entry.insert(measurement);
} else if let Some(val) = values.get_mut(&STREAM_OVERFLOW_ATTRIBUTE_SET) {
*val += measurement;
} else if let Ok(values) = self.values.read() {
if let Some(value_to_update) = values.get(&attrs) {
value_to_update.add(measurement);
return;
} else {
drop(values);
if let Ok(mut values) = self.values.write() {
// Recheck after acquiring write lock, in case another
// thread has added the value.
if let Some(value_to_update) = values.get(&attrs) {
value_to_update.add(measurement);
return;
} else if is_under_cardinality_limit(values.len()) {
let new_value = T::new_atomic_tracker();
new_value.add(measurement);
values.insert(attrs, new_value);
} else if let Some(overflow_value) =
values.get_mut(&STREAM_OVERFLOW_ATTRIBUTE_SET)
{
overflow_value.add(measurement);
return;
} else {
values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), measurement);
let new_value = T::new_atomic_tracker();
new_value.add(measurement);
values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), new_value);
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
}
}
Expand Down Expand Up @@ -114,7 +124,7 @@ impl<T: Number<T>> Sum<T> {
s_data.is_monotonic = self.monotonic;
s_data.data_points.clear();

let mut values = match self.value_map.values.lock() {
let mut values = match self.value_map.values.write() {
Ok(v) => v,
Err(_) => return (0, None),
};
Expand Down Expand Up @@ -149,7 +159,7 @@ impl<T: Number<T>> Sum<T> {
.collect(),
start_time: Some(prev_start),
time: Some(t),
value,
value: value.get_value(),
exemplars: vec![],
});
}
Expand Down Expand Up @@ -186,7 +196,7 @@ impl<T: Number<T>> Sum<T> {
s_data.is_monotonic = self.monotonic;
s_data.data_points.clear();

let values = match self.value_map.values.lock() {
let values = match self.value_map.values.write() {
Ok(v) => v,
Err(_) => return (0, None),
};
Expand Down Expand Up @@ -226,7 +236,7 @@ impl<T: Number<T>> Sum<T> {
.collect(),
start_time: Some(prev_start),
time: Some(t),
value: *value,
value: value.get_value(),
exemplars: vec![],
});
}
Expand Down Expand Up @@ -282,7 +292,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
s_data.temporality = Temporality::Delta;
s_data.is_monotonic = self.monotonic;

let mut values = match self.value_map.values.lock() {
let mut values = match self.value_map.values.write() {
Ok(v) => v,
Err(_) => return (0, None),
};
Expand Down Expand Up @@ -315,9 +325,9 @@ impl<T: Number<T>> PrecomputedSum<T> {

let default = T::default();
for (attrs, value) in values.drain() {
let delta = value - *reported.get(&attrs).unwrap_or(&default);
let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value);
new_reported.insert(attrs.clone(), value.get_value());
}
s_data.data_points.push(DataPoint {
attributes: attrs
Expand Down Expand Up @@ -367,7 +377,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
s_data.temporality = Temporality::Cumulative;
s_data.is_monotonic = self.monotonic;

let values = match self.value_map.values.lock() {
let values = match self.value_map.values.write() {
Ok(v) => v,
Err(_) => return (0, None),
};
Expand Down Expand Up @@ -400,9 +410,9 @@ impl<T: Number<T>> PrecomputedSum<T> {

let default = T::default();
for (attrs, value) in values.iter() {
let delta = *value - *reported.get(attrs).unwrap_or(&default);
let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), *value);
new_reported.insert(attrs.clone(), value.get_value());
}
s_data.data_points.push(DataPoint {
attributes: attrs
Expand Down
2 changes: 1 addition & 1 deletion stress/src/metrics_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
OS: Ubuntu 22.04.3 LTS (5.15.146.1-microsoft-standard-WSL2)
Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
RAM: 64.0 GB
3M /sec
35 M /sec
*/

use lazy_static::lazy_static;
Expand Down

0 comments on commit 0f6de5a

Please sign in to comment.