From 71a6301d97b596b2bc9608e1b73977e2eac316a8 Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Tue, 7 Jun 2022 17:04:10 -0600 Subject: [PATCH 1/3] chore(console sink): Defer finalization until after the write --- src/sinks/console/sink.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/sinks/console/sink.rs b/src/sinks/console/sink.rs index 16950bca9d124..ac70eb1455077 100644 --- a/src/sinks/console/sink.rs +++ b/src/sinks/console/sink.rs @@ -12,7 +12,7 @@ use vector_core::{ use crate::{ codecs::Encoder, - event::Event, + event::{Event, Finalizable}, sinks::util::{encoding::Transformer, StreamSink}, }; @@ -33,6 +33,7 @@ where let event_byte_size = event.size_of(); self.transformer.transform(&mut event); + let finalizers = event.take_finalizers(); let mut bytes = BytesMut::new(); self.encoder.encode(event, &mut bytes).map_err(|_| { // Error is handled by `Encoder`. @@ -44,6 +45,7 @@ where error!(message = "Error writing to output. Stopping sink.", %error); })?; + drop(finalizers); self.acker.ack(1); emit!(EventsSent { From f0de4f2353176ebf0edaa3df9b3b3840f3a61d10 Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Tue, 7 Jun 2022 17:07:56 -0600 Subject: [PATCH 2/3] enhancement(console sink): Add end-to-end acknowledgement support --- src/sinks/console/config.rs | 9 +++- src/sinks/console/sink.rs | 44 +++++++++++-------- .../reference/components/sinks/console.cue | 2 +- 3 files changed, 34 insertions(+), 21 deletions(-) diff --git a/src/sinks/console/config.rs b/src/sinks/console/config.rs index 1e04e2945228f..502e4bfa3eb8f 100644 --- a/src/sinks/console/config.rs +++ b/src/sinks/console/config.rs @@ -38,6 +38,12 @@ pub struct ConsoleSinkConfig { EncodingConfig, StandardEncodingsWithFramingMigrator, >, + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub acknowledgements: AcknowledgementsConfig, } impl GenerateConfig for ConsoleSinkConfig { @@ -45,6 +51,7 @@ impl GenerateConfig for ConsoleSinkConfig { toml::Value::try_from(Self { target: Target::Stdout, encoding: EncodingConfig::from(StandardEncodings::Json).into(), + acknowledgements: Default::default(), }) .unwrap() } @@ -97,7 +104,7 @@ impl SinkConfig for ConsoleSinkConfig { } fn acknowledgements(&self) -> Option<&AcknowledgementsConfig> { - None + Some(&self.acknowledgements) } } diff --git a/src/sinks/console/sink.rs b/src/sinks/console/sink.rs index ac70eb1455077..6b8b2ff94e393 100644 --- a/src/sinks/console/sink.rs +++ b/src/sinks/console/sink.rs @@ -12,7 +12,7 @@ use vector_core::{ use crate::{ codecs::Encoder, - event::{Event, Finalizable}, + event::{Event, EventStatus, Finalizable}, sinks::util::{encoding::Transformer, StreamSink}, }; @@ -37,26 +37,32 @@ where let mut bytes = BytesMut::new(); self.encoder.encode(event, &mut bytes).map_err(|_| { // Error is handled by `Encoder`. + finalizers.update_status(EventStatus::Errored); })?; - self.output.write_all(&bytes).await.map_err(|error| { - // Error when writing to stdout/stderr is likely irrecoverable, - // so stop the sink. - error!(message = "Error writing to output. Stopping sink.", %error); - })?; - - drop(finalizers); - self.acker.ack(1); - - emit!(EventsSent { - byte_size: event_byte_size, - count: 1, - output: None, - }); - emit!(BytesSent { - byte_size: bytes.len(), - protocol: "console" - }); + match self.output.write_all(&bytes).await { + Err(error) => { + // Error when writing to stdout/stderr is likely irrecoverable, + // so stop the sink. + error!(message = "Error writing to output. Stopping sink.", %error); + finalizers.update_status(EventStatus::Errored); + return Err(()); + } + Ok(()) => { + finalizers.update_status(EventStatus::Delivered); + self.acker.ack(1); + + emit!(EventsSent { + byte_size: event_byte_size, + count: 1, + output: None, + }); + emit!(BytesSent { + byte_size: bytes.len(), + protocol: "console" + }); + } + } } Ok(()) diff --git a/website/cue/reference/components/sinks/console.cue b/website/cue/reference/components/sinks/console.cue index b339b4a730968..026eb5b43e27f 100644 --- a/website/cue/reference/components/sinks/console.cue +++ b/website/cue/reference/components/sinks/console.cue @@ -13,7 +13,7 @@ components: sinks: console: { } features: { - acknowledgements: false + acknowledgements: true healthcheck: enabled: false send: { compression: enabled: false From 51f629a20c395f99c794004ebe876b7254364a09 Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Wed, 8 Jun 2022 08:07:04 -0600 Subject: [PATCH 3/3] Fix missing acknowledgements field in topology tests --- src/topology/test/doesnt_reload.rs | 1 + src/topology/test/reload.rs | 5 +++++ src/topology/test/source_finished.rs | 1 + 3 files changed, 7 insertions(+) diff --git a/src/topology/test/doesnt_reload.rs b/src/topology/test/doesnt_reload.rs index d48eeb436db44..f74b0f5254f16 100644 --- a/src/topology/test/doesnt_reload.rs +++ b/src/topology/test/doesnt_reload.rs @@ -20,6 +20,7 @@ async fn topology_doesnt_reload_new_data_dir() { ConsoleSinkConfig { target: Target::Stdout, encoding: EncodingConfig::from(StandardEncodings::Text).into(), + acknowledgements: Default::default(), }, ); old_config.global.data_dir = Some(Path::new("/asdf").to_path_buf()); diff --git a/src/topology/test/reload.rs b/src/topology/test/reload.rs index 8ff6380de565f..0b428e4bded9c 100644 --- a/src/topology/test/reload.rs +++ b/src/topology/test/reload.rs @@ -39,6 +39,7 @@ async fn topology_reuse_old_port() { ConsoleSinkConfig { target: Target::Stdout, encoding: EncodingConfig::from(StandardEncodings::Text).into(), + acknowledgements: Default::default(), }, ); @@ -50,6 +51,7 @@ async fn topology_reuse_old_port() { ConsoleSinkConfig { target: Target::Stdout, encoding: EncodingConfig::from(StandardEncodings::Text).into(), + acknowledgements: Default::default(), }, ); @@ -73,6 +75,7 @@ async fn topology_rebuild_old() { ConsoleSinkConfig { target: Target::Stdout, encoding: EncodingConfig::from(StandardEncodings::Text).into(), + acknowledgements: Default::default(), }, ); @@ -84,6 +87,7 @@ async fn topology_rebuild_old() { ConsoleSinkConfig { target: Target::Stdout, encoding: EncodingConfig::from(StandardEncodings::Text).into(), + acknowledgements: Default::default(), }, ); @@ -109,6 +113,7 @@ async fn topology_old() { ConsoleSinkConfig { target: Target::Stdout, encoding: EncodingConfig::from(StandardEncodings::Text).into(), + acknowledgements: Default::default(), }, ); diff --git a/src/topology/test/source_finished.rs b/src/topology/test/source_finished.rs index 022fb23592f18..270b7c2051270 100644 --- a/src/topology/test/source_finished.rs +++ b/src/topology/test/source_finished.rs @@ -21,6 +21,7 @@ async fn sources_finished() { ConsoleSinkConfig { target: Target::Stdout, encoding: EncodingConfig::from(StandardEncodings::Text).into(), + acknowledgements: Default::default(), }, );