From 642eaa370a56f4d21a99f90810f6454ab4371019 Mon Sep 17 00:00:00 2001 From: Joe Grund Date: Tue, 18 Aug 2020 14:40:37 -0400 Subject: [PATCH] Don't use try_acquire alongside spawned task We're hitting an issue that looks similar to https://github.com/launchbadge/sqlx/issues/622 I was able to work around it by disabling fairness, but we can just avoid the lookup entirely if active workers is >= the pool size. Signed-off-by: Joe Grund --- iml-task-runner/src/main.rs | 34 +++++++++++++--------------------- 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/iml-task-runner/src/main.rs b/iml-task-runner/src/main.rs index cfb181a8e5..6fdaa84185 100644 --- a/iml-task-runner/src/main.rs +++ b/iml-task-runner/src/main.rs @@ -7,7 +7,7 @@ use iml_action_client::invoke_rust_agent; use iml_manager_env::get_pool_limit; use iml_postgres::{ get_db_pool, - sqlx::{self, pool::PoolConnection, Done, Executor, PgPool, Postgres}, + sqlx::{self, Done, Executor, PgPool}, }; use iml_tracing::tracing; use iml_wire_types::{ @@ -37,7 +37,7 @@ lazy_static! { } async fn available_workers( - conn: &mut PoolConnection, + pool: &PgPool, ids: Vec, ) -> Result, error::ImlTaskRunnerError> { let clients = sqlx::query_as!( @@ -53,7 +53,7 @@ async fn available_workers( &ids, max(*POOL_LIMIT as i64 - ids.len() as i64, 0), ) - .fetch_all(conn) + .fetch_all(pool) .await?; Ok(clients) @@ -332,29 +332,21 @@ async fn main() -> Result<(), Box> { tracing::debug!("Pool State: {:?}", pg_pool); - let mut conn = match pg_pool.try_acquire() { - Some(x) => x, - None => { - tracing::info!( - "Could not acquire connection (pool full), will try again next tick" - ); - continue; - } - }; - - let workers = { + let ids: Vec = { let xs = active_clients.lock().await; - let ids: Vec = xs.iter().copied().collect(); + xs.iter().copied().collect() + }; - tracing::debug!("checking workers for ids: {:?}", ids); + if ids.len() as u32 >= *POOL_LIMIT { + tracing::info!("No more capacity to service tasks. Active workers: {:?}, Connection Limit: {}. Will try again next tick.", ids, *POOL_LIMIT); + continue; + } - let workers = available_workers(&mut conn, ids).await?; + tracing::debug!("checking workers for ids: {:?}", ids); - tracing::debug!("got workers: {:?}", workers); + let workers = available_workers(&pg_pool, ids).await?; - drop(conn); - workers - }; + tracing::debug!("got workers: {:?}", workers); { let mut x = active_clients.lock().await;