From e1d7e6bbc3a56e05db2a4c95958c717b92900a72 Mon Sep 17 00:00:00 2001 From: Geoffrey Mureithi <95377562+geofmureithi@users.noreply.github.com> Date: Fri, 4 Oct 2024 08:09:32 +0300 Subject: [PATCH] fix:[bug] include backend provided layer in service layers. (#426) * fix:[bug] include backend provided layer in service layers. Problem: The current worker logic is missing an implementation where the backend provided layer should be added to the service's layer. This is a critical issue that affects all v0.6.0-rc-7 users and they should update as soon as a new release is done. Solution: - Add backend layers to service's layer. - Add worker_consume tests on the storages to prevent regression on this. * chore: comment an enforcement rule not yet followed by redis --- packages/apalis-core/src/lib.rs | 3 ++ packages/apalis-core/src/worker/mod.rs | 7 ++-- packages/apalis-redis/src/storage.rs | 4 +- packages/apalis-sql/src/lib.rs | 54 +++++++++++++++++++++++++- packages/apalis-sql/src/postgres.rs | 3 +- src/layers/catch_panic/mod.rs | 6 +++ src/layers/prometheus/mod.rs | 3 +- 7 files changed, 72 insertions(+), 8 deletions(-) diff --git a/packages/apalis-core/src/lib.rs b/packages/apalis-core/src/lib.rs index 6f01618..42dbe88 100644 --- a/packages/apalis-core/src/lib.rs +++ b/packages/apalis-core/src/lib.rs @@ -398,6 +398,9 @@ pub mod test_utils { assert_eq!(res, 1); // A job exists let res = t.execute_next().await; assert_eq!(res.1, Ok("1".to_owned())); + // TODO: all storages need to satisfy this rule, redis does not + // let res = t.len().await.unwrap(); + // assert_eq!(res, 0); t.vacuum().await.unwrap(); } }; diff --git a/packages/apalis-core/src/worker/mod.rs b/packages/apalis-core/src/worker/mod.rs index 99a1bb3..d0908c2 100644 --- a/packages/apalis-core/src/worker/mod.rs +++ b/packages/apalis-core/src/worker/mod.rs @@ -238,11 +238,12 @@ impl Worker> { Ctx: Send + 'static + Sync, { let notifier = Notify::new(); - let service = self.state.service; - - let (service, poll_worker) = Buffer::pair(service, instances); let backend = self.state.backend; + let service = self.state.service; let poller = backend.poll::(self.id.clone()); + let layer = poller.layer; + let service = ServiceBuilder::new().layer(layer).service(service); + let (service, poll_worker) = Buffer::pair(service, instances); let polling = poller.heartbeat.shared(); let worker_stream = WorkerStream::new(poller.stream, notifier.clone()) .into_future() diff --git a/packages/apalis-redis/src/storage.rs b/packages/apalis-redis/src/storage.rs index ec595f0..729d62e 100644 --- a/packages/apalis-redis/src/storage.rs +++ b/packages/apalis-redis/src/storage.rs @@ -193,7 +193,7 @@ impl Config { /// set the namespace for the Storage pub fn set_namespace(mut self, namespace: &str) -> Self { - self.namespace = namespace.to_owned(); + self.namespace = namespace.to_string(); self } @@ -308,7 +308,7 @@ impl Clone for RedisStorage { scripts: self.scripts.clone(), controller: self.controller.clone(), config: self.config.clone(), - codec: self.codec.clone(), + codec: self.codec, } } } diff --git a/packages/apalis-sql/src/lib.rs b/packages/apalis-sql/src/lib.rs index 012dfcd..9f6b475 100644 --- a/packages/apalis-sql/src/lib.rs +++ b/packages/apalis-sql/src/lib.rs @@ -90,7 +90,7 @@ impl Config { /// /// Defaults to "apalis::sql" pub fn set_namespace(mut self, namespace: &str) -> Self { - self.namespace = namespace.to_owned(); + self.namespace = namespace.to_string(); self } @@ -223,5 +223,57 @@ macro_rules! sql_storage_tests { "{\"Err\":\"FailedError: Missing separator character '@'.\"}" ); } + + #[tokio::test] + async fn worker_consume() { + use apalis_core::builder::WorkerBuilder; + use apalis_core::builder::WorkerFactoryFn; + use apalis_core::executor::Executor; + use std::future::Future; + + #[derive(Debug, Clone)] + struct TokioTestExecutor; + + impl Executor for TokioTestExecutor { + fn spawn(&self, future: impl Future + Send + 'static) { + tokio::spawn(future); + } + } + + let storage = $setup().await; + let mut handle = storage.clone(); + + let parts = handle + .push(email_service::example_good_email()) + .await + .unwrap(); + + async fn task(_job: Email) -> &'static str { + tokio::time::sleep(Duration::from_millis(100)).await; + "Job well done" + } + let worker = WorkerBuilder::new("rango-tango").backend(storage); + let worker = worker.build_fn(task); + let worker = worker.with_executor(TokioTestExecutor); + let w = worker.clone(); + + let runner = async move { + apalis_core::sleep(Duration::from_secs(3)).await; + let job_id = &parts.task_id; + let job = get_job(&mut handle, job_id).await; + let ctx = job.parts.context; + + assert_eq!(*ctx.status(), State::Done); + assert!(ctx.done_at().is_some()); + assert!(ctx.lock_by().is_some()); + assert!(ctx.lock_at().is_some()); + assert!(ctx.last_error().is_some()); // TODO: rename last_error to last_result + + w.stop(); + }; + + let wkr = worker.run(); + tokio::join!(runner, wkr); + } }; } diff --git a/packages/apalis-sql/src/postgres.rs b/packages/apalis-sql/src/postgres.rs index 72cb1ab..067b3e3 100644 --- a/packages/apalis-sql/src/postgres.rs +++ b/packages/apalis-sql/src/postgres.rs @@ -182,7 +182,7 @@ where ids = ack_stream.next() => { if let Some(ids) = ids { let ack_ids: Vec<(String, String, String, String, u64)> = ids.iter().map(|(ctx, res)| { - (res.task_id.to_string(), ctx.lock_by().clone().unwrap().to_string(), serde_json::to_string(&res.inner.as_ref().map_err(|e| e.to_string())).unwrap(), calculate_status(&res.inner).to_string(), (res.attempt.current() + 1) as u64 ) + (res.task_id.to_string(), ctx.lock_by().clone().unwrap().to_string(), serde_json::to_string(&res.inner.as_ref().map_err(|e| e.to_string())).expect("Could not convert response to json"), calculate_status(&res.inner).to_string(), (res.attempt.current() + 1) as u64 ) }).collect(); let query = "UPDATE apalis.jobs @@ -567,6 +567,7 @@ where .map_err(|e| sqlx::Error::Io(io::Error::new(io::ErrorKind::Interrupted, e))) .unwrap() }); + self.ack_notify .notify((ctx.clone(), res)) .map_err(|e| sqlx::Error::Io(io::Error::new(io::ErrorKind::Interrupted, e)))?; diff --git a/src/layers/catch_panic/mod.rs b/src/layers/catch_panic/mod.rs index 2f5bdac..ba869d1 100644 --- a/src/layers/catch_panic/mod.rs +++ b/src/layers/catch_panic/mod.rs @@ -26,6 +26,12 @@ impl CatchPanicLayer) -> Error> { } } +impl Default for CatchPanicLayer) -> Error> { + fn default() -> Self { + Self::new() + } +} + impl CatchPanicLayer where F: FnMut(Box) -> Error + Clone, diff --git a/src/layers/prometheus/mod.rs b/src/layers/prometheus/mod.rs index 99507b5..a0d5008 100644 --- a/src/layers/prometheus/mod.rs +++ b/src/layers/prometheus/mod.rs @@ -11,7 +11,8 @@ use tower::{Layer, Service}; /// A layer to support prometheus metrics #[derive(Debug, Default)] -pub struct PrometheusLayer; +#[non_exhaustive] +pub struct PrometheusLayer {} impl Layer for PrometheusLayer { type Service = PrometheusService;