Skip to content

Commit

Permalink
enhancement(networking, sinks): add full jitter to retry backoff poli…
Browse files Browse the repository at this point in the history
…cy (#19106)

* enhancement(networking, sinks): add full jitter to retry backoff policy

* fmt

* fix tests

* add test

* fix

* force ci
  • Loading branch information
dsmith3197 authored and jszwedko committed Nov 16, 2023
1 parent 9f82d6a commit 3f6b237
Show file tree
Hide file tree
Showing 46 changed files with 1,076 additions and 63 deletions.
6 changes: 4 additions & 2 deletions src/sinks/aws_cloudwatch_logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ use crate::sinks::{
config::CloudwatchLogsSinkConfig, request, retry::CloudwatchRetryLogic,
sink::BatchCloudwatchRequest, CloudwatchKey,
},
util::{retries::FixedRetryPolicy, EncodedLength, TowerRequestConfig, TowerRequestSettings},
util::{
retries::FibonacciRetryPolicy, EncodedLength, TowerRequestConfig, TowerRequestSettings,
},
};

type Svc = Buffer<
ConcurrencyLimit<
RateLimit<
Retry<
FixedRetryPolicy<CloudwatchRetryLogic<()>>,
FibonacciRetryPolicy<CloudwatchRetryLogic<()>>,
Buffer<Timeout<CloudwatchLogsSvc>, Vec<InputLogEvent>>,
>,
>,
Expand Down
5 changes: 3 additions & 2 deletions src/sinks/util/adaptive_concurrency/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use crate::{
metrics,
sinks::{
util::{
retries::RetryLogic, BatchSettings, Concurrency, EncodedEvent, EncodedLength,
TowerRequestConfig, VecBuffer,
retries::{JitterMode, RetryLogic},
BatchSettings, Concurrency, EncodedEvent, EncodedLength, TowerRequestConfig, VecBuffer,
},
Healthcheck, VectorSink,
},
Expand Down Expand Up @@ -417,6 +417,7 @@ async fn run_test(params: TestParams) -> TestResults {
concurrency: params.concurrency,
rate_limit_num: Some(9999),
timeout_secs: Some(1),
retry_jitter_mode: JitterMode::None,
..Default::default()
},
params,
Expand Down
117 changes: 101 additions & 16 deletions src/sinks/util/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
use futures::FutureExt;
use tokio::time::{sleep, Sleep};
use tower::{retry::Policy, timeout::error::Elapsed};
use vector_lib::configurable::configurable_component;

use crate::Error;

Expand All @@ -34,50 +35,87 @@ pub trait RetryLogic: Clone + Send + Sync + 'static {
}
}

// Consumed by `FibonacciRetryPolicy::backoff`: `None` yields the plain backoff delay, while
// `Full` yields the randomized delay that was precomputed for the current retry step.
// NOTE(review): the `///` text below appears to be mirrored verbatim in the generated CUE
// reference docs (see `retry_jitter_mode` in the sink `.cue` files) — presumably those need to
// be regenerated whenever this text is edited; confirm.
/// The jitter mode to use for retry backoff behavior.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
pub enum JitterMode {
/// No jitter.
None,

/// Full jitter.
///
/// The random delay is anywhere from 0 up to the maximum current delay calculated by the backoff
/// strategy.
///
/// Incorporating full jitter into your backoff strategy can greatly reduce the likelihood
/// of creating accidental denial of service (DoS) conditions against your own systems when
/// many clients are recovering from a failure state.
// `Full` is the derived `Default`, so a config that omits the setting gets jittered retries.
#[default]
Full,
}

