Skip to content

Commit

Permalink
fix: properly stop prefetching background threads (#7712)
Browse files Browse the repository at this point in the history
Explicitly stop and wait for prefetching background threads to terminate when the `ShardTriesInner` is dropped.

 This prevents estimations from being influenced by background threads
 left over from previous estimations, a problem we have observed since
 merging #7661.
  • Loading branch information
jakmeier authored and nikurt committed Nov 9, 2022
1 parent c60196e commit 9e26a1a
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 26 deletions.
100 changes: 75 additions & 25 deletions core/store/src/trie/prefetching_trie_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ use crate::{
metrics, DBCol, StorageError, Store, Trie, TrieCache, TrieCachingStorage, TrieConfig,
TrieStorage,
};
use crossbeam::select;
use near_o11y::metrics::prometheus;
use near_o11y::metrics::prometheus::core::GenericGauge;
use near_o11y::tracing::error;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use near_primitives::trie_key::TrieKey;
use near_primitives::types::{AccountId, ShardId, StateRoot, TrieNodesCount};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

const MAX_QUEUED_WORK_ITEMS: usize = 16 * 1024;
const MAX_PREFETCH_STAGING_MEMORY: usize = 200 * 1024 * 1024;
Expand Down Expand Up @@ -239,7 +242,7 @@ impl TrieStorage for TriePrefetchingStorage {
PrefetcherResult::Prefetched(value) => Ok(value),
PrefetcherResult::Pending => {
// Yield once before calling `blocking_get`, which will check again whether the data is present.
std::thread::yield_now();
thread::yield_now();
self.prefetching
.blocking_get(hash.clone())
.or_else(|| {
Expand Down Expand Up @@ -334,7 +337,7 @@ impl PrefetchStagingArea {
Some(_) => (),
None => return None,
}
std::thread::sleep(std::time::Duration::from_micros(1));
thread::sleep(std::time::Duration::from_micros(1));
}
}

Expand Down Expand Up @@ -377,12 +380,12 @@ impl PrefetchStagingArea {
}

impl PrefetchApi {
pub fn new(
pub(crate) fn new(
store: Store,
shard_cache: TrieCache,
shard_uid: ShardUId,
trie_config: &TrieConfig,
) -> Self {
) -> (Self, PrefetchingThreadsHandle) {
let (work_queue_tx, work_queue_rx) = crossbeam::channel::bounded(MAX_QUEUED_WORK_ITEMS);
let sweat_prefetch_receivers = trie_config.sweat_prefetch_receivers.clone();
let sweat_prefetch_senders = trie_config.sweat_prefetch_senders.clone();
Expand All @@ -397,10 +400,19 @@ impl PrefetchApi {
sweat_prefetch_senders,
shard_uid,
};
for _ in 0..NUM_IO_THREADS {
this.start_io_thread(store.clone(), shard_cache.clone(), shard_uid.clone());
}
this
let (shutdown_tx, shutdown_rx) = crossbeam::channel::bounded(1);
let handles = (0..NUM_IO_THREADS)
.map(|_| {
this.start_io_thread(
store.clone(),
shard_cache.clone(),
shard_uid.clone(),
shutdown_rx.clone(),
)
})
.collect();
let handle = PrefetchingThreadsHandle { shutdown_channel: Some(shutdown_tx), handles };
(this, handle)
}

/// Returns the argument back if queue is full.
Expand All @@ -417,30 +429,44 @@ impl PrefetchApi {
store: Store,
shard_cache: TrieCache,
shard_uid: ShardUId,
) -> std::thread::JoinHandle<()> {
shutdown_rx: crossbeam::channel::Receiver<()>,
) -> thread::JoinHandle<()> {
let prefetcher_storage =
TriePrefetchingStorage::new(store, shard_uid, shard_cache, self.prefetching.clone());
let work_queue = self.work_queue_rx.clone();
let metric_prefetch_sent =
metrics::PREFETCH_SENT.with_label_values(&[&shard_uid.shard_id.to_string()]);
let metric_prefetch_fail =
metrics::PREFETCH_FAIL.with_label_values(&[&shard_uid.shard_id.to_string()]);
std::thread::spawn(move || {
while let Ok((trie_root, trie_key)) = work_queue.recv() {
// Since the trie root can change,and since the root is not known at the time when the IO threads starts,
// we need to redefine the trie before each request.
// Note that the constructor of `Trie` is trivial, and the clone only clones a few `Arc`s, so the performance hit is small.
let prefetcher_trie =
Trie::new(Box::new(prefetcher_storage.clone()), trie_root, None);
let storage_key = trie_key.to_vec();
metric_prefetch_sent.inc();
if let Ok(_maybe_value) = prefetcher_trie.get(&storage_key) {
near_o11y::io_trace!(count: "prefetch");
} else {
// This may happen in rare occasions and can be ignored safely.
// See comments in `TriePrefetchingStorage::retrieve_raw_bytes`.
near_o11y::io_trace!(count: "prefetch_failure");
metric_prefetch_fail.inc();
thread::spawn(move || {
loop {
let selected = select! {
recv(shutdown_rx) -> _ => None,
recv(work_queue) -> maybe_work_item => maybe_work_item.ok(),
};

match selected {
None => return,
Some((trie_root, trie_key)) => {
// Since the trie root can change, and since the root is
// not known at the time when the IO threads start,
// we need to redefine the trie before each request.
// Note that the constructor of `Trie` is trivial, and
// the clone only clones a few `Arc`s, so the performance
// hit is small.
let prefetcher_trie =
Trie::new(Box::new(prefetcher_storage.clone()), trie_root, None);
let storage_key = trie_key.to_vec();
metric_prefetch_sent.inc();
if let Ok(_maybe_value) = prefetcher_trie.get(&storage_key) {
near_o11y::io_trace!(count: "prefetch");
} else {
// This may happen in rare occasions and can be ignored safely.
// See comments in `TriePrefetchingStorage::retrieve_raw_bytes`.
near_o11y::io_trace!(count: "prefetch_failure");
metric_prefetch_fail.inc();
}
}
}
}
})
Expand Down Expand Up @@ -469,6 +495,30 @@ fn prefetch_state_matches(expected: PrefetchSlot, actual: &PrefetchSlot) -> bool
}
}

/// Guard that owns the spawned prefetching IO threads.
///
/// Dropping this guard hangs up the shutdown channel and then joins every
/// thread in `handles`, so the drop blocks until all IO threads have
/// terminated (see the `Drop` implementation).
#[must_use = "When dropping this handle, the IO threads will be aborted immediately."]
pub(crate) struct PrefetchingThreadsHandle {
/// Shutdown channel to all spawned threads.
///
/// Stored as an `Option` so that `drop` can `take()` the sender out of
/// `&mut self` and release it before joining the threads.
shutdown_channel: Option<crossbeam::channel::Sender<()>>,
/// Join handles of spawned threads.
///
/// Used to actively join all background threads after shutting them down.
handles: Vec<thread::JoinHandle<()>>,
}

impl Drop for PrefetchingThreadsHandle {
fn drop(&mut self) {
// Dropping the single sender will hang up the channel and stop
// background threads.
self.shutdown_channel.take();
for handle in self.handles.drain(..) {
if let Err(e) = handle.join() {
error!("IO thread panicked joining failed, {e:?}");
}
}
}
}

/// Implementation to make testing from runtime possible.
///
/// Prefetching by design has no visible side-effects.
Expand Down
4 changes: 3 additions & 1 deletion core/store/src/trie/shard_tries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use near_primitives::types::{

use crate::flat_state::FlatStateFactory;
use crate::trie::config::TrieConfig;
use crate::trie::prefetching_trie_storage::PrefetchingThreadsHandle;
use crate::trie::trie_storage::{TrieCache, TrieCachingStorage};
use crate::trie::{TrieRefcountChange, POISONED_LOCK_ERR};
use crate::{metrics, DBCol, DBOp, DBTransaction, PrefetchApi};
Expand All @@ -27,7 +28,7 @@ struct ShardTriesInner {
view_caches: RwLock<HashMap<ShardUId, TrieCache>>,
flat_state_factory: FlatStateFactory,
/// Prefetcher state, such as IO threads, per shard.
prefetchers: RwLock<HashMap<ShardUId, PrefetchApi>>,
prefetchers: RwLock<HashMap<ShardUId, (PrefetchApi, PrefetchingThreadsHandle)>>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -136,6 +137,7 @@ impl ShardTries {
&self.0.trie_config,
)
})
.0
.clone()
});

Expand Down

0 comments on commit 9e26a1a

Please sign in to comment.