Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update clone api #348

Merged
merged 9 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0"
[dependencies]
anyhow = "1"
tokio = { version = "1", features = ["full"] }
apalis = { path = "../../", features = ["redis", "timeout"]}
apalis = { path = "../../", features = ["redis", "timeout", "limit"]}
serde = "1"
env_logger = "0.10"
tracing-subscriber = "0.3.11"
Expand Down
6 changes: 4 additions & 2 deletions examples/redis/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use std::{

use anyhow::Result;
use apalis::prelude::*;
use apalis::redis::RedisStorage;
use apalis::{layers::limit::RateLimitLayer, redis::RedisStorage};

use email_service::{send_email, Email};
use tracing::{error, info};

async fn produce_jobs(mut storage: RedisStorage<Email>) -> Result<()> {
for index in 0..1 {
for index in 0..10 {
storage
.push(Email {
to: index.to_string(),
Expand Down Expand Up @@ -48,6 +48,7 @@ async fn main() -> Result<()> {
let worker = WorkerBuilder::new("rango-tango")
.chain(|svc| svc.timeout(Duration::from_millis(500)))
.data(Count::default())
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
.with_storage(storage)
.build_fn(send_email);

Expand All @@ -71,6 +72,7 @@ async fn main() -> Result<()> {
})
.shutdown_timeout(Duration::from_millis(5000))
.run_with_signal(async {
info!("Monitor started");
tokio::signal::ctrl_c().await?;
info!("Monitor starting shutdown");
Ok(())
Expand Down
45 changes: 35 additions & 10 deletions packages/apalis-core/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,24 +162,43 @@ pub trait Ack<J> {
/// Acknowledges successful processing of the given request
fn ack(
&mut self,
worker_id: &WorkerId,
data: &Self::Acknowledger,
response: AckResponse<Self::Acknowledger>,
) -> impl Future<Output = Result<(), Self::Error>> + Send;
}

/// ACK response
#[derive(Debug, Clone)]
pub struct AckResponse<A> {
/// The worker id
pub worker: WorkerId,
/// The acknowledger
pub acknowledger: A,
/// The stringified result
pub result: String,
}

impl<A: fmt::Display> AckResponse<A> {
/// Output a json for the response
pub fn to_json(&self) -> String {
format!(
r#"{{"worker": "{}", "acknowledger": "{}", "result": "{}"}}"#,
self.worker, self.acknowledger, self.result
)
}
}

/// A generic stream that emits (worker_id, task_id)
#[derive(Debug)]
pub struct AckStream<A>(pub Sender<(WorkerId, A)>);
pub struct AckStream<A>(pub Sender<AckResponse<A>>);

impl<J, A: Send + Clone + 'static> Ack<J> for AckStream<A> {
type Acknowledger = A;
type Error = SendError;
fn ack(
&mut self,
worker_id: &WorkerId,
data: &Self::Acknowledger,
response: AckResponse<A>,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
self.0.send((worker_id.clone(), data.clone())).boxed()
self.0.send(response).boxed()
}
}

Expand Down Expand Up @@ -248,7 +267,7 @@ where
<SV as Service<Request<J>>>::Future: std::marker::Send + 'static,
A: Ack<J> + Send + 'static + Clone + Send + Sync,
J: 'static,
<SV as Service<Request<J>>>::Response: std::marker::Send,
<SV as Service<Request<J>>>::Response: std::marker::Send + fmt::Debug + Sync,
<A as Ack<J>>::Acknowledger: Sync + Send + Clone,
{
type Response = SV::Response;
Expand All @@ -266,12 +285,18 @@ where
let mut ack = self.ack.clone();
let worker_id = self.worker_id.clone();
let data = request.get::<<A as Ack<J>>::Acknowledger>().cloned();

let fut = self.service.call(request);
let fut_with_ack = async move {
let res = fut.await;
if let Some(data) = data {
if let Err(_e) = ack.ack(&worker_id, &data).await {
if let Some(task_id) = data {
if let Err(_e) = ack
.ack(AckResponse {
worker: worker_id,
acknowledger: task_id,
result: format!("{res:?}"),
})
.await
{
// tracing::warn!("Acknowledgement Failed: {}", e);
// try get monitor, and emit
}
Expand Down
3 changes: 2 additions & 1 deletion packages/apalis-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ pub mod interval {
Poll::Pending => return Poll::Pending,
};
let interval = self.interval;
let _ = std::mem::replace(&mut self.timer, Box::pin(sleep(interval)));
let fut = std::mem::replace(&mut self.timer, Box::pin(sleep(interval)));
drop(fut);
Poll::Ready(Some(()))
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/apalis-core/src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl<E: Executor + Clone + Send + 'static + Sync> Monitor<E> {
/// Registers a single instance of a [Worker]
pub fn register<
J: Send + Sync + 'static,
S: Service<Request<J>> + Send + 'static + Clone,
S: Service<Request<J>> + Send + 'static,
P: Backend<Request<J>> + 'static,
>(
mut self,
Expand Down Expand Up @@ -109,7 +109,7 @@ impl<E: Executor + Clone + Send + 'static + Sync> Monitor<E> {
/// The monitor instance, with all workers added to the collection.
pub fn register_with_count<
J: Send + Sync + 'static,
S: Service<Request<J>> + Send + 'static + Clone,
S: Service<Request<J>> + Send + 'static,
P: Backend<Request<J>> + 'static,
>(
mut self,
Expand Down
68 changes: 68 additions & 0 deletions packages/apalis-core/src/worker/buffer/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
//! Error types for the `Buffer` middleware.

use std::{fmt, sync::Arc};
use tower::BoxError;

/// An error produced by a [`Service`] wrapped by a [`Buffer`]
///
/// [`Service`]: crate::Service
/// [`Buffer`]: crate::buffer::Buffer
#[derive(Debug)]
pub(crate) struct ServiceError {
inner: Arc<BoxError>,
}

/// An error produced when the a buffer's worker closes unexpectedly.
pub(crate) struct Closed {
_p: (),
}

// ===== impl ServiceError =====

impl ServiceError {
pub(crate) fn new(inner: BoxError) -> ServiceError {
let inner = Arc::new(inner);
ServiceError { inner }
}

// Private to avoid exposing `Clone` trait as part of the public API
pub(crate) fn clone(&self) -> ServiceError {
ServiceError {
inner: self.inner.clone(),
}
}
}

impl fmt::Display for ServiceError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "buffered service failed: {}", self.inner)
}
}

impl std::error::Error for ServiceError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&**self.inner)
}
}

// ===== impl Closed =====

impl Closed {
pub(crate) fn new() -> Self {
Closed { _p: () }
}
}

impl fmt::Debug for Closed {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_tuple("Closed").finish()
}
}

impl fmt::Display for Closed {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.write_str("buffer's worker closed unexpectedly")
}
}

impl std::error::Error for Closed {}
79 changes: 79 additions & 0 deletions packages/apalis-core/src/worker/buffer/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//! Future types for the [`Buffer`] middleware.
//!
//! [`Buffer`]: crate::buffer::Buffer

use super::{error::Closed, message};
use futures::ready;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

pin_project! {
/// Future that completes when the buffered service eventually services the submitted request.
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
state: ResponseState<T>,
}
}

pin_project! {
#[project = ResponseStateProj]
#[derive(Debug)]
enum ResponseState<T> {
Failed {
error: Option<tower::BoxError>,
},
Rx {
#[pin]
rx: message::Rx<T>,
},
Poll {
#[pin]
fut: T,
},
}
}

impl<T> ResponseFuture<T> {
pub(crate) fn new(rx: message::Rx<T>) -> Self {
ResponseFuture {
state: ResponseState::Rx { rx },
}
}

pub(crate) fn failed(err: tower::BoxError) -> Self {
ResponseFuture {
state: ResponseState::Failed { error: Some(err) },
}
}
}

impl<F, T, E> Future for ResponseFuture<F>
where
F: Future<Output = Result<T, E>>,
E: Into<tower::BoxError>,
{
type Output = Result<T, tower::BoxError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

loop {
match this.state.as_mut().project() {
ResponseStateProj::Failed { error } => {
return Poll::Ready(Err(error.take().expect("polled after error")));
}
ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) {
Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }),
Ok(Err(e)) => return Poll::Ready(Err(e.into())),
Err(_) => return Poll::Ready(Err(Closed::new().into())),
},
ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into),
}
}
}
}
16 changes: 16 additions & 0 deletions packages/apalis-core/src/worker/buffer/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use futures::channel::oneshot;

use super::error::ServiceError;

/// Message sent over buffer
#[derive(Debug)]
pub(crate) struct Message<Request, Fut> {
pub(crate) request: Request,
pub(crate) tx: Tx<Fut>,
}

/// Response sender
pub(crate) type Tx<Fut> = oneshot::Sender<Result<Fut, ServiceError>>;

/// Response receiver
pub(crate) type Rx<Fut> = oneshot::Receiver<Result<Fut, ServiceError>>;
5 changes: 5 additions & 0 deletions packages/apalis-core/src/worker/buffer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub(crate) mod error;
pub(crate) mod future;
pub(crate) mod message;
pub(crate) mod service;
pub(crate) mod worker;
Loading
Loading