Skip to content

Commit

Permalink
Merge of #6887
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jul 12, 2023
2 parents f9a4826 + b825958 commit 5be9262
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 166 deletions.
38 changes: 38 additions & 0 deletions zebra-consensus/src/primitives.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
//! Asynchronous verification of cryptographic primitives.
use tokio::sync::oneshot::error::RecvError;

use crate::BoxError;

pub mod ed25519;
pub mod groth16;
pub mod halo2;
Expand All @@ -11,3 +15,37 @@ const MAX_BATCH_SIZE: usize = 64;

/// The maximum latency bound for any of the batch verifiers.
const MAX_BATCH_LATENCY: std::time::Duration = std::time::Duration::from_millis(100);

/// Fires off a task into the Rayon threadpool, awaits the result through a oneshot channel,
/// then converts the error to a [`BoxError`].
pub async fn spawn_fifo_and_convert<
E: 'static + std::error::Error + Into<BoxError> + Sync + Send,
F: 'static + FnOnce() -> Result<(), E> + Send,
>(
f: F,
) -> Result<(), BoxError> {
spawn_fifo(f)
.await
.map_err(|_| {
"threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?"
})?
.map_err(BoxError::from)
}

/// Fires off a task into the Rayon threadpool and awaits the result through a oneshot channel.
pub async fn spawn_fifo<
E: 'static + std::error::Error + Sync + Send,
F: 'static + FnOnce() -> Result<(), E> + Send,
>(
f: F,
) -> Result<Result<(), E>, RecvError> {
// Rayon doesn't have a spawn function that returns a value,
// so we use a oneshot channel instead.
let (rsp_tx, rsp_rx) = tokio::sync::oneshot::channel();

rayon::spawn_fifo(move || {
let _ = rsp_tx.send(f());
});

rsp_rx.await
}
48 changes: 17 additions & 31 deletions zebra-consensus/src/primitives/ed25519.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;

use rayon::prelude::*;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch_control::{Batch, BatchControl};
use tower_fallback::Fallback;
use zebra_chain::primitives::ed25519::{batch, *};

use crate::BoxError;

use super::{spawn_fifo, spawn_fifo_and_convert};

#[cfg(test)]
mod tests;

Expand All @@ -43,7 +46,10 @@ pub type Item = batch::Item;
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
Fallback<
Batch<Verifier, Item>,
ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
Expand Down Expand Up @@ -120,43 +126,22 @@ impl Verifier {

/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future<Output = ()> {
async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(|| {
// TODO:
// - spawn batches so rayon executes them in FIFO order
// possible implementation: return a closure in a Future,
// then run it using scope_fifo() in the worker task,
// limiting the number of concurrent batches to the number of rayon threads
rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::verify(batch, tx)))
})
.map(|join_result| join_result.expect("panic in ed25519 batch verifier"))
let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok());
}

/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(item: Item) -> impl Future<Output = VerifyResult> {
async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(|| {
// Rayon doesn't have a spawn function that returns a value,
// so we use a parallel iterator instead.
//
// TODO:
// - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
// - spawn fallback individual verifications so rayon executes them in FIFO order,
// if possible
rayon::iter::once(item)
.map(|item| item.verify_single())
.collect()
})
.map(|join_result| join_result.expect("panic in ed25519 fallback verifier"))
spawn_fifo_and_convert(move || item.verify_single()).await
}
}

impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = Error;
type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
Expand All @@ -174,7 +159,8 @@ impl Service<BatchControl<Item>> for Verifier {
Ok(()) => {
// We use a new channel for each batch,
// so we always get the correct batch result here.
let result = rx.borrow().expect("completed batch must send a value");
let result = rx.borrow()
.ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;

if result.is_ok() {
tracing::trace!(?result, "validated ed25519 signature");
Expand All @@ -183,7 +169,7 @@ impl Service<BatchControl<Item>> for Verifier {
tracing::trace!(?result, "invalid ed25519 signature");
metrics::counter!("signatures.ed25519.invalid", 1);
}
result
result.map_err(BoxError::from)
}
Err(_recv_error) => panic!("ed25519 verifier was dropped without flushing"),
}
Expand Down
62 changes: 25 additions & 37 deletions zebra-consensus/src/primitives/groth16.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;

use rayon::prelude::*;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};

Expand All @@ -34,6 +33,10 @@ use zebra_chain::{
sprout::{JoinSplit, Nullifier, RandomSeed},
};

use crate::BoxError;

use super::{spawn_fifo, spawn_fifo_and_convert};

mod params;
#[cfg(test)]
mod tests;
Expand Down Expand Up @@ -74,7 +77,10 @@ pub type ItemVerifyingKey = PreparedVerifyingKey<Bls12>;
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static SPEND_VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
Fallback<
Batch<Verifier, Item>,
ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
Expand Down Expand Up @@ -113,7 +119,10 @@ pub static SPEND_VERIFIER: Lazy<
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static OUTPUT_VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
Fallback<
Batch<Verifier, Item>,
ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
Expand Down Expand Up @@ -417,43 +426,22 @@ impl Verifier {

/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(
batch: BatchVerifier,
vk: &'static BatchVerifyingKey,
tx: Sender,
) -> impl Future<Output = ()> {
async fn flush_spawning(batch: BatchVerifier, vk: &'static BatchVerifyingKey, tx: Sender) {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(move || {
// TODO:
// - spawn batches so rayon executes them in FIFO order
// possible implementation: return a closure in a Future,
// then run it using scope_fifo() in the worker task,
// limiting the number of concurrent batches to the number of rayon threads
rayon::scope_fifo(move |s| s.spawn_fifo(move |_s| Self::verify(batch, vk, tx)))
})
.map(|join_result| join_result.expect("panic in groth16 batch verifier"))
let _ = tx.send(
spawn_fifo(move || batch.verify(thread_rng(), vk))
.await
.ok(),
);
}

