Skip to content

Commit

Permalink
Obscure deadlock repro (sort of)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Nov 17, 2023
1 parent 9febe26 commit 5e253de
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 13 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ exit-future = { workspace = true }
oneshot_broadcast = { path = "../../common/oneshot_broadcast/" }
slog-term = { workspace = true }
slog-async = { workspace = true }
hiatus = "0.1.1"

[[test]]
name = "beacon_chain_tests"
Expand Down
52 changes: 52 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ use std::collections::HashSet;
use std::io::prelude::*;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::Once;
use std::time::{Duration, Instant};
use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
use store::{
Expand All @@ -124,6 +125,8 @@ use types::payload::BlockProductionVersion;
use types::sidecar::BlobItems;
use types::*;

static INIT: Once = Once::new();

pub type ForkChoiceError = fork_choice::Error<crate::ForkChoiceStoreError>;

/// Alias to appease clippy.
Expand Down Expand Up @@ -2294,7 +2297,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// return `false`: where an attestation for the previous epoch nominates the pivot block
// which is the parent block of the finalized block. Such attestations are not useful, so
// this doesn't matter.
let step = if state.slot() == 41 {
INIT.call_once(|| {
println!("waiting for step 6");
drop(hiatus::step(6));
println!("executed step 6");
});
hiatus::Step::Dummy
} else {
hiatus::Step::Dummy
};
let fork_choice_lock = self.canonical_head.fork_choice_read_lock();
if state.slot() == 41 {
println!("got the fork choice lock!");
drop(step);
}
let block = fork_choice_lock
.get_block(block_root)
.ok_or(Error::AttestationHeadNotInForkChoice(*block_root))?;
Expand Down Expand Up @@ -3235,7 +3252,25 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Take an exclusive write-lock on fork choice. It's very important to prevent deadlocks by
// avoiding taking other locks whilst holding this lock.
if hiatus::is_enabled() {
println!("waiting for s3");
}
let step1 = hiatus::step(3);
if hiatus::is_enabled() {
println!("executing s3");
}
let mut fork_choice = self.canonical_head.fork_choice_write_lock();
if hiatus::is_enabled() {
println!("waiting for s7 (holding lock)");
let step = step1.then(7);
println!("executing step 7 and sleeping 5s");
drop(step);
std::thread::sleep(std::time::Duration::from_secs(5));

// FIXME(sproul): force tree hashing here, getting this to trigger in fork choice
// is too fiddly for my current level of patience
state.update_tree_hash_cache().unwrap();
}

// Do not import a block that doesn't descend from the finalized root.
let signed_block =
Expand Down Expand Up @@ -3413,6 +3448,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// The fork choice write-lock is dropped *after* the on-disk database has been updated.
// This prevents inconsistency between the two at the expense of concurrency.
if hiatus::is_enabled() {
println!("dropping fork choice lock");
}
drop(fork_choice);

// We're declaring the block "imported" at this point, since fork choice and the DB know
Expand Down Expand Up @@ -4001,6 +4039,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
slot: Slot,
) -> Result<(BeaconState<T::EthSpec>, Option<Hash256>), BlockProductionError> {
// Let the block producer finish running fork choice
let step = if slot == 41 {
println!("waiting for step 1");
hiatus::step(1)
} else {
hiatus::Step::Dummy
};
let fork_choice_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_FORK_CHOICE_TIMES);
self.wait_for_fork_choice_before_block_production(slot)?;
drop(fork_choice_timer);
Expand Down Expand Up @@ -4067,6 +4112,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
};

drop(state_load_timer);
drop(step);
println!("step 1 complete");

Ok((state, state_root_opt))
}
Expand Down Expand Up @@ -4792,6 +4839,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.filter_op_pool_attestation(&mut curr_filter_cache, att, &state)
};

