From 7c7e6ea52dcd79c2d7d59ddaab0d4275995a127b Mon Sep 17 00:00:00 2001
From: Krzysztof Lis
Date: Fri, 16 Feb 2024 09:10:31 +0100
Subject: [PATCH] feat(sync/p2p): adapt header sync to the latest p2p spec
 changes

---
 crates/common/src/header.rs                   |  4 +-
 crates/p2p/src/client/peer_agnostic.rs        |  8 +-
 crates/p2p/src/client/types.rs                | 93 +++++++++++-------
 .../src/p2p_network/sync_handlers/tests.rs    |  8 +-
 crates/pathfinder/src/sync/p2p.rs             | 98 +++++++++----------
 crates/pathfinder/src/sync/p2p/headers.rs     | 30 +++++-
 6 files changed, 142 insertions(+), 99 deletions(-)

diff --git a/crates/common/src/header.rs b/crates/common/src/header.rs
index b6ffd3c840..5bb69d307b 100644
--- a/crates/common/src/header.rs
+++ b/crates/common/src/header.rs
@@ -24,7 +24,9 @@ pub struct BlockHeader {
     pub l1_da_mode: L1DataAvailabilityMode,
 }
 
-#[derive(Debug, Clone, PartialEq, Eq, Default, Dummy, serde::Serialize, serde::Deserialize)]
+#[derive(
+    Debug, Copy, Clone, PartialEq, Eq, Default, Dummy, serde::Serialize, serde::Deserialize,
+)]
 #[serde(rename_all = "UPPERCASE")]
 pub enum L1DataAvailabilityMode {
     #[default]
diff --git a/crates/p2p/src/client/peer_agnostic.rs b/crates/p2p/src/client/peer_agnostic.rs
index d4b05ac5f0..439f6396a0 100644
--- a/crates/p2p/src/client/peer_agnostic.rs
+++ b/crates/p2p/src/client/peer_agnostic.rs
@@ -21,9 +21,7 @@ use rayon::prelude::{IntoParallelIterator, ParallelIterator};
 use tokio::sync::RwLock;
 
 use crate::client::peer_aware;
-use crate::client::types::{
-    RawDeployAccountTransaction, SignedBlockHeader as P2PSignedBlockHeader,
-};
+use crate::client::types::{RawDeployAccountTransaction, SignedBlockHeader};
 use crate::sync::protocol;
 
 /// Data received from a specific peer.
@@ -109,7 +107,7 @@ impl Client {
         start: BlockNumber,
         stop: BlockNumber,
         reverse: bool,
-    ) -> impl futures::Stream<Item = PeerData<P2PSignedBlockHeader>> {
+    ) -> impl futures::Stream<Item = PeerData<SignedBlockHeader>> {
         let (mut start, stop, direction) = match reverse {
             true => (stop, start, Direction::Backward),
             false => (start, stop, Direction::Forward),
@@ -147,7 +145,7 @@ impl Client {
 
             while let Some(signed_header) = responses.next().await {
                 let signed_header = match signed_header {
-                    BlockHeadersResponse::Header(hdr) => match P2PSignedBlockHeader::try_from(*hdr) {
+                    BlockHeadersResponse::Header(hdr) => match SignedBlockHeader::try_from(*hdr) {
                         Ok(hdr) => hdr,
                         Err(error) => {
                             tracing::debug!(%peer, %error, "Header stream failed");
diff --git a/crates/p2p/src/client/types.rs b/crates/p2p/src/client/types.rs
index eb9bd82676..b08acf800c 100644
--- a/crates/p2p/src/client/types.rs
+++ b/crates/p2p/src/client/types.rs
@@ -31,6 +31,12 @@ pub trait TryFromDto {
 /// Represents a simplified [`pathfinder_common::SignedBlockHeader`], ie. excluding class commitment and storage commitment.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct SignedBlockHeader {
+    pub header: BlockHeader,
+    pub signature: BlockCommitmentSignature,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct BlockHeader {
     pub hash: BlockHash,
     pub parent_hash: BlockHash,
     pub number: BlockNumber,
@@ -47,7 +53,20 @@ pub struct SignedBlockHeader {
     pub transaction_count: usize,
     pub event_count: usize,
     pub l1_da_mode: L1DataAvailabilityMode,
-    pub signature: BlockCommitmentSignature,
+}
+
+impl SignedBlockHeader {
+    pub fn verify_signature(&self) -> bool {
+        // TODO
+        true
+    }
+}
+
+impl BlockHeader {
+    pub fn verify_hash(&self) -> bool {
+        // TODO
+        true
+    }
 }
 
 impl TryFrom<p2p_proto::header::SignedBlockHeader> for SignedBlockHeader {
@@ -65,24 +84,26 @@
             .next()
             .expect("exactly one element");
         Ok(SignedBlockHeader {
-            hash: BlockHash(dto.block_hash.0),
-            parent_hash: BlockHash(dto.parent_hash.0),
-            number: BlockNumber::new(dto.number)
-                .ok_or(anyhow::anyhow!("block number > i64::MAX"))?,
-            timestamp: BlockTimestamp::new(dto.time)
-                .ok_or(anyhow::anyhow!("block timestamp > i64::MAX"))?,
-            eth_l1_gas_price: GasPrice(dto.gas_price_wei),
-            strk_l1_gas_price: GasPrice(dto.gas_price_fri),
-            eth_l1_data_gas_price: GasPrice(dto.data_gas_price_wei),
-            strk_l1_data_gas_price: GasPrice(dto.data_gas_price_fri),
-            sequencer_address: SequencerAddress(dto.sequencer_address.0),
-            starknet_version: dto.protocol_version.into(),
-            event_commitment: EventCommitment(dto.events.root.0),
-            state_commitment: StateCommitment(dto.state.root.0),
-            transaction_commitment: TransactionCommitment(dto.transactions.root.0),
-            transaction_count: dto.transactions.n_leaves.try_into()?,
-            event_count: dto.events.n_leaves.try_into()?,
-            l1_da_mode: TryFromDto::try_from_dto(dto.l1_data_availability_mode)?,
+            header: BlockHeader {
+                hash: BlockHash(dto.block_hash.0),
+                parent_hash: BlockHash(dto.parent_hash.0),
+                number: BlockNumber::new(dto.number)
+                    .ok_or(anyhow::anyhow!("block number > i64::MAX"))?,
+                timestamp: BlockTimestamp::new(dto.time)
+                    .ok_or(anyhow::anyhow!("block timestamp > i64::MAX"))?,
+                eth_l1_gas_price: GasPrice(dto.gas_price_wei),
+                strk_l1_gas_price: GasPrice(dto.gas_price_fri),
+                eth_l1_data_gas_price: GasPrice(dto.data_gas_price_wei),
+                strk_l1_data_gas_price: GasPrice(dto.data_gas_price_fri),
+                sequencer_address: SequencerAddress(dto.sequencer_address.0),
+                starknet_version: dto.protocol_version.into(),
+                event_commitment: EventCommitment(dto.events.root.0),
+                state_commitment: StateCommitment(dto.state.root.0),
+                transaction_commitment: TransactionCommitment(dto.transactions.root.0),
+                transaction_count: dto.transactions.n_leaves.try_into()?,
+                event_count: dto.events.n_leaves.try_into()?,
+                l1_da_mode: TryFromDto::try_from_dto(dto.l1_data_availability_mode)?,
+            },
             signature,
         })
     }
@@ -101,22 +122,24 @@ impl
         ),
     ) -> Self {
         Self {
-            hash: header.hash,
-            parent_hash: header.parent_hash,
-            number: header.number,
-            timestamp: header.timestamp,
-            eth_l1_gas_price: header.eth_l1_gas_price,
-            strk_l1_gas_price: header.strk_l1_gas_price,
-            eth_l1_data_gas_price: header.eth_l1_data_gas_price,
-            strk_l1_data_gas_price: header.strk_l1_data_gas_price,
-            sequencer_address: header.sequencer_address,
-            starknet_version: header.starknet_version,
-            event_commitment: header.event_commitment,
-            state_commitment: header.state_commitment,
-            transaction_commitment: header.transaction_commitment,
-            transaction_count: header.transaction_count,
-            event_count: header.event_count,
-            l1_da_mode: header.l1_da_mode,
+            header: BlockHeader {
+                hash: header.hash,
+                parent_hash: header.parent_hash,
+                number: header.number,
+                timestamp: header.timestamp,
+                eth_l1_gas_price: header.eth_l1_gas_price,
+                strk_l1_gas_price: header.strk_l1_gas_price,
+                eth_l1_data_gas_price: header.eth_l1_data_gas_price,
+                strk_l1_data_gas_price: header.strk_l1_data_gas_price,
+                sequencer_address: header.sequencer_address,
+                starknet_version: header.starknet_version,
+                event_commitment: header.event_commitment,
+                state_commitment: header.state_commitment,
+                transaction_commitment: header.transaction_commitment,
+                transaction_count: header.transaction_count,
+                event_count: header.event_count,
+                l1_da_mode: header.l1_da_mode,
+            },
             signature,
         }
     }
diff --git a/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs b/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs
index 592177e6ba..900bbff2b8 100644
--- a/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs
+++ b/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs
@@ -99,9 +99,7 @@ mod prop {
     use crate::p2p_network::sync_handlers;
     use futures::channel::mpsc;
     use futures::StreamExt;
-    use p2p::client::types::{
-        RawTransactionVariant, Receipt, SignedBlockHeader as P2PSignedBlockHeader, TryFromDto,
-    };
+    use p2p::client::types::{RawTransactionVariant, Receipt, SignedBlockHeader, TryFromDto};
     use p2p_proto::class::{Class, ClassesRequest, ClassesResponse};
     use p2p_proto::common::{BlockNumberOrHash, Iteration};
     use p2p_proto::event::{EventsRequest, EventsResponse};
@@ -162,7 +160,7 @@ mod prop {
             // Compute the overlapping set between the db and the request
             // These are the headers that we expect to be read from the db
             let expected = overlapping::get(in_db, start_block, limit, step, num_blocks, direction)
-                .into_iter().map(|(h, s, _, _, _, _)| P2PSignedBlockHeader::from((h, s)) ).collect::<Vec<_>>();
+                .into_iter().map(|(h, s, _, _, _, _)| SignedBlockHeader::from((h, s)) ).collect::<Vec<_>>();
             // Run the handler
             let request = BlockHeadersRequest { iteration: Iteration { start: BlockNumberOrHash::Number(start_block), limit, step, direction, } };
             let mut responses = Runtime::new().unwrap().block_on(async {
@@ -181,7 +179,7 @@ mod prop {
             // Check the rest
             let actual = responses.into_iter().map(|response| match response {
-                BlockHeadersResponse::Header(hdr) => P2PSignedBlockHeader::try_from(*hdr).unwrap(),
+                BlockHeadersResponse::Header(hdr) => SignedBlockHeader::try_from(*hdr).unwrap(),
                 _ => panic!("unexpected response"),
             }).collect::<Vec<_>>();
diff --git a/crates/pathfinder/src/sync/p2p.rs b/crates/pathfinder/src/sync/p2p.rs
index d29649230e..7dd280a66b 100644
--- a/crates/pathfinder/src/sync/p2p.rs
+++ b/crates/pathfinder/src/sync/p2p.rs
@@ -77,59 +77,59 @@ impl Sync {
     ///
     /// No guarantees are made about any headers newer than the anchor.
     async fn sync_headers(&self, anchor: EthereumStateUpdate) -> anyhow::Result<()> {
-        // FIXME
-        todo!();
-        // while let Some(gap) =
-        //     headers::next_gap(self.storage.clone(), anchor.block_number, anchor.block_hash)
-        //         .await
-        //         .context("Finding next gap in header chain")?
-        // {
-        //     use futures::StreamExt;
-        //     use futures::TryStreamExt;
+        while let Some(gap) =
+            headers::next_gap(self.storage.clone(), anchor.block_number, anchor.block_hash)
+                .await
+                .context("Finding next gap in header chain")?
+        {
+            use futures::StreamExt;
+            use futures::TryStreamExt;
 
-        //     // TODO: create a tracing scope for this gap start, stop.
- // tracing::info!("Syncing headers"); + tracing::info!("Syncing headers"); - // // TODO: consider .inspect_ok(tracing::trace!) for each stage. - // let result = self - // .p2p - // .clone() - // // TODO: consider buffering in the client to reduce request latency. - // .header_stream(gap.head, gap.tail, true) - // .scan((gap.head, gap.head_hash, false), headers::check_continuity) - // // TODO: rayon scope this. - // .and_then(headers::verify) - // // chunk so that persisting to storage can be batched. - // .try_chunks(1024) - // // TODO: Pull out remaining data from try_chunks error. - // // try_chunks::Error is a tuple of Err(data, error) so we - // // should re-stream that as Ok(data), Err(error). Right now - // // we just map to Err(error). - // .map_err(|e| e.1) - // .and_then(|x| headers::persist(x, self.storage.clone())) - // .inspect_ok(|x| tracing::info!(tail=%x.data.header.number, "Header chunk synced")) - // // Drive stream to completion. - // .try_fold((), |_state, _x| std::future::ready(Ok(()))) - // .await; + // TODO: consider .inspect_ok(tracing::trace!) for each stage. + let result = self + .p2p + .clone() + // TODO: consider buffering in the client to reduce request latency. + .header_stream(gap.head, gap.tail, true) + .scan((gap.head, gap.head_hash, false), headers::check_continuity) + // TODO: rayon scope this. + .and_then(headers::verify) + // chunk so that persisting to storage can be batched. + .try_chunks(1024) + // TODO: Pull out remaining data from try_chunks error. + // try_chunks::Error is a tuple of Err(data, error) so we + // should re-stream that as Ok(data), Err(error). Right now + // we just map to Err(error). + .map_err(|e| e.1) + .and_then(|x| headers::persist(x, self.storage.clone())) + .inspect_ok(|x| tracing::info!(tail=%x.data.header.number, "Header chunk synced")) + // Drive stream to completion. + .try_fold((), |_state, _x| std::future::ready(Ok(()))) + .await; - // match result { - // Ok(()) => { - // tracing::info!("Syncing headers complete"); - // } - // Err(error) => { - // if let Some(peer_data) = error.peer_id_and_data() { - // // TODO: punish peer. - // tracing::debug!( - // peer=%peer_data.peer, block=%peer_data.data.header.number, %error, - // "Error while streaming headers" - // ); - // } else { - // tracing::debug!(%error, "Error while streaming headers"); - // } - // } - // } - // } + match result { + Ok(()) => { + tracing::info!("Syncing headers complete"); + } + Err(error) => { + if let Some(peer_data) = error.peer_id_and_data() { + // TODO: punish peer. 
+                        tracing::debug!(
+                            peer=%peer_data.peer, block=%peer_data.data.header.number, %error,
+                            "Error while streaming headers"
+                        );
+                    } else {
+                        tracing::debug!(%error, "Error while streaming headers");
+                    }
+                }
+            }
+        }
+
+        Ok(())
     }
 }
 
diff --git a/crates/pathfinder/src/sync/p2p/headers.rs b/crates/pathfinder/src/sync/p2p/headers.rs
index 303567221b..268a54f921 100644
--- a/crates/pathfinder/src/sync/p2p/headers.rs
+++ b/crates/pathfinder/src/sync/p2p/headers.rs
@@ -1,7 +1,7 @@
 #![allow(dead_code, unused_variables)]
 use anyhow::Context;
-use p2p::PeerData;
-use pathfinder_common::{BlockHash, BlockNumber, SignedBlockHeader};
+use p2p::{client::types::SignedBlockHeader, PeerData};
+use pathfinder_common::{BlockHash, BlockNumber, ClassCommitment, StorageCommitment};
 use pathfinder_storage::Storage;
 use tokio::task::spawn_blocking;
 
@@ -142,6 +142,9 @@ pub(super) async fn verify(signed_header: PeerData) -> Signed
         .expect("Task should not crash")
 }
 
+/// # FIXME
+/// class and storage commitments are 0 here
+///
 /// Writes the headers to storage.
 pub(super) async fn persist(
     mut signed_headers: Vec<PeerData<SignedBlockHeader>>,
@@ -154,8 +157,27 @@
     let tx = db.transaction().context("Creating database transaction")?;
 
     for SignedBlockHeader { header, signature } in signed_headers.iter().map(|x| &x.data) {
-        tx.insert_block_header(header)
-            .context("Persisting block header")?;
+        tx.insert_block_header(&pathfinder_common::BlockHeader {
+            hash: header.hash,
+            parent_hash: header.parent_hash,
+            number: header.number,
+            timestamp: header.timestamp,
+            eth_l1_gas_price: header.eth_l1_gas_price,
+            strk_l1_gas_price: header.strk_l1_gas_price,
+            eth_l1_data_gas_price: header.eth_l1_data_gas_price,
+            strk_l1_data_gas_price: header.strk_l1_data_gas_price,
+            sequencer_address: header.sequencer_address,
+            starknet_version: header.starknet_version.clone(),
+            class_commitment: ClassCommitment::ZERO,
+            event_commitment: header.event_commitment,
+            state_commitment: header.state_commitment,
+            storage_commitment: StorageCommitment::ZERO,
+            transaction_commitment: header.transaction_commitment,
+            transaction_count: header.transaction_count,
+            event_count: header.event_count,
+            l1_da_mode: header.l1_da_mode,
+        })
+        .context("Persisting block header")?;
         tx.insert_signature(header.number, signature)
            .context("Persisting block signature")?;
    }
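
A minimal, self-contained sketch of the try_chunks TODO noted in sync_headers above ("try_chunks::Error is a tuple of Err(data, error) so we should re-stream that as Ok(data), Err(error)"). The item and error types here are illustrative placeholders, not pathfinder types; the only assumption carried over from the pipeline above is that the error's first tuple field holds the items buffered before the failure and its second field holds the error (the current code already relies on `.1`):

use futures::executor::block_on;
use futures::stream::{self, StreamExt, TryStreamExt};

#[derive(Debug, PartialEq)]
struct StreamError(&'static str);

fn main() {
    // Three good items followed by a mid-stream failure.
    let source = stream::iter(vec![
        Ok::<u32, StreamError>(1),
        Ok(2),
        Ok(3),
        Err(StreamError("peer disconnected")),
    ]);

    let chunked = source
        .try_chunks(2)
        // Instead of `.map_err(|e| e.1)`, re-emit the partially filled chunk
        // (`e.0`) as an Ok item before surfacing the error (`e.1`).
        .flat_map(|result| match result {
            Ok(chunk) => stream::iter(vec![Ok(chunk)]),
            Err(e) => stream::iter(vec![Ok(e.0), Err(e.1)]),
        });

    let collected = block_on(chunked.collect::<Vec<_>>());
    assert_eq!(
        collected,
        vec![
            Ok(vec![1, 2]),
            Ok(vec![3]),
            Err(StreamError("peer disconnected")),
        ]
    );
}

With this flattening in place, a downstream stage such as the batched persist call would still receive the headers that arrived before a peer failure, and the error surfaces as the final item of the stream.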