Skip to content

Commit

Permalink
fix(sources): emit ComponentEventsDropped when source send is cancelled (#18859)
Browse files Browse the repository at this point in the history

* fix(sources): emit ComponentEventsDropped when source send is cancelled

* feedback

* fix docs
  • Loading branch information
dsmith3197 authored Oct 18, 2023
1 parent 811b7f7 commit a0e2769
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 9 deletions.
161 changes: 159 additions & 2 deletions src/source_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::{collections::HashMap, fmt};
use chrono::Utc;
use futures::{Stream, StreamExt};
use metrics::{register_histogram, Histogram};
use tracing::Span;
use vector_buffers::topology::channel::{self, LimitedReceiver, LimitedSender};
use vector_common::internal_event::{ComponentEventsDropped, UNINTENTIONAL};
#[cfg(test)]
use vector_core::event::{into_event_stream, EventStatus};
use vector_core::{
Expand Down Expand Up @@ -206,6 +208,9 @@ impl SourceSender {
recv
}

/// Send an event to the default output.
///
/// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
self.inner
.as_mut()
Expand All @@ -214,6 +219,9 @@ impl SourceSender {
.await
}

/// Send a stream of events to the default output.
///
/// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
where
S: Stream<Item = E> + Unpin,
Expand All @@ -226,10 +234,14 @@ impl SourceSender {
.await
}

/// Send a batch of events to the default output.
///
/// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), ClosedError>
where
E: Into<Event> + ByteSizeOf,
I: IntoIterator<Item = E>,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
{
self.inner
.as_mut()
Expand All @@ -238,10 +250,14 @@ impl SourceSender {
.await
}

/// Send a batch of events to a named output.
///
/// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), ClosedError>
where
E: Into<Event> + ByteSizeOf,
I: IntoIterator<Item = E>,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
{
self.named_inners
.get_mut(name)
Expand All @@ -251,6 +267,47 @@ impl SourceSender {
}
}

/// UnsentEvents tracks the number of events yet to be sent in the buffer. This is used to
/// increment the appropriate counters when a future is not polled to completion. Particularly,
/// this is known to happen in a Warp server when a client sends a new HTTP request on a TCP
/// connection that already has a pending request.
///
/// If its internal count is greater than 0 when dropped, the appropriate [ComponentEventsDropped]
/// event is emitted.
struct UnsentEventCount {
    // Number of events not yet confirmed sent; decremented as sends complete.
    count: usize,
    // Span captured at creation so that the `ComponentEventsDropped` event
    // emitted on drop is attributed to the originating component.
    span: Span,
}

impl UnsentEventCount {
fn new(count: usize) -> Self {
Self {
count,
span: Span::current(),
}
}

fn decr(&mut self, count: usize) {
self.count = self.count.saturating_sub(count);
}

fn discard(&mut self) {
self.count = 0;
}
}

impl Drop for UnsentEventCount {
    fn drop(&mut self) {
        // Nothing pending: all sends completed or were deliberately discarded.
        if self.count == 0 {
            return;
        }
        let _enter = self.span.enter();
        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
            count: self.count,
            reason: "Source send cancelled."
        });
    }
}

#[derive(Clone)]
struct Inner {
inner: LimitedSender<EventArray>,
Expand Down Expand Up @@ -322,7 +379,15 @@ impl Inner {
}

/// Send a single event (or event array) to the inner channel.
///
/// If the caller stops polling this future while it is blocked in
/// `self.send()` (known to happen e.g. when a Warp server drops a pending
/// request), the `UnsentEventCount` guard emits the appropriate
/// `ComponentEventsDropped` event from its `Drop` impl.
async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
    let event: EventArray = event.into();
    let mut unsent_event_count = UnsentEventCount::new(event.len());
    let res = self.send(event).await;
    // The send completed (success, or an error the caller will report), so
    // these events are no longer "unsent" — suppress the drop accounting.
    unsent_event_count.discard();
    res
}

async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
Expand All @@ -341,10 +406,22 @@ impl Inner {
where
E: Into<Event> + ByteSizeOf,
I: IntoIterator<Item = E>,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
{
// It's possible that the caller stops polling this future while it is blocked waiting
// on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit
// `ComponentEventsDropped` events.
let events = events.into_iter().map(Into::into);
let mut unsent_event_count = UnsentEventCount::new(events.len());
for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) {
self.send(events).await?;
let count = events.len();
self.send(events).await.map_err(|err| {
// The unsent event count is discarded here because the caller emits the
// `StreamClosedError`.
unsent_event_count.discard();
err
})?;
unsent_event_count.decr(count);
}
Ok(())
}
Expand Down Expand Up @@ -394,6 +471,7 @@ fn get_timestamp_millis(value: &Value) -> Option<i64> {
mod tests {
use chrono::{DateTime, Duration};
use rand::{thread_rng, Rng};
use tokio::time::timeout;
use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, TraceEvent};
use vrl::event_path;

Expand Down Expand Up @@ -483,4 +561,83 @@ mod tests {
_ => panic!("source_lag_time_seconds has invalid type"),
}
}

#[tokio::test]
async fn emits_component_discarded_events_total_for_send_event() {
    metrics::init_test();
    let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1);

    let event = Event::Metric(Metric::new(
        "name",
        MetricKind::Absolute,
        MetricValue::Gauge { value: 123.4 },
    ));

    // The buffer has room, so the first send completes successfully.
    sender
        .send_event(event.clone())
        .await
        .expect("First send should not fail");

    // With the buffer full, the second send blocks; the timeout drops the
    // future before completion, which must be counted as a discarded event.
    let send_result = timeout(
        std::time::Duration::from_millis(100),
        sender.send_event(event.clone()),
    )
    .await;
    assert!(send_result.is_err(), "Send should have timed out.");

    let discarded: Vec<_> = Controller::get()
        .expect("There must be a controller")
        .capture_metrics()
        .into_iter()
        .filter(|metric| metric.name() == "component_discarded_events_total")
        .collect();
    assert_eq!(discarded.len(), 1);

    match discarded[0].value() {
        MetricValue::Counter { value } => assert_eq!(*value, 1.0),
        _ => panic!("component_discarded_events_total has invalid type"),
    }
}

