Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge queue: embarking unstable (0f345c7) and #4575 together #5150

Closed
wants to merge 16 commits into from
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 27 additions & 27 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("validator_balances"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -726,7 +726,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("validators"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -1257,7 +1257,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("beacon"))
.and(warp::path("blocks"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
Expand Down Expand Up @@ -1327,7 +1327,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
Expand Down Expand Up @@ -1404,7 +1404,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("beacon"))
.and(warp::path("blinded_blocks"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
Expand Down Expand Up @@ -1472,7 +1472,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blinded_blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
Expand Down Expand Up @@ -1754,7 +1754,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
Expand Down Expand Up @@ -1930,7 +1930,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("attester_slashings"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -1988,7 +1988,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("proposer_slashings"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -2046,7 +2046,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("voluntary_exits"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -2102,7 +2102,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("sync_committees"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
Expand Down Expand Up @@ -2139,7 +2139,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("bls_to_execution_changes"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
Expand Down Expand Up @@ -2533,7 +2533,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("attestations"))
.and(warp::path::param::<Epoch>())
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
Expand Down Expand Up @@ -2583,7 +2583,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("sync_committee"))
.and(block_id_or_err)
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(log_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -3326,7 +3326,7 @@ pub fn serve<T: BeaconChainTypes>(
}))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand All @@ -3352,7 +3352,7 @@ pub fn serve<T: BeaconChainTypes>(
}))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand Down Expand Up @@ -3406,7 +3406,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(not_while_syncing_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
Expand Down Expand Up @@ -3519,7 +3519,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(not_while_syncing_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter)
.and(log_filter.clone())
.then(
Expand All @@ -3545,7 +3545,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("validator"))
.and(warp::path("beacon_committee_subscriptions"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(validator_subscription_tx_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
Expand Down Expand Up @@ -3601,7 +3601,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
Expand Down Expand Up @@ -3652,7 +3652,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
Expand Down Expand Up @@ -3826,7 +3826,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("validator"))
.and(warp::path("sync_committee_subscriptions"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(validator_subscription_tx_filter)
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
Expand Down Expand Up @@ -3872,7 +3872,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("liveness"))
.and(warp::path::param::<Epoch>())
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand Down Expand Up @@ -3913,7 +3913,7 @@ pub fn serve<T: BeaconChainTypes>(
let post_lighthouse_liveness = warp::path("lighthouse")
.and(warp::path("liveness"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand Down Expand Up @@ -4016,7 +4016,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("ui"))
.and(warp::path("validator_metrics"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand All @@ -4035,7 +4035,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("ui"))
.and(warp::path("validator_info"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand Down Expand Up @@ -4338,7 +4338,7 @@ pub fn serve<T: BeaconChainTypes>(
let post_lighthouse_block_rewards = warp::path("lighthouse")
.and(warp::path("analysis"))
.and(warp::path("block_rewards"))
.and(warp::body::json())
.and(warp_utils::json::json())
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
Expand Down
23 changes: 22 additions & 1 deletion common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,30 @@ impl BeaconNodeHttpClient {
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}

let response = builder.json(body).send().await?;
ok_or_error(response).await
}

/// Generic POST function supporting arbitrary responses and timeouts.
/// Does not include Content-Type application/json in the request header.
async fn post_generic_json_without_content_type_header<T: Serialize, U: IntoUrl>(
&self,
url: U,
body: &T,
timeout: Option<Duration>,
) -> Result<Response, Error> {
let mut builder = self.client.post(url);
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}

let serialized_body = serde_json::to_vec(body).map_err(Error::InvalidJson)?;

let response = builder.body(serialized_body).send().await?;
ok_or_error(response).await
}

/// Generic POST function supporting arbitrary responses and timeouts.
async fn post_generic_with_consensus_version<T: Serialize, U: IntoUrl>(
&self,
Expand Down Expand Up @@ -1250,7 +1270,8 @@ impl BeaconNodeHttpClient {
.push("pool")
.push("attester_slashings");

self.post(path, slashing).await?;
self.post_generic_json_without_content_type_header(path, slashing, None)
.await?;

Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions common/warp_utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ beacon_chain = { workspace = true }
state_processing = { workspace = true }
safe_arith = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
headers = "0.3.2"
lighthouse_metrics = { workspace = true }
lazy_static = { workspace = true }
serde_array_query = "0.1.0"
bytes = { workspace = true }
22 changes: 22 additions & 0 deletions common/warp_utils/src/json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use bytes::Bytes;
use serde::de::DeserializeOwned;
use std::error::Error as StdError;
use warp::{Filter, Rejection};

use crate::reject;

struct Json;

type BoxError = Box<dyn StdError + Send + Sync>;

impl Json {
fn decode<T: DeserializeOwned>(bytes: Bytes) -> Result<T, BoxError> {
serde_json::from_slice(&bytes).map_err(Into::into)
}
}

pub fn json<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
warp::body::bytes().and_then(|bytes: Bytes| async move {
Json::decode(bytes).map_err(|err| reject::custom_deserialize_error(format!("{:?}", err)))
})
}
1 change: 1 addition & 0 deletions common/warp_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! Lighthouse project. E.g., the `http_api` and `http_metrics` crates.

pub mod cors;
pub mod json;
pub mod metrics;
pub mod query;
pub mod reject;
Expand Down
12 changes: 12 additions & 0 deletions common/warp_utils/src/reject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ pub fn custom_bad_request(msg: String) -> warp::reject::Rejection {
warp::reject::custom(CustomBadRequest(msg))
}

#[derive(Debug)]
pub struct CustomDeserializeError(pub String);

impl Reject for CustomDeserializeError {}

pub fn custom_deserialize_error(msg: String) -> warp::reject::Rejection {
warp::reject::custom(CustomDeserializeError(msg))
}

#[derive(Debug)]
pub struct CustomServerError(pub String);

Expand Down Expand Up @@ -161,6 +170,9 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply,
if err.is_not_found() {
code = StatusCode::NOT_FOUND;
message = "NOT_FOUND".to_string();
} else if let Some(e) = err.find::<crate::reject::CustomDeserializeError>() {
message = format!("BAD_REQUEST: body deserialize error: {}", e.0);
code = StatusCode::BAD_REQUEST;
} else if let Some(e) = err.find::<warp::filters::body::BodyDeserializeError>() {
message = format!("BAD_REQUEST: body deserialize error: {}", e);
code = StatusCode::BAD_REQUEST;
Expand Down
Loading