Skip to content

Commit

Permalink
enhancement(pulsar sink): Add end-to-end acknowledgements support (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
bruceg authored Jul 28, 2022
1 parent 3d65e4e commit 975b99c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
22 changes: 17 additions & 5 deletions src/sinks/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
config::{
AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription,
},
event::{Event, EventFinalizers, Finalizable},
event::{Event, EventFinalizers, EventStatus, Finalizable},
sinks::util::metadata::RequestMetadata,
};

Expand All @@ -50,6 +50,14 @@ pub struct PulsarSinkConfig {

#[configurable(derived)]
auth: Option<AuthConfig>,

#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub acknowledgements: AcknowledgementsConfig,
}

/// Authentication configuration.
Expand Down Expand Up @@ -137,6 +145,7 @@ impl GenerateConfig for PulsarSinkConfig {
topic: "topic-1234".to_string(),
encoding: TextSerializerConfig::new().into(),
auth: None,
acknowledgements: Default::default(),
})
.unwrap()
}
Expand Down Expand Up @@ -178,7 +187,7 @@ impl SinkConfig for PulsarSinkConfig {
}

fn acknowledgements(&self) -> Option<&AcknowledgementsConfig> {
None
Some(&self.acknowledgements)
}
}

Expand Down Expand Up @@ -292,6 +301,7 @@ impl Sink<Event> for PulsarSink {
let finalizers = event.take_finalizers();
let mut bytes = BytesMut::new();
self.encoder.encode(event, &mut bytes).map_err(|_| {
finalizers.update_status(EventStatus::Errored);
// Error is handled by `Encoder`.
})?;

Expand Down Expand Up @@ -333,6 +343,8 @@ impl Sink<Event> for PulsarSink {
sequence_id = %result.sequence_id,
);

finalizers.update_status(EventStatus::Delivered);

emit!(EventsSent {
count: metadata.event_count(),
byte_size: metadata.events_byte_size(),
Expand All @@ -343,10 +355,9 @@ impl Sink<Event> for PulsarSink {
byte_size: metadata.request_encoded_size(),
protocol: "tcp",
});

drop(finalizers);
}
Some((Err(error), _, _)) => {
Some((Err(error), _, finalizers)) => {
finalizers.update_status(EventStatus::Errored);
error!(message = "Pulsar sink generated an error.", %error);
return Poll::Ready(Err(()));
}
Expand Down Expand Up @@ -402,6 +413,7 @@ mod integration_tests {
topic: topic.clone(),
encoding: TextSerializerConfig::new().into(),
auth: None,
acknowledgements: Default::default(),
};

let pulsar = Pulsar::<TokioExecutor>::builder(&cnf.endpoint, TokioExecutor)
Expand Down
2 changes: 1 addition & 1 deletion website/cue/reference/components/sinks/pulsar.cue
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ components: sinks: pulsar: {
}

features: {
acknowledgements: false
acknowledgements: true
healthcheck: enabled: true
send: {
compression: enabled: false
Expand Down

0 comments on commit 975b99c

Please sign in to comment.