Skip to content

Commit

Permalink
Adds LoadingTaskWaiter.
Browse files Browse the repository at this point in the history
  • Loading branch information
Lichtso committed Dec 15, 2023
1 parent acdeb07 commit e396012
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 5 deletions.
53 changes: 52 additions & 1 deletion program-runtime/src/loaded_programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use {
fmt::{Debug, Formatter},
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
Arc, Condvar, Mutex, RwLock,
},
},
};
Expand Down Expand Up @@ -451,6 +451,54 @@ impl Default for ProgramRuntimeEnvironments {
}
}

#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub struct LoadingTaskCookie(u64);

impl LoadingTaskCookie {
fn new() -> Self {
Self(0)
}

fn update(&mut self) {
let LoadingTaskCookie(cookie) = self;
*cookie = cookie.wrapping_add(1);
}
}

/// Prevents excessive polling during cooperative loading
#[derive(Debug, Default)]
pub struct LoadingTaskWaiter {
cookie: Mutex<LoadingTaskCookie>,
cond: Condvar,
}

impl LoadingTaskWaiter {
pub fn new() -> Self {
Self {
cookie: Mutex::new(LoadingTaskCookie::new()),
cond: Condvar::new(),
}
}

pub fn cookie(&self) -> LoadingTaskCookie {
*self.cookie.lock().unwrap()
}

pub fn notify(&self) {
let mut cookie = self.cookie.lock().unwrap();
cookie.update();
self.cond.notify_all();
}

pub fn wait(&self, cookie: LoadingTaskCookie) -> LoadingTaskCookie {
let cookie_guard = self.cookie.lock().unwrap();
*self
.cond
.wait_while(cookie_guard, |current_cookie| *current_cookie == cookie)
.unwrap()
}
}

#[derive(Debug, Default)]
struct SecondLevel {
slot_versions: Vec<Arc<LoadedProgram>>,
Expand Down Expand Up @@ -480,6 +528,7 @@ pub struct LoadedPrograms<FG: ForkGraph> {
pub programs_to_recompile: Vec<(Pubkey, Arc<LoadedProgram>)>,
pub stats: Stats,
pub fork_graph: Option<Arc<RwLock<FG>>>,
pub loading_task_waiter: Arc<LoadingTaskWaiter>,
}

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -561,6 +610,7 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
programs_to_recompile: Vec::default(),
stats: Stats::default(),
fork_graph: None,
loading_task_waiter: Arc::new(LoadingTaskWaiter::default()),
}
}

Expand Down Expand Up @@ -887,6 +937,7 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
);
second_level.cooperative_loading_lock = None;
self.assign_program(key, loaded_program);
self.loading_task_waiter.notify();
}

pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) {
Expand Down
13 changes: 9 additions & 4 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5077,7 +5077,7 @@ impl Bank {
let mut loaded_programs_for_txs = None;
let mut program_to_store = None;
loop {
let program_to_load = {
let (program_to_load, task_cookie, task_waiter) = {
// Lock the global cache.
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
// Initialize our local cache.
Expand All @@ -5098,11 +5098,13 @@ impl Bank {
);
}
// Figure out which program needs to be loaded next.
loaded_programs_cache.extract(
let program_to_load = loaded_programs_cache.extract(
self,
&mut missing_programs,
loaded_programs_for_txs.as_mut().unwrap(),
)
);
let task_waiter = Arc::clone(&loaded_programs_cache.loading_task_waiter);
(program_to_load, task_waiter.cookie(), task_waiter)
// Unlock the global cache again.
};

Expand All @@ -5114,7 +5116,10 @@ impl Bank {
} else if missing_programs.is_empty() {
break;
} else {
// TODO: Wait on a std::sync::Condvar
// Sleep until the next finish_cooperative_loading_task() call.
// Once a task completes we'll wake up and try to load the
// missing programs inside the tx batch again.
let _new_cookie = task_waiter.wait(task_cookie);
}
}

Expand Down

0 comments on commit e396012

Please sign in to comment.