if state.slot() == 41 {
println!("getting attestations step 2");
drop(hiatus::step(2));
println!("continuing step 2");
}
let mut attestations = self
.op_pool
.get_attestations(
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/beacon_fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,14 @@ where
.justified_checkpoint
.epoch
.start_slot(E::slots_per_epoch());
println!("waiting for step 8");
/*
if 1 > 0 {
panic!("ugh");
}
*/
let step = hiatus::step(8);
println!("executing step 8");
let (_, state) = self
.store
.get_advanced_hot_state(
Expand All @@ -338,6 +346,7 @@ where
)
.map_err(Error::FailedToReadState)?
.ok_or_else(|| Error::MissingState(justified_block.state_root()))?;
drop(step);

self.justified_balances = JustifiedBalances::from_justified_state(&state)?;
}
Expand Down
73 changes: 68 additions & 5 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,10 @@ async fn full_participation_no_skips() {

#[tokio::test]
async fn deadlock_block_production() {
let num_blocks_produced = E::slots_per_epoch() * 5;
use beacon_chain::*;
use types::payload::BlockProductionVersion;

let num_blocks_produced = E::slots_per_epoch() * 5 - 1;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
Expand All @@ -316,11 +319,71 @@ async fn deadlock_block_production() {
AttestationStrategy::AllValidators,
)
.await;
harness.advance_slot();
harness.advance_slot();

check_finalization(&harness, num_blocks_produced);
check_split_slot(&harness, store);
check_chain_dump(&harness, num_blocks_produced + 1);
check_iterators(&harness);
// Add some attestations to the pool for the current epoch (5).
// This ensures that both op pool threads are busy
let canonical_head = harness.chain.canonical_head.cached_head();
let state = canonical_head.snapshot.beacon_state.clone();
let block = canonical_head.snapshot.beacon_block.clone();
let block_root = block.canonical_root();
let all_validators = (0..LOW_VALIDATOR_COUNT).collect::<Vec<_>>();
let attestations = harness.make_attestations(
&all_validators,
&state,
state.canonical_root(),
block_root.into(),
block.slot() + 1,
);
harness.process_attestations(attestations);

// Process a new block which misses the balances cache concurrently with the production of
// a new block.
println!("HERE");
hiatus::enable();
assert!(hiatus::is_enabled());
println!("enabling hiatus");

let import_block = async {
println!("importing block");
let ((block, blobs), _) = harness
.make_block(state, Slot::new(num_blocks_produced) + 1)
.await;
println!("slot {} block built", num_blocks_produced + 1);
harness
.chain
.canonical_head
.fork_choice_write_lock()
.fc_store
.flush_balances_cache();
let block_root = block.canonical_root();
harness
.chain
.process_block(
block_root,
Arc::new(block),
NotifyExecutionLayer::No,
|| Ok(()),
)
.await
.unwrap();
};
let produce_block = async {
println!("producing block");
harness
.chain
.produce_block_with_verification(
Signature::infinity().unwrap(),
Slot::new(num_blocks_produced) + 2,
None,
ProduceBlockVerification::NoVerification,
BlockProductionVersion::V3,
)
.await
.unwrap();
};
futures::future::join(import_block, produce_block).await;
}

#[tokio::test]
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/operation_pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ serde = { workspace = true }
store = { workspace = true }
bitvec = { workspace = true }
rand = { workspace = true }
hiatus = "0.1.1"

[dev-dependencies]
beacon_chain = { workspace = true }
tokio = { workspace = true }
maplit = { workspace = true }

[features]
portable = ["beacon_chain/portable"]
portable = ["beacon_chain/portable"]
38 changes: 35 additions & 3 deletions beacon_node/operation_pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,21 +306,53 @@ impl<T: EthSpec> OperationPool<T> {

let (prev_cover, curr_cover) = rayon::join(
move || {
println!("building block at slot {}", state.slot());
if state.slot() == 41 {
println!(
"waiting for s4 on t{}",
rayon::current_thread_index().unwrap()
);
drop(hiatus::step(4));
println!("continuing exec of s4");
}
let _timer = metrics::start_timer(&metrics::ATTESTATION_PREV_EPOCH_PACKING_TIME);
// If we're in the genesis epoch, just use the current epoch attestations.
if prev_epoch_key == curr_epoch_key {
let res = if prev_epoch_key == curr_epoch_key {
vec![]
} else {
maximum_cover(prev_epoch_att, prev_epoch_limit, "prev_epoch_attestations")
};
if state.slot() == 41 {
println!(
"finished prev epoch packing on t{}",
rayon::current_thread_index().unwrap()
);
}
res
},
move || {
println!("building block at slot {}", state.slot());
if state.slot() == 41 {
println!(
"waiting for s5 on t{}",
rayon::current_thread_index().unwrap()
);
drop(hiatus::step(5));
println!("continuing exec of s5");
}
let _timer = metrics::start_timer(&metrics::ATTESTATION_CURR_EPOCH_PACKING_TIME);
maximum_cover(
let res = maximum_cover(
curr_epoch_att,
T::MaxAttestations::to_usize(),
"curr_epoch_attestations",
)
);
if state.slot() == 41 {
println!(
"finished curr epoch packing on t{}",
rayon::current_thread_index().unwrap()
);
}
res
},
);

Expand Down
8 changes: 6 additions & 2 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,10 +743,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} else {
state_root
};
let state = self
let mut state = self
.load_hot_state(&state_root, state_processing_strategy)?
.map(|state| (state_root, state));
drop(split);
// FIXME(sproul): this COULD happen here
state
.as_mut()
.map(|(_, state)| state.update_tree_hash_cache().unwrap());
Ok(state)
}

Expand Down Expand Up @@ -2215,7 +2219,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}

/// This function fills in missing block roots between last restore point slot and split
/// slot, if any.
/// slot, if any.
pub fn heal_freezer_block_roots(&self) -> Result<(), Error> {
let split = self.get_split_info();
let last_restore_point_slot = (split.slot - 1) / self.config.slots_per_restore_point
Expand Down
3 changes: 2 additions & 1 deletion consensus/fork_choice/src/fork_choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ pub struct ForkChoiceView {
/// - Queuing of attestations from the current slot.
pub struct ForkChoice<T, E> {
/// Storage for `ForkChoice`, modelled off the spec `Store` object.
fc_store: T,
pub fc_store: T,
/// The underlying representation of the block DAG.
proto_array: ProtoArrayForkChoice,
/// Attestations that arrived at the current slot and must be queued for later processing.
Expand Down Expand Up @@ -1163,6 +1163,7 @@ where
}

// Process any attestations that might now be eligible.
// FIXME(sproul): put this back
self.process_attestation_queue()?;

Ok(self.fc_store.get_current_slot())
Expand Down
1 change: 1 addition & 0 deletions consensus/types/src/beacon_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,7 @@ impl<T: EthSpec> BeaconState<T> {
///
/// Initialize the tree hash cache if it isn't already initialized.
pub fn update_tree_hash_cache(&mut self) -> Result<Hash256, Error> {
println!("running tree hashing");
self.initialize_tree_hash_cache();

let cache = self.tree_hash_cache_mut().take();
Expand Down
Loading

0 comments on commit 5e253de

Please sign in to comment.