Skip to content

Commit

Permalink
feat: add taskspawner
Browse files Browse the repository at this point in the history
  • Loading branch information
ftupas committed Nov 14, 2024
1 parent ff6b78a commit ac8c19b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 18 deletions.
2 changes: 2 additions & 0 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,7 @@ where
Arc::new(self.consensus.clone()),
self.block_executor.clone(),
self.config.flashbots.clone(),
Box::new(self.executor.clone()),
)
}
}
Expand Down Expand Up @@ -1416,6 +1417,7 @@ where
Arc::new(self.consensus.clone()),
self.block_executor.clone(),
self.config.flashbots.clone(),
Box::new(self.executor.clone()),
)
.into_rpc()
.into(),
Expand Down
55 changes: 37 additions & 18 deletions crates/rpc/rpc/src/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ use reth_revm::{cached::CachedReads, database::StateProviderDatabase};
use reth_rpc_api::BlockSubmissionValidationApiServer;
use reth_rpc_eth_types::EthApiError;
use reth_rpc_server_types::{result::internal_rpc_err, ToRpcResult};
use reth_tasks::TaskSpawner;
use reth_trie::HashedPostState;
use revm_primitives::{Address, B256, U256};
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, sync::Arc};
use tokio::sync::RwLock;
use tokio::sync::{oneshot, RwLock};

/// The type that implements the `validation` rpc namespace trait
#[derive(Debug, derive_more::Deref)]
#[derive(Clone, Debug, derive_more::Deref)]
pub struct ValidationApi<Provider: ChainSpecProvider, E> {
#[deref]
inner: Arc<ValidationApiInner<Provider, E>>,
Expand All @@ -47,6 +48,7 @@ where
consensus: Arc<dyn Consensus>,
executor_provider: E,
config: ValidationApiConfig,
task_spawner: Box<dyn TaskSpawner>,
) -> Self {
let ValidationApiConfig { disallow } = config;

Expand All @@ -58,6 +60,7 @@ where
executor_provider,
disallow,
cached_state: Default::default(),
task_spawner,
});

Self { inner }
Expand Down Expand Up @@ -388,14 +391,21 @@ where
.try_seal_with_senders()
.map_err(|_| EthApiError::InvalidTransactionSignature)?;

self.validate_message_against_block(
block,
request.request.message,
request.registered_gas_limit,
)
.await
.map_err(|e| RethError::Other(e.into()))
.to_rpc_result()
let (tx, rx) = oneshot::channel();
let this = self.clone();

self.task_spawner.spawn_blocking(Box::pin(async move {
let result = this
.validate_message_against_block(
block,
request.request.message,
request.registered_gas_limit,
)
.await;
let _ = tx.send(result.map_err(|e| RethError::Other(e.into())).to_rpc_result());
}));

rx.await.map_err(|_| internal_rpc_err("Internal blocking task error"))?
}

/// Validates a block submitted to the relay
Expand All @@ -422,14 +432,21 @@ where
.try_seal_with_senders()
.map_err(|_| EthApiError::InvalidTransactionSignature)?;

self.validate_message_against_block(
block,
request.request.message,
request.registered_gas_limit,
)
.await
.map_err(|e| RethError::Other(e.into()))
.to_rpc_result()
let (tx, rx) = oneshot::channel();
let this = self.clone();

self.task_spawner.spawn_blocking(Box::pin(async move {
let result = this
.validate_message_against_block(
block,
request.request.message,
request.registered_gas_limit,
)
.await;
let _ = tx.send(result.map_err(|e| RethError::Other(e.into())).to_rpc_result());
}));

rx.await.map_err(|_| internal_rpc_err("Internal blocking task error"))?
}
}

Expand All @@ -450,6 +467,8 @@ pub struct ValidationApiInner<Provider: ChainSpecProvider, E> {
/// latest head block state. Uses async `RwLock` to safely handle concurrent validation
/// requests.
cached_state: RwLock<(B256, CachedReads)>,
/// Task spawner for blocking operations
task_spawner: Box<dyn TaskSpawner>,
}

/// Configuration for validation API.
Expand Down

0 comments on commit ac8c19b

Please sign in to comment.