#[derive(Debug, Clone)]
pub struct FixedRetryPolicy<L> {
pub struct FibonacciRetryPolicy<L> {
remaining_attempts: usize,
previous_duration: Duration,
current_duration: Duration,
jitter_mode: JitterMode,
current_jitter_duration: Duration,
max_duration: Duration,
logic: L,
}

pub struct RetryPolicyFuture<L: RetryLogic> {
delay: Pin<Box<Sleep>>,
policy: FixedRetryPolicy<L>,
policy: FibonacciRetryPolicy<L>,
}

impl<L: RetryLogic> FixedRetryPolicy<L> {
pub const fn new(
impl<L: RetryLogic> FibonacciRetryPolicy<L> {
pub fn new(
remaining_attempts: usize,
initial_backoff: Duration,
max_duration: Duration,
logic: L,
jitter_mode: JitterMode,
) -> Self {
FixedRetryPolicy {
FibonacciRetryPolicy {
remaining_attempts,
previous_duration: Duration::from_secs(0),
current_duration: initial_backoff,
jitter_mode,
current_jitter_duration: Self::add_full_jitter(initial_backoff),
max_duration,
logic,
}
}

fn advance(&self) -> FixedRetryPolicy<L> {
let next_duration: Duration = self.previous_duration + self.current_duration;
/// Picks a uniformly random delay in `[1ms, d]` (whole milliseconds) for "full jitter" backoff.
///
/// `d` is the upper bound produced by the backoff strategy for the current retry step.
///
/// Durations shorter than one millisecond are returned unchanged: there is no whole
/// millisecond to randomize over, and taking a remainder by zero would panic.
fn add_full_jitter(d: Duration) -> Duration {
    // `as_millis` returns a `u128`; saturate instead of silently truncating on the cast.
    let max_millis = d.as_millis().min(u128::from(u64::MAX)) as u64;
    if max_millis == 0 {
        // Covers a zero (or sub-millisecond) backoff, e.g. an initial backoff configured as 0.
        return d;
    }

    // `% max_millis` yields `0..max_millis`; the `+ 1` shifts it to `1..=max_millis`, so the
    // result never exceeds `d` and the addition cannot overflow.
    let jitter = (rand::random::<u64>() % max_millis) + 1;
    Duration::from_millis(jitter)
}

fn advance(&self) -> FibonacciRetryPolicy<L> {
let next_duration: Duration = cmp::min(
self.previous_duration + self.current_duration,
self.max_duration,
);

FixedRetryPolicy {
FibonacciRetryPolicy {
remaining_attempts: self.remaining_attempts - 1,
previous_duration: self.current_duration,
current_duration: cmp::min(next_duration, self.max_duration),
current_duration: next_duration,
current_jitter_duration: Self::add_full_jitter(next_duration),
jitter_mode: self.jitter_mode,
max_duration: self.max_duration,
logic: self.logic.clone(),
}
}

const fn backoff(&self) -> Duration {
self.current_duration
match self.jitter_mode {
JitterMode::None => self.current_duration,
JitterMode::Full => self.current_jitter_duration,
}
}

fn build_retry(&self) -> RetryPolicyFuture<L> {
Expand All @@ -89,7 +127,7 @@ impl<L: RetryLogic> FixedRetryPolicy<L> {
}
}

impl<Req, Res, L> Policy<Req, Res, Error> for FixedRetryPolicy<L>
impl<Req, Res, L> Policy<Req, Res, Error> for FibonacciRetryPolicy<L>
where
Req: Clone,
L: RetryLogic<Response = Res>,
Expand Down Expand Up @@ -168,7 +206,7 @@ where
impl<L: RetryLogic> Unpin for RetryPolicyFuture<L> {}

impl<L: RetryLogic> Future for RetryPolicyFuture<L> {
type Output = FixedRetryPolicy<L>;
type Output = FibonacciRetryPolicy<L>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
std::task::ready!(self.delay.poll_unpin(cx));
Expand Down Expand Up @@ -288,11 +326,12 @@ mod tests {

time::pause();

let policy = FixedRetryPolicy::new(
let policy = FibonacciRetryPolicy::new(
5,
Duration::from_secs(1),
Duration::from_secs(10),
SvcRetryLogic,
JitterMode::None,
);

let (mut svc, mut handle) = mock::spawn_layer(RetryLayer::new(policy));
Expand All @@ -317,11 +356,12 @@ mod tests {
async fn service_error_no_retry() {
trace_init();

let policy = FixedRetryPolicy::new(
let policy = FibonacciRetryPolicy::new(
5,
Duration::from_secs(1),
Duration::from_secs(10),
SvcRetryLogic,
JitterMode::None,
);

let (mut svc, mut handle) = mock::spawn_layer(RetryLayer::new(policy));
Expand All @@ -339,11 +379,12 @@ mod tests {

time::pause();

let policy = FixedRetryPolicy::new(
let policy = FibonacciRetryPolicy::new(
5,
Duration::from_secs(1),
Duration::from_secs(10),
SvcRetryLogic,
JitterMode::None,
);

let (mut svc, mut handle) = mock::spawn_layer(RetryLayer::new(policy));
Expand All @@ -363,11 +404,12 @@ mod tests {

#[test]
fn backoff_grows_to_max() {
let mut policy = FixedRetryPolicy::new(
let mut policy = FibonacciRetryPolicy::new(
10,
Duration::from_secs(1),
Duration::from_secs(10),
SvcRetryLogic,
JitterMode::None,
);
assert_eq!(Duration::from_secs(1), policy.backoff());

Expand All @@ -393,6 +435,49 @@ mod tests {
assert_eq!(Duration::from_secs(10), policy.backoff());
}

/// With full jitter enabled, every backoff must be non-zero and no larger than the
/// un-jittered Fibonacci delay for that step, and never larger than the configured maximum.
#[test]
fn backoff_grows_to_max_with_jitter() {
    let max_duration = Duration::from_secs(10);
    let initial_backoff = Duration::from_secs(1);
    let mut policy =
        FibonacciRetryPolicy::new(10, initial_backoff, max_duration, SvcRetryLogic, JitterMode::Full);

    // Un-jittered delays (in seconds) for the steps before the cap is hit.
    let fib_upper_bounds = [1u64, 1, 2, 3, 5, 8]
        .iter()
        .map(|&secs| Duration::from_secs(secs));

    for (attempt, upper_bound) in fib_upper_bounds.enumerate() {
        let backoff = policy.backoff();
        let in_range = backoff > Duration::ZERO && backoff <= upper_bound;

        // The jittered delay has to land in (0, upper_bound].
        assert!(
            in_range,
            "Attempt {}: Expected backoff to be within 0 and {:?}, got {:?}",
            attempt + 1,
            upper_bound,
            backoff
        );

        policy = policy.advance();
    }

    // From here on the un-jittered delay is pinned at the maximum, so the jittered one
    // must stay within (0, max_duration].
    for _ in 0..4 {
        let backoff = policy.backoff();
        let in_range = backoff > Duration::ZERO && backoff <= max_duration;

        assert!(
            in_range,
            "Expected backoff to not exceed {:?}, got {:?}",
            max_duration,
            backoff
        );

        policy = policy.advance();
    }
}

#[derive(Debug, Clone)]
struct SvcRetryLogic;

Expand Down
23 changes: 17 additions & 6 deletions src/sinks/util/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
adaptive_concurrency::{
AdaptiveConcurrencyLimit, AdaptiveConcurrencyLimitLayer, AdaptiveConcurrencySettings,
},
retries::{FixedRetryPolicy, RetryLogic},
retries::{FibonacciRetryPolicy, JitterMode, RetryLogic},
service::map::MapLayer,
sink::Response,
Batch, BatchSink, Partition, PartitionBatchSink,
Expand All @@ -37,13 +37,14 @@ mod health;
mod map;
pub mod net;

pub type Svc<S, L> = RateLimit<AdaptiveConcurrencyLimit<Retry<FixedRetryPolicy<L>, Timeout<S>>, L>>;
pub type Svc<S, L> =
RateLimit<AdaptiveConcurrencyLimit<Retry<FibonacciRetryPolicy<L>, Timeout<S>>, L>>;
pub type TowerBatchedSink<S, B, RL> = BatchSink<Svc<S, RL>, B>;
pub type TowerPartitionSink<S, B, RL, K> = PartitionBatchSink<Svc<S, RL>, B, K>;

// Distributed service types
pub type DistributedService<S, RL, HL, K, Req> = RateLimit<
Retry<FixedRetryPolicy<RL>, Buffer<Balance<DiscoveryService<S, RL, HL, K>, Req>, Req>>,
Retry<FibonacciRetryPolicy<RL>, Buffer<Balance<DiscoveryService<S, RL, HL, K>, Req>, Req>>,
>;
pub type DiscoveryService<S, RL, HL, K> =
BoxStream<'static, Result<Change<K, SingleDistributedService<S, RL, HL>>, crate::Error>>;
Expand Down Expand Up @@ -85,7 +86,9 @@ impl<L> ServiceBuilderExt<L> for ServiceBuilder<L> {

/// Middleware settings for outbound requests.
///
/// Various settings can be configured, such as concurrency and rate limits, timeouts, etc.
/// Various settings can be configured, such as concurrency and rate limits, timeouts, retry behavior, etc.
///
/// Note that the retry backoff policy follows the Fibonacci sequence.
#[serde_as]
#[configurable_component]
#[configurable(metadata(docs::advanced))]
Expand Down Expand Up @@ -138,6 +141,10 @@ pub struct TowerRequestConfig {
#[configurable(metadata(docs::human_name = "Retry Initial Backoff"))]
pub retry_initial_backoff_secs: Option<u64>,

#[configurable(derived)]
#[serde(default)]
pub retry_jitter_mode: JitterMode,

#[configurable(derived)]
#[serde(default)]
pub adaptive_concurrency: AdaptiveConcurrencySettings,
Expand Down Expand Up @@ -184,6 +191,7 @@ impl Default for TowerRequestConfig {
retry_max_duration_secs: default_retry_max_duration_secs(),
retry_initial_backoff_secs: default_retry_initial_backoff_secs(),
adaptive_concurrency: AdaptiveConcurrencySettings::default(),
retry_jitter_mode: JitterMode::default(),
}
}
}
Expand Down Expand Up @@ -265,6 +273,7 @@ impl TowerRequestConfig {
.unwrap(),
),
adaptive_concurrency: self.adaptive_concurrency,
retry_jitter_mode: self.retry_jitter_mode,
}
}
}
Expand All @@ -279,15 +288,17 @@ pub struct TowerRequestSettings {
pub retry_max_duration_secs: Duration,
pub retry_initial_backoff_secs: Duration,
pub adaptive_concurrency: AdaptiveConcurrencySettings,
pub retry_jitter_mode: JitterMode,
}

impl TowerRequestSettings {
pub const fn retry_policy<L: RetryLogic>(&self, logic: L) -> FixedRetryPolicy<L> {
FixedRetryPolicy::new(
pub fn retry_policy<L: RetryLogic>(&self, logic: L) -> FibonacciRetryPolicy<L> {
FibonacciRetryPolicy::new(
self.retry_attempts,
self.retry_initial_backoff_secs,
self.retry_max_duration_secs,
logic,
self.retry_jitter_mode,
)
}

Expand Down
24 changes: 23 additions & 1 deletion website/cue/reference/components/sinks/base/appsignal.cue
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ base: components: sinks: appsignal: configuration: {
description: """
Middleware settings for outbound requests.
Various settings can be configured, such as concurrency and rate limits, timeouts, etc.
Various settings can be configured, such as concurrency and rate limits, timeouts, retry behavior, etc.
Note that the retry backoff policy follows the Fibonacci sequence.
"""
required: false
type: object: options: {
Expand Down Expand Up @@ -273,6 +275,26 @@ base: components: sinks: appsignal: configuration: {
unit: "seconds"
}
}
retry_jitter_mode: {
description: "The jitter mode to use for retry backoff behavior."
required: false
type: string: {
default: "Full"
enum: {
Full: """
Full jitter.
The random delay is anywhere from 0 up to the maximum current delay calculated by the backoff
strategy.
Incorporating full jitter into your backoff strategy can greatly reduce the likelihood
of creating accidental denial of service (DoS) conditions against your own systems when
many clients are recovering from a failure state.
"""
None: "No jitter."
}
}
}
retry_max_duration_secs: {
description: "The maximum amount of time to wait between retries."
required: false
Expand Down
Loading

0 comments on commit 3f6b237

Please sign in to comment.