Skip to content

Commit

Permalink
feat: standardize cron as backend (#359)
Browse files Browse the repository at this point in the history
  • Loading branch information
geofmureithi authored Jul 10, 2024
1 parent fc72bc7 commit 7159edb
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ members = [
"examples/basics",
"examples/redis-with-msg-pack",
"examples/redis-deadpool",
"examples/redis-mq-example",
"examples/redis-mq-example", "examples/cron",
]


Expand Down
2 changes: 1 addition & 1 deletion examples/async-std-runtime/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn main() -> Result<()> {
let worker = WorkerBuilder::new("daily-cron-worker")
.layer(RetryLayer::new(RetryPolicy::retries(5)))
.layer(TraceLayer::new().make_span_with(ReminderSpan::new()))
.stream(CronStream::new(schedule).into_stream())
.backend(CronStream::new(schedule))
.build_fn(send_reminder);

Monitor::<AsyncStdExecutor>::new()
Expand Down
23 changes: 23 additions & 0 deletions examples/cron/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "cron"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1"
apalis = { path = "../../", default-features = false, features = [
"tokio-comp",
"tracing",
"limit",
] }
apalis-cron = { path = "../../packages/apalis-cron" }
tokio = { version = "1", features = ["full"] }
serde = "1"
tracing-subscriber = "0.3.11"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
pin-project-lite = "0.2.9"
tower = { version = "0.4", features = ["load-shed"] }

[dependencies.tracing]
default-features = false
version = "0.1"
44 changes: 44 additions & 0 deletions examples/cron/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use apalis::prelude::*;
use apalis::utils::TokioExecutor;
use apalis_cron::CronStream;
use apalis_cron::Schedule;
use chrono::{DateTime, Utc};
use std::str::FromStr;
use std::time::Duration;
use tower::limit::RateLimitLayer;
use tower::load_shed::LoadShedLayer;

#[derive(Clone)]
struct FakeService;
impl FakeService {
fn execute(&self, item: Reminder) {
dbg!(&item.0);
}
}

#[derive(Default, Debug, Clone)]
struct Reminder(DateTime<Utc>);
impl From<DateTime<Utc>> for Reminder {
fn from(t: DateTime<Utc>) -> Self {
Reminder(t)
}
}
async fn send_reminder(job: Reminder, svc: Data<FakeService>) {
svc.execute(job);
}

#[tokio::main]
async fn main() {
let schedule = Schedule::from_str("1/1 * * * * *").unwrap();
let worker = WorkerBuilder::new("morning-cereal")
.layer(LoadShedLayer::new()) // Important when you have layers that block the service
.layer(RateLimitLayer::new(1, Duration::from_secs(2)))
.data(FakeService)
.backend(CronStream::new(schedule))
.build_fn(send_reminder);
Monitor::<TokioExecutor>::new()
.register(worker)
.run()
.await
.unwrap();
}
1 change: 1 addition & 0 deletions packages/apalis-core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl<Serv> WorkerBuilder<(), (), Identity, Serv> {

impl<J, M, Serv> WorkerBuilder<J, (), M, Serv> {
/// Consume a stream directly
#[deprecated(since = "0.6.0", note = "Consider using the `.backend`")]
pub fn stream<NS: Stream<Item = Result<Option<Request<NJ>>, Error>> + Send + 'static, NJ>(
self,
stream: NS,
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl<E: Executor + Clone + Send + 'static + Sync> Monitor<E> {
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Future:
Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Error:
Send + std::error::Error + Sync,
Send + Into<BoxDynError> + Sync,
{
self.workers.push(worker.with_monitor(&self));

Expand Down
6 changes: 3 additions & 3 deletions packages/apalis-core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ impl<S, P> Worker<Ready<S, P>> {
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Future:
Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Error:
Send + std::error::Error + Sync,
Send + Into<BoxDynError> + Sync,
{
let notifier = Notify::new();
let service = self.state.service;
Expand Down Expand Up @@ -322,7 +322,7 @@ impl<S, P> Worker<Ready<S, P>> {
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Future:
Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Error:
Send + std::error::Error + Sync,
Send + Into<BoxDynError> + Sync,
{
let notifier = Notify::new();
let service = self.state.service;
Expand Down Expand Up @@ -378,7 +378,7 @@ impl<S, P> Worker<Ready<S, P>> {
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Future:
Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Error:
Send + std::error::Error + Sync,
Send + Into<BoxDynError> + Sync,
{
let worker_id = self.id.clone();
let notifier = Notify::new();
Expand Down
5 changes: 0 additions & 5 deletions packages/apalis-cron/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ description = "A simple yet extensible library for cron-like job scheduling for
[dependencies]
apalis-core = { path = "../../packages/apalis-core", version = "0.6.0-rc.1", default-features = false, features = [
"sleep",
"json",
] }
cron = "0.12.1"
futures = "0.3.30"
Expand All @@ -29,10 +28,6 @@ apalis-core = { path = "../../packages/apalis-core" }
apalis = { path = "../../", default-features = false, features = ["retry"] }
serde = { version = "1.0", features = ["derive"] }

[features]
default = ["tokio-comp"]
async-std-comp = ["async-std"]
tokio-comp = ["tokio/net"]

[package.metadata.docs.rs]
# defines the configuration attribute `docsrs`
Expand Down
22 changes: 21 additions & 1 deletion packages/apalis-cron/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@
//! ```
use apalis_core::data::Extensions;
use apalis_core::layers::Identity;
use apalis_core::poller::Poller;
use apalis_core::request::RequestStream;
use apalis_core::task::task_id::TaskId;
use apalis_core::worker::WorkerId;
use apalis_core::Backend;
use apalis_core::{error::Error, request::Request};
use chrono::{DateTime, TimeZone, Utc};
pub use cron::Schedule;
Expand Down Expand Up @@ -104,7 +108,7 @@ where
Tz::Offset: Send + Sync,
{
/// Convert to consumable
pub fn into_stream(self) -> RequestStream<Request<J>> {
fn into_stream(self) -> RequestStream<Request<J>> {
let timezone = self.timezone.clone();
let stream = async_stream::stream! {
let mut schedule = self.schedule.upcoming_owned(timezone.clone());
Expand All @@ -128,3 +132,19 @@ where
Box::pin(stream)
}
}

impl<J, Tz> Backend<Request<J>> for CronStream<J, Tz>
where
J: From<DateTime<Tz>> + Send + Sync + 'static,
Tz: TimeZone + Send + Sync + 'static,
Tz::Offset: Send + Sync,
{
type Stream = RequestStream<Request<J>>;

type Layer = Identity;

fn poll(self, _worker: WorkerId) -> Poller<Self::Stream, Self::Layer> {
let stream = self.into_stream();
Poller::new(stream, async {})
}
}

0 comments on commit 7159edb

Please sign in to comment.