From 410a42e6f61dc142ff5abd0cc672163ab43bc70c Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Wed, 27 Jul 2022 17:56:23 -0600 Subject: [PATCH] enhancement(pulsar sink): Add end-to-end acknowledgements support --- src/sinks/pulsar.rs | 22 ++++++++++++++----- .../cue/reference/components/sinks/pulsar.cue | 2 +- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/sinks/pulsar.rs b/src/sinks/pulsar.rs index 7adc6b3fed2f8..63a7c6132894c 100644 --- a/src/sinks/pulsar.rs +++ b/src/sinks/pulsar.rs @@ -24,7 +24,7 @@ use crate::{ config::{ AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription, }, - event::{Event, EventFinalizers, Finalizable}, + event::{Event, EventFinalizers, EventStatus, Finalizable}, sinks::util::metadata::RequestMetadata, }; @@ -50,6 +50,14 @@ pub struct PulsarSinkConfig { #[configurable(derived)] auth: Option, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub acknowledgements: AcknowledgementsConfig, } /// Authentication configuration. @@ -137,6 +145,7 @@ impl GenerateConfig for PulsarSinkConfig { topic: "topic-1234".to_string(), encoding: TextSerializerConfig::new().into(), auth: None, + acknowledgements: Default::default(), }) .unwrap() } @@ -178,7 +187,7 @@ impl SinkConfig for PulsarSinkConfig { } fn acknowledgements(&self) -> Option<&AcknowledgementsConfig> { - None + Some(&self.acknowledgements) } } @@ -292,6 +301,7 @@ impl Sink for PulsarSink { let finalizers = event.take_finalizers(); let mut bytes = BytesMut::new(); self.encoder.encode(event, &mut bytes).map_err(|_| { + finalizers.update_status(EventStatus::Errored); // Error is handled by `Encoder`. })?; @@ -333,6 +343,8 @@ impl Sink for PulsarSink { sequence_id = %result.sequence_id, ); + finalizers.update_status(EventStatus::Delivered); + emit!(EventsSent { count: metadata.event_count(), byte_size: metadata.events_byte_size(), @@ -343,10 +355,9 @@ impl Sink for PulsarSink { byte_size: metadata.request_encoded_size(), protocol: "tcp", }); - - drop(finalizers); } - Some((Err(error), _, _)) => { + Some((Err(error), _, finalizers)) => { + finalizers.update_status(EventStatus::Errored); error!(message = "Pulsar sink generated an error.", %error); return Poll::Ready(Err(())); } @@ -402,6 +413,7 @@ mod integration_tests { topic: topic.clone(), encoding: TextSerializerConfig::new().into(), auth: None, + acknowledgements: Default::default(), }; let pulsar = Pulsar::::builder(&cnf.endpoint, TokioExecutor) diff --git a/website/cue/reference/components/sinks/pulsar.cue b/website/cue/reference/components/sinks/pulsar.cue index 6131135dc6f2c..203ca0a080212 100644 --- a/website/cue/reference/components/sinks/pulsar.cue +++ b/website/cue/reference/components/sinks/pulsar.cue @@ -13,7 +13,7 @@ components: sinks: pulsar: { } features: { - acknowledgements: false + acknowledgements: true healthcheck: enabled: true send: { compression: enabled: false