feat(sync/p2p): adapt header sync to the latest p2p spec changes #1794

Merged
1 commit merged on Feb 21, 2024
4 changes: 3 additions & 1 deletion crates/common/src/header.rs
@@ -24,7 +24,9 @@ pub struct BlockHeader {
pub l1_da_mode: L1DataAvailabilityMode,
}

#[derive(Debug, Clone, PartialEq, Eq, Default, Dummy, serde::Serialize, serde::Deserialize)]
#[derive(
Debug, Copy, Clone, PartialEq, Eq, Default, Dummy, serde::Serialize, serde::Deserialize,
)]
#[serde(rename_all = "UPPERCASE")]
pub enum L1DataAvailabilityMode {
#[default]
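For context, a minimal sketch (not part of the diff) of what deriving `Copy` buys downstream code: the enum can be passed around by value, and the `UPPERCASE` serde rename still round-trips. The `Calldata`/`Blob` variants are assumed from the Starknet data-availability modes, and the `Dummy` derive is omitted to keep the snippet self-contained.

```rust
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum L1DataAvailabilityMode {
    #[default]
    Calldata,
    Blob,
}

fn main() {
    let mode = L1DataAvailabilityMode::Blob;
    let copied = mode; // plain copy; no `.clone()` needed once `Copy` is derived
    assert_eq!(mode, copied);
    // serde round-trip with the UPPERCASE rename
    assert_eq!(serde_json::to_string(&copied).unwrap(), "\"BLOB\"");
}
```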
8 changes: 3 additions & 5 deletions crates/p2p/src/client/peer_agnostic.rs
@@ -21,9 +21,7 @@ use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use tokio::sync::RwLock;

use crate::client::peer_aware;
use crate::client::types::{
RawDeployAccountTransaction, SignedBlockHeader as P2PSignedBlockHeader,
};
use crate::client::types::{RawDeployAccountTransaction, SignedBlockHeader};
use crate::sync::protocol;

/// Data received from a specific peer.
@@ -109,7 +107,7 @@ impl Client {
start: BlockNumber,
stop: BlockNumber,
reverse: bool,
) -> impl futures::Stream<Item = PeerData<P2PSignedBlockHeader>> {
) -> impl futures::Stream<Item = PeerData<SignedBlockHeader>> {
let (mut start, stop, direction) = match reverse {
true => (stop, start, Direction::Backward),
false => (start, stop, Direction::Forward),
@@ -147,7 +145,7 @@ impl Client {

while let Some(signed_header) = responses.next().await {
let signed_header = match signed_header {
BlockHeadersResponse::Header(hdr) => match P2PSignedBlockHeader::try_from(*hdr) {
BlockHeadersResponse::Header(hdr) => match SignedBlockHeader::try_from(*hdr) {
Ok(hdr) => hdr,
Err(error) => {
tracing::debug!(%peer, %error, "Header stream failed");
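A minimal consumer sketch (illustrative only; it assumes `client` is the peer-agnostic `Client` above, that `header_stream` is callable as shown, and that `BlockNumber` comes from `pathfinder_common`). It shows that the stream now yields `PeerData<SignedBlockHeader>` directly, with the block fields nested under `.header` after this PR:

```rust
use futures::StreamExt;
use p2p::client::peer_agnostic::Client;
use pathfinder_common::BlockNumber;

async fn log_headers(client: Client, start: BlockNumber, stop: BlockNumber) {
    // Forward sync: `reverse = false`.
    let mut headers = Box::pin(client.header_stream(start, stop, false));
    while let Some(peer_data) = headers.next().await {
        // Block fields live on `.header`, the signature on `.signature`.
        tracing::info!(
            peer = %peer_data.peer,
            block = %peer_data.data.header.number,
            "received signed header"
        );
    }
}
```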
93 changes: 58 additions & 35 deletions crates/p2p/src/client/types.rs
@@ -31,6 +31,12 @@ pub trait TryFromDto<T> {
/// Represents a simplified [`pathfinder_common::SignedBlockHeader`], i.e. excluding class commitment and storage commitment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SignedBlockHeader {
pub header: BlockHeader,
pub signature: BlockCommitmentSignature,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockHeader {
pub hash: BlockHash,
pub parent_hash: BlockHash,
pub number: BlockNumber,
@@ -47,7 +53,20 @@ pub struct SignedBlockHeader {
pub transaction_count: usize,
pub event_count: usize,
pub l1_da_mode: L1DataAvailabilityMode,
pub signature: BlockCommitmentSignature,
}

impl SignedBlockHeader {
pub fn verify_signature(&self) -> bool {
// TODO
true
}
}

impl BlockHeader {
pub fn verify_hash(&self) -> bool {
// TODO
true
}
}

impl TryFrom<p2p_proto::header::SignedBlockHeader> for SignedBlockHeader {
@@ -65,24 +84,26 @@ impl TryFrom<p2p_proto::header::SignedBlockHeader> for SignedBlockHeader {
.next()
.expect("exactly one element");
Ok(SignedBlockHeader {
hash: BlockHash(dto.block_hash.0),
parent_hash: BlockHash(dto.parent_hash.0),
number: BlockNumber::new(dto.number)
.ok_or(anyhow::anyhow!("block number > i64::MAX"))?,
timestamp: BlockTimestamp::new(dto.time)
.ok_or(anyhow::anyhow!("block timestamp > i64::MAX"))?,
eth_l1_gas_price: GasPrice(dto.gas_price_wei),
strk_l1_gas_price: GasPrice(dto.gas_price_fri),
eth_l1_data_gas_price: GasPrice(dto.data_gas_price_wei),
strk_l1_data_gas_price: GasPrice(dto.data_gas_price_fri),
sequencer_address: SequencerAddress(dto.sequencer_address.0),
starknet_version: dto.protocol_version.into(),
event_commitment: EventCommitment(dto.events.root.0),
state_commitment: StateCommitment(dto.state.root.0),
transaction_commitment: TransactionCommitment(dto.transactions.root.0),
transaction_count: dto.transactions.n_leaves.try_into()?,
event_count: dto.events.n_leaves.try_into()?,
l1_da_mode: TryFromDto::try_from_dto(dto.l1_data_availability_mode)?,
header: BlockHeader {
hash: BlockHash(dto.block_hash.0),
parent_hash: BlockHash(dto.parent_hash.0),
number: BlockNumber::new(dto.number)
.ok_or(anyhow::anyhow!("block number > i64::MAX"))?,
timestamp: BlockTimestamp::new(dto.time)
.ok_or(anyhow::anyhow!("block timestamp > i64::MAX"))?,
eth_l1_gas_price: GasPrice(dto.gas_price_wei),
strk_l1_gas_price: GasPrice(dto.gas_price_fri),
eth_l1_data_gas_price: GasPrice(dto.data_gas_price_wei),
strk_l1_data_gas_price: GasPrice(dto.data_gas_price_fri),
sequencer_address: SequencerAddress(dto.sequencer_address.0),
starknet_version: dto.protocol_version.into(),
event_commitment: EventCommitment(dto.events.root.0),
state_commitment: StateCommitment(dto.state.root.0),
transaction_commitment: TransactionCommitment(dto.transactions.root.0),
transaction_count: dto.transactions.n_leaves.try_into()?,
event_count: dto.events.n_leaves.try_into()?,
l1_da_mode: TryFromDto::try_from_dto(dto.l1_data_availability_mode)?,
},
signature,
})
}
@@ -101,22 +122,24 @@ impl
),
) -> Self {
Self {
hash: header.hash,
parent_hash: header.parent_hash,
number: header.number,
timestamp: header.timestamp,
eth_l1_gas_price: header.eth_l1_gas_price,
strk_l1_gas_price: header.strk_l1_gas_price,
eth_l1_data_gas_price: header.eth_l1_data_gas_price,
strk_l1_data_gas_price: header.strk_l1_data_gas_price,
sequencer_address: header.sequencer_address,
starknet_version: header.starknet_version,
event_commitment: header.event_commitment,
state_commitment: header.state_commitment,
transaction_commitment: header.transaction_commitment,
transaction_count: header.transaction_count,
event_count: header.event_count,
l1_da_mode: header.l1_da_mode,
header: BlockHeader {
hash: header.hash,
parent_hash: header.parent_hash,
number: header.number,
timestamp: header.timestamp,
eth_l1_gas_price: header.eth_l1_gas_price,
strk_l1_gas_price: header.strk_l1_gas_price,
eth_l1_data_gas_price: header.eth_l1_data_gas_price,
strk_l1_data_gas_price: header.strk_l1_data_gas_price,
sequencer_address: header.sequencer_address,
starknet_version: header.starknet_version,
event_commitment: header.event_commitment,
state_commitment: header.state_commitment,
transaction_commitment: header.transaction_commitment,
transaction_count: header.transaction_count,
event_count: header.event_count,
l1_da_mode: header.l1_da_mode,
},
signature,
}
}
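A small usage sketch of the split type (illustrative only): consumers now reach block fields through `.header`, and the two stubbed checks introduced above can be combined into a single acceptance check.

```rust
use p2p::client::types::SignedBlockHeader;

/// Returns true if both the (currently stubbed) hash and signature checks pass.
fn accept(signed: &SignedBlockHeader) -> bool {
    signed.header.verify_hash() && signed.verify_signature()
}
```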
8 changes: 3 additions & 5 deletions crates/pathfinder/src/p2p_network/sync_handlers/tests.rs
@@ -99,9 +99,7 @@ mod prop {
use crate::p2p_network::sync_handlers;
use futures::channel::mpsc;
use futures::StreamExt;
use p2p::client::types::{
RawTransactionVariant, Receipt, SignedBlockHeader as P2PSignedBlockHeader, TryFromDto,
};
use p2p::client::types::{RawTransactionVariant, Receipt, SignedBlockHeader, TryFromDto};
use p2p_proto::class::{Class, ClassesRequest, ClassesResponse};
use p2p_proto::common::{BlockNumberOrHash, Iteration};
use p2p_proto::event::{EventsRequest, EventsResponse};
@@ -162,7 +160,7 @@ mod prop {
// Compute the overlapping set between the db and the request
// These are the headers that we expect to be read from the db
let expected = overlapping::get(in_db, start_block, limit, step, num_blocks, direction)
.into_iter().map(|(h, s, _, _, _, _)| P2PSignedBlockHeader::from((h, s)) ).collect::<Vec<_>>();
.into_iter().map(|(h, s, _, _, _, _)| SignedBlockHeader::from((h, s)) ).collect::<Vec<_>>();
// Run the handler
let request = BlockHeadersRequest { iteration: Iteration { start: BlockNumberOrHash::Number(start_block), limit, step, direction, } };
let mut responses = Runtime::new().unwrap().block_on(async {
@@ -181,7 +179,7 @@

// Check the rest
let actual = responses.into_iter().map(|response| match response {
BlockHeadersResponse::Header(hdr) => P2PSignedBlockHeader::try_from(*hdr).unwrap(),
BlockHeadersResponse::Header(hdr) => SignedBlockHeader::try_from(*hdr).unwrap(),
_ => panic!("unexpected response"),
}).collect::<Vec<_>>();

98 changes: 49 additions & 49 deletions crates/pathfinder/src/sync/p2p.rs
@@ -77,59 +77,59 @@ impl Sync {
///
/// No guarantees are made about any headers newer than the anchor.
async fn sync_headers(&self, anchor: EthereumStateUpdate) -> anyhow::Result<()> {
// FIXME
todo!();
// while let Some(gap) =
// headers::next_gap(self.storage.clone(), anchor.block_number, anchor.block_hash)
// .await
// .context("Finding next gap in header chain")?
// {
// use futures::StreamExt;
// use futures::TryStreamExt;
while let Some(gap) =
headers::next_gap(self.storage.clone(), anchor.block_number, anchor.block_hash)
.await
.context("Finding next gap in header chain")?
{
use futures::StreamExt;
use futures::TryStreamExt;

// // TODO: create a tracing scope for this gap start, stop.
// TODO: create a tracing scope for this gap start, stop.

// tracing::info!("Syncing headers");
tracing::info!("Syncing headers");

// // TODO: consider .inspect_ok(tracing::trace!) for each stage.
// let result = self
// .p2p
// .clone()
// // TODO: consider buffering in the client to reduce request latency.
// .header_stream(gap.head, gap.tail, true)
// .scan((gap.head, gap.head_hash, false), headers::check_continuity)
// // TODO: rayon scope this.
// .and_then(headers::verify)
// // chunk so that persisting to storage can be batched.
// .try_chunks(1024)
// // TODO: Pull out remaining data from try_chunks error.
// // try_chunks::Error is a tuple of Err(data, error) so we
// // should re-stream that as Ok(data), Err(error). Right now
// // we just map to Err(error).
// .map_err(|e| e.1)
// .and_then(|x| headers::persist(x, self.storage.clone()))
// .inspect_ok(|x| tracing::info!(tail=%x.data.header.number, "Header chunk synced"))
// // Drive stream to completion.
// .try_fold((), |_state, _x| std::future::ready(Ok(())))
// .await;
// TODO: consider .inspect_ok(tracing::trace!) for each stage.
let result = self
.p2p
.clone()
// TODO: consider buffering in the client to reduce request latency.
.header_stream(gap.head, gap.tail, true)
.scan((gap.head, gap.head_hash, false), headers::check_continuity)
// TODO: rayon scope this.
.and_then(headers::verify)
// chunk so that persisting to storage can be batched.
.try_chunks(1024)
// TODO: Pull out remaining data from try_chunks error.
// try_chunks::Error is a tuple of Err(data, error) so we
// should re-stream that as Ok(data), Err(error). Right now
// we just map to Err(error).
.map_err(|e| e.1)
.and_then(|x| headers::persist(x, self.storage.clone()))
.inspect_ok(|x| tracing::info!(tail=%x.data.header.number, "Header chunk synced"))
// Drive stream to completion.
.try_fold((), |_state, _x| std::future::ready(Ok(())))
.await;

// match result {
// Ok(()) => {
// tracing::info!("Syncing headers complete");
// }
// Err(error) => {
// if let Some(peer_data) = error.peer_id_and_data() {
// // TODO: punish peer.
// tracing::debug!(
// peer=%peer_data.peer, block=%peer_data.data.header.number, %error,
// "Error while streaming headers"
// );
// } else {
// tracing::debug!(%error, "Error while streaming headers");
// }
// }
// }
// }
match result {
Ok(()) => {
tracing::info!("Syncing headers complete");
}
Err(error) => {
if let Some(peer_data) = error.peer_id_and_data() {
// TODO: punish peer.
tracing::debug!(
peer=%peer_data.peer, block=%peer_data.data.header.number, %error,
"Error while streaming headers"
);
} else {
tracing::debug!(%error, "Error while streaming headers");
}
}
}
}

Ok(())
}
}

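One of the TODOs above asks to recover the partially built chunk from the `try_chunks` error instead of discarding it. A hedged sketch of that idea (not what this PR implements), using `futures::stream::TryChunksError`, whose two fields are the partial chunk and the underlying error, exactly as the inline comment describes:

```rust
use futures::stream::{self, Stream, StreamExt, TryChunksError};

/// Re-emit the data captured inside a `TryChunksError` as `Ok(chunk)` followed by
/// `Err(error)`, so downstream stages can still persist the headers received so far.
fn recover_partial_chunks<T, E>(
    chunked: impl Stream<Item = Result<Vec<T>, TryChunksError<T, E>>>,
) -> impl Stream<Item = Result<Vec<T>, E>> {
    chunked.flat_map(|item| {
        let results = match item {
            Ok(chunk) => vec![Ok(chunk)],
            // `TryChunksError(partial_chunk, error)`: keep the data, then surface the error.
            Err(TryChunksError(chunk, error)) => vec![Ok(chunk), Err(error)],
        };
        stream::iter(results)
    })
}
```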
30 changes: 26 additions & 4 deletions crates/pathfinder/src/sync/p2p/headers.rs
@@ -1,7 +1,7 @@
#![allow(dead_code, unused_variables)]
use anyhow::Context;
use p2p::PeerData;
use pathfinder_common::{BlockHash, BlockNumber, SignedBlockHeader};
use p2p::{client::types::SignedBlockHeader, PeerData};
use pathfinder_common::{BlockHash, BlockNumber, ClassCommitment, StorageCommitment};
use pathfinder_storage::Storage;
use tokio::task::spawn_blocking;

@@ -142,6 +142,9 @@ pub(super) async fn verify(signed_header: PeerData<SignedBlockHeader>) -> Signed
.expect("Task should not crash")
}

/// Writes the headers to storage.
///
/// # FIXME
/// Class and storage commitments are 0 here.
pub(super) async fn persist(
mut signed_headers: Vec<PeerData<SignedBlockHeader>>,
@@ -154,8 +157,27 @@ pub(super) async fn persist(
let tx = db.transaction().context("Creating database transaction")?;

for SignedBlockHeader { header, signature } in signed_headers.iter().map(|x| &x.data) {
tx.insert_block_header(header)
.context("Persisting block header")?;
tx.insert_block_header(&pathfinder_common::BlockHeader {
hash: header.hash,
parent_hash: header.parent_hash,
number: header.number,
timestamp: header.timestamp,
eth_l1_gas_price: header.eth_l1_gas_price,
strk_l1_gas_price: header.strk_l1_gas_price,
eth_l1_data_gas_price: header.eth_l1_data_gas_price,
strk_l1_data_gas_price: header.strk_l1_data_gas_price,
sequencer_address: header.sequencer_address,
starknet_version: header.starknet_version.clone(),
class_commitment: ClassCommitment::ZERO,
event_commitment: header.event_commitment,
state_commitment: header.state_commitment,
storage_commitment: StorageCommitment::ZERO,
transaction_commitment: header.transaction_commitment,
transaction_count: header.transaction_count,
event_count: header.event_count,
l1_da_mode: header.l1_da_mode,
})
.context("Persisting block header")?;
tx.insert_signature(header.number, signature)
.context("Persisting block signature")?;
}