Skip to content

Commit

Permalink
fix:[bug] include backend provided layer in service layers. (#426)
Browse files Browse the repository at this point in the history
* fix:[bug] include backend provided layer in service layers.

Problem:
The current worker logic is missing an implementation where the backend provided layer should be added to the service's layer.
This is a critical issue that affects all v0.6.0-rc-7 users and they should update as soon as a new release is done.

Solution:
- Add backend layers to service's layer.
- Add worker_consume tests on the storages to prevent regression on this.

* chore: comment an enforcement rule not yet followed by redis
  • Loading branch information
geofmureithi authored Oct 4, 2024
1 parent 3166d7c commit e1d7e6b
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 8 deletions.
3 changes: 3 additions & 0 deletions packages/apalis-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@ pub mod test_utils {
assert_eq!(res, 1); // A job exists
let res = t.execute_next().await;
assert_eq!(res.1, Ok("1".to_owned()));
// TODO: all storages need to satisfy this rule, redis does not
// let res = t.len().await.unwrap();
// assert_eq!(res, 0);
t.vacuum().await.unwrap();
}
};
Expand Down
7 changes: 4 additions & 3 deletions packages/apalis-core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,12 @@ impl<S, P> Worker<Ready<S, P>> {
Ctx: Send + 'static + Sync,
{
let notifier = Notify::new();
let service = self.state.service;

let (service, poll_worker) = Buffer::pair(service, instances);
let backend = self.state.backend;
let service = self.state.service;
let poller = backend.poll::<S>(self.id.clone());
let layer = poller.layer;
let service = ServiceBuilder::new().layer(layer).service(service);
let (service, poll_worker) = Buffer::pair(service, instances);
let polling = poller.heartbeat.shared();
let worker_stream = WorkerStream::new(poller.stream, notifier.clone())
.into_future()
Expand Down
4 changes: 2 additions & 2 deletions packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl Config {

/// set the namespace for the Storage
pub fn set_namespace(mut self, namespace: &str) -> Self {
self.namespace = namespace.to_owned();
self.namespace = namespace.to_string();
self
}

Expand Down Expand Up @@ -308,7 +308,7 @@ impl<T, Conn: Clone, C> Clone for RedisStorage<T, Conn, C> {
scripts: self.scripts.clone(),
controller: self.controller.clone(),
config: self.config.clone(),
codec: self.codec.clone(),
codec: self.codec,
}
}
}
Expand Down
54 changes: 53 additions & 1 deletion packages/apalis-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Config {
///
/// Defaults to "apalis::sql"
pub fn set_namespace(mut self, namespace: &str) -> Self {
self.namespace = namespace.to_owned();
self.namespace = namespace.to_string();
self
}

Expand Down Expand Up @@ -223,5 +223,57 @@ macro_rules! sql_storage_tests {
"{\"Err\":\"FailedError: Missing separator character '@'.\"}"
);
}

#[tokio::test]
async fn worker_consume() {
use apalis_core::builder::WorkerBuilder;
use apalis_core::builder::WorkerFactoryFn;
use apalis_core::executor::Executor;
use std::future::Future;

#[derive(Debug, Clone)]
struct TokioTestExecutor;

impl Executor for TokioTestExecutor {
fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
tokio::spawn(future);
}
}

let storage = $setup().await;
let mut handle = storage.clone();

let parts = handle
.push(email_service::example_good_email())
.await
.unwrap();

async fn task(_job: Email) -> &'static str {
tokio::time::sleep(Duration::from_millis(100)).await;
"Job well done"
}
let worker = WorkerBuilder::new("rango-tango").backend(storage);
let worker = worker.build_fn(task);
let worker = worker.with_executor(TokioTestExecutor);
let w = worker.clone();

let runner = async move {
apalis_core::sleep(Duration::from_secs(3)).await;
let job_id = &parts.task_id;
let job = get_job(&mut handle, job_id).await;
let ctx = job.parts.context;

assert_eq!(*ctx.status(), State::Done);
assert!(ctx.done_at().is_some());
assert!(ctx.lock_by().is_some());
assert!(ctx.lock_at().is_some());
assert!(ctx.last_error().is_some()); // TODO: rename last_error to last_result

w.stop();
};

let wkr = worker.run();
tokio::join!(runner, wkr);
}
};
}
3 changes: 2 additions & 1 deletion packages/apalis-sql/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ where
ids = ack_stream.next() => {
if let Some(ids) = ids {
let ack_ids: Vec<(String, String, String, String, u64)> = ids.iter().map(|(ctx, res)| {
(res.task_id.to_string(), ctx.lock_by().clone().unwrap().to_string(), serde_json::to_string(&res.inner.as_ref().map_err(|e| e.to_string())).unwrap(), calculate_status(&res.inner).to_string(), (res.attempt.current() + 1) as u64 )
(res.task_id.to_string(), ctx.lock_by().clone().unwrap().to_string(), serde_json::to_string(&res.inner.as_ref().map_err(|e| e.to_string())).expect("Could not convert response to json"), calculate_status(&res.inner).to_string(), (res.attempt.current() + 1) as u64 )
}).collect();
let query =
"UPDATE apalis.jobs
Expand Down Expand Up @@ -567,6 +567,7 @@ where
.map_err(|e| sqlx::Error::Io(io::Error::new(io::ErrorKind::Interrupted, e)))
.unwrap()
});

self.ack_notify
.notify((ctx.clone(), res))
.map_err(|e| sqlx::Error::Io(io::Error::new(io::ErrorKind::Interrupted, e)))?;
Expand Down
6 changes: 6 additions & 0 deletions src/layers/catch_panic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ impl CatchPanicLayer<fn(Box<dyn Any + Send>) -> Error> {
}
}

impl Default for CatchPanicLayer<fn(Box<dyn Any + Send>) -> Error> {
fn default() -> Self {
Self::new()
}
}

impl<F> CatchPanicLayer<F>
where
F: FnMut(Box<dyn Any + Send>) -> Error + Clone,
Expand Down
3 changes: 2 additions & 1 deletion src/layers/prometheus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use tower::{Layer, Service};

/// A layer to support prometheus metrics
#[derive(Debug, Default)]
pub struct PrometheusLayer;
#[non_exhaustive]
pub struct PrometheusLayer {}

impl<S> Layer<S> for PrometheusLayer {
type Service = PrometheusService<S>;
Expand Down

0 comments on commit e1d7e6b

Please sign in to comment.