Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): replace backoff with backon #1653

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ assert-json-diff = "2.0.2"
async-broadcast = "0.7.0"
async-stream = "0.3.5"
async-trait = "0.1.64"
backoff = "0.4.0"
backon = "1.3"
base64 = "0.22.1"
bytes = "1.1.0"
chrono = { version = "0.4.34", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ tower-http = { workspace = true, features = ["trace", "decompression-gzip"] }
hyper = { workspace = true, features = ["client", "http1"] }
hyper-util = { workspace = true, features = ["client-legacy", "http1", "tokio"] }
thiserror.workspace = true
backoff.workspace = true
backon.workspace = true
clap = { version = "4.0", default-features = false, features = ["std", "cargo", "derive"] }
edit = "0.1.3"
tokio-stream = { version = "0.1.9", features = ["net"] }
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ json-patch.workspace = true
jsonptr.workspace = true
serde_json.workspace = true
thiserror.workspace = true
backoff.workspace = true
backon.workspace = true
async-trait.workspace = true
hashbrown.workspace = true
k8s-openapi.workspace = true
Expand Down
7 changes: 4 additions & 3 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use crate::{
ObjectRef,
},
scheduler::{debounced_scheduler, ScheduleRequest},
utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
utils::{
trystream_try_via, Backoff, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt,
},
watcher::{self, metadata_watcher, watcher, DefaultBackoff},
};
use backoff::backoff::Backoff;
use educe::Educe;
use futures::{
channel,
Expand Down Expand Up @@ -915,7 +916,7 @@ where
/// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions,
/// but can be overridden by calling this method.
#[must_use]
pub fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self {
pub fn trigger_backoff(mut self, backoff: impl Backoff + 'static) -> Self {
self.trigger_backoff = Box::new(backoff);
self
}
Expand Down
62 changes: 29 additions & 33 deletions kube-runtime/src/utils/backoff_reset_timer.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
use std::time::{Duration, Instant};

use backoff::{backoff::Backoff, Clock, SystemClock};
pub trait Backoff: Iterator<Item = Duration> + Send + Sync + Unpin {
/// Resets the internal state to the initial value.
fn reset(&mut self);
}

impl<B: Backoff + ?Sized> Backoff for Box<B> {
fn reset(&mut self) {
let this: &mut B = self;
this.reset()
}
}

/// A [`Backoff`] wrapper that resets after a fixed duration has elapsed.
pub struct ResetTimerBackoff<B, C = SystemClock> {
pub struct ResetTimerBackoff<B: Backoff> {
backoff: B,
clock: C,
last_backoff: Option<Instant>,
reset_duration: Duration,
}

impl<B: Backoff> ResetTimerBackoff<B> {
pub fn new(backoff: B, reset_duration: Duration) -> Self {
Self::new_with_custom_clock(backoff, reset_duration, SystemClock {})
}
}

impl<B: Backoff, C: Clock> ResetTimerBackoff<B, C> {
fn new_with_custom_clock(backoff: B, reset_duration: Duration, clock: C) -> Self {
Self {
backoff,
clock,
last_backoff: None,
reset_duration,
}
}
}

impl<B: Backoff, C: Clock> Backoff for ResetTimerBackoff<B, C> {
fn next_backoff(&mut self) -> Option<Duration> {
impl<B: Backoff> Iterator for ResetTimerBackoff<B> {
type Item = Duration;

fn next(&mut self) -> Option<Duration> {
if let Some(last_backoff) = self.last_backoff {
if self.clock.now() > last_backoff + self.reset_duration {
if tokio::time::Instant::now().into_std() > last_backoff + self.reset_duration {
tracing::debug!(
?last_backoff,
reset_duration = ?self.reset_duration,
Expand All @@ -39,48 +43,40 @@ impl<B: Backoff, C: Clock> Backoff for ResetTimerBackoff<B, C> {
self.backoff.reset();
}
}
self.last_backoff = Some(self.clock.now());
self.backoff.next_backoff()
self.last_backoff = Some(tokio::time::Instant::now().into_std());
self.backoff.next()
}
}

impl<B: Backoff> Backoff for ResetTimerBackoff<B> {
fn reset(&mut self) {
// Do not even bother trying to reset here, since `next_backoff` will take care of this when the timer expires.
self.backoff.reset();
}
}

#[cfg(test)]
mod tests {
use backoff::{backoff::Backoff, Clock};
use tokio::time::advance;

use super::ResetTimerBackoff;
use crate::utils::stream_backoff::tests::LinearBackoff;
use std::time::{Duration, Instant};
use std::time::Duration;

#[tokio::test]
async fn should_reset_when_timer_expires() {
tokio::time::pause();
let mut backoff = ResetTimerBackoff::new_with_custom_clock(
let mut backoff = ResetTimerBackoff::new(
LinearBackoff::new(Duration::from_secs(2)),
Duration::from_secs(60),
TokioClock,
);
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
advance(Duration::from_secs(40)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(4)));
assert_eq!(backoff.next(), Some(Duration::from_secs(4)));
advance(Duration::from_secs(40)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(6)));
assert_eq!(backoff.next(), Some(Duration::from_secs(6)));
advance(Duration::from_secs(80)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
advance(Duration::from_secs(80)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
}

struct TokioClock;

impl Clock for TokioClock {
fn now(&self) -> Instant {
tokio::time::Instant::now().into_std()
}
assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
}
}
2 changes: 1 addition & 1 deletion kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod reflect;
mod stream_backoff;
mod watch_ext;

pub use backoff_reset_timer::ResetTimerBackoff;
pub use backoff_reset_timer::{Backoff, ResetTimerBackoff};
pub use event_decode::EventDecode;
pub use event_modify::EventModify;
pub use predicate::{predicates, Predicate, PredicateFilter};
Expand Down
78 changes: 66 additions & 12 deletions kube-runtime/src/utils/stream_backoff.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{future::Future, pin::Pin, task::Poll};

use backoff::backoff::Backoff;
use futures::{Stream, TryStream};
use pin_project::pin_project;
use tokio::time::{sleep, Instant, Sleep};

use crate::utils::Backoff;

/// Applies a [`Backoff`] policy to a [`Stream`]
///
/// After any [`Err`] is emitted, the stream is paused for [`Backoff::next_backoff`]. The
Expand Down Expand Up @@ -71,7 +72,7 @@ impl<S: TryStream, B: Backoff> Stream for StreamBackoff<S, B> {
let next_item = this.stream.try_poll_next(cx);
match &next_item {
Poll::Ready(Some(Err(_))) => {
if let Some(backoff_duration) = this.backoff.next_backoff() {
if let Some(backoff_duration) = this.backoff.next() {
let backoff_sleep = sleep(backoff_duration);
tracing::debug!(
deadline = ?backoff_sleep.deadline(),
Expand All @@ -98,16 +99,54 @@ impl<S: TryStream, B: Backoff> Stream for StreamBackoff<S, B> {
pub(crate) mod tests {
use std::{pin::pin, task::Poll, time::Duration};

use crate::utils::Backoff;

use super::StreamBackoff;
use backoff::backoff::Backoff;
use backon::BackoffBuilder;
use futures::{channel::mpsc, poll, stream, StreamExt};

pub struct ConstantBackoff {
inner: backon::ConstantBackoff,
delay: Duration,
max_times: usize,
}

impl ConstantBackoff {
pub fn new(delay: Duration, max_times: usize) -> Self {
Self {
inner: backon::ConstantBuilder::default()
.with_delay(delay)
.with_max_times(max_times)
.build(),
delay,
max_times,
}
}
}

impl Iterator for ConstantBackoff {
type Item = Duration;

fn next(&mut self) -> Option<Duration> {
self.inner.next()
}
}

impl Backoff for ConstantBackoff {
fn reset(&mut self) {
self.inner = backon::ConstantBuilder::default()
.with_delay(self.delay)
.with_max_times(self.max_times)
.build();
}
}

#[tokio::test]
async fn stream_should_back_off() {
tokio::time::pause();
let tick = Duration::from_secs(1);
let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]);
let mut rx = pin!(StreamBackoff::new(rx, backoff::backoff::Constant::new(tick)));
let mut rx = pin!(StreamBackoff::new(rx, ConstantBackoff::new(tick, 10)));
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0))));
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(1))));
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(2))));
Expand Down Expand Up @@ -149,16 +188,27 @@ pub(crate) mod tests {
#[tokio::test]
async fn backoff_should_close_when_requested() {
assert_eq!(
StreamBackoff::new(
stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]),
backoff::backoff::Stop {}
)
.collect::<Vec<_>>()
.await,
StreamBackoff::new(stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]), StoppedBackoff {})
.collect::<Vec<_>>()
.await,
vec![Ok(0), Ok(1), Err(2)]
);
}

struct StoppedBackoff;

impl Backoff for StoppedBackoff {
fn reset(&mut self) {}
}

impl Iterator for StoppedBackoff {
type Item = Duration;

fn next(&mut self) -> Option<Duration> {
None
}
}

/// Dynamic backoff policy that is still deterministic and testable
pub struct LinearBackoff {
interval: Duration,
Expand All @@ -174,12 +224,16 @@ pub(crate) mod tests {
}
}

impl Backoff for LinearBackoff {
fn next_backoff(&mut self) -> Option<Duration> {
impl Iterator for LinearBackoff {
type Item = Duration;

fn next(&mut self) -> Option<Duration> {
self.current_duration += self.interval;
Some(self.current_duration)
}
}

impl Backoff for LinearBackoff {
fn reset(&mut self) {
self.current_duration = Duration::ZERO
}
Expand Down
6 changes: 4 additions & 2 deletions kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use crate::{
};
use kube_client::Resource;

use crate::{reflector::store::Writer, utils::Reflect};
use crate::{
reflector::store::Writer,
utils::{Backoff, Reflect},
};

use crate::watcher::DefaultBackoff;
use backoff::backoff::Backoff;
use futures::{Stream, TryStream};

/// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector)
Expand Down
Loading
Loading