Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Audit p2p sync errors #2353

Merged
merged 23 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5eda931
refactor(block_hash): compute_final_hash is infallible
CHr15F0x Oct 31, 2024
cc2e631
refactor: rename variable
CHr15F0x Oct 31, 2024
0b33f1f
feat(sync/checkpoint): report chunk tail when syncing headers
CHr15F0x Oct 31, 2024
c9de76c
fix(sync): fetching casm error is recoverable
CHr15F0x Oct 31, 2024
d0e8dbe
doc(sync): add comment
CHr15F0x Nov 4, 2024
fff3243
refactor(sync): remove VerifyCommitment2
CHr15F0x Nov 4, 2024
a1921c2
refactor(sync): deduplicate UpdateStarknetState impl
CHr15F0x Nov 4, 2024
4aedd1d
fix(sync): state diff commitment, starknet version and transaction co…
CHr15F0x Nov 4, 2024
213d8f0
refactor(sync): rename SyncError::Other to Fatal
CHr15F0x Nov 4, 2024
ad738e5
refactor(sync): sort SyncError variants
CHr15F0x Nov 4, 2024
b57f755
refactor(sync): add missing variants to SyncError
CHr15F0x Nov 4, 2024
1114357
feat(sync/error): add PartialEq impl to SyncError
CHr15F0x Nov 4, 2024
c94f441
refactor(sync/error): remove SyncError2
CHr15F0x Nov 4, 2024
c98e3e0
refactor(p2p): p2p client stream apis only report fatal errors
CHr15F0x Nov 4, 2024
0b802c9
refactor: report proper peer upon error when using the pipe api
CHr15F0x Nov 4, 2024
77fdcb9
refactor(sync): use SyncError directly where possible
CHr15F0x Nov 4, 2024
6811fb3
feat(sync): add missing logs
CHr15F0x Nov 5, 2024
41a238c
test(pathfinder/sync/stream): update tests
CHr15F0x Nov 5, 2024
24b1f1e
test(pathfinder/sync/checkpoint): update tests
CHr15F0x Nov 5, 2024
b5b4158
chore: clippy
CHr15F0x Nov 5, 2024
307117e
chore: remove outdated todos and a stray empty comment
CHr15F0x Nov 5, 2024
b8e417c
doc: fix typos
CHr15F0x Nov 7, 2024
98340ec
chore: remove unused import
CHr15F0x Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions crates/p2p/src/client/peer_agnostic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,9 +779,9 @@ mod transaction_stream {
tracing::trace!(?start, ?stop, "Streaming Transactions");

make_stream::from_future(move |tx| async move {
let mut counts_and_commitments_stream = Box::pin(counts_stream);
let mut expected_transaction_counts_stream = Box::pin(counts_stream);

let cnt = match try_next(&mut counts_and_commitments_stream).await {
let cnt = match try_next(&mut expected_transaction_counts_stream).await {
Ok(x) => x,
Err(e) => {
_ = tx.send(Err(e)).await;
Expand Down Expand Up @@ -826,7 +826,7 @@ mod transaction_stream {
if yield_block(
peer,
&mut progress,
&mut counts_and_commitments_stream,
&mut expected_transaction_counts_stream,
transactions,
&mut start,
stop,
Expand Down Expand Up @@ -1557,14 +1557,15 @@ mod event_stream {

async fn try_next<T>(
count_stream: &mut (impl Stream<Item = anyhow::Result<T>> + Unpin + Send + 'static),
) -> Result<T, PeerData<anyhow::Error>> {
) -> Result<T, anyhow::Error> {
match count_stream.next().await {
Some(Ok(cnt)) => Ok(cnt),
Some(Err(e)) => Err(PeerData::new(PeerId::random(), e)),
None => Err(PeerData::new(
PeerId::random(),
anyhow::anyhow!("Count stream terminated prematurely"),
)),
// This is a non-recoverable error, because "Counter" streams fail only if the underlying
// database fails.
Some(Err(e)) => Err(e),
// This is a non-recoverable error, because we expect all the necessary headers that are the
// source of the stream to be in the database.
None => Err(anyhow::anyhow!("Count stream terminated prematurely")),
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/p2p/src/client/peer_agnostic/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::client::types::{
};
use crate::PeerData;

pub type StreamItem<T> = Result<PeerData<T>, PeerData<anyhow::Error>>;
pub type StreamItem<T> = Result<PeerData<T>, anyhow::Error>;

pub trait HeaderStream {
fn header_stream(
Expand Down
6 changes: 0 additions & 6 deletions crates/p2p/src/peer_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ impl<T> PeerData<T> {
Self { peer, data }
}

pub fn from_result<E>(peer: PeerId, result: Result<T, E>) -> Result<PeerData<T>, PeerData<E>> {
result
.map(|x| Self::new(peer, x))
.map_err(|e| PeerData::<E>::new(peer, e))
}

pub fn for_tests(data: T) -> Self {
vbar marked this conversation as resolved.
Show resolved Hide resolved
Self {
peer: PeerId::random(),
Expand Down
2 changes: 1 addition & 1 deletion crates/pathfinder/examples/compute_pre0132_hashes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ fn main() -> anyhow::Result<()> {

// Compute the block hash in the 0.13.2 style
let header_data = get_header_data(&header);
let new_block_hash = compute_final_hash(&header_data).context("Computing block hash")?;
let new_block_hash = compute_final_hash(&header_data);

// Write to the CSV file
writeln!(csv_file, "{},{}", block_number, new_block_hash)?;
Expand Down
21 changes: 9 additions & 12 deletions crates/pathfinder/src/state/block_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub fn verify_block_hash(
false
}
} else {
let computed_hash = compute_final_hash(&header)?;
let computed_hash = compute_final_hash(&header);
computed_hash == header.hash
};

Expand Down Expand Up @@ -405,7 +405,7 @@ fn compute_final_hash_pre_0_13_2(header: &BlockHeaderData) -> BlockHash {
BlockHash(chain.finalize())
}

fn compute_final_hash_v0(header: &BlockHeaderData) -> Result<BlockHash> {
fn compute_final_hash_v0(header: &BlockHeaderData) -> BlockHash {
// Hash the block header.
let mut hasher = PoseidonHasher::new();
hasher.write(felt_bytes!(b"STARKNET_BLOCK_HASH0").into());
Expand All @@ -429,12 +429,12 @@ fn compute_final_hash_v0(header: &BlockHeaderData) -> Result<BlockHash> {
);
hasher.write(MontFelt::ZERO);
hasher.write(header.parent_hash.0.into());
Ok(BlockHash(hasher.finish().into()))
BlockHash(hasher.finish().into())
}

// Bumps the initial STARKNET_BLOCK_HASH0 to STARKNET_BLOCK_HASH1,
// replaces gas price elements with gas_prices_hash.
fn compute_final_hash_v1(header: &BlockHeaderData) -> Result<BlockHash> {
fn compute_final_hash_v1(header: &BlockHeaderData) -> BlockHash {
// Hash the block header.
let mut hasher = PoseidonHasher::new();
hasher.write(felt_bytes!(b"STARKNET_BLOCK_HASH1").into());
Expand All @@ -455,10 +455,10 @@ fn compute_final_hash_v1(header: &BlockHeaderData) -> Result<BlockHash> {
);
hasher.write(MontFelt::ZERO);
hasher.write(header.parent_hash.0.into());
Ok(BlockHash(hasher.finish().into()))
BlockHash(hasher.finish().into())
}

pub fn compute_final_hash(header: &BlockHeaderData) -> Result<BlockHash> {
pub fn compute_final_hash(header: &BlockHeaderData) -> BlockHash {
if header.starknet_version < StarknetVersion::V_0_13_4 {
compute_final_hash_v0(header)
} else {
Expand Down Expand Up @@ -927,7 +927,7 @@ mod tests {
}
});

assert_eq!(compute_final_hash(&block_header).unwrap(), expected_hash);
assert_eq!(compute_final_hash(&block_header), expected_hash);
}

#[test]
Expand Down Expand Up @@ -1235,7 +1235,7 @@ mod tests {
let expected_hash = BlockHash(felt!(
"0x061e4998d51a248f1d0288d7e17f6287757b0e5e6c5e1e58ddf740616e312134"
));
assert_eq!(compute_final_hash(&header).unwrap(), expected_hash);
assert_eq!(compute_final_hash(&header), expected_hash);
}

// Source
Expand Down Expand Up @@ -1271,9 +1271,6 @@ mod tests {
)
.unwrap();

assert_eq!(
compute_final_hash(&block_header_data).unwrap(),
expected_hash
);
assert_eq!(compute_final_hash(&block_header_data), expected_hash);
}
}
29 changes: 13 additions & 16 deletions crates/pathfinder/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use error::{SyncError, SyncError2};
use error::SyncError;
use futures::{pin_mut, Stream, StreamExt};
use p2p::client::peer_agnostic::Client as P2PClient;
use p2p::PeerData;
Expand Down Expand Up @@ -63,9 +63,9 @@ impl Sync {
self.track_sync(next, parent_hash).await
}

async fn handle_recoverable_error(&self, err: &PeerData<error::SyncError2>) {
async fn handle_recoverable_error(&self, err: &error::SyncError) {
// TODO
tracing::debug!(?err, "Log and punish as appropriate");
tracing::debug!(%err, "Log and punish as appropriate");
}

/// Retry forever until a valid L1 checkpoint is retrieved
Expand Down Expand Up @@ -126,13 +126,13 @@ impl Sync {
tracing::debug!(?continue_from, "Checkpoint sync complete");
continue_from
}
Err(SyncError::Other(error)) => {
tracing::error!(?error, "Stopping checkpoint sync");
return Err(error);
Err(SyncError::Fatal(mut error)) => {
tracing::error!(%error, "Stopping checkpoint sync");
return Err(error.take_or_deep_clone());
}
Err(error) => {
tracing::debug!(?error, "Restarting checkpoint sync");
self.handle_recoverable_error(&error.into_v2()).await;
tracing::debug!(%error, "Restarting checkpoint sync");
self.handle_recoverable_error(&error).await;
continue;
}
};
Expand Down Expand Up @@ -180,19 +180,16 @@ impl Sync {
.run(next, parent_hash, self.fgw_client.clone())
.await;

match &mut result {
match result {
Ok(_) => tracing::debug!("Restarting track sync: unexpected end of Block stream"),
Err(PeerData {
data: SyncError2::Other(error),
..
}) => {
tracing::error!(?error, "Stopping track sync");
Err(SyncError::Fatal(mut error)) => {
tracing::error!(%error, "Stopping track sync");
use pathfinder_common::error::AnyhowExt;
return Err(error.take_or_deep_clone());
}
Err(error) => {
tracing::debug!(error=?error.data, "Restarting track sync");
self.handle_recoverable_error(error).await;
tracing::debug!(%error, "Restarting track sync");
self.handle_recoverable_error(&error).await;
}
}
}
Expand Down
Loading