Skip to content

Commit

Permalink
Improve multi-threaded test for Counter (open-telemetry#1858)
Browse files Browse the repository at this point in the history
Co-authored-by: Zhongyang Wu <zhongyang.wu@outlook.com>
Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
  • Loading branch information
3 people authored Jun 4, 2024
1 parent f488006 commit f1cdaca
Showing 1 changed file with 95 additions and 32 deletions.
127 changes: 95 additions & 32 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ mod tests {
use std::borrow::Cow;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Run all tests in this mod
// cargo test metrics::tests --features=testing
Expand Down Expand Up @@ -1047,45 +1048,54 @@ mod tests {
let mut test_context = TestContext::new(temporality);
let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));

let mut update_threads = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);

update_threads.push(thread::spawn(move || {
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
}));
}

for thread in update_threads {
thread.join().unwrap();
for i in 0..10 {
thread::scope(|s| {
s.spawn(|| {
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);

// Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
if i % 2 == 0 {
test_context.flush_metrics();
thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing
}

counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
});
});
}

test_context.flush_metrics();

// Assert
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
// Expecting 2 time-series.
assert_eq!(sum.data_points.len(), 1);
assert!(sum.is_monotonic, "Counter should produce monotonic.");
assert_eq!(sum.temporality, temporality);
if let Temporality::Cumulative = temporality {
assert_eq!(
sum.temporality,
Temporality::Cumulative,
"Should produce cumulative"
);
// We invoke `test_context.flush_metrics()` six times.
let sums =
test_context.get_from_multiple_aggregations::<data::Sum<u64>>("my_counter", None, 6);

let values = sums
.iter()
.map(|sum| {
assert_eq!(sum.data_points.len(), 1); // Expecting 1 time-series.
assert!(sum.is_monotonic, "Counter should produce monotonic.");
assert_eq!(sum.temporality, temporality);

// find and validate key1=value1 datapoint
let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected");

data_point.value
})
.collect::<Vec<_>>();

let total_sum: u64 = if temporality == Temporality::Delta {
values.iter().sum()
} else {
assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
}
*values.last().unwrap()
};

// find and validate key1=value2 datapoint
let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected");
assert_eq!(data_point1.value, 50); // Each of the 10 update threads record measurements summing up to 5.
assert_eq!(total_sum, 50); // Each of the 10 update threads record measurements summing up to 5.
}

fn histogram_aggregation_helper(temporality: Temporality) {
Expand Down Expand Up @@ -1553,5 +1563,58 @@ mod tests {
.downcast_ref::<T>()
.expect("Failed to cast aggregation to expected type")
}

fn get_from_multiple_aggregations<T: data::Aggregation>(
&mut self,
counter_name: &str,
unit_name: Option<&str>,
invocation_count: usize,
) -> Vec<&T> {
self.resource_metrics = self
.exporter
.get_finished_metrics()
.expect("metrics expected to be exported");

assert!(
!self.resource_metrics.is_empty(),
"no metrics were exported"
);

assert_eq!(
self.resource_metrics.len(),
invocation_count,
"Expected collect to be called {} times",
invocation_count
);

let result = self
.resource_metrics
.iter()
.map(|resource_metric| {
assert!(
!resource_metric.scope_metrics.is_empty(),
"An export with no scope metrics occurred"
);

assert!(!resource_metric.scope_metrics[0].metrics.is_empty());

let metric = &resource_metric.scope_metrics[0].metrics[0];
assert_eq!(metric.name, counter_name);

if let Some(expected_unit) = unit_name {
assert_eq!(metric.unit, expected_unit);
}

let aggregation = metric
.data
.as_any()
.downcast_ref::<T>()
.expect("Failed to cast aggregation to expected type");
aggregation
})
.collect::<Vec<_>>();

result
}
}
}

0 comments on commit f1cdaca

Please sign in to comment.