Skip to content

Commit

Permalink
Support for concurrent/parallel verification using stateless verifier…
Browse files Browse the repository at this point in the history
…s with ability to control concurrency
  • Loading branch information
nazar-pc committed Jul 17, 2024
1 parent 8b8e60f commit 61e3ae3
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 43 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions substrate/client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ sp-consensus = { workspace = true, default-features = true }
sp-core = { workspace = true, default-features = true }
sp-runtime = { workspace = true, default-features = true }
sp-state-machine = { workspace = true, default-features = true }
tokio = { workspace = true, default-features = true, features = ["macros"] }

[dev-dependencies]
sp-test-primitives = { workspace = true }
24 changes: 22 additions & 2 deletions substrate/client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use log::{debug, trace};
use std::{
fmt,
future::Future,
num::NonZeroUsize,
ops::Deref,
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -129,6 +130,18 @@ pub struct IncomingBlock<B: BlockT> {
/// Verify a justification of a block
#[async_trait::async_trait]
pub trait Verifier<B: BlockT>: Send + Sync {
/// How many blocks can be verified concurrently.
///
/// Defaults to 1, which means blocks are verified sequentially, one at a time.
///
/// value higher than one means verification on blocks can be done in arbitrary order,
/// doesn't expect parent block to be imported first, etc. This significantly improves sync
/// speed by leveraging multiple CPU cores. Good value here is to make concurrency equal to
/// number of CPU cores available.
fn verification_concurrency(&self) -> NonZeroUsize {
NonZeroUsize::new(1).expect("Not zero; qed")
}

/// Verify the given block data and return the `BlockImportParams` to
/// continue the block import process.
async fn verify(&self, block: BlockImportParams<B>) -> Result<BlockImportParams<B>, String>;
Expand All @@ -138,6 +151,10 @@ impl<Block> Verifier<Block> for Arc<dyn Verifier<Block>>
where
Block: BlockT,
{
fn verification_concurrency(&self) -> NonZeroUsize {
(**self).verification_concurrency()
}

fn verify<'life0, 'async_trait>(
&'life0 self,
block: BlockImportParams<Block>,
Expand Down Expand Up @@ -278,7 +295,9 @@ where
Block: BlockT,
BI: BlockImport<Block, Error = ConsensusError>,
{
match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? {
match verify_single_block_metered(import_handle, block_origin, block, verifier, false, None)
.await?
{
SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status),
SingleBlockVerificationOutcome::Verified(import_parameters) =>
import_single_block_metered(import_handle, import_parameters, None).await,
Expand Down Expand Up @@ -347,6 +366,7 @@ pub(crate) async fn verify_single_block_metered<Block, BI>(
block_origin: BlockOrigin,
block: IncomingBlock<Block>,
verifier: &dyn Verifier<Block>,
allow_missing_parent: bool,
metrics: Option<&Metrics>,
) -> Result<SingleBlockVerificationOutcome<Block>, BlockImportError>
where
Expand Down Expand Up @@ -383,7 +403,7 @@ where
parent_hash,
allow_missing_state: block.allow_missing_state,
import_existing: block.import_existing,
allow_missing_parent: block.state.is_some(),
allow_missing_parent: allow_missing_parent || block.state.is_some(),
})
.await,
)? {
Expand Down
137 changes: 96 additions & 41 deletions substrate/client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use futures::{
prelude::*,
stream::FuturesOrdered,
task::{Context, Poll},
};
use log::{debug, trace};
Expand All @@ -27,7 +28,15 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, NumberFor},
Justification, Justifications,
};
use std::pin::Pin;
use std::{
num::NonZeroUsize,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::{runtime::Handle, task};

use crate::{
import_queue::{
Expand Down Expand Up @@ -223,11 +232,12 @@ mod worker_messages {
/// Returns when `block_import` ended.
async fn block_import_process<B: BlockT>(
mut block_import: SharedBlockImport<B>,
mut verifier: impl Verifier<B>,
verifier: impl Verifier<B> + 'static,
mut result_sender: BufferedLinkSender<B>,
mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
metrics: Option<Metrics>,
) {
let verifier: Arc<dyn Verifier<B>> = Arc::new(verifier);
loop {
let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await
{
Expand All @@ -241,9 +251,15 @@ async fn block_import_process<B: BlockT>(
},
};

let res =
import_many_blocks(&mut block_import, origin, blocks, &mut verifier, metrics.clone())
.await;
let res = import_many_blocks_with_verification_concurrency(
&mut block_import,
origin,
blocks,
&verifier,
metrics.clone(),
verifier.verification_concurrency(),
)
.await;

result_sender.blocks_processed(res.imported, res.block_count, res.results);
}
Expand Down Expand Up @@ -385,12 +401,16 @@ struct ImportManyBlocksResult<B: BlockT> {
///
/// This will yield after each imported block once, to ensure that other futures can
/// be called as well.
async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
///
/// When verification concurrency is set to value higher than 1, block verification will happen in
/// parallel to block import, reducing overall time required.
async fn import_many_blocks_with_verification_concurrency<B: BlockT>(
import_handle: &mut SharedBlockImport<B>,
blocks_origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>,
verifier: &mut V,
verifier: &Arc<dyn Verifier<B>>,
metrics: Option<Metrics>,
verification_concurrency: NonZeroUsize,
) -> ImportManyBlocksResult<B> {
let count = blocks.len();

Expand All @@ -405,41 +425,67 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>(

trace!(target: LOG_TARGET, "Starting import of {} blocks {}", count, blocks_range);

let mut imported = 0;
let mut results = vec![];
let mut has_error = false;
let mut blocks = blocks.into_iter();
let has_error = Arc::new(AtomicBool::new(false));

let verify_block_task =
|index, block: IncomingBlock<B>, import_handle: &SharedBlockImport<B>| {
let import_handle = import_handle.clone();
let verifier = Arc::clone(verifier);
let metrics = metrics.clone();
let has_error = Arc::clone(&has_error);

async move {
let block_number = block.header.as_ref().map(|h| *h.number());
let block_hash = block.hash;

let result = if has_error.load(Ordering::Acquire) {
Err(BlockImportError::Cancelled)
} else {
task::spawn_blocking(move || {
Handle::current().block_on(verify_single_block_metered(
&import_handle,
blocks_origin,
block,
&verifier,
// Check parent for the first block, but skip for others since blocks
// are verified concurrently before being imported.
index != 0,
metrics.as_ref(),
))
})
.await
.unwrap_or_else(|error| {
Err(BlockImportError::Other(sp_consensus::Error::Other(
format!("Failed to join on block verification: {error}").into(),
)))
})
};

(block_number, block_hash, result)
}
};

// Blocks in the response/drain should be in ascending order.
loop {
// Is there any block left to import?
let block = match blocks.next() {
Some(b) => b,
None => {
// No block left to import, success!
return ImportManyBlocksResult { block_count: count, imported, results }
},
};
let mut blocks_to_verify = blocks.into_iter().enumerate();
let mut verified_blocks = blocks_to_verify
.by_ref()
.take(verification_concurrency.get())
.map(|(index, block)| verify_block_task(index, block, import_handle))
.collect::<FuturesOrdered<_>>();

let block_number = block.header.as_ref().map(|h| *h.number());
let block_hash = block.hash;
let import_result = if has_error {
let mut imported = 0;
let mut results = vec![];

while let Some((block_number, block_hash, verification_result)) = verified_blocks.next().await {
let import_result = if has_error.load(Ordering::Acquire) {
Err(BlockImportError::Cancelled)
} else {
let verification_fut = verify_single_block_metered(
import_handle,
blocks_origin,
block,
verifier,
metrics.as_ref(),
);
match verification_fut.await {
// The actual import.
match verification_result {
Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status),
Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => {
// The actual import.
Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) =>
import_single_block_metered(import_handle, import_parameters, metrics.as_ref())
.await
},
.await,
Err(e) => Err(e),
}
};
Expand All @@ -457,13 +503,21 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
);
imported += 1;
} else {
has_error = true;
has_error.store(true, Ordering::Release);
}

results.push((import_result, block_hash));

// Add more blocks into verification queue if there are any
if let Some((index, block)) = blocks_to_verify.next() {
verified_blocks.push_back(verify_block_task(index, block, import_handle));
}

Yield::new().await
}

// No block left to import, success!
ImportManyBlocksResult { block_count: count, imported, results }
}

/// A future that will always `yield` on the first call of `poll` but schedules the
Expand Down Expand Up @@ -502,7 +556,7 @@ mod tests {
},
import_queue::Verifier,
};
use futures::{executor::block_on, Future};
use futures::Future;
use sp_test_primitives::{Block, BlockNumber, Hash, Header};

#[async_trait::async_trait]
Expand Down Expand Up @@ -586,8 +640,8 @@ mod tests {
}
}

#[test]
fn prioritizes_finality_work_over_block_import() {
#[tokio::test]
async fn prioritizes_finality_work_over_block_import() {
let (result_sender, mut result_port) = buffered_link::buffered_link(100_000);

let (worker, finality_sender, block_import_sender) = BlockImportWorker::new(
Expand Down Expand Up @@ -659,7 +713,7 @@ mod tests {
let justification3 = import_justification();

// we poll the worker until we have processed 9 events
block_on(futures::future::poll_fn(|cx| {
futures::future::poll_fn(|cx| {
while link.events.len() < 9 {
match Future::poll(Pin::new(&mut worker), cx) {
Poll::Pending => {},
Expand All @@ -670,7 +724,8 @@ mod tests {
}

Poll::Ready(())
}));
})
.await;

// all justification tasks must be done before any block import work
assert_eq!(
Expand Down

0 comments on commit 61e3ae3

Please sign in to comment.