Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Don't use try_acquire alongside spawned task #2165

Merged
merged 1 commit into from
Aug 19, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 13 additions & 21 deletions iml-task-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use iml_action_client::invoke_rust_agent;
use iml_manager_env::get_pool_limit;
use iml_postgres::{
get_db_pool,
sqlx::{self, pool::PoolConnection, Done, Executor, PgPool, Postgres},
sqlx::{self, Done, Executor, PgPool},
};
use iml_tracing::tracing;
use iml_wire_types::{
Expand Down Expand Up @@ -37,7 +37,7 @@ lazy_static! {
}

async fn available_workers(
conn: &mut PoolConnection<Postgres>,
pool: &PgPool,
ids: Vec<i32>,
) -> Result<Vec<LustreClient>, error::ImlTaskRunnerError> {
let clients = sqlx::query_as!(
Expand All @@ -53,7 +53,7 @@ async fn available_workers(
&ids,
max(*POOL_LIMIT as i64 - ids.len() as i64, 0),
)
.fetch_all(conn)
.fetch_all(pool)
.await?;

Ok(clients)
Expand Down Expand Up @@ -332,29 +332,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

tracing::debug!("Pool State: {:?}", pg_pool);

let mut conn = match pg_pool.try_acquire() {
Some(x) => x,
None => {
tracing::info!(
"Could not acquire connection (pool full), will try again next tick"
);
continue;
}
};

let workers = {
let ids: Vec<i32> = {
let xs = active_clients.lock().await;
let ids: Vec<i32> = xs.iter().copied().collect();
xs.iter().copied().collect()
};

tracing::debug!("checking workers for ids: {:?}", ids);
if ids.len() as u32 >= *POOL_LIMIT {
tracing::info!("No more capacity to service tasks. Active workers: {:?}, Connection Limit: {}. Will try again next tick.", ids, *POOL_LIMIT);
continue;
}

let workers = available_workers(&mut conn, ids).await?;
tracing::debug!("checking workers for ids: {:?}", ids);

tracing::debug!("got workers: {:?}", workers);
let workers = available_workers(&pg_pool, ids).await?;

drop(conn);
workers
};
tracing::debug!("got workers: {:?}", workers);

{
let mut x = active_clients.lock().await;
Expand Down