Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: stop background threads between estimations #7689

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions core/store/src/trie/prefetching_trie_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ struct TriePrefetchingStorage {
/// The former puts requests in, the latter serves requests.
/// With this API, the store does not know about receipts etc, and the runtime
/// does not know about the trie structure. The only thing they share is this object.
#[derive(Clone)]
pub struct PrefetchApi {
/// Bounded, shared queue for all IO threads to take work from.
///
Expand All @@ -64,6 +63,9 @@ pub struct PrefetchApi {
/// at the same time.
work_queue_tx: crossbeam::channel::Sender<(StateRoot, TrieKey)>,
work_queue_rx: crossbeam::channel::Receiver<(StateRoot, TrieKey)>,
/// Threads spawned by this instance of `PrefetchApi`, clones created for chunk processing
/// will have an empty vector instead.
io_thread_handles: Vec<std::thread::JoinHandle<()>>,
/// Prefetching IO threads will insert fetched data here. This is also used
/// to mark what is already being fetched, to avoid fetching the same data
/// multiple times.
Expand Down Expand Up @@ -388,9 +390,10 @@ impl PrefetchApi {
let sweat_prefetch_senders = trie_config.sweat_prefetch_senders.clone();
let enable_receipt_prefetching = trie_config.enable_receipt_prefetching;

let this = Self {
let mut this = Self {
work_queue_tx,
work_queue_rx,
io_thread_handles: vec![],
prefetching: PrefetchStagingArea::new(shard_uid.shard_id()),
enable_receipt_prefetching,
sweat_prefetch_receivers,
Expand All @@ -412,20 +415,15 @@ impl PrefetchApi {
self.work_queue_tx.send((root, trie_key)).map_err(|e| e.0)
}

pub fn start_io_thread(
&self,
store: Store,
shard_cache: TrieCache,
shard_uid: ShardUId,
) -> std::thread::JoinHandle<()> {
pub fn start_io_thread(&mut self, store: Store, shard_cache: TrieCache, shard_uid: ShardUId) {
let prefetcher_storage =
TriePrefetchingStorage::new(store, shard_uid, shard_cache, self.prefetching.clone());
let work_queue = self.work_queue_rx.clone();
let metric_prefetch_sent =
metrics::PREFETCH_SENT.with_label_values(&[&shard_uid.shard_id.to_string()]);
let metric_prefetch_fail =
metrics::PREFETCH_FAIL.with_label_values(&[&shard_uid.shard_id.to_string()]);
std::thread::spawn(move || {
let handle = std::thread::spawn(move || {
while let Ok((trie_root, trie_key)) = work_queue.recv() {
// Since the trie root can change,and since the root is not known at the time when the IO threads starts,
// we need to redefine the trie before each request.
Expand All @@ -443,7 +441,8 @@ impl PrefetchApi {
metric_prefetch_fail.inc();
}
}
})
});
self.io_thread_handles.push(handle);
}

/// Remove queued up requests so IO threads will be paused after they finish their current task.
Expand All @@ -458,6 +457,18 @@ impl PrefetchApi {
pub fn clear_data(&self) {
self.prefetching.0.lock().expect(POISONED_LOCK_ERR).slots.clear();
}

/// Interrupt and wait for all prefetching background threads to terminate.
pub fn stop_background_threads(&mut self) -> std::thread::Result<()> {
// close cross-beam channel
(self.work_queue_tx, self.work_queue_rx) =
crossbeam::channel::bounded(MAX_QUEUED_WORK_ITEMS);
// wait for IO threads to terminate
for handle in self.io_thread_handles.drain(..) {
handle.join()?;
}
Ok(())
}
}

fn prefetch_state_matches(expected: PrefetchSlot, actual: &PrefetchSlot) -> bool {
Expand All @@ -469,6 +480,23 @@ fn prefetch_state_matches(expected: PrefetchSlot, actual: &PrefetchSlot) -> bool
}
}

// Manual implementation to avoid cloning thread handles. Those handles should
// be in exclusive ownership of the original object, stored in `ShardTries`.
impl Clone for PrefetchApi {
fn clone(&self) -> Self {
Self {
work_queue_tx: self.work_queue_tx.clone(),
work_queue_rx: self.work_queue_rx.clone(),
io_thread_handles: vec![],
prefetching: self.prefetching.clone(),
enable_receipt_prefetching: self.enable_receipt_prefetching.clone(),
sweat_prefetch_receivers: self.sweat_prefetch_receivers.clone(),
sweat_prefetch_senders: self.sweat_prefetch_senders.clone(),
shard_uid: self.shard_uid.clone(),
}
}
}

/// Implementation to make testing from runtime possible.
///
/// Prefetching by design has no visible side-effects.
Expand Down
9 changes: 9 additions & 0 deletions core/store/src/trie/shard_tries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,15 @@ impl ShardTries {
) -> (StoreUpdate, StateRoot) {
self.apply_all_inner(trie_changes, shard_uid, true)
}

// Stop prefetching background threads and wait until they have terminated.
pub fn stop_prefetching_threads(&self) -> std::thread::Result<()> {
for prefetcher in self.0.prefetchers.write().expect(POISONED_LOCK_ERR).values_mut() {
prefetcher.clear_queue();
prefetcher.stop_background_threads()?;
}
Ok(())
}
}

pub struct WrappedTrieChanges {
Expand Down
6 changes: 6 additions & 0 deletions runtime/runtime-params-estimator/src/estimator_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,9 @@ impl Testbed<'_> {
}
}
}

impl<'a> Drop for Testbed<'a> {
fn drop(&mut self) {
self.inner.stop_prefetching_threads();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's propagate this drop to ShardTriesInner? Joining thread on drop should be responsibility of the entity which spawns the threads. As a rule-of-thumb, every spawn thread should have its' join.

Perhaps also use something like

struct ShardTriesInner {
    /// Prefetcher state, such as IO threads, per shard.
    prefetchers: RwLock<HashMap<ShardUId, (PrefetchApi, Vec<JoinHandle<()>>)>>,
}

?

The funky Clone impl is funky, better if we can separate Prefetcher and PrefetcherHandle at the type level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, 100% agree that this clone thing is too funky, thanks for pointing it out.

However, ShardTriesInner::drop seems like the wrong place to me. At least I failed in my attempt to make it work.

The problem is that we really must close the crossbeam channel first and only then join the threads. Joining when dropping Testbed only works because the testbed itself outlives all places that could hold a clone of a channel sender, which is not true for ShardTriesInner.

The sender is stored inside PrefetchApi, which in turn is created by ShardTriesInner. PrefetchApi is cloned around into every instance of Trie and that lives in various other structs, such as TrieUpdate. And unlike Testbed, ShardTriesInner is not guaranteed to outlive all of those. Thus, joining when ShardTriesInner is dropped results in deadlocks as the background threads can still be waiting on an open channel.

I have now attempted to apply your suggestion in a slightly different way. Definitive ownership of the channel sender AND the join handles is now given to the clonable struct WorkQueue, which itself is stored inside PrefetchApi. This way all clones of the sender also clone the Arc<Vec<JoinHandle>>. When the last instance that combination is dropped, it is safe to join the threads.

I am still not 100% happy with my implementation, though. It is still gimmicky and there are too many nested structs for my taste. But it's the truest representation of ownership that I could come up with right now.^^ Ideas for improvements are welcome. :)

}
}
3 changes: 3 additions & 0 deletions runtime/runtime-params-estimator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,9 @@ fn apply_block_cost(ctx: &mut EstimatorContext) -> GasCost {

let gas_cost = average_cost(measurements);

// Drop required if `Drop` is implemented on testbed to satisfy lifetime
// requirements on `ctx`.
std::mem::drop(testbed);
ctx.cached.apply_block = Some(gas_cost.clone());

gas_cost
Expand Down
4 changes: 4 additions & 0 deletions runtime/runtime-params-estimator/src/testbed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,8 @@ impl RuntimeTestbed {
pub fn store(&mut self) -> Store {
self.tries.get_store()
}

pub fn stop_prefetching_threads(&self) {
self.tries.stop_prefetching_threads().unwrap();
}
}
5 changes: 3 additions & 2 deletions runtime/runtime-params-estimator/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ pub(crate) fn fn_cost_with_setup(
let block_latency = 0;
let overhead = overhead_per_measured_block(ctx, block_latency);
let block_size = 2usize;
let n_blocks = ctx.config.warmup_iters_per_block + ctx.config.iter_per_block;
let warmup_iters = ctx.config.warmup_iters_per_block;
let n_blocks = warmup_iters + ctx.config.iter_per_block;

let mut testbed = ctx.testbed();

Expand Down Expand Up @@ -180,7 +181,7 @@ pub(crate) fn fn_cost_with_setup(
// Filter out setup blocks.
let measurements: Vec<_> = measurements
.into_iter()
.skip(ctx.config.warmup_iters_per_block * 2)
.skip(warmup_iters * 2)
.enumerate()
.filter(|(i, _)| i % 2 == 1)
.map(|(_, m)| m)
Expand Down