From 7159edb72818e1811fcabc460a3d4413832fc9d7 Mon Sep 17 00:00:00 2001 From: Geoffrey Mureithi <95377562+geofmureithi@users.noreply.github.com> Date: Wed, 10 Jul 2024 10:07:23 +0300 Subject: [PATCH] feat: standardize cron as backend (#359) --- Cargo.toml | 2 +- examples/async-std-runtime/src/main.rs | 2 +- examples/cron/Cargo.toml | 23 +++++++++++++ examples/cron/src/main.rs | 44 +++++++++++++++++++++++++ packages/apalis-core/src/builder.rs | 1 + packages/apalis-core/src/monitor/mod.rs | 2 +- packages/apalis-core/src/worker/mod.rs | 6 ++-- packages/apalis-cron/Cargo.toml | 5 --- packages/apalis-cron/src/lib.rs | 22 ++++++++++++- 9 files changed, 95 insertions(+), 12 deletions(-) create mode 100644 examples/cron/Cargo.toml create mode 100644 examples/cron/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 431d740..c1551da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,7 +113,7 @@ members = [ "examples/basics", "examples/redis-with-msg-pack", "examples/redis-deadpool", - "examples/redis-mq-example", + "examples/redis-mq-example", "examples/cron", ] diff --git a/examples/async-std-runtime/src/main.rs b/examples/async-std-runtime/src/main.rs index f77a895..58f2afa 100644 --- a/examples/async-std-runtime/src/main.rs +++ b/examples/async-std-runtime/src/main.rs @@ -44,7 +44,7 @@ async fn main() -> Result<()> { let worker = WorkerBuilder::new("daily-cron-worker") .layer(RetryLayer::new(RetryPolicy::retries(5))) .layer(TraceLayer::new().make_span_with(ReminderSpan::new())) - .stream(CronStream::new(schedule).into_stream()) + .backend(CronStream::new(schedule)) .build_fn(send_reminder); Monitor::::new() diff --git a/examples/cron/Cargo.toml b/examples/cron/Cargo.toml new file mode 100644 index 0000000..f187e27 --- /dev/null +++ b/examples/cron/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "cron" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1" +apalis = { path = "../../", default-features = false, features = [ + "tokio-comp", + "tracing", + "limit", +] } +apalis-cron = { path = "../../packages/apalis-cron" } +tokio = { version = "1", features = ["full"] } +serde = "1" +tracing-subscriber = "0.3.11" +chrono = { version = "0.4", default-features = false, features = ["clock"] } +pin-project-lite = "0.2.9" +tower = { version = "0.4", features = ["load-shed"] } + +[dependencies.tracing] +default-features = false +version = "0.1" diff --git a/examples/cron/src/main.rs b/examples/cron/src/main.rs new file mode 100644 index 0000000..4a8fb74 --- /dev/null +++ b/examples/cron/src/main.rs @@ -0,0 +1,44 @@ +use apalis::prelude::*; +use apalis::utils::TokioExecutor; +use apalis_cron::CronStream; +use apalis_cron::Schedule; +use chrono::{DateTime, Utc}; +use std::str::FromStr; +use std::time::Duration; +use tower::limit::RateLimitLayer; +use tower::load_shed::LoadShedLayer; + +#[derive(Clone)] +struct FakeService; +impl FakeService { + fn execute(&self, item: Reminder) { + dbg!(&item.0); + } +} + +#[derive(Default, Debug, Clone)] +struct Reminder(DateTime); +impl From> for Reminder { + fn from(t: DateTime) -> Self { + Reminder(t) + } +} +async fn send_reminder(job: Reminder, svc: Data) { + svc.execute(job); +} + +#[tokio::main] +async fn main() { + let schedule = Schedule::from_str("1/1 * * * * *").unwrap(); + let worker = WorkerBuilder::new("morning-cereal") + .layer(LoadShedLayer::new()) // Important when you have layers that block the service + .layer(RateLimitLayer::new(1, Duration::from_secs(2))) + .data(FakeService) + .backend(CronStream::new(schedule)) + .build_fn(send_reminder); + Monitor::::new() + .register(worker) + .run() + .await + .unwrap(); +} diff --git a/packages/apalis-core/src/builder.rs b/packages/apalis-core/src/builder.rs index 7bcf1b9..4d837f3 100644 --- a/packages/apalis-core/src/builder.rs +++ b/packages/apalis-core/src/builder.rs @@ -55,6 +55,7 @@ impl WorkerBuilder<(), (), Identity, Serv> { impl WorkerBuilder { /// Consume a stream directly + #[deprecated(since = "0.6.0", note = "Consider using the `.backend`")] pub fn stream>, Error>> + Send + 'static, NJ>( self, stream: NS, diff --git a/packages/apalis-core/src/monitor/mod.rs b/packages/apalis-core/src/monitor/mod.rs index af5cb8e..aad4daf 100644 --- a/packages/apalis-core/src/monitor/mod.rs +++ b/packages/apalis-core/src/monitor/mod.rs @@ -97,7 +97,7 @@ impl Monitor { <<

>>::Layer as Layer>::Service as Service>>::Future: Send, <<

>>::Layer as Layer>::Service as Service>>::Error: - Send + std::error::Error + Sync, + Send + Into + Sync, { self.workers.push(worker.with_monitor(&self)); diff --git a/packages/apalis-core/src/worker/mod.rs b/packages/apalis-core/src/worker/mod.rs index 864d429..b9c071f 100644 --- a/packages/apalis-core/src/worker/mod.rs +++ b/packages/apalis-core/src/worker/mod.rs @@ -276,7 +276,7 @@ impl Worker> { <<

>>::Layer as Layer>::Service as Service>>::Future: Send, <<

>>::Layer as Layer>::Service as Service>>::Error: - Send + std::error::Error + Sync, + Send + Into + Sync, { let notifier = Notify::new(); let service = self.state.service; @@ -322,7 +322,7 @@ impl Worker> { <<

>>::Layer as Layer>::Service as Service>>::Future: Send, <<

>>::Layer as Layer>::Service as Service>>::Error: - Send + std::error::Error + Sync, + Send + Into + Sync, { let notifier = Notify::new(); let service = self.state.service; @@ -378,7 +378,7 @@ impl Worker> { <<

>>::Layer as Layer>::Service as Service>>::Future: Send, <<

>>::Layer as Layer>::Service as Service>>::Error: - Send + std::error::Error + Sync, + Send + Into + Sync, { let worker_id = self.id.clone(); let notifier = Notify::new(); diff --git a/packages/apalis-cron/Cargo.toml b/packages/apalis-cron/Cargo.toml index 199c2f4..8eeeb6a 100644 --- a/packages/apalis-cron/Cargo.toml +++ b/packages/apalis-cron/Cargo.toml @@ -11,7 +11,6 @@ description = "A simple yet extensible library for cron-like job scheduling for [dependencies] apalis-core = { path = "../../packages/apalis-core", version = "0.6.0-rc.1", default-features = false, features = [ "sleep", - "json", ] } cron = "0.12.1" futures = "0.3.30" @@ -29,10 +28,6 @@ apalis-core = { path = "../../packages/apalis-core" } apalis = { path = "../../", default-features = false, features = ["retry"] } serde = { version = "1.0", features = ["derive"] } -[features] -default = ["tokio-comp"] -async-std-comp = ["async-std"] -tokio-comp = ["tokio/net"] [package.metadata.docs.rs] # defines the configuration attribute `docsrs` diff --git a/packages/apalis-cron/src/lib.rs b/packages/apalis-cron/src/lib.rs index 5001945..024fa9f 100644 --- a/packages/apalis-cron/src/lib.rs +++ b/packages/apalis-cron/src/lib.rs @@ -58,8 +58,12 @@ //! ``` use apalis_core::data::Extensions; +use apalis_core::layers::Identity; +use apalis_core::poller::Poller; use apalis_core::request::RequestStream; use apalis_core::task::task_id::TaskId; +use apalis_core::worker::WorkerId; +use apalis_core::Backend; use apalis_core::{error::Error, request::Request}; use chrono::{DateTime, TimeZone, Utc}; pub use cron::Schedule; @@ -104,7 +108,7 @@ where Tz::Offset: Send + Sync, { /// Convert to consumable - pub fn into_stream(self) -> RequestStream> { + fn into_stream(self) -> RequestStream> { let timezone = self.timezone.clone(); let stream = async_stream::stream! { let mut schedule = self.schedule.upcoming_owned(timezone.clone()); @@ -128,3 +132,19 @@ where Box::pin(stream) } } + +impl Backend> for CronStream +where + J: From> + Send + Sync + 'static, + Tz: TimeZone + Send + Sync + 'static, + Tz::Offset: Send + Sync, +{ + type Stream = RequestStream>; + + type Layer = Identity; + + fn poll(self, _worker: WorkerId) -> Poller { + let stream = self.into_stream(); + Poller::new(stream, async {}) + } +}