Skip to content

Commit

Permalink
fix(reduce transform): Revert flushing on interval change to `expire_…
Browse files Browse the repository at this point in the history
…metrics_ms` (#17084)

Revert "fix(reduce transform): Fix flush not occuring when events arrive in high rate (#16146)"

This reverts commit ab45939.
  • Loading branch information
jszwedko authored Apr 7, 2023
1 parent 5d88655 commit e86b155
Showing 1 changed file with 3 additions and 51 deletions.
54 changes: 3 additions & 51 deletions src/transforms/reduce/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::BTreeMap;
use std::{
cmp::min,
collections::{hash_map, HashMap},
num::NonZeroUsize,
pin::Pin,
Expand Down Expand Up @@ -230,7 +229,6 @@ struct ReduceState {
events: usize,
fields: HashMap<String, Box<dyn ReduceValueMerger>>,
stale_since: Instant,
last_flushed_at: Instant,
metadata: EventMetadata,
}

Expand All @@ -242,7 +240,6 @@ impl ReduceState {
Self {
events: 0,
stale_since: Instant::now(),
last_flushed_at: Instant::now(),
fields,
metadata,
}
Expand Down Expand Up @@ -346,10 +343,9 @@ impl Reduce {
fn flush_into(&mut self, output: &mut Vec<Event>) {
let mut flush_discriminants = Vec::new();
let now = Instant::now();
for (k, t) in &mut self.reduce_merge_states {
if now - min(t.stale_since, t.last_flushed_at) >= self.expire_after {
for (k, t) in &self.reduce_merge_states {
if (now - t.stale_since) >= self.expire_after {
flush_discriminants.push(k.clone());
t.last_flushed_at = Instant::now();
}
}
for k in &flush_discriminants {
Expand Down Expand Up @@ -476,8 +472,7 @@ impl TaskTransform<Event> for Reduce {
#[cfg(test)]
mod test {
use serde_json::json;
use tokio::sync::mpsc::{self, Sender};
use tokio::time::sleep;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use value::Kind;

Expand Down Expand Up @@ -908,47 +903,4 @@ merge_strategies.bar = "concat"
})
.await;
}

/// Tests the case where neither `starts_when` nor `ends_when` is defined and
/// events for each group keep arriving faster than `expire_after_ms`, so the
/// time between consecutive events of a group never reaches the expiry.
/// The test expects reduced events to be emitted anyway while the stream is
/// still active.
#[tokio::test]
async fn last_flush_at() {
let reduce_config = toml::from_str::<ReduceConfig>(
r#"
group_by = [ "user_id" ]
expire_after_ms = 200
flush_period_ms = 250
"#,
)
.unwrap();

assert_transform_compliance(async move {
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

// Helper: builds a log event tagged with `user_id` (stored as a string)
// and sends it into the transform's input channel.
async fn send_event(tx: &Sender<Event>, user_id: i32) {
let mut log_event = LogEvent::from("test message");
log_event.insert("user_id", user_id.to_string());
tx.send(log_event.into()).await.unwrap();
}

// Alternate between two users with a 50 ms pause after every send, so each
// user_id receives an event roughly every 100 ms -- twice as often as the
// 200 ms `expire_after_ms` configured above.
for _ in 0..5 {
send_event(&tx, 1).await;
sleep(Duration::from_millis(50)).await;
send_event(&tx, 2).await;
sleep(Duration::from_millis(50)).await;
}

// Verify that output was produced while events were still flowing:
// `try_recv` does not wait, so each call panics unless a message is
// already queued on the output channel.
out.try_recv().expect("No message arrived");
sleep(Duration::from_millis(10)).await;
out.try_recv().expect("No message arrived");

// Close the input side and shut the topology down.
drop(tx);
topology.stop().await;
})
.await;
}
}

0 comments on commit e86b155

Please sign in to comment.