Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add TaskSpawner to spawn validation requests as blocking #12543

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,7 @@ where
Arc::new(self.consensus.clone()),
self.block_executor.clone(),
self.config.flashbots.clone(),
Box::new(self.executor.clone()),
)
}
}
Expand Down Expand Up @@ -1416,6 +1417,7 @@ where
Arc::new(self.consensus.clone()),
self.block_executor.clone(),
self.config.flashbots.clone(),
Box::new(self.executor.clone()),
)
.into_rpc()
.into(),
Expand Down
144 changes: 89 additions & 55 deletions crates/rpc/rpc/src/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use alloy_rpc_types_beacon::relay::{
BuilderBlockValidationRequestV3, BuilderBlockValidationRequestV4,
};
use alloy_rpc_types_engine::{
BlobsBundleV1, CancunPayloadFields, ExecutionPayload, ExecutionPayloadSidecar,
BlobsBundleV1, CancunPayloadFields, ExecutionPayload, ExecutionPayloadSidecar, PayloadError,
};
use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
use reth_consensus::{Consensus, PostExecutionInput};
use reth_errors::{BlockExecutionError, ConsensusError, ProviderError, RethError};
use reth_errors::{BlockExecutionError, ConsensusError, ProviderError};
use reth_ethereum_consensus::GAS_LIMIT_BOUND_DIVISOR;
use reth_evm::execute::{BlockExecutorProvider, Executor};
use reth_payload_validator::ExecutionPayloadValidator;
Expand All @@ -22,16 +22,16 @@ use reth_provider::{
};
use reth_revm::{cached::CachedReads, database::StateProviderDatabase};
use reth_rpc_api::BlockSubmissionValidationApiServer;
use reth_rpc_eth_types::EthApiError;
use reth_rpc_server_types::{result::internal_rpc_err, ToRpcResult};
use reth_rpc_server_types::result::internal_rpc_err;
use reth_tasks::TaskSpawner;
use reth_trie::HashedPostState;
use revm_primitives::{Address, B256, U256};
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, sync::Arc};
use tokio::sync::RwLock;
use tokio::sync::{oneshot, RwLock};

/// The type that implements the `validation` rpc namespace trait
#[derive(Debug, derive_more::Deref)]
#[derive(Clone, Debug, derive_more::Deref)]
pub struct ValidationApi<Provider: ChainSpecProvider, E> {
#[deref]
inner: Arc<ValidationApiInner<Provider, E>>,
Expand All @@ -47,6 +47,7 @@ where
consensus: Arc<dyn Consensus>,
executor_provider: E,
config: ValidationApiConfig,
task_spawner: Box<dyn TaskSpawner>,
) -> Self {
let ValidationApiConfig { disallow } = config;

Expand All @@ -58,6 +59,7 @@ where
executor_provider,
disallow,
cached_state: Default::default(),
task_spawner,
});

Self { inner }
Expand Down Expand Up @@ -338,71 +340,37 @@ where

Ok(versioned_hashes)
}
}

#[async_trait]
impl<Provider, E> BlockSubmissionValidationApiServer for ValidationApi<Provider, E>
where
Provider: BlockReaderIdExt
+ ChainSpecProvider<ChainSpec: EthereumHardforks>
+ StateProviderFactory
+ HeaderProvider
+ AccountReader
+ WithdrawalsProvider
+ Clone
+ 'static,
E: BlockExecutorProvider,
{
async fn validate_builder_submission_v1(
&self,
_request: BuilderBlockValidationRequest,
) -> RpcResult<()> {
Err(internal_rpc_err("unimplemented"))
}

async fn validate_builder_submission_v2(
&self,
_request: BuilderBlockValidationRequestV2,
) -> RpcResult<()> {
Err(internal_rpc_err("unimplemented"))
}

/// Validates a block submitted to the relay
/// Core logic for validating the builder submission v3
async fn validate_builder_submission_v3(
&self,
request: BuilderBlockValidationRequestV3,
) -> RpcResult<()> {
) -> Result<(), ValidationApiError> {
let block = self
.payload_validator
.ensure_well_formed_payload(
ExecutionPayload::V3(request.request.execution_payload),
ExecutionPayloadSidecar::v3(CancunPayloadFields {
parent_beacon_block_root: request.parent_beacon_block_root,
versioned_hashes: self
.validate_blobs_bundle(request.request.blobs_bundle)
.map_err(|e| RethError::Other(e.into()))
.to_rpc_result()?,
versioned_hashes: self.validate_blobs_bundle(request.request.blobs_bundle)?,
}),
)
.to_rpc_result()?
)?
.try_seal_with_senders()
.map_err(|_| EthApiError::InvalidTransactionSignature)?;
.map_err(|_| ValidationApiError::InvalidTransactionSignature)?;

self.validate_message_against_block(
block,
request.request.message,
request.registered_gas_limit,
)
.await
.map_err(|e| RethError::Other(e.into()))
.to_rpc_result()
}

