diff --git a/Cargo.lock b/Cargo.lock index 924c762445..80688f9e9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3580,6 +3580,18 @@ dependencies = [ "num-traits 0.2.17", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -5547,6 +5559,15 @@ dependencies = [ "unsigned-varint 0.7.2", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -6084,6 +6105,7 @@ dependencies = [ "const-decoder", "fake", "flate2", + "flume", "futures", "http", "ipnet", @@ -6778,7 +6800,7 @@ checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" dependencies = [ "bytes", "heck 0.4.1", - "itertools 0.10.5", + "itertools 0.11.0", "log", "multimap", "once_cell", @@ -6812,7 +6834,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.48", @@ -7848,6 +7870,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "spki" diff --git a/crates/pathfinder/Cargo.toml b/crates/pathfinder/Cargo.toml index ef2d628084..8c0d0ba5df 100644 --- a/crates/pathfinder/Cargo.toml +++ b/crates/pathfinder/Cargo.toml @@ -26,6 +26,7 @@ cairo-lang-starknet = "2.4.0" clap = { workspace = true, features = ["derive", "env", "wrap_help"] } console-subscriber = { version = "0.1.10", optional = true } fake = { workspace = true } +flume = "0.11.0" futures = { workspace = true } ipnet = { workspace = true } lazy_static = { workspace = true } diff --git a/crates/pathfinder/src/p2p_network/sync_handlers.rs b/crates/pathfinder/src/p2p_network/sync_handlers.rs index c7900b8f16..26a507c2cd 100644 --- a/crates/pathfinder/src/p2p_network/sync_handlers.rs +++ b/crates/pathfinder/src/p2p_network/sync_handlers.rs @@ -1,6 +1,6 @@ use anyhow::Context; use futures::channel::mpsc; -use futures::SinkExt; +use futures::{SinkExt, StreamExt}; use p2p_proto::class::{Class, ClassesRequest, ClassesResponse}; use p2p_proto::common::{ Address, BlockNumberOrHash, ConsensusSignature, Direction, Hash, Iteration, Merkle, Patricia, @@ -36,8 +36,7 @@ pub async fn get_headers( request: BlockHeadersRequest, tx: mpsc::Sender, ) -> anyhow::Result<()> { - let responses = spawn_blocking_get(request, storage, blocking::get_headers).await?; - send(tx, responses).await + spawn_blocking_get(request, storage, blocking::get_headers, tx).await } pub async fn get_classes( @@ -45,8 +44,7 @@ pub async fn get_classes( request: ClassesRequest, tx: mpsc::Sender, ) -> anyhow::Result<()> { - let responses = spawn_blocking_get(request, storage, blocking::get_classes).await?; - send(tx, responses).await + spawn_blocking_get(request, storage, blocking::get_classes, tx).await } pub async fn get_state_diffs( @@ -54,8 +52,7 @@ pub async fn get_state_diffs( request: StateDiffsRequest, tx: mpsc::Sender, ) -> anyhow::Result<()> { - let responses = spawn_blocking_get(request, storage, blocking::get_state_diffs).await?; - send(tx, responses).await + spawn_blocking_get(request, storage, blocking::get_state_diffs, tx).await } pub async fn get_transactions( @@ -63,8 +60,7 @@ pub async fn get_transactions( request: TransactionsRequest, tx: mpsc::Sender, ) -> anyhow::Result<()> { - let responses = spawn_blocking_get(request, storage, blocking::get_transactions).await?; - send(tx, responses).await + spawn_blocking_get(request, storage, blocking::get_transactions, tx).await } pub async fn get_receipts( @@ -72,8 +68,7 @@ pub async fn get_receipts( request: ReceiptsRequest, tx: mpsc::Sender, ) -> anyhow::Result<()> { - let responses = spawn_blocking_get(request, storage, blocking::get_receipts).await?; - send(tx, responses).await + spawn_blocking_get(request, storage, blocking::get_receipts, tx).await } pub async fn get_events( @@ -81,69 +76,74 @@ pub async fn get_events( request: EventsRequest, tx: mpsc::Sender, ) -> anyhow::Result<()> { - let responses = spawn_blocking_get(request, storage, blocking::get_events).await?; - send(tx, responses).await + spawn_blocking_get(request, storage, blocking::get_events, tx).await } pub(crate) mod blocking { use super::*; pub(crate) fn get_headers( - tx: Transaction<'_>, + db_tx: Transaction<'_>, request: BlockHeadersRequest, - ) -> anyhow::Result> { - iterate(tx, request.iteration, get_header) + tx: flume::Sender, + ) -> anyhow::Result<()> { + iterate(db_tx, request.iteration, get_header, tx) } pub(crate) fn get_classes( - tx: Transaction<'_>, + db_tx: Transaction<'_>, request: ClassesRequest, - ) -> anyhow::Result> { - iterate(tx, request.iteration, get_classes_for_block) + tx: flume::Sender, + ) -> anyhow::Result<()> { + iterate(db_tx, request.iteration, get_classes_for_block, tx) } pub(crate) fn get_state_diffs( - tx: Transaction<'_>, + db_tx: Transaction<'_>, request: StateDiffsRequest, - ) -> anyhow::Result> { - iterate(tx, request.iteration, get_state_diff) + tx: flume::Sender, + ) -> anyhow::Result<()> { + iterate(db_tx, request.iteration, get_state_diff, tx) } pub(crate) fn get_transactions( - tx: Transaction<'_>, + db_tx: Transaction<'_>, request: TransactionsRequest, - ) -> anyhow::Result> { - iterate(tx, request.iteration, get_transactions_for_block) + tx: flume::Sender, + ) -> anyhow::Result<()> { + iterate(db_tx, request.iteration, get_transactions_for_block, tx) } pub(crate) fn get_receipts( - tx: Transaction<'_>, + db_tx: Transaction<'_>, request: ReceiptsRequest, - ) -> anyhow::Result> { - iterate(tx, request.iteration, get_receipts_for_block) + tx: flume::Sender, + ) -> anyhow::Result<()> { + iterate(db_tx, request.iteration, get_receipts_for_block, tx) } pub(crate) fn get_events( - tx: Transaction<'_>, + db_tx: Transaction<'_>, request: EventsRequest, - ) -> anyhow::Result> { - iterate(tx, request.iteration, get_events_for_block) + tx: flume::Sender, + ) -> anyhow::Result<()> { + iterate(db_tx, request.iteration, get_events_for_block, tx) } } fn get_header( - tx: &Transaction<'_>, + db_tx: &Transaction<'_>, block_number: BlockNumber, - responses: &mut Vec, + tx: &flume::Sender, ) -> anyhow::Result { - if let Some(header) = tx.block_header(block_number.into())? { - if let Some(signature) = tx.signature(block_number.into())? { + if let Some(header) = db_tx.block_header(block_number.into())? { + if let Some(signature) = db_tx.signature(block_number.into())? { let txn_count = header .transaction_count .try_into() .context("invalid transaction count")?; - responses.push(BlockHeadersResponse::Header(Box::new(SignedBlockHeader { + tx.send(BlockHeadersResponse::Header(Box::new(SignedBlockHeader { block_hash: Hash(header.hash.0), parent_hash: Hash(header.parent_hash.0), number: header.number.get(), @@ -179,13 +179,14 @@ fn get_header( r: signature.r.0, s: signature.s.0, }], - }))); - } + }))) + .map_err(|_| anyhow::anyhow!("Sending header"))?; - Ok(true) - } else { - Ok(false) + return Ok(true); + } } + + Ok(false) } #[derive(Debug, Clone)] @@ -195,13 +196,13 @@ enum ClassDefinition { } fn get_classes_for_block( - tx: &Transaction<'_>, + db_tx: &Transaction<'_>, block_number: BlockNumber, - responses: &mut Vec, + tx: &flume::Sender, ) -> anyhow::Result { let get_definition = |block_number: BlockNumber, class_hash| -> anyhow::Result { - let definition = tx + let definition = db_tx .class_definition_at(block_number.into(), class_hash)? .ok_or_else(|| { anyhow::anyhow!( @@ -210,7 +211,7 @@ fn get_classes_for_block( block_number ) })?; - let casm_definition = tx.casm_definition(class_hash)?; + let casm_definition = db_tx.casm_definition(class_hash)?; Ok(match casm_definition { Some(casm) => ClassDefinition::Sierra { sierra: definition, @@ -220,8 +221,9 @@ fn get_classes_for_block( }) }; - let declared_classes = tx.declared_classes_at(block_number.into())?; - let mut classes = Vec::new(); + let Some(declared_classes) = db_tx.declared_classes_at(block_number.into())? else { + return Ok(false); + }; for class_hash in declared_classes { let class_definition = get_definition(block_number, class_hash)?; @@ -246,132 +248,126 @@ fn get_classes_for_block( } } }; - classes.push(ClassesResponse::Class(class)); - } - responses.extend(classes); + tx.send(ClassesResponse::Class(class)) + .map_err(|_| anyhow::anyhow!("Sending class"))?; + } Ok(true) } fn get_state_diff( - tx: &Transaction<'_>, + db_tx: &Transaction<'_>, block_number: BlockNumber, - responses: &mut Vec, + tx: &flume::Sender, ) -> anyhow::Result { - let Some(state_diff) = tx.state_update(block_number.into())? else { + let Some(state_diff) = db_tx.state_update(block_number.into())? else { return Ok(false); }; - state_diff - .contract_updates - .into_iter() - .for_each(|(address, update)| { - responses.push(StateDiffsResponse::ContractDiff(ContractDiff { - address: Address(address.0), - nonce: update.nonce.map(|n| n.0), - class_hash: update.class.as_ref().map(|c| c.class_hash().0), - is_replaced: update.class.map(|c| c.is_replaced()), - values: update - .storage - .into_iter() - .map(|(k, v)| ContractStoredValue { - key: k.0, - value: v.0, - }) - .collect(), - domain: 0, // TODO - })) - }); - - state_diff - .system_contract_updates - .into_iter() - .for_each(|(address, update)| { - responses.push(StateDiffsResponse::ContractDiff(ContractDiff { - address: Address(address.0), - nonce: None, - class_hash: None, - is_replaced: None, - values: update - .storage - .into_iter() - .map(|(k, v)| ContractStoredValue { - key: k.0, - value: v.0, - }) - .collect(), - domain: 0, // TODO - })) - }); + for (address, update) in state_diff.contract_updates { + tx.send(StateDiffsResponse::ContractDiff(ContractDiff { + address: Address(address.0), + nonce: update.nonce.map(|n| n.0), + class_hash: update.class.as_ref().map(|c| c.class_hash().0), + is_replaced: update.class.map(|c| c.is_replaced()), + values: update + .storage + .into_iter() + .map(|(k, v)| ContractStoredValue { + key: k.0, + value: v.0, + }) + .collect(), + domain: 0, // TODO + })) + .map_err(|_| anyhow::anyhow!("Sending state diff"))?; + } + + for (address, update) in state_diff.system_contract_updates { + tx.send(StateDiffsResponse::ContractDiff(ContractDiff { + address: Address(address.0), + nonce: None, + class_hash: None, + is_replaced: None, + values: update + .storage + .into_iter() + .map(|(k, v)| ContractStoredValue { + key: k.0, + value: v.0, + }) + .collect(), + domain: 0, // TODO + })) + .map_err(|_| anyhow::anyhow!("Sending state diff"))?; + } Ok(true) } fn get_transactions_for_block( - tx: &Transaction<'_>, + db_tx: &Transaction<'_>, block_number: BlockNumber, - responses: &mut Vec, + tx: &flume::Sender, ) -> anyhow::Result { - let Some(txn_data) = tx.transaction_data_for_block(block_number.into())? else { + let Some(txn_data) = db_tx.transaction_data_for_block(block_number.into())? else { return Ok(false); }; - responses.extend( - txn_data - .into_iter() - .map(|(tnx, _)| TransactionsResponse::Transaction(tnx.to_dto())), - ); + for (txn, _) in txn_data { + tx.send(TransactionsResponse::Transaction(txn.to_dto())) + .map_err(|_| anyhow::anyhow!("Sending transaction"))?; + } Ok(true) } fn get_receipts_for_block( - tx: &Transaction<'_>, + db_tx: &Transaction<'_>, block_number: BlockNumber, - responses: &mut Vec, + tx: &flume::Sender, ) -> anyhow::Result { - let Some(txn_data) = tx.transaction_data_for_block(block_number.into())? else { + let Some(txn_data) = db_tx.transaction_data_for_block(block_number.into())? else { return Ok(false); }; - responses.extend( - txn_data - .into_iter() - .map(ToDto::to_dto) - .map(ReceiptsResponse::Receipt), - ); + for tr in txn_data { + tx.send(ReceiptsResponse::Receipt(tr.to_dto())) + .map_err(|_| anyhow::anyhow!("Sending receipt"))?; + } Ok(true) } fn get_events_for_block( - tx: &Transaction<'_>, + db_tx: &Transaction<'_>, block_number: BlockNumber, - responses: &mut Vec, + tx: &flume::Sender, ) -> anyhow::Result { - let Some(txn_data) = tx.transaction_data_for_block(block_number.into())? else { + let Some(txn_data) = db_tx.transaction_data_for_block(block_number.into())? else { return Ok(false); }; - responses.extend(txn_data.into_iter().flat_map(|(_, r)| { - std::iter::repeat(r.transaction_hash) - .zip(r.events) - .map(ToDto::to_dto) - .map(EventsResponse::Event) - })); + for (_, r) in txn_data { + for event in r.events { + tx.send(EventsResponse::Event((r.transaction_hash, event).to_dto())) + .map_err(|_| anyhow::anyhow!("Sending event"))?; + } + } Ok(true) } /// Assupmtions: -/// - `block_handler` returns `Ok(true)` if the iteration should continue. +/// - `block_handler` returns `Ok(true)` if the iteration should continue, /// - `T::default()` always returns the `Fin` variant of the implementing type. fn iterate( - tx: Transaction<'_>, + db_tx: Transaction<'_>, iteration: Iteration, - block_handler: impl Fn(&Transaction<'_>, BlockNumber, &mut Vec) -> anyhow::Result, -) -> anyhow::Result> { + block_handler: impl Fn(&Transaction<'_>, BlockNumber, &flume::Sender) -> anyhow::Result, + tx: flume::Sender, +) -> anyhow::Result<()> { let Iteration { start, direction, @@ -380,26 +376,27 @@ fn iterate( } = iteration; if limit == 0 { - return Ok(vec![T::default()]); + tx.send(T::default()) + .map_err(|_| anyhow::anyhow!("Sending Fin"))?; + return Ok(()); } - let mut block_number = match get_start_block_number(start, &tx)? { + let mut block_number = match get_start_block_number(start, &db_tx)? { Some(x) => x, None => { - return Ok(vec![T::default()]); + tx.send(T::default()) + .map_err(|_| anyhow::anyhow!("Sending Fin"))?; + return Ok(()); } }; - let mut responses = Vec::new(); let limit = limit.min(MAX_BLOCKS_COUNT); for i in 0..limit { - if block_handler(&tx, block_number, &mut responses)? { - // Block data retrieved successfully - } else { + if !block_handler(&db_tx, block_number, &tx)? { // No such block break; - } + }; if i < limit - 1 { block_number = match get_next_block_number(block_number, step, direction) { @@ -408,13 +405,14 @@ fn iterate( // Out of range block number value break; } - }; + } } } - responses.push(T::default()); + tx.send(T::default()) + .map_err(|_| anyhow::anyhow!("Sending Fin"))?; - Ok(responses) + Ok(()) } fn get_start_block_number( @@ -427,41 +425,52 @@ fn get_start_block_number( }) } +/// Spawns a blocking task and forwards the result to the given channel. +/// Bails out early if the database operation fails or sending fails. +/// The `getter` function is expected to send partial results through the flume channel as soon as possible, +/// ideally after each database read operation. async fn spawn_blocking_get( request: Request, storage: Storage, getter: Getter, -) -> anyhow::Result + mut tx: mpsc::Sender, +) -> anyhow::Result<()> where Request: Send + 'static, Response: Send + 'static, - Getter: FnOnce(Transaction<'_>, Request) -> anyhow::Result + Send + 'static, + Getter: FnOnce(Transaction<'_>, Request, flume::Sender) -> anyhow::Result<()> + + Send + + 'static, { let span = tracing::Span::current(); - tokio::task::spawn_blocking(move || { - let _g = span.enter(); - let mut connection = storage - .connection() - .context("Opening database connection")?; - let tx = connection - .transaction() - .context("Creating database transaction")?; - getter(tx, request) - }) - .await - .context("Database read panic or shutting down")? -} - -async fn send(mut tx: mpsc::Sender, seq: Vec) -> anyhow::Result<()> -where - T: Send + 'static, - tokio::sync::mpsc::error::SendError: Sync, -{ - for elem in seq { - tx.send(elem).await.context("Sending response")?; - } + let (sync_tx, rx) = flume::bounded(0); // For backpressure + + let db_fut = async { + tokio::task::spawn_blocking(move || { + let _g = span.enter(); + let mut connection = storage + .connection() + .context("Opening database connection")?; + let db_tx = connection + .transaction() + .context("Creating database transaction")?; + getter(db_tx, request, sync_tx) + }) + .await + .context("Database read panic or shutting down")? + .context("Database read") + }; + let fwd_fut = async move { + let mut rx = rx.into_stream(); + while let Some(x) = rx.next().await { + tx.send(x).await.context("Sending item")?; + } + Ok::<_, anyhow::Error>(()) + }; + // Bail out early, either when db fails or sending fails + tokio::try_join!(db_fut, fwd_fut)?; Ok(()) } diff --git a/crates/storage/src/connection.rs b/crates/storage/src/connection.rs index bb162367ae..d0ee9ff8a9 100644 --- a/crates/storage/src/connection.rs +++ b/crates/storage/src/connection.rs @@ -387,7 +387,7 @@ impl<'inner> Transaction<'inner> { } /// Returns hashes of Cairo and Sierra classes declared at a given block. - pub fn declared_classes_at(&self, block: BlockId) -> anyhow::Result> { + pub fn declared_classes_at(&self, block: BlockId) -> anyhow::Result>> { state_update::declared_classes_at(self, block) } diff --git a/crates/storage/src/connection/state_update.rs b/crates/storage/src/connection/state_update.rs index 80a2ac22d3..bbf9b3931c 100644 --- a/crates/storage/src/connection/state_update.rs +++ b/crates/storage/src/connection/state_update.rs @@ -278,9 +278,9 @@ pub(super) fn state_update( pub(super) fn declared_classes_at( tx: &Transaction<'_>, block: BlockId, -) -> anyhow::Result> { +) -> anyhow::Result>> { let Some((block_number, _)) = block_id(tx, block).context("Querying block header")? else { - return Ok(Vec::new()); + return Ok(None); }; let mut stmt = tx @@ -305,7 +305,7 @@ pub(super) fn declared_classes_at( result.push(class_hash); } - Ok(result) + Ok(Some(result)) } pub(super) fn storage_value(