Skip to content

Commit

Permalink
feat(p2p_network): update header stream
Browse files Browse the repository at this point in the history
  • Loading branch information
CHr15F0x committed Feb 12, 2024
1 parent 0911859 commit fb06208
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 127 deletions.
141 changes: 67 additions & 74 deletions crates/p2p/src/client/peer_agnostic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,29 @@ use std::{
use anyhow::Context;
use futures::{channel::mpsc, StreamExt};
use libp2p::PeerId;
use p2p_proto::class::{Class, ClassesRequest};
use p2p_proto::common::{Direction, Iteration};
use p2p_proto::event::EventsRequest;
use p2p_proto::header::BlockHeadersRequest;
use p2p_proto::header::{BlockHeadersRequest, BlockHeadersResponse};
use p2p_proto::receipt::{Receipt, ReceiptsRequest};
use p2p_proto::state::StateDiffsRequest;
use p2p_proto::transaction::TransactionsRequest;
use pathfinder_common::transaction::{
DeployAccountTransactionV0V1, DeployAccountTransactionV3, TransactionVariant,
use p2p_proto::{
class::{Class, ClassesRequest},
header::SignedBlockHeader,
};
use pathfinder_common::{event::Event, StateUpdate};
use pathfinder_common::{
BlockHash, BlockNumber, ContractAddress, SignedBlockHeader, TransactionHash,
transaction::{DeployAccountTransactionV0V1, DeployAccountTransactionV3, TransactionVariant},
BlockNumber,
};
use pathfinder_common::{BlockHash, ContractAddress, TransactionHash};
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use tokio::sync::RwLock;

use crate::client::peer_aware;
use crate::client::types::RawDeployAccountTransaction;
use crate::client::types::{
RawDeployAccountTransaction, SignedBlockHeader as P2PSignedBlockHeader,
};
use crate::sync::protocol;

/// Data received from a specific peer.
Expand Down Expand Up @@ -113,82 +117,71 @@ impl Client {
start: BlockNumber,
stop: BlockNumber,
reverse: bool,
) -> impl futures::Stream<Item = PeerData<SignedBlockHeader>> {
// FIXME
// Keep compiler happy
let (_, rx) = futures::channel::mpsc::channel(0);
rx
// let (mut start, stop, direction) = match reverse {
// true => (stop, start, Direction::Backward),
// false => (start, stop, Direction::Forward),
// };

// async_stream::stream! {
// // Loop which refreshes peer set once we exhaust it.
// loop {
// let peers = self
// .get_update_peers_with_sync_capability(protocol::Headers::NAME)
// .await;

// // Attempt each peer.
// 'next_peer: for peer in peers {
// let limit = start.get().max(stop.get()) - start.get().min(stop.get());
) -> impl futures::Stream<Item = PeerData<P2PSignedBlockHeader>> {
let (mut start, stop, direction) = match reverse {
true => (stop, start, Direction::Backward),
false => (start, stop, Direction::Forward),
};

// let request = BlockHeadersRequest {
// iteration: Iteration {
// start: start.get().into(),
// direction,
// limit,
// step: 1.into(),
// },
// };
async_stream::stream! {
// Loop which refreshes peer set once we exhaust it.
loop {
let peers = self
.get_update_peers_with_sync_capability(protocol::Headers::NAME)
.await;

// let responses = match self.inner.send_headers_sync_request(peer, request).await
// {
// Ok(x) => x,
// Err(error) => {
// // Failed to establish connection, try next peer.
// tracing::debug!(%peer, reason=%error, "Headers request failed");
// continue 'next_peer;
// }
// };
// Attempt each peer.
'next_peer: for peer in peers {
let limit = start.get().max(stop.get()) - start.get().min(stop.get());

// let mut responses = responses
// .flat_map(|response| futures::stream::iter(response.parts))
// .chunks(2)
// .scan((), |(), chunk| async { parse::handle_signed_header_chunk(chunk) })
// .boxed();
let request = BlockHeadersRequest {
iteration: Iteration {
start: start.get().into(),
direction,
limit,
step: 1.into(),
},
};

// while let Some(signed_header) = responses.next().await {
// let signed_header = match signed_header {
// Ok(signed_header) => signed_header,
// Err(error) => {
// tracing::debug!(%peer, %error, "Header stream failed");
// continue 'next_peer;
// }
// };
let mut responses = match self.inner.send_headers_sync_request(peer, request).await
{
Ok(x) => x,
Err(error) => {
// Failed to establish connection, try next peer.
tracing::debug!(%peer, reason=%error, "Headers request failed");
continue 'next_peer;
}
};

// // Small sanity check. We cannot reliably check the hash here,
// // its easier for the caller to ensure it matches expectations.
// if signed_header.header.number != start {
// tracing::debug!(%peer, "Wrong block number");
// continue 'next_peer;
// }
while let Some(signed_header) = responses.next().await {
let signed_header = match signed_header {
BlockHeadersResponse::Header(hdr) => match P2PSignedBlockHeader::try_from(*hdr) {
Ok(hdr) => hdr,
Err(error) => {
tracing::debug!(%peer, %error, "Header stream failed");
continue 'next_peer;
},
},
BlockHeadersResponse::Fin => {
tracing::debug!(%peer, "Header stream Fin");
continue 'next_peer;
}
};

// start = match direction {
// Direction::Forward => start + 1,
// // unwrap_or_default is safe as this is the genesis edge case,
// // at which point the loop will complete at the end of this iteration.
// Direction::Backward => start.parent().unwrap_or_default(),
// };
start = match direction {
Direction::Forward => start + 1,
// unwrap_or_default is safe as this is the genesis edge case,
// at which point the loop will complete at the end of this iteration.
Direction::Backward => start.parent().unwrap_or_default(),
};

// yield PeerData::new(peer, signed_header);
// }
yield PeerData::new(peer, signed_header);
}

// // TODO: track how much and how fast this peer responded with i.e. don't let them drip feed us etc.
// }
// }
// }
// TODO: track how much and how fast this peer responded with i.e. don't let them drip feed us etc.
}
}
}
}
}

