diff --git a/src/sinks/pulsar.rs b/src/sinks/pulsar.rs index 16a4393514141..0f895e2be7abe 100644 --- a/src/sinks/pulsar.rs +++ b/src/sinks/pulsar.rs @@ -3,6 +3,7 @@ use std::{ pin::Pin, task::{ready, Context, Poll}, }; +use std::time::Duration; use crate::{ codecs::{Encoder, EncodingConfig, Transformer}, @@ -18,7 +19,7 @@ use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params}; use pulsar::error::AuthenticationError; use pulsar::{ message::proto, producer::SendFuture, proto::CommandSendReceipt, Authentication, - Error as PulsarError, Producer, Pulsar, TokioExecutor, + Error as PulsarError, Producer, Pulsar, TokioExecutor, OperationRetryOptions as RetryOptions }; use snafu::{ResultExt, Snafu}; use tokio_util::codec::Encoder as _; @@ -52,6 +53,9 @@ pub struct PulsarSinkConfig { #[configurable(derived)] pub encoding: EncodingConfig, + #[configurable(derived)] + pub operation_retry_options: OperationRetryOptions, + #[configurable(derived)] auth: Option, @@ -106,6 +110,18 @@ pub struct OAuth2Config { scope: Option, } +/// Pulsar topics lookup operation retry options. +#[configurable_component] +#[derive(Debug, Clone)] +pub struct OperationRetryOptions { + /// time limit to receive an answer to a Pulsar operation + pub operation_timeout: u64, + /// delay between operation retries after a ServiceNotReady error + pub retry_delay: u64, + /// maximum number of operation retries. None indicates infinite retries + pub max_retries: Option, +} + type PulsarProducer = Producer; type BoxedPulsarProducer = Box; @@ -150,6 +166,11 @@ impl GenerateConfig for PulsarSinkConfig { topic: "topic-1234".to_string(), partition_key_field: Some("message".to_string()), encoding: TextSerializerConfig::new().into(), + operation_retry_options: OperationRetryOptions { + operation_timeout: 30, + retry_delay: 5, + max_retries: None, + }, auth: None, acknowledgements: Default::default(), }) @@ -224,8 +245,20 @@ impl PulsarSinkConfig { ))), }; } - - let pulsar = builder.build().await?; + let mut operation_retry_options = RetryOptions::default(); + if self.operation_retry_options.max_retries > Some(0) { + operation_retry_options.max_retries = self.operation_retry_options.max_retries + } + if self.operation_retry_options.operation_timeout > 0 { + operation_retry_options.operation_timeout = + Duration::from_secs(self.operation_retry_options.operation_timeout) + } + if self.operation_retry_options.retry_delay > 0 { + operation_retry_options.retry_delay = + Duration::from_secs(self.operation_retry_options.retry_delay) + } + let pulsar = builder + .with_operation_retry_options(operation_retry_options).build().await?; if let SerializerConfig::Avro { avro } = self.encoding.config() { pulsar .producer() @@ -436,6 +469,11 @@ mod integration_tests { endpoint: pulsar_address(), topic: topic.clone(), encoding: TextSerializerConfig::new().into(), + operation_retry_options: OperationRetryOptions { + operation_timeout: 30, + retry_delay: 5, + max_retries: None, + }, auth: None, acknowledgements: Default::default(), partition_key_field: Some("message".to_string()),