Skip to content

Commit

Permalink
feat: Remove dependences on pin_project and futures_core
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Aug 7, 2024
1 parent 97f24cc commit 414719c
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 56 deletions.
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ version = "0.4.4"

[dependencies]
fastrand = "2.0.0"
futures-core = "0.3.26"
pin-project = "1"
tokio = { version = "1", features = ["time"] }

[dev-dependencies]
Expand Down
74 changes: 43 additions & 31 deletions src/retry.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::future::Future;
use std::pin::Pin;
use std::task::ready;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;

use futures_core::ready;
use pin_project::pin_project;

use crate::backoff::BackoffBuilder;
use crate::Backoff;

Expand Down Expand Up @@ -66,7 +64,6 @@ where
}

/// Retry struct generated by [`Retryable`].
#[pin_project]
pub struct Retry<
B: Backoff,
T,
Expand All @@ -81,7 +78,6 @@ pub struct Retry<
notify: NF,
future_fn: FutureFn,

#[pin]
state: State<T, E, Fut>,
}

Expand Down Expand Up @@ -207,13 +203,12 @@ where
/// `tokio::time::Sleep` is a very struct that occupy 640B, so we wrap it
/// into a `Pin<Box<_>>` to avoid this enum too large.
#[derive(Default)]
#[pin_project(project = StateProject)]
enum State<T, E, Fut: Future<Output = Result<T, E>>> {
#[default]
Idle,
Polling(#[pin] Fut),
Polling(Fut),
// TODO: we need to support other sleeper
Sleeping(#[pin] Pin<Box<tokio::time::Sleep>>),
Sleeping(tokio::time::Sleep),
}

impl<B, T, E, Fut, FutureFn, RF, NF> Future for Retry<B, T, E, Fut, FutureFn, RF, NF>
Expand All @@ -227,36 +222,53 @@ where
type Output = Result<T, E>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
// Safety: This is safe because we don't move the `Retry` struct itself,
// only its internal state.
//
// We do the exactly same thing like `pin_project` but without depending on it directly.
let this = unsafe { self.get_unchecked_mut() };

loop {
let state = this.state.as_mut().project();
match state {
StateProject::Idle => {
match &mut this.state {
State::Idle => {
let fut = (this.future_fn)();
this.state.set(State::Polling(fut));
this.state = State::Polling(fut);
continue;
}
StateProject::Polling(fut) => match ready!(fut.poll(cx)) {
Ok(v) => return Poll::Ready(Ok(v)),
Err(err) => {
// If input error is not retryable, return error directly.
if !(this.retryable)(&err) {
return Poll::Ready(Err(err));
}
match this.backoff.next() {
None => return Poll::Ready(Err(err)),
Some(dur) => {
(this.notify)(&err, dur);
this.state
.set(State::Sleeping(Box::pin(tokio::time::sleep(dur))));
continue;
State::Polling(fut) => {
// Safety: This is safe because we don't move the `Retry` struct and this fut,
// only its internal state.
//
// We do the exactly same thing like `pin_project` but without depending on it directly.
let mut fut = unsafe { Pin::new_unchecked(fut) };

match ready!(fut.as_mut().poll(cx)) {
Ok(v) => return Poll::Ready(Ok(v)),
Err(err) => {
// If input error is not retryable, return error directly.
if !(this.retryable)(&err) {
return Poll::Ready(Err(err));
}
match this.backoff.next() {
None => return Poll::Ready(Err(err)),
Some(dur) => {
(this.notify)(&err, dur);
this.state = State::Sleeping(tokio::time::sleep(dur));
continue;
}
}
}
}
},
StateProject::Sleeping(sl) => {
ready!(sl.poll(cx));
this.state.set(State::Idle);
}
State::Sleeping(sl) => {
// Safety: This is safe because we don't move the `Retry` struct and this fut,
// only its internal state.
//
// We do the exactly same thing like `pin_project` but without depending on it directly.
let mut sl = unsafe { Pin::new_unchecked(sl) };

ready!(sl.as_mut().poll(cx));
this.state = State::Idle;
continue;
}
}
Expand Down
55 changes: 32 additions & 23 deletions src/retry_with_context.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::future::Future;
use std::pin::{pin, Pin};
use std::pin::Pin;
use std::task::ready;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;

use futures_core::ready;
use pin_project::pin_project;

use crate::backoff::BackoffBuilder;
use crate::Backoff;

Expand Down Expand Up @@ -99,7 +97,6 @@ where
}

/// Retry struct generated by [`Retryable`].
#[pin_project]
pub struct Retry<
B: Backoff,
T,
Expand All @@ -115,7 +112,6 @@ pub struct Retry<
notify: NF,
future_fn: FutureFn,

#[pin]
state: State<T, E, Ctx, Fut>,
}

Expand Down Expand Up @@ -255,12 +251,11 @@ where
///
/// `tokio::time::Sleep` is a very struct that occupy 640B, so we wrap it
/// into a `Pin<Box<_>>` to avoid this enum too large.
#[pin_project(project = StateProject)]
enum State<T, E, Ctx, Fut: Future<Output = (Ctx, Result<T, E>)>> {
Idle(Option<Ctx>),
Polling(#[pin] Fut),
Polling(Fut),
// TODO: we need to support other sleeper
Sleeping((Option<Ctx>, Pin<Box<tokio::time::Sleep>>)),
Sleeping((Option<Ctx>, tokio::time::Sleep)),
}

impl<B, T, E, Ctx, Fut, FutureFn, RF, NF> Future for Retry<B, T, E, Ctx, Fut, FutureFn, RF, NF>
Expand All @@ -274,18 +269,28 @@ where
type Output = (Ctx, Result<T, E>);

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
// Safety: This is safe because we don't move the `Retry` struct itself,
// only its internal state.
//
// We do the exactly same thing like `pin_project` but without depending on it directly.
let this = unsafe { self.get_unchecked_mut() };

loop {
let state = this.state.as_mut().project();
match state {
StateProject::Idle(ctx) => {
match &mut this.state {
State::Idle(ctx) => {
let ctx = ctx.take().expect("context must be valid");
let fut = (this.future_fn)(ctx);
this.state.set(State::Polling(fut));
this.state = State::Polling(fut);
continue;
}
StateProject::Polling(fut) => {
let (ctx, res) = ready!(fut.poll(cx));
State::Polling(fut) => {
// Safety: This is safe because we don't move the `Retry` struct and this fut,
// only its internal state.
//
// We do the exactly same thing like `pin_project` but without depending on it directly.
let mut fut = unsafe { Pin::new_unchecked(fut) };

let (ctx, res) = ready!(fut.as_mut().poll(cx));
match res {
Ok(v) => return Poll::Ready((ctx, Ok(v))),
Err(err) => {
Expand All @@ -297,20 +302,24 @@ where
None => return Poll::Ready((ctx, Err(err))),
Some(dur) => {
(this.notify)(&err, dur);
this.state.set(State::Sleeping((
Some(ctx),
Box::pin(tokio::time::sleep(dur)),
)));
this.state =
State::Sleeping((Some(ctx), tokio::time::sleep(dur)));
continue;
}
}
}
}
}
StateProject::Sleeping((ctx, sl)) => {
ready!(pin!(sl).poll(cx));
State::Sleeping((ctx, sl)) => {
// Safety: This is safe because we don't move the `Retry` struct and this fut,
// only its internal state.
//
// We do the exactly same thing like `pin_project` but without depending on it directly.
let mut sl = unsafe { Pin::new_unchecked(sl) };

ready!(sl.as_mut().poll(cx));
let ctx = ctx.take().expect("context must be valid");
this.state.set(State::Idle(Some(ctx)));
this.state = State::Idle(Some(ctx));
continue;
}
}
Expand Down

0 comments on commit 414719c

Please sign in to comment.