Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: standardize cron as backend #359

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ members = [
"examples/basics",
"examples/redis-with-msg-pack",
"examples/redis-deadpool",
"examples/redis-mq-example",
"examples/redis-mq-example", "examples/cron",
]


Expand Down
2 changes: 1 addition & 1 deletion examples/async-std-runtime/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn main() -> Result<()> {
let worker = WorkerBuilder::new("daily-cron-worker")
.layer(RetryLayer::new(RetryPolicy::retries(5)))
.layer(TraceLayer::new().make_span_with(ReminderSpan::new()))
.stream(CronStream::new(schedule).into_stream())
.backend(CronStream::new(schedule))
.build_fn(send_reminder);

Monitor::<AsyncStdExecutor>::new()
Expand Down
23 changes: 23 additions & 0 deletions examples/cron/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "cron"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1"
apalis = { path = "../../", default-features = false, features = [
"tokio-comp",
"tracing",
"limit",
] }
apalis-cron = { path = "../../packages/apalis-cron" }
tokio = { version = "1", features = ["full"] }
serde = "1"
tracing-subscriber = "0.3.11"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
pin-project-lite = "0.2.9"
tower = { version = "0.4", features = ["load-shed"] }

[dependencies.tracing]
default-features = false
version = "0.1"
44 changes: 44 additions & 0 deletions examples/cron/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use apalis::prelude::*;
use apalis::utils::TokioExecutor;
use apalis_cron::CronStream;
use apalis_cron::Schedule;
use chrono::{DateTime, Utc};
use std::str::FromStr;
use std::time::Duration;
use tower::limit::RateLimitLayer;
use tower::load_shed::LoadShedLayer;

#[derive(Clone)]
struct FakeService;
impl FakeService {
fn execute(&self, item: Reminder) {
dbg!(&item.0);
}
}

#[derive(Default, Debug, Clone)]
struct Reminder(DateTime<Utc>);
impl From<DateTime<Utc>> for Reminder {
fn from(t: DateTime<Utc>) -> Self {
Reminder(t)
}
}
async fn send_reminder(job: Reminder, svc: Data<FakeService>) {
svc.execute(job);
}

#[tokio::main]
async fn main() {
let schedule = Schedule::from_str("1/1 * * * * *").unwrap();
let worker = WorkerBuilder::new("morning-cereal")
.layer(LoadShedLayer::new()) // Important when you have layers that block the service
.layer(RateLimitLayer::new(1, Duration::from_secs(2)))
.data(FakeService)
.backend(CronStream::new(schedule))
.build_fn(send_reminder);
Monitor::<TokioExecutor>::new()
.register(worker)
.run()
.await
.unwrap();
}
1 change: 1 addition & 0 deletions packages/apalis-core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl<Serv> WorkerBuilder<(), (), Identity, Serv> {

impl<J, M, Serv> WorkerBuilder<J, (), M, Serv> {
/// Consume a stream directly
#[deprecated(since = "0.6.0", note = "Consider using the `.backend`")]
pub fn stream<NS: Stream<Item = Result<Option<Request<NJ>>, Error>> + Send + 'static, NJ>(
self,
stream: NS,
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl<E: Executor + Clone + Send + 'static + Sync> Monitor<E> {
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Future:
Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Error:
Send + std::error::Error + Sync,
Send + Into<BoxDynError> + Sync,
{
self.workers.push(worker.with_monitor(&self));

Expand Down
6 changes: 3 additions & 3 deletions packages/apalis-core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ impl<S, P> Worker<Ready<S, P>> {
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Future:
Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Error:
Send + std::error::Error + Sync,
Send + Into<BoxDynError> + Sync,
{
let notifier = Notify::new();
let service = self.state.service;
Expand Down Expand Up @@ -322,7 +322,7 @@ impl<S, P> Worker<Ready<S, P>> {
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Future:
Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Error:
Send + std::error::Error + Sync,
Send + Into<BoxDynError> + Sync,
{
let notifier = Notify::new();
let service = self.state.service;
Expand Down Expand Up @@ -378,7 +378,7 @@ impl<S, P> Worker<Ready<S, P>> {
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Future:
Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Error:
Send + std::error::Error + Sync,
Send + Into<BoxDynError> + Sync,
{
let worker_id = self.id.clone();
let notifier = Notify::new();
Expand Down
5 changes: 0 additions & 5 deletions packages/apalis-cron/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ description = "A simple yet extensible library for cron-like job scheduling for
[dependencies]
apalis-core = { path = "../../packages/apalis-core", version = "0.6.0-rc.1", default-features = false, features = [
"sleep",
"json",
] }
cron = "0.12.1"
futures = "0.3.30"
Expand All @@ -29,10 +28,6 @@ apalis-core = { path = "../../packages/apalis-core" }
apalis = { path = "../../", default-features = false, features = ["retry"] }
serde = { version = "1.0", features = ["derive"] }

[features]
default = ["tokio-comp"]
async-std-comp = ["async-std"]
tokio-comp = ["tokio/net"]

[package.metadata.docs.rs]
# defines the configuration attribute `docsrs`
Expand Down
22 changes: 21 additions & 1 deletion packages/apalis-cron/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@
//! ```

use apalis_core::data::Extensions;
use apalis_core::layers::Identity;
use apalis_core::poller::Poller;
use apalis_core::request::RequestStream;
use apalis_core::task::task_id::TaskId;
use apalis_core::worker::WorkerId;
use apalis_core::Backend;
use apalis_core::{error::Error, request::Request};
use chrono::{DateTime, TimeZone, Utc};
pub use cron::Schedule;
Expand Down Expand Up @@ -104,7 +108,7 @@ where
Tz::Offset: Send + Sync,
{
/// Convert to consumable
pub fn into_stream(self) -> RequestStream<Request<J>> {
fn into_stream(self) -> RequestStream<Request<J>> {
let timezone = self.timezone.clone();
let stream = async_stream::stream! {
let mut schedule = self.schedule.upcoming_owned(timezone.clone());
Expand All @@ -128,3 +132,19 @@ where
Box::pin(stream)
}
}

impl<J, Tz> Backend<Request<J>> for CronStream<J, Tz>
where
J: From<DateTime<Tz>> + Send + Sync + 'static,
Tz: TimeZone + Send + Sync + 'static,
Tz::Offset: Send + Sync,
{
type Stream = RequestStream<Request<J>>;

type Layer = Identity;

fn poll(self, _worker: WorkerId) -> Poller<Self::Stream, Self::Layer> {
let stream = self.into_stream();
Poller::new(stream, async {})
}
}
Loading