Skip to content

Commit

Permalink
feat(kinesis sinks): implement full retry of partial failures in fire…
Browse files Browse the repository at this point in the history
…hose/streams (vectordotdev#17535)

This PR is from vectordotdev#16771 PR.
Refactor some action checking.

closes: vectordotdev#17424

---------

Signed-off-by: Spencer Gilbert <spencer.gilbert@datadoghq.com>
Co-authored-by: Jason Goodwin <jgoodwin@bluecatnetworks.com>
Co-authored-by: Jason Goodwin <jay.michael.goodwin@gmail.com>
Co-authored-by: Spencer Gilbert <spencer.gilbert@datadoghq.com>
  • Loading branch information
4 people authored Jun 16, 2023
1 parent 2ad964d commit bebac21
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 23 deletions.
8 changes: 7 additions & 1 deletion src/sinks/aws_kinesis/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ pub struct KinesisSinkBaseConfig {
#[serde(default)]
pub auth: AwsAuthentication,

/// Whether or not to retry successful requests containing partial failures.
#[serde(default)]
#[configurable(metadata(docs::advanced))]
pub request_retry_partial: bool,

#[configurable(derived)]
#[serde(
default,
Expand All @@ -77,6 +82,7 @@ pub fn build_sink<C, R, RR, E, RT>(
partition_key_field: Option<String>,
batch_settings: BatcherSettings,
client: C,
retry_logic: RT,
) -> crate::Result<VectorSink>
where
C: SendRecord + Clone + Send + Sync + 'static,
Expand All @@ -92,7 +98,7 @@ where

let region = config.region.region();
let service = ServiceBuilder::new()
.settings::<RT, BatchKinesisRequest<RR>>(request_limits, RT::default())
.settings::<RT, BatchKinesisRequest<RR>>(request_limits, retry_logic)
.service(KinesisService::<C, R, E> {
client,
stream_name: config.stream_name.clone(),
Expand Down
17 changes: 16 additions & 1 deletion src/sinks/aws_kinesis/firehose/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::FutureExt;
use snafu::Snafu;
use vector_config::configurable_component;

use crate::sinks::util::retries::RetryAction;
use crate::{
aws::{create_client, is_retriable_error, ClientBuilder},
config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
Expand Down Expand Up @@ -141,6 +142,9 @@ impl SinkConfig for KinesisFirehoseSinkConfig {
None,
batch_settings,
KinesisFirehoseClient { client },
KinesisRetryLogic {
retry_partial: self.base.request_retry_partial,
},
)?;

Ok((sink, healthcheck))
Expand All @@ -166,7 +170,9 @@ impl GenerateConfig for KinesisFirehoseSinkConfig {
}

#[derive(Clone, Default)]
struct KinesisRetryLogic;
struct KinesisRetryLogic {
retry_partial: bool,
}

impl RetryLogic for KinesisRetryLogic {
type Error = SdkError<KinesisError>;
Expand All @@ -180,4 +186,13 @@ impl RetryLogic for KinesisRetryLogic {
}
is_retriable_error(error)
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
if response.failure_count > 0 && self.retry_partial {
let msg = format!("partial error count {}", response.failure_count);
RetryAction::Retry(msg.into())
} else {
RetryAction::Successful
}
}
}
1 change: 1 addition & 0 deletions src/sinks/aws_kinesis/firehose/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ async fn firehose_put_records() {
tls: None,
auth: Default::default(),
acknowledgements: Default::default(),
request_retry_partial: Default::default(),
};

let config = KinesisFirehoseSinkConfig { batch, base };
Expand Down
21 changes: 18 additions & 3 deletions src/sinks/aws_kinesis/firehose/record.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use aws_sdk_firehose::output::PutRecordBatchOutput;
use aws_sdk_firehose::types::{Blob, SdkError};
use bytes::Bytes;
use tracing::Instrument;

use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord};
use crate::sinks::prelude::*;

use super::{KinesisClient, KinesisError, KinesisRecord, KinesisResponse, Record, SendRecord};

#[derive(Clone)]
pub struct KinesisFirehoseRecord {
Expand Down Expand Up @@ -46,14 +49,26 @@ impl SendRecord for KinesisFirehoseClient {
type T = KinesisRecord;
type E = KinesisError;

async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>> {
async fn send(
&self,
records: Vec<Self::T>,
stream_name: String,
) -> Result<KinesisResponse, SdkError<Self::E>> {
let rec_count = records.len();
let total_size = records.iter().fold(0, |acc, record| {
acc + record.data().map(|v| v.as_ref().len()).unwrap_or_default()
});
self.client
.put_record_batch()
.set_records(Some(records))
.delivery_stream_name(stream_name)
.send()
.instrument(info_span!("request").or_current())
.await
.err()
.map(|output: PutRecordBatchOutput| KinesisResponse {
count: rec_count,
failure_count: output.failed_put_count().unwrap_or(0) as usize,
events_byte_size: JsonSize::new(total_size),
})
}
}
2 changes: 2 additions & 0 deletions src/sinks/aws_kinesis/firehose/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ async fn check_batch_size() {
request: Default::default(),
tls: None,
auth: Default::default(),
request_retry_partial: false,
acknowledgements: Default::default(),
};

Expand Down Expand Up @@ -62,6 +63,7 @@ async fn check_batch_events() {
request: Default::default(),
tls: None,
auth: Default::default(),
request_retry_partial: false,
acknowledgements: Default::default(),
};

Expand Down
7 changes: 6 additions & 1 deletion src/sinks/aws_kinesis/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use async_trait::async_trait;
use aws_smithy_client::SdkError;
use bytes::Bytes;

use super::KinesisResponse;
/// An AWS Kinesis record type primarily to store the underlying aws crates' actual record `T`, and
/// to abstract the encoded length calculation.
pub trait Record {
Expand All @@ -24,5 +25,9 @@ pub trait SendRecord {
type E;

/// Sends the records.
async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>>;
async fn send(
&self,
records: Vec<Self::T>,
stream_name: String,
) -> Result<KinesisResponse, SdkError<Self::E>>;
}
20 changes: 7 additions & 13 deletions src/sinks/aws_kinesis/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ where
}

pub struct KinesisResponse {
count: usize,
events_byte_size: JsonSize,
pub(crate) count: usize,
pub(crate) failure_count: usize,
pub(crate) events_byte_size: JsonSize,
}

impl DriverResponse for KinesisResponse {
Expand Down Expand Up @@ -72,7 +73,6 @@ where
let events_byte_size = requests
.get_metadata()
.events_estimated_json_encoded_byte_size();
let count = requests.get_metadata().event_count();

let records = requests
.events
Expand All @@ -84,16 +84,10 @@ where
let stream_name = self.stream_name.clone();

Box::pin(async move {
// Returning a Result (a trait that implements Try) is not a stable feature,
// so instead we have to explicitly check for error and return.
// https://github.com/rust-lang/rust/issues/84277
if let Some(e) = client.send(records, stream_name).await {
return Err(e);
}

Ok(KinesisResponse {
count,
events_byte_size,
client.send(records, stream_name).await.map(|mut r| {
// augment the response
r.events_byte_size = events_byte_size;
r
})
})
}
Expand Down
17 changes: 16 additions & 1 deletion src/sinks/aws_kinesis/streams/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use futures::FutureExt;
use snafu::Snafu;
use vector_config::{component::GenerateConfig, configurable_component};

use crate::sinks::util::retries::RetryAction;
use crate::{
aws::{create_client, is_retriable_error, ClientBuilder},
config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext},
Expand Down Expand Up @@ -148,6 +149,9 @@ impl SinkConfig for KinesisStreamsSinkConfig {
self.partition_key_field.clone(),
batch_settings,
KinesisStreamClient { client },
KinesisRetryLogic {
retry_partial: self.base.request_retry_partial,
},
)?;

Ok((sink, healthcheck))
Expand All @@ -173,7 +177,9 @@ impl GenerateConfig for KinesisStreamsSinkConfig {
}
}
#[derive(Default, Clone)]
struct KinesisRetryLogic;
struct KinesisRetryLogic {
retry_partial: bool,
}

impl RetryLogic for KinesisRetryLogic {
type Error = SdkError<KinesisError>;
Expand All @@ -193,6 +199,15 @@ impl RetryLogic for KinesisRetryLogic {
}
is_retriable_error(error)
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
if response.failure_count > 0 && self.retry_partial {
let msg = format!("partial error count {}", response.failure_count);
RetryAction::Retry(msg.into())
} else {
RetryAction::Successful
}
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/sinks/aws_kinesis/streams/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ async fn kinesis_put_records_without_partition_key() {
tls: Default::default(),
auth: Default::default(),
acknowledgements: Default::default(),
request_retry_partial: Default::default(),
};

let config = KinesisStreamsSinkConfig {
Expand Down
21 changes: 18 additions & 3 deletions src/sinks/aws_kinesis/streams/record.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use aws_sdk_kinesis::output::PutRecordsOutput;
use aws_sdk_kinesis::types::{Blob, SdkError};
use bytes::Bytes;
use tracing::Instrument;

use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord};
use crate::sinks::prelude::*;

use super::{KinesisClient, KinesisError, KinesisRecord, KinesisResponse, Record, SendRecord};

#[derive(Clone)]
pub struct KinesisStreamRecord {
Expand Down Expand Up @@ -62,14 +65,26 @@ impl SendRecord for KinesisStreamClient {
type T = KinesisRecord;
type E = KinesisError;

async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>> {
async fn send(
&self,
records: Vec<Self::T>,
stream_name: String,
) -> Result<KinesisResponse, SdkError<Self::E>> {
let rec_count = records.len();
let total_size = records.iter().fold(0, |acc, record| {
acc + record.data().map(|v| v.as_ref().len()).unwrap_or_default()
});
self.client
.put_records()
.set_records(Some(records))
.stream_name(stream_name)
.send()
.instrument(info_span!("request").or_current())
.await
.err()
.map(|output: PutRecordsOutput| KinesisResponse {
count: rec_count,
failure_count: output.failed_record_count().unwrap_or(0) as usize,
events_byte_size: JsonSize::new(total_size),
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ components: sinks: aws_kinesis_firehose: components._aws & {

configuration: base.components.sinks.aws_kinesis_firehose.configuration & {
_aws_include: false
request_retry_partial: warnings: ["This can cause duplicate logs to be published."]
}

input: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,11 @@ base: components: sinks: aws_kinesis_firehose: configuration: {
}
}
}
request_retry_partial: {
description: "Whether or not to retry successful requests containing partial failures."
required: false
type: bool: default: false
}
stream_name: {
description: """
The [stream name][stream_name] of the target Kinesis Firehose delivery stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,11 @@ base: components: sinks: aws_kinesis_streams: configuration: {
}
}
}
request_retry_partial: {
description: "Whether or not to retry successful requests containing partial failures."
required: false
type: bool: default: false
}
stream_name: {
description: """
The [stream name][stream_name] of the target Kinesis Firehose delivery stream.
Expand Down

0 comments on commit bebac21

Please sign in to comment.