#[tokio::test]
async fn emits_component_discarded_events_total_for_send_batch() {
    metrics::init_test();
    let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1);

    let expected_drop = 100;
    let events: Vec<Event> = std::iter::repeat_with(|| {
        Event::Metric(Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 123.4 },
        ))
    })
    .take(CHUNK_SIZE + expected_drop)
    .collect();

    // `CHUNK_SIZE` events go into the buffer, but the timeout drops the future
    // before the remaining `expected_drop` events can be sent.
    let send_result = timeout(
        std::time::Duration::from_millis(100),
        sender.send_batch(events),
    )
    .await;
    assert!(send_result.is_err(), "Send should have timed out.");

    let discarded: Vec<_> = Controller::get()
        .expect("There must be a controller")
        .capture_metrics()
        .into_iter()
        .filter(|metric| metric.name() == "component_discarded_events_total")
        .collect();
    assert_eq!(discarded.len(), 1);

    match discarded[0].value() {
        MetricValue::Counter { value } => assert_eq!(*value, expected_drop as f64),
        _ => panic!("component_discarded_events_total has invalid type"),
    }
}
}
4 changes: 2 additions & 2 deletions src/sources/mongodb_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,13 @@ impl SourceConfig for MongoDbMetricsConfig {
while interval.next().await.is_some() {
let start = Instant::now();
let metrics = join_all(sources.iter().map(|mongodb| mongodb.collect())).await;
let count = metrics.len();
emit!(CollectionCompleted {
start,
end: Instant::now()
});

let metrics = metrics.into_iter().flatten();
let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
let count = metrics.len();

if (cx.out.send_batch(metrics).await).is_err() {
emit!(StreamClosedError { count });
Expand Down
4 changes: 2 additions & 2 deletions src/sources/nginx_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ impl SourceConfig for NginxMetricsConfig {
while interval.next().await.is_some() {
let start = Instant::now();
let metrics = join_all(sources.iter().map(|nginx| nginx.collect())).await;
let count = metrics.len();
emit!(CollectionCompleted {
start,
end: Instant::now()
});

let metrics = metrics.into_iter().flatten();
let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
let count = metrics.len();

if (cx.out.send_batch(metrics).await).is_err() {
emit!(StreamClosedError { count });
Expand Down
5 changes: 3 additions & 2 deletions src/sources/postgresql_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,14 @@ impl SourceConfig for PostgresqlMetricsConfig {
while interval.next().await.is_some() {
let start = Instant::now();
let metrics = join_all(sources.iter_mut().map(|source| source.collect())).await;
let count = metrics.len();
emit!(CollectionCompleted {
start,
end: Instant::now()
});

let metrics = metrics.into_iter().flatten();
let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
let count = metrics.len();

if (cx.out.send_batch(metrics).await).is_err() {
emit!(StreamClosedError { count });
return Err(());
Expand Down
4 changes: 3 additions & 1 deletion src/topology/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,9 @@ async fn topology_swap_transform_is_atomic() {
}
};
let input = async move {
in1.send_batch(iter::from_fn(events)).await.unwrap();
in1.send_event_stream(stream::iter(iter::from_fn(events)))
.await
.unwrap();
};
let output = out1.for_each(move |_| {
recv_counter.fetch_add(1, Ordering::Release);
Expand Down

0 comments on commit a0e2769

Please sign in to comment.