Skip to content

Commit

Permalink
Graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
sandhose committed Oct 30, 2024
1 parent 4e08e3a commit f060abe
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/cli/src/commands/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ impl Options {
&mailer,
homeserver_connection.clone(),
url_builder.clone(),
shutdown.soft_shutdown_token(),
shutdown.task_tracker(),
)
.await?;

Expand Down
39 changes: 34 additions & 5 deletions crates/cli/src/commands/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@ use rand::{
};
use tracing::{info, info_span};

use crate::util::{
database_pool_from_config, mailer_from_config, site_config_from_config, templates_from_config,
use crate::{
shutdown::ShutdownManager,
util::{
database_pool_from_config, mailer_from_config, site_config_from_config,
templates_from_config,
},
};

#[derive(Parser, Debug, Default)]
pub(super) struct Options {}

impl Options {
pub async fn run(self, figment: &Figment) -> anyhow::Result<ExitCode> {
let shutdown = ShutdownManager::new()?;
let span = info_span!("cli.worker.init").entered();
let config = AppConfig::extract(figment)?;

Expand Down Expand Up @@ -71,11 +76,35 @@ impl Options {
let worker_name = Alphanumeric.sample_string(&mut rng, 10);

info!(worker_name, "Starting task scheduler");
let monitor = mas_tasks::init(&worker_name, &pool, &mailer, conn, url_builder).await?;

let monitor = mas_tasks::init(
&worker_name,
&pool,
&mailer,
conn,
url_builder,
shutdown.soft_shutdown_token(),
shutdown.task_tracker(),
)
.await?;

// XXX: The monitor from apalis is a bit annoying to use for graceful shutdowns,
// ideally we'd just give it a cancellation token
let shutdown_future = shutdown.soft_shutdown_token().cancelled_owned();
shutdown.task_tracker().spawn(async move {
if let Err(e) = monitor
.run_with_signal(async move {
shutdown_future.await;
Ok(())
})
.await
{
tracing::error!(error = &e as &dyn std::error::Error, "Task worker failed");
}
});
span.exit();

monitor.run().await?;
shutdown.run().await;

Ok(ExitCode::SUCCESS)
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 25 additions & 1 deletion crates/storage-pg/src/queue/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl<'c> QueueWorkerRepository for PgQueueWorkerRepository<'c> {
),
err,
)]
async fn shutdown(&mut self, clock: &dyn Clock, worker: Worker) -> Result<(), Self::Error> {
async fn shutdown(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error> {
let now = clock.now();
let res = sqlx::query!(
r#"
Expand All @@ -126,6 +126,30 @@ impl<'c> QueueWorkerRepository for PgQueueWorkerRepository<'c> {

DatabaseError::ensure_affected_rows(&res, 1)?;

// Remove the leader lease if we were holding it
let res = sqlx::query!(
r#"
DELETE FROM queue_leader
WHERE queue_worker_id = $1
"#,
Uuid::from(worker.id),
)
.traced()
.execute(&mut *self.conn)
.await?;

// If we were holding the leader lease, notify workers
if res.rows_affected() > 0 {
sqlx::query!(
r#"
NOTIFY queue_leader_stepdown
"#,
)
.traced()
.execute(&mut *self.conn)
.await?;
}

Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions crates/storage/src/queue/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub trait QueueWorkerRepository: Send + Sync {
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn shutdown(&mut self, clock: &dyn Clock, worker: Worker) -> Result<(), Self::Error>;
async fn shutdown(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;

/// Find dead workers and shut them down.
///
Expand Down Expand Up @@ -105,7 +105,7 @@ repository_impl!(QueueWorkerRepository:
async fn shutdown(
&mut self,
clock: &dyn Clock,
worker: Worker,
worker: &Worker,
) -> Result<(), Self::Error>;

async fn shutdown_dead_workers(
Expand Down
7 changes: 6 additions & 1 deletion crates/tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ workspace = true

[dependencies]
anyhow.workspace = true
apalis-core = { version = "0.4.9", features = ["extensions", "tokio-comp", "storage"] }
apalis-core = { version = "0.4.9", features = [
"extensions",
"tokio-comp",
"storage",
] }
apalis-cron = "0.4.9"
async-stream = "0.3.6"
async-trait.workspace = true
Expand All @@ -25,6 +29,7 @@ rand_chacha = "0.3.1"
sqlx.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tower.workspace = true
tracing.workspace = true
tracing-opentelemetry.workspace = true
Expand Down
9 changes: 5 additions & 4 deletions crates/tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use mas_storage_pg::PgRepository;
use new_queue::QueueRunnerError;
use rand::SeedableRng;
use sqlx::{Pool, Postgres};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::debug;

use crate::storage::PostgresStorageFactory;
Expand Down Expand Up @@ -143,6 +144,8 @@ pub async fn init(
mailer: &Mailer,
homeserver: impl HomeserverConnection<Error = anyhow::Error> + 'static,
url_builder: UrlBuilder,
cancellation_token: CancellationToken,
task_tracker: &TaskTracker,
) -> Result<Monitor<TokioExecutor>, QueueRunnerError> {
let state = State::new(
pool.clone(),
Expand All @@ -166,11 +169,9 @@ pub async fn init(
.map_err(QueueRunnerError::SetupListener)?;
debug!(?monitor, "workers registered");

let mut worker = self::new_queue::QueueWorker::new(state).await?;
let mut worker = self::new_queue::QueueWorker::new(state, cancellation_token).await?;

// TODO: this is just spawning the task in the background, we probably actually
// want to wrap that in a structure, and handle graceful shutdown correctly
tokio::spawn(async move {
task_tracker.spawn(async move {
if let Err(e) = worker.run().await {
tracing::error!(
error = &e as &dyn std::error::Error,
Expand Down
Loading

0 comments on commit f060abe

Please sign in to comment.