Expand Down
90 changes: 87 additions & 3 deletions crates/p2p/src/client/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ use pathfinder_common::transaction::{
ResourceBound, ResourceBounds, TransactionVariant,
};
use pathfinder_common::{
AccountDeploymentDataElem, CallParam, CasmHash, ClassHash, ConstructorParam, ContractAddress,
ContractAddressSalt, EntryPoint, EventData, EventKey, Fee, PaymasterDataElem, Tip,
TransactionNonce, TransactionSignatureElem, TransactionVersion,
AccountDeploymentDataElem, BlockCommitmentSignature, BlockCommitmentSignatureElem, BlockHash,
BlockNumber, BlockTimestamp, CallParam, CasmHash, ClassHash, ConstructorParam, ContractAddress,
ContractAddressSalt, EntryPoint, EventCommitment, EventData, EventKey, Fee, GasPrice,
PaymasterDataElem, SequencerAddress, StarknetVersion, StateCommitment, Tip,
TransactionCommitment, TransactionNonce, TransactionSignatureElem, TransactionVersion,
};

/// We don't want to introduce circular dependencies between crates
Expand All @@ -22,6 +24,88 @@ pub trait TryFromDto<T> {
Self: Sized;
}

/// Represents a simplified [`pathfinder_common::SignedBlockHeader`], ie. excluding class commitment and storage commitment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SignedBlockHeader {
pub hash: BlockHash,
pub parent_hash: BlockHash,
pub number: BlockNumber,
pub timestamp: BlockTimestamp,
pub eth_l1_gas_price: GasPrice,
pub sequencer_address: SequencerAddress,
pub starknet_version: StarknetVersion,
pub event_commitment: EventCommitment,
pub state_commitment: StateCommitment,
pub transaction_commitment: TransactionCommitment,
pub transaction_count: usize,
pub event_count: usize,
pub signature: BlockCommitmentSignature,
}

impl TryFrom<p2p_proto::header::SignedBlockHeader> for SignedBlockHeader {
type Error = anyhow::Error;

fn try_from(dto: p2p_proto::header::SignedBlockHeader) -> Result<Self, Self::Error> {
anyhow::ensure!(dto.signatures.len() == 1, "expected exactly one signature");
let signature = dto
.signatures
.into_iter()
.map(|sig| BlockCommitmentSignature {
r: BlockCommitmentSignatureElem(sig.r),
s: BlockCommitmentSignatureElem(sig.s),
})
.next()
.expect("exactly one element");
Ok(SignedBlockHeader {
hash: BlockHash(dto.block_hash.0),
parent_hash: BlockHash(dto.parent_hash.0),
number: BlockNumber::new(dto.number)
.ok_or(anyhow::anyhow!("block number > i64::MAX"))?,
timestamp: BlockTimestamp::new(dto.time)
.ok_or(anyhow::anyhow!("block timestamp > i64::MAX"))?,
eth_l1_gas_price: dto.gas_price.into(),
sequencer_address: SequencerAddress(dto.sequencer_address.0),
starknet_version: dto.protocol_version.into(),
event_commitment: EventCommitment(dto.events.root.0),
state_commitment: StateCommitment(dto.state.root.0),
transaction_commitment: TransactionCommitment(dto.transactions.root.0),
transaction_count: dto.transactions.n_leaves.try_into()?,
event_count: dto.events.n_leaves.try_into()?,
signature,
})
}
}

impl
From<(
pathfinder_common::BlockHeader,
pathfinder_common::BlockCommitmentSignature,
)> for SignedBlockHeader
{
fn from(
(header, signature): (
pathfinder_common::BlockHeader,
pathfinder_common::BlockCommitmentSignature,
),
) -> Self {
Self {
hash: header.hash,
parent_hash: header.parent_hash,
number: header.number,
timestamp: header.timestamp,
eth_l1_gas_price: header.eth_l1_gas_price,
sequencer_address: header.sequencer_address,
starknet_version: header.starknet_version,
event_commitment: header.event_commitment,
state_commitment: header.state_commitment,
transaction_commitment: header.transaction_commitment,
transaction_count: header.transaction_count,
event_count: header.event_count,
signature,
}
}
}

/// Deployed contract address has not been computed for deploy account transactions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RawTransactionVariant {
Expand Down
1 change: 0 additions & 1 deletion crates/pathfinder/src/p2p_network/client/conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use serde_json::value::RawValue;
use starknet_gateway_types::{
class_definition::{self, SierraEntryPoints},
class_hash::from_parts::{compute_cairo_class_hash, compute_sierra_class_hash},
reply::transaction as gw,
request::contract::{SelectorAndFunctionIndex, SelectorAndOffset},
};

Expand Down
4 changes: 2 additions & 2 deletions crates/pathfinder/src/p2p_network/sync_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ fn get_events_for_block(
}

/// Assupmtions:
/// - `block_handler` returns Ok(true) if the iteration should continue.
/// - T::default() always returns the `Fin` variant of the implementing type.
/// - `block_handler` returns `Ok(true)` if the iteration should continue.
/// - `T::default()` always returns the `Fin` variant of the implementing type.
fn iterate<T: Default>(
tx: Transaction<'_>,
iteration: Iteration,
Expand Down
Loading

0 comments on commit fb06208

Please sign in to comment.