/// Validates a block submitted to the relay
/// Core logic for validating the builder submission v4
async fn validate_builder_submission_v4(
&self,
request: BuilderBlockValidationRequestV4,
) -> RpcResult<()> {
) -> Result<(), ValidationApiError> {
let block = self
.payload_validator
.ensure_well_formed_payload(
Expand All @@ -411,25 +379,84 @@ where
CancunPayloadFields {
parent_beacon_block_root: request.parent_beacon_block_root,
versioned_hashes: self
.validate_blobs_bundle(request.request.blobs_bundle)
.map_err(|e| RethError::Other(e.into()))
.to_rpc_result()?,
.validate_blobs_bundle(request.request.blobs_bundle)?,
},
request.request.execution_requests.into(),
),
)
.to_rpc_result()?
)?
.try_seal_with_senders()
.map_err(|_| EthApiError::InvalidTransactionSignature)?;
.map_err(|_| ValidationApiError::InvalidTransactionSignature)?;

self.validate_message_against_block(
block,
request.request.message,
request.registered_gas_limit,
)
.await
.map_err(|e| RethError::Other(e.into()))
.to_rpc_result()
}
}

#[async_trait]
impl<Provider, E> BlockSubmissionValidationApiServer for ValidationApi<Provider, E>
where
Provider: BlockReaderIdExt
+ ChainSpecProvider<ChainSpec: EthereumHardforks>
+ StateProviderFactory
+ HeaderProvider
+ AccountReader
+ WithdrawalsProvider
+ Clone
+ 'static,
E: BlockExecutorProvider,
{
async fn validate_builder_submission_v1(
&self,
_request: BuilderBlockValidationRequest,
) -> RpcResult<()> {
Err(internal_rpc_err("unimplemented"))
}

async fn validate_builder_submission_v2(
&self,
_request: BuilderBlockValidationRequestV2,
) -> RpcResult<()> {
Err(internal_rpc_err("unimplemented"))
}

/// Validates a block submitted to the relay
async fn validate_builder_submission_v3(
&self,
request: BuilderBlockValidationRequestV3,
) -> RpcResult<()> {
let this = self.clone();
let (tx, rx) = oneshot::channel();

self.task_spawner.spawn_blocking(Box::pin(async move {
let result = Self::validate_builder_submission_v3(&this, request)
.await
.map_err(|err| internal_rpc_err(err.to_string()));
let _ = tx.send(result);
}));

rx.await.map_err(|_| internal_rpc_err("Internal blocking task error"))?
}

/// Validates a block submitted to the relay
async fn validate_builder_submission_v4(
&self,
request: BuilderBlockValidationRequestV4,
) -> RpcResult<()> {
let this = self.clone();
let (tx, rx) = oneshot::channel();

self.task_spawner.spawn_blocking(Box::pin(async move {
let result = Self::validate_builder_submission_v4(&this, request)
.await
.map_err(|err| internal_rpc_err(err.to_string()));
let _ = tx.send(result);
}));

rx.await.map_err(|_| internal_rpc_err("Internal blocking task error"))?
}
}

Expand All @@ -450,6 +477,8 @@ pub struct ValidationApiInner<Provider: ChainSpecProvider, E> {
/// latest head block state. Uses async `RwLock` to safely handle concurrent validation
/// requests.
cached_state: RwLock<(B256, CachedReads)>,
/// Task spawner for blocking operations
task_spawner: Box<dyn TaskSpawner>,
}

/// Configuration for validation API.
Expand All @@ -476,6 +505,9 @@ pub enum ValidationApiError {
ProposerPayment,
#[error("invalid blobs bundle")]
InvalidBlobsBundle,
/// When the transaction signature is invalid
#[error("invalid transaction signature")]
InvalidTransactionSignature,
#[error("block accesses blacklisted address: {_0}")]
Blacklist(Address),
#[error(transparent)]
Expand All @@ -486,4 +518,6 @@ pub enum ValidationApiError {
Provider(#[from] ProviderError),
#[error(transparent)]
Execution(#[from] BlockExecutionError),
#[error(transparent)]
Payload(#[from] PayloadError),
}
Loading