diff --git a/zebra-consensus/src/primitives.rs b/zebra-consensus/src/primitives.rs index 333ff1156f9..e3ab3a4f865 100644 --- a/zebra-consensus/src/primitives.rs +++ b/zebra-consensus/src/primitives.rs @@ -1,5 +1,9 @@ //! Asynchronous verification of cryptographic primitives. +use tokio::sync::oneshot::error::RecvError; + +use crate::BoxError; + pub mod ed25519; pub mod groth16; pub mod halo2; @@ -11,3 +15,37 @@ const MAX_BATCH_SIZE: usize = 64; /// The maximum latency bound for any of the batch verifiers. const MAX_BATCH_LATENCY: std::time::Duration = std::time::Duration::from_millis(100); + +/// Fires off a task into the Rayon threadpool, awaits the result through a oneshot channel, +/// then converts the error to a [`BoxError`]. +pub async fn spawn_fifo_and_convert< + E: 'static + std::error::Error + Into + Sync + Send, + F: 'static + FnOnce() -> Result<(), E> + Send, +>( + f: F, +) -> Result<(), BoxError> { + spawn_fifo(f) + .await + .map_err(|_| { + "threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?" + })? + .map_err(BoxError::from) +} + +/// Fires off a task into the Rayon threadpool and awaits the result through a oneshot channel. +pub async fn spawn_fifo< + E: 'static + std::error::Error + Sync + Send, + F: 'static + FnOnce() -> Result<(), E> + Send, +>( + f: F, +) -> Result, RecvError> { + // Rayon doesn't have a spawn function that returns a value, + // so we use a oneshot channel instead. + let (rsp_tx, rsp_rx) = tokio::sync::oneshot::channel(); + + rayon::spawn_fifo(move || { + let _ = rsp_tx.send(f()); + }); + + rsp_rx.await +} diff --git a/zebra-consensus/src/primitives/ed25519.rs b/zebra-consensus/src/primitives/ed25519.rs index 49bb6c4ac1d..7a17ac9e14a 100644 --- a/zebra-consensus/src/primitives/ed25519.rs +++ b/zebra-consensus/src/primitives/ed25519.rs @@ -11,13 +11,16 @@ use futures::{future::BoxFuture, FutureExt}; use once_cell::sync::Lazy; use rand::thread_rng; -use rayon::prelude::*; use tokio::sync::watch; use tower::{util::ServiceFn, Service}; use tower_batch_control::{Batch, BatchControl}; use tower_fallback::Fallback; use zebra_chain::primitives::ed25519::{batch, *}; +use crate::BoxError; + +use super::{spawn_fifo, spawn_fifo_and_convert}; + #[cfg(test)] mod tests; @@ -43,7 +46,10 @@ pub type Item = batch::Item; /// you should call `.clone()` on the global handle to create a local, mutable /// handle. pub static VERIFIER: Lazy< - Fallback, ServiceFn BoxFuture<'static, VerifyResult>>>, + Fallback< + Batch, + ServiceFn BoxFuture<'static, Result<(), BoxError>>>, + >, > = Lazy::new(|| { Fallback::new( Batch::new( @@ -120,43 +126,22 @@ impl Verifier { /// Flush the batch using a thread pool, and return the result via the channel. /// This function returns a future that becomes ready when the batch is completed. - fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future { + async fn flush_spawning(batch: BatchVerifier, tx: Sender) { // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. - tokio::task::spawn_blocking(|| { - // TODO: - // - spawn batches so rayon executes them in FIFO order - // possible implementation: return a closure in a Future, - // then run it using scope_fifo() in the worker task, - // limiting the number of concurrent batches to the number of rayon threads - rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::verify(batch, tx))) - }) - .map(|join_result| join_result.expect("panic in ed25519 batch verifier")) + let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok()); } /// Verify a single item using a thread pool, and return the result. - /// This function returns a future that becomes ready when the item is completed. - fn verify_single_spawning(item: Item) -> impl Future { + async fn verify_single_spawning(item: Item) -> Result<(), BoxError> { // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. - tokio::task::spawn_blocking(|| { - // Rayon doesn't have a spawn function that returns a value, - // so we use a parallel iterator instead. - // - // TODO: - // - when a batch fails, spawn all its individual items into rayon using Vec::par_iter() - // - spawn fallback individual verifications so rayon executes them in FIFO order, - // if possible - rayon::iter::once(item) - .map(|item| item.verify_single()) - .collect() - }) - .map(|join_result| join_result.expect("panic in ed25519 fallback verifier")) + spawn_fifo_and_convert(move || item.verify_single()).await } } impl Service> for Verifier { type Response = (); - type Error = Error; - type Future = Pin + Send + 'static>>; + type Error = BoxError; + type Future = Pin> + Send + 'static>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) @@ -174,7 +159,8 @@ impl Service> for Verifier { Ok(()) => { // We use a new channel for each batch, // so we always get the correct batch result here. - let result = rx.borrow().expect("completed batch must send a value"); + let result = rx.borrow() + .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?; if result.is_ok() { tracing::trace!(?result, "validated ed25519 signature"); @@ -183,7 +169,7 @@ impl Service> for Verifier { tracing::trace!(?result, "invalid ed25519 signature"); metrics::counter!("signatures.ed25519.invalid", 1); } - result + result.map_err(BoxError::from) } Err(_recv_error) => panic!("ed25519 verifier was dropped without flushing"), } diff --git a/zebra-consensus/src/primitives/groth16.rs b/zebra-consensus/src/primitives/groth16.rs index 0013c048b15..e6d7ad17a35 100644 --- a/zebra-consensus/src/primitives/groth16.rs +++ b/zebra-consensus/src/primitives/groth16.rs @@ -18,7 +18,6 @@ use futures::{future::BoxFuture, FutureExt}; use once_cell::sync::Lazy; use rand::thread_rng; -use rayon::prelude::*; use tokio::sync::watch; use tower::{util::ServiceFn, Service}; @@ -34,6 +33,10 @@ use zebra_chain::{ sprout::{JoinSplit, Nullifier, RandomSeed}, }; +use crate::BoxError; + +use super::{spawn_fifo, spawn_fifo_and_convert}; + mod params; #[cfg(test)] mod tests; @@ -74,7 +77,10 @@ pub type ItemVerifyingKey = PreparedVerifyingKey; /// you should call `.clone()` on the global handle to create a local, mutable /// handle. pub static SPEND_VERIFIER: Lazy< - Fallback, ServiceFn BoxFuture<'static, VerifyResult>>>, + Fallback< + Batch, + ServiceFn BoxFuture<'static, Result<(), BoxError>>>, + >, > = Lazy::new(|| { Fallback::new( Batch::new( @@ -113,7 +119,10 @@ pub static SPEND_VERIFIER: Lazy< /// you should call `.clone()` on the global handle to create a local, mutable /// handle. pub static OUTPUT_VERIFIER: Lazy< - Fallback, ServiceFn BoxFuture<'static, VerifyResult>>>, + Fallback< + Batch, + ServiceFn BoxFuture<'static, Result<(), BoxError>>>, + >, > = Lazy::new(|| { Fallback::new( Batch::new( @@ -417,43 +426,22 @@ impl Verifier { /// Flush the batch using a thread pool, and return the result via the channel. /// This function returns a future that becomes ready when the batch is completed. - fn flush_spawning( - batch: BatchVerifier, - vk: &'static BatchVerifyingKey, - tx: Sender, - ) -> impl Future { + async fn flush_spawning(batch: BatchVerifier, vk: &'static BatchVerifyingKey, tx: Sender) { // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. - tokio::task::spawn_blocking(move || { - // TODO: - // - spawn batches so rayon executes them in FIFO order - // possible implementation: return a closure in a Future, - // then run it using scope_fifo() in the worker task, - // limiting the number of concurrent batches to the number of rayon threads - rayon::scope_fifo(move |s| s.spawn_fifo(move |_s| Self::verify(batch, vk, tx))) - }) - .map(|join_result| join_result.expect("panic in groth16 batch verifier")) + let _ = tx.send( + spawn_fifo(move || batch.verify(thread_rng(), vk)) + .await + .ok(), + ); } /// Verify a single item using a thread pool, and return the result. - /// This function returns a future that becomes ready when the item is completed. - fn verify_single_spawning( + async fn verify_single_spawning( item: Item, pvk: &'static ItemVerifyingKey, - ) -> impl Future { + ) -> Result<(), BoxError> { // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. - tokio::task::spawn_blocking(move || { - // Rayon doesn't have a spawn function that returns a value, - // so we use a parallel iterator instead. - // - // TODO: - // - when a batch fails, spawn all its individual items into rayon using Vec::par_iter() - // - spawn fallback individual verifications so rayon executes them in FIFO order, - // if possible - rayon::iter::once(item) - .map(move |item| item.verify_single(pvk)) - .collect() - }) - .map(|join_result| join_result.expect("panic in groth16 fallback verifier")) + spawn_fifo_and_convert(move || item.verify_single(pvk)).await } } @@ -470,8 +458,8 @@ impl fmt::Debug for Verifier { impl Service> for Verifier { type Response = (); - type Error = VerificationError; - type Future = Pin + Send + 'static>>; + type Error = BoxError; + type Future = Pin> + Send + 'static>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) @@ -492,7 +480,7 @@ impl Service> for Verifier { let result = rx .borrow() .as_ref() - .expect("completed batch must send a value") + .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")? .clone(); if result.is_ok() { @@ -503,7 +491,7 @@ impl Service> for Verifier { metrics::counter!("proofs.groth16.invalid", 1); } - result + result.map_err(BoxError::from) } Err(_recv_error) => panic!("verifier was dropped without flushing"), } diff --git a/zebra-consensus/src/primitives/halo2.rs b/zebra-consensus/src/primitives/halo2.rs index b747b4b0cf0..e9cbc4262e6 100644 --- a/zebra-consensus/src/primitives/halo2.rs +++ b/zebra-consensus/src/primitives/halo2.rs @@ -13,13 +13,16 @@ use once_cell::sync::Lazy; use orchard::circuit::VerifyingKey; use rand::{thread_rng, CryptoRng, RngCore}; -use rayon::prelude::*; use thiserror::Error; use tokio::sync::watch; use tower::{util::ServiceFn, Service}; use tower_batch_control::{Batch, BatchControl}; use tower_fallback::Fallback; +use crate::BoxError; + +use super::{spawn_fifo, spawn_fifo_and_convert}; + #[cfg(test)] mod tests; @@ -199,7 +202,10 @@ impl From for Halo2Error { /// you should call `.clone()` on the global handle to create a local, mutable /// handle. pub static VERIFIER: Lazy< - Fallback, ServiceFn BoxFuture<'static, VerifyResult>>>, + Fallback< + Batch, + ServiceFn BoxFuture<'static, Result<(), BoxError>>>, + >, > = Lazy::new(|| { Fallback::new( Batch::new( @@ -284,43 +290,22 @@ impl Verifier { /// Flush the batch using a thread pool, and return the result via the channel. /// This function returns a future that becomes ready when the batch is completed. - fn flush_spawning( - batch: BatchVerifier, - vk: &'static BatchVerifyingKey, - tx: Sender, - ) -> impl Future { + async fn flush_spawning(batch: BatchVerifier, vk: &'static BatchVerifyingKey, tx: Sender) { // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. - tokio::task::spawn_blocking(move || { - // TODO: - // - spawn batches so rayon executes them in FIFO order - // possible implementation: return a closure in a Future, - // then run it using scope_fifo() in the worker task, - // limiting the number of concurrent batches to the number of rayon threads - rayon::scope_fifo(move |s| s.spawn_fifo(move |_s| Self::verify(batch, vk, tx))) - }) - .map(|join_result| join_result.expect("panic in halo2 batch verifier")) + let _ = tx.send( + spawn_fifo(move || batch.verify(thread_rng(), vk).map_err(Halo2Error::from)) + .await + .ok(), + ); } /// Verify a single item using a thread pool, and return the result. - /// This function returns a future that becomes ready when the item is completed. - fn verify_single_spawning( + async fn verify_single_spawning( item: Item, pvk: &'static ItemVerifyingKey, - ) -> impl Future { + ) -> Result<(), BoxError> { // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. - tokio::task::spawn_blocking(move || { - // Rayon doesn't have a spawn function that returns a value, - // so we use a parallel iterator instead. - // - // TODO: - // - when a batch fails, spawn all its individual items into rayon using Vec::par_iter() - // - spawn fallback individual verifications so rayon executes them in FIFO order, - // if possible - rayon::iter::once(item) - .map(move |item| item.verify_single(pvk).map_err(Halo2Error::from)) - .collect() - }) - .map(|join_result| join_result.expect("panic in halo2 fallback verifier")) + spawn_fifo_and_convert(move || item.verify_single(pvk).map_err(Halo2Error::from)).await } } @@ -337,8 +322,8 @@ impl fmt::Debug for Verifier { impl Service> for Verifier { type Response = (); - type Error = Halo2Error; - type Future = Pin + Send + 'static>>; + type Error = BoxError; + type Future = Pin> + Send + 'static>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) @@ -358,7 +343,7 @@ impl Service> for Verifier { let result = rx .borrow() .as_ref() - .expect("completed batch must send a value") + .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")? .clone(); if result.is_ok() { @@ -369,7 +354,7 @@ impl Service> for Verifier { metrics::counter!("proofs.halo2.invalid", 1); } - result + result.map_err(BoxError::from) } Err(_recv_error) => panic!("verifier was dropped without flushing"), } diff --git a/zebra-consensus/src/primitives/redjubjub.rs b/zebra-consensus/src/primitives/redjubjub.rs index b7f65a2c176..94be0cdb5f8 100644 --- a/zebra-consensus/src/primitives/redjubjub.rs +++ b/zebra-consensus/src/primitives/redjubjub.rs @@ -11,7 +11,6 @@ use futures::{future::BoxFuture, FutureExt}; use once_cell::sync::Lazy; use rand::thread_rng; -use rayon::prelude::*; use tokio::sync::watch; use tower::{util::ServiceFn, Service}; use tower_batch_control::{Batch, BatchControl}; @@ -19,6 +18,10 @@ use tower_fallback::Fallback; use zebra_chain::primitives::redjubjub::{batch, *}; +use crate::BoxError; + +use super::{spawn_fifo, spawn_fifo_and_convert}; + #[cfg(test)] mod tests; @@ -44,7 +47,10 @@ pub type Item = batch::Item; /// you should call `.clone()` on the global handle to create a local, mutable /// handle. pub static VERIFIER: Lazy< - Fallback, ServiceFn BoxFuture<'static, VerifyResult>>>, + Fallback< + Batch, + ServiceFn BoxFuture<'static, Result<(), BoxError>>>, + >, > = Lazy::new(|| { Fallback::new( Batch::new( @@ -121,43 +127,22 @@ impl Verifier { /// Flush the batch using a thread pool, and return the result via the channel. /// This function returns a future that becomes ready when the batch is completed. - fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future { + async fn flush_spawning(batch: BatchVerifier, tx: Sender) { // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. - tokio::task::spawn_blocking(|| { - // TODO: - // - spawn batches so rayon executes them in FIFO order - // possible implementation: return a closure in a Future, - // then run it using scope_fifo() in the worker task, - // limiting the number of concurrent batches to the number of rayon threads - rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::verify(batch, tx))) - }) - .map(|join_result| join_result.expect("panic in redjubjub batch verifier")) + let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok()); } /// Verify a single item using a thread pool, and return the result. - /// This function returns a future that becomes ready when the item is completed. - fn verify_single_spawning(item: Item) -> impl Future { + async fn verify_single_spawning(item: Item) -> Result<(), BoxError> { // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. - tokio::task::spawn_blocking(|| { - // Rayon doesn't have a spawn function that returns a value, - // so we use a parallel iterator instead. - // - // TODO: - // - when a batch fails, spawn all its individual items into rayon using Vec::par_iter() - // - spawn fallback individual verifications so rayon executes them in FIFO order, - // if possible - rayon::iter::once(item) - .map(|item| item.verify_single()) - .collect() - }) - .map(|join_result| join_result.expect("panic in redjubjub fallback verifier")) + spawn_fifo_and_convert(move || item.verify_single()).await } } impl Service> for Verifier { type Response = (); - type Error = Error; - type Future = Pin + Send + 'static>>; + type Error = BoxError; + type Future = Pin> + Send + 'static>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) @@ -175,7 +160,8 @@ impl Service> for Verifier { Ok(()) => { // We use a new channel for each batch, // so we always get the correct batch result here. - let result = rx.borrow().expect("completed batch must send a value"); + let result = rx.borrow() + .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?; if result.is_ok() { tracing::trace!(?result, "validated redjubjub signature"); @@ -185,7 +171,7 @@ impl Service> for Verifier { metrics::counter!("signatures.redjubjub.invalid", 1); } - result + result.map_err(BoxError::from) } Err(_recv_error) => panic!("verifier was dropped without flushing"), } diff --git a/zebra-consensus/src/primitives/redpallas.rs b/zebra-consensus/src/primitives/redpallas.rs index 77b6b08bc9d..5064fa817fb 100644 --- a/zebra-consensus/src/primitives/redpallas.rs +++ b/zebra-consensus/src/primitives/redpallas.rs @@ -11,7 +11,6 @@ use futures::{future::BoxFuture, FutureExt}; use once_cell::sync::Lazy; use rand::thread_rng; -use rayon::prelude::*; use tokio::sync::watch; use tower::{util::ServiceFn, Service}; use tower_batch_control::{Batch, BatchControl}; @@ -19,6 +18,10 @@ use tower_fallback::Fallback; use zebra_chain::primitives::reddsa::{batch, orchard, Error}; +use crate::BoxError; + +use super::{spawn_fifo, spawn_fifo_and_convert}; + #[cfg(test)] mod tests; @@ -44,7 +47,10 @@ pub type Item = batch::Item; /// you should call `.clone()` on the global handle to create a local, mutable /// handle. pub static VERIFIER: Lazy< - Fallback, ServiceFn BoxFuture<'static, VerifyResult>>>, + Fallback< + Batch, + ServiceFn BoxFuture<'static, Result<(), BoxError>>>, + >, > = Lazy::new(|| { Fallback::new( Batch::new( @@ -121,43 +127,22 @@ impl Verifier { /// Flush the batch using a thread pool, and return the result via the channel. /// This function returns a future that becomes ready when the batch is completed. - fn flush_spawning(batch: BatchVerifier, tx: Sender) -> impl Future { + async fn flush_spawning(batch: BatchVerifier, tx: Sender) { // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. - tokio::task::spawn_blocking(|| { - // TODO: - // - spawn batches so rayon executes them in FIFO order - // possible implementation: return a closure in a Future, - // then run it using scope_fifo() in the worker task, - // limiting the number of concurrent batches to the number of rayon threads - rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::verify(batch, tx))) - }) - .map(|join_result| join_result.expect("panic in ed25519 batch verifier")) + let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok()); } /// Verify a single item using a thread pool, and return the result. - /// This function returns a future that becomes ready when the item is completed. - fn verify_single_spawning(item: Item) -> impl Future { + async fn verify_single_spawning(item: Item) -> Result<(), BoxError> { // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. - tokio::task::spawn_blocking(|| { - // Rayon doesn't have a spawn function that returns a value, - // so we use a parallel iterator instead. - // - // TODO: - // - when a batch fails, spawn all its individual items into rayon using Vec::par_iter() - // - spawn fallback individual verifications so rayon executes them in FIFO order, - // if possible - rayon::iter::once(item) - .map(|item| item.verify_single()) - .collect() - }) - .map(|join_result| join_result.expect("panic in redpallas fallback verifier")) + spawn_fifo_and_convert(move || item.verify_single()).await } } impl Service> for Verifier { type Response = (); - type Error = Error; - type Future = Pin + Send + 'static>>; + type Error = BoxError; + type Future = Pin> + Send + 'static>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) @@ -174,7 +159,8 @@ impl Service> for Verifier { Ok(()) => { // We use a new channel for each batch, // so we always get the correct batch result here. - let result = rx.borrow().expect("completed batch must send a value"); + let result = rx.borrow() + .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?; if result.is_ok() { tracing::trace!(?result, "validated redpallas signature"); @@ -184,7 +170,7 @@ impl Service> for Verifier { metrics::counter!("signatures.redpallas.invalid", 1); } - result + result.map_err(BoxError::from) } Err(_recv_error) => panic!("verifier was dropped without flushing"), }