Skip to content

Commit

Permalink
Send success code for duplicate blocks on HTTP (sigp#4655)
Browse files Browse the repository at this point in the history
## Issue Addressed

Closes sigp#4473 (take 3)

## Proposed Changes

- Send a 202 status code by default for duplicate blocks, instead of 400. This conveys to the caller that the block was published, but makes no guarantees about its validity. Block relays can count this as a success or a failure as they wish.
- For users wanting finer-grained control over which status is returned for duplicates, a flag `--http-duplicate-block-status` can be used to adjust the behaviour. A 400 status can be supplied to restore the old (spec-compliant) behaviour, or a 200 status can be used to silence VCs that warn loudly for non-200 codes (e.g. Lighthouse prior to v4.4.0).
- Update the Lighthouse VC to gracefully handle success codes other than 200. The info message isn't the nicest thing to read, but it covers all bases and isn't a nasty `ERRO`/`CRIT` that will wake anyone up.

## Additional Info

I'm planning to raise a PR to `beacon-APIs` to specify that clients may return 202 for duplicate blocks. Really it would be nice to use some 2xx code that _isn't_ the same as the code for "published but invalid". I think unfortunately there aren't any suitable codes, and maybe the best fit is `409 CONFLICT`. Given that we need to fix this promptly for our release, I think using the 202 code temporarily with configuration strikes a nice compromise.
  • Loading branch information
michaelsproul authored and jxs committed Aug 28, 2023
1 parent 8b55fe7 commit 304acd9
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 181 deletions.
185 changes: 83 additions & 102 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ pub struct Config {
pub data_dir: PathBuf,
pub sse_capacity_multiplier: usize,
pub enable_beacon_processor: bool,
#[serde(with = "eth2::types::serde_status_code")]
pub duplicate_block_status_code: StatusCode,
}

impl Default for Config {
Expand All @@ -154,6 +156,7 @@ impl Default for Config {
data_dir: PathBuf::from(DEFAULT_ROOT_DIR),
sse_capacity_multiplier: 1,
enable_beacon_processor: true,
duplicate_block_status_code: StatusCode::ACCEPTED,
}
}
}
Expand Down Expand Up @@ -510,6 +513,8 @@ pub fn serve<T: BeaconChainTypes>(
let task_spawner_filter =
warp::any().map(move || TaskSpawner::new(beacon_processor_send.clone()));

let duplicate_block_status_code = ctx.config.duplicate_block_status_code;

/*
*
* Start of HTTP method definitions.
Expand Down Expand Up @@ -1284,11 +1289,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|block: Arc<SignedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |block: Arc<SignedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_block(
None,
Expand All @@ -1297,9 +1302,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
BroadcastValidation::default(),
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
Expand All @@ -1314,11 +1319,11 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block =
SignedBeaconBlock::<T::EthSpec>::from_ssz_bytes(&block_bytes, &chain.spec)
Expand All @@ -1334,9 +1339,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
BroadcastValidation::default(),
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
Expand All @@ -1352,12 +1357,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|validation_level: api_types::BroadcastValidationQuery,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |validation_level: api_types::BroadcastValidationQuery,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_block(
None,
Expand All @@ -1366,9 +1371,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
Expand All @@ -1384,12 +1389,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block =
SignedBeaconBlock::<T::EthSpec>::from_ssz_bytes(&block_bytes, &chain.spec)
Expand All @@ -1405,9 +1410,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
Expand All @@ -1427,21 +1432,21 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|block: SignedBeaconBlock<T::EthSpec, BlindedPayload<_>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |block: SignedBlindedBeaconBlock<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_blinded_block(
block,
chain,
&network_tx,
log,
BroadcastValidation::default(),
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
Expand All @@ -1457,13 +1462,13 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
move |block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block = SignedBeaconBlock::<T::EthSpec, BlindedPayload<_>>::from_ssz_bytes(
let block = SignedBlindedBeaconBlock::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
&chain.spec,
)
Expand All @@ -1476,9 +1481,9 @@ pub fn serve<T: BeaconChainTypes>(
&network_tx,
log,
BroadcastValidation::default(),
duplicate_block_status_code,
)
.await
.map(|()| warp::reply().into_response())
})
},
);
Expand All @@ -1494,87 +1499,63 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|validation_level: api_types::BroadcastValidationQuery,
block: SignedBeaconBlock<T::EthSpec, BlindedPayload<_>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async(Priority::P0, async move {
match publish_blocks::publish_blinded_block(
move |validation_level: api_types::BroadcastValidationQuery,
block: SignedBlindedBeaconBlock<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_blinded_block(
block,
chain,
&network_tx,
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
)
.await
{
Ok(()) => warp::reply().into_response(),
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
StatusCode::INTERNAL_SERVER_ERROR,
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
})
},
);

let post_beacon_blinded_blocks_v2_ssz =
eth_v2
.and(warp::path("beacon"))
.and(warp::path("blinded_blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::bytes())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| async move {
let block =
match SignedBeaconBlock::<T::EthSpec, BlindedPayload<_>>::from_ssz_bytes(
&block_bytes,
&chain.spec,
) {
Ok(data) => data,
Err(_) => {
return warp::reply::with_status(
StatusCode::BAD_REQUEST,
eth2::StatusCode::BAD_REQUEST,
)
.into_response();
}
};
match publish_blocks::publish_blinded_block(
let post_beacon_blinded_blocks_v2_ssz = eth_v2
.and(warp::path("beacon"))
.and(warp::path("blinded_blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::bytes())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block = SignedBlindedBeaconBlock::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
&chain.spec,
)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
})?;
publish_blocks::publish_blinded_block(
block,
chain,
&network_tx,
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
)
.await
{
Ok(()) => warp::reply().into_response(),
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
StatusCode::INTERNAL_SERVER_ERROR,
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
},
);
})
},
);

let block_id_or_err = warp::path::param::<BlockId>().or_else(|_| async {
Err(warp_utils::reject::custom_bad_request(
Expand Down
Loading

0 comments on commit 304acd9

Please sign in to comment.