diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 1cf4e4c6cc..2c6b709fef 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -151,6 +151,7 @@ mod tests { use std::borrow::Cow; use std::sync::{Arc, Mutex}; use std::thread; + use std::time::Duration; // Run all tests in this mod // cargo test metrics::tests --features=testing @@ -1047,45 +1048,54 @@ mod tests { let mut test_context = TestContext::new(temporality); let counter = Arc::new(test_context.u64_counter("test", "my_counter", None)); - let mut update_threads = vec![]; - for _ in 0..10 { - let counter = Arc::clone(&counter); - - update_threads.push(thread::spawn(move || { - counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value1")]); - })); - } - - for thread in update_threads { - thread.join().unwrap(); + for i in 0..10 { + thread::scope(|s| { + s.spawn(|| { + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + + // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time. + if i % 2 == 0 { + test_context.flush_metrics(); + thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing + } + + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + }); + }); } test_context.flush_metrics(); // Assert - let sum = test_context.get_aggregation::>("my_counter", None); - // Expecting 2 time-series. - assert_eq!(sum.data_points.len(), 1); - assert!(sum.is_monotonic, "Counter should produce monotonic."); - assert_eq!(sum.temporality, temporality); - if let Temporality::Cumulative = temporality { - assert_eq!( - sum.temporality, - Temporality::Cumulative, - "Should produce cumulative" - ); + // We invoke `test_context.flush_metrics()` six times. + let sums = + test_context.get_from_multiple_aggregations::>("my_counter", None, 6); + + let values = sums + .iter() + .map(|sum| { + assert_eq!(sum.data_points.len(), 1); // Expecting 1 time-series. + assert!(sum.is_monotonic, "Counter should produce monotonic."); + assert_eq!(sum.temporality, temporality); + + // find and validate key1=value1 datapoint + let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + + data_point.value + }) + .collect::>(); + + let total_sum: u64 = if temporality == Temporality::Delta { + values.iter().sum() } else { - assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); - } + *values.last().unwrap() + }; - // find and validate key1=value2 datapoint - let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 50); // Each of the 10 update threads record measurements summing up to 5. + assert_eq!(total_sum, 50); // Each of the 10 update threads record measurements summing up to 5. } fn histogram_aggregation_helper(temporality: Temporality) { @@ -1553,5 +1563,58 @@ mod tests { .downcast_ref::() .expect("Failed to cast aggregation to expected type") } + + fn get_from_multiple_aggregations( + &mut self, + counter_name: &str, + unit_name: Option<&str>, + invocation_count: usize, + ) -> Vec<&T> { + self.resource_metrics = self + .exporter + .get_finished_metrics() + .expect("metrics expected to be exported"); + + assert!( + !self.resource_metrics.is_empty(), + "no metrics were exported" + ); + + assert_eq!( + self.resource_metrics.len(), + invocation_count, + "Expected collect to be called {} times", + invocation_count + ); + + let result = self + .resource_metrics + .iter() + .map(|resource_metric| { + assert!( + !resource_metric.scope_metrics.is_empty(), + "An export with no scope metrics occurred" + ); + + assert!(!resource_metric.scope_metrics[0].metrics.is_empty()); + + let metric = &resource_metric.scope_metrics[0].metrics[0]; + assert_eq!(metric.name, counter_name); + + if let Some(expected_unit) = unit_name { + assert_eq!(metric.unit, expected_unit); + } + + let aggregation = metric + .data + .as_any() + .downcast_ref::() + .expect("Failed to cast aggregation to expected type"); + aggregation + }) + .collect::>(); + + result + } } }