diff --git a/iml-task-runner/src/main.rs b/iml-task-runner/src/main.rs index cfb181a8e5..6fdaa84185 100644 --- a/iml-task-runner/src/main.rs +++ b/iml-task-runner/src/main.rs @@ -7,7 +7,7 @@ use iml_action_client::invoke_rust_agent; use iml_manager_env::get_pool_limit; use iml_postgres::{ get_db_pool, - sqlx::{self, pool::PoolConnection, Done, Executor, PgPool, Postgres}, + sqlx::{self, Done, Executor, PgPool}, }; use iml_tracing::tracing; use iml_wire_types::{ @@ -37,7 +37,7 @@ lazy_static! { } async fn available_workers( - conn: &mut PoolConnection, + pool: &PgPool, ids: Vec, ) -> Result, error::ImlTaskRunnerError> { let clients = sqlx::query_as!( @@ -53,7 +53,7 @@ async fn available_workers( &ids, max(*POOL_LIMIT as i64 - ids.len() as i64, 0), ) - .fetch_all(conn) + .fetch_all(pool) .await?; Ok(clients) @@ -332,29 +332,21 @@ async fn main() -> Result<(), Box> { tracing::debug!("Pool State: {:?}", pg_pool); - let mut conn = match pg_pool.try_acquire() { - Some(x) => x, - None => { - tracing::info!( - "Could not acquire connection (pool full), will try again next tick" - ); - continue; - } - }; - - let workers = { + let ids: Vec = { let xs = active_clients.lock().await; - let ids: Vec = xs.iter().copied().collect(); + xs.iter().copied().collect() + }; - tracing::debug!("checking workers for ids: {:?}", ids); + if ids.len() as u32 >= *POOL_LIMIT { + tracing::info!("No more capacity to service tasks. Active workers: {:?}, Connection Limit: {}. Will try again next tick.", ids, *POOL_LIMIT); + continue; + } - let workers = available_workers(&mut conn, ids).await?; + tracing::debug!("checking workers for ids: {:?}", ids); - tracing::debug!("got workers: {:?}", workers); + let workers = available_workers(&pg_pool, ids).await?; - drop(conn); - workers - }; + tracing::debug!("got workers: {:?}", workers); { let mut x = active_clients.lock().await;