Skip to content

Commit

Permalink
Remove Clone constraints and buffer the service (#348)
Browse files Browse the repository at this point in the history
* feat: remove the `Clone` requirements for services

* test save

* fix: get buffered layer working

* update: remove clone & update api

* fix: tests and api

* lint: clippy fixes

* lint: cargo fmt
  • Loading branch information
geofmureithi authored Jul 4, 2024
1 parent dd9570c commit 5861e84
Show file tree
Hide file tree
Showing 17 changed files with 613 additions and 71 deletions.
2 changes: 1 addition & 1 deletion examples/redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0"
[dependencies]
anyhow = "1"
tokio = { version = "1", features = ["full"] }
apalis = { path = "../../", features = ["redis", "timeout"]}
apalis = { path = "../../", features = ["redis", "timeout", "limit"]}
serde = "1"
env_logger = "0.10"
tracing-subscriber = "0.3.11"
Expand Down
6 changes: 4 additions & 2 deletions examples/redis/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use std::{

use anyhow::Result;
use apalis::prelude::*;
use apalis::redis::RedisStorage;
use apalis::{layers::limit::RateLimitLayer, redis::RedisStorage};

use email_service::{send_email, Email};
use tracing::{error, info};

async fn produce_jobs(mut storage: RedisStorage<Email>) -> Result<()> {
for index in 0..1 {
for index in 0..10 {
storage
.push(Email {
to: index.to_string(),
Expand Down Expand Up @@ -48,6 +48,7 @@ async fn main() -> Result<()> {
let worker = WorkerBuilder::new("rango-tango")
.chain(|svc| svc.timeout(Duration::from_millis(500)))
.data(Count::default())
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
.with_storage(storage)
.build_fn(send_email);

Expand All @@ -71,6 +72,7 @@ async fn main() -> Result<()> {
})
.shutdown_timeout(Duration::from_millis(5000))
.run_with_signal(async {
info!("Monitor started");
tokio::signal::ctrl_c().await?;
info!("Monitor starting shutdown");
Ok(())
Expand Down
45 changes: 35 additions & 10 deletions packages/apalis-core/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,24 +162,43 @@ pub trait Ack<J> {
/// Acknowledges successful processing of the given request
fn ack(
&mut self,
worker_id: &WorkerId,
data: &Self::Acknowledger,
response: AckResponse<Self::Acknowledger>,
) -> impl Future<Output = Result<(), Self::Error>> + Send;
}

/// ACK response
#[derive(Debug, Clone)]
pub struct AckResponse<A> {
/// The worker id
pub worker: WorkerId,
/// The acknowledger
pub acknowledger: A,
/// The stringified result
pub result: String,
}

impl<A: fmt::Display> AckResponse<A> {
/// Output a json for the response
pub fn to_json(&self) -> String {
format!(
r#"{{"worker": "{}", "acknowledger": "{}", "result": "{}"}}"#,
self.worker, self.acknowledger, self.result
)
}
}

/// A generic stream that emits (worker_id, task_id)
#[derive(Debug)]
pub struct AckStream<A>(pub Sender<(WorkerId, A)>);
pub struct AckStream<A>(pub Sender<AckResponse<A>>);

impl<J, A: Send + Clone + 'static> Ack<J> for AckStream<A> {
type Acknowledger = A;
type Error = SendError;
fn ack(
&mut self,
worker_id: &WorkerId,
data: &Self::Acknowledger,
response: AckResponse<A>,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
self.0.send((worker_id.clone(), data.clone())).boxed()
self.0.send(response).boxed()
}
}

Expand Down Expand Up @@ -248,7 +267,7 @@ where
<SV as Service<Request<J>>>::Future: std::marker::Send + 'static,
A: Ack<J> + Send + 'static + Clone + Send + Sync,
J: 'static,
<SV as Service<Request<J>>>::Response: std::marker::Send,
<SV as Service<Request<J>>>::Response: std::marker::Send + fmt::Debug + Sync,
<A as Ack<J>>::Acknowledger: Sync + Send + Clone,
{
type Response = SV::Response;
Expand All @@ -266,12 +285,18 @@ where
let mut ack = self.ack.clone();
let worker_id = self.worker_id.clone();
let data = request.get::<<A as Ack<J>>::Acknowledger>().cloned();

let fut = self.service.call(request);
let fut_with_ack = async move {
let res = fut.await;
if let Some(data) = data {
if let Err(_e) = ack.ack(&worker_id, &data).await {
if let Some(task_id) = data {
if let Err(_e) = ack
.ack(AckResponse {
worker: worker_id,
acknowledger: task_id,
result: format!("{res:?}"),
})
.await
{
// tracing::warn!("Acknowledgement Failed: {}", e);
// try get monitor, and emit
}
Expand Down
3 changes: 2 additions & 1 deletion packages/apalis-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ pub mod interval {
Poll::Pending => return Poll::Pending,
};
let interval = self.interval;
let _ = std::mem::replace(&mut self.timer, Box::pin(sleep(interval)));
let fut = std::mem::replace(&mut self.timer, Box::pin(sleep(interval)));
drop(fut);
Poll::Ready(Some(()))
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/apalis-core/src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl<E: Executor + Clone + Send + 'static + Sync> Monitor<E> {
/// Registers a single instance of a [Worker]
pub fn register<
J: Send + Sync + 'static,
S: Service<Request<J>> + Send + 'static + Clone,
S: Service<Request<J>> + Send + 'static,
P: Backend<Request<J>> + 'static,
>(
mut self,
Expand Down Expand Up @@ -109,7 +109,7 @@ impl<E: Executor + Clone + Send + 'static + Sync> Monitor<E> {
/// The monitor instance, with all workers added to the collection.
pub fn register_with_count<
J: Send + Sync + 'static,
S: Service<Request<J>> + Send + 'static + Clone,
S: Service<Request<J>> + Send + 'static,
P: Backend<Request<J>> + 'static,
>(
mut self,
Expand Down
68 changes: 68 additions & 0 deletions packages/apalis-core/src/worker/buffer/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
//! Error types for the `Buffer` middleware.
use std::{fmt, sync::Arc};
use tower::BoxError;

/// An error produced by a [`Service`] wrapped by a [`Buffer`]
///
/// [`Service`]: crate::Service
/// [`Buffer`]: crate::buffer::Buffer
#[derive(Debug)]
pub(crate) struct ServiceError {
inner: Arc<BoxError>,
}

/// An error produced when the a buffer's worker closes unexpectedly.
pub(crate) struct Closed {
_p: (),
}

// ===== impl ServiceError =====

impl ServiceError {
pub(crate) fn new(inner: BoxError) -> ServiceError {
let inner = Arc::new(inner);
ServiceError { inner }
}

// Private to avoid exposing `Clone` trait as part of the public API
pub(crate) fn clone(&self) -> ServiceError {
ServiceError {
inner: self.inner.clone(),
}
}
}

impl fmt::Display for ServiceError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "buffered service failed: {}", self.inner)
}
}

impl std::error::Error for ServiceError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&**self.inner)
}
}

// ===== impl Closed =====

impl Closed {
pub(crate) fn new() -> Self {
Closed { _p: () }
}
}

impl fmt::Debug for Closed {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_tuple("Closed").finish()
}
}

impl fmt::Display for Closed {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.write_str("buffer's worker closed unexpectedly")
}
}

impl std::error::Error for Closed {}
79 changes: 79 additions & 0 deletions packages/apalis-core/src/worker/buffer/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//! Future types for the [`Buffer`] middleware.
//!
//! [`Buffer`]: crate::buffer::Buffer
use super::{error::Closed, message};
use futures::ready;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

pin_project! {
/// Future that completes when the buffered service eventually services the submitted request.
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
state: ResponseState<T>,
}
}

pin_project! {
#[project = ResponseStateProj]
#[derive(Debug)]
enum ResponseState<T> {
Failed {
error: Option<tower::BoxError>,
},
Rx {
#[pin]
rx: message::Rx<T>,
},
Poll {
#[pin]
fut: T,
},
}
}

impl<T> ResponseFuture<T> {
pub(crate) fn new(rx: message::Rx<T>) -> Self {
ResponseFuture {
state: ResponseState::Rx { rx },
}
}

pub(crate) fn failed(err: tower::BoxError) -> Self {
ResponseFuture {
state: ResponseState::Failed { error: Some(err) },
}
}
}

impl<F, T, E> Future for ResponseFuture<F>
where
F: Future<Output = Result<T, E>>,
E: Into<tower::BoxError>,
{
type Output = Result<T, tower::BoxError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

loop {
match this.state.as_mut().project() {
ResponseStateProj::Failed { error } => {
return Poll::Ready(Err(error.take().expect("polled after error")));
}
ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) {
Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }),
Ok(Err(e)) => return Poll::Ready(Err(e.into())),
Err(_) => return Poll::Ready(Err(Closed::new().into())),
},
ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into),
}
}
}
}
16 changes: 16 additions & 0 deletions packages/apalis-core/src/worker/buffer/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use futures::channel::oneshot;

use super::error::ServiceError;

/// Message sent over buffer
#[derive(Debug)]
pub(crate) struct Message<Request, Fut> {
pub(crate) request: Request,
pub(crate) tx: Tx<Fut>,
}

/// Response sender
pub(crate) type Tx<Fut> = oneshot::Sender<Result<Fut, ServiceError>>;

/// Response receiver
pub(crate) type Rx<Fut> = oneshot::Receiver<Result<Fut, ServiceError>>;
5 changes: 5 additions & 0 deletions packages/apalis-core/src/worker/buffer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub(crate) mod error;
pub(crate) mod future;
pub(crate) mod message;
pub(crate) mod service;
pub(crate) mod worker;
Loading

0 comments on commit 5861e84

Please sign in to comment.