Skip to content

Commit

Permalink
feat: integrate HeaderValidator + make FileClient generic over bl…
Browse files Browse the repository at this point in the history
…ock (#12681)
  • Loading branch information
klkvr authored Nov 20, 2024
1 parent 6977cf0 commit 868f3ac
Show file tree
Hide file tree
Showing 19 changed files with 143 additions and 89 deletions.
2 changes: 1 addition & 1 deletion bin/reth/src/commands/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
{
// building network downloaders using the fetch client
let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
.build(client.clone(), Arc::clone(&consensus))
.build(client.clone(), consensus.clone().as_header_validator())
.into_task_with(task_executor);

let body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies)
Expand Down
2 changes: 1 addition & 1 deletion crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ where
.with_tip_sender(tip_tx),
TestPipelineConfig::Real => {
let header_downloader = ReverseHeadersDownloaderBuilder::default()
.build(client.clone(), consensus.clone())
.build(client.clone(), consensus.clone().as_header_validator())
.into_task();

let body_downloader = BodiesDownloaderBuilder::default()
Expand Down
23 changes: 21 additions & 2 deletions crates/consensus/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

extern crate alloc;

use alloc::{fmt::Debug, vec::Vec};
use alloc::{fmt::Debug, sync::Arc, vec::Vec};
use alloy_consensus::Header;
use alloy_eips::eip7685::Requests;
use alloy_primitives::{BlockHash, BlockNumber, Bloom, B256, U256};
Expand Down Expand Up @@ -46,7 +46,9 @@ impl<'a> PostExecutionInput<'a> {

/// Consensus is a protocol that chooses canonical chain.
#[auto_impl::auto_impl(&, Arc)]
pub trait Consensus<H = Header, B = BlockBody>: HeaderValidator<H> + Debug + Send + Sync {
pub trait Consensus<H = Header, B = BlockBody>:
AsHeaderValidator<H> + HeaderValidator<H> + Debug + Send + Sync
{
/// Ensures that body field values match the header.
fn validate_body_against_header(
&self,
Expand Down Expand Up @@ -143,6 +145,23 @@ pub trait HeaderValidator<H = Header>: Debug + Send + Sync {
) -> Result<(), ConsensusError>;
}

/// Helper trait to cast `Arc<dyn Consensus>` to `Arc<dyn HeaderValidator>`
pub trait AsHeaderValidator<H>: HeaderValidator<H> {
/// Converts the [`Arc`] of self to [`Arc`] of [`HeaderValidator`]
fn as_header_validator<'a>(self: Arc<Self>) -> Arc<dyn HeaderValidator<H> + 'a>
where
Self: 'a;
}

impl<T: HeaderValidator<H>, H> AsHeaderValidator<H> for T {
fn as_header_validator<'a>(self: Arc<Self>) -> Arc<dyn HeaderValidator<H> + 'a>
where
Self: 'a,
{
self
}
}

/// Consensus Errors
#[derive(Debug, PartialEq, Eq, Clone, derive_more::Display, derive_more::Error)]
pub enum ConsensusError {
Expand Down
3 changes: 2 additions & 1 deletion crates/net/downloaders/src/bodies/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
cmp::Ordering,
collections::BinaryHeap,
fmt::Debug,
mem,
ops::RangeInclusive,
pin::Pin,
Expand Down Expand Up @@ -298,7 +299,7 @@ where

impl<B, Provider> BodyDownloader for BodiesDownloader<B, Provider>
where
B: BodiesClient<Body: InMemorySize> + 'static,
B: BodiesClient<Body: Debug + InMemorySize> + 'static,
Provider: HeaderProvider + Unpin + 'static,
{
type Body = B::Body;
Expand Down
7 changes: 4 additions & 3 deletions crates/net/downloaders/src/bodies/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use reth_network_p2p::{
};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
fmt::Debug,
future::Future,
ops::RangeInclusive,
pin::Pin,
Expand Down Expand Up @@ -47,10 +48,10 @@ impl<B: Send + Sync + Unpin + 'static> TaskDownloader<B> {
/// use reth_network_p2p::bodies::client::BodiesClient;
/// use reth_primitives_traits::InMemorySize;
/// use reth_storage_api::HeaderProvider;
/// use std::sync::Arc;
/// use std::{fmt::Debug, sync::Arc};
///
/// fn t<
/// B: BodiesClient<Body: InMemorySize> + 'static,
/// B: BodiesClient<Body: Debug + InMemorySize> + 'static,
/// Provider: HeaderProvider + Unpin + 'static,
/// >(
/// client: Arc<B>,
Expand Down Expand Up @@ -90,7 +91,7 @@ impl<B: Send + Sync + Unpin + 'static> TaskDownloader<B> {
}
}

impl<B: Send + Sync + Unpin + 'static> BodyDownloader for TaskDownloader<B> {
impl<B: Debug + Send + Sync + Unpin + 'static> BodyDownloader for TaskDownloader<B> {
type Body = B;

fn set_download_range(&mut self, range: RangeInclusive<BlockNumber>) -> DownloadResult<()> {
Expand Down
76 changes: 39 additions & 37 deletions crates/net/downloaders/src/file_client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{collections::HashMap, io, path::Path};

use alloy_consensus::Header;
use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{BlockHash, BlockNumber, B256};
use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
use futures::Future;
use itertools::Either;
use reth_network_p2p::{
Expand All @@ -13,7 +13,8 @@ use reth_network_p2p::{
priority::Priority,
};
use reth_network_peers::PeerId;
use reth_primitives::{BlockBody, SealedHeader};
use reth_primitives::SealedHeader;
use reth_primitives_traits::{Block, BlockBody, FullBlock};
use thiserror::Error;
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
Expand All @@ -40,15 +41,15 @@ pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
///
/// This reads the entire file into memory, so it is not suitable for large files.
#[derive(Debug)]
pub struct FileClient {
pub struct FileClient<B: Block = reth_primitives::Block> {
/// The buffered headers retrieved when fetching new bodies.
headers: HashMap<BlockNumber, Header>,
headers: HashMap<BlockNumber, B::Header>,

/// A mapping between block hash and number.
hash_to_number: HashMap<BlockHash, BlockNumber>,

/// The buffered bodies retrieved when fetching new headers.
bodies: HashMap<BlockHash, BlockBody>,
bodies: HashMap<BlockHash, B::Body>,
}

/// An error that can occur when constructing and using a [`FileClient`].
Expand All @@ -73,7 +74,7 @@ impl From<&'static str> for FileClientError {
}
}

impl FileClient {
impl<B: FullBlock> FileClient<B> {
/// Create a new file client from a file path.
pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, FileClientError> {
let file = File::open(path).await?;
Expand Down Expand Up @@ -114,7 +115,7 @@ impl FileClient {

/// Clones and returns the highest header of this client has or `None` if empty. Seals header
/// before returning.
pub fn tip_header(&self) -> Option<SealedHeader> {
pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal(h.clone()))
}

Expand All @@ -137,13 +138,13 @@ impl FileClient {
}

/// Use the provided bodies as the file client's block body buffer.
pub fn with_bodies(mut self, bodies: HashMap<BlockHash, BlockBody>) -> Self {
pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
self.bodies = bodies;
self
}

/// Use the provided headers as the file client's block body buffer.
pub fn with_headers(mut self, headers: HashMap<BlockNumber, Header>) -> Self {
pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
self.headers = headers;
for (number, header) in &self.headers {
self.hash_to_number.insert(header.hash_slow(), *number);
Expand All @@ -162,42 +163,43 @@ impl FileClient {
}

/// Returns an iterator over headers in the client.
pub fn headers_iter(&self) -> impl Iterator<Item = &Header> {
pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
self.headers.values()
}

/// Returns a mutable iterator over bodies in the client.
///
/// Panics, if file client headers and bodies are not mapping 1-1.
pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut BlockBody)> {
pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
let bodies = &mut self.bodies;
let numbers = &self.hash_to_number;
bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
}

/// Returns the current number of transactions in the client.
pub fn total_transactions(&self) -> usize {
self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions.len())
self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
}
}

impl FromReader for FileClient {
impl<B: FullBlock> FromReader for FileClient<B> {
type Error = FileClientError;

/// Initialize the [`FileClient`] from bytes that have been read from file.
fn from_reader<B>(
reader: B,
fn from_reader<R>(
reader: R,
num_bytes: u64,
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
where
B: AsyncReadExt + Unpin,
R: AsyncReadExt + Unpin,
{
let mut headers = HashMap::default();
let mut hash_to_number = HashMap::default();
let mut bodies = HashMap::default();

// use with_capacity to make sure the internal buffer contains the entire chunk
let mut stream = FramedRead::with_capacity(reader, BlockFileCodec, num_bytes as usize);
let mut stream =
FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);

trace!(target: "downloaders::file",
target_num_bytes=num_bytes,
Expand Down Expand Up @@ -225,13 +227,13 @@ impl FromReader for FileClient {
}
Err(err) => return Err(err),
};
let block_number = block.header.number;
let block_hash = block.header.hash_slow();
let block_number = block.header().number();
let block_hash = block.header().hash_slow();

// add to the internal maps
headers.insert(block.header.number, block.header.clone());
hash_to_number.insert(block_hash, block.header.number);
bodies.insert(block_hash, block.into());
headers.insert(block.header().number(), block.header().clone());
hash_to_number.insert(block_hash, block.header().number());
bodies.insert(block_hash, block.body().clone());

if log_interval == 0 {
trace!(target: "downloaders::file",
Expand Down Expand Up @@ -260,9 +262,9 @@ impl FromReader for FileClient {
}
}

impl HeadersClient for FileClient {
type Header = Header;
type Output = HeadersFut;
impl<B: FullBlock> HeadersClient for FileClient<B> {
type Header = B::Header;
type Output = HeadersFut<B::Header>;

fn get_headers_with_priority(
&self,
Expand Down Expand Up @@ -311,9 +313,9 @@ impl HeadersClient for FileClient {
}
}

impl BodiesClient for FileClient {
type Body = BlockBody;
type Output = BodiesFut;
impl<B: FullBlock> BodiesClient for FileClient<B> {
type Body = B::Body;
type Output = BodiesFut<B::Body>;

fn get_block_bodies_with_priority(
&self,
Expand All @@ -336,7 +338,7 @@ impl BodiesClient for FileClient {
}
}

impl DownloadClient for FileClient {
impl<B: FullBlock> DownloadClient for FileClient<B> {
fn report_bad_message(&self, _peer_id: PeerId) {
warn!("Reported a bad message on a file client, the file may be corrupted or invalid");
// noop
Expand Down Expand Up @@ -542,7 +544,7 @@ mod tests {
// create an empty file
let file = tempfile::tempfile().unwrap();

let client =
let client: Arc<FileClient> =
Arc::new(FileClient::from_file(file.into()).await.unwrap().with_bodies(bodies.clone()));
let mut downloader = BodiesDownloaderBuilder::default().build(
client.clone(),
Expand All @@ -567,14 +569,14 @@ mod tests {
let p0 = child_header(&p1);

let file = tempfile::tempfile().unwrap();
let client = Arc::new(FileClient::from_file(file.into()).await.unwrap().with_headers(
HashMap::from([
let client: Arc<FileClient> = Arc::new(
FileClient::from_file(file.into()).await.unwrap().with_headers(HashMap::from([
(0u64, p0.clone().unseal()),
(1, p1.clone().unseal()),
(2, p2.clone().unseal()),
(3, p3.clone().unseal()),
]),
));
])),
);

let mut downloader = ReverseHeadersDownloaderBuilder::default()
.stream_batch_size(3)
Expand All @@ -596,7 +598,7 @@ mod tests {
// Generate some random blocks
let (file, headers, _) = generate_bodies_file(0..=19).await;
// now try to read them back
let client = Arc::new(FileClient::from_file(file).await.unwrap());
let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());

// construct headers downloader and use first header
let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
Expand All @@ -621,7 +623,7 @@ mod tests {
let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

// now try to read them back
let client = Arc::new(FileClient::from_file(file).await.unwrap());
let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());

// insert headers in db for the bodies downloader
insert_headers(factory.db_ref().db(), &headers);
Expand Down
20 changes: 12 additions & 8 deletions crates/net/downloaders/src/file_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use crate::file_client::FileClientError;
use alloy_primitives::bytes::{Buf, BytesMut};
use alloy_rlp::{Decodable, Encodable};
use reth_primitives::Block;
use tokio_util::codec::{Decoder, Encoder};

/// Codec for reading raw block bodies from a file.
Expand All @@ -19,10 +18,16 @@ use tokio_util::codec::{Decoder, Encoder};
///
/// It's recommended to use [`with_capacity`](tokio_util::codec::FramedRead::with_capacity) to set
/// the capacity of the framed reader to the size of the file.
pub(crate) struct BlockFileCodec;
pub(crate) struct BlockFileCodec<B>(std::marker::PhantomData<B>);

impl Decoder for BlockFileCodec {
type Item = Block;
impl<B> Default for BlockFileCodec<B> {
fn default() -> Self {
Self(std::marker::PhantomData)
}
}

impl<B: Decodable> Decoder for BlockFileCodec<B> {
type Item = B;
type Error = FileClientError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Expand All @@ -31,18 +36,17 @@ impl Decoder for BlockFileCodec {
}

let buf_slice = &mut src.as_ref();
let body =
Block::decode(buf_slice).map_err(|err| FileClientError::Rlp(err, src.to_vec()))?;
let body = B::decode(buf_slice).map_err(|err| FileClientError::Rlp(err, src.to_vec()))?;
src.advance(src.len() - buf_slice.len());

Ok(Some(body))
}
}

impl Encoder<Block> for BlockFileCodec {
impl<B: Encodable> Encoder<B> for BlockFileCodec<B> {
type Error = FileClientError;

fn encode(&mut self, item: Block, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: B, dst: &mut BytesMut) -> Result<(), Self::Error> {
item.encode(dst);
Ok(())
}
Expand Down
Loading

0 comments on commit 868f3ac

Please sign in to comment.