/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(
async fn verify_single_spawning(
item: Item,
pvk: &'static ItemVerifyingKey,
) -> impl Future<Output = VerifyResult> {
) -> Result<(), BoxError> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(move || {
// Rayon doesn't have a spawn function that returns a value,
// so we use a parallel iterator instead.
//
// TODO:
// - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
// - spawn fallback individual verifications so rayon executes them in FIFO order,
// if possible
rayon::iter::once(item)
.map(move |item| item.verify_single(pvk))
.collect()
})
.map(|join_result| join_result.expect("panic in groth16 fallback verifier"))
spawn_fifo_and_convert(move || item.verify_single(pvk)).await
}
}

Expand All @@ -470,8 +458,8 @@ impl fmt::Debug for Verifier {

impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = VerificationError;
type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
Expand All @@ -492,7 +480,7 @@ impl Service<BatchControl<Item>> for Verifier {
let result = rx
.borrow()
.as_ref()
.expect("completed batch must send a value")
.ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?
.clone();

if result.is_ok() {
Expand All @@ -503,7 +491,7 @@ impl Service<BatchControl<Item>> for Verifier {
metrics::counter!("proofs.groth16.invalid", 1);
}

result
result.map_err(BoxError::from)
}
Err(_recv_error) => panic!("verifier was dropped without flushing"),
}
Expand Down
57 changes: 21 additions & 36 deletions zebra-consensus/src/primitives/halo2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ use once_cell::sync::Lazy;
use orchard::circuit::VerifyingKey;
use rand::{thread_rng, CryptoRng, RngCore};

use rayon::prelude::*;
use thiserror::Error;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch_control::{Batch, BatchControl};
use tower_fallback::Fallback;

use crate::BoxError;

use super::{spawn_fifo, spawn_fifo_and_convert};

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -199,7 +202,10 @@ impl From<halo2::plonk::Error> for Halo2Error {
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
pub static VERIFIER: Lazy<
Fallback<Batch<Verifier, Item>, ServiceFn<fn(Item) -> BoxFuture<'static, VerifyResult>>>,
Fallback<
Batch<Verifier, Item>,
ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
>,
> = Lazy::new(|| {
Fallback::new(
Batch::new(
Expand Down Expand Up @@ -284,43 +290,22 @@ impl Verifier {

/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
fn flush_spawning(
batch: BatchVerifier,
vk: &'static BatchVerifyingKey,
tx: Sender,
) -> impl Future<Output = ()> {
async fn flush_spawning(batch: BatchVerifier, vk: &'static BatchVerifyingKey, tx: Sender) {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(move || {
// TODO:
// - spawn batches so rayon executes them in FIFO order
// possible implementation: return a closure in a Future,
// then run it using scope_fifo() in the worker task,
// limiting the number of concurrent batches to the number of rayon threads
rayon::scope_fifo(move |s| s.spawn_fifo(move |_s| Self::verify(batch, vk, tx)))
})
.map(|join_result| join_result.expect("panic in halo2 batch verifier"))
let _ = tx.send(
spawn_fifo(move || batch.verify(thread_rng(), vk).map_err(Halo2Error::from))
.await
.ok(),
);
}

/// Verify a single item using a thread pool, and return the result.
/// This function returns a future that becomes ready when the item is completed.
fn verify_single_spawning(
async fn verify_single_spawning(
item: Item,
pvk: &'static ItemVerifyingKey,
) -> impl Future<Output = VerifyResult> {
) -> Result<(), BoxError> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
tokio::task::spawn_blocking(move || {
// Rayon doesn't have a spawn function that returns a value,
// so we use a parallel iterator instead.
//
// TODO:
// - when a batch fails, spawn all its individual items into rayon using Vec::par_iter()
// - spawn fallback individual verifications so rayon executes them in FIFO order,
// if possible
rayon::iter::once(item)
.map(move |item| item.verify_single(pvk).map_err(Halo2Error::from))
.collect()
})
.map(|join_result| join_result.expect("panic in halo2 fallback verifier"))
spawn_fifo_and_convert(move || item.verify_single(pvk).map_err(Halo2Error::from)).await
}
}

Expand All @@ -337,8 +322,8 @@ impl fmt::Debug for Verifier {

impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = Halo2Error;
type Future = Pin<Box<dyn Future<Output = VerifyResult> + Send + 'static>>;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
Expand All @@ -358,7 +343,7 @@ impl Service<BatchControl<Item>> for Verifier {
let result = rx
.borrow()
.as_ref()
.expect("completed batch must send a value")
.ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?
.clone();

if result.is_ok() {
Expand All @@ -369,7 +354,7 @@ impl Service<BatchControl<Item>> for Verifier {
metrics::counter!("proofs.halo2.invalid", 1);
}

result
result.map_err(BoxError::from)
}
Err(_recv_error) => panic!("verifier was dropped without flushing"),
}
Expand Down
Loading

0 comments on commit 5be9262

Please sign in to comment.