From 5861e8467310b1ede31f6fb7553c78fb4fa4e0f4 Mon Sep 17 00:00:00 2001 From: Geoffrey Mureithi <95377562+geofmureithi@users.noreply.github.com> Date: Thu, 4 Jul 2024 21:09:38 +0300 Subject: [PATCH] Remove `Clone` constraints and buffer the service (#348) * feat: remove the `Clone` requirements for services * test save * fix: get buffered layer working * update: remove clone & update api * fix: tests and api * lint: clippy fixes * lint: cargo fmt --- examples/redis/Cargo.toml | 2 +- examples/redis/src/main.rs | 6 +- packages/apalis-core/src/layers.rs | 45 ++++- packages/apalis-core/src/lib.rs | 3 +- packages/apalis-core/src/monitor/mod.rs | 4 +- .../apalis-core/src/worker/buffer/error.rs | 68 +++++++ .../apalis-core/src/worker/buffer/future.rs | 79 ++++++++ .../apalis-core/src/worker/buffer/message.rs | 16 ++ packages/apalis-core/src/worker/buffer/mod.rs | 5 + .../apalis-core/src/worker/buffer/service.rs | 146 ++++++++++++++ .../apalis-core/src/worker/buffer/worker.rs | 184 ++++++++++++++++++ packages/apalis-core/src/worker/mod.rs | 26 ++- packages/apalis-redis/lua/ack_job.lua | 2 +- packages/apalis-redis/src/storage.rs | 31 +-- packages/apalis-sql/src/mysql.rs | 16 +- packages/apalis-sql/src/postgres.rs | 27 ++- packages/apalis-sql/src/sqlite.rs | 24 +-- 17 files changed, 613 insertions(+), 71 deletions(-) create mode 100644 packages/apalis-core/src/worker/buffer/error.rs create mode 100644 packages/apalis-core/src/worker/buffer/future.rs create mode 100644 packages/apalis-core/src/worker/buffer/message.rs create mode 100644 packages/apalis-core/src/worker/buffer/mod.rs create mode 100644 packages/apalis-core/src/worker/buffer/service.rs create mode 100644 packages/apalis-core/src/worker/buffer/worker.rs diff --git a/examples/redis/Cargo.toml b/examples/redis/Cargo.toml index 2f8a9be..27ff00f 100644 --- a/examples/redis/Cargo.toml +++ b/examples/redis/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0" [dependencies] anyhow = "1" tokio = { version = "1", features = ["full"] } -apalis = { path = "../../", features = ["redis", "timeout"]} +apalis = { path = "../../", features = ["redis", "timeout", "limit"]} serde = "1" env_logger = "0.10" tracing-subscriber = "0.3.11" diff --git a/examples/redis/src/main.rs b/examples/redis/src/main.rs index 87a35ea..35b498d 100644 --- a/examples/redis/src/main.rs +++ b/examples/redis/src/main.rs @@ -6,13 +6,13 @@ use std::{ use anyhow::Result; use apalis::prelude::*; -use apalis::redis::RedisStorage; +use apalis::{layers::limit::RateLimitLayer, redis::RedisStorage}; use email_service::{send_email, Email}; use tracing::{error, info}; async fn produce_jobs(mut storage: RedisStorage) -> Result<()> { - for index in 0..1 { + for index in 0..10 { storage .push(Email { to: index.to_string(), @@ -48,6 +48,7 @@ async fn main() -> Result<()> { let worker = WorkerBuilder::new("rango-tango") .chain(|svc| svc.timeout(Duration::from_millis(500))) .data(Count::default()) + .layer(RateLimitLayer::new(5, Duration::from_secs(1))) .with_storage(storage) .build_fn(send_email); @@ -71,6 +72,7 @@ async fn main() -> Result<()> { }) .shutdown_timeout(Duration::from_millis(5000)) .run_with_signal(async { + info!("Monitor started"); tokio::signal::ctrl_c().await?; info!("Monitor starting shutdown"); Ok(()) diff --git a/packages/apalis-core/src/layers.rs b/packages/apalis-core/src/layers.rs index 16355bf..d561c68 100644 --- a/packages/apalis-core/src/layers.rs +++ b/packages/apalis-core/src/layers.rs @@ -162,24 +162,43 @@ pub trait Ack { /// Acknowledges successful processing of the given request fn ack( &mut self, - worker_id: &WorkerId, - data: &Self::Acknowledger, + response: AckResponse, ) -> impl Future> + Send; } +/// ACK response +#[derive(Debug, Clone)] +pub struct AckResponse { + /// The worker id + pub worker: WorkerId, + /// The acknowledger + pub acknowledger: A, + /// The stringified result + pub result: String, +} + +impl AckResponse { + /// Output a json for the response + pub fn to_json(&self) -> String { + format!( + r#"{{"worker": "{}", "acknowledger": "{}", "result": "{}"}}"#, + self.worker, self.acknowledger, self.result + ) + } +} + /// A generic stream that emits (worker_id, task_id) #[derive(Debug)] -pub struct AckStream(pub Sender<(WorkerId, A)>); +pub struct AckStream(pub Sender>); impl Ack for AckStream { type Acknowledger = A; type Error = SendError; fn ack( &mut self, - worker_id: &WorkerId, - data: &Self::Acknowledger, + response: AckResponse, ) -> impl Future> + Send { - self.0.send((worker_id.clone(), data.clone())).boxed() + self.0.send(response).boxed() } } @@ -248,7 +267,7 @@ where >>::Future: std::marker::Send + 'static, A: Ack + Send + 'static + Clone + Send + Sync, J: 'static, - >>::Response: std::marker::Send, + >>::Response: std::marker::Send + fmt::Debug + Sync, >::Acknowledger: Sync + Send + Clone, { type Response = SV::Response; @@ -266,12 +285,18 @@ where let mut ack = self.ack.clone(); let worker_id = self.worker_id.clone(); let data = request.get::<>::Acknowledger>().cloned(); - let fut = self.service.call(request); let fut_with_ack = async move { let res = fut.await; - if let Some(data) = data { - if let Err(_e) = ack.ack(&worker_id, &data).await { + if let Some(task_id) = data { + if let Err(_e) = ack + .ack(AckResponse { + worker: worker_id, + acknowledger: task_id, + result: format!("{res:?}"), + }) + .await + { // tracing::warn!("Acknowledgement Failed: {}", e); // try get monitor, and emit } diff --git a/packages/apalis-core/src/lib.rs b/packages/apalis-core/src/lib.rs index 8fb411d..5511cee 100644 --- a/packages/apalis-core/src/lib.rs +++ b/packages/apalis-core/src/lib.rs @@ -145,7 +145,8 @@ pub mod interval { Poll::Pending => return Poll::Pending, }; let interval = self.interval; - let _ = std::mem::replace(&mut self.timer, Box::pin(sleep(interval))); + let fut = std::mem::replace(&mut self.timer, Box::pin(sleep(interval))); + drop(fut); Poll::Ready(Some(())) } } diff --git a/packages/apalis-core/src/monitor/mod.rs b/packages/apalis-core/src/monitor/mod.rs index 393f58f..efec9eb 100644 --- a/packages/apalis-core/src/monitor/mod.rs +++ b/packages/apalis-core/src/monitor/mod.rs @@ -80,7 +80,7 @@ impl Monitor { /// Registers a single instance of a [Worker] pub fn register< J: Send + Sync + 'static, - S: Service> + Send + 'static + Clone, + S: Service> + Send + 'static, P: Backend> + 'static, >( mut self, @@ -109,7 +109,7 @@ impl Monitor { /// The monitor instance, with all workers added to the collection. pub fn register_with_count< J: Send + Sync + 'static, - S: Service> + Send + 'static + Clone, + S: Service> + Send + 'static, P: Backend> + 'static, >( mut self, diff --git a/packages/apalis-core/src/worker/buffer/error.rs b/packages/apalis-core/src/worker/buffer/error.rs new file mode 100644 index 0000000..a1da124 --- /dev/null +++ b/packages/apalis-core/src/worker/buffer/error.rs @@ -0,0 +1,68 @@ +//! Error types for the `Buffer` middleware. + +use std::{fmt, sync::Arc}; +use tower::BoxError; + +/// An error produced by a [`Service`] wrapped by a [`Buffer`] +/// +/// [`Service`]: crate::Service +/// [`Buffer`]: crate::buffer::Buffer +#[derive(Debug)] +pub(crate) struct ServiceError { + inner: Arc, +} + +/// An error produced when the a buffer's worker closes unexpectedly. +pub(crate) struct Closed { + _p: (), +} + +// ===== impl ServiceError ===== + +impl ServiceError { + pub(crate) fn new(inner: BoxError) -> ServiceError { + let inner = Arc::new(inner); + ServiceError { inner } + } + + // Private to avoid exposing `Clone` trait as part of the public API + pub(crate) fn clone(&self) -> ServiceError { + ServiceError { + inner: self.inner.clone(), + } + } +} + +impl fmt::Display for ServiceError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "buffered service failed: {}", self.inner) + } +} + +impl std::error::Error for ServiceError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&**self.inner) + } +} + +// ===== impl Closed ===== + +impl Closed { + pub(crate) fn new() -> Self { + Closed { _p: () } + } +} + +impl fmt::Debug for Closed { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_tuple("Closed").finish() + } +} + +impl fmt::Display for Closed { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.write_str("buffer's worker closed unexpectedly") + } +} + +impl std::error::Error for Closed {} diff --git a/packages/apalis-core/src/worker/buffer/future.rs b/packages/apalis-core/src/worker/buffer/future.rs new file mode 100644 index 0000000..8cf3bae --- /dev/null +++ b/packages/apalis-core/src/worker/buffer/future.rs @@ -0,0 +1,79 @@ +//! Future types for the [`Buffer`] middleware. +//! +//! [`Buffer`]: crate::buffer::Buffer + +use super::{error::Closed, message}; +use futures::ready; +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +pin_project! { + /// Future that completes when the buffered service eventually services the submitted request. + #[derive(Debug)] + pub struct ResponseFuture { + #[pin] + state: ResponseState, + } +} + +pin_project! { + #[project = ResponseStateProj] + #[derive(Debug)] + enum ResponseState { + Failed { + error: Option, + }, + Rx { + #[pin] + rx: message::Rx, + }, + Poll { + #[pin] + fut: T, + }, + } +} + +impl ResponseFuture { + pub(crate) fn new(rx: message::Rx) -> Self { + ResponseFuture { + state: ResponseState::Rx { rx }, + } + } + + pub(crate) fn failed(err: tower::BoxError) -> Self { + ResponseFuture { + state: ResponseState::Failed { error: Some(err) }, + } + } +} + +impl Future for ResponseFuture +where + F: Future>, + E: Into, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + match this.state.as_mut().project() { + ResponseStateProj::Failed { error } => { + return Poll::Ready(Err(error.take().expect("polled after error"))); + } + ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) { + Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }), + Ok(Err(e)) => return Poll::Ready(Err(e.into())), + Err(_) => return Poll::Ready(Err(Closed::new().into())), + }, + ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into), + } + } + } +} diff --git a/packages/apalis-core/src/worker/buffer/message.rs b/packages/apalis-core/src/worker/buffer/message.rs new file mode 100644 index 0000000..02863a2 --- /dev/null +++ b/packages/apalis-core/src/worker/buffer/message.rs @@ -0,0 +1,16 @@ +use futures::channel::oneshot; + +use super::error::ServiceError; + +/// Message sent over buffer +#[derive(Debug)] +pub(crate) struct Message { + pub(crate) request: Request, + pub(crate) tx: Tx, +} + +/// Response sender +pub(crate) type Tx = oneshot::Sender>; + +/// Response receiver +pub(crate) type Rx = oneshot::Receiver>; diff --git a/packages/apalis-core/src/worker/buffer/mod.rs b/packages/apalis-core/src/worker/buffer/mod.rs new file mode 100644 index 0000000..c341f07 --- /dev/null +++ b/packages/apalis-core/src/worker/buffer/mod.rs @@ -0,0 +1,5 @@ +pub(crate) mod error; +pub(crate) mod future; +pub(crate) mod message; +pub(crate) mod service; +pub(crate) mod worker; diff --git a/packages/apalis-core/src/worker/buffer/service.rs b/packages/apalis-core/src/worker/buffer/service.rs new file mode 100644 index 0000000..a176764 --- /dev/null +++ b/packages/apalis-core/src/worker/buffer/service.rs @@ -0,0 +1,146 @@ +use super::{ + future::ResponseFuture, + message::Message, + worker::{Handle, Worker}, +}; + +use futures::channel::{mpsc, oneshot}; +use futures::task::AtomicWaker; +use std::sync::Arc; +use std::{ + future::Future, + task::{Context, Poll}, +}; +use tower::Service; + +/// Adds an mpsc buffer in front of an inner service. +/// +/// See the module documentation for more details. +#[derive(Debug)] +pub struct Buffer { + tx: PollSender>, + handle: Handle, +} + +impl Buffer +where + F: 'static, +{ + /// Creates a new [`Buffer`] wrapping `service`, but returns the background worker. + /// + /// This is useful if you do not want to spawn directly onto the runtime + /// but instead want to use your own executor. This will return the [`Buffer`] and + /// the background `Worker` that you can then spawn. + pub fn pair(service: S, bound: usize) -> (Self, Worker) + where + S: Service + Send + 'static, + F: Send, + S::Error: Into + Send + Sync, + Req: Send + 'static, + { + let (tx, rx) = mpsc::channel(bound); + let (handle, worker) = Worker::new(service, rx); + let buffer = Self { + tx: PollSender::new(tx), + handle, + }; + (buffer, worker) + } + + fn get_worker_error(&self) -> tower::BoxError { + self.handle.get_error_on_closed() + } +} + +impl Service for Buffer +where + F: Future> + Send + 'static, + E: Into, + Req: Send + 'static, +{ + type Response = Rsp; + type Error = tower::BoxError; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // First, check if the worker is still alive. + if self.tx.is_closed() { + // If the inner service has errored, then we error here. + return Poll::Ready(Err(self.get_worker_error())); + } + + // Poll the sender to acquire a permit. + self.tx + .poll_reserve(cx) + .map_err(|_| self.get_worker_error()) + } + + fn call(&mut self, request: Req) -> Self::Future { + let (tx, rx) = oneshot::channel(); + match self.tx.send_item(Message { request, tx }) { + Ok(_) => ResponseFuture::new(rx), + Err(_) => ResponseFuture::failed(self.get_worker_error()), + } + } +} + +impl Clone for Buffer +where + Req: Send + 'static, + F: Send + 'static, +{ + fn clone(&self) -> Self { + Self { + handle: self.handle.clone(), + tx: self.tx.clone(), + } + } +} + +// PollSender implementation using futures and async-channel +#[derive(Debug)] +struct PollSender { + tx: mpsc::Sender, + waker: Arc, +} + +impl PollSender { + fn new(tx: mpsc::Sender) -> Self { + Self { + tx, + waker: Arc::new(AtomicWaker::new()), + } + } + + fn poll_reserve(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.tx.is_closed() { + return Poll::Ready(Err(())); + } + + self.waker.register(cx.waker()); + + self.tx.poll_ready(cx).map(|res| match res { + Ok(_) => Ok(()), + Err(_) => Err(()), + }) + } + + fn send_item(&mut self, item: T) -> Result<(), ()> { + if self.tx.is_closed() { + return Err(()); + } + + self.tx.try_send(item).map_err(|_| ()) + } + + fn is_closed(&self) -> bool { + self.tx.is_closed() + } + + fn clone(&self) -> Self { + Self { + tx: self.tx.clone(), + waker: self.waker.clone(), + } + } +} diff --git a/packages/apalis-core/src/worker/buffer/worker.rs b/packages/apalis-core/src/worker/buffer/worker.rs new file mode 100644 index 0000000..1ace6f2 --- /dev/null +++ b/packages/apalis-core/src/worker/buffer/worker.rs @@ -0,0 +1,184 @@ +use super::{ + error::{Closed, ServiceError}, + message::Message, +}; +use futures::{channel::mpsc, ready, Stream}; +use std::sync::{Arc, Mutex}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use tower::Service; + +pin_project_lite::pin_project! { + #[derive(Debug)] + pub struct Worker + where + T: Service, + { + current_message: Option>, + rx: mpsc::Receiver>, + service: T, + finish: bool, + failed: Option, + handle: Handle, + } +} + +/// Get the error out +#[derive(Debug)] +pub(crate) struct Handle { + inner: Arc>>, +} + +impl Worker +where + T: Service, + T::Error: Into, +{ + pub(crate) fn new( + service: T, + rx: mpsc::Receiver>, + ) -> (Handle, Worker) { + let handle = Handle { + inner: Arc::new(Mutex::new(None)), + }; + + let worker = Worker { + current_message: None, + finish: false, + failed: None, + rx, + service, + handle: handle.clone(), + }; + + (handle, worker) + } + + /// Return the next queued Message that hasn't been canceled. + /// + /// If a `Message` is returned, the `bool` is true if this is the first time we received this + /// message, and false otherwise (i.e., we tried to forward it to the backing service before). + #[allow(clippy::type_complexity)] + fn poll_next_msg( + &mut self, + cx: &mut Context<'_>, + ) -> Poll, bool)>> { + if self.finish { + // We've already received None and are shutting down + return Poll::Ready(None); + } + + // tracing::trace!("worker polling for next message"); + if let Some(msg) = self.current_message.take() { + // If the oneshot sender is closed, then the receiver is dropped, + // and nobody cares about the response. If this is the case, we + // should continue to the next request. + if !msg.tx.is_canceled() { + // tracing::trace!("resuming buffered request"); + return Poll::Ready(Some((msg, false))); + } + + // tracing::trace!("dropping cancelled buffered request"); + } + + // Get the next request + while let Some(msg) = ready!(Pin::new(&mut self.rx).poll_next(cx)) { + if !msg.tx.is_canceled() { + // tracing::trace!("processing new request"); + return Poll::Ready(Some((msg, true))); + } + // Otherwise, request is canceled, so pop the next one. + // tracing::trace!("dropping cancelled request"); + } + + Poll::Ready(None) + } + + fn failed(&mut self, error: tower::BoxError) { + let error = ServiceError::new(error); + + let mut inner = self.handle.inner.lock().unwrap(); + + if inner.is_some() { + return; + } + + *inner = Some(error.clone()); + drop(inner); + + self.rx.close(); + self.failed = Some(error); + } +} + +impl Future for Worker +where + T: Service, + T::Error: Into, +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.finish { + return Poll::Ready(()); + } + + loop { + match ready!(self.poll_next_msg(cx)) { + Some((msg, _)) => { + if let Some(ref failed) = self.failed { + let _ = msg.tx.send(Err(failed.clone())); + continue; + } + match self.service.poll_ready(cx) { + Poll::Ready(Ok(())) => { + let response = self.service.call(msg.request); + let _ = msg.tx.send(Ok(response)); + } + Poll::Pending => { + self.current_message = Some(msg); + return Poll::Pending; + } + Poll::Ready(Err(e)) => { + let error = e.into(); + self.failed(error); + let _ = msg.tx.send(Err(self + .failed + .as_ref() + .expect("Worker::failed did not set self.failed?") + .clone())); + } + } + } + None => { + // No more more requests _ever_. + self.finish = true; + return Poll::Ready(()); + } + } + } + } +} + +impl Handle { + pub(crate) fn get_error_on_closed(&self) -> tower::BoxError { + self.inner + .lock() + .unwrap() + .as_ref() + .map(|svc_err| svc_err.clone().into()) + .unwrap_or_else(|| Closed::new().into()) + } +} + +impl Clone for Handle { + fn clone(&self) -> Handle { + Handle { + inner: self.inner.clone(), + } + } +} diff --git a/packages/apalis-core/src/worker/mod.rs b/packages/apalis-core/src/worker/mod.rs index c92bef5..82539c8 100644 --- a/packages/apalis-core/src/worker/mod.rs +++ b/packages/apalis-core/src/worker/mod.rs @@ -23,7 +23,11 @@ use std::task::{Context as TaskCtx, Poll, Waker}; use thiserror::Error; use tower::{Service, ServiceBuilder, ServiceExt}; +mod buffer; mod stream; + +pub use buffer::service::Buffer; + // By default a worker starts 3 futures, one for polling, one for worker stream and the other for consuming. const WORKER_FUTURES: usize = 3; @@ -217,7 +221,7 @@ impl Worker> { /// Start a worker with a custom executor pub fn with_executor(self, executor: E) -> Worker> where - S: Service> + Send + 'static + Clone, + S: Service> + Send + 'static, P: Backend> + 'static, J: Send + 'static + Sync, S::Future: Send, @@ -237,7 +241,7 @@ impl Worker> { .shared(); Self::build_worker_instance( WorkerId::new(self.id.name()), - service.clone(), + service, executor.clone(), notifier.clone(), polling.clone(), @@ -249,7 +253,7 @@ impl Worker> { /// Run as a monitored worker pub fn with_monitor(self, monitor: &Monitor) -> Worker> where - S: Service> + Send + 'static + Clone, + S: Service> + Send + 'static, P: Backend> + 'static, J: Send + 'static + Sync, S::Future: Send, @@ -270,7 +274,7 @@ impl Worker> { .shared(); Self::build_worker_instance( WorkerId::new(self.id.name()), - service.clone(), + service, executor.clone(), notifier.clone(), polling.clone(), @@ -286,7 +290,7 @@ impl Worker> { monitor: &Monitor, ) -> Vec>> where - S: Service> + Send + 'static + Clone, + S: Service> + Send + 'static, P: Backend> + 'static, J: Send + 'static + Sync, S::Future: Send, @@ -297,6 +301,7 @@ impl Worker> { { let notifier = Notify::new(); let service = self.state.service; + let (service, poll_worker) = Buffer::pair(service, instances); let backend = self.state.backend; let executor = monitor.executor().clone(); let context = monitor.context().clone(); @@ -307,6 +312,8 @@ impl Worker> { .shared(); let mut workers = Vec::new(); + executor.spawn(poll_worker); + for instance in 0..instances { workers.push(Self::build_worker_instance( WorkerId::new_with_instance(self.id.name(), instance), @@ -329,7 +336,7 @@ impl Worker> { executor: E, ) -> Vec>> where - S: Service> + Send + 'static + Clone, + S: Service> + Send + 'static, P: Backend> + 'static, J: Send + 'static + Sync, S::Future: Send, @@ -342,13 +349,14 @@ impl Worker> { let worker_id = self.id.clone(); let notifier = Notify::new(); let service = self.state.service; + let (service, poll_worker) = Buffer::pair(service, instances); let backend = self.state.backend; let poller = backend.poll(worker_id.clone()); let polling = poller.heartbeat.shared(); let worker_stream = WorkerStream::new(poller.stream, notifier.clone()) .into_future() .shared(); - + executor.spawn(poll_worker); let mut workers = Vec::new(); for instance in 0..instances { workers.push(Self::build_worker_instance( @@ -374,7 +382,7 @@ impl Worker> { context: Option, ) -> Worker> where - LS: Service> + Send + 'static + Clone, + LS: Service> + Send + 'static, LS::Future: Send + 'static, LS::Response: 'static, LS::Error: Send + Sync + Into + 'static, @@ -409,7 +417,7 @@ impl Worker> { worker: Worker>, notifier: WorkerNotify>, Error>>, ) where - LS: Service> + Send + 'static + Clone, + LS: Service> + Send + 'static, LS::Future: Send + 'static, LS::Response: 'static, LS::Error: Send + Sync + Into + 'static, diff --git a/packages/apalis-redis/lua/ack_job.lua b/packages/apalis-redis/lua/ack_job.lua index d451bae..a51b6a6 100644 --- a/packages/apalis-redis/lua/ack_job.lua +++ b/packages/apalis-redis/lua/ack_job.lua @@ -4,7 +4,7 @@ -- ARGV[1]: the job ID -- ARGV[2]: the current time --- Returns: nil +-- Returns: bool -- Remove the job from this consumer's inflight set local removed = redis.call("srem", KEYS[1], ARGV[1]) diff --git a/packages/apalis-redis/src/storage.rs b/packages/apalis-redis/src/storage.rs index 700b463..c5f85a3 100644 --- a/packages/apalis-redis/src/storage.rs +++ b/packages/apalis-redis/src/storage.rs @@ -1,6 +1,6 @@ use apalis_core::codec::json::JsonCodec; use apalis_core::data::Extensions; -use apalis_core::layers::{AckLayer, AckStream}; +use apalis_core::layers::{Ack, AckLayer, AckResponse, AckStream}; use apalis_core::poller::controller::Controller; use apalis_core::poller::stream::BackendStream; use apalis_core::poller::Poller; @@ -19,11 +19,11 @@ use redis::aio::ConnectionLike; use redis::ErrorKind; use redis::{aio::ConnectionManager, Client, IntoConnectionInfo, RedisError, Script, Value}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; - use std::any::type_name; +use std::fmt::{self, Debug}; +use std::io; use std::num::TryFromIntError; use std::sync::Arc; -use std::{fmt, io}; use std::{marker::PhantomData, time::Duration}; /// Shorthand to create a client and connect @@ -452,8 +452,8 @@ impl< } } id_to_ack = ack_stream.next() => { - if let Some((worker_id, task_id)) = id_to_ack { - self.ack(&worker_id, &task_id).await.unwrap(); + if let Some(res) = id_to_ack { + self.ack(res).await.unwrap(); } } }; @@ -467,19 +467,22 @@ impl< } } -impl RedisStorage { - /// Ack a job - pub async fn ack(&mut self, worker_id: &WorkerId, task_id: &TaskId) -> Result<(), RedisError> { +impl Ack + for RedisStorage +{ + type Acknowledger = TaskId; + type Error = RedisError; + async fn ack(&mut self, res: AckResponse) -> Result<(), RedisError> { let ack_job = self.scripts.ack_job.clone(); - let inflight_set = format!("{}:{}", self.config.inflight_jobs_set(), worker_id); + let inflight_set = format!("{}:{}", self.config.inflight_jobs_set(), res.worker); let done_jobs_set = &self.config.done_jobs_set(); - let now: i64 = Utc::now().timestamp(); + let now: i64 = res.acknowledger.inner().timestamp_ms().try_into().unwrap(); ack_job .key(inflight_set) .key(done_jobs_set) - .arg(task_id.to_string()) + .arg(res.acknowledger.to_string()) .arg(now) .invoke_async(&mut self.conn) .await @@ -984,7 +987,11 @@ mod tests { let job_id = &job.get::().unwrap().id; storage - .ack(&worker_id, &job_id) + .ack(AckResponse { + acknowledger: job_id.clone(), + result: "Success".to_string(), + worker: worker_id.clone(), + }) .await .expect("failed to acknowledge the job"); diff --git a/packages/apalis-sql/src/mysql.rs b/packages/apalis-sql/src/mysql.rs index be54ed6..2abaa17 100644 --- a/packages/apalis-sql/src/mysql.rs +++ b/packages/apalis-sql/src/mysql.rs @@ -1,6 +1,6 @@ use apalis_core::codec::json::JsonCodec; use apalis_core::error::Error; -use apalis_core::layers::{Ack, AckLayer}; +use apalis_core::layers::{Ack, AckLayer, AckResponse}; use apalis_core::notify::Notify; use apalis_core::poller::controller::Controller; use apalis_core::poller::stream::BackendStream; @@ -420,13 +420,9 @@ impl Backend Ack for MysqlStorage { type Acknowledger = TaskId; type Error = sqlx::Error; - async fn ack( - &mut self, - worker_id: &WorkerId, - task_id: &Self::Acknowledger, - ) -> Result<(), sqlx::Error> { + async fn ack(&mut self, response: AckResponse) -> Result<(), sqlx::Error> { self.ack_notify - .notify((worker_id.clone(), task_id.clone())) + .notify((response.worker.clone(), response.acknowledger.clone())) .map_err(|e| sqlx::Error::Io(io::Error::new(io::ErrorKind::BrokenPipe, e)))?; Ok(()) @@ -631,7 +627,11 @@ mod tests { let job_id = ctx.id(); storage - .ack(&worker_id, job_id) + .ack(AckResponse { + acknowledger: job_id.clone(), + result: "Success".to_string(), + worker: worker_id.clone(), + }) .await .expect("failed to acknowledge the job"); diff --git a/packages/apalis-sql/src/postgres.rs b/packages/apalis-sql/src/postgres.rs index 77cfe3f..c8f258c 100644 --- a/packages/apalis-sql/src/postgres.rs +++ b/packages/apalis-sql/src/postgres.rs @@ -42,7 +42,7 @@ use crate::context::SqlContext; use crate::Config; use apalis_core::codec::json::JsonCodec; use apalis_core::error::Error; -use apalis_core::layers::{Ack, AckLayer}; +use apalis_core::layers::{Ack, AckLayer, AckResponse}; use apalis_core::notify::Notify; use apalis_core::poller::controller::Controller; use apalis_core::poller::stream::BackendStream; @@ -63,6 +63,7 @@ use sqlx::types::chrono::{DateTime, Utc}; use sqlx::{Pool, Postgres, Row}; use std::any::type_name; use std::convert::TryInto; +use std::fmt::Debug; use std::sync::Arc; use std::{fmt, io}; use std::{marker::PhantomData, time::Duration}; @@ -88,7 +89,7 @@ pub struct PostgresStorage { >, config: Config, controller: Controller, - ack_notify: Notify<(WorkerId, TaskId)>, + ack_notify: Notify>, subscription: Option, } @@ -137,7 +138,6 @@ impl Backend Backend { if let Some(ids) = ids { - let worker_ids: Vec = ids.iter().map(|c| c.0.to_string()).collect(); - let task_ids: Vec = ids.iter().map(|c| c.1.to_string()).collect(); + let worker_ids: Vec = ids.iter().map(|c| c.worker.to_string()).collect(); + let task_ids: Vec = ids.iter().map(|c| c.acknowledger.to_string()).collect(); let query = "UPDATE apalis.jobs SET status = 'Done', done_at = now() WHERE id = ANY($1::text[]) AND lock_by = ANY($2::text[])"; @@ -516,13 +516,9 @@ where impl Ack for PostgresStorage { type Acknowledger = TaskId; type Error = sqlx::Error; - async fn ack( - &mut self, - worker_id: &WorkerId, - task_id: &Self::Acknowledger, - ) -> Result<(), sqlx::Error> { + async fn ack(&mut self, res: AckResponse) -> Result<(), sqlx::Error> { self.ack_notify - .notify((worker_id.clone(), task_id.clone())) + .notify(res) .map_err(|e| sqlx::Error::Io(io::Error::new(io::ErrorKind::Interrupted, e)))?; Ok(()) @@ -589,7 +585,6 @@ mod tests { use super::*; use email_service::Email; - use futures::StreamExt; use sqlx::types::chrono::Utc; /// migrate DB and return a storage instance. @@ -642,7 +637,7 @@ mod tests { storage: &mut PostgresStorage, worker_id: &WorkerId, ) -> Request { - let mut req = storage.fetch_next(worker_id).await; + let req = storage.fetch_next(worker_id).await; req.unwrap()[0].clone() } @@ -703,7 +698,11 @@ mod tests { let job_id = ctx.id(); storage - .ack(&worker_id, job_id) + .ack(AckResponse { + acknowledger: job_id.clone(), + result: "Success".to_string(), + worker: worker_id.clone(), + }) .await .expect("failed to acknowledge the job"); diff --git a/packages/apalis-sql/src/sqlite.rs b/packages/apalis-sql/src/sqlite.rs index 652184b..3e31a64 100644 --- a/packages/apalis-sql/src/sqlite.rs +++ b/packages/apalis-sql/src/sqlite.rs @@ -3,7 +3,7 @@ use crate::Config; use apalis_core::codec::json::JsonCodec; use apalis_core::error::Error; -use apalis_core::layers::{Ack, AckLayer}; +use apalis_core::layers::{Ack, AckLayer, AckResponse}; use apalis_core::poller::controller::Controller; use apalis_core::poller::stream::BackendStream; use apalis_core::poller::Poller; @@ -477,17 +477,15 @@ impl Backend Ack for SqliteStorage { type Acknowledger = TaskId; type Error = sqlx::Error; - async fn ack( - &mut self, - worker_id: &WorkerId, - task_id: &Self::Acknowledger, - ) -> Result<(), sqlx::Error> { + async fn ack(&mut self, res: AckResponse) -> Result<(), sqlx::Error> { + let pool = self.pool.clone(); let query = - "UPDATE Jobs SET status = 'Done', done_at = strftime('%s','now') WHERE id = ?1 AND lock_by = ?2"; + "UPDATE Jobs SET status = 'Done', done_at = strftime('%s','now'), last_error = ?3 WHERE id = ?1 AND lock_by = ?2"; sqlx::query(query) - .bind(task_id.to_string()) - .bind(worker_id.to_string()) - .execute(&self.pool) + .bind(res.acknowledger.to_string()) + .bind(res.worker.to_string()) + .bind(res.result) + .execute(&pool) .await?; Ok(()) } @@ -609,7 +607,11 @@ mod tests { let job_id = ctx.id(); storage - .ack(&worker_id, job_id) + .ack(AckResponse { + acknowledger: job_id.clone(), + result: "Success".to_string(), + worker: worker_id.clone(), + }) .await .expect("failed to acknowledge the job");