From 9447cc20cd563d67052f558335312da97adc8b2f Mon Sep 17 00:00:00 2001
From: Jakob Meier
Date: Mon, 26 Sep 2022 14:28:34 +0200
Subject: [PATCH 1/5] fix: stop background threads between estimations

Explicitly stop and wait for prefetching background threads to
terminate when a testbed is dropped. This prevents estimations from
being influenced by background threads left over from previous
estimations, which we have observed since merging #7661.
---
 .../src/trie/prefetching_trie_storage.rs      | 48 +++++++++++++++----
 core/store/src/trie/shard_tries.rs            |  9 ++++
 .../src/estimator_context.rs                  |  6 +++
 runtime/runtime-params-estimator/src/lib.rs   |  3 ++
 .../runtime-params-estimator/src/testbed.rs   |  4 ++
 runtime/runtime-params-estimator/src/utils.rs |  5 +-
 6 files changed, 63 insertions(+), 12 deletions(-)

diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs
index b00aff22bee..49ae0b2b33a 100644
--- a/core/store/src/trie/prefetching_trie_storage.rs
+++ b/core/store/src/trie/prefetching_trie_storage.rs
@@ -53,7 +53,6 @@ struct TriePrefetchingStorage {
 /// The former puts requests in, the latter serves requests.
 /// With this API, the store does not know about receipts etc, and the runtime
 /// does not know about the trie structure. The only thing they share is this object.
-#[derive(Clone)]
 pub struct PrefetchApi {
     /// Bounded, shared queue for all IO threads to take work from.
     ///
@@ -64,6 +63,9 @@ pub struct PrefetchApi {
     /// at the same time.
     work_queue_tx: crossbeam::channel::Sender<(StateRoot, TrieKey)>,
     work_queue_rx: crossbeam::channel::Receiver<(StateRoot, TrieKey)>,
+    /// Threads spawned by this instance of `PrefetchApi`; clones created for chunk processing
+    /// will have an empty vector instead.
+    io_thread_handles: Vec<std::thread::JoinHandle<()>>,
     /// Prefetching IO threads will insert fetched data here. This is also used
     /// to mark what is already being fetched, to avoid fetching the same data
     /// multiple times.
@@ -388,9 +390,10 @@ impl PrefetchApi {
         let sweat_prefetch_senders = trie_config.sweat_prefetch_senders.clone();
         let enable_receipt_prefetching = trie_config.enable_receipt_prefetching;
 
-        let this = Self {
+        let mut this = Self {
             work_queue_tx,
             work_queue_rx,
+            io_thread_handles: vec![],
             prefetching: PrefetchStagingArea::new(shard_uid.shard_id()),
             enable_receipt_prefetching,
             sweat_prefetch_receivers,
@@ -412,12 +415,7 @@ impl PrefetchApi {
         self.work_queue_tx.send((root, trie_key)).map_err(|e| e.0)
     }
 
-    pub fn start_io_thread(
-        &self,
-        store: Store,
-        shard_cache: TrieCache,
-        shard_uid: ShardUId,
-    ) -> std::thread::JoinHandle<()> {
+    pub fn start_io_thread(&mut self, store: Store, shard_cache: TrieCache, shard_uid: ShardUId) {
        let prefetcher_storage =
             TriePrefetchingStorage::new(store, shard_uid, shard_cache, self.prefetching.clone());
         let work_queue = self.work_queue_rx.clone();
@@ -425,7 +423,7 @@ impl PrefetchApi {
             metrics::PREFETCH_SENT.with_label_values(&[&shard_uid.shard_id.to_string()]);
         let metric_prefetch_fail =
             metrics::PREFETCH_FAIL.with_label_values(&[&shard_uid.shard_id.to_string()]);
-        std::thread::spawn(move || {
+        let handle = std::thread::spawn(move || {
             while let Ok((trie_root, trie_key)) = work_queue.recv() {
                 // Since the trie root can change, and since the root is not known at the time when the IO thread starts,
                 // we need to redefine the trie before each request.
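The hunks below add a `stop_background_threads` method that closes the crossbeam channel by replacing it and then joins the stored handles. A minimal, self-contained sketch of that shutdown pattern, with a hypothetical `Worker` standing in for `PrefetchApi` (names, payload type, and queue size are illustrative, not the real nearcore types):

use crossbeam::channel;
use std::thread;

struct Worker {
    tx: channel::Sender<u32>,
    rx: channel::Receiver<u32>,
    handles: Vec<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(n_threads: usize) -> Self {
        let (tx, rx) = channel::bounded::<u32>(16);
        let handles = (0..n_threads)
            .map(|_| {
                let rx = rx.clone();
                thread::spawn(move || {
                    // Exits once all senders are dropped and the queue drains.
                    while let Ok(_item) = rx.recv() {
                        // ... do the background work here ...
                    }
                })
            })
            .collect();
        Self { tx, rx, handles }
    }

    /// Close the channel by replacing it, then wait for the workers.
    fn stop(&mut self) -> thread::Result<()> {
        let (tx, rx) = channel::bounded(16);
        self.tx = tx; // dropping the old `tx` closes the channel for the workers
        self.rx = rx;
        for handle in self.handles.drain(..) {
            handle.join()?;
        }
        Ok(())
    }
}

fn main() -> thread::Result<()> {
    let mut worker = Worker::new(4);
    worker.tx.send(1).expect("channel is open");
    worker.stop()
}

Note that this only shuts down cleanly if no other clone of the sender is alive when `stop` runs — exactly the gap the follow-up patch closes by tying the sender and the join handles together.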
@@ -443,7 +441,8 @@ impl PrefetchApi { metric_prefetch_fail.inc(); } } - }) + }); + self.io_thread_handles.push(handle); } /// Remove queued up requests so IO threads will be paused after they finish their current task. @@ -458,6 +457,18 @@ impl PrefetchApi { pub fn clear_data(&self) { self.prefetching.0.lock().expect(POISONED_LOCK_ERR).slots.clear(); } + + /// Interrupt and wait for all prefetching background threads to terminate. + pub fn stop_background_threads(&mut self) -> std::thread::Result<()> { + // close cross-beam channel + (self.work_queue_tx, self.work_queue_rx) = + crossbeam::channel::bounded(MAX_QUEUED_WORK_ITEMS); + // wait for IO threads to terminate + for handle in self.io_thread_handles.drain(..) { + handle.join()?; + } + Ok(()) + } } fn prefetch_state_matches(expected: PrefetchSlot, actual: &PrefetchSlot) -> bool { @@ -469,6 +480,23 @@ fn prefetch_state_matches(expected: PrefetchSlot, actual: &PrefetchSlot) -> bool } } +// Manual implementation to avoid cloning thread handles. Those handles should +// be in exclusive ownership of the original object, stored in `ShardTries`. +impl Clone for PrefetchApi { + fn clone(&self) -> Self { + Self { + work_queue_tx: self.work_queue_tx.clone(), + work_queue_rx: self.work_queue_rx.clone(), + io_thread_handles: vec![], + prefetching: self.prefetching.clone(), + enable_receipt_prefetching: self.enable_receipt_prefetching.clone(), + sweat_prefetch_receivers: self.sweat_prefetch_receivers.clone(), + sweat_prefetch_senders: self.sweat_prefetch_senders.clone(), + shard_uid: self.shard_uid.clone(), + } + } +} + /// Implementation to make testing from runtime possible. /// /// Prefetching by design has no visible side-effects. diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index 0acf2f10980..b8e82777985 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -317,6 +317,15 @@ impl ShardTries { ) -> (StoreUpdate, StateRoot) { self.apply_all_inner(trie_changes, shard_uid, true) } + + // Stop prefetching background threads and wait until they have terminated. + pub fn stop_prefetching_threads(&self) -> std::thread::Result<()> { + for prefetcher in self.0.prefetchers.write().expect(POISONED_LOCK_ERR).values_mut() { + prefetcher.clear_queue(); + prefetcher.stop_background_threads()?; + } + Ok(()) + } } pub struct WrappedTrieChanges { diff --git a/runtime/runtime-params-estimator/src/estimator_context.rs b/runtime/runtime-params-estimator/src/estimator_context.rs index dbb41a59cca..6abd1d1057b 100644 --- a/runtime/runtime-params-estimator/src/estimator_context.rs +++ b/runtime/runtime-params-estimator/src/estimator_context.rs @@ -149,3 +149,9 @@ impl Testbed<'_> { } } } + +impl<'a> Drop for Testbed<'a> { + fn drop(&mut self) { + self.inner.stop_prefetching_threads(); + } +} diff --git a/runtime/runtime-params-estimator/src/lib.rs b/runtime/runtime-params-estimator/src/lib.rs index 776d2cad437..5bf89ccd1da 100644 --- a/runtime/runtime-params-estimator/src/lib.rs +++ b/runtime/runtime-params-estimator/src/lib.rs @@ -1155,6 +1155,9 @@ fn apply_block_cost(ctx: &mut EstimatorContext) -> GasCost { let gas_cost = average_cost(measurements); + // Drop required if `Drop` is implemented on testbed to satisfy lifetime + // requirements on `ctx`. 
+    std::mem::drop(testbed);
     ctx.cached.apply_block = Some(gas_cost.clone());
 
     gas_cost
diff --git a/runtime/runtime-params-estimator/src/testbed.rs b/runtime/runtime-params-estimator/src/testbed.rs
index df595465f48..b528a006f0b 100644
--- a/runtime/runtime-params-estimator/src/testbed.rs
+++ b/runtime/runtime-params-estimator/src/testbed.rs
@@ -158,4 +158,8 @@ impl RuntimeTestbed {
     pub fn store(&mut self) -> Store {
         self.tries.get_store()
     }
+
+    pub fn stop_prefetching_threads(&self) {
+        self.tries.stop_prefetching_threads().unwrap();
+    }
 }
diff --git a/runtime/runtime-params-estimator/src/utils.rs b/runtime/runtime-params-estimator/src/utils.rs
index b2d7966bf30..968f37371ec 100644
--- a/runtime/runtime-params-estimator/src/utils.rs
+++ b/runtime/runtime-params-estimator/src/utils.rs
@@ -151,7 +151,8 @@ pub(crate) fn fn_cost_with_setup(
     let block_latency = 0;
     let overhead = overhead_per_measured_block(ctx, block_latency);
     let block_size = 2usize;
-    let n_blocks = ctx.config.warmup_iters_per_block + ctx.config.iter_per_block;
+    let warmup_iters = ctx.config.warmup_iters_per_block;
+    let n_blocks = warmup_iters + ctx.config.iter_per_block;
 
     let mut testbed = ctx.testbed();
 
@@ -180,7 +181,7 @@ pub(crate) fn fn_cost_with_setup(
     // Filter out setup blocks.
     let measurements: Vec<_> = measurements
         .into_iter()
-        .skip(ctx.config.warmup_iters_per_block * 2)
+        .skip(warmup_iters * 2)
         .enumerate()
         .filter(|(i, _)| i % 2 == 1)
         .map(|(_, m)| m)

From 079cfa159d032fbc5f57547374692adb484e8bdb Mon Sep 17 00:00:00 2001
From: Jakob Meier
Date: Mon, 26 Sep 2022 21:56:48 +0200
Subject: [PATCH 2/5] drop inside `PrefetchApi` to guarantee joining

The set of potential clones of `PrefetchApi` is unknown to its initial
creator, the `InnerShardTries` instance. But the last channel sender
must be dropped before joining the threads. Therefore, it is tricky to
find the right place to join background threads.

To solve it locally inside `core/store/src/trie/prefetching_trie_storage.rs`,
we use a helper struct `JoinGuard`. Dropping the join guard joins all
threads. It is stored inside a reference-counted pointer right after
the crossbeam sender, such that they are always cloned together. This
ensures the join guard outlives the last sender to the channel.
---
 .../src/trie/prefetching_trie_storage.rs      | 125 ++++++++++--------
 core/store/src/trie/shard_tries.rs            |   9 --
 .../src/estimator_context.rs                  |   6 -
 runtime/runtime-params-estimator/src/lib.rs   |   3 -
 .../runtime-params-estimator/src/testbed.rs   |   4 -
 runtime/runtime-params-estimator/src/utils.rs |   5 +-
 6 files changed, 75 insertions(+), 77 deletions(-)

diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs
index 49ae0b2b33a..198b16a9084 100644
--- a/core/store/src/trie/prefetching_trie_storage.rs
+++ b/core/store/src/trie/prefetching_trie_storage.rs
@@ -11,6 +11,7 @@ use near_primitives::trie_key::TrieKey;
 use near_primitives::types::{AccountId, ShardId, StateRoot, TrieNodesCount};
 use std::collections::HashMap;
 use std::sync::{Arc, Mutex};
+use tracing::error;
 
 const MAX_QUEUED_WORK_ITEMS: usize = 16 * 1024;
 const MAX_PREFETCH_STAGING_MEMORY: usize = 200 * 1024 * 1024;
@@ -53,6 +54,7 @@ struct TriePrefetchingStorage {
 /// The former puts requests in, the latter serves requests.
 /// With this API, the store does not know about receipts etc, and the runtime
 /// does not know about the trie structure. The only thing they share is this object.
+#[derive(Clone)]
 pub struct PrefetchApi {
     /// Bounded, shared queue for all IO threads to take work from.
     ///
@@ -61,11 +63,7 @@ pub struct PrefetchApi {
     /// changing the queue to an enum.
     /// The state root is also included because multiple chunks could be applied
     /// at the same time.
-    work_queue_tx: crossbeam::channel::Sender<(StateRoot, TrieKey)>,
-    work_queue_rx: crossbeam::channel::Receiver<(StateRoot, TrieKey)>,
-    /// Threads spawned by this instance of `PrefetchApi`; clones created for chunk processing
-    /// will have an empty vector instead.
-    io_thread_handles: Vec<std::thread::JoinHandle<()>>,
+    work_queue: WorkQueue,
     /// Prefetching IO threads will insert fetched data here. This is also used
     /// to mark what is already being fetched, to avoid fetching the same data
     /// multiple times.
@@ -108,6 +106,19 @@ pub(crate) enum PrefetcherResult {
     MemoryLimitReached,
 }
 
+/// Only exists to implement `Drop`.
+struct JoinGuard(Vec<std::thread::JoinHandle<()>>);
+
+impl Drop for JoinGuard {
+    fn drop(&mut self) {
+        for handle in self.0.drain(..) {
+            if let Err(e) = handle.join() {
+                error!("Failed to join background thread: {e:?}")
+            }
+        }
+    }
+}
+
 struct StagedMetrics {
     prefetch_staged_bytes: GenericGauge<AtomicI64>,
     prefetch_staged_items: GenericGauge<AtomicI64>,
@@ -385,25 +396,32 @@ impl PrefetchApi {
         shard_uid: ShardUId,
         trie_config: &TrieConfig,
     ) -> Self {
-        let (work_queue_tx, work_queue_rx) = crossbeam::channel::bounded(MAX_QUEUED_WORK_ITEMS);
+        let (tx, rx) = crossbeam::channel::bounded(MAX_QUEUED_WORK_ITEMS);
         let sweat_prefetch_receivers = trie_config.sweat_prefetch_receivers.clone();
         let sweat_prefetch_senders = trie_config.sweat_prefetch_senders.clone();
         let enable_receipt_prefetching = trie_config.enable_receipt_prefetching;
-
-        let mut this = Self {
-            work_queue_tx,
-            work_queue_rx,
-            io_thread_handles: vec![],
-            prefetching: PrefetchStagingArea::new(shard_uid.shard_id()),
+        let prefetching = PrefetchStagingArea::new(shard_uid.shard_id());
+
+        let handles = (0..NUM_IO_THREADS)
+            .map(|_| {
+                Self::start_io_thread(
+                    rx.clone(),
+                    prefetching.clone(),
+                    store.clone(),
+                    shard_cache.clone(),
+                    shard_uid.clone(),
+                )
+            })
+            .collect();
+        Self {
+            // Do not clone tx before this point, or `WorkQueue` invariant is broken.
+            work_queue: WorkQueue { rx, tx, _handles: Arc::new(JoinGuard(handles)) },
+            prefetching,
             enable_receipt_prefetching,
             sweat_prefetch_receivers,
             sweat_prefetch_senders,
             shard_uid,
-        };
-        for _ in 0..NUM_IO_THREADS {
-            this.start_io_thread(store.clone(), shard_cache.clone(), shard_uid.clone());
         }
-        this
     }
 
     /// Returns the argument back if queue is full.
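The constructor above ties the receiver, the sender, and an `Arc`-wrapped guard over the thread handles into one `WorkQueue`. A minimal sketch of that idea in isolation — the hypothetical `Queue`/`make_queue` names, payload type, and thread count are illustrative; only the `JoinGuard` shape mirrors the patch:

use std::sync::Arc;
use std::thread;

/// Joins its threads when the last owner lets go of it.
struct JoinGuard(Vec<thread::JoinHandle<()>>);

impl Drop for JoinGuard {
    fn drop(&mut self) {
        for handle in self.0.drain(..) {
            if let Err(e) = handle.join() {
                eprintln!("failed to join background thread: {e:?}");
            }
        }
    }
}

#[derive(Clone)]
struct Queue {
    rx: crossbeam::channel::Receiver<u64>,
    tx: crossbeam::channel::Sender<u64>,
    // Declared after `tx`: fields drop in declaration order, so when the
    // final clone goes away `tx` drops first, `recv()` in the workers
    // returns `Err`, and only then does the guard join the threads.
    _handles: Arc<JoinGuard>,
}

fn make_queue(n_threads: usize) -> Queue {
    let (tx, rx) = crossbeam::channel::bounded::<u64>(16);
    let handles = (0..n_threads)
        .map(|_| {
            let rx = rx.clone();
            thread::spawn(move || while let Ok(_item) = rx.recv() {})
        })
        .collect();
    // No clone of `tx` exists yet, so "number of senders" and "number of
    // guard references" stay in lockstep from here on.
    Queue { rx, tx, _handles: Arc::new(JoinGuard(handles)) }
}

fn main() {
    let q = make_queue(2);
    let q2 = q.clone(); // one more sender, one more guard reference
    q2.tx.send(7).expect("channel is open");
    drop(q2);
    drop(q); // last clone: `tx` drops first, then the guard joins the threads
}

Because every clone of the sender travels together with a reference to the guard, the guard's `Drop` can only run once the last sender is gone, so `join()` never waits on a thread that is still blocked in `recv()`.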
@@ -412,18 +430,23 @@ impl PrefetchApi {
         root: StateRoot,
         trie_key: TrieKey,
     ) -> Result<(), (StateRoot, TrieKey)> {
-        self.work_queue_tx.send((root, trie_key)).map_err(|e| e.0)
+        self.work_queue.tx.send((root, trie_key)).map_err(|e| e.0)
     }
 
-    pub fn start_io_thread(&mut self, store: Store, shard_cache: TrieCache, shard_uid: ShardUId) {
+    fn start_io_thread(
+        work_queue: crossbeam::channel::Receiver<(StateRoot, TrieKey)>,
+        prefetching: PrefetchStagingArea,
+        store: Store,
+        shard_cache: TrieCache,
+        shard_uid: ShardUId,
+    ) -> std::thread::JoinHandle<()> {
         let prefetcher_storage =
-            TriePrefetchingStorage::new(store, shard_uid, shard_cache, self.prefetching.clone());
-        let work_queue = self.work_queue_rx.clone();
+            TriePrefetchingStorage::new(store, shard_uid, shard_cache, prefetching);
         let metric_prefetch_sent =
             metrics::PREFETCH_SENT.with_label_values(&[&shard_uid.shard_id.to_string()]);
         let metric_prefetch_fail =
             metrics::PREFETCH_FAIL.with_label_values(&[&shard_uid.shard_id.to_string()]);
-        let handle = std::thread::spawn(move || {
+        std::thread::spawn(move || {
             while let Ok((trie_root, trie_key)) = work_queue.recv() {
                 // Since the trie root can change, and since the root is not known at the time when the IO thread starts,
                 // we need to redefine the trie before each request.
@@ -441,8 +464,7 @@ impl PrefetchApi {
                 metric_prefetch_fail.inc();
             }
         }
-        });
-        self.io_thread_handles.push(handle);
+        })
     }
 
     /// Remove queued up requests so IO threads will be paused after they finish their current task.
@@ -450,25 +472,41 @@ impl PrefetchApi {
     /// Queued up work will not be finished. But trie keys that are already
     /// being fetched will finish.
     pub fn clear_queue(&self) {
-        while let Ok(_dropped) = self.work_queue_rx.try_recv() {}
+        while let Ok(_dropped) = self.work_queue.rx.try_recv() {}
     }
 
     /// Clear prefetched staging area from data that has not been picked up by the main thread.
     pub fn clear_data(&self) {
         self.prefetching.0.lock().expect(POISONED_LOCK_ERR).slots.clear();
     }
+}
-
-    /// Interrupt and wait for all prefetching background threads to terminate.
-    pub fn stop_background_threads(&mut self) -> std::thread::Result<()> {
-        // close cross-beam channel
-        (self.work_queue_tx, self.work_queue_rx) =
-            crossbeam::channel::bounded(MAX_QUEUED_WORK_ITEMS);
-        // wait for IO threads to terminate
-        for handle in self.io_thread_handles.drain(..) {
-            handle.join()?;
-        }
-        Ok(())
-    }
-}
+
+/// Bounded, shared queue for all IO threads to take work from.
+///
+/// Work items are defined as `TrieKey` because currently the only
+/// work is to prefetch a trie key. If other IO work is added, consider
+/// changing the queue to an enum.
+/// The state root is also included because multiple chunks could be applied
+/// at the same time.
+#[derive(Clone)]
+struct WorkQueue {
+    /// The channel to the IO prefetch work queue.
+    rx: crossbeam::channel::Receiver<(StateRoot, TrieKey)>,
+    tx: crossbeam::channel::Sender<(StateRoot, TrieKey)>,
+    /// Thread handles for the threads sitting behind the channel.
+    ///
+    /// Invariant: The number of existing clones of `tx` is equal to
+    /// the reference count of join handles.
+    ///
+    /// The invariant holds because when `WorkQueue` is created there is no
+    /// clone of it, yet. And afterwards the only clones are through
+    /// `WorkQueue.clone()` which also increases the handles reference count.
+    ///
+    /// When the last reference to `handles` is dropped, the handles
+    /// are joined, which will terminate because the last `tx` has
+    /// already been dropped (field order matters!)
and therefore the crossbeam
+    /// channel has been closed.
+    _handles: Arc<JoinGuard>,
+}
 
 fn prefetch_state_matches(expected: PrefetchSlot, actual: &PrefetchSlot) -> bool {
@@ -480,23 +518,6 @@ fn prefetch_state_matches(expected: PrefetchSlot, actual: &PrefetchSlot) -> bool
     }
 }
 
-// Manual implementation to avoid cloning thread handles. Those handles should
-// be in exclusive ownership of the original object, stored in `ShardTries`.
-impl Clone for PrefetchApi {
-    fn clone(&self) -> Self {
-        Self {
-            work_queue_tx: self.work_queue_tx.clone(),
-            work_queue_rx: self.work_queue_rx.clone(),
-            io_thread_handles: vec![],
-            prefetching: self.prefetching.clone(),
-            enable_receipt_prefetching: self.enable_receipt_prefetching.clone(),
-            sweat_prefetch_receivers: self.sweat_prefetch_receivers.clone(),
-            sweat_prefetch_senders: self.sweat_prefetch_senders.clone(),
-            shard_uid: self.shard_uid.clone(),
-        }
-    }
-}
-
 /// Implementation to make testing from runtime possible.
 ///
 /// Prefetching by design has no visible side-effects.
diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs
index b8e82777985..0acf2f10980 100644
--- a/core/store/src/trie/shard_tries.rs
+++ b/core/store/src/trie/shard_tries.rs
@@ -317,15 +317,6 @@ impl ShardTries {
     ) -> (StoreUpdate, StateRoot) {
         self.apply_all_inner(trie_changes, shard_uid, true)
     }
-
-    // Stop prefetching background threads and wait until they have terminated.
-    pub fn stop_prefetching_threads(&self) -> std::thread::Result<()> {
-        for prefetcher in self.0.prefetchers.write().expect(POISONED_LOCK_ERR).values_mut() {
-            prefetcher.clear_queue();
-            prefetcher.stop_background_threads()?;
-        }
-        Ok(())
-    }
 }
 
 pub struct WrappedTrieChanges {
diff --git a/runtime/runtime-params-estimator/src/estimator_context.rs b/runtime/runtime-params-estimator/src/estimator_context.rs
index 6abd1d1057b..dbb41a59cca 100644
--- a/runtime/runtime-params-estimator/src/estimator_context.rs
+++ b/runtime/runtime-params-estimator/src/estimator_context.rs
@@ -149,9 +149,3 @@ impl Testbed<'_> {
         }
     }
 }
-
-impl<'a> Drop for Testbed<'a> {
-    fn drop(&mut self) {
-        self.inner.stop_prefetching_threads();
-    }
-}
diff --git a/runtime/runtime-params-estimator/src/lib.rs b/runtime/runtime-params-estimator/src/lib.rs
index 5bf89ccd1da..776d2cad437 100644
--- a/runtime/runtime-params-estimator/src/lib.rs
+++ b/runtime/runtime-params-estimator/src/lib.rs
@@ -1155,9 +1155,6 @@ fn apply_block_cost(ctx: &mut EstimatorContext) -> GasCost {
 
     let gas_cost = average_cost(measurements);
 
-    // Drop required if `Drop` is implemented on testbed to satisfy lifetime
-    // requirements on `ctx`.
-    std::mem::drop(testbed);
     ctx.cached.apply_block = Some(gas_cost.clone());
 
     gas_cost
diff --git a/runtime/runtime-params-estimator/src/testbed.rs b/runtime/runtime-params-estimator/src/testbed.rs
index b528a006f0b..df595465f48 100644
--- a/runtime/runtime-params-estimator/src/testbed.rs
+++ b/runtime/runtime-params-estimator/src/testbed.rs
@@ -158,8 +158,4 @@ impl RuntimeTestbed {
     pub fn store(&mut self) -> Store {
         self.tries.get_store()
     }
-
-    pub fn stop_prefetching_threads(&self) {
-        self.tries.stop_prefetching_threads().unwrap();
-    }
 }
diff --git a/runtime/runtime-params-estimator/src/utils.rs b/runtime/runtime-params-estimator/src/utils.rs
index 968f37371ec..b2d7966bf30 100644
--- a/runtime/runtime-params-estimator/src/utils.rs
+++ b/runtime/runtime-params-estimator/src/utils.rs
@@ -151,8 +151,7 @@ pub(crate) fn fn_cost_with_setup(
     let block_latency = 0;
     let overhead = overhead_per_measured_block(ctx, block_latency);
     let block_size = 2usize;
-    let warmup_iters = ctx.config.warmup_iters_per_block;
-    let n_blocks = warmup_iters + ctx.config.iter_per_block;
+    let n_blocks = ctx.config.warmup_iters_per_block + ctx.config.iter_per_block;
 
     let mut testbed = ctx.testbed();
 
@@ -181,7 +180,7 @@ pub(crate) fn fn_cost_with_setup(
     // Filter out setup blocks.
     let measurements: Vec<_> = measurements
         .into_iter()
-        .skip(warmup_iters * 2)
+        .skip(ctx.config.warmup_iters_per_block * 2)
         .enumerate()
         .filter(|(i, _)| i % 2 == 1)
         .map(|(_, m)| m)

From 0d651aa5337e587e4d3992735429f2bf5196c2c7 Mon Sep 17 00:00:00 2001
From: Jakob Meier
Date: Mon, 26 Sep 2022 22:11:03 +0200
Subject: [PATCH 3/5] fix tests

---
 core/store/src/trie/prefetching_trie_storage.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs
index 198b16a9084..84d02485601 100644
--- a/core/store/src/trie/prefetching_trie_storage.rs
+++ b/core/store/src/trie/prefetching_trie_storage.rs
@@ -547,7 +547,7 @@ mod tests {
     }
 
     pub fn work_queued(&self) -> bool {
-        !self.work_queue_rx.is_empty()
+        !self.work_queue.rx.is_empty()
     }
 }

From b5bed6ae34eae2566236f94fbbaf86dd86d75475 Mon Sep 17 00:00:00 2001
From: Jakob Meier
Date: Mon, 26 Sep 2022 22:23:44 +0200
Subject: [PATCH 4/5] move JoinHandle to bottom

---
 .../src/trie/prefetching_trie_storage.rs      | 31 ++++++++++---------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs
index 84d02485601..8e0c92f5277 100644
--- a/core/store/src/trie/prefetching_trie_storage.rs
+++ b/core/store/src/trie/prefetching_trie_storage.rs
@@ -106,19 +106,6 @@ pub(crate) enum PrefetcherResult {
     MemoryLimitReached,
 }
 
-/// Only exists to implement `Drop`.
-struct JoinGuard(Vec<std::thread::JoinHandle<()>>);
-
-impl Drop for JoinGuard {
-    fn drop(&mut self) {
-        for handle in self.0.drain(..) {
-            if let Err(e) = handle.join() {
-                error!("Failed to join background thread: {e:?}")
-            }
-        }
-    }
-}
-
 struct StagedMetrics {
     prefetch_staged_bytes: GenericGauge<AtomicI64>,
     prefetch_staged_items: GenericGauge<AtomicI64>,
@@ -413,9 +400,10 @@ impl PrefetchApi {
                 )
             })
             .collect();
+        // Do not clone tx before this point, or `WorkQueue` invariant is broken.
+        let work_queue = WorkQueue { rx, tx, _handles: Arc::new(JoinGuard(handles)) };
         Self {
-            // Do not clone tx before this point, or `WorkQueue` invariant is broken.
-            work_queue: WorkQueue { rx, tx, _handles: Arc::new(JoinGuard(handles)) },
+            work_queue,
             prefetching,
             enable_receipt_prefetching,
             sweat_prefetch_receivers,
@@ -509,6 +497,19 @@ struct WorkQueue {
     _handles: Arc<JoinGuard>,
 }
 
+/// Only exists to implement `Drop`.
+struct JoinGuard(Vec<std::thread::JoinHandle<()>>);
+
+impl Drop for JoinGuard {
+    fn drop(&mut self) {
+        for handle in self.0.drain(..) {
+            if let Err(e) = handle.join() {
+                error!("Failed to join background thread: {e:?}")
+            }
+        }
+    }
+}
+
 fn prefetch_state_matches(expected: PrefetchSlot, actual: &PrefetchSlot) -> bool {
     match (expected, actual) {
         (PrefetchSlot::PendingPrefetch, PrefetchSlot::PendingPrefetch)

From 6b16bd076de03f48ae3a6dd80b3bd8c02e405fab Mon Sep 17 00:00:00 2001
From: Jakob Meier
Date: Mon, 26 Sep 2022 22:25:32 +0200
Subject: [PATCH 5/5] update comment

---
 core/store/src/trie/prefetching_trie_storage.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs
index 8e0c92f5277..6f03b2f7642 100644
--- a/core/store/src/trie/prefetching_trie_storage.rs
+++ b/core/store/src/trie/prefetching_trie_storage.rs
@@ -487,7 +487,7 @@ struct WorkQueue {
     /// the reference count of join handles.
     ///
     /// The invariant holds because when `WorkQueue` is created there is no
-    /// clone of it, yet. And afterwards the only clones are through
+    /// clone of `tx`, yet. And afterwards the only `tx` clones are through
     /// `WorkQueue.clone()` which also increases the handles reference count.
     ///
     /// When the last reference to `handles` is dropped, the handles
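The "(field order matters!)" comment above leans on a guarantee worth spelling out: Rust drops struct fields in declaration order (after the struct's own `Drop`, if any). A tiny probe of that rule, with illustrative names:

/// Prints "dropping tx" before "dropping _handles".
struct Noisy(&'static str);

impl Drop for Noisy {
    fn drop(&mut self) {
        println!("dropping {}", self.0);
    }
}

struct Ordered {
    _tx: Noisy,      // dropped first, like the sender in `WorkQueue`
    _handles: Noisy, // dropped second, after the channel is already closed
}

fn main() {
    let _queue = Ordered { _tx: Noisy("tx"), _handles: Noisy("_handles") };
    // At scope end the fields drop in declaration order:
    // "dropping tx", then "dropping _handles".
}

Keeping `_handles` as the last field is therefore what lets the guard's `join()` run only after the sender is gone and the IO threads are already on their way out.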