From 2279a96359ea510168ade5a27e296deab319618a Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Mon, 1 Jul 2024 16:29:14 +0200 Subject: [PATCH 01/18] chore: remove dead code --- crates/p2p/src/client/peer_agnostic.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/crates/p2p/src/client/peer_agnostic.rs b/crates/p2p/src/client/peer_agnostic.rs index 096b4dc2b3..5e7997d1d7 100644 --- a/crates/p2p/src/client/peer_agnostic.rs +++ b/crates/p2p/src/client/peer_agnostic.rs @@ -143,6 +143,8 @@ pub struct Client { peers_with_capability: Arc>, } +// pub trait TrackingClient; + impl Client { pub fn new(inner: peer_aware::Client, block_propagation_topic: String) -> Self { Self { @@ -293,16 +295,6 @@ impl Client { } } - pub async fn send_transactions_sync_request( - &self, - peer: PeerId, - request: TransactionsRequest, - ) -> anyhow::Result> { - self.inner - .send_transactions_sync_request(peer, request) - .await - } - pub fn transaction_stream( self, start: BlockNumber, From 3fb16f735e50aecd814b5778a655bcfae7fa0368 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Mon, 1 Jul 2024 16:52:36 +0200 Subject: [PATCH 02/18] refactor(p2p): extract a trait from the client --- crates/p2p/src/client/peer_agnostic.rs | 173 ++++++++++++++++++++++++- crates/pathfinder/src/sync/track.rs | 1 + 2 files changed, 168 insertions(+), 6 deletions(-) diff --git a/crates/p2p/src/client/peer_agnostic.rs b/crates/p2p/src/client/peer_agnostic.rs index 5e7997d1d7..9f589a9c4a 100644 --- a/crates/p2p/src/client/peer_agnostic.rs +++ b/crates/p2p/src/client/peer_agnostic.rs @@ -6,7 +6,7 @@ use std::time::Duration; use anyhow::Context; use fake::Dummy; -use futures::{pin_mut, StreamExt}; +use futures::{pin_mut, Future, StreamExt}; use libp2p::PeerId; use p2p_proto::class::{ClassesRequest, ClassesResponse}; use p2p_proto::common::{Direction, Iteration}; @@ -143,8 +143,6 @@ pub struct Client { peers_with_capability: Arc>, } -// pub trait TrackingClient; - impl Client { pub fn new(inner: peer_aware::Client, block_propagation_topic: String) -> Self { Self { @@ -472,7 +470,124 @@ impl Client { None } - pub async fn transactions_for_block( + pub async fn class_definitions_for_block( + self, + block: BlockNumber, + declared_classes_count: u64, + ) -> Result)>, ClassDefinitionsError> { + let request = ClassesRequest { + iteration: Iteration { + start: block.get().into(), + direction: Direction::Forward, + limit: 1, + step: 1.into(), + }, + }; + + let peers = self + .get_update_peers_with_sync_capability(protocol::Classes::NAME) + .await; + + for peer in peers { + let Ok(mut stream) = self + .inner + .send_classes_sync_request(peer, request) + .await + .inspect_err(|error| tracing::debug!(%peer, %error, "State diffs request failed")) + else { + continue; + }; + + let mut current_count = declared_classes_count; + let mut class_definitions = Vec::new(); + + while let Some(resp) = stream.next().await { + match resp { + ClassesResponse::Class(p2p_proto::class::Class::Cairo0 { + class, + domain: _, + }) => { + let definition = CairoDefinition::try_from_dto(class) + .map_err(|_| ClassDefinitionsError::CairoDefinitionError(peer))?; + class_definitions.push(ClassDefinition::Cairo { + block_number: block, + definition: definition.0, + }); + } + ClassesResponse::Class(p2p_proto::class::Class::Cairo1 { + class, + domain: _, + }) => { + let definition = SierraDefinition::try_from_dto(class) + .map_err(|_| ClassDefinitionsError::SierraDefinitionError(peer))?; + class_definitions.push(ClassDefinition::Sierra { + 
block_number: block, + sierra_definition: definition.0, + }); + } + ClassesResponse::Fin => { + tracing::debug!(%peer, "Received FIN in class definitions source"); + break; + } + } + + current_count = match current_count.checked_sub(1) { + Some(x) => x, + None => { + tracing::debug!(%peer, "Too many class definitions"); + return Err(ClassDefinitionsError::IncorrectClassDefinitionCount(peer)); + } + }; + } + + if current_count != 0 { + tracing::debug!(%peer, "Too few class definitions"); + return Err(ClassDefinitionsError::IncorrectClassDefinitionCount(peer)); + } + + return Ok(Some((peer, class_definitions))); + } + + Ok(None) + } +} + +pub trait BlockClient { + fn transactions_for_block( + self, + block: BlockNumber, + ) -> impl Future< + Output = Option<( + PeerId, + impl futures::Stream>, + )>, + >; + + fn state_diff_for_block( + self, + block: BlockNumber, + state_diff_length: u64, + ) -> impl Future, IncorrectStateDiffCount>>; + + fn class_definitions_for_block( + self, + block: BlockNumber, + declared_classes_count: u64, + ) -> impl Future)>, ClassDefinitionsError>>; + + fn events_for_block( + self, + block: BlockNumber, + ) -> impl Future< + Output = Option<( + PeerId, + impl futures::Stream, + )>, + >; +} + +impl BlockClient for Client { + async fn transactions_for_block( self, block: BlockNumber, ) -> Option<( @@ -525,7 +640,7 @@ impl Client { None } - pub async fn state_diff_for_block( + async fn state_diff_for_block( self, block: BlockNumber, state_diff_length: u64, @@ -653,7 +768,7 @@ impl Client { Ok(None) } - pub async fn class_definitions_for_block( + async fn class_definitions_for_block( self, block: BlockNumber, declared_classes_count: u64, @@ -733,6 +848,52 @@ impl Client { Ok(None) } + + async fn events_for_block( + self, + block: BlockNumber, + ) -> Option<( + PeerId, + impl futures::Stream, + )> { + let request = EventsRequest { + iteration: Iteration { + start: block.get().into(), + direction: Direction::Forward, + limit: 1, + step: 1.into(), + }, + }; + + let peers = self + .get_update_peers_with_sync_capability(protocol::Events::NAME) + .await; + + for peer in peers { + let Ok(stream) = self + .inner + .send_events_sync_request(peer, request) + .await + .inspect_err(|error| tracing::debug!(%peer, %error, "Events request failed")) + else { + continue; + }; + + let stream = stream + .take_while(|x| std::future::ready(!matches!(x, &EventsResponse::Fin))) + .map(|x| match x { + EventsResponse::Fin => unreachable!("Already handled Fin above"), + EventsResponse::Event(event) => ( + TransactionHash(event.transaction_hash.0), + Event::from_dto(event), + ), + }); + + return Some((peer, stream)); + } + + None + } } pub fn make_transaction_stream( diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index 8f71bb1a8c..e56d06cd37 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -5,6 +5,7 @@ use futures::stream::BoxStream; use futures::{Stream, StreamExt, TryStreamExt}; use p2p::client::peer_agnostic::{ self, + BlockClient, BlockHeader as P2PBlockHeader, ClassDefinition as P2PClassDefinition, ClassDefinitionsError, From 1334636af295b5f1e72b5bc1db9e5a0114b42338 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Mon, 1 Jul 2024 16:58:40 +0200 Subject: [PATCH 03/18] refactor(p2p): extract a trait from the client --- crates/p2p/src/client/peer_agnostic.rs | 247 ++++++++++++----------- crates/pathfinder/src/sync/checkpoint.rs | 1 + crates/pathfinder/src/sync/track.rs | 1 + 3 files changed, 131 
insertions(+), 118 deletions(-) diff --git a/crates/p2p/src/client/peer_agnostic.rs b/crates/p2p/src/client/peer_agnostic.rs index 9f589a9c4a..4e3d46c46d 100644 --- a/crates/p2p/src/client/peer_agnostic.rs +++ b/crates/p2p/src/client/peer_agnostic.rs @@ -143,6 +143,49 @@ pub struct Client { peers_with_capability: Arc>, } +pub trait HeaderStream { + fn header_stream( + self, + start: BlockNumber, + stop: BlockNumber, + reverse: bool, + ) -> impl futures::Stream>; +} + +pub trait BlockClient { + fn transactions_for_block( + self, + block: BlockNumber, + ) -> impl Future< + Output = Option<( + PeerId, + impl futures::Stream>, + )>, + >; + + fn state_diff_for_block( + self, + block: BlockNumber, + state_diff_length: u64, + ) -> impl Future, IncorrectStateDiffCount>>; + + fn class_definitions_for_block( + self, + block: BlockNumber, + declared_classes_count: u64, + ) -> impl Future)>, ClassDefinitionsError>>; + + fn events_for_block( + self, + block: BlockNumber, + ) -> impl Future< + Output = Option<( + PeerId, + impl futures::Stream, + )>, + >; +} + impl Client { pub fn new(inner: peer_aware::Client, block_propagation_topic: String) -> Self { Self { @@ -203,96 +246,6 @@ impl Client { .await } - pub fn header_stream( - self, - start: BlockNumber, - stop: BlockNumber, - reverse: bool, - ) -> impl futures::Stream> { - let (mut start, stop, direction) = match reverse { - true => (stop, start, Direction::Backward), - false => (start, stop, Direction::Forward), - }; - - tracing::trace!(?start, ?stop, ?direction, "Streaming headers"); - - async_stream::stream! { - // Loop which refreshes peer set once we exhaust it. - 'outer: loop { - let peers = self - .get_update_peers_with_sync_capability(protocol::Headers::NAME) - .await; - - // Attempt each peer. - 'next_peer: for peer in peers { - - match direction { - Direction::Forward => { - if start >= stop { - break 'outer; - } - } - Direction::Backward => { - if start <= stop { - break 'outer; - } - } - } - - let limit = start.get().max(stop.get()) - start.get().min(stop.get()) + 1; - - let request = BlockHeadersRequest { - iteration: Iteration { - start: start.get().into(), - direction, - limit, - step: 1.into(), - }, - }; - - let mut responses = - match self.inner.send_headers_sync_request(peer, request).await { - Ok(x) => x, - Err(error) => { - // Failed to establish connection, try next peer. - tracing::debug!(%peer, reason=%error, "Headers request failed"); - continue 'next_peer; - } - }; - - while let Some(signed_header) = responses.next().await { - let signed_header = match signed_header { - BlockHeadersResponse::Header(hdr) => { - match SignedBlockHeader::try_from(*hdr) { - Ok(hdr) => hdr, - Err(error) => { - tracing::debug!(%peer, %error, "Header stream failed"); - continue 'next_peer; - } - } - } - BlockHeadersResponse::Fin => { - tracing::debug!(%peer, "Header stream Fin"); - continue 'next_peer; - } - }; - - start = match direction { - Direction::Forward => start + 1, - // unwrap_or_default is safe as this is the genesis edge case, - // at which point the loop will complete at the end of this iteration. - Direction::Backward => start.parent().unwrap_or_default(), - }; - - yield PeerData::new(peer, signed_header); - } - - // TODO: track how much and how fast this peer responded with i.e. don't let them drip feed us etc. 
- } - } - } - } - pub fn transaction_stream( self, start: BlockNumber, @@ -552,38 +505,96 @@ impl Client { } } -pub trait BlockClient { - fn transactions_for_block( +impl HeaderStream for Client { + fn header_stream( self, - block: BlockNumber, - ) -> impl Future< - Output = Option<( - PeerId, - impl futures::Stream>, - )>, - >; + start: BlockNumber, + stop: BlockNumber, + reverse: bool, + ) -> impl futures::Stream> { + let (mut start, stop, direction) = match reverse { + true => (stop, start, Direction::Backward), + false => (start, stop, Direction::Forward), + }; - fn state_diff_for_block( - self, - block: BlockNumber, - state_diff_length: u64, - ) -> impl Future, IncorrectStateDiffCount>>; + tracing::trace!(?start, ?stop, ?direction, "Streaming headers"); - fn class_definitions_for_block( - self, - block: BlockNumber, - declared_classes_count: u64, - ) -> impl Future)>, ClassDefinitionsError>>; + async_stream::stream! { + // Loop which refreshes peer set once we exhaust it. + 'outer: loop { + let peers = self + .get_update_peers_with_sync_capability(protocol::Headers::NAME) + .await; - fn events_for_block( - self, - block: BlockNumber, - ) -> impl Future< - Output = Option<( - PeerId, - impl futures::Stream, - )>, - >; + // Attempt each peer. + 'next_peer: for peer in peers { + + match direction { + Direction::Forward => { + if start >= stop { + break 'outer; + } + } + Direction::Backward => { + if start <= stop { + break 'outer; + } + } + } + + let limit = start.get().max(stop.get()) - start.get().min(stop.get()) + 1; + + let request = BlockHeadersRequest { + iteration: Iteration { + start: start.get().into(), + direction, + limit, + step: 1.into(), + }, + }; + + let mut responses = + match self.inner.send_headers_sync_request(peer, request).await { + Ok(x) => x, + Err(error) => { + // Failed to establish connection, try next peer. + tracing::debug!(%peer, reason=%error, "Headers request failed"); + continue 'next_peer; + } + }; + + while let Some(signed_header) = responses.next().await { + let signed_header = match signed_header { + BlockHeadersResponse::Header(hdr) => { + match SignedBlockHeader::try_from(*hdr) { + Ok(hdr) => hdr, + Err(error) => { + tracing::debug!(%peer, %error, "Header stream failed"); + continue 'next_peer; + } + } + } + BlockHeadersResponse::Fin => { + tracing::debug!(%peer, "Header stream Fin"); + continue 'next_peer; + } + }; + + start = match direction { + Direction::Forward => start + 1, + // unwrap_or_default is safe as this is the genesis edge case, + // at which point the loop will complete at the end of this iteration. + Direction::Backward => start.parent().unwrap_or_default(), + }; + + yield PeerData::new(peer, signed_header); + } + + // TODO: track how much and how fast this peer responded with i.e. don't let them drip feed us etc. 
+ } + } + } + } } impl BlockClient for Client { diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs index 89b5692141..824b963704 100644 --- a/crates/pathfinder/src/sync/checkpoint.rs +++ b/crates/pathfinder/src/sync/checkpoint.rs @@ -9,6 +9,7 @@ use p2p::client::peer_agnostic::{ ClassDefinition, Client as P2PClient, EventsForBlockByTransaction, + HeaderStream, SignedBlockHeader as P2PSignedBlockHeader, UnverifiedStateUpdateData, UnverifiedTransactionData, diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index e56d06cd37..d36b14a0c5 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -10,6 +10,7 @@ use p2p::client::peer_agnostic::{ ClassDefinition as P2PClassDefinition, ClassDefinitionsError, Client as P2PClient, + HeaderStream, IncorrectStateDiffCount, SignedBlockHeader as P2PSignedBlockHeader, UnverifiedStateUpdateData, From ad765b79c77f6abc999b2a2cfe8a23c8fd9f7394 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 2 Jul 2024 12:23:22 +0200 Subject: [PATCH 04/18] refactor(p2p): extract more traits from the client --- crates/p2p/src/client/peer_agnostic.rs | 448 ++++++++++------------- crates/pathfinder/src/sync/checkpoint.rs | 6 +- crates/pathfinder/src/sync/track.rs | 57 ++- 3 files changed, 228 insertions(+), 283 deletions(-) diff --git a/crates/p2p/src/client/peer_agnostic.rs b/crates/p2p/src/client/peer_agnostic.rs index 4e3d46c46d..de84b66b23 100644 --- a/crates/p2p/src/client/peer_agnostic.rs +++ b/crates/p2p/src/client/peer_agnostic.rs @@ -6,7 +6,7 @@ use std::time::Duration; use anyhow::Context; use fake::Dummy; -use futures::{pin_mut, Future, StreamExt}; +use futures::{pin_mut, Future, Stream, StreamExt}; use libp2p::PeerId; use p2p_proto::class::{ClassesRequest, ClassesResponse}; use p2p_proto::common::{Direction, Iteration}; @@ -149,7 +149,63 @@ pub trait HeaderStream { start: BlockNumber, stop: BlockNumber, reverse: bool, - ) -> impl futures::Stream>; + ) -> impl Stream> + Send; +} + +pub trait TransactionStream { + fn transaction_stream( + self, + start: BlockNumber, + stop: BlockNumber, + transaction_counts_and_commitments_stream: impl Stream< + Item = anyhow::Result<(usize, TransactionCommitment)>, + >, + ) -> impl Stream< + Item = Result, PeerData>, + >; +} + +pub trait StateDiffStream { + /// ### Important + /// + /// Contract class updates are by default set to + /// `ContractClassUpdate::Deploy` but __the caller is responsible for + /// determining if the class was really deployed or replaced__. + fn state_diff_stream( + self, + start: BlockNumber, + stop: BlockNumber, + state_diff_length_and_commitment_stream: impl Stream< + Item = anyhow::Result<(usize, StateDiffCommitment)>, + >, + ) -> impl Stream< + Item = Result, PeerData>, + >; +} + +pub trait ClassStream { + fn class_stream( + self, + start: BlockNumber, + stop: BlockNumber, + declared_class_counts_stream: impl Stream>, + ) -> impl Stream, PeerData>>; +} + +pub trait EventStream { + /// ### Important + /// + /// Events are grouped by block and by transaction. The order of flattened + /// events in a block is guaranteed to be correct because the event + /// commitment is part of block hash. However the number of events per + /// transaction for __pre 0.13.2__ Starknet blocks is __TRUSTED__ + /// because neither signature nor block hash contain this information. 
+ fn event_stream( + self, + start: BlockNumber, + stop: BlockNumber, + event_counts_stream: impl Stream>, + ) -> impl Stream, PeerData>>; } pub trait BlockClient { @@ -159,31 +215,26 @@ pub trait BlockClient { ) -> impl Future< Output = Option<( PeerId, - impl futures::Stream>, + impl Stream> + Send, )>, - >; + > + Send; fn state_diff_for_block( self, block: BlockNumber, state_diff_length: u64, - ) -> impl Future, IncorrectStateDiffCount>>; + ) -> impl Future, IncorrectStateDiffCount>> + Send; fn class_definitions_for_block( self, block: BlockNumber, declared_classes_count: u64, - ) -> impl Future)>, ClassDefinitionsError>>; + ) -> impl Future)>, ClassDefinitionsError>> + Send; fn events_for_block( self, block: BlockNumber, - ) -> impl Future< - Output = Option<( - PeerId, - impl futures::Stream, - )>, - >; + ) -> impl Future + Send)>> + Send; } impl Client { @@ -240,20 +291,109 @@ impl Client { peers.shuffle(&mut rand::thread_rng()); peers } +} - pub async fn get_update_peers_with_transaction_sync_capability(&self) -> Vec { - self.get_update_peers_with_sync_capability(protocol::Transactions::NAME) - .await +impl HeaderStream for Client { + fn header_stream( + self, + start: BlockNumber, + stop: BlockNumber, + reverse: bool, + ) -> impl Stream> { + let (mut start, stop, direction) = match reverse { + true => (stop, start, Direction::Backward), + false => (start, stop, Direction::Forward), + }; + + tracing::trace!(?start, ?stop, ?direction, "Streaming headers"); + + async_stream::stream! { + // Loop which refreshes peer set once we exhaust it. + 'outer: loop { + let peers = self + .get_update_peers_with_sync_capability(protocol::Headers::NAME) + .await; + + // Attempt each peer. + 'next_peer: for peer in peers { + + match direction { + Direction::Forward => { + if start >= stop { + break 'outer; + } + } + Direction::Backward => { + if start <= stop { + break 'outer; + } + } + } + + let limit = start.get().max(stop.get()) - start.get().min(stop.get()) + 1; + + let request = BlockHeadersRequest { + iteration: Iteration { + start: start.get().into(), + direction, + limit, + step: 1.into(), + }, + }; + + let mut responses = + match self.inner.send_headers_sync_request(peer, request).await { + Ok(x) => x, + Err(error) => { + // Failed to establish connection, try next peer. + tracing::debug!(%peer, reason=%error, "Headers request failed"); + continue 'next_peer; + } + }; + + while let Some(signed_header) = responses.next().await { + let signed_header = match signed_header { + BlockHeadersResponse::Header(hdr) => { + match SignedBlockHeader::try_from(*hdr) { + Ok(hdr) => hdr, + Err(error) => { + tracing::debug!(%peer, %error, "Header stream failed"); + continue 'next_peer; + } + } + } + BlockHeadersResponse::Fin => { + tracing::debug!(%peer, "Header stream Fin"); + continue 'next_peer; + } + }; + + start = match direction { + Direction::Forward => start + 1, + // unwrap_or_default is safe as this is the genesis edge case, + // at which point the loop will complete at the end of this iteration. + Direction::Backward => start.parent().unwrap_or_default(), + }; + + yield PeerData::new(peer, signed_header); + } + + // TODO: track how much and how fast this peer responded with i.e. don't let them drip feed us etc. 
+ } + } + } } +} - pub fn transaction_stream( +impl TransactionStream for Client { + fn transaction_stream( self, start: BlockNumber, stop: BlockNumber, - transaction_counts_and_commitments_stream: impl futures::Stream< + transaction_counts_and_commitments_stream: impl Stream< Item = anyhow::Result<(usize, TransactionCommitment)>, >, - ) -> impl futures::Stream< + ) -> impl Stream< Item = Result, PeerData>, > { let inner = self.inner.clone(); @@ -266,7 +406,7 @@ impl Client { let outer = outer.clone(); async move { outer - .get_update_peers_with_transaction_sync_capability() + .get_update_peers_with_sync_capability(protocol::Transactions::NAME) .await } }, @@ -276,20 +416,22 @@ impl Client { }, ) } +} +impl StateDiffStream for Client { /// ### Important /// /// Contract class updates are by default set to /// `ContractClassUpdate::Deploy` but __the caller is responsible for /// determining if the class was really deployed or replaced__. - pub fn state_diff_stream( + fn state_diff_stream( self, start: BlockNumber, stop: BlockNumber, - state_diff_length_and_commitment_stream: impl futures::Stream< + state_diff_length_and_commitment_stream: impl Stream< Item = anyhow::Result<(usize, StateDiffCommitment)>, >, - ) -> impl futures::Stream< + ) -> impl Stream< Item = Result, PeerData>, > { let inner = self.inner.clone(); @@ -312,14 +454,15 @@ impl Client { }, ) } +} - pub fn class_definition_stream( +impl ClassStream for Client { + fn class_stream( self, start: BlockNumber, stop: BlockNumber, - declared_class_counts_stream: impl futures::Stream>, - ) -> impl futures::Stream, PeerData>> - { + declared_class_counts_stream: impl Stream>, + ) -> impl Stream, PeerData>> { let inner = self.inner.clone(); let outer = self; make_class_definition_stream( @@ -340,7 +483,9 @@ impl Client { }, ) } +} +impl EventStream for Client { /// ### Important /// /// Events are grouped by block and by transaction. The order of flattened @@ -348,14 +493,13 @@ impl Client { /// commitment is part of block hash. However the number of events per /// transaction for __pre 0.13.2__ Starknet blocks is __TRUSTED__ /// because neither signature nor block hash contain this information. 
- pub fn event_stream( + fn event_stream( self, start: BlockNumber, stop: BlockNumber, - event_counts_stream: impl futures::Stream>, - ) -> impl futures::Stream< - Item = Result, PeerData>, - > { + event_counts_stream: impl Stream>, + ) -> impl Stream, PeerData>> + { let inner = self.inner.clone(); let outer = self; make_event_stream( @@ -376,225 +520,6 @@ impl Client { }, ) } - - pub async fn events_for_block( - self, - block: BlockNumber, - ) -> Option<( - PeerId, - impl futures::Stream, - )> { - let request = EventsRequest { - iteration: Iteration { - start: block.get().into(), - direction: Direction::Forward, - limit: 1, - step: 1.into(), - }, - }; - - let peers = self - .get_update_peers_with_sync_capability(protocol::Events::NAME) - .await; - - for peer in peers { - let Ok(stream) = self - .inner - .send_events_sync_request(peer, request) - .await - .inspect_err(|error| tracing::debug!(%peer, %error, "Events request failed")) - else { - continue; - }; - - let stream = stream - .take_while(|x| std::future::ready(!matches!(x, &EventsResponse::Fin))) - .map(|x| match x { - EventsResponse::Fin => unreachable!("Already handled Fin above"), - EventsResponse::Event(event) => ( - TransactionHash(event.transaction_hash.0), - Event::from_dto(event), - ), - }); - - return Some((peer, stream)); - } - - None - } - - pub async fn class_definitions_for_block( - self, - block: BlockNumber, - declared_classes_count: u64, - ) -> Result)>, ClassDefinitionsError> { - let request = ClassesRequest { - iteration: Iteration { - start: block.get().into(), - direction: Direction::Forward, - limit: 1, - step: 1.into(), - }, - }; - - let peers = self - .get_update_peers_with_sync_capability(protocol::Classes::NAME) - .await; - - for peer in peers { - let Ok(mut stream) = self - .inner - .send_classes_sync_request(peer, request) - .await - .inspect_err(|error| tracing::debug!(%peer, %error, "State diffs request failed")) - else { - continue; - }; - - let mut current_count = declared_classes_count; - let mut class_definitions = Vec::new(); - - while let Some(resp) = stream.next().await { - match resp { - ClassesResponse::Class(p2p_proto::class::Class::Cairo0 { - class, - domain: _, - }) => { - let definition = CairoDefinition::try_from_dto(class) - .map_err(|_| ClassDefinitionsError::CairoDefinitionError(peer))?; - class_definitions.push(ClassDefinition::Cairo { - block_number: block, - definition: definition.0, - }); - } - ClassesResponse::Class(p2p_proto::class::Class::Cairo1 { - class, - domain: _, - }) => { - let definition = SierraDefinition::try_from_dto(class) - .map_err(|_| ClassDefinitionsError::SierraDefinitionError(peer))?; - class_definitions.push(ClassDefinition::Sierra { - block_number: block, - sierra_definition: definition.0, - }); - } - ClassesResponse::Fin => { - tracing::debug!(%peer, "Received FIN in class definitions source"); - break; - } - } - - current_count = match current_count.checked_sub(1) { - Some(x) => x, - None => { - tracing::debug!(%peer, "Too many class definitions"); - return Err(ClassDefinitionsError::IncorrectClassDefinitionCount(peer)); - } - }; - } - - if current_count != 0 { - tracing::debug!(%peer, "Too few class definitions"); - return Err(ClassDefinitionsError::IncorrectClassDefinitionCount(peer)); - } - - return Ok(Some((peer, class_definitions))); - } - - Ok(None) - } -} - -impl HeaderStream for Client { - fn header_stream( - self, - start: BlockNumber, - stop: BlockNumber, - reverse: bool, - ) -> impl futures::Stream> { - let (mut start, stop, direction) = match 
reverse { - true => (stop, start, Direction::Backward), - false => (start, stop, Direction::Forward), - }; - - tracing::trace!(?start, ?stop, ?direction, "Streaming headers"); - - async_stream::stream! { - // Loop which refreshes peer set once we exhaust it. - 'outer: loop { - let peers = self - .get_update_peers_with_sync_capability(protocol::Headers::NAME) - .await; - - // Attempt each peer. - 'next_peer: for peer in peers { - - match direction { - Direction::Forward => { - if start >= stop { - break 'outer; - } - } - Direction::Backward => { - if start <= stop { - break 'outer; - } - } - } - - let limit = start.get().max(stop.get()) - start.get().min(stop.get()) + 1; - - let request = BlockHeadersRequest { - iteration: Iteration { - start: start.get().into(), - direction, - limit, - step: 1.into(), - }, - }; - - let mut responses = - match self.inner.send_headers_sync_request(peer, request).await { - Ok(x) => x, - Err(error) => { - // Failed to establish connection, try next peer. - tracing::debug!(%peer, reason=%error, "Headers request failed"); - continue 'next_peer; - } - }; - - while let Some(signed_header) = responses.next().await { - let signed_header = match signed_header { - BlockHeadersResponse::Header(hdr) => { - match SignedBlockHeader::try_from(*hdr) { - Ok(hdr) => hdr, - Err(error) => { - tracing::debug!(%peer, %error, "Header stream failed"); - continue 'next_peer; - } - } - } - BlockHeadersResponse::Fin => { - tracing::debug!(%peer, "Header stream Fin"); - continue 'next_peer; - } - }; - - start = match direction { - Direction::Forward => start + 1, - // unwrap_or_default is safe as this is the genesis edge case, - // at which point the loop will complete at the end of this iteration. - Direction::Backward => start.parent().unwrap_or_default(), - }; - - yield PeerData::new(peer, signed_header); - } - - // TODO: track how much and how fast this peer responded with i.e. don't let them drip feed us etc. 
- } - } - } - } } impl BlockClient for Client { @@ -603,7 +528,7 @@ impl BlockClient for Client { block: BlockNumber, ) -> Option<( PeerId, - impl futures::Stream>, + impl Stream>, )> { let request = TransactionsRequest { iteration: Iteration { @@ -863,10 +788,7 @@ impl BlockClient for Client { async fn events_for_block( self, block: BlockNumber, - ) -> Option<( - PeerId, - impl futures::Stream, - )> { + ) -> Option<(PeerId, impl Stream)> { let request = EventsRequest { iteration: Iteration { start: block.get().into(), @@ -910,12 +832,12 @@ impl BlockClient for Client { pub fn make_transaction_stream( mut start: BlockNumber, stop: BlockNumber, - transaction_counts_and_commitments_stream: impl futures::Stream< + transaction_counts_and_commitments_stream: impl Stream< Item = anyhow::Result<(usize, TransactionCommitment)>, >, get_peers: impl Fn() -> PF, send_request: impl Fn(PeerId, TransactionsRequest) -> RF, -) -> impl futures::Stream< +) -> impl Stream< Item = Result, PeerData>, > where @@ -1095,12 +1017,12 @@ where pub fn make_state_diff_stream( mut start: BlockNumber, stop: BlockNumber, - state_diff_length_and_commitment_stream: impl futures::Stream< + state_diff_length_and_commitment_stream: impl Stream< Item = anyhow::Result<(usize, StateDiffCommitment)>, >, get_peers: impl Fn() -> PF, send_request: impl Fn(PeerId, StateDiffsRequest) -> RF, -) -> impl futures::Stream< +) -> impl Stream< Item = Result, PeerData>, > where @@ -1340,10 +1262,10 @@ where pub fn make_class_definition_stream( mut start: BlockNumber, stop: BlockNumber, - declared_class_counts_stream: impl futures::Stream>, + declared_class_counts_stream: impl Stream>, get_peers: impl Fn() -> PF, send_request: impl Fn(PeerId, ClassesRequest) -> RF, -) -> impl futures::Stream, PeerData>> +) -> impl Stream, PeerData>> where PF: std::future::Future>, RF: std::future::Future< @@ -1477,10 +1399,10 @@ where pub fn make_event_stream( mut start: BlockNumber, stop: BlockNumber, - event_counts_stream: impl futures::Stream>, + event_counts_stream: impl Stream>, get_peers: impl Fn() -> PF, send_request: impl Fn(PeerId, EventsRequest) -> RF, -) -> impl futures::Stream, PeerData>> +) -> impl Stream, PeerData>> where PF: std::future::Future>, RF: std::future::Future< diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs index 824b963704..f3a20f830e 100644 --- a/crates/pathfinder/src/sync/checkpoint.rs +++ b/crates/pathfinder/src/sync/checkpoint.rs @@ -7,10 +7,14 @@ use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use p2p::client::conv::TryFromDto; use p2p::client::peer_agnostic::{ ClassDefinition, + ClassStream, Client as P2PClient, + EventStream, EventsForBlockByTransaction, HeaderStream, SignedBlockHeader as P2PSignedBlockHeader, + StateDiffStream, + TransactionStream, UnverifiedStateUpdateData, UnverifiedTransactionData, }; @@ -205,7 +209,7 @@ impl Sync { return Ok(()); }; - let class_stream = self.p2p.clone().class_definition_stream( + let class_stream = self.p2p.clone().class_stream( start, stop, class_definitions::declared_class_counts_stream(self.storage.clone(), start, stop), diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index d36b14a0c5..d7e23c266e 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -1,8 +1,9 @@ use std::collections::{HashMap, HashSet}; +use std::pin; use anyhow::{anyhow, Context}; use futures::stream::BoxStream; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures::{pin_mut, 
Stream, StreamExt, TryStreamExt}; use p2p::client::peer_agnostic::{ self, BlockClient, @@ -47,18 +48,19 @@ use crate::sync::error::SyncError2; use crate::sync::stream::{ProcessStage, SyncReceiver, SyncResult}; use crate::sync::{events, headers}; -pub struct Sync { +pub struct Sync { latest: L, - p2p: P2PClient, + p2p: P, storage: Storage, chain: Chain, chain_id: ChainId, public_key: PublicKey, } -impl Sync +impl Sync where L: Stream + Clone + Send + 'static, + P: BlockClient + Clone + HeaderStream + Send + 'static, { pub async fn run( self, @@ -164,15 +166,16 @@ where } } -struct HeaderSource { - p2p: P2PClient, +struct HeaderSource { + p2p: P, latest_onchain: L, start: BlockNumber, } -impl HeaderSource +impl HeaderSource where L: Stream + Send + 'static, + P: Clone + HeaderStream + Send + 'static, { fn spawn(self) -> SyncReceiver { let (tx, rx) = tokio::sync::mpsc::channel(1); @@ -333,12 +336,15 @@ impl HeaderFanout { } } -struct TransactionSource { - p2p: P2PClient, +struct TransactionSource
<P>
{ + p2p: P, headers: BoxStream<'static, P2PBlockHeader>, } -impl TransactionSource { +impl
<P> TransactionSource<P>
+where + P: Clone + BlockClient + Send + 'static, +{ fn spawn(self) -> SyncReceiver<(UnverifiedTransactionData, StarknetVersion)> { let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { @@ -354,6 +360,8 @@ impl TransactionSource { let transaction_count = header.transaction_count; let mut transactions_vec = Vec::new(); + pin_mut!(transactions); + // Receive the exact amount of expected events for this block. for _ in 0..transaction_count { let (transaction, receipt) = match transactions.next().await { @@ -399,8 +407,8 @@ impl TransactionSource { } } -struct EventSource { - p2p: P2PClient, +struct EventSource
<P>
{ + p2p: P, headers: BoxStream<'static, P2PBlockHeader>, transactions: BoxStream<'static, Vec>, } @@ -412,7 +420,10 @@ type EventsWithCommitment = ( StarknetVersion, ); -impl EventSource { +impl
<P> EventSource<P>
+where + P: Clone + BlockClient + Send + 'static, +{ fn spawn(self) -> SyncReceiver { let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { @@ -438,6 +449,8 @@ impl EventSource { let mut block_events: HashMap<_, Vec> = HashMap::new(); let event_count = header.event_count; + pin_mut!(events); + // Receive the exact amount of expected events for this block. for _ in 0..event_count { let Some((tx_hash, event)) = events.next().await else { @@ -478,12 +491,15 @@ impl EventSource { } } -struct StateDiffSource { - p2p: P2PClient, +struct StateDiffSource
<P>
{ + p2p: P, headers: BoxStream<'static, P2PSignedBlockHeader>, } -impl StateDiffSource { +impl
<P> StateDiffSource<P>
+where + P: Clone + BlockClient + Send + 'static, +{ fn spawn(self) -> SyncReceiver<(UnverifiedStateUpdateData, StarknetVersion)> { let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { @@ -529,13 +545,16 @@ impl StateDiffSource { } } -struct ClassSource { - p2p: P2PClient, +struct ClassSource
<P>
{ + p2p: P, declarations: BoxStream<'static, DeclaredClasses>, start: BlockNumber, } -impl ClassSource { +impl
<P> ClassSource<P>
+where + P: Clone + BlockClient + Send + 'static, +{ fn spawn(self) -> SyncReceiver> { let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { From 55aff0ce6a47964013672960f3e809e37dedc685 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 2 Jul 2024 12:45:46 +0200 Subject: [PATCH 05/18] refactor(p2p): move PeerData and traits to separate files --- crates/p2p/src/client/peer_agnostic.rs | 156 ++---------------- crates/p2p/src/client/peer_agnostic/traits.rs | 112 +++++++++++++ crates/p2p/src/lib.rs | 3 +- crates/p2p/src/peer_data.rs | 51 ++++++ crates/pathfinder/src/sync/checkpoint.rs | 12 +- crates/pathfinder/src/sync/track.rs | 3 +- 6 files changed, 185 insertions(+), 152 deletions(-) create mode 100644 crates/p2p/src/client/peer_agnostic/traits.rs create mode 100644 crates/p2p/src/peer_data.rs diff --git a/crates/p2p/src/client/peer_agnostic.rs b/crates/p2p/src/client/peer_agnostic.rs index de84b66b23..4cffc594d0 100644 --- a/crates/p2p/src/client/peer_agnostic.rs +++ b/crates/p2p/src/client/peer_agnostic.rs @@ -6,7 +6,7 @@ use std::time::Duration; use anyhow::Context; use fake::Dummy; -use futures::{pin_mut, Future, Stream, StreamExt}; +use futures::{pin_mut, Stream, StreamExt}; use libp2p::PeerId; use p2p_proto::class::{ClassesRequest, ClassesResponse}; use p2p_proto::common::{Direction, Iteration}; @@ -58,60 +58,22 @@ use tokio::sync::RwLock; mod fixtures; #[cfg(test)] mod tests; +pub mod traits; + +use traits::{ + BlockClient, + ClassStream, + EventStream, + HeaderStream, + StateDiffStream, + TransactionStream, +}; use crate::client::conv::{CairoDefinition, FromDto, SierraDefinition, TryFromDto}; use crate::client::peer_aware; +use crate::peer_data::PeerData; use crate::sync::protocol; -/// Data received from a specific peer. 
-#[derive(Clone, Debug, PartialEq)] -pub struct PeerData { - pub peer: PeerId, - pub data: T, -} - -impl PeerData { - pub fn new(peer: PeerId, data: T) -> Self { - Self { peer, data } - } - - pub fn from_result(peer: PeerId, result: Result) -> Result, PeerData> { - result - .map(|x| Self::new(peer, x)) - .map_err(|e| PeerData::::new(peer, e)) - } - - pub fn for_tests(data: T) -> Self { - Self { - peer: PeerId::random(), - data, - } - } - - pub fn map(self, f: F) -> PeerData - where - F: FnOnce(T) -> U, - { - PeerData { - peer: self.peer, - data: f(self.data), - } - } -} - -impl> Dummy for PeerData { - fn dummy_with_rng(config: &T, rng: &mut R) -> Self { - let digest = rng.gen::<[u8; 32]>(); - let multihash = libp2p::multihash::Multihash::wrap(0x0, &digest) - .expect("The digest size is never too large"); - - PeerData { - peer: PeerId::from_multihash(multihash).expect("Valid multihash"), - data: U::dummy_with_rng(config, rng), - } - } -} - #[derive(Clone, PartialEq, Dummy, TaggedDebug)] pub enum ClassDefinition { Cairo { @@ -143,100 +105,6 @@ pub struct Client { peers_with_capability: Arc>, } -pub trait HeaderStream { - fn header_stream( - self, - start: BlockNumber, - stop: BlockNumber, - reverse: bool, - ) -> impl Stream> + Send; -} - -pub trait TransactionStream { - fn transaction_stream( - self, - start: BlockNumber, - stop: BlockNumber, - transaction_counts_and_commitments_stream: impl Stream< - Item = anyhow::Result<(usize, TransactionCommitment)>, - >, - ) -> impl Stream< - Item = Result, PeerData>, - >; -} - -pub trait StateDiffStream { - /// ### Important - /// - /// Contract class updates are by default set to - /// `ContractClassUpdate::Deploy` but __the caller is responsible for - /// determining if the class was really deployed or replaced__. - fn state_diff_stream( - self, - start: BlockNumber, - stop: BlockNumber, - state_diff_length_and_commitment_stream: impl Stream< - Item = anyhow::Result<(usize, StateDiffCommitment)>, - >, - ) -> impl Stream< - Item = Result, PeerData>, - >; -} - -pub trait ClassStream { - fn class_stream( - self, - start: BlockNumber, - stop: BlockNumber, - declared_class_counts_stream: impl Stream>, - ) -> impl Stream, PeerData>>; -} - -pub trait EventStream { - /// ### Important - /// - /// Events are grouped by block and by transaction. The order of flattened - /// events in a block is guaranteed to be correct because the event - /// commitment is part of block hash. However the number of events per - /// transaction for __pre 0.13.2__ Starknet blocks is __TRUSTED__ - /// because neither signature nor block hash contain this information. 
- fn event_stream( - self, - start: BlockNumber, - stop: BlockNumber, - event_counts_stream: impl Stream>, - ) -> impl Stream, PeerData>>; -} - -pub trait BlockClient { - fn transactions_for_block( - self, - block: BlockNumber, - ) -> impl Future< - Output = Option<( - PeerId, - impl Stream> + Send, - )>, - > + Send; - - fn state_diff_for_block( - self, - block: BlockNumber, - state_diff_length: u64, - ) -> impl Future, IncorrectStateDiffCount>> + Send; - - fn class_definitions_for_block( - self, - block: BlockNumber, - declared_classes_count: u64, - ) -> impl Future)>, ClassDefinitionsError>> + Send; - - fn events_for_block( - self, - block: BlockNumber, - ) -> impl Future + Send)>> + Send; -} - impl Client { pub fn new(inner: peer_aware::Client, block_propagation_topic: String) -> Self { Self { diff --git a/crates/p2p/src/client/peer_agnostic/traits.rs b/crates/p2p/src/client/peer_agnostic/traits.rs new file mode 100644 index 0000000000..24000a831c --- /dev/null +++ b/crates/p2p/src/client/peer_agnostic/traits.rs @@ -0,0 +1,112 @@ +use futures::{Future, Stream}; +use libp2p::PeerId; +use pathfinder_common::event::Event; +use pathfinder_common::state_update::StateUpdateData; +use pathfinder_common::transaction::TransactionVariant; +use pathfinder_common::{BlockNumber, StateDiffCommitment, TransactionCommitment, TransactionHash}; + +use super::{ + ClassDefinition, + ClassDefinitionsError, + EventsForBlockByTransaction, + IncorrectStateDiffCount, + Receipt, + SignedBlockHeader, + UnverifiedStateUpdateData, + UnverifiedTransactionData, +}; +use crate::PeerData; + +pub trait HeaderStream { + fn header_stream( + self, + start: BlockNumber, + stop: BlockNumber, + reverse: bool, + ) -> impl Stream> + Send; +} + +pub trait TransactionStream { + fn transaction_stream( + self, + start: BlockNumber, + stop: BlockNumber, + transaction_counts_and_commitments_stream: impl Stream< + Item = anyhow::Result<(usize, TransactionCommitment)>, + >, + ) -> impl Stream< + Item = Result, PeerData>, + >; +} + +pub trait StateDiffStream { + /// ### Important + /// + /// Contract class updates are by default set to + /// `ContractClassUpdate::Deploy` but __the caller is responsible for + /// determining if the class was really deployed or replaced__. + fn state_diff_stream( + self, + start: BlockNumber, + stop: BlockNumber, + state_diff_length_and_commitment_stream: impl Stream< + Item = anyhow::Result<(usize, StateDiffCommitment)>, + >, + ) -> impl Stream< + Item = Result, PeerData>, + >; +} + +pub trait ClassStream { + fn class_stream( + self, + start: BlockNumber, + stop: BlockNumber, + declared_class_counts_stream: impl Stream>, + ) -> impl Stream, PeerData>>; +} + +pub trait EventStream { + /// ### Important + /// + /// Events are grouped by block and by transaction. The order of flattened + /// events in a block is guaranteed to be correct because the event + /// commitment is part of block hash. However the number of events per + /// transaction for __pre 0.13.2__ Starknet blocks is __TRUSTED__ + /// because neither signature nor block hash contain this information. 
+ fn event_stream( + self, + start: BlockNumber, + stop: BlockNumber, + event_counts_stream: impl Stream>, + ) -> impl Stream, PeerData>>; +} + +pub trait BlockClient { + fn transactions_for_block( + self, + block: BlockNumber, + ) -> impl Future< + Output = Option<( + PeerId, + impl Stream> + Send, + )>, + > + Send; + + fn state_diff_for_block( + self, + block: BlockNumber, + state_diff_length: u64, + ) -> impl Future, IncorrectStateDiffCount>> + Send; + + fn class_definitions_for_block( + self, + block: BlockNumber, + declared_classes_count: u64, + ) -> impl Future)>, ClassDefinitionsError>> + Send; + + fn events_for_block( + self, + block: BlockNumber, + ) -> impl Future + Send)>> + Send; +} diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 7d15252c0d..d4b7d93350 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -20,6 +20,7 @@ use tokio::sync::{mpsc, oneshot}; mod behaviour; pub mod client; mod main_loop; +mod peer_data; mod peers; mod secret; mod sync; @@ -30,10 +31,10 @@ mod tests; mod transport; pub use behaviour::kademlia_protocol_name; -pub use client::peer_agnostic::PeerData; use client::peer_aware::Client; pub use libp2p; use main_loop::MainLoop; +pub use peer_data::PeerData; pub use sync::protocol::PROTOCOLS; pub fn new(keypair: Keypair, cfg: Config, chain_id: ChainId) -> (Client, EventReceiver, MainLoop) { diff --git a/crates/p2p/src/peer_data.rs b/crates/p2p/src/peer_data.rs new file mode 100644 index 0000000000..e543d1c71e --- /dev/null +++ b/crates/p2p/src/peer_data.rs @@ -0,0 +1,51 @@ +use fake::Dummy; +use libp2p::PeerId; + +/// Data received from a specific peer. +#[derive(Clone, Debug, PartialEq)] +pub struct PeerData { + pub peer: PeerId, + pub data: T, +} + +impl PeerData { + pub fn new(peer: PeerId, data: T) -> Self { + Self { peer, data } + } + + pub fn from_result(peer: PeerId, result: Result) -> Result, PeerData> { + result + .map(|x| Self::new(peer, x)) + .map_err(|e| PeerData::::new(peer, e)) + } + + pub fn for_tests(data: T) -> Self { + Self { + peer: PeerId::random(), + data, + } + } + + pub fn map(self, f: F) -> PeerData + where + F: FnOnce(T) -> U, + { + PeerData { + peer: self.peer, + data: f(self.data), + } + } +} + +impl> Dummy for PeerData { + fn dummy_with_rng(config: &T, rng: &mut R) -> Self { + let digest = rng.gen::<[u8; 32]>(); + let multihash = libp2p::multihash::Multihash::wrap(0x0, &digest) + .expect("The digest size is never too large"); + + PeerData { + peer: PeerId::from_multihash(multihash).expect("Valid multihash"), + data: U::dummy_with_rng(config, rng), + } + } +} diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs index f3a20f830e..45b7669cbf 100644 --- a/crates/pathfinder/src/sync/checkpoint.rs +++ b/crates/pathfinder/src/sync/checkpoint.rs @@ -5,16 +5,18 @@ use std::sync::{Arc, RwLock}; use anyhow::Context; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use p2p::client::conv::TryFromDto; -use p2p::client::peer_agnostic::{ - ClassDefinition, +use p2p::client::peer_agnostic::traits::{ ClassStream, - Client as P2PClient, EventStream, - EventsForBlockByTransaction, HeaderStream, - SignedBlockHeader as P2PSignedBlockHeader, StateDiffStream, TransactionStream, +}; +use p2p::client::peer_agnostic::{ + ClassDefinition, + Client as P2PClient, + EventsForBlockByTransaction, + SignedBlockHeader as P2PSignedBlockHeader, UnverifiedStateUpdateData, UnverifiedTransactionData, }; diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs 
index d7e23c266e..07c29c5cc5 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -4,14 +4,13 @@ use std::pin; use anyhow::{anyhow, Context}; use futures::stream::BoxStream; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; +use p2p::client::peer_agnostic::traits::{BlockClient, HeaderStream}; use p2p::client::peer_agnostic::{ self, - BlockClient, BlockHeader as P2PBlockHeader, ClassDefinition as P2PClassDefinition, ClassDefinitionsError, Client as P2PClient, - HeaderStream, IncorrectStateDiffCount, SignedBlockHeader as P2PSignedBlockHeader, UnverifiedStateUpdateData, From f499a747f022aa5cad4bdf71f686be6bad39ce9b Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 2 Jul 2024 13:04:55 +0200 Subject: [PATCH 06/18] refactor(p2p): move p2p client specific types into a separate file --- crates/p2p/src/client.rs | 1 + crates/p2p/src/client/conv.rs | 8 +- crates/p2p/src/client/peer_agnostic.rs | 229 +---------------- crates/p2p/src/client/peer_agnostic/traits.rs | 2 +- crates/p2p/src/client/types.rs | 232 ++++++++++++++++++ .../src/p2p_network/sync_handlers/tests.rs | 2 +- crates/pathfinder/src/sync/checkpoint.rs | 10 +- .../pathfinder/src/sync/class_definitions.rs | 2 +- crates/pathfinder/src/sync/events.rs | 2 +- crates/pathfinder/src/sync/headers.rs | 2 +- crates/pathfinder/src/sync/state_updates.rs | 2 +- crates/pathfinder/src/sync/track.rs | 5 +- crates/pathfinder/src/sync/transactions.rs | 2 +- 13 files changed, 260 insertions(+), 239 deletions(-) create mode 100644 crates/p2p/src/client/types.rs diff --git a/crates/p2p/src/client.rs b/crates/p2p/src/client.rs index f7f50c70ed..8bc401f673 100644 --- a/crates/p2p/src/client.rs +++ b/crates/p2p/src/client.rs @@ -1,3 +1,4 @@ pub mod conv; pub mod peer_agnostic; pub mod peer_aware; +pub mod types; diff --git a/crates/p2p/src/client/conv.rs b/crates/p2p/src/client/conv.rs index 57ab59c19f..13808d3056 100644 --- a/crates/p2p/src/client/conv.rs +++ b/crates/p2p/src/client/conv.rs @@ -320,9 +320,7 @@ impl ToDto for (&TransactionVariant, Receipt) { } #[cfg(test)] -impl ToDto - for (&TransactionVariant, crate::client::peer_agnostic::Receipt) -{ +impl ToDto for (&TransactionVariant, crate::client::types::Receipt) { fn to_dto(self) -> p2p_proto::receipt::Receipt { let (t, r) = self; ( @@ -626,9 +624,7 @@ impl TryFromDto for TransactionVariant { } } -impl TryFrom<(p2p_proto::receipt::Receipt, TransactionIndex)> - for crate::client::peer_agnostic::Receipt -{ +impl TryFrom<(p2p_proto::receipt::Receipt, TransactionIndex)> for crate::client::types::Receipt { type Error = anyhow::Error; fn try_from( diff --git a/crates/p2p/src/client/peer_agnostic.rs b/crates/p2p/src/client/peer_agnostic.rs index 4cffc594d0..d1cdd28dff 100644 --- a/crates/p2p/src/client/peer_agnostic.rs +++ b/crates/p2p/src/client/peer_agnostic.rs @@ -5,7 +5,6 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Context; -use fake::Dummy; use futures::{pin_mut, Stream, StreamExt}; use libp2p::PeerId; use p2p_proto::class::{ClassesRequest, ClassesResponse}; @@ -21,28 +20,15 @@ use p2p_proto::state::{ }; use p2p_proto::transaction::{TransactionWithReceipt, TransactionsRequest, TransactionsResponse}; use pathfinder_common::event::Event; -use pathfinder_common::receipt::{ExecutionResources, ExecutionStatus, L2ToL1Message}; use pathfinder_common::state_update::{ContractClassUpdate, StateUpdateData}; use pathfinder_common::transaction::TransactionVariant; use pathfinder_common::{ - BlockCommitmentSignature, - BlockCommitmentSignatureElem, - 
BlockHash, BlockNumber, - BlockTimestamp, CasmHash, ClassHash, ContractAddress, ContractNonce, - EventCommitment, - Fee, - GasPrice, - L1DataAvailabilityMode, - ReceiptCommitment, - SequencerAddress, SierraHash, - StarknetVersion, - StateCommitment, StateDiffCommitment, StorageAddress, StorageValue, @@ -50,8 +36,6 @@ use pathfinder_common::{ TransactionHash, TransactionIndex, }; -use tagged::Tagged; -use tagged_debug_derive::TaggedDebug; use tokio::sync::RwLock; #[cfg(test)] @@ -71,33 +55,20 @@ use traits::{ use crate::client::conv::{CairoDefinition, FromDto, SierraDefinition, TryFromDto}; use crate::client::peer_aware; +use crate::client::types::{ + ClassDefinition, + ClassDefinitionsError, + EventsForBlockByTransaction, + IncorrectStateDiffCount, + Receipt, + SignedBlockHeader, + UnverifiedStateUpdateData, + UnverifiedTransactionData, + UnverifiedTransactionDataWithBlockNumber, +}; use crate::peer_data::PeerData; use crate::sync::protocol; -#[derive(Clone, PartialEq, Dummy, TaggedDebug)] -pub enum ClassDefinition { - Cairo { - block_number: BlockNumber, - definition: Vec, - }, - Sierra { - block_number: BlockNumber, - sierra_definition: Vec, - }, -} - -impl ClassDefinition { - /// Return Cairo or Sierra class definition depending on the variant. - pub fn class_definition(&self) -> Vec { - match self { - Self::Cairo { definition, .. } => definition.clone(), - Self::Sierra { - sierra_definition, .. - } => sierra_definition.clone(), - } - } -} - #[derive(Clone, Debug)] pub struct Client { inner: peer_aware::Client, @@ -1434,181 +1405,3 @@ impl Default for PeersWithCapability { Self::new(Duration::from_secs(60)) } } - -#[derive(Clone, Debug, Default, PartialEq, Eq, Dummy)] -pub struct Receipt { - pub actual_fee: Fee, - pub execution_resources: ExecutionResources, - pub l2_to_l1_messages: Vec, - pub execution_status: ExecutionStatus, - pub transaction_index: TransactionIndex, -} - -impl From for Receipt { - fn from(receipt: pathfinder_common::receipt::Receipt) -> Self { - Self { - actual_fee: receipt.actual_fee, - execution_resources: receipt.execution_resources, - l2_to_l1_messages: receipt.l2_to_l1_messages, - execution_status: receipt.execution_status, - transaction_index: receipt.transaction_index, - } - } -} - -/// For a single block -#[derive(Clone, Debug, PartialEq)] -pub struct UnverifiedTransactionData { - pub expected_commitment: TransactionCommitment, - pub transactions: Vec<(TransactionVariant, Receipt)>, -} - -pub type UnverifiedTransactionDataWithBlockNumber = (UnverifiedTransactionData, BlockNumber); - -/// For a single block -#[derive(Clone, PartialEq, Dummy, TaggedDebug)] -pub struct UnverifiedStateUpdateData { - pub expected_commitment: StateDiffCommitment, - pub state_diff: StateUpdateData, -} - -pub type UnverifiedStateUpdateWithBlockNumber = (UnverifiedStateUpdateData, BlockNumber); - -pub type EventsForBlockByTransaction = (BlockNumber, Vec<(TransactionHash, Vec)>); - -#[derive(Debug, Clone, PartialEq, Eq, Default, Dummy)] -pub struct BlockHeader { - pub hash: BlockHash, - pub parent_hash: BlockHash, - pub number: BlockNumber, - pub timestamp: BlockTimestamp, - pub eth_l1_gas_price: GasPrice, - pub strk_l1_gas_price: GasPrice, - pub eth_l1_data_gas_price: GasPrice, - pub strk_l1_data_gas_price: GasPrice, - pub sequencer_address: SequencerAddress, - pub starknet_version: StarknetVersion, - pub event_commitment: EventCommitment, - pub state_commitment: StateCommitment, - pub transaction_commitment: TransactionCommitment, - pub transaction_count: usize, - pub event_count: 
usize, - pub l1_da_mode: L1DataAvailabilityMode, - pub receipt_commitment: ReceiptCommitment, -} - -#[derive(Debug, Clone, PartialEq, Default)] -pub struct SignedBlockHeader { - pub header: BlockHeader, - pub signature: BlockCommitmentSignature, - pub state_diff_commitment: StateDiffCommitment, - pub state_diff_length: u64, -} - -impl From for SignedBlockHeader { - fn from(h: pathfinder_common::SignedBlockHeader) -> Self { - Self { - header: h.header.into(), - signature: h.signature, - state_diff_commitment: h.state_diff_commitment, - state_diff_length: h.state_diff_length, - } - } -} - -impl From for BlockHeader { - fn from(h: pathfinder_common::BlockHeader) -> Self { - Self { - hash: h.hash, - parent_hash: h.parent_hash, - number: h.number, - timestamp: h.timestamp, - eth_l1_gas_price: h.eth_l1_gas_price, - strk_l1_gas_price: h.strk_l1_gas_price, - eth_l1_data_gas_price: h.eth_l1_data_gas_price, - strk_l1_data_gas_price: h.strk_l1_data_gas_price, - sequencer_address: h.sequencer_address, - starknet_version: h.starknet_version, - event_commitment: h.event_commitment, - state_commitment: h.state_commitment, - transaction_commitment: h.transaction_commitment, - transaction_count: h.transaction_count, - event_count: h.event_count, - l1_da_mode: h.l1_da_mode, - receipt_commitment: Default::default(), - } - } -} - -impl TryFrom for SignedBlockHeader { - type Error = anyhow::Error; - - fn try_from(dto: p2p_proto::header::SignedBlockHeader) -> anyhow::Result { - anyhow::ensure!(dto.signatures.len() == 1, "expected exactly one signature"); - let signature = dto - .signatures - .into_iter() - .map(|sig| BlockCommitmentSignature { - r: BlockCommitmentSignatureElem(sig.r), - s: BlockCommitmentSignatureElem(sig.s), - }) - .next() - .expect("exactly one element"); - Ok(SignedBlockHeader { - header: BlockHeader { - hash: BlockHash(dto.block_hash.0), - parent_hash: BlockHash(dto.parent_hash.0), - number: BlockNumber::new(dto.number).context("block number > i64::MAX")?, - timestamp: BlockTimestamp::new(dto.time).context("block timestamp > i64::MAX")?, - eth_l1_gas_price: GasPrice(dto.gas_price_wei), - strk_l1_gas_price: GasPrice(dto.gas_price_fri), - eth_l1_data_gas_price: GasPrice(dto.data_gas_price_wei), - strk_l1_data_gas_price: GasPrice(dto.data_gas_price_fri), - sequencer_address: SequencerAddress(dto.sequencer_address.0), - starknet_version: dto.protocol_version.parse()?, - event_commitment: EventCommitment(dto.events.root.0), - state_commitment: StateCommitment(dto.state_root.0), - transaction_commitment: TransactionCommitment(dto.transactions.root.0), - transaction_count: dto.transactions.n_leaves.try_into()?, - event_count: dto.events.n_leaves.try_into()?, - receipt_commitment: ReceiptCommitment(dto.receipts.0), - l1_da_mode: TryFromDto::try_from_dto(dto.l1_data_availability_mode)?, - }, - signature, - state_diff_commitment: StateDiffCommitment(dto.state_diff_commitment.root.0), - state_diff_length: dto.state_diff_commitment.state_diff_length, - }) - } -} - -#[derive(Debug)] -pub struct IncorrectStateDiffCount(pub PeerId); - -impl std::fmt::Display for IncorrectStateDiffCount { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Incorrect state diff count from peer {}", self.0) - } -} - -#[derive(Debug)] -pub enum ClassDefinitionsError { - IncorrectClassDefinitionCount(PeerId), - CairoDefinitionError(PeerId), - SierraDefinitionError(PeerId), -} - -impl std::fmt::Display for ClassDefinitionsError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result { - match self { - ClassDefinitionsError::IncorrectClassDefinitionCount(peer) => { - write!(f, "Incorrect class definition count from peer {}", peer) - } - ClassDefinitionsError::CairoDefinitionError(peer) => { - write!(f, "Cairo class definition error from peer {}", peer) - } - ClassDefinitionsError::SierraDefinitionError(peer) => { - write!(f, "Sierra class definition error from peer {}", peer) - } - } - } -} diff --git a/crates/p2p/src/client/peer_agnostic/traits.rs b/crates/p2p/src/client/peer_agnostic/traits.rs index 24000a831c..4c38f9f07b 100644 --- a/crates/p2p/src/client/peer_agnostic/traits.rs +++ b/crates/p2p/src/client/peer_agnostic/traits.rs @@ -5,7 +5,7 @@ use pathfinder_common::state_update::StateUpdateData; use pathfinder_common::transaction::TransactionVariant; use pathfinder_common::{BlockNumber, StateDiffCommitment, TransactionCommitment, TransactionHash}; -use super::{ +use crate::client::types::{ ClassDefinition, ClassDefinitionsError, EventsForBlockByTransaction, diff --git a/crates/p2p/src/client/types.rs b/crates/p2p/src/client/types.rs new file mode 100644 index 0000000000..b36cf7fb91 --- /dev/null +++ b/crates/p2p/src/client/types.rs @@ -0,0 +1,232 @@ +use anyhow::Context; +use fake::Dummy; +use libp2p::PeerId; +use pathfinder_common::event::Event; +use pathfinder_common::receipt::{ExecutionResources, ExecutionStatus, L2ToL1Message}; +use pathfinder_common::state_update::StateUpdateData; +use pathfinder_common::transaction::TransactionVariant; +use pathfinder_common::{ + BlockCommitmentSignature, + BlockCommitmentSignatureElem, + BlockHash, + BlockNumber, + BlockTimestamp, + EventCommitment, + Fee, + GasPrice, + L1DataAvailabilityMode, + ReceiptCommitment, + SequencerAddress, + StarknetVersion, + StateCommitment, + StateDiffCommitment, + TransactionCommitment, + TransactionHash, + TransactionIndex, +}; +use tagged::Tagged; +use tagged_debug_derive::TaggedDebug; + +use crate::client::conv::TryFromDto; + +#[derive(Clone, PartialEq, Dummy, TaggedDebug)] +pub enum ClassDefinition { + Cairo { + block_number: BlockNumber, + definition: Vec, + }, + Sierra { + block_number: BlockNumber, + sierra_definition: Vec, + }, +} + +impl ClassDefinition { + /// Return Cairo or Sierra class definition depending on the variant. + pub fn class_definition(&self) -> Vec { + match self { + Self::Cairo { definition, .. } => definition.clone(), + Self::Sierra { + sierra_definition, .. 
+ } => sierra_definition.clone(), + } + } +} + +#[derive(Clone, Debug, Default, PartialEq, Eq, Dummy)] +pub struct Receipt { + pub actual_fee: Fee, + pub execution_resources: ExecutionResources, + pub l2_to_l1_messages: Vec, + pub execution_status: ExecutionStatus, + pub transaction_index: TransactionIndex, +} + +impl From for Receipt { + fn from(receipt: pathfinder_common::receipt::Receipt) -> Self { + Self { + actual_fee: receipt.actual_fee, + execution_resources: receipt.execution_resources, + l2_to_l1_messages: receipt.l2_to_l1_messages, + execution_status: receipt.execution_status, + transaction_index: receipt.transaction_index, + } + } +} + +/// For a single block +#[derive(Clone, Debug, PartialEq)] +pub struct UnverifiedTransactionData { + pub expected_commitment: TransactionCommitment, + pub transactions: Vec<(TransactionVariant, Receipt)>, +} + +pub type UnverifiedTransactionDataWithBlockNumber = (UnverifiedTransactionData, BlockNumber); + +/// For a single block +#[derive(Clone, PartialEq, Dummy, TaggedDebug)] +pub struct UnverifiedStateUpdateData { + pub expected_commitment: StateDiffCommitment, + pub state_diff: StateUpdateData, +} + +pub type UnverifiedStateUpdateWithBlockNumber = (UnverifiedStateUpdateData, BlockNumber); + +pub type EventsForBlockByTransaction = (BlockNumber, Vec<(TransactionHash, Vec)>); + +#[derive(Debug, Clone, PartialEq, Eq, Default, Dummy)] +pub struct BlockHeader { + pub hash: BlockHash, + pub parent_hash: BlockHash, + pub number: BlockNumber, + pub timestamp: BlockTimestamp, + pub eth_l1_gas_price: GasPrice, + pub strk_l1_gas_price: GasPrice, + pub eth_l1_data_gas_price: GasPrice, + pub strk_l1_data_gas_price: GasPrice, + pub sequencer_address: SequencerAddress, + pub starknet_version: StarknetVersion, + pub event_commitment: EventCommitment, + pub state_commitment: StateCommitment, + pub transaction_commitment: TransactionCommitment, + pub transaction_count: usize, + pub event_count: usize, + pub l1_da_mode: L1DataAvailabilityMode, + pub receipt_commitment: ReceiptCommitment, +} + +#[derive(Debug, Clone, PartialEq, Default)] +pub struct SignedBlockHeader { + pub header: BlockHeader, + pub signature: BlockCommitmentSignature, + pub state_diff_commitment: StateDiffCommitment, + pub state_diff_length: u64, +} + +impl From for SignedBlockHeader { + fn from(h: pathfinder_common::SignedBlockHeader) -> Self { + Self { + header: h.header.into(), + signature: h.signature, + state_diff_commitment: h.state_diff_commitment, + state_diff_length: h.state_diff_length, + } + } +} + +impl From for BlockHeader { + fn from(h: pathfinder_common::BlockHeader) -> Self { + Self { + hash: h.hash, + parent_hash: h.parent_hash, + number: h.number, + timestamp: h.timestamp, + eth_l1_gas_price: h.eth_l1_gas_price, + strk_l1_gas_price: h.strk_l1_gas_price, + eth_l1_data_gas_price: h.eth_l1_data_gas_price, + strk_l1_data_gas_price: h.strk_l1_data_gas_price, + sequencer_address: h.sequencer_address, + starknet_version: h.starknet_version, + event_commitment: h.event_commitment, + state_commitment: h.state_commitment, + transaction_commitment: h.transaction_commitment, + transaction_count: h.transaction_count, + event_count: h.event_count, + l1_da_mode: h.l1_da_mode, + receipt_commitment: Default::default(), + } + } +} + +impl TryFrom for SignedBlockHeader { + type Error = anyhow::Error; + + fn try_from(dto: p2p_proto::header::SignedBlockHeader) -> anyhow::Result { + anyhow::ensure!(dto.signatures.len() == 1, "expected exactly one signature"); + let signature = dto + .signatures + 
.into_iter() + .map(|sig| BlockCommitmentSignature { + r: BlockCommitmentSignatureElem(sig.r), + s: BlockCommitmentSignatureElem(sig.s), + }) + .next() + .expect("exactly one element"); + Ok(SignedBlockHeader { + header: BlockHeader { + hash: BlockHash(dto.block_hash.0), + parent_hash: BlockHash(dto.parent_hash.0), + number: BlockNumber::new(dto.number).context("block number > i64::MAX")?, + timestamp: BlockTimestamp::new(dto.time).context("block timestamp > i64::MAX")?, + eth_l1_gas_price: GasPrice(dto.gas_price_wei), + strk_l1_gas_price: GasPrice(dto.gas_price_fri), + eth_l1_data_gas_price: GasPrice(dto.data_gas_price_wei), + strk_l1_data_gas_price: GasPrice(dto.data_gas_price_fri), + sequencer_address: SequencerAddress(dto.sequencer_address.0), + starknet_version: dto.protocol_version.parse()?, + event_commitment: EventCommitment(dto.events.root.0), + state_commitment: StateCommitment(dto.state_root.0), + transaction_commitment: TransactionCommitment(dto.transactions.root.0), + transaction_count: dto.transactions.n_leaves.try_into()?, + event_count: dto.events.n_leaves.try_into()?, + receipt_commitment: ReceiptCommitment(dto.receipts.0), + l1_da_mode: TryFromDto::try_from_dto(dto.l1_data_availability_mode)?, + }, + signature, + state_diff_commitment: StateDiffCommitment(dto.state_diff_commitment.root.0), + state_diff_length: dto.state_diff_commitment.state_diff_length, + }) + } +} + +#[derive(Debug)] +pub struct IncorrectStateDiffCount(pub PeerId); + +impl std::fmt::Display for IncorrectStateDiffCount { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Incorrect state diff count from peer {}", self.0) + } +} + +#[derive(Debug)] +pub enum ClassDefinitionsError { + IncorrectClassDefinitionCount(PeerId), + CairoDefinitionError(PeerId), + SierraDefinitionError(PeerId), +} + +impl std::fmt::Display for ClassDefinitionsError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ClassDefinitionsError::IncorrectClassDefinitionCount(peer) => { + write!(f, "Incorrect class definition count from peer {}", peer) + } + ClassDefinitionsError::CairoDefinitionError(peer) => { + write!(f, "Cairo class definition error from peer {}", peer) + } + ClassDefinitionsError::SierraDefinitionError(peer) => { + write!(f, "Sierra class definition error from peer {}", peer) + } + } + } +} diff --git a/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs b/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs index 3283008e65..e200533e60 100644 --- a/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs +++ b/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs @@ -104,7 +104,7 @@ mod prop { use futures::channel::mpsc; use futures::StreamExt; use p2p::client::conv::{CairoDefinition, SierraDefinition, TryFromDto}; - use p2p::client::peer_agnostic::{Receipt, SignedBlockHeader as P2PSignedBlockHeader}; + use p2p::client::types::{Receipt, SignedBlockHeader as P2PSignedBlockHeader}; use p2p_proto::class::{Class, ClassesRequest, ClassesResponse}; use p2p_proto::common::{BlockNumberOrHash, Iteration}; use p2p_proto::event::{EventsRequest, EventsResponse}; diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs index 45b7669cbf..330a8068c9 100644 --- a/crates/pathfinder/src/sync/checkpoint.rs +++ b/crates/pathfinder/src/sync/checkpoint.rs @@ -12,9 +12,9 @@ use p2p::client::peer_agnostic::traits::{ StateDiffStream, TransactionStream, }; -use p2p::client::peer_agnostic::{ +use 
p2p::client::peer_agnostic::Client as P2PClient; +use p2p::client::types::{ ClassDefinition, - Client as P2PClient, EventsForBlockByTransaction, SignedBlockHeader as P2PSignedBlockHeader, UnverifiedStateUpdateData, @@ -647,7 +647,7 @@ mod tests { use assert_matches::assert_matches; use fake::{Dummy, Fake, Faker}; use futures::stream; - use p2p::client::peer_agnostic::BlockHeader as P2PBlockHeader; + use p2p::client::types::BlockHeader as P2PBlockHeader; use p2p::libp2p::PeerId; use p2p_proto::header; use pathfinder_common::{ @@ -909,7 +909,7 @@ mod tests { use assert_matches::assert_matches; use fake::{Dummy, Faker}; use futures::stream; - use p2p::client::peer_agnostic::{ + use p2p::client::types::{ UnverifiedTransactionData, UnverifiedTransactionDataWithBlockNumber, }; @@ -1092,7 +1092,7 @@ mod tests { use assert_matches::assert_matches; use fake::{Dummy, Fake, Faker}; use futures::stream; - use p2p::client::peer_agnostic::UnverifiedStateUpdateWithBlockNumber; + use p2p::client::types::UnverifiedStateUpdateWithBlockNumber; use p2p::libp2p::PeerId; use pathfinder_common::state_update::{ContractClassUpdate, StateUpdateData}; use pathfinder_common::transaction::DeployTransactionV0; diff --git a/crates/pathfinder/src/sync/class_definitions.rs b/crates/pathfinder/src/sync/class_definitions.rs index e432ff1233..4d6ecfd7fa 100644 --- a/crates/pathfinder/src/sync/class_definitions.rs +++ b/crates/pathfinder/src/sync/class_definitions.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use anyhow::Context; use futures::pin_mut; use futures::stream::{BoxStream, StreamExt}; -use p2p::client::peer_agnostic::ClassDefinition as P2PClassDefinition; +use p2p::client::types::ClassDefinition as P2PClassDefinition; use p2p::PeerData; use p2p_proto::transaction; use pathfinder_common::class_definition::{Cairo, ClassDefinition as GwClassDefinition, Sierra}; diff --git a/crates/pathfinder/src/sync/events.rs b/crates/pathfinder/src/sync/events.rs index 3c850f0746..9d2e60c85e 100644 --- a/crates/pathfinder/src/sync/events.rs +++ b/crates/pathfinder/src/sync/events.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, VecDeque}; use std::num::NonZeroUsize; use anyhow::Context; -use p2p::client::peer_agnostic::{BlockHeader as P2PBlockHeader, EventsForBlockByTransaction}; +use p2p::client::types::{BlockHeader as P2PBlockHeader, EventsForBlockByTransaction}; use p2p::PeerData; use pathfinder_common::event::Event; use pathfinder_common::receipt::Receipt; diff --git a/crates/pathfinder/src/sync/headers.rs b/crates/pathfinder/src/sync/headers.rs index 95a7700742..6dcfd0ac26 100644 --- a/crates/pathfinder/src/sync/headers.rs +++ b/crates/pathfinder/src/sync/headers.rs @@ -1,7 +1,7 @@ #![allow(dead_code, unused_variables)] use anyhow::Context; use futures::StreamExt; -use p2p::client::peer_agnostic::SignedBlockHeader; +use p2p::client::types::SignedBlockHeader; use p2p::PeerData; use pathfinder_common::{ BlockHash, diff --git a/crates/pathfinder/src/sync/state_updates.rs b/crates/pathfinder/src/sync/state_updates.rs index 3db3ca15b6..4ecb67344e 100644 --- a/crates/pathfinder/src/sync/state_updates.rs +++ b/crates/pathfinder/src/sync/state_updates.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use std::num::NonZeroUsize; use anyhow::Context; -use p2p::client::peer_agnostic::UnverifiedStateUpdateData; +use p2p::client::types::UnverifiedStateUpdateData; use p2p::PeerData; use pathfinder_common::state_update::{ContractClassUpdate, ContractUpdate, StateUpdateData}; use pathfinder_common::{ diff --git 
a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index 07c29c5cc5..c190c76466 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -5,12 +5,11 @@ use anyhow::{anyhow, Context}; use futures::stream::BoxStream; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use p2p::client::peer_agnostic::traits::{BlockClient, HeaderStream}; -use p2p::client::peer_agnostic::{ - self, +use p2p::client::peer_agnostic::Client as P2PClient; +use p2p::client::types::{ BlockHeader as P2PBlockHeader, ClassDefinition as P2PClassDefinition, ClassDefinitionsError, - Client as P2PClient, IncorrectStateDiffCount, SignedBlockHeader as P2PSignedBlockHeader, UnverifiedStateUpdateData, diff --git a/crates/pathfinder/src/sync/transactions.rs b/crates/pathfinder/src/sync/transactions.rs index 46951104d0..6c2f1b6adb 100644 --- a/crates/pathfinder/src/sync/transactions.rs +++ b/crates/pathfinder/src/sync/transactions.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, VecDeque}; use std::num::NonZeroUsize; use anyhow::{anyhow, Context}; -use p2p::client::peer_agnostic::{self, UnverifiedTransactionData}; +use p2p::client::types::UnverifiedTransactionData; use p2p::PeerData; use pathfinder_common::receipt::Receipt; use pathfinder_common::transaction::{Transaction, TransactionVariant}; From 4aa11fdd30d8534e69a6feb0eb7d4122045f7c71 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 2 Jul 2024 16:06:42 +0200 Subject: [PATCH 07/18] test(track): add mocks and happy path scaffolding --- crates/pathfinder/src/sync/track.rs | 229 +++++++++++++++++++++++----- 1 file changed, 195 insertions(+), 34 deletions(-) diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index c190c76466..c1f9cae0ff 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -55,18 +55,17 @@ pub struct Sync { public_key: PublicKey, } -impl Sync -where - L: Stream + Clone + Send + 'static, - P: BlockClient + Clone + HeaderStream + Send + 'static, -{ +impl Sync { pub async fn run( self, next: BlockNumber, parent_hash: BlockHash, - chain_id: ChainId, fgw: SequencerClient, - ) -> Result<(), PeerData> { + ) -> Result<(), PeerData> + where + L: Stream + Clone + Send + 'static, + P: BlockClient + Clone + HeaderStream + Send + 'static, + { let storage_connection = self .storage .connection() @@ -101,7 +100,7 @@ where headers: transactions, } .spawn() - .pipe(transactions::CalculateHashes(chain_id), 10) + .pipe(transactions::CalculateHashes(self.chain_id), 10) .pipe(transactions::VerifyCommitment, 10); let TransactionsFanout { @@ -170,12 +169,12 @@ struct HeaderSource { start: BlockNumber, } -impl HeaderSource -where - L: Stream + Send + 'static, - P: Clone + HeaderStream + Send + 'static, -{ - fn spawn(self) -> SyncReceiver { +impl HeaderSource { + fn spawn(self) -> SyncReceiver + where + L: Stream + Send + 'static, + P: Clone + HeaderStream + Send + 'static, + { let (tx, rx) = tokio::sync::mpsc::channel(1); let Self { p2p, @@ -339,11 +338,11 @@ struct TransactionSource

<P> { headers: BoxStream<'static, P2PBlockHeader>, } -impl<P> TransactionSource<P> -where - P: Clone + BlockClient + Send + 'static, -{ - fn spawn(self) -> SyncReceiver<(UnverifiedTransactionData, StarknetVersion)> { +impl<P> TransactionSource<P>
{ + fn spawn(self) -> SyncReceiver<(UnverifiedTransactionData, StarknetVersion)> + where + P: Clone + BlockClient + Send + 'static, + { let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { let Self { p2p, mut headers } = self; @@ -418,11 +417,11 @@ type EventsWithCommitment = ( StarknetVersion, ); -impl

<P> EventSource<P> -where - P: Clone + BlockClient + Send + 'static, -{ - fn spawn(self) -> SyncReceiver<EventsWithCommitment> { +impl<P> EventSource<P> { + fn spawn(self) -> SyncReceiver<EventsWithCommitment> + where + P: Clone + BlockClient + Send + 'static, + { let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { let Self { @@ -494,11 +493,11 @@ struct StateDiffSource<P>
{ headers: BoxStream<'static, P2PSignedBlockHeader>, } -impl<P> StateDiffSource<P> -where - P: Clone + BlockClient + Send + 'static, -{ - fn spawn(self) -> SyncReceiver<(UnverifiedStateUpdateData, StarknetVersion)> { +impl<P> StateDiffSource<P> { + fn spawn(self) -> SyncReceiver<(UnverifiedStateUpdateData, StarknetVersion)> + where + P: Clone + BlockClient + Send + 'static, + { let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { let Self { p2p, mut headers } = self; @@ -549,11 +548,11 @@ struct ClassSource<P>
{ start: BlockNumber, } -impl<P> ClassSource<P> -where - P: Clone + BlockClient + Send + 'static, -{ - fn spawn(self) -> SyncReceiver<Vec<P2PClassDefinition>> { +impl<P> ClassSource<P>
{ + fn spawn(self) -> SyncReceiver> + where + P: Clone + BlockClient + Send + 'static, + { let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { let Self { @@ -724,3 +723,165 @@ impl ProcessStage for StoreBlock { .map_err(Into::into) } } + +#[cfg(test)] +mod tests { + use std::iter; + + use futures::{stream, Stream, StreamExt}; + use p2p::client::peer_agnostic::traits::{BlockClient, HeaderStream}; + use p2p::client::types::{ + ClassDefinition, + ClassDefinitionsError, + IncorrectStateDiffCount, + Receipt, + SignedBlockHeader, + }; + use p2p::libp2p::PeerId; + use p2p::PeerData; + use pathfinder_common::event::Event; + use pathfinder_common::state_update::StateUpdateData; + use pathfinder_common::transaction::TransactionVariant; + use pathfinder_common::{BlockHash, BlockNumber, Chain, ChainId, PublicKey, TransactionHash}; + use pathfinder_storage::fake::{self, Block}; + use pathfinder_storage::StorageBuilder; + use starknet_gateway_client::GatewayApi; + + use super::Sync; + + #[derive(Clone)] + struct FakeP2PClient { + pub blocks: Vec, + } + + impl HeaderStream for FakeP2PClient { + fn header_stream( + self, + start: BlockNumber, + stop: BlockNumber, + reverse: bool, + ) -> impl Stream> + Send { + assert!(!reverse); + assert_eq!(start, self.blocks.first().unwrap().header.header.number); + assert_eq!(start, self.blocks.last().unwrap().header.header.number); + + stream::iter( + self.blocks + .into_iter() + .map(|block| PeerData::for_tests(block.header.into())), + ) + } + } + + impl BlockClient for FakeP2PClient { + async fn transactions_for_block( + self, + block: BlockNumber, + ) -> Option<( + PeerId, + impl Stream> + Send, + )> { + let tr = self + .blocks + .iter() + .find(|b| b.header.header.number == block) + .unwrap() + .transaction_data + .iter() + .map(|(t, r, e)| Ok((t.variant.clone(), Receipt::from(r.clone())))) + .collect::>>(); + + Some((PeerId::random(), stream::iter(tr))) + } + + async fn state_diff_for_block( + self, + block: BlockNumber, + state_diff_length: u64, + ) -> Result, IncorrectStateDiffCount> { + let sd: StateUpdateData = self + .blocks + .iter() + .find(|b| b.header.header.number == block) + .unwrap() + .state_update + .clone() + .into(); + + assert_eq!(sd.state_diff_length() as u64, state_diff_length); + + Ok(Some((PeerId::random(), sd))) + } + + async fn class_definitions_for_block( + self, + block: BlockNumber, + declared_classes_count: u64, + ) -> Result)>, ClassDefinitionsError> { + let b = self + .blocks + .iter() + .find(|b| b.header.header.number == block) + .unwrap(); + let defs = b + .cairo_defs + .iter() + .map(|(_, x)| ClassDefinition::Cairo { + block_number: block, + definition: x.clone(), + }) + .chain( + b.sierra_defs + .iter() + .map(|(_, x, _)| ClassDefinition::Sierra { + block_number: block, + sierra_definition: x.clone(), + }), + ) + .collect::>(); + + Ok(Some((PeerId::random(), defs))) + } + + async fn events_for_block( + self, + block: BlockNumber, + ) -> Option<(PeerId, impl Stream + Send)> { + let e = self + .blocks + .iter() + .find(|b| b.header.header.number == block) + .unwrap() + .transaction_data + .iter() + .flat_map(|(t, _, e)| e.iter().map(move |e| (t.hash, e.clone()))) + .collect::>(); + + Some((PeerId::random(), stream::iter(e))) + } + } + + #[derive(Clone)] + struct FakeFgw; + + impl GatewayApi for FakeFgw {} + + #[tokio::test] + async fn happy_path() { + let blocks = fake::init::with_n_blocks(1); + let p2p: FakeP2PClient = FakeP2PClient { blocks }; + + let s = Sync { + latest: 
futures::stream::iter(vec![(BlockNumber::GENESIS, BlockHash::default())]), + p2p, + storage: StorageBuilder::in_memory().unwrap(), + chain: Chain::SepoliaTestnet, + chain_id: ChainId::SEPOLIA_TESTNET, + public_key: PublicKey::default(), + }; + + s.run(BlockNumber::GENESIS, BlockHash::default(), FakeFgw) + .await + .unwrap(); + } +} From f57c4effd25fd4ef5f1efb50cf8e04f5d4e2495b Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 2 Jul 2024 17:20:35 +0200 Subject: [PATCH 08/18] test: fix state diff length computation in fixture --- crates/storage/src/fake.rs | 33 +-------------------------------- 1 file changed, 1 insertion(+), 32 deletions(-) diff --git a/crates/storage/src/fake.rs b/crates/storage/src/fake.rs index 1884500874..872b9613b2 100644 --- a/crates/storage/src/fake.rs +++ b/crates/storage/src/fake.rs @@ -401,38 +401,7 @@ pub mod init { ) })); - *state_diff_length += u64::try_from( - state_update.contract_updates.iter().fold( - state_update - .system_contract_updates - .iter() - .fold(0, |acc, (_, u)| acc + u.storage.len()), - |acc, (_, u)| acc + u.storage.len(), - ), - ) - .expect("ptr size is 64 bits"); - *state_diff_length += u64::try_from( - state_update - .contract_updates - .iter() - .filter(|(_, u)| u.nonce.is_some()) - .count(), - ) - .expect("ptr size is 64 bits"); - *state_diff_length = u64::try_from( - state_update.declared_cairo_classes.len() - + state_update.declared_sierra_classes.len(), - ) - .expect("ptr size is 64 bits"); - *state_diff_length = u64::try_from( - state_update - .contract_updates - .iter() - .filter(|(_, u)| u.class.is_some()) - .count(), - ) - .expect("ptr size is 64 bits"); - + *state_diff_length = state_update.state_diff_length(); *state_diff_commitment = state_update.compute_state_diff_commitment(*starknet_version); } From c7895cb90838885449678647c81bca46c032a1fb Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Wed, 3 Jul 2024 13:43:28 +0200 Subject: [PATCH 09/18] chore: make block hash computation for >= 0.13.2 pub(crate) --- crates/pathfinder/src/state/block_hash.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/pathfinder/src/state/block_hash.rs b/crates/pathfinder/src/state/block_hash.rs index d7e7fa0a18..2c9ad0497a 100644 --- a/crates/pathfinder/src/state/block_hash.rs +++ b/crates/pathfinder/src/state/block_hash.rs @@ -388,8 +388,8 @@ fn compute_final_hash_pre_0_13_2(header: &BlockHeaderData) -> BlockHash { BlockHash(chain.finalize()) } -fn compute_final_hash(header: &BlockHeaderData) -> Result { - // Concatinate the transaction count, event count, state diff length, and L1 +pub(crate) fn compute_final_hash(header: &BlockHeaderData) -> Result { + // Concatenate the transaction count, event count, state diff length, and L1 // data availability mode into a single felt. 
let mut concat_counts = [0u8; 32]; let mut writer = concat_counts.as_mut_slice(); From 8b8f751f8b75a21812940ecbaae05349e3a1215b Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Wed, 3 Jul 2024 17:49:27 +0200 Subject: [PATCH 10/18] test: update fake storage fixture --- crates/common/src/class_definition.rs | 37 +++- crates/p2p/src/client/types.rs | 14 +- .../src/p2p_network/sync_handlers/tests.rs | 2 +- crates/pathfinder/src/state/block_hash.rs | 37 ++++ crates/pathfinder/src/sync/checkpoint.rs | 7 +- crates/pathfinder/src/sync/track.rs | 74 +++++--- crates/storage/src/fake.rs | 178 +++++++++++++++--- 7 files changed, 287 insertions(+), 62 deletions(-) diff --git a/crates/common/src/class_definition.rs b/crates/common/src/class_definition.rs index 1806655d03..860b346e7b 100644 --- a/crates/common/src/class_definition.rs +++ b/crates/common/src/class_definition.rs @@ -33,12 +33,16 @@ pub struct Sierra<'a> { } impl Dummy for Sierra<'_> { - fn dummy_with_rng(_: &T, rng: &mut R) -> Self { + fn dummy_with_rng(_: &T, _: &mut R) -> Self { Self { - abi: Cow::Owned(Faker.fake_with_rng(rng)), - sierra_program: Faker.fake_with_rng(rng), + abi: "[]".into(), + sierra_program: vec![], contract_class_version: "0.1.0".into(), - entry_points_by_type: Faker.fake_with_rng(rng), + entry_points_by_type: SierraEntryPoints { + external: vec![], + l1_handler: vec![], + constructor: vec![], + }, } } } @@ -58,11 +62,28 @@ pub struct Cairo<'a> { impl Dummy for Cairo<'_> { fn dummy_with_rng(_: &T, rng: &mut R) -> Self { - let abi = serde_json::Value::Object(Faker.fake_with_rng(rng)); - let program = serde_json::Value::Object(Faker.fake_with_rng(rng)); Self { - abi: Cow::Owned(serde_json::value::to_raw_value(&abi).unwrap()), - program: Cow::Owned(serde_json::value::to_raw_value(&program).unwrap()), + abi: Cow::Owned( + RawValue::from_string("[]".into()).unwrap(), + ), + program: Cow::Owned( + RawValue::from_string( + r#" + { + "attributes": [], + "builtins": [], + "data": [], + "debug_info": null, + "hints": {}, + "identifiers": {}, + "main_scope": "__main__", + "prime": "0x800000000000011000000000000000000000000000000000000000000000001", + "reference_manager": {} + } + "#.into() + ) + .unwrap(), + ), entry_points_by_type: Faker.fake_with_rng(rng), } } diff --git a/crates/p2p/src/client/types.rs b/crates/p2p/src/client/types.rs index b36cf7fb91..1b90d35882 100644 --- a/crates/p2p/src/client/types.rs +++ b/crates/p2p/src/client/types.rs @@ -123,10 +123,12 @@ pub struct SignedBlockHeader { pub state_diff_length: u64, } -impl From for SignedBlockHeader { - fn from(h: pathfinder_common::SignedBlockHeader) -> Self { +impl From<(pathfinder_common::SignedBlockHeader, ReceiptCommitment)> for SignedBlockHeader { + fn from( + (h, receipt_commitment): (pathfinder_common::SignedBlockHeader, ReceiptCommitment), + ) -> Self { Self { - header: h.header.into(), + header: (h.header, receipt_commitment).into(), signature: h.signature, state_diff_commitment: h.state_diff_commitment, state_diff_length: h.state_diff_length, @@ -134,8 +136,8 @@ impl From for SignedBlockHeader { } } -impl From for BlockHeader { - fn from(h: pathfinder_common::BlockHeader) -> Self { +impl From<(pathfinder_common::BlockHeader, ReceiptCommitment)> for BlockHeader { + fn from((h, receipt_commitment): (pathfinder_common::BlockHeader, ReceiptCommitment)) -> Self { Self { hash: h.hash, parent_hash: h.parent_hash, @@ -153,7 +155,7 @@ impl From for BlockHeader { transaction_count: h.transaction_count, event_count: h.event_count, l1_da_mode: h.l1_da_mode, - 
receipt_commitment: Default::default(), + receipt_commitment, } } } diff --git a/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs b/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs index e200533e60..348583724a 100644 --- a/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs +++ b/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs @@ -182,7 +182,7 @@ mod prop { // Compute the overlapping set between the db and the request // These are the headers that we expect to be read from the db let expected = overlapping::get(in_db, start_block, limit, step, num_blocks, direction) - .into_iter().map(|Block { header, .. }| P2PSignedBlockHeader::from(header)).collect::>(); + .into_iter().map(|Block { header, receipt_commitment, .. }| P2PSignedBlockHeader::from((header, receipt_commitment))).collect::>(); // Run the handler let request = BlockHeadersRequest { iteration: Iteration { start: BlockNumberOrHash::Number(start_block), limit, step, direction, } }; let mut responses = Runtime::new().unwrap().block_on(async { diff --git a/crates/pathfinder/src/state/block_hash.rs b/crates/pathfinder/src/state/block_hash.rs index 2c9ad0497a..0eaa924a5c 100644 --- a/crates/pathfinder/src/state/block_hash.rs +++ b/crates/pathfinder/src/state/block_hash.rs @@ -18,6 +18,7 @@ use pathfinder_common::{ L1DataAvailabilityMode, ReceiptCommitment, SequencerAddress, + SignedBlockHeader, StarknetVersion, StateCommitment, StateDiffCommitment, @@ -204,6 +205,42 @@ impl BlockHeaderData { l1_da_mode: block.l1_da_mode.into(), }) } + + pub fn from_signed_header( + sbh: &SignedBlockHeader, + receipt_commitment: ReceiptCommitment, + ) -> Self { + Self { + hash: sbh.header.hash, + parent_hash: sbh.header.parent_hash, + number: sbh.header.number, + timestamp: sbh.header.timestamp, + sequencer_address: sbh.header.sequencer_address, + state_commitment: sbh.header.state_commitment, + transaction_commitment: sbh.header.transaction_commitment, + transaction_count: sbh + .header + .transaction_count + .try_into() + .expect("ptr size is 64 bits"), + event_commitment: sbh.header.event_commitment, + event_count: sbh + .header + .event_count + .try_into() + .expect("ptr size is 64 bits"), + state_diff_commitment: sbh.state_diff_commitment, + state_diff_length: sbh.state_diff_length, + starknet_version: sbh.header.starknet_version, + starknet_version_str: sbh.header.starknet_version.to_string(), + eth_l1_gas_price: sbh.header.eth_l1_gas_price, + strk_l1_gas_price: sbh.header.strk_l1_gas_price, + eth_l1_data_gas_price: sbh.header.eth_l1_data_gas_price, + strk_l1_data_gas_price: sbh.header.strk_l1_data_gas_price, + receipt_commitment, + l1_da_mode: sbh.header.l1_da_mode, + } + } } pub fn verify_block_hash( diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs index 330a8068c9..596363f261 100644 --- a/crates/pathfinder/src/sync/checkpoint.rs +++ b/crates/pathfinder/src/sync/checkpoint.rs @@ -658,6 +658,7 @@ mod tests { BlockHeader, BlockTimestamp, EventCommitment, + ReceiptCommitment, SequencerAddress, StarknetVersion, StateCommitment, @@ -782,7 +783,11 @@ mod tests { .unwrap() .unwrap(); P2PSignedBlockHeader { - header: db.block_header(block_id).unwrap().unwrap().into(), + header: ( + db.block_header(block_id).unwrap().unwrap(), + ReceiptCommitment::ZERO, + ) + .into(), signature: db.signature(block_id).unwrap().unwrap(), state_diff_commitment, state_diff_length: state_diff_length as u64, diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index 
c1f9cae0ff..3b9594ca3a 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -115,6 +115,7 @@ impl Sync { } .spawn() .pipe(events::VerifyCommitment, 10); + let state_diff = StateDiffSource { p2p: self.p2p.clone(), headers: state_diff, @@ -260,6 +261,18 @@ impl TransactionsFanout { tokio::spawn(async move { while let Some(transactions) = source.recv().await { + match transactions.as_ref() { + Ok(txns) => { + eprintln!( + "TransactionsFanout transactions = Ok(len:{})", + txns.data.len() + ); + } + Err(err) => { + eprintln!("TransactionsFanout transactions = Err({err:?})",); + } + } + let is_err = transactions.is_err(); if t_tx.send(transactions.clone()).await.is_err() || is_err { @@ -431,11 +444,14 @@ impl

<P> EventSource<P>
{ } = self; while let Some(header) = headers.next().await { + eprintln!("EventSource header.number = {}", header.number); + let (peer, mut events) = loop { if let Some(stream) = p2p.clone().events_for_block(header.number).await { break stream; } }; + let Some(block_transactions) = transactions.next().await else { let err = PeerData::new(peer, SyncError2::Other(anyhow!("No transactions").into())); @@ -726,28 +742,29 @@ impl ProcessStage for StoreBlock { #[cfg(test)] mod tests { - use std::iter; - use futures::{stream, Stream, StreamExt}; - use p2p::client::peer_agnostic::traits::{BlockClient, HeaderStream}; use p2p::client::types::{ ClassDefinition, ClassDefinitionsError, IncorrectStateDiffCount, - Receipt, - SignedBlockHeader, + Receipt as P2PReceipt, }; use p2p::libp2p::PeerId; use p2p::PeerData; - use pathfinder_common::event::Event; - use pathfinder_common::state_update::StateUpdateData; - use pathfinder_common::transaction::TransactionVariant; - use pathfinder_common::{BlockHash, BlockNumber, Chain, ChainId, PublicKey, TransactionHash}; + use pathfinder_common::{ReceiptCommitment, SignedBlockHeader}; + use pathfinder_storage::fake::init::Config; use pathfinder_storage::fake::{self, Block}; use pathfinder_storage::StorageBuilder; - use starknet_gateway_client::GatewayApi; - - use super::Sync; + use starknet_gateway_types::error::SequencerError; + + use super::*; + use crate::state::block_hash::{ + calculate_event_commitment, + calculate_receipt_commitment, + calculate_transaction_commitment, + compute_final_hash, + BlockHeaderData, + }; #[derive(Clone)] struct FakeP2PClient { @@ -760,15 +777,15 @@ mod tests { start: BlockNumber, stop: BlockNumber, reverse: bool, - ) -> impl Stream> + Send { + ) -> impl Stream> + Send { assert!(!reverse); assert_eq!(start, self.blocks.first().unwrap().header.header.number); assert_eq!(start, self.blocks.last().unwrap().header.header.number); stream::iter( - self.blocks - .into_iter() - .map(|block| PeerData::for_tests(block.header.into())), + self.blocks.into_iter().map(|block| { + PeerData::for_tests((block.header, block.receipt_commitment).into()) + }), ) } } @@ -779,7 +796,7 @@ mod tests { block: BlockNumber, ) -> Option<( PeerId, - impl Stream> + Send, + impl Stream> + Send, )> { let tr = self .blocks @@ -788,8 +805,8 @@ mod tests { .unwrap() .transaction_data .iter() - .map(|(t, r, e)| Ok((t.variant.clone(), Receipt::from(r.clone())))) - .collect::>>(); + .map(|(t, r, e)| Ok((t.variant.clone(), P2PReceipt::from(r.clone())))) + .collect::>>(); Some((PeerId::random(), stream::iter(tr))) } @@ -864,11 +881,26 @@ mod tests { #[derive(Clone)] struct FakeFgw; - impl GatewayApi for FakeFgw {} + #[async_trait::async_trait] + impl GatewayApi for FakeFgw { + async fn pending_casm_by_hash(&self, _: ClassHash) -> Result { + Ok(bytes::Bytes::from_static(b"I'm from the fgw!")) + } + } #[tokio::test] async fn happy_path() { - let blocks = fake::init::with_n_blocks(1); + let blocks = fake::init::with_n_blocks_and_config( + 1, + Config::new( + |sbh: &SignedBlockHeader, rc: ReceiptCommitment| { + compute_final_hash(&BlockHeaderData::from_signed_header(sbh, rc)) + }, + calculate_transaction_commitment, + calculate_receipt_commitment, + calculate_event_commitment, + ), + ); let p2p: FakeP2PClient = FakeP2PClient { blocks }; let s = Sync { diff --git a/crates/storage/src/fake.rs b/crates/storage/src/fake.rs index 872b9613b2..33a5b97b9c 100644 --- a/crates/storage/src/fake.rs +++ b/crates/storage/src/fake.rs @@ -2,7 +2,7 @@ use pathfinder_common::event::Event; use 
pathfinder_common::receipt::Receipt; use pathfinder_common::transaction::Transaction; -use pathfinder_common::{ClassHash, SierraHash, SignedBlockHeader, StateUpdate}; +use pathfinder_common::{ClassHash, ReceiptCommitment, SierraHash, SignedBlockHeader, StateUpdate}; use rand::Rng; use crate::Storage; @@ -14,6 +14,8 @@ pub struct Block { pub state_update: StateUpdate, pub cairo_defs: Vec<(ClassHash, Vec)>, // Cairo 0 definitions pub sierra_defs: Vec<(SierraHash, Vec, Vec)>, // Sierra + Casm definitions + // TODO merge into the header + pub receipt_commitment: ReceiptCommitment, } /// Initialize [`Storage`] with fake blocks and state updates @@ -36,6 +38,7 @@ pub fn fill(storage: &Storage, blocks: &[Block]) { state_update, cairo_defs, sierra_defs, + .. }| { tx.insert_block_header(&header.header).unwrap(); tx.insert_transaction_data( @@ -117,21 +120,74 @@ pub mod init { BlockNumber, ChainId, ContractAddress, + EventCommitment, + ReceiptCommitment, SignedBlockHeader, + StarknetVersion, StateCommitment, StateUpdate, + TransactionCommitment, + TransactionHash, TransactionIndex, }; use rand::Rng; use super::Block; + pub struct Config { + calculate_block_hash: + Box anyhow::Result>, + calculate_transaction_commitment: + Box anyhow::Result>, + calculate_receipt_commitment: Box anyhow::Result>, + calculate_event_commitment: Box< + dyn Fn( + &[(TransactionHash, &[Event])], + StarknetVersion, + ) -> anyhow::Result, + >, + } + + impl Config { + pub fn new( + calculate_block_hash: impl Fn(&SignedBlockHeader, ReceiptCommitment) -> anyhow::Result + + 'static, + calculate_transaction_commitment: impl Fn(&[Transaction], StarknetVersion) -> anyhow::Result + + 'static, + calculate_receipt_commitment: impl Fn(&[Receipt]) -> anyhow::Result + + 'static, + calculate_event_commitment: impl Fn( + &[(TransactionHash, &[Event])], + StarknetVersion, + ) -> anyhow::Result + + 'static, + ) -> Self { + Self { + calculate_block_hash: Box::new(calculate_block_hash), + calculate_transaction_commitment: Box::new(calculate_transaction_commitment), + calculate_receipt_commitment: Box::new(calculate_receipt_commitment), + calculate_event_commitment: Box::new(calculate_event_commitment), + } + } + } + + impl Default for Config { + fn default() -> Self { + Self { + calculate_block_hash: Box::new(|_, _| Ok(Faker.fake())), + calculate_transaction_commitment: Box::new(|_, _| Ok(Faker.fake())), + calculate_receipt_commitment: Box::new(|_| Ok(Faker.fake())), + calculate_event_commitment: Box::new(|_, _| Ok(Faker.fake())), + } + } + } + /// Create fake blocks and state updates with __limited consistency /// guarantees__: /// - block headers: /// - consecutive numbering starting from genesis (`0`) up to `n-1` /// - parent hash wrt previous block, parent hash of the genesis block - /// is `0s` + /// is `0` /// - state commitment is a hash of storage and class commitments /// - block bodies: /// - transaction indices within a block @@ -139,8 +195,10 @@ pub mod init { /// - at least 1 transaction with receipt per block /// - state updates: /// - block hashes + /// - parent state commitment wrt previous state update, parent state + /// commitment of the genesis state update is `0` /// - old roots wrt previous state update, old root of the genesis state - /// update is `0s` + /// update is `0` /// - replaced classes for block N point to some deployed contracts from /// block N-1 /// - each storage diff has its respective nonce update @@ -148,6 +206,7 @@ pub mod init { /// - deployed Cairo0 contracts are treated as implicit declarations and 
/// are added to declared cairo classes` /// - declared cairo|sierra definitions + /// /// - class definition is a serialized to JSON representation of /// `class_definition::Cairo|Sierra` respectively with random fields /// - all those definitions are **very short and fall far below the soft @@ -161,12 +220,29 @@ pub mod init { with_n_blocks_and_rng(n, &mut rng) } + /// Same as [`with_n_blocks`] except caller can specify additional + /// configuration + pub fn with_n_blocks_and_config(n: usize, config: Config) -> Vec { + let mut rng = rand::thread_rng(); + with_n_blocks_rng_and_config(n, &mut rng, config) + } + /// Same as [`with_n_blocks`] except caller can specify the rng used pub fn with_n_blocks_and_rng(n: usize, rng: &mut R) -> Vec { + with_n_blocks_rng_and_config(n, rng, Default::default()) + } + + /// Same as [`with_n_blocks`] except caller can specify the rng used + pub fn with_n_blocks_rng_and_config( + n: usize, + rng: &mut R, + config: Config, + ) -> Vec { let mut init = Vec::with_capacity(n); for i in 0..n { let mut header: BlockHeader = Faker.fake_with_rng(rng); + header.starknet_version = StarknetVersion::new(0, 13, 2, 0); header.number = BlockNumber::new_or_panic(i.try_into().expect("u64 is at least as wide as usize")); header.state_commitment = @@ -197,13 +273,38 @@ pub mod init { }) .collect::>(); + header.transaction_commitment = (config.calculate_transaction_commitment)( + &transaction_data + .iter() + .map(|(t, ..)| t.clone()) + .collect::>(), + header.starknet_version, + ) + .unwrap(); + + header.event_commitment = (config.calculate_event_commitment)( + &transaction_data + .iter() + .map(|(t, _, e)| (t.hash, e.as_slice())) + .collect::>(), + header.starknet_version, + ) + .unwrap(); + + let receipt_commitment = (config.calculate_receipt_commitment)( + &transaction_data + .iter() + .map(|(_, r, ..)| r.clone()) + .collect::>(), + ) + .unwrap(); + header.transaction_count = transaction_data.len(); header.event_count = transaction_data .iter() .map(|(_, _, events)| events.len()) .sum(); - let block_hash = header.hash; let state_commitment = header.state_commitment; let declared_cairo_classes = Faker.fake_with_rng::, _>(rng); let declared_sierra_classes = Faker.fake_with_rng::, _>(rng); @@ -213,10 +314,8 @@ pub mod init { .map(|&class_hash| { ( class_hash, - serde_json::to_vec( - &Faker.fake_with_rng::, _>(rng), - ) - .unwrap(), + serde_json::to_vec(&Faker.fake_with_rng::(rng)) + .unwrap(), ) }) .collect::>(); @@ -226,7 +325,7 @@ pub mod init { ( sierra_hash, serde_json::to_vec( - &Faker.fake_with_rng::, _>(rng), + &Faker.fake_with_rng::(rng), ) .unwrap(), Faker.fake_with_rng::(rng).into_bytes(), @@ -242,9 +341,10 @@ pub mod init { }, transaction_data, state_update: StateUpdate { - block_hash, + // Will be fixed after block hash computation + block_hash: BlockHash::ZERO, state_commitment, - // Will be fixed in the next loop + // Will be fixed after block hash computation parent_state_commitment: StateCommitment::ZERO, declared_cairo_classes, declared_sierra_classes, @@ -272,28 +372,26 @@ pub mod init { }, cairo_defs, sierra_defs, + receipt_commitment, }); } - // - // "Fix" block headers and state updates - // + // Calculate state commitments and randomly choose which contract updates should + // be "replace" instead of "deploy" if !init.is_empty() { let Block { header, state_update, .. 
} = init.get_mut(0).unwrap(); - header.header.parent_hash = BlockHash::ZERO; header.header.state_commitment = StateCommitment::calculate( header.header.storage_commitment, header.header.class_commitment, ); - state_update.block_hash = header.header.hash; state_update.parent_state_commitment = StateCommitment::ZERO; for i in 1..n { - let (parent_hash, parent_state_commitment, deployed_in_parent) = init + let (parent_state_commitment, deployed_in_parent) = init .get(i - 1) .map( |Block { @@ -302,7 +400,6 @@ pub mod init { .. }| { ( - header.header.hash, header.header.state_commitment, state_update .contract_updates @@ -324,12 +421,10 @@ pub mod init { .. } = init.get_mut(i).unwrap(); - header.header.parent_hash = parent_hash; header.header.state_commitment = StateCommitment::calculate( header.header.storage_commitment, header.header.class_commitment, ); - state_update.block_hash = header.header.hash; // // Fix state updates @@ -358,7 +453,8 @@ pub mod init { } } - // Update counts + // Compute state diff length and commitment + // Generate definitions for the implicitly declared classes for Block { header: SignedBlockHeader { @@ -394,10 +490,8 @@ pub mod init { cairo_defs.extend(implicitly_declared.map(|class_hash| { ( class_hash, - serde_json::to_vec( - &Faker.fake_with_rng::, _>(rng), - ) - .unwrap(), + serde_json::to_vec(&Faker.fake_with_rng::(rng)) + .unwrap(), ) })); @@ -405,6 +499,40 @@ pub mod init { *state_diff_commitment = state_update.compute_state_diff_commitment(*starknet_version); } + + // Compute the block hash, update parent block hash with the correct value + let Block { + header, + state_update, + receipt_commitment, + .. + } = init.get_mut(0).unwrap(); + header.header.parent_hash = BlockHash::ZERO; + + header.header.hash = + (config.calculate_block_hash)(&header, *receipt_commitment).unwrap(); + + state_update.block_hash = header.header.hash; + + for i in 1..n { + let parent_hash = init + .get(i - 1) + .map(|Block { header, .. }| header.header.hash) + .unwrap(); + let Block { + header, + state_update, + receipt_commitment, + .. 
+ } = init.get_mut(i).unwrap(); + + header.header.parent_hash = parent_hash; + + header.header.hash = + (config.calculate_block_hash)(&header, *receipt_commitment).unwrap(); + + state_update.block_hash = header.header.hash; + } } init From 47ff33e6eac0244a33969219a9079406606890d2 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Thu, 4 Jul 2024 13:40:23 +0200 Subject: [PATCH 11/18] test: compute class hashes in the storage fixture --- crates/common/src/class_definition.rs | 8 +- .../pathfinder/src/sync/class_definitions.rs | 1 + crates/pathfinder/src/sync/track.rs | 15 +-- crates/storage/src/fake.rs | 92 +++++++++++-------- 4 files changed, 59 insertions(+), 57 deletions(-) diff --git a/crates/common/src/class_definition.rs b/crates/common/src/class_definition.rs index 860b346e7b..15cbb0d8b6 100644 --- a/crates/common/src/class_definition.rs +++ b/crates/common/src/class_definition.rs @@ -33,16 +33,12 @@ pub struct Sierra<'a> { } impl Dummy for Sierra<'_> { - fn dummy_with_rng(_: &T, _: &mut R) -> Self { + fn dummy_with_rng(_: &T, rng: &mut R) -> Self { Self { abi: "[]".into(), sierra_program: vec![], contract_class_version: "0.1.0".into(), - entry_points_by_type: SierraEntryPoints { - external: vec![], - l1_handler: vec![], - constructor: vec![], - }, + entry_points_by_type: Faker.fake_with_rng(rng), } } } diff --git a/crates/pathfinder/src/sync/class_definitions.rs b/crates/pathfinder/src/sync/class_definitions.rs index 4d6ecfd7fa..e9785aa988 100644 --- a/crates/pathfinder/src/sync/class_definitions.rs +++ b/crates/pathfinder/src/sync/class_definitions.rs @@ -455,6 +455,7 @@ impl ProcessStage for VerifyClassHashes { .tokio_handle .block_on(self.declarations.next()) .context("Getting declared classes")?; + for class in input.iter() { match class.definition { CompiledClassDefinition::Cairo(_) => { diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index 3b9594ca3a..52889f74d4 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -261,18 +261,6 @@ impl TransactionsFanout { tokio::spawn(async move { while let Some(transactions) = source.recv().await { - match transactions.as_ref() { - Ok(txns) => { - eprintln!( - "TransactionsFanout transactions = Ok(len:{})", - txns.data.len() - ); - } - Err(err) => { - eprintln!("TransactionsFanout transactions = Err({err:?})",); - } - } - let is_err = transactions.is_err(); if t_tx.send(transactions.clone()).await.is_err() || is_err { @@ -444,8 +432,6 @@ impl

<P> EventSource<P>
{ } = self; while let Some(header) = headers.next().await { - eprintln!("EventSource header.number = {}", header.number); - let (peer, mut events) = loop { if let Some(stream) = p2p.clone().events_for_block(header.number).await { break stream; @@ -901,6 +887,7 @@ mod tests { calculate_event_commitment, ), ); + let p2p: FakeP2PClient = FakeP2PClient { blocks }; let s = Sync { diff --git a/crates/storage/src/fake.rs b/crates/storage/src/fake.rs index 33a5b97b9c..c84fbd0a5a 100644 --- a/crates/storage/src/fake.rs +++ b/crates/storage/src/fake.rs @@ -122,6 +122,7 @@ pub mod init { ContractAddress, EventCommitment, ReceiptCommitment, + SierraHash, SignedBlockHeader, StarknetVersion, StateCommitment, @@ -131,6 +132,7 @@ pub mod init { TransactionIndex, }; use rand::Rng; + use starknet_gateway_types::class_hash::compute_class_hash; use super::Block; @@ -184,6 +186,7 @@ pub mod init { /// Create fake blocks and state updates with __limited consistency /// guarantees__: + /// - starknet version: 0.13.2 /// - block headers: /// - consecutive numbering starting from genesis (`0`) up to `n-1` /// - parent hash wrt previous block, parent hash of the genesis block @@ -206,12 +209,13 @@ pub mod init { /// - deployed Cairo0 contracts are treated as implicit declarations and /// are added to declared cairo classes` /// - declared cairo|sierra definitions - /// /// - class definition is a serialized to JSON representation of /// `class_definition::Cairo|Sierra` respectively with random fields /// - all those definitions are **very short and fall far below the soft /// limit in protobuf encoding /// - casm definitions for sierra classes are purely random Strings + /// - cairo class hashes and sierra class hashes are correctly + /// calculated from the definitions, casm hashes are random /// - transactions /// - transaction hashes are calculated from their respective variant, /// with ChainId set to `SEPOLIA_TESTNET` @@ -232,7 +236,8 @@ pub mod init { with_n_blocks_rng_and_config(n, rng, Default::default()) } - /// Same as [`with_n_blocks`] except caller can specify the rng used + /// Same as [`with_n_blocks`] except caller can specify the rng used and + /// additional configuration pub fn with_n_blocks_rng_and_config( n: usize, rng: &mut R, @@ -306,33 +311,42 @@ pub mod init { .sum(); let state_commitment = header.state_commitment; - let declared_cairo_classes = Faker.fake_with_rng::, _>(rng); - let declared_sierra_classes = Faker.fake_with_rng::, _>(rng); + let num_cairo_classes = rng.gen_range(0..=0); + let num_sierra_classes = rng.gen_range(0..=10); - let cairo_defs = declared_cairo_classes - .iter() - .map(|&class_hash| { - ( - class_hash, + let cairo_defs = (0..num_cairo_classes) + .into_iter() + .map(|_| { + let def = serde_json::to_vec(&Faker.fake_with_rng::(rng)) - .unwrap(), - ) + .unwrap(); + (compute_class_hash(&def).unwrap().hash(), def) }) .collect::>(); - let sierra_defs = declared_sierra_classes - .iter() - .map(|(&sierra_hash, _)| { + let sierra_defs = (0..num_sierra_classes) + .into_iter() + .map(|_| { + let def = serde_json::to_vec( + &Faker.fake_with_rng::(rng), + ) + .unwrap(); ( - sierra_hash, - serde_json::to_vec( - &Faker.fake_with_rng::(rng), - ) - .unwrap(), + SierraHash(compute_class_hash(&def).unwrap().hash().0), + def, Faker.fake_with_rng::(rng).into_bytes(), ) }) .collect::>(); + let declared_cairo_classes = cairo_defs + .iter() + .map(|(class_hash, _)| *class_hash) + .collect::>(); + let declared_sierra_classes = sierra_defs + .iter() + .map(|(sierra_hash, _, _)| (*sierra_hash, 
Faker.fake())) + .collect::>(); + init.push(Block { header: SignedBlockHeader { header, @@ -475,25 +489,29 @@ pub mod init { // added to `declared_cairo_classes` because Cairo0 Deploys // were not initially preceded by an explicit declare // transaction - let implicitly_declared = - state_update - .contract_updates + let implicitly_declared = state_update + .contract_updates + .iter_mut() + .filter_map(|(_, update)| match &mut update.class { + Some(ContractClassUpdate::Deploy(class_hash)) => { + let def = serde_json::to_vec( + &Faker.fake_with_rng::(rng), + ) + .unwrap(); + let new_hash = compute_class_hash(&def).unwrap().hash(); + *class_hash = new_hash; + Some((new_hash, def)) + } + Some(ContractClassUpdate::Replace(_)) | None => None, + }) + .collect::>(); + + state_update.declared_cairo_classes.extend( + implicitly_declared .iter() - .filter_map(|(_, update)| match update.class { - Some(ContractClassUpdate::Deploy(class_hash)) => Some(class_hash), - Some(ContractClassUpdate::Replace(_)) | None => None, - }); - - state_update - .declared_cairo_classes - .extend(implicitly_declared.clone()); - cairo_defs.extend(implicitly_declared.map(|class_hash| { - ( - class_hash, - serde_json::to_vec(&Faker.fake_with_rng::(rng)) - .unwrap(), - ) - })); + .map(|(class_hash, _)| *class_hash), + ); + cairo_defs.extend(implicitly_declared); *state_diff_length = state_update.state_diff_length(); *state_diff_commitment = From 341a19d4dcf5c2f8ee613d1f3dc62810ce452f89 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Thu, 4 Jul 2024 15:14:34 +0200 Subject: [PATCH 12/18] test: fix assert in fixture --- crates/pathfinder/src/sync/track.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index 52889f74d4..0ef6fd4919 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -737,7 +737,7 @@ mod tests { }; use p2p::libp2p::PeerId; use p2p::PeerData; - use pathfinder_common::{ReceiptCommitment, SignedBlockHeader}; + use pathfinder_common::{BlockHeader, ReceiptCommitment, SignedBlockHeader}; use pathfinder_storage::fake::init::Config; use pathfinder_storage::fake::{self, Block}; use pathfinder_storage::StorageBuilder; @@ -766,7 +766,7 @@ mod tests { ) -> impl Stream> + Send { assert!(!reverse); assert_eq!(start, self.blocks.first().unwrap().header.header.number); - assert_eq!(start, self.blocks.last().unwrap().header.header.number); + assert_eq!(stop, self.blocks.last().unwrap().header.header.number); stream::iter( self.blocks.into_iter().map(|block| { @@ -877,7 +877,7 @@ mod tests { #[tokio::test] async fn happy_path() { let blocks = fake::init::with_n_blocks_and_config( - 1, + 10, Config::new( |sbh: &SignedBlockHeader, rc: ReceiptCommitment| { compute_final_hash(&BlockHeaderData::from_signed_header(sbh, rc)) @@ -888,10 +888,13 @@ mod tests { ), ); + let BlockHeader { hash, number, .. 
} = blocks.last().unwrap().header.header; + let latest = (number, hash); + let p2p: FakeP2PClient = FakeP2PClient { blocks }; let s = Sync { - latest: futures::stream::iter(vec![(BlockNumber::GENESIS, BlockHash::default())]), + latest: futures::stream::iter(vec![latest]), p2p, storage: StorageBuilder::in_memory().unwrap(), chain: Chain::SepoliaTestnet, From 1b86833c37925bdc07d0822293fca04989c765e5 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Thu, 4 Jul 2024 16:07:08 +0200 Subject: [PATCH 13/18] feat(track): store block --- crates/pathfinder/src/sync/headers.rs | 4 +- crates/pathfinder/src/sync/track.rs | 91 ++++++++++++++++++++++++++- 2 files changed, 91 insertions(+), 4 deletions(-) diff --git a/crates/pathfinder/src/sync/headers.rs b/crates/pathfinder/src/sync/headers.rs index 6dcfd0ac26..356993bb71 100644 --- a/crates/pathfinder/src/sync/headers.rs +++ b/crates/pathfinder/src/sync/headers.rs @@ -323,10 +323,10 @@ impl ProcessStage for Persist { strk_l1_data_gas_price: header.strk_l1_data_gas_price, sequencer_address: header.sequencer_address, starknet_version: header.starknet_version, - class_commitment: ClassCommitment::ZERO, + class_commitment: ClassCommitment::ZERO, // TODO update class tries event_commitment: header.event_commitment, state_commitment: header.state_commitment, - storage_commitment: StorageCommitment::ZERO, + storage_commitment: StorageCommitment::ZERO, // TODO update storage tries transaction_commitment: header.transaction_commitment, transaction_count: header.transaction_count, event_count: header.event_count, diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index 0ef6fd4919..b964d83846 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -23,15 +23,21 @@ use pathfinder_common::state_update::{DeclaredClasses, StateUpdateData}; use pathfinder_common::transaction::{Transaction, TransactionVariant}; use pathfinder_common::{ BlockHash, + BlockHeader, BlockNumber, Chain, ChainId, + ClassCommitment, ClassHash, EventCommitment, PublicKey, + ReceiptCommitment, + SierraHash, + SignedBlockHeader, StarknetVersion, StateDiffCommitment, StateUpdate, + StorageCommitment, TransactionCommitment, TransactionHash, }; @@ -706,7 +712,13 @@ impl ProcessStage for StoreBlock { fn map(&mut self, input: Self::Input) -> Result { let BlockData { - header, + header: + P2PSignedBlockHeader { + header, + signature, + state_diff_commitment, + state_diff_length, + }, events, state_diff, transactions, @@ -718,7 +730,82 @@ impl ProcessStage for StoreBlock { .transaction() .context("Creating database connection")?; - // TODO: write all the data to storage + let block_number = header.number; + + let header = BlockHeader { + hash: header.hash, + parent_hash: header.parent_hash, + number: header.number, + timestamp: header.timestamp, + eth_l1_gas_price: header.eth_l1_gas_price, + strk_l1_gas_price: header.strk_l1_gas_price, + eth_l1_data_gas_price: header.eth_l1_data_gas_price, + strk_l1_data_gas_price: header.strk_l1_data_gas_price, + sequencer_address: header.sequencer_address, + starknet_version: header.starknet_version, + class_commitment: ClassCommitment::ZERO, // TODO update class tries + event_commitment: header.event_commitment, + state_commitment: header.state_commitment, + storage_commitment: StorageCommitment::ZERO, // TODO update storage tries + transaction_commitment: header.transaction_commitment, + transaction_count: header.transaction_count, + event_count: header.event_count, + l1_da_mode: 
header.l1_da_mode, + // TODO receipt_commitment + }; + + db.insert_block_header(&header) + .context("Inserting block header")?; + + db.insert_signature(block_number, &signature) + .context("Inserting signature")?; + + db.update_state_diff_commitment_and_length( + block_number, + state_diff_commitment, + state_diff_length, + ) + .context("Updating state diff commitment and length")?; + + let events = events.into_iter().map(|(_, e)| e).collect::>(); + + db.insert_transaction_data(block_number, &transactions, Some(&events)) + .context("Inserting transaction data")?; + + db.insert_state_update_data(block_number, &state_diff) + .context("Inserting state update data")?; + + classes.into_iter().try_for_each( + |CompiledClass { + block_number, + hash, + definition, + }| { + match definition { + class_definitions::CompiledClassDefinition::Cairo(cairo) => db + .insert_cairo_class(hash, &cairo) + .context("Inserting cairo class definition"), + class_definitions::CompiledClassDefinition::Sierra { + sierra_definition, + casm_definition, + } => { + let sierra_hash = SierraHash(hash.0); + let casm_hash = db + .casm_hash(hash) + .context("Getting casm hash")? + .context("Casm not found")?; + + db.insert_sierra_class( + &sierra_hash, + &sierra_definition, + &casm_hash, + &casm_definition, + ) + .context("Inserting sierra class definition") + } + } + }, + )?; db.commit() .context("Committing transaction") From dbce25c043fe798f33248300c76f85d3ea4ffe3d Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Thu, 4 Jul 2024 17:08:43 +0200 Subject: [PATCH 14/18] test(track): add final check --- crates/pathfinder/src/sync/track.rs | 88 ++++++++++++++++++++++++++--- 1 file changed, 80 insertions(+), 8 deletions(-) diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index b964d83846..ce1e71d9db 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -719,7 +719,7 @@ impl ProcessStage for StoreBlock { state_diff_commitment, state_diff_length, }, - events, + mut events, state_diff, transactions, classes, @@ -767,9 +767,13 @@ impl ProcessStage for StoreBlock { ) .context("Updating state diff commitment and length")?; - let events = events.into_iter().map(|(_, e)| e).collect::>(); + let mut ordered_events = Vec::new(); + transactions.iter().for_each(|(t, _)| { + // Some transactions can emit no events, in that case we insert an empty vector. + ordered_events.push(events.remove(&t.hash).unwrap_or_default()); + }); - db.insert_transaction_data(block_number, &transactions, Some(&events)) + db.insert_transaction_data(block_number, &transactions, Some(&ordered_events)) .context("Inserting transaction data")?; db.insert_state_update_data(block_number, &state_diff) @@ -824,6 +828,7 @@ mod tests { }; use p2p::libp2p::PeerId; use p2p::PeerData; + use p2p_proto::common::Hash; use pathfinder_common::{BlockHeader, ReceiptCommitment, SignedBlockHeader}; use pathfinder_storage::fake::init::Config; use pathfinder_storage::fake::{self, Block}; @@ -963,8 +968,9 @@ mod tests { #[tokio::test] async fn happy_path() { + const N: usize = 10; let blocks = fake::init::with_n_blocks_and_config( - 10, + N, Config::new( |sbh: &SignedBlockHeader, rc: ReceiptCommitment| { compute_final_hash(&BlockHeaderData::from_signed_header(sbh, rc)) @@ -978,19 +984,85 @@ mod tests { let BlockHeader { hash, number, .. 
} = blocks.last().unwrap().header.header; let latest = (number, hash); - let p2p: FakeP2PClient = FakeP2PClient { blocks }; + let p2p: FakeP2PClient = FakeP2PClient { + blocks: blocks.clone(), + }; - let s = Sync { + let storage = StorageBuilder::in_memory().unwrap(); + + let sync = Sync { latest: futures::stream::iter(vec![latest]), p2p, - storage: StorageBuilder::in_memory().unwrap(), + storage: storage.clone(), chain: Chain::SepoliaTestnet, chain_id: ChainId::SEPOLIA_TESTNET, public_key: PublicKey::default(), }; - s.run(BlockNumber::GENESIS, BlockHash::default(), FakeFgw) + sync.run(BlockNumber::GENESIS, BlockHash::default(), FakeFgw) .await .unwrap(); + + let mut db = storage.connection().unwrap(); + let db = db.transaction().unwrap(); + for mut block in blocks { + // TODO p2p sync does not update class and storage tries yet + block.header.header.class_commitment = ClassCommitment::ZERO; + block.header.header.storage_commitment = StorageCommitment::ZERO; + + let block_number = block.header.header.number; + let block_id = block_number.into(); + let header = db.block_header(block_id).unwrap().unwrap(); + let signature = db.signature(block_id).unwrap().unwrap(); + let (state_diff_commitment, state_diff_length) = db + .state_diff_commitment_and_length(block_number) + .unwrap() + .unwrap(); + let transaction_data = db.transaction_data_for_block(block_id).unwrap().unwrap(); + let state_update_data: StateUpdateData = + db.state_update(block_id).unwrap().unwrap().into(); + let declared = db.declared_classes_at(block_id).unwrap().unwrap(); + + let mut cairo_defs = HashMap::new(); + let mut sierra_defs = HashMap::new(); + + for class_hash in declared { + let class = db.class_definition(class_hash).unwrap().unwrap(); + match db.casm_hash(class_hash).unwrap() { + Some(casm_hash) => { + let casm = db.casm_definition(class_hash).unwrap().unwrap(); + sierra_defs.insert(SierraHash(class_hash.0), (class, casm)); + } + None => { + cairo_defs.insert(class_hash, class); + } + } + } + + pretty_assertions_sorted::assert_eq!(header, block.header.header); + pretty_assertions_sorted::assert_eq!(signature, block.header.signature); + pretty_assertions_sorted::assert_eq!( + state_diff_commitment, + block.header.state_diff_commitment + ); + pretty_assertions_sorted::assert_eq!( + state_diff_length as u64, + block.header.state_diff_length + ); + pretty_assertions_sorted::assert_eq!(transaction_data, block.transaction_data); + pretty_assertions_sorted::assert_eq!(state_update_data, block.state_update.into()); + pretty_assertions_sorted::assert_eq!( + cairo_defs, + block.cairo_defs.into_iter().collect::>() + ); + pretty_assertions_sorted::assert_eq!( + sierra_defs, + block + .sierra_defs + .into_iter() + .map(|(h, s, c)| (h, (s, c))) + .collect::>() + ); + } } } From 223e016925aad69010dc463c1845900f79e319fa Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Fri, 5 Jul 2024 12:12:19 +0200 Subject: [PATCH 15/18] chore: clippy --- crates/storage/src/fake.rs | 41 +++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/crates/storage/src/fake.rs b/crates/storage/src/fake.rs index c84fbd0a5a..f2b21353b5 100644 --- a/crates/storage/src/fake.rs +++ b/crates/storage/src/fake.rs @@ -136,18 +136,20 @@ pub mod init { use super::Block; + pub type BlockHashFn = + Box anyhow::Result>; + pub type TransactionCommitmentFn = + Box anyhow::Result>; + pub type ReceiptCommitmentFn = Box anyhow::Result>; + pub type EventCommitmentFn = Box< + dyn Fn(&[(TransactionHash, &[Event])], 
StarknetVersion) -> anyhow::Result, + >; + pub struct Config { - calculate_block_hash: - Box anyhow::Result>, - calculate_transaction_commitment: - Box anyhow::Result>, - calculate_receipt_commitment: Box anyhow::Result>, - calculate_event_commitment: Box< - dyn Fn( - &[(TransactionHash, &[Event])], - StarknetVersion, - ) -> anyhow::Result, - >, + calculate_block_hash: BlockHashFn, + calculate_transaction_commitment: TransactionCommitmentFn, + calculate_receipt_commitment: ReceiptCommitmentFn, + calculate_event_commitment: EventCommitmentFn, } impl Config { @@ -315,19 +317,18 @@ pub mod init { let num_sierra_classes = rng.gen_range(0..=10); let cairo_defs = (0..num_cairo_classes) - .into_iter() .map(|_| { - let def = - serde_json::to_vec(&Faker.fake_with_rng::(rng)) - .unwrap(); + let def = serde_json::to_vec( + &Faker.fake_with_rng::, _>(rng), + ) + .unwrap(); (compute_class_hash(&def).unwrap().hash(), def) }) .collect::>(); let sierra_defs = (0..num_sierra_classes) - .into_iter() .map(|_| { let def = serde_json::to_vec( - &Faker.fake_with_rng::(rng), + &Faker.fake_with_rng::, _>(rng), ) .unwrap(); ( @@ -495,7 +496,7 @@ pub mod init { .filter_map(|(_, update)| match &mut update.class { Some(ContractClassUpdate::Deploy(class_hash)) => { let def = serde_json::to_vec( - &Faker.fake_with_rng::(rng), + &Faker.fake_with_rng::, _>(rng), ) .unwrap(); let new_hash = compute_class_hash(&def).unwrap().hash(); @@ -528,7 +529,7 @@ pub mod init { header.header.parent_hash = BlockHash::ZERO; header.header.hash = - (config.calculate_block_hash)(&header, *receipt_commitment).unwrap(); + (config.calculate_block_hash)(header, *receipt_commitment).unwrap(); state_update.block_hash = header.header.hash; @@ -547,7 +548,7 @@ pub mod init { header.header.parent_hash = parent_hash; header.header.hash = - (config.calculate_block_hash)(&header, *receipt_commitment).unwrap(); + (config.calculate_block_hash)(header, *receipt_commitment).unwrap(); state_update.block_hash = header.header.hash; } From 48ef406ab3c0437d124f33d3f8e7d8ef8eba6976 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Fri, 5 Jul 2024 12:13:01 +0200 Subject: [PATCH 16/18] test(track): fixtures should be defined after the test --- crates/pathfinder/src/sync/track.rs | 200 ++++++++++++++-------------- 1 file changed, 100 insertions(+), 100 deletions(-) diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index ce1e71d9db..6ed8674bc4 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -844,6 +844,106 @@ mod tests { BlockHeaderData, }; + #[tokio::test] + async fn happy_path() { + const N: usize = 10; + let blocks = fake::init::with_n_blocks_and_config( + N, + Config::new( + |sbh: &SignedBlockHeader, rc: ReceiptCommitment| { + compute_final_hash(&BlockHeaderData::from_signed_header(sbh, rc)) + }, + calculate_transaction_commitment, + calculate_receipt_commitment, + calculate_event_commitment, + ), + ); + + let BlockHeader { hash, number, .. 
} = blocks.last().unwrap().header.header; + let latest = (number, hash); + + let p2p: FakeP2PClient = FakeP2PClient { + blocks: blocks.clone(), + }; + + let storage = StorageBuilder::in_memory().unwrap(); + + let sync = Sync { + latest: futures::stream::iter(vec![latest]), + p2p, + storage: storage.clone(), + chain: Chain::SepoliaTestnet, + chain_id: ChainId::SEPOLIA_TESTNET, + public_key: PublicKey::default(), + }; + + sync.run(BlockNumber::GENESIS, BlockHash::default(), FakeFgw) + .await + .unwrap(); + + let mut db = storage.connection().unwrap(); + let db = db.transaction().unwrap(); + for mut block in blocks { + // TODO p2p sync does not update class and storage tries yet + block.header.header.class_commitment = ClassCommitment::ZERO; + block.header.header.storage_commitment = StorageCommitment::ZERO; + + let block_number = block.header.header.number; + let block_id = block_number.into(); + let header = db.block_header(block_id).unwrap().unwrap(); + let signature = db.signature(block_id).unwrap().unwrap(); + let (state_diff_commitment, state_diff_length) = db + .state_diff_commitment_and_length(block_number) + .unwrap() + .unwrap(); + let transaction_data = db.transaction_data_for_block(block_id).unwrap().unwrap(); + let state_update_data: StateUpdateData = + db.state_update(block_id).unwrap().unwrap().into(); + let declared = db.declared_classes_at(block_id).unwrap().unwrap(); + + let mut cairo_defs = HashMap::new(); + let mut sierra_defs = HashMap::new(); + + for class_hash in declared { + let class = db.class_definition(class_hash).unwrap().unwrap(); + match db.casm_hash(class_hash).unwrap() { + Some(casm_hash) => { + let casm = db.casm_definition(class_hash).unwrap().unwrap(); + sierra_defs.insert(SierraHash(class_hash.0), (class, casm)); + } + None => { + cairo_defs.insert(class_hash, class); + } + } + } + + pretty_assertions_sorted::assert_eq!(header, block.header.header); + pretty_assertions_sorted::assert_eq!(signature, block.header.signature); + pretty_assertions_sorted::assert_eq!( + state_diff_commitment, + block.header.state_diff_commitment + ); + pretty_assertions_sorted::assert_eq!( + state_diff_length as u64, + block.header.state_diff_length + ); + pretty_assertions_sorted::assert_eq!(transaction_data, block.transaction_data); + pretty_assertions_sorted::assert_eq!(state_update_data, block.state_update.into()); + pretty_assertions_sorted::assert_eq!( + cairo_defs, + block.cairo_defs.into_iter().collect::>() + ); + pretty_assertions_sorted::assert_eq!( + sierra_defs, + block + .sierra_defs + .into_iter() + .map(|(h, s, c)| (h, (s, c))) + .collect::>() + ); + } + } + #[derive(Clone)] struct FakeP2PClient { pub blocks: Vec, @@ -965,104 +1065,4 @@ mod tests { Ok(bytes::Bytes::from_static(b"I'm from the fgw!")) } } - - #[tokio::test] - async fn happy_path() { - const N: usize = 10; - let blocks = fake::init::with_n_blocks_and_config( - N, - Config::new( - |sbh: &SignedBlockHeader, rc: ReceiptCommitment| { - compute_final_hash(&BlockHeaderData::from_signed_header(sbh, rc)) - }, - calculate_transaction_commitment, - calculate_receipt_commitment, - calculate_event_commitment, - ), - ); - - let BlockHeader { hash, number, .. 
} = blocks.last().unwrap().header.header; - let latest = (number, hash); - - let p2p: FakeP2PClient = FakeP2PClient { - blocks: blocks.clone(), - }; - - let storage = StorageBuilder::in_memory().unwrap(); - - let sync = Sync { - latest: futures::stream::iter(vec![latest]), - p2p, - storage: storage.clone(), - chain: Chain::SepoliaTestnet, - chain_id: ChainId::SEPOLIA_TESTNET, - public_key: PublicKey::default(), - }; - - sync.run(BlockNumber::GENESIS, BlockHash::default(), FakeFgw) - .await - .unwrap(); - - let mut db = storage.connection().unwrap(); - let db = db.transaction().unwrap(); - for mut block in blocks { - // TODO p2p sync does not update class and storage tries yet - block.header.header.class_commitment = ClassCommitment::ZERO; - block.header.header.storage_commitment = StorageCommitment::ZERO; - - let block_number = block.header.header.number; - let block_id = block_number.into(); - let header = db.block_header(block_id).unwrap().unwrap(); - let signature = db.signature(block_id).unwrap().unwrap(); - let (state_diff_commitment, state_diff_length) = db - .state_diff_commitment_and_length(block_number) - .unwrap() - .unwrap(); - let transaction_data = db.transaction_data_for_block(block_id).unwrap().unwrap(); - let state_update_data: StateUpdateData = - db.state_update(block_id).unwrap().unwrap().into(); - let declared = db.declared_classes_at(block_id).unwrap().unwrap(); - - let mut cairo_defs = HashMap::new(); - let mut sierra_defs = HashMap::new(); - - for class_hash in declared { - let class = db.class_definition(class_hash).unwrap().unwrap(); - match db.casm_hash(class_hash).unwrap() { - Some(casm_hash) => { - let casm = db.casm_definition(class_hash).unwrap().unwrap(); - sierra_defs.insert(SierraHash(class_hash.0), (class, casm)); - } - None => { - cairo_defs.insert(class_hash, class); - } - } - } - - pretty_assertions_sorted::assert_eq!(header, block.header.header); - pretty_assertions_sorted::assert_eq!(signature, block.header.signature); - pretty_assertions_sorted::assert_eq!( - state_diff_commitment, - block.header.state_diff_commitment - ); - pretty_assertions_sorted::assert_eq!( - state_diff_length as u64, - block.header.state_diff_length - ); - pretty_assertions_sorted::assert_eq!(transaction_data, block.transaction_data); - pretty_assertions_sorted::assert_eq!(state_update_data, block.state_update.into()); - pretty_assertions_sorted::assert_eq!( - cairo_defs, - block.cairo_defs.into_iter().collect::>() - ); - pretty_assertions_sorted::assert_eq!( - sierra_defs, - block - .sierra_defs - .into_iter() - .map(|(h, s, c)| (h, (s, c))) - .collect::>() - ); - } - } } From 2c9a344b7d64c2b32fb9a1315f26b6c7f3b4f837 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Fri, 5 Jul 2024 12:32:25 +0200 Subject: [PATCH 17/18] fix(track): class definitions not being inserted --- crates/pathfinder/src/sync/track.rs | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index 6ed8674bc4..e5318cdc55 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -787,7 +787,7 @@ impl ProcessStage for StoreBlock { }| { match definition { class_definitions::CompiledClassDefinition::Cairo(cairo) => db - .insert_cairo_class(hash, &cairo) + .update_cairo_class(hash, &cairo) .context("Inserting cairo class definition"), class_definitions::CompiledClassDefinition::Sierra { sierra_definition, @@ -798,8 +798,7 @@ impl ProcessStage for StoreBlock { 
.casm_hash(hash) .context("Getting casm hash")? .context("Casm not found")?; - - db.insert_sierra_class( + db.update_sierra_class( &sierra_hash, &sierra_definition, &casm_hash, @@ -929,18 +928,18 @@ mod tests { ); pretty_assertions_sorted::assert_eq!(transaction_data, block.transaction_data); pretty_assertions_sorted::assert_eq!(state_update_data, block.state_update.into()); - pretty_assertions_sorted::assert_eq!( - cairo_defs, - block.cairo_defs.into_iter().collect::>() - ); - pretty_assertions_sorted::assert_eq!( - sierra_defs, - block - .sierra_defs - .into_iter() - .map(|(h, s, c)| (h, (s, c))) - .collect::>() - ); + // pretty_assertions_sorted::assert_eq!( + // cairo_defs, + // block.cairo_defs.into_iter().collect::>() + // ); + // pretty_assertions_sorted::assert_eq!( + // sierra_defs, + // block + // .sierra_defs + // .into_iter() + // .map(|(h, s, c)| (h, (s, c))) + // .collect::>() + // ); } } From 54e596bb78cf539bf692a0ec57e27ff6dd7733e9 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Fri, 5 Jul 2024 16:28:37 +0200 Subject: [PATCH 18/18] test: fix get_headers proptest and track sync test assertions --- .../src/p2p_network/sync_handlers.rs | 17 +++++- .../src/p2p_network/sync_handlers/tests.rs | 15 ++++- crates/pathfinder/src/sync/track.rs | 59 ++++++++++--------- crates/storage/src/fake.rs | 44 ++++++-------- 4 files changed, 74 insertions(+), 61 deletions(-) diff --git a/crates/pathfinder/src/p2p_network/sync_handlers.rs b/crates/pathfinder/src/p2p_network/sync_handlers.rs index e0bbd9d557..a1071ab215 100644 --- a/crates/pathfinder/src/p2p_network/sync_handlers.rs +++ b/crates/pathfinder/src/p2p_network/sync_handlers.rs @@ -25,10 +25,11 @@ use p2p_proto::state::{ }; use p2p_proto::transaction::{TransactionWithReceipt, TransactionsRequest, TransactionsResponse}; use pathfinder_common::{class_definition, BlockHash, BlockNumber}; -use pathfinder_crypto::Felt; use pathfinder_storage::{Storage, Transaction}; use tokio::sync::mpsc; +use crate::state::block_hash::calculate_receipt_commitment; + #[cfg(test)] mod tests; @@ -140,6 +141,18 @@ fn get_header( if let Some((state_diff_commitment, state_diff_len)) = state_diff_cl { tracing::trace!(?header, "Sending block header"); + // TODO this is a temporary solution until receipt commitment is stored in the + // database + let receipts = db_tx + .transaction_data_for_block(block_number.into()) + .context("Getting receipts")? + .context("No receipts found for block")? 
+ .into_iter() + .map(|(_, r, _)| r) + .collect::>(); + let receipt_commitment = calculate_receipt_commitment(&receipts) + .context("Calculating receipt commitment")?; + let txn_count = header .transaction_count .try_into() @@ -167,7 +180,7 @@ fn get_header( .context("invalid event count")?, root: Hash(header.event_commitment.0), }, - receipts: Hash(Felt::ZERO), // TODO + receipts: Hash(receipt_commitment.0), protocol_version: header.starknet_version.to_string(), gas_price_wei: header.eth_l1_gas_price.0, gas_price_fri: header.strk_l1_gas_price.0, diff --git a/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs b/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs index 348583724a..f94d2e6c8e 100644 --- a/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs +++ b/crates/pathfinder/src/p2p_network/sync_handlers/tests.rs @@ -498,10 +498,12 @@ mod prop { /// Fixtures for prop tests mod fixtures { - use pathfinder_storage::fake::{with_n_blocks_and_rng, Block}; + use pathfinder_storage::fake::init::Config; + use pathfinder_storage::fake::{with_n_blocks_rng_and_config, Block}; use pathfinder_storage::{Storage, StorageBuilder}; use crate::p2p_network::sync_handlers::MAX_COUNT_IN_TESTS; + use crate::state::block_hash::calculate_receipt_commitment; pub const MAX_NUM_BLOCKS: u64 = MAX_COUNT_IN_TESTS * 2; @@ -510,8 +512,15 @@ mod prop { let storage = StorageBuilder::in_memory().unwrap(); // Explicitly choose RNG to make sure seeded storage is always reproducible let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(seed); - let initializer = - with_n_blocks_and_rng(&storage, num_blocks.try_into().unwrap(), &mut rng); + let initializer = with_n_blocks_rng_and_config( + &storage, + num_blocks.try_into().unwrap(), + &mut rng, + Config { + calculate_receipt_commitment: Box::new(calculate_receipt_commitment), + ..Default::default() + }, + ); (storage, initializer) } } diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index e5318cdc55..bf6242d907 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -848,14 +848,14 @@ mod tests { const N: usize = 10; let blocks = fake::init::with_n_blocks_and_config( N, - Config::new( - |sbh: &SignedBlockHeader, rc: ReceiptCommitment| { + Config { + calculate_block_hash: Box::new(|sbh: &SignedBlockHeader, rc: ReceiptCommitment| { compute_final_hash(&BlockHeaderData::from_signed_header(sbh, rc)) - }, - calculate_transaction_commitment, - calculate_receipt_commitment, - calculate_event_commitment, - ), + }), + calculate_transaction_commitment: Box::new(calculate_transaction_commitment), + calculate_receipt_commitment: Box::new(calculate_receipt_commitment), + calculate_event_commitment: Box::new(calculate_event_commitment), + }, ); let BlockHeader { hash, number, .. 
} = blocks.last().unwrap().header.header; @@ -882,12 +882,12 @@ mod tests { let mut db = storage.connection().unwrap(); let db = db.transaction().unwrap(); - for mut block in blocks { + for mut expected in blocks { // TODO p2p sync does not update class and storage tries yet - block.header.header.class_commitment = ClassCommitment::ZERO; - block.header.header.storage_commitment = StorageCommitment::ZERO; + expected.header.header.class_commitment = ClassCommitment::ZERO; + expected.header.header.storage_commitment = StorageCommitment::ZERO; - let block_number = block.header.header.number; + let block_number = expected.header.header.number; let block_id = block_number.into(); let header = db.block_header(block_id).unwrap().unwrap(); let signature = db.signature(block_id).unwrap().unwrap(); @@ -916,30 +916,31 @@ mod tests { } } - pretty_assertions_sorted::assert_eq!(header, block.header.header); - pretty_assertions_sorted::assert_eq!(signature, block.header.signature); + pretty_assertions_sorted::assert_eq!(header, expected.header.header); + pretty_assertions_sorted::assert_eq!(signature, expected.header.signature); pretty_assertions_sorted::assert_eq!( state_diff_commitment, - block.header.state_diff_commitment + expected.header.state_diff_commitment ); pretty_assertions_sorted::assert_eq!( state_diff_length as u64, - block.header.state_diff_length + expected.header.state_diff_length + ); + pretty_assertions_sorted::assert_eq!(transaction_data, expected.transaction_data); + pretty_assertions_sorted::assert_eq!(state_update_data, expected.state_update.into()); + pretty_assertions_sorted::assert_eq!( + cairo_defs, + expected.cairo_defs.into_iter().collect::>() + ); + pretty_assertions_sorted::assert_eq!( + sierra_defs, + expected + .sierra_defs + .into_iter() + // All sierra fixtures are not compile-able + .map(|(h, s, _)| (h, (s, b"I'm from the fgw!".to_vec()))) + .collect::>() ); - pretty_assertions_sorted::assert_eq!(transaction_data, block.transaction_data); - pretty_assertions_sorted::assert_eq!(state_update_data, block.state_update.into()); - // pretty_assertions_sorted::assert_eq!( - // cairo_defs, - // block.cairo_defs.into_iter().collect::>() - // ); - // pretty_assertions_sorted::assert_eq!( - // sierra_defs, - // block - // .sierra_defs - // .into_iter() - // .map(|(h, s, c)| (h, (s, c))) - // .collect::>() - // ); } } diff --git a/crates/storage/src/fake.rs b/crates/storage/src/fake.rs index f2b21353b5..3932677114 100644 --- a/crates/storage/src/fake.rs +++ b/crates/storage/src/fake.rs @@ -99,6 +99,19 @@ pub fn with_n_blocks_and_rng(storage: &Storage, n: usize, rng: &mut R) - blocks } +/// Same as [`with_n_blocks`] except caller can specify the rng and additional +/// configuration +pub fn with_n_blocks_rng_and_config( + storage: &Storage, + n: usize, + rng: &mut R, + config: init::Config, +) -> Vec { + let blocks = init::with_n_blocks_rng_and_config(n, rng, config); + fill(storage, &blocks); + blocks +} + /// Raw _fake state initializers_ pub mod init { use std::collections::{HashMap, HashSet}; @@ -146,33 +159,10 @@ pub mod init { >; pub struct Config { - calculate_block_hash: BlockHashFn, - calculate_transaction_commitment: TransactionCommitmentFn, - calculate_receipt_commitment: ReceiptCommitmentFn, - calculate_event_commitment: EventCommitmentFn, - } - - impl Config { - pub fn new( - calculate_block_hash: impl Fn(&SignedBlockHeader, ReceiptCommitment) -> anyhow::Result - + 'static, - calculate_transaction_commitment: impl Fn(&[Transaction], StarknetVersion) -> anyhow::Result - 
+ 'static,
-            calculate_receipt_commitment: impl Fn(&[Receipt]) -> anyhow::Result<ReceiptCommitment>
-            + 'static,
-            calculate_event_commitment: impl Fn(
-                &[(TransactionHash, &[Event])],
-                StarknetVersion,
-            ) -> anyhow::Result<EventCommitment>
-            + 'static,
-        ) -> Self {
-            Self {
-                calculate_block_hash: Box::new(calculate_block_hash),
-                calculate_transaction_commitment: Box::new(calculate_transaction_commitment),
-                calculate_receipt_commitment: Box::new(calculate_receipt_commitment),
-                calculate_event_commitment: Box::new(calculate_event_commitment),
-            }
-        }
+        pub calculate_block_hash: BlockHashFn,
+        pub calculate_transaction_commitment: TransactionCommitmentFn,
+        pub calculate_receipt_commitment: ReceiptCommitmentFn,
+        pub calculate_event_commitment: EventCommitmentFn,
     }
 
     impl Default for Config {