diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index ceda7222e69..08e5db7b177 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2578,6 +2578,7 @@ impl BeaconChain { signature_verified_block.block_root(), signature_verified_block, notify_execution_layer, + || Ok(()), ) .await { @@ -2666,6 +2667,7 @@ impl BeaconChain { block_root: Hash256, unverified_block: B, notify_execution_layer: NotifyExecutionLayer, + publish_fn: impl FnOnce() -> Result<(), BlockError> + Send + 'static, ) -> Result> { // Start the Prometheus timer. let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); @@ -2684,6 +2686,7 @@ impl BeaconChain { &chain, notify_execution_layer, )?; + publish_fn()?; chain .import_execution_pending_block(execution_pending) .await @@ -2725,7 +2728,7 @@ impl BeaconChain { } // The block failed verification. Err(other) => { - trace!( + debug!( self.log, "Beacon block rejected"; "reason" => other.to_string(), diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 3cb8fbdb524..492f492521e 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -52,6 +52,7 @@ use crate::execution_payload::{ is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, AllowOptimisticImport, NotifyExecutionLayer, PayloadNotifier, }; +use crate::observed_block_producers::SeenBlock; use crate::snapshot_cache::PreProcessingSnapshot; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; @@ -181,13 +182,6 @@ pub enum BlockError { /// /// The block is valid and we have already imported a block with this hash. BlockIsAlreadyKnown, - /// A block for this proposer and slot has already been observed. - /// - /// ## Peer scoring - /// - /// The `proposer` has already proposed a block at this slot. The existing block may or may not - /// be equal to the given block. - RepeatProposal { proposer: u64, slot: Slot }, /// The block slot exceeds the MAXIMUM_BLOCK_SLOT_NUMBER. /// /// ## Peer scoring @@ -283,6 +277,13 @@ pub enum BlockError { /// problems to worry about than losing peers, and we're doing the network a favour by /// disconnecting. ParentExecutionPayloadInvalid { parent_root: Hash256 }, + /// The block is a slashable equivocation from the proposer. + /// + /// ## Peer scoring + /// + /// Honest peers shouldn't forward more than 1 equivocating block from the same proposer, so + /// we penalise them with a mid-tolerance error. + Slashable, } /// Returned when block validation failed due to some issue verifying @@ -631,6 +632,40 @@ pub struct ExecutionPendingBlock { pub payload_verification_handle: PayloadVerificationHandle, } +pub trait IntoGossipVerifiedBlock: Sized { + fn into_gossip_verified_block( + self, + chain: &BeaconChain, + ) -> Result, BlockError>; + fn inner(&self) -> Arc>; +} + +impl IntoGossipVerifiedBlock for GossipVerifiedBlock { + fn into_gossip_verified_block( + self, + _chain: &BeaconChain, + ) -> Result, BlockError> { + Ok(self) + } + + fn inner(&self) -> Arc> { + self.block.clone() + } +} + +impl IntoGossipVerifiedBlock for Arc> { + fn into_gossip_verified_block( + self, + chain: &BeaconChain, + ) -> Result, BlockError> { + GossipVerifiedBlock::new(self, chain) + } + + fn inner(&self) -> Arc> { + self.clone() + } +} + /// Implemented on types that can be converted into a `ExecutionPendingBlock`. /// /// Used to allow functions to accept blocks at various stages of verification. @@ -727,19 +762,6 @@ impl GossipVerifiedBlock { return Err(BlockError::BlockIsAlreadyKnown); } - // Check that we have not already received a block with a valid signature for this slot. - if chain - .observed_block_producers - .read() - .proposer_has_been_observed(block.message()) - .map_err(|e| BlockError::BeaconChainError(e.into()))? - { - return Err(BlockError::RepeatProposal { - proposer: block.message().proposer_index(), - slot: block.slot(), - }); - } - // Do not process a block that doesn't descend from the finalized root. // // We check this *before* we load the parent so that we can return a more detailed error. @@ -855,17 +877,16 @@ impl GossipVerifiedBlock { // // It's important to double-check that the proposer still hasn't been observed so we don't // have a race-condition when verifying two blocks simultaneously. - if chain + match chain .observed_block_producers .write() - .observe_proposer(block.message()) + .observe_proposal(block_root, block.message()) .map_err(|e| BlockError::BeaconChainError(e.into()))? { - return Err(BlockError::RepeatProposal { - proposer: block.message().proposer_index(), - slot: block.slot(), - }); - } + SeenBlock::Slashable => return Err(BlockError::Slashable), + SeenBlock::Duplicate => return Err(BlockError::BlockIsAlreadyKnown), + SeenBlock::UniqueNonSlashable => {} + }; if block.message().proposer_index() != expected_proposer as u64 { return Err(BlockError::IncorrectBlockProposer { @@ -1101,6 +1122,12 @@ impl ExecutionPendingBlock { chain: &Arc>, notify_execution_layer: NotifyExecutionLayer, ) -> Result> { + chain + .observed_block_producers + .write() + .observe_proposal(block_root, block.message()) + .map_err(|e| BlockError::BeaconChainError(e.into()))?; + if let Some(parent) = chain .canonical_head .fork_choice_read_lock() diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 84148fbfb18..9bb39396326 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -826,7 +826,6 @@ where observed_sync_aggregators: <_>::default(), // TODO: allow for persisting and loading the pool from disk. observed_block_producers: <_>::default(), - // TODO: allow for persisting and loading the pool from disk. observed_voluntary_exits: <_>::default(), observed_proposer_slashings: <_>::default(), observed_attester_slashings: <_>::default(), diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index e789b54a21b..50bcf426536 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -213,6 +213,7 @@ pub enum BeaconChainError { BlsToExecutionConflictsWithPool, InconsistentFork(InconsistentFork), ProposerHeadForkChoiceError(fork_choice::Error), + UnableToPublish, } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index d672c168288..c5cf74e179c 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -64,7 +64,7 @@ pub use attestation_verification::Error as AttestationError; pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError}; pub use block_verification::{ get_block_root, BlockError, ExecutionPayloadError, GossipVerifiedBlock, - IntoExecutionPendingBlock, + IntoExecutionPendingBlock, IntoGossipVerifiedBlock, }; pub use canonical_head::{CachedHead, CanonicalHead, CanonicalHeadRwLock}; pub use eth1_chain::{Eth1Chain, Eth1ChainBackend}; diff --git a/beacon_node/beacon_chain/src/observed_block_producers.rs b/beacon_node/beacon_chain/src/observed_block_producers.rs index b5995121b99..f76fc537967 100644 --- a/beacon_node/beacon_chain/src/observed_block_producers.rs +++ b/beacon_node/beacon_chain/src/observed_block_producers.rs @@ -1,9 +1,10 @@ //! Provides the `ObservedBlockProducers` struct which allows for rejecting gossip blocks from //! validators that have already produced a block. +use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; -use types::{BeaconBlockRef, Epoch, EthSpec, Slot, Unsigned}; +use types::{BeaconBlockRef, Epoch, EthSpec, Hash256, Slot, Unsigned}; #[derive(Debug, PartialEq)] pub enum Error { @@ -14,6 +15,12 @@ pub enum Error { ValidatorIndexTooHigh(u64), } +#[derive(Eq, Hash, PartialEq, Debug, Default)] +struct ProposalKey { + slot: Slot, + proposer: u64, +} + /// Maintains a cache of observed `(block.slot, block.proposer)`. /// /// The cache supports pruning based upon the finalized epoch. It does not automatically prune, you @@ -27,7 +34,7 @@ pub enum Error { /// known_distinct_shufflings` which is much smaller. pub struct ObservedBlockProducers { finalized_slot: Slot, - items: HashMap>, + items: HashMap>, _phantom: PhantomData, } @@ -42,6 +49,24 @@ impl Default for ObservedBlockProducers { } } +pub enum SeenBlock { + Duplicate, + Slashable, + UniqueNonSlashable, +} + +impl SeenBlock { + pub fn proposer_previously_observed(self) -> bool { + match self { + Self::Duplicate | Self::Slashable => true, + Self::UniqueNonSlashable => false, + } + } + pub fn is_slashable(&self) -> bool { + matches!(self, Self::Slashable) + } +} + impl ObservedBlockProducers { /// Observe that the `block` was produced by `block.proposer_index` at `block.slot`. This will /// update `self` so future calls to it indicate that this block is known. @@ -52,16 +77,44 @@ impl ObservedBlockProducers { /// /// - `block.proposer_index` is greater than `VALIDATOR_REGISTRY_LIMIT`. /// - `block.slot` is equal to or less than the latest pruned `finalized_slot`. - pub fn observe_proposer(&mut self, block: BeaconBlockRef<'_, E>) -> Result { + pub fn observe_proposal( + &mut self, + block_root: Hash256, + block: BeaconBlockRef<'_, E>, + ) -> Result { self.sanitize_block(block)?; - let did_not_exist = self - .items - .entry(block.slot()) - .or_insert_with(|| HashSet::with_capacity(E::SlotsPerEpoch::to_usize())) - .insert(block.proposer_index()); - - Ok(!did_not_exist) + let key = ProposalKey { + slot: block.slot(), + proposer: block.proposer_index(), + }; + + let entry = self.items.entry(key); + + let slashable_proposal = match entry { + Entry::Occupied(mut occupied_entry) => { + let block_roots = occupied_entry.get_mut(); + let newly_inserted = block_roots.insert(block_root); + + let is_equivocation = block_roots.len() > 1; + + if is_equivocation { + SeenBlock::Slashable + } else if !newly_inserted { + SeenBlock::Duplicate + } else { + SeenBlock::UniqueNonSlashable + } + } + Entry::Vacant(vacant_entry) => { + let block_roots = HashSet::from([block_root]); + vacant_entry.insert(block_roots); + + SeenBlock::UniqueNonSlashable + } + }; + + Ok(slashable_proposal) } /// Returns `Ok(true)` if the `block` has been observed before, `Ok(false)` if not. Does not @@ -72,15 +125,33 @@ impl ObservedBlockProducers { /// /// - `block.proposer_index` is greater than `VALIDATOR_REGISTRY_LIMIT`. /// - `block.slot` is equal to or less than the latest pruned `finalized_slot`. - pub fn proposer_has_been_observed(&self, block: BeaconBlockRef<'_, E>) -> Result { + pub fn proposer_has_been_observed( + &self, + block: BeaconBlockRef<'_, E>, + block_root: Hash256, + ) -> Result { self.sanitize_block(block)?; - let exists = self - .items - .get(&block.slot()) - .map_or(false, |set| set.contains(&block.proposer_index())); - - Ok(exists) + let key = ProposalKey { + slot: block.slot(), + proposer: block.proposer_index(), + }; + + if let Some(block_roots) = self.items.get(&key) { + let block_already_known = block_roots.contains(&block_root); + let no_prev_known_blocks = + block_roots.difference(&HashSet::from([block_root])).count() == 0; + + if !no_prev_known_blocks { + Ok(SeenBlock::Slashable) + } else if block_already_known { + Ok(SeenBlock::Duplicate) + } else { + Ok(SeenBlock::UniqueNonSlashable) + } + } else { + Ok(SeenBlock::UniqueNonSlashable) + } } /// Returns `Ok(())` if the given `block` is sane. @@ -112,15 +183,15 @@ impl ObservedBlockProducers { } self.finalized_slot = finalized_slot; - self.items.retain(|slot, _set| *slot > finalized_slot); + self.items.retain(|key, _| key.slot > finalized_slot); } /// Returns `true` if the given `validator_index` has been stored in `self` at `epoch`. /// /// This is useful for doppelganger detection. pub fn index_seen_at_epoch(&self, validator_index: u64, epoch: Epoch) -> bool { - self.items.iter().any(|(slot, producers)| { - slot.epoch(E::slots_per_epoch()) == epoch && producers.contains(&validator_index) + self.items.iter().any(|(key, _)| { + key.slot.epoch(E::slots_per_epoch()) == epoch && key.proposer == validator_index }) } } @@ -148,9 +219,12 @@ mod tests { // Slot 0, proposer 0 let block_a = get_block(0, 0); + let block_root = block_a.canonical_root(); assert_eq!( - cache.observe_proposer(block_a.to_ref()), + cache + .observe_proposal(block_root, block_a.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(false), "can observe proposer, indicates proposer unobserved" ); @@ -164,7 +238,10 @@ mod tests { assert_eq!( cache .items - .get(&Slot::new(0)) + .get(&ProposalKey { + slot: Slot::new(0), + proposer: 0 + }) .expect("slot zero should be present") .len(), 1, @@ -182,7 +259,10 @@ mod tests { assert_eq!( cache .items - .get(&Slot::new(0)) + .get(&ProposalKey { + slot: Slot::new(0), + proposer: 0 + }) .expect("slot zero should be present") .len(), 1, @@ -207,9 +287,12 @@ mod tests { // First slot of finalized epoch, proposer 0 let block_b = get_block(E::slots_per_epoch(), 0); + let block_root_b = block_b.canonical_root(); assert_eq!( - cache.observe_proposer(block_b.to_ref()), + cache + .observe_proposal(block_root_b, block_b.to_ref()) + .map(SeenBlock::proposer_previously_observed), Err(Error::FinalizedBlock { slot: E::slots_per_epoch().into(), finalized_slot: E::slots_per_epoch().into(), @@ -229,7 +312,9 @@ mod tests { let block_b = get_block(three_epochs, 0); assert_eq!( - cache.observe_proposer(block_b.to_ref()), + cache + .observe_proposal(block_root_b, block_b.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(false), "can insert non-finalized block" ); @@ -238,7 +323,10 @@ mod tests { assert_eq!( cache .items - .get(&Slot::new(three_epochs)) + .get(&ProposalKey { + slot: Slot::new(three_epochs), + proposer: 0 + }) .expect("the three epochs slot should be present") .len(), 1, @@ -262,7 +350,10 @@ mod tests { assert_eq!( cache .items - .get(&Slot::new(three_epochs)) + .get(&ProposalKey { + slot: Slot::new(three_epochs), + proposer: 0 + }) .expect("the three epochs slot should be present") .len(), 1, @@ -276,24 +367,33 @@ mod tests { // Slot 0, proposer 0 let block_a = get_block(0, 0); + let block_root_a = block_a.canonical_root(); assert_eq!( - cache.proposer_has_been_observed(block_a.to_ref()), + cache + .proposer_has_been_observed(block_a.to_ref(), block_a.canonical_root()) + .map(|x| x.proposer_previously_observed()), Ok(false), "no observation in empty cache" ); assert_eq!( - cache.observe_proposer(block_a.to_ref()), + cache + .observe_proposal(block_root_a, block_a.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(false), "can observe proposer, indicates proposer unobserved" ); assert_eq!( - cache.proposer_has_been_observed(block_a.to_ref()), + cache + .proposer_has_been_observed(block_a.to_ref(), block_a.canonical_root()) + .map(|x| x.proposer_previously_observed()), Ok(true), "observed block is indicated as true" ); assert_eq!( - cache.observe_proposer(block_a.to_ref()), + cache + .observe_proposal(block_root_a, block_a.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(true), "observing again indicates true" ); @@ -303,7 +403,10 @@ mod tests { assert_eq!( cache .items - .get(&Slot::new(0)) + .get(&ProposalKey { + slot: Slot::new(0), + proposer: 0 + }) .expect("slot zero should be present") .len(), 1, @@ -312,24 +415,33 @@ mod tests { // Slot 1, proposer 0 let block_b = get_block(1, 0); + let block_root_b = block_b.canonical_root(); assert_eq!( - cache.proposer_has_been_observed(block_b.to_ref()), + cache + .proposer_has_been_observed(block_b.to_ref(), block_b.canonical_root()) + .map(|x| x.proposer_previously_observed()), Ok(false), "no observation for new slot" ); assert_eq!( - cache.observe_proposer(block_b.to_ref()), + cache + .observe_proposal(block_root_b, block_b.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(false), "can observe proposer for new slot, indicates proposer unobserved" ); assert_eq!( - cache.proposer_has_been_observed(block_b.to_ref()), + cache + .proposer_has_been_observed(block_b.to_ref(), block_b.canonical_root()) + .map(|x| x.proposer_previously_observed()), Ok(true), "observed block in slot 1 is indicated as true" ); assert_eq!( - cache.observe_proposer(block_b.to_ref()), + cache + .observe_proposal(block_root_b, block_b.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(true), "observing slot 1 again indicates true" ); @@ -339,7 +451,10 @@ mod tests { assert_eq!( cache .items - .get(&Slot::new(0)) + .get(&ProposalKey { + slot: Slot::new(0), + proposer: 0 + }) .expect("slot zero should be present") .len(), 1, @@ -348,7 +463,10 @@ mod tests { assert_eq!( cache .items - .get(&Slot::new(1)) + .get(&ProposalKey { + slot: Slot::new(1), + proposer: 0 + }) .expect("slot zero should be present") .len(), 1, @@ -357,45 +475,54 @@ mod tests { // Slot 0, proposer 1 let block_c = get_block(0, 1); + let block_root_c = block_c.canonical_root(); assert_eq!( - cache.proposer_has_been_observed(block_c.to_ref()), + cache + .proposer_has_been_observed(block_c.to_ref(), block_c.canonical_root()) + .map(|x| x.proposer_previously_observed()), Ok(false), "no observation for new proposer" ); assert_eq!( - cache.observe_proposer(block_c.to_ref()), + cache + .observe_proposal(block_root_c, block_c.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(false), "can observe new proposer, indicates proposer unobserved" ); assert_eq!( - cache.proposer_has_been_observed(block_c.to_ref()), + cache + .proposer_has_been_observed(block_c.to_ref(), block_c.canonical_root()) + .map(|x| x.proposer_previously_observed()), Ok(true), "observed new proposer block is indicated as true" ); assert_eq!( - cache.observe_proposer(block_c.to_ref()), + cache + .observe_proposal(block_root_c, block_c.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(true), "observing new proposer again indicates true" ); assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); - assert_eq!(cache.items.len(), 2, "two slots should be present"); + assert_eq!(cache.items.len(), 3, "three slots should be present"); assert_eq!( cache .items - .get(&Slot::new(0)) - .expect("slot zero should be present") - .len(), + .iter() + .filter(|(k, _)| k.slot == cache.finalized_slot) + .count(), 2, "two proposers should be present in slot 0" ); assert_eq!( cache .items - .get(&Slot::new(1)) - .expect("slot zero should be present") - .len(), + .iter() + .filter(|(k, _)| k.slot == Slot::new(1)) + .count(), 1, "only one proposer should be present in slot 1" ); diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 55ea016fbda..21f7248cee5 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -733,6 +733,15 @@ where state.get_block_root(slot).unwrap() == state.get_block_root(slot - 1).unwrap() } + pub async fn make_blinded_block( + &self, + state: BeaconState, + slot: Slot, + ) -> (SignedBlindedBeaconBlock, BeaconState) { + let (unblinded, new_state) = self.make_block(state, slot).await; + (unblinded.into(), new_state) + } + /// Returns a newly created block, signed by the proposer for the given slot. pub async fn make_block( &self, @@ -1692,7 +1701,12 @@ where self.set_current_slot(slot); let block_hash: SignedBeaconBlockHash = self .chain - .process_block(block_root, Arc::new(block), NotifyExecutionLayer::Yes) + .process_block( + block_root, + Arc::new(block), + NotifyExecutionLayer::Yes, + || Ok(()), + ) .await? .into(); self.chain.recompute_head_at_current_slot().await; @@ -1709,6 +1723,7 @@ where block.canonical_root(), Arc::new(block), NotifyExecutionLayer::Yes, + || Ok(()), ) .await? .into(); diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index a88931367f7..75b00b2b442 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -351,6 +351,7 @@ async fn assert_invalid_signature( snapshots[block_index].beacon_block.canonical_root(), snapshots[block_index].beacon_block.clone(), NotifyExecutionLayer::Yes, + || Ok(()), ) .await; assert!( @@ -415,6 +416,7 @@ async fn invalid_signature_gossip_block() { signed_block.canonical_root(), Arc::new(signed_block), NotifyExecutionLayer::Yes, + || Ok(()), ) .await, Err(BlockError::InvalidSignature) @@ -727,6 +729,7 @@ async fn block_gossip_verification() { gossip_verified.block_root, gossip_verified, NotifyExecutionLayer::Yes, + || Ok(()), ) .await .expect("should import valid gossip verified block"); @@ -923,11 +926,7 @@ async fn block_gossip_verification() { assert!( matches!( unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone())).await), - BlockError::RepeatProposal { - proposer, - slot, - } - if proposer == other_proposer && slot == block.message().slot() + BlockError::BlockIsAlreadyKnown, ), "should register any valid signature against the proposer, even if the block failed later verification" ); @@ -956,11 +955,7 @@ async fn block_gossip_verification() { .await .err() .expect("should error when processing known block"), - BlockError::RepeatProposal { - proposer, - slot, - } - if proposer == block.message().proposer_index() && slot == block.message().slot() + BlockError::BlockIsAlreadyKnown ), "the second proposal by this validator should be rejected" ); @@ -998,6 +993,7 @@ async fn verify_block_for_gossip_slashing_detection() { verified_block.block_root, verified_block, NotifyExecutionLayer::Yes, + || Ok(()), ) .await .unwrap(); @@ -1037,6 +1033,7 @@ async fn verify_block_for_gossip_doppelganger_detection() { verified_block.block_root, verified_block, NotifyExecutionLayer::Yes, + || Ok(()), ) .await .unwrap(); @@ -1184,6 +1181,7 @@ async fn add_base_block_to_altair_chain() { base_block.canonical_root(), Arc::new(base_block.clone()), NotifyExecutionLayer::Yes, + || Ok(()), ) .await .err() @@ -1318,6 +1316,7 @@ async fn add_altair_block_to_base_chain() { altair_block.canonical_root(), Arc::new(altair_block.clone()), NotifyExecutionLayer::Yes, + || Ok(()), ) .await .err() diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index c39bdeaf366..018defd2f00 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -697,6 +697,7 @@ async fn invalidates_all_descendants() { fork_block.canonical_root(), Arc::new(fork_block), NotifyExecutionLayer::Yes, + || Ok(()), ) .await .unwrap(); @@ -793,6 +794,7 @@ async fn switches_heads() { fork_block.canonical_root(), Arc::new(fork_block), NotifyExecutionLayer::Yes, + || Ok(()), ) .await .unwrap(); @@ -1046,7 +1048,9 @@ async fn invalid_parent() { // Ensure the block built atop an invalid payload is invalid for import. assert!(matches!( - rig.harness.chain.process_block(block.canonical_root(), block.clone(), NotifyExecutionLayer::Yes).await, + rig.harness.chain.process_block(block.canonical_root(), block.clone(), NotifyExecutionLayer::Yes, + || Ok(()), + ).await, Err(BlockError::ParentExecutionPayloadInvalid { parent_root: invalid_root }) if invalid_root == parent_root )); @@ -1332,7 +1336,12 @@ async fn build_optimistic_chain( for block in blocks { rig.harness .chain - .process_block(block.canonical_root(), block, NotifyExecutionLayer::Yes) + .process_block( + block.canonical_root(), + block, + NotifyExecutionLayer::Yes, + || Ok(()), + ) .await .unwrap(); } @@ -1892,6 +1901,7 @@ async fn recover_from_invalid_head_by_importing_blocks() { fork_block.canonical_root(), fork_block.clone(), NotifyExecutionLayer::Yes, + || Ok(()), ) .await .unwrap(); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 0bc7798a7ff..29027748259 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2151,6 +2151,7 @@ async fn weak_subjectivity_sync() { full_block.canonical_root(), Arc::new(full_block), NotifyExecutionLayer::Yes, + || Ok(()), ) .await .unwrap(); diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index f97f7069dce..c5b2892cbdb 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -687,6 +687,7 @@ async fn run_skip_slot_test(skip_slots: u64) { harness_a.chain.head_snapshot().beacon_block_root, harness_a.chain.head_snapshot().beacon_block.clone(), NotifyExecutionLayer::Yes, + || Ok(()) ) .await .unwrap(), diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 025b54f326e..93bfe524bc2 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -31,8 +31,8 @@ use beacon_chain::{ pub use block_id::BlockId; use directory::DEFAULT_ROOT_DIR; use eth2::types::{ - self as api_types, EndpointVersion, ForkChoice, ForkChoiceNode, SkipRandaoVerification, - ValidatorId, ValidatorStatus, + self as api_types, BroadcastValidation, EndpointVersion, ForkChoice, ForkChoiceNode, + SkipRandaoVerification, ValidatorId, ValidatorStatus, }; use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage}; use lighthouse_version::version_with_platform; @@ -40,7 +40,9 @@ use logging::SSELoggingComponents; use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage}; use operation_pool::ReceivedPreCapella; use parking_lot::RwLock; -use publish_blocks::ProvenancedBlock; +pub use publish_blocks::{ + publish_blinded_block, publish_block, reconstruct_block, ProvenancedBlock, +}; use serde::{Deserialize, Serialize}; use slog::{crit, debug, error, info, warn, Logger}; use slot_clock::SlotClock; @@ -324,6 +326,7 @@ pub fn serve( }; let eth_v1 = single_version(V1); + let eth_v2 = single_version(V2); // Create a `warp` filter that provides access to the network globals. let inner_network_globals = ctx.network_globals.clone(); @@ -1222,16 +1225,55 @@ pub fn serve( log: Logger| async move { publish_blocks::publish_block( None, - ProvenancedBlock::Local(block), + ProvenancedBlock::local(block), chain, &network_tx, log, + BroadcastValidation::default(), ) .await .map(|()| warp::reply().into_response()) }, ); + let post_beacon_blocks_v2 = eth_v2 + .and(warp::path("beacon")) + .and(warp::path("blocks")) + .and(warp::query::()) + .and(warp::path::end()) + .and(warp::body::json()) + .and(chain_filter.clone()) + .and(network_tx_filter.clone()) + .and(log_filter.clone()) + .then( + |validation_level: api_types::BroadcastValidationQuery, + block: Arc>, + chain: Arc>, + network_tx: UnboundedSender>, + log: Logger| async move { + match publish_blocks::publish_block( + None, + ProvenancedBlock::local(block), + chain, + &network_tx, + log, + validation_level.broadcast_validation, + ) + .await + { + Ok(()) => warp::reply().into_response(), + Err(e) => match warp_utils::reject::handle_rejection(e).await { + Ok(reply) => reply.into_response(), + Err(_) => warp::reply::with_status( + StatusCode::INTERNAL_SERVER_ERROR, + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(), + }, + } + }, + ); + /* * beacon/blocks */ @@ -1250,9 +1292,52 @@ pub fn serve( chain: Arc>, network_tx: UnboundedSender>, log: Logger| async move { - publish_blocks::publish_blinded_block(block, chain, &network_tx, log) - .await - .map(|()| warp::reply().into_response()) + publish_blocks::publish_blinded_block( + block, + chain, + &network_tx, + log, + BroadcastValidation::default(), + ) + .await + .map(|()| warp::reply().into_response()) + }, + ); + + let post_beacon_blinded_blocks_v2 = eth_v2 + .and(warp::path("beacon")) + .and(warp::path("blinded_blocks")) + .and(warp::query::()) + .and(warp::path::end()) + .and(warp::body::json()) + .and(chain_filter.clone()) + .and(network_tx_filter.clone()) + .and(log_filter.clone()) + .then( + |validation_level: api_types::BroadcastValidationQuery, + block: SignedBeaconBlock>, + chain: Arc>, + network_tx: UnboundedSender>, + log: Logger| async move { + match publish_blocks::publish_blinded_block( + block, + chain, + &network_tx, + log, + validation_level.broadcast_validation, + ) + .await + { + Ok(()) => warp::reply().into_response(), + Err(e) => match warp_utils::reject::handle_rejection(e).await { + Ok(reply) => reply.into_response(), + Err(_) => warp::reply::with_status( + StatusCode::INTERNAL_SERVER_ERROR, + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(), + }, + } }, ); @@ -3847,6 +3932,8 @@ pub fn serve( warp::post().and( post_beacon_blocks .uor(post_beacon_blinded_blocks) + .uor(post_beacon_blocks_v2) + .uor(post_beacon_blinded_blocks_v2) .uor(post_beacon_pool_attestations) .uor(post_beacon_pool_attester_slashings) .uor(post_beacon_pool_proposer_slashings) diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 8bcad6ba401..0f2f7b361c7 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -1,11 +1,16 @@ use crate::metrics; use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now}; -use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer}; +use beacon_chain::{ + BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, IntoGossipVerifiedBlock, + NotifyExecutionLayer, +}; +use eth2::types::BroadcastValidation; use execution_layer::ProvenancedPayload; use lighthouse_network::PubsubMessage; use network::NetworkMessage; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; +use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; @@ -16,45 +21,115 @@ use types::{ }; use warp::Rejection; -pub enum ProvenancedBlock { +pub enum ProvenancedBlock> { /// The payload was built using a local EE. - Local(Arc>>), + Local(B, PhantomData), /// The payload was build using a remote builder (e.g., via a mev-boost /// compatible relay). - Builder(Arc>>), + Builder(B, PhantomData), +} + +impl> ProvenancedBlock { + pub fn local(block: B) -> Self { + Self::Local(block, PhantomData) + } + + pub fn builder(block: B) -> Self { + Self::Builder(block, PhantomData) + } } /// Handles a request from the HTTP API for full blocks. -pub async fn publish_block( +pub async fn publish_block>( block_root: Option, - provenanced_block: ProvenancedBlock, + provenanced_block: ProvenancedBlock, chain: Arc>, network_tx: &UnboundedSender>, log: Logger, + validation_level: BroadcastValidation, ) -> Result<(), Rejection> { let seen_timestamp = timestamp_now(); let (block, is_locally_built_block) = match provenanced_block { - ProvenancedBlock::Local(block) => (block, true), - ProvenancedBlock::Builder(block) => (block, false), + ProvenancedBlock::Local(block, _) => (block, true), + ProvenancedBlock::Builder(block, _) => (block, false), }; - let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); + let beacon_block = block.inner(); + let delay = get_block_delay_ms(seen_timestamp, beacon_block.message(), &chain.slot_clock); + debug!(log, "Signed block received in HTTP API"; "slot" => beacon_block.slot()); - debug!( - log, - "Signed block published to HTTP API"; - "slot" => block.slot() - ); + /* actually publish a block */ + let publish_block = move |block: Arc>, + sender, + log, + seen_timestamp| { + let publish_timestamp = timestamp_now(); + let publish_delay = publish_timestamp + .checked_sub(seen_timestamp) + .unwrap_or_else(|| Duration::from_secs(0)); - // Send the block, regardless of whether or not it is valid. The API - // specification is very clear that this is the desired behaviour. + info!(log, "Signed block published to network via HTTP API"; "slot" => block.slot(), "publish_delay" => ?publish_delay); - let message = PubsubMessage::BeaconBlock(block.clone()); - crate::publish_pubsub_message(network_tx, message)?; + let message = PubsubMessage::BeaconBlock(block); + crate::publish_pubsub_message(&sender, message) + .map_err(|_| BeaconChainError::UnableToPublish.into()) + }; - let block_root = block_root.unwrap_or_else(|| block.canonical_root()); + /* if we can form a `GossipVerifiedBlock`, we've passed our basic gossip checks */ + let gossip_verified_block = block.into_gossip_verified_block(&chain).map_err(|e| { + warn!(log, "Not publishing block, not gossip verified"; "slot" => beacon_block.slot(), "error" => ?e); + warp_utils::reject::custom_bad_request(e.to_string()) + })?; + + let block_root = block_root.unwrap_or(gossip_verified_block.block_root); + + if let BroadcastValidation::Gossip = validation_level { + publish_block( + beacon_block.clone(), + network_tx.clone(), + log.clone(), + seen_timestamp, + ) + .map_err(|_| warp_utils::reject::custom_server_error("unable to publish".into()))?; + } + + /* only publish if gossip- and consensus-valid and equivocation-free */ + let chain_clone = chain.clone(); + let block_clone = beacon_block.clone(); + let log_clone = log.clone(); + let sender_clone = network_tx.clone(); + + let publish_fn = move || match validation_level { + BroadcastValidation::Gossip => Ok(()), + BroadcastValidation::Consensus => { + publish_block(block_clone, sender_clone, log_clone, seen_timestamp) + } + BroadcastValidation::ConsensusAndEquivocation => { + if chain_clone + .observed_block_producers + .read() + .proposer_has_been_observed(block_clone.message(), block_root) + .map_err(|e| BlockError::BeaconChainError(e.into()))? + .is_slashable() + { + warn!( + log_clone, + "Not publishing equivocating block"; + "slot" => block_clone.slot() + ); + Err(BlockError::Slashable) + } else { + publish_block(block_clone, sender_clone, log_clone, seen_timestamp) + } + } + }; match chain - .process_block(block_root, block.clone(), NotifyExecutionLayer::Yes) + .process_block( + block_root, + gossip_verified_block, + NotifyExecutionLayer::Yes, + publish_fn, + ) .await { Ok(root) => { @@ -63,14 +138,14 @@ pub async fn publish_block( "Valid block from HTTP API"; "block_delay" => ?delay, "root" => format!("{}", root), - "proposer_index" => block.message().proposer_index(), - "slot" => block.slot(), + "proposer_index" => beacon_block.message().proposer_index(), + "slot" => beacon_block.slot(), ); // Notify the validator monitor. chain.validator_monitor.read().register_api_block( seen_timestamp, - block.message(), + beacon_block.message(), root, &chain.slot_clock, ); @@ -83,40 +158,44 @@ pub async fn publish_block( // blocks built with builders we consider the broadcast time to be // when the blinded block is published to the builder. if is_locally_built_block { - late_block_logging(&chain, seen_timestamp, block.message(), root, "local", &log) + late_block_logging( + &chain, + seen_timestamp, + beacon_block.message(), + root, + "local", + &log, + ) } Ok(()) } - Err(BlockError::BlockIsAlreadyKnown) => { - info!( - log, - "Block from HTTP API already known"; - "block" => ?block.canonical_root(), - "slot" => block.slot(), - ); - Ok(()) + Err(BlockError::BeaconChainError(BeaconChainError::UnableToPublish)) => { + Err(warp_utils::reject::custom_server_error( + "unable to publish to network channel".to_string(), + )) } - Err(BlockError::RepeatProposal { proposer, slot }) => { - warn!( - log, - "Block ignored due to repeat proposal"; - "msg" => "this can happen when a VC uses fallback BNs. \ - whilst this is not necessarily an error, it can indicate issues with a BN \ - or between the VC and BN.", - "slot" => slot, - "proposer" => proposer, - ); + Err(BlockError::Slashable) => Err(warp_utils::reject::custom_bad_request( + "proposal for this slot and proposer has already been seen".to_string(), + )), + Err(BlockError::BlockIsAlreadyKnown) => { + info!(log, "Block from HTTP API already known"; "block" => ?block_root); Ok(()) } Err(e) => { - let msg = format!("{:?}", e); - error!( - log, - "Invalid block provided to HTTP API"; - "reason" => &msg - ); - Err(warp_utils::reject::broadcast_without_import(msg)) + if let BroadcastValidation::Gossip = validation_level { + Err(warp_utils::reject::broadcast_without_import(format!("{e}"))) + } else { + let msg = format!("{:?}", e); + error!( + log, + "Invalid block provided to HTTP API"; + "reason" => &msg + ); + Err(warp_utils::reject::custom_bad_request(format!( + "Invalid block: {e}" + ))) + } } } } @@ -128,21 +207,31 @@ pub async fn publish_blinded_block( chain: Arc>, network_tx: &UnboundedSender>, log: Logger, + validation_level: BroadcastValidation, ) -> Result<(), Rejection> { let block_root = block.canonical_root(); - let full_block = reconstruct_block(chain.clone(), block_root, block, log.clone()).await?; - publish_block::(Some(block_root), full_block, chain, network_tx, log).await + let full_block: ProvenancedBlock>> = + reconstruct_block(chain.clone(), block_root, block, log.clone()).await?; + publish_block::( + Some(block_root), + full_block, + chain, + network_tx, + log, + validation_level, + ) + .await } /// Deconstruct the given blinded block, and construct a full block. This attempts to use the /// execution layer's payload cache, and if that misses, attempts a blind block proposal to retrieve /// the full payload. -async fn reconstruct_block( +pub async fn reconstruct_block( chain: Arc>, block_root: Hash256, block: SignedBeaconBlock>, log: Logger, -) -> Result, Rejection> { +) -> Result>>, Rejection> { let full_payload_opt = if let Ok(payload_header) = block.message().body().execution_payload() { let el = chain.execution_layer.as_ref().ok_or_else(|| { warp_utils::reject::custom_server_error("Missing execution layer".to_string()) @@ -208,15 +297,15 @@ async fn reconstruct_block( None => block .try_into_full_block(None) .map(Arc::new) - .map(ProvenancedBlock::Local), + .map(ProvenancedBlock::local), Some(ProvenancedPayload::Local(full_payload)) => block .try_into_full_block(Some(full_payload)) .map(Arc::new) - .map(ProvenancedBlock::Local), + .map(ProvenancedBlock::local), Some(ProvenancedPayload::Builder(full_payload)) => block .try_into_full_block(Some(full_payload)) .map(Arc::new) - .map(ProvenancedBlock::Builder), + .map(ProvenancedBlock::builder), } .ok_or_else(|| { warp_utils::reject::custom_server_error("Unable to add payload to block".to_string()) diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs new file mode 100644 index 00000000000..4819dd99e7a --- /dev/null +++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs @@ -0,0 +1,1270 @@ +use beacon_chain::{ + test_utils::{AttestationStrategy, BlockStrategy}, + GossipVerifiedBlock, +}; +use eth2::types::{BroadcastValidation, SignedBeaconBlock, SignedBlindedBeaconBlock}; +use http_api::test_utils::InteractiveTester; +use http_api::{publish_blinded_block, publish_block, reconstruct_block, ProvenancedBlock}; +use tree_hash::TreeHash; +use types::{Hash256, MainnetEthSpec, Slot}; +use warp::Rejection; +use warp_utils::reject::CustomBadRequest; + +use eth2::reqwest::StatusCode; + +type E = MainnetEthSpec; + +/* + * We have the following test cases, which are duplicated for the blinded variant of the route: + * + * - `broadcast_validation=gossip` + * - Invalid (400) + * - Full Pass (200) + * - Partial Pass (202) + * - `broadcast_validation=consensus` + * - Invalid (400) + * - Only gossip (400) + * - Only consensus pass (i.e., equivocates) (200) + * - Full pass (200) + * - `broadcast_validation=consensus_and_equivocation` + * - Invalid (400) + * - Invalid due to early equivocation (400) + * - Only gossip (400) + * - Only consensus (400) + * - Pass (200) + * + */ + +/// This test checks that a block that is **invalid** from a gossip perspective gets rejected when using `broadcast_validation=gossip`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn gossip_invalid() { + /* this test targets gossip-level validation */ + let validation_level: Option = Some(BroadcastValidation::Gossip); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let chain_state_before = tester.harness.get_current_state(); + let slot = chain_state_before.slot() + 1; + + tester.harness.advance_slot(); + + let (block, _): (SignedBeaconBlock, _) = tester + .harness + .make_block_with_modifier(chain_state_before, slot, |b| { + *b.state_root_mut() = Hash256::zero(); + *b.parent_root_mut() = Hash256::zero(); + }) + .await; + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2(&block, validation_level) + .await; + assert!(response.is_err()); + + let error_response: eth2::Error = response.err().unwrap(); + + /* mandated by Beacon API spec */ + assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); + + assert!( + matches!(error_response, eth2::Error::ServerMessage(err) if err.message == "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string()) + ); +} + +/// This test checks that a block that is valid from a gossip perspective is accepted when using `broadcast_validation=gossip`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn gossip_partial_pass() { + /* this test targets gossip-level validation */ + let validation_level: Option = Some(BroadcastValidation::Gossip); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let chain_state_before = tester.harness.get_current_state(); + let slot = chain_state_before.slot() + 1; + + tester.harness.advance_slot(); + + let (block, _): (SignedBeaconBlock, _) = tester + .harness + .make_block_with_modifier(chain_state_before, slot, |b| { + *b.state_root_mut() = Hash256::random() + }) + .await; + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2(&block, validation_level) + .await; + assert!(response.is_err()); + + let error_response = response.unwrap_err(); + + assert_eq!(error_response.status(), Some(StatusCode::ACCEPTED)); +} + +// This test checks that a block that is valid from both a gossip and consensus perspective is accepted when using `broadcast_validation=gossip`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn gossip_full_pass() { + /* this test targets gossip-level validation */ + let validation_level: Option = Some(BroadcastValidation::Gossip); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block, _): (SignedBeaconBlock, _) = tester.harness.make_block(state_a, slot_b).await; + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2(&block, validation_level) + .await; + + assert!(response.is_ok()); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); +} + +/// This test checks that a block that is **invalid** from a gossip perspective gets rejected when using `broadcast_validation=consensus`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn consensus_invalid() { + let validation_level: Option = Some(BroadcastValidation::Consensus); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let chain_state_before = tester.harness.get_current_state(); + let slot = chain_state_before.slot() + 1; + + tester.harness.advance_slot(); + + let (block, _): (SignedBeaconBlock, _) = tester + .harness + .make_block_with_modifier(chain_state_before, slot, |b| { + *b.state_root_mut() = Hash256::zero(); + *b.parent_root_mut() = Hash256::zero(); + }) + .await; + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2(&block, validation_level) + .await; + assert!(response.is_err()); + + let error_response: eth2::Error = response.err().unwrap(); + + /* mandated by Beacon API spec */ + assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); + + assert!( + matches!(error_response, eth2::Error::ServerMessage(err) if err.message == "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string()) + ); +} + +/// This test checks that a block that is only valid from a gossip perspective is rejected when using `broadcast_validation=consensus`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn consensus_gossip() { + /* this test targets gossip-level validation */ + let validation_level: Option = Some(BroadcastValidation::Consensus); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block, _): (SignedBeaconBlock, _) = tester + .harness + .make_block_with_modifier(state_a, slot_b, |b| *b.state_root_mut() = Hash256::zero()) + .await; + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2(&block, validation_level) + .await; + assert!(response.is_err()); + + let error_response: eth2::Error = response.err().unwrap(); + + /* mandated by Beacon API spec */ + assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); + + assert!( + matches!(error_response, eth2::Error::ServerMessage(err) if err.message == "BAD_REQUEST: Invalid block: StateRootMismatch { block: 0x0000000000000000000000000000000000000000000000000000000000000000, local: 0xfc675d642ff7a06458eb33c7d7b62a5813e34d1b2bb1aee3e395100b579da026 }".to_string()) + ); +} + +/// This test checks that a block that is valid from both a gossip and consensus perspective, but nonetheless equivocates, is accepted when using `broadcast_validation=consensus`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn consensus_partial_pass_only_consensus() { + /* this test targets gossip-level validation */ + let validation_level: Option = Some(BroadcastValidation::Consensus); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + let test_logger = tester.harness.logger().clone(); + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block_a, state_after_a): (SignedBeaconBlock, _) = + tester.harness.make_block(state_a.clone(), slot_b).await; + let (block_b, state_after_b): (SignedBeaconBlock, _) = + tester.harness.make_block(state_a, slot_b).await; + + /* check for `make_block` curios */ + assert_eq!(block_a.state_root(), state_after_a.tree_hash_root()); + assert_eq!(block_b.state_root(), state_after_b.tree_hash_root()); + assert_ne!(block_a.state_root(), block_b.state_root()); + + let gossip_block_b = GossipVerifiedBlock::new(block_b.clone().into(), &tester.harness.chain); + assert!(gossip_block_b.is_ok()); + let gossip_block_a = GossipVerifiedBlock::new(block_a.clone().into(), &tester.harness.chain); + assert!(gossip_block_a.is_err()); + + /* submit `block_b` which should induce equivocation */ + let channel = tokio::sync::mpsc::unbounded_channel(); + + let publication_result: Result<(), Rejection> = publish_block( + None, + ProvenancedBlock::local(gossip_block_b.unwrap()), + tester.harness.chain.clone(), + &channel.0, + test_logger, + validation_level.unwrap(), + ) + .await; + + assert!(publication_result.is_ok()); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block_b.canonical_root())); +} + +/// This test checks that a block that is valid from both a gossip and consensus perspective is accepted when using `broadcast_validation=consensus`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn consensus_full_pass() { + /* this test targets gossip-level validation */ + let validation_level: Option = Some(BroadcastValidation::Consensus); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block, _): (SignedBeaconBlock, _) = tester.harness.make_block(state_a, slot_b).await; + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2(&block, validation_level) + .await; + + assert!(response.is_ok()); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); +} + +/// This test checks that a block that is **invalid** from a gossip perspective gets rejected when using `broadcast_validation=consensus_and_equivocation`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn equivocation_invalid() { + /* this test targets gossip-level validation */ + let validation_level: Option = + Some(BroadcastValidation::ConsensusAndEquivocation); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let chain_state_before = tester.harness.get_current_state(); + let slot = chain_state_before.slot() + 1; + + tester.harness.advance_slot(); + + let (block, _): (SignedBeaconBlock, _) = tester + .harness + .make_block_with_modifier(chain_state_before, slot, |b| { + *b.state_root_mut() = Hash256::zero(); + *b.parent_root_mut() = Hash256::zero(); + }) + .await; + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2(&block, validation_level) + .await; + assert!(response.is_err()); + + let error_response: eth2::Error = response.err().unwrap(); + + /* mandated by Beacon API spec */ + assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); + + assert!( + matches!(error_response, eth2::Error::ServerMessage(err) if err.message == "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string()) + ); +} + +/// This test checks that a block that is valid from both a gossip and consensus perspective is rejected when using `broadcast_validation=consensus_and_equivocation`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn equivocation_consensus_early_equivocation() { + /* this test targets gossip-level validation */ + let validation_level: Option = + Some(BroadcastValidation::ConsensusAndEquivocation); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block_a, state_after_a): (SignedBeaconBlock, _) = + tester.harness.make_block(state_a.clone(), slot_b).await; + let (block_b, state_after_b): (SignedBeaconBlock, _) = + tester.harness.make_block(state_a, slot_b).await; + + /* check for `make_block` curios */ + assert_eq!(block_a.state_root(), state_after_a.tree_hash_root()); + assert_eq!(block_b.state_root(), state_after_b.tree_hash_root()); + assert_ne!(block_a.state_root(), block_b.state_root()); + + /* submit `block_a` as valid */ + assert!(tester + .client + .post_beacon_blocks_v2(&block_a, validation_level) + .await + .is_ok()); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block_a.canonical_root())); + + /* submit `block_b` which should induce equivocation */ + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2(&block_b, validation_level) + .await; + assert!(response.is_err()); + + let error_response: eth2::Error = response.err().unwrap(); + + assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); + + assert!( + matches!(error_response, eth2::Error::ServerMessage(err) if err.message == "BAD_REQUEST: Slashable".to_string()) + ); +} + +/// This test checks that a block that is only valid from a gossip perspective is rejected when using `broadcast_validation=consensus_and_equivocation`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn equivocation_gossip() { + /* this test targets gossip-level validation */ + let validation_level: Option = + Some(BroadcastValidation::ConsensusAndEquivocation); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block, _): (SignedBeaconBlock, _) = tester + .harness + .make_block_with_modifier(state_a, slot_b, |b| *b.state_root_mut() = Hash256::zero()) + .await; + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2(&block, validation_level) + .await; + assert!(response.is_err()); + + let error_response: eth2::Error = response.err().unwrap(); + + /* mandated by Beacon API spec */ + assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); + + assert!( + matches!(error_response, eth2::Error::ServerMessage(err) if err.message == "BAD_REQUEST: Invalid block: StateRootMismatch { block: 0x0000000000000000000000000000000000000000000000000000000000000000, local: 0xfc675d642ff7a06458eb33c7d7b62a5813e34d1b2bb1aee3e395100b579da026 }".to_string()) + ); +} + +/// This test checks that a block that is valid from both a gossip and consensus perspective but that equivocates **late** is rejected when using `broadcast_validation=consensus_and_equivocation`. +/// +/// This test is unique in that we can't actually test the HTTP API directly, but instead have to hook into the `publish_blocks` code manually. This is in order to handle the late equivocation case. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn equivocation_consensus_late_equivocation() { + /* this test targets gossip-level validation */ + let validation_level: Option = + Some(BroadcastValidation::ConsensusAndEquivocation); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + let test_logger = tester.harness.logger().clone(); + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block_a, state_after_a): (SignedBeaconBlock, _) = + tester.harness.make_block(state_a.clone(), slot_b).await; + let (block_b, state_after_b): (SignedBeaconBlock, _) = + tester.harness.make_block(state_a, slot_b).await; + + /* check for `make_block` curios */ + assert_eq!(block_a.state_root(), state_after_a.tree_hash_root()); + assert_eq!(block_b.state_root(), state_after_b.tree_hash_root()); + assert_ne!(block_a.state_root(), block_b.state_root()); + + let gossip_block_b = GossipVerifiedBlock::new(block_b.clone().into(), &tester.harness.chain); + assert!(gossip_block_b.is_ok()); + let gossip_block_a = GossipVerifiedBlock::new(block_a.clone().into(), &tester.harness.chain); + assert!(gossip_block_a.is_err()); + + let channel = tokio::sync::mpsc::unbounded_channel(); + + let publication_result: Result<(), Rejection> = publish_block( + None, + ProvenancedBlock::local(gossip_block_b.unwrap()), + tester.harness.chain, + &channel.0, + test_logger, + validation_level.unwrap(), + ) + .await; + + assert!(publication_result.is_err()); + + let publication_error = publication_result.unwrap_err(); + + assert!(publication_error.find::().is_some()); + + assert_eq!( + *publication_error.find::().unwrap().0, + "proposal for this slot and proposer has already been seen".to_string() + ); +} + +/// This test checks that a block that is valid from both a gossip and consensus perspective (and does not equivocate) is accepted when using `broadcast_validation=consensus_and_equivocation`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn equivocation_full_pass() { + /* this test targets gossip-level validation */ + let validation_level: Option = + Some(BroadcastValidation::ConsensusAndEquivocation); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block, _): (SignedBeaconBlock, _) = tester.harness.make_block(state_a, slot_b).await; + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2(&block, validation_level) + .await; + + assert!(response.is_ok()); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); +} + +/// This test checks that a block that is **invalid** from a gossip perspective gets rejected when using `broadcast_validation=gossip`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn blinded_gossip_invalid() { + /* this test targets gossip-level validation */ + let validation_level: Option = Some(BroadcastValidation::Gossip); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let chain_state_before = tester.harness.get_current_state(); + let slot = chain_state_before.slot() + 1; + + tester.harness.advance_slot(); + + let (block, _): (SignedBeaconBlock, _) = tester + .harness + .make_block_with_modifier(chain_state_before, slot, |b| { + *b.state_root_mut() = Hash256::zero(); + *b.parent_root_mut() = Hash256::zero(); + }) + .await; + + let blinded_block: SignedBlindedBeaconBlock = block.into(); + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blinded_blocks_v2(&blinded_block, validation_level) + .await; + assert!(response.is_err()); + + let error_response: eth2::Error = response.err().unwrap(); + + /* mandated by Beacon API spec */ + assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); + + assert!( + matches!(error_response, eth2::Error::ServerMessage(err) if err.message == "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string()) + ); +} + +/// This test checks that a block that is valid from a gossip perspective is accepted when using `broadcast_validation=gossip`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn blinded_gossip_partial_pass() { + /* this test targets gossip-level validation */ + let validation_level: Option = Some(BroadcastValidation::Gossip); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let chain_state_before = tester.harness.get_current_state(); + let slot = chain_state_before.slot() + 1; + + tester.harness.advance_slot(); + + let (block, _): (SignedBeaconBlock, _) = tester + .harness + .make_block_with_modifier(chain_state_before, slot, |b| { + *b.state_root_mut() = Hash256::zero() + }) + .await; + + let blinded_block: SignedBlindedBeaconBlock = block.into(); + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blinded_blocks_v2(&blinded_block, validation_level) + .await; + assert!(response.is_err()); + + let error_response = response.unwrap_err(); + + assert_eq!(error_response.status(), Some(StatusCode::ACCEPTED)); +} + +// This test checks that a block that is valid from both a gossip and consensus perspective is accepted when using `broadcast_validation=gossip`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn blinded_gossip_full_pass() { + /* this test targets gossip-level validation */ + let validation_level: Option = Some(BroadcastValidation::Gossip); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block, _): (SignedBlindedBeaconBlock, _) = + tester.harness.make_blinded_block(state_a, slot_b).await; + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blinded_blocks_v2(&block, validation_level) + .await; + + assert!(response.is_ok()); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); +} + +/// This test checks that a block that is **invalid** from a gossip perspective gets rejected when using `broadcast_validation=consensus`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn blinded_consensus_invalid() { + /* this test targets gossip-level validation */ + let validation_level: Option = Some(BroadcastValidation::Consensus); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let chain_state_before = tester.harness.get_current_state(); + let slot = chain_state_before.slot() + 1; + + tester.harness.advance_slot(); + + let (block, _): (SignedBeaconBlock, _) = tester + .harness + .make_block_with_modifier(chain_state_before, slot, |b| { + *b.state_root_mut() = Hash256::zero(); + *b.parent_root_mut() = Hash256::zero(); + }) + .await; + + let blinded_block: SignedBlindedBeaconBlock = block.into(); + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blinded_blocks_v2(&blinded_block, validation_level) + .await; + assert!(response.is_err()); + + let error_response: eth2::Error = response.err().unwrap(); + + /* mandated by Beacon API spec */ + assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); + + assert!( + matches!(error_response, eth2::Error::ServerMessage(err) if err.message == "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string()) + ); +} + +/// This test checks that a block that is only valid from a gossip perspective is rejected when using `broadcast_validation=consensus`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn blinded_consensus_gossip() { + /* this test targets gossip-level validation */ + let validation_level: Option = Some(BroadcastValidation::Consensus); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block, _): (SignedBeaconBlock, _) = tester + .harness + .make_block_with_modifier(state_a, slot_b, |b| *b.state_root_mut() = Hash256::zero()) + .await; + + let blinded_block: SignedBlindedBeaconBlock = block.into(); + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blinded_blocks_v2(&blinded_block, validation_level) + .await; + assert!(response.is_err()); + + let error_response: eth2::Error = response.err().unwrap(); + + /* mandated by Beacon API spec */ + assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); + + assert!( + matches!(error_response, eth2::Error::ServerMessage(err) if err.message == "BAD_REQUEST: Invalid block: StateRootMismatch { block: 0x0000000000000000000000000000000000000000000000000000000000000000, local: 0xfc675d642ff7a06458eb33c7d7b62a5813e34d1b2bb1aee3e395100b579da026 }".to_string()) + ); +} + +/// This test checks that a block that is valid from both a gossip and consensus perspective is accepted when using `broadcast_validation=consensus`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn blinded_consensus_full_pass() { + /* this test targets gossip-level validation */ + let validation_level: Option = Some(BroadcastValidation::Consensus); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block, _): (SignedBlindedBeaconBlock, _) = + tester.harness.make_blinded_block(state_a, slot_b).await; + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blinded_blocks_v2(&block, validation_level) + .await; + + assert!(response.is_ok()); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); +} + +/// This test checks that a block that is **invalid** from a gossip perspective gets rejected when using `broadcast_validation=consensus_and_equivocation`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn blinded_equivocation_invalid() { + /* this test targets gossip-level validation */ + let validation_level: Option = + Some(BroadcastValidation::ConsensusAndEquivocation); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let chain_state_before = tester.harness.get_current_state(); + let slot = chain_state_before.slot() + 1; + + tester.harness.advance_slot(); + + let (block, _): (SignedBeaconBlock, _) = tester + .harness + .make_block_with_modifier(chain_state_before, slot, |b| { + *b.state_root_mut() = Hash256::zero(); + *b.parent_root_mut() = Hash256::zero(); + }) + .await; + + let blinded_block: SignedBlindedBeaconBlock = block.into(); + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blinded_blocks_v2(&blinded_block, validation_level) + .await; + assert!(response.is_err()); + + let error_response: eth2::Error = response.err().unwrap(); + + /* mandated by Beacon API spec */ + assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); + + assert!( + matches!(error_response, eth2::Error::ServerMessage(err) if err.message == "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string()) + ); +} + +/// This test checks that a block that is valid from both a gossip and consensus perspective is rejected when using `broadcast_validation=consensus_and_equivocation`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn blinded_equivocation_consensus_early_equivocation() { + /* this test targets gossip-level validation */ + let validation_level: Option = + Some(BroadcastValidation::ConsensusAndEquivocation); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block_a, state_after_a): (SignedBlindedBeaconBlock, _) = tester + .harness + .make_blinded_block(state_a.clone(), slot_b) + .await; + let (block_b, state_after_b): (SignedBlindedBeaconBlock, _) = + tester.harness.make_blinded_block(state_a, slot_b).await; + + /* check for `make_blinded_block` curios */ + assert_eq!(block_a.state_root(), state_after_a.tree_hash_root()); + assert_eq!(block_b.state_root(), state_after_b.tree_hash_root()); + assert_ne!(block_a.state_root(), block_b.state_root()); + + /* submit `block_a` as valid */ + assert!(tester + .client + .post_beacon_blinded_blocks_v2(&block_a, validation_level) + .await + .is_ok()); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block_a.canonical_root())); + + /* submit `block_b` which should induce equivocation */ + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blinded_blocks_v2(&block_b, validation_level) + .await; + assert!(response.is_err()); + + let error_response: eth2::Error = response.err().unwrap(); + + assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); + + assert!( + matches!(error_response, eth2::Error::ServerMessage(err) if err.message == "BAD_REQUEST: Slashable".to_string()) + ); +} + +/// This test checks that a block that is only valid from a gossip perspective is rejected when using `broadcast_validation=consensus_and_equivocation`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn blinded_equivocation_gossip() { + /* this test targets gossip-level validation */ + let validation_level: Option = + Some(BroadcastValidation::ConsensusAndEquivocation); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block, _): (SignedBeaconBlock, _) = tester + .harness + .make_block_with_modifier(state_a, slot_b, |b| *b.state_root_mut() = Hash256::zero()) + .await; + + let blinded_block: SignedBlindedBeaconBlock = block.into(); + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blinded_blocks_v2(&blinded_block, validation_level) + .await; + assert!(response.is_err()); + + let error_response: eth2::Error = response.err().unwrap(); + + /* mandated by Beacon API spec */ + assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); + + assert!( + matches!(error_response, eth2::Error::ServerMessage(err) if err.message == "BAD_REQUEST: Invalid block: StateRootMismatch { block: 0x0000000000000000000000000000000000000000000000000000000000000000, local: 0xfc675d642ff7a06458eb33c7d7b62a5813e34d1b2bb1aee3e395100b579da026 }".to_string()) + ); +} + +/// This test checks that a block that is valid from both a gossip and consensus perspective but that equivocates **late** is rejected when using `broadcast_validation=consensus_and_equivocation`. +/// +/// This test is unique in that we can't actually test the HTTP API directly, but instead have to hook into the `publish_blocks` code manually. This is in order to handle the late equivocation case. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn blinded_equivocation_consensus_late_equivocation() { + /* this test targets gossip-level validation */ + let validation_level: Option = + Some(BroadcastValidation::ConsensusAndEquivocation); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + let test_logger = tester.harness.logger().clone(); + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block_a, state_after_a): (SignedBlindedBeaconBlock, _) = tester + .harness + .make_blinded_block(state_a.clone(), slot_b) + .await; + let (block_b, state_after_b): (SignedBlindedBeaconBlock, _) = + tester.harness.make_blinded_block(state_a, slot_b).await; + + /* check for `make_blinded_block` curios */ + assert_eq!(block_a.state_root(), state_after_a.tree_hash_root()); + assert_eq!(block_b.state_root(), state_after_b.tree_hash_root()); + assert_ne!(block_a.state_root(), block_b.state_root()); + + let unblinded_block_a = reconstruct_block( + tester.harness.chain.clone(), + block_a.state_root(), + block_a, + test_logger.clone(), + ) + .await + .unwrap(); + let unblinded_block_b = reconstruct_block( + tester.harness.chain.clone(), + block_b.clone().state_root(), + block_b.clone(), + test_logger.clone(), + ) + .await + .unwrap(); + + let inner_block_a = match unblinded_block_a { + ProvenancedBlock::Local(a, _) => a, + ProvenancedBlock::Builder(a, _) => a, + }; + let inner_block_b = match unblinded_block_b { + ProvenancedBlock::Local(b, _) => b, + ProvenancedBlock::Builder(b, _) => b, + }; + + let gossip_block_b = GossipVerifiedBlock::new(inner_block_b, &tester.harness.chain); + assert!(gossip_block_b.is_ok()); + let gossip_block_a = GossipVerifiedBlock::new(inner_block_a, &tester.harness.chain); + assert!(gossip_block_a.is_err()); + + let channel = tokio::sync::mpsc::unbounded_channel(); + + let publication_result: Result<(), Rejection> = publish_blinded_block( + block_b, + tester.harness.chain, + &channel.0, + test_logger, + validation_level.unwrap(), + ) + .await; + + assert!(publication_result.is_err()); + + let publication_error: Rejection = publication_result.unwrap_err(); + + assert!(publication_error.find::().is_some()); +} + +/// This test checks that a block that is valid from both a gossip and consensus perspective (and does not equivocate) is accepted when using `broadcast_validation=consensus_and_equivocation`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn blinded_equivocation_full_pass() { + /* this test targets gossip-level validation */ + let validation_level: Option = + Some(BroadcastValidation::ConsensusAndEquivocation); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block, _): (SignedBlindedBeaconBlock, _) = + tester.harness.make_blinded_block(state_a, slot_b).await; + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2(&block, validation_level) + .await; + + assert!(response.is_ok()); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); +} diff --git a/beacon_node/http_api/tests/main.rs b/beacon_node/http_api/tests/main.rs index f5916d8506a..e0636424e48 100644 --- a/beacon_node/http_api/tests/main.rs +++ b/beacon_node/http_api/tests/main.rs @@ -1,5 +1,6 @@ #![cfg(not(debug_assertions))] // Tests are too slow in debug. +pub mod broadcast_validation_tests; pub mod fork_tests; pub mod interactive_tests; pub mod status_tests; diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index a6c49ddaeef..bcd192c699e 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -159,7 +159,7 @@ impl ApiTester { // `make_block` adds random graffiti, so this will produce an alternate block let (reorg_block, _reorg_state) = harness - .make_block(head.beacon_state.clone(), harness.chain.slot().unwrap()) + .make_block(head.beacon_state.clone(), harness.chain.slot().unwrap() + 1) .await; let head_state_root = head.beacon_state_root(); @@ -1248,14 +1248,23 @@ impl ApiTester { } pub async fn test_post_beacon_blocks_invalid(mut self) -> Self { - let mut next_block = self.next_block.clone(); - *next_block.message_mut().proposer_index_mut() += 1; + let block = self + .harness + .make_block_with_modifier( + self.harness.get_current_state(), + self.harness.get_current_slot(), + |b| { + *b.state_root_mut() = Hash256::zero(); + }, + ) + .await + .0; - assert!(self.client.post_beacon_blocks(&next_block).await.is_err()); + assert!(self.client.post_beacon_blocks(&block).await.is_err()); assert!( self.network_rx.network_recv.recv().await.is_some(), - "invalid blocks should be sent to network" + "gossip valid blocks should be sent to network" ); self @@ -4126,7 +4135,7 @@ impl ApiTester { .unwrap(); let expected_reorg = EventKind::ChainReorg(SseChainReorg { - slot: self.next_block.slot(), + slot: self.reorg_block.slot(), depth: 1, old_head_block: self.next_block.canonical_root(), old_head_state: self.next_block.state_root(), @@ -4136,6 +4145,8 @@ impl ApiTester { execution_optimistic: false, }); + self.harness.advance_slot(); + self.client .post_beacon_blocks(&self.reorg_block) .await diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 185634c3081..91ec81b18d3 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -785,6 +785,20 @@ impl Worker { verified_block } + Err(e @ BlockError::Slashable) => { + warn!( + self.log, + "Received equivocating block from peer"; + "error" => ?e + ); + /* punish peer for submitting an equivocation, but not too harshly as honest peers may conceivably forward equivocating blocks to us from time to time */ + self.gossip_penalize_peer( + peer_id, + PeerAction::MidToleranceError, + "gossip_block_mid", + ); + return None; + } Err(BlockError::ParentUnknown(block)) => { debug!( self.log, @@ -806,7 +820,6 @@ impl Worker { Err(e @ BlockError::FutureSlot { .. }) | Err(e @ BlockError::WouldRevertFinalizedSlot { .. }) | Err(e @ BlockError::BlockIsAlreadyKnown) - | Err(e @ BlockError::RepeatProposal { .. }) | Err(e @ BlockError::NotFinalizedDescendant { .. }) => { debug!(self.log, "Could not verify block for gossip. Ignoring the block"; "error" => %e); @@ -948,7 +961,12 @@ impl Worker { let result = self .chain - .process_block(block_root, verified_block, NotifyExecutionLayer::Yes) + .process_block( + block_root, + verified_block, + NotifyExecutionLayer::Yes, + || Ok(()), + ) .await; match &result { diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 7e8fce35632..ac59b1daa93 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -98,33 +98,21 @@ impl Worker { }); // Checks if a block from this proposer is already known. - let proposal_already_known = || { + let block_equivocates = || { match self .chain .observed_block_producers .read() - .proposer_has_been_observed(block.message()) + .proposer_has_been_observed(block.message(), block.canonical_root()) { - Ok(is_observed) => is_observed, - // Both of these blocks will be rejected, so reject them now rather + Ok(seen_status) => seen_status.is_slashable(), + //Both of these blocks will be rejected, so reject them now rather // than re-queuing them. Err(ObserveError::FinalizedBlock { .. }) | Err(ObserveError::ValidatorIndexTooHigh { .. }) => false, } }; - // Returns `true` if the block is already known to fork choice. Notably, - // this will return `false` for blocks that we've already imported but - // ancestors of the finalized checkpoint. That should not be an issue - // for our use here since finalized blocks will always be late and won't - // be requeued anyway. - let block_is_already_known = || { - self.chain - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) - }; - // If we've already seen a block from this proposer *and* the block // arrived before the attestation deadline, requeue it to ensure it is // imported late enough that it won't receive a proposer boost. @@ -132,7 +120,7 @@ impl Worker { // Don't requeue blocks if they're already known to fork choice, just // push them through to block processing so they can be handled through // the normal channels. - if !block_is_late && proposal_already_known() && !block_is_already_known() { + if !block_is_late && block_equivocates() { debug!( self.log, "Delaying processing of duplicate RPC block"; @@ -165,7 +153,7 @@ impl Worker { let parent_root = block.message().parent_root(); let result = self .chain - .process_block(block_root, block, NotifyExecutionLayer::Yes) + .process_block(block_root, block, NotifyExecutionLayer::Yes, || Ok(())) .await; metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 217d3569689..e34916bebab 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -322,6 +322,26 @@ impl BeaconNodeHttpClient { ok_or_error(response).await } + /// Generic POST function supporting arbitrary responses and timeouts. + async fn post_generic_with_consensus_version( + &self, + url: U, + body: &T, + timeout: Option, + fork: ForkName, + ) -> Result { + let mut builder = self.client.post(url); + if let Some(timeout) = timeout { + builder = builder.timeout(timeout); + } + let response = builder + .header(CONSENSUS_VERSION_HEADER, fork.to_string()) + .json(body) + .send() + .await?; + ok_or_error(response).await + } + /// `GET beacon/genesis` /// /// ## Errors @@ -654,6 +674,76 @@ impl BeaconNodeHttpClient { Ok(()) } + pub fn post_beacon_blocks_v2_path( + &self, + validation_level: Option, + ) -> Result { + let mut path = self.eth_path(V2)?; + path.path_segments_mut() + .map_err(|_| Error::InvalidUrl(self.server.clone()))? + .extend(&["beacon", "blocks"]); + + path.set_query( + validation_level + .map(|v| format!("broadcast_validation={}", v)) + .as_deref(), + ); + + Ok(path) + } + + pub fn post_beacon_blinded_blocks_v2_path( + &self, + validation_level: Option, + ) -> Result { + let mut path = self.eth_path(V2)?; + path.path_segments_mut() + .map_err(|_| Error::InvalidUrl(self.server.clone()))? + .extend(&["beacon", "blinded_blocks"]); + + path.set_query( + validation_level + .map(|v| format!("broadcast_validation={}", v)) + .as_deref(), + ); + + Ok(path) + } + + /// `POST v2/beacon/blocks` + pub async fn post_beacon_blocks_v2>( + &self, + block: &SignedBeaconBlock, + validation_level: Option, + ) -> Result<(), Error> { + self.post_generic_with_consensus_version( + self.post_beacon_blocks_v2_path(validation_level)?, + block, + Some(self.timeouts.proposal), + block.message().body().fork_name(), + ) + .await?; + + Ok(()) + } + + /// `POST v2/beacon/blinded_blocks` + pub async fn post_beacon_blinded_blocks_v2( + &self, + block: &SignedBlindedBeaconBlock, + validation_level: Option, + ) -> Result<(), Error> { + self.post_generic_with_consensus_version( + self.post_beacon_blinded_blocks_v2_path(validation_level)?, + block, + Some(self.timeouts.proposal), + block.message().body().fork_name(), + ) + .await?; + + Ok(()) + } + /// Path for `v2/beacon/blocks` pub fn get_beacon_blocks_path(&self, block_id: BlockId) -> Result { let mut path = self.eth_path(V2)?; diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 55759a2e158..5f2e1ada7be 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -6,7 +6,7 @@ use lighthouse_network::{ConnectionDirection, Enr, Multiaddr, PeerConnectionStat use mediatype::{names, MediaType, MediaTypeList}; use serde::{Deserialize, Serialize}; use std::convert::TryFrom; -use std::fmt; +use std::fmt::{self, Display}; use std::str::{from_utf8, FromStr}; use std::time::Duration; pub use types::*; @@ -1260,6 +1260,50 @@ pub struct ForkChoiceNode { pub execution_block_hash: Option, } +#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum BroadcastValidation { + Gossip, + Consensus, + ConsensusAndEquivocation, +} + +impl Default for BroadcastValidation { + fn default() -> Self { + Self::Gossip + } +} + +impl Display for BroadcastValidation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Gossip => write!(f, "gossip"), + Self::Consensus => write!(f, "consensus"), + Self::ConsensusAndEquivocation => write!(f, "consensus_and_equivocation"), + } + } +} + +impl FromStr for BroadcastValidation { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + match s { + "gossip" => Ok(Self::Gossip), + "consensus" => Ok(Self::Consensus), + "consensus_and_equivocation" => Ok(Self::ConsensusAndEquivocation), + _ => Err("Invalid broadcast validation level"), + } + } +} + +#[derive(Default, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub struct BroadcastValidationQuery { + #[serde(default)] + pub broadcast_validation: BroadcastValidation, +} + #[cfg(test)] mod tests { use super::*; diff --git a/consensus/types/src/beacon_block_body.rs b/consensus/types/src/beacon_block_body.rs index c0ba8694100..dce1be742ff 100644 --- a/consensus/types/src/beacon_block_body.rs +++ b/consensus/types/src/beacon_block_body.rs @@ -89,7 +89,7 @@ impl<'a, T: EthSpec, Payload: AbstractExecPayload> BeaconBlockBodyRef<'a, T, } } -impl<'a, T: EthSpec> BeaconBlockBodyRef<'a, T> { +impl<'a, T: EthSpec, Payload: AbstractExecPayload> BeaconBlockBodyRef<'a, T, Payload> { /// Get the fork_name of this object pub fn fork_name(self) -> ForkName { match self { diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index e0f4043ac21..65528de1757 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -382,6 +382,7 @@ impl Tester { block_root, block.clone(), NotifyExecutionLayer::Yes, + || Ok(()), ))?; if result.is_ok() != valid { return Err(Error::DidntFail(format!(