diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 839743058a..1d136fe62e 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -70,8 +70,10 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// already set). This method is called synchronously within the `Span::end` /// API, therefore it should not block or throw an exception. fn on_end(&self, span: SpanData); + /// Force the spans lying in the cache to be exported. + fn force_flush(&self); /// Shuts down the processor. Called when SDK is shut down. This is an - /// opportunity for processor to do any cleanup required. + /// opportunity for processors to do any cleanup required. fn shutdown(&mut self); } @@ -123,6 +125,10 @@ impl SpanProcessor for SimpleSpanProcessor { } } + fn force_flush(&self) { + // Ignored since all spans in Simple Processor will be exported as they ended. + } + fn shutdown(&mut self) { if let Ok(mut exporter) = self.exporter.lock() { exporter.shutdown(); @@ -196,6 +202,12 @@ impl SpanProcessor for BatchSpanProcessor { } } + fn force_flush(&self) { + if let Ok(mut sender) = self.message_sender.lock() { + let _ = sender.try_send(BatchMessage::Flush); + } + } + fn shutdown(&mut self) { if let Ok(mut sender) = self.message_sender.lock() { // Send shutdown message to worker future @@ -212,7 +224,7 @@ impl SpanProcessor for BatchSpanProcessor { #[derive(Debug)] enum BatchMessage { ExportSpan(SpanData), - Tick, + Flush, Shutdown, } @@ -230,7 +242,7 @@ impl BatchSpanProcessor { IS: Stream + Send + 'static, { let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size); - let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Tick); + let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Flush); // Spawn worker process via user-defined spawn function. let worker_handle = spawn(Box::pin(async move { @@ -245,8 +257,8 @@ impl BatchSpanProcessor { spans.push(span); } } - // Span batch interval time reached, export current spans. - BatchMessage::Tick => { + // Span batch interval time reached or a force flush has been invoked, export current spans. + BatchMessage::Flush => { while !spans.is_empty() { let batch = spans.split_off( spans.len().saturating_sub(config.max_export_batch_size), @@ -450,8 +462,12 @@ mod tests { OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT, }; use crate::exporter::trace::stdout; - use crate::testing::trace::{new_test_export_span_data, new_test_exporter}; + use crate::sdk::trace::BatchConfig; + use crate::testing::trace::{ + new_test_export_span_data, new_test_exporter, new_tokio_test_exporter, + }; use std::time; + use tokio::time::Duration; #[test] fn simple_span_processor_on_end_calls_export() { @@ -500,4 +516,35 @@ mod tests { assert_eq!(builder.config.max_export_batch_size, 120); assert_eq!(builder.config.max_queue_size, 120); } + + #[tokio::test] + async fn test_batch_span_processor() { + let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter(); + let mut config = BatchConfig::default(); + config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush + let processor = BatchSpanProcessor::new( + Box::new(exporter), + tokio::spawn, + tokio::time::interval, + config, + ); + let handle = tokio::spawn(async move { + loop { + if let Some(span) = export_receiver.recv().await { + assert_eq!(span.span_context, new_test_export_span_data().span_context); + break; + } + } + }); + tokio::time::delay_for(Duration::from_secs(1)).await; // skip the first + processor.on_end(new_test_export_span_data()); + processor.force_flush(); + + assert!( + tokio::time::timeout(Duration::from_secs(5), handle) + .await + .is_ok(), + "timed out in 5 seconds. force_flush may not export any data when called" + ); + } } diff --git a/opentelemetry/src/testing/trace.rs b/opentelemetry/src/testing/trace.rs index 874d0ce76d..da86e7f065 100644 --- a/opentelemetry/src/testing/trace.rs +++ b/opentelemetry/src/testing/trace.rs @@ -1,3 +1,4 @@ +use crate::exporter::trace::SpanData; use crate::{ exporter::trace::{self as exporter, ExportResult, SpanExporter}, sdk::{ @@ -82,3 +83,37 @@ pub fn new_test_exporter() -> (TestSpanExporter, Receiver, R }; (exporter, rx_export, rx_shutdown) } + +#[derive(Debug)] +pub struct TokioSpanExporter { + tx_export: tokio::sync::mpsc::UnboundedSender, + tx_shutdown: tokio::sync::mpsc::UnboundedSender<()>, +} + +#[async_trait] +impl SpanExporter for TokioSpanExporter { + async fn export(&mut self, batch: Vec) -> ExportResult { + for span_data in batch { + self.tx_export.send(span_data)?; + } + Ok(()) + } + + fn shutdown(&mut self) { + self.tx_shutdown.send(()).unwrap(); + } +} + +pub fn new_tokio_test_exporter() -> ( + TokioSpanExporter, + tokio::sync::mpsc::UnboundedReceiver, + tokio::sync::mpsc::UnboundedReceiver<()>, +) { + let (tx_export, rx_export) = tokio::sync::mpsc::unbounded_channel(); + let (tx_shutdown, rx_shutdown) = tokio::sync::mpsc::unbounded_channel(); + let exporter = TokioSpanExporter { + tx_export, + tx_shutdown, + }; + (exporter, rx_export, rx_shutdown) +}