Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assume Content-Type is json for endpoints that require json #4575

Merged
merged 15 commits into from
Jan 31, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 27 additions & 27 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("validator_balances"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -726,7 +726,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("validators"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -1257,7 +1257,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("beacon"))
.and(warp::path("blocks"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
Expand Down Expand Up @@ -1327,7 +1327,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
Expand Down Expand Up @@ -1404,7 +1404,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("beacon"))
.and(warp::path("blinded_blocks"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
Expand Down Expand Up @@ -1472,7 +1472,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blinded_blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
Expand Down Expand Up @@ -1754,7 +1754,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
Expand Down Expand Up @@ -1930,7 +1930,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("attester_slashings"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -1988,7 +1988,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("proposer_slashings"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -2046,7 +2046,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("voluntary_exits"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -2102,7 +2102,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("sync_committees"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
Expand Down Expand Up @@ -2139,7 +2139,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("bls_to_execution_changes"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
Expand Down Expand Up @@ -2533,7 +2533,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("attestations"))
.and(warp::path::param::<Epoch>())
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
Expand Down Expand Up @@ -2583,7 +2583,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("sync_committee"))
.and(block_id_or_err)
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(log_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -3326,7 +3326,7 @@ pub fn serve<T: BeaconChainTypes>(
}))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand All @@ -3352,7 +3352,7 @@ pub fn serve<T: BeaconChainTypes>(
}))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand Down Expand Up @@ -3406,7 +3406,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(not_while_syncing_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
Expand Down Expand Up @@ -3519,7 +3519,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(not_while_syncing_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter)
.and(log_filter.clone())
.then(
Expand All @@ -3545,7 +3545,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("validator"))
.and(warp::path("beacon_committee_subscriptions"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(validator_subscription_tx_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
Expand Down Expand Up @@ -3601,7 +3601,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
Expand Down Expand Up @@ -3652,7 +3652,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
Expand Down Expand Up @@ -3826,7 +3826,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("validator"))
.and(warp::path("sync_committee_subscriptions"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(validator_subscription_tx_filter)
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
Expand Down Expand Up @@ -3872,7 +3872,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("liveness"))
.and(warp::path::param::<Epoch>())
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand Down Expand Up @@ -3913,7 +3913,7 @@ pub fn serve<T: BeaconChainTypes>(
let post_lighthouse_liveness = warp::path("lighthouse")
.and(warp::path("liveness"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand Down Expand Up @@ -4016,7 +4016,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("ui"))
.and(warp::path("validator_metrics"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand All @@ -4035,7 +4035,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("ui"))
.and(warp::path("validator_info"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand Down Expand Up @@ -4368,7 +4368,7 @@ pub fn serve<T: BeaconChainTypes>(
let post_lighthouse_block_rewards = warp::path("lighthouse")
.and(warp::path("analysis"))
.and(warp::path("block_rewards"))
.and(warp::body::json())
.and(warp_utils::json::json())
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
Expand Down
23 changes: 22 additions & 1 deletion common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,30 @@ impl BeaconNodeHttpClient {
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}

let response = builder.json(body).send().await?;
ok_or_error(response).await
}

/// Generic POST function supporting arbitrary responses and timeouts.
/// Does not include Content-Type application/json in the request header.
async fn post_generic_json_without_content_type_header<T: Serialize, U: IntoUrl>(
&self,
url: U,
body: &T,
timeout: Option<Duration>,
) -> Result<Response, Error> {
let mut builder = self.client.post(url);
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}

let serialized_body = serde_json::to_vec(body).map_err(Error::InvalidJson)?;

let response = builder.body(serialized_body).send().await?;
ok_or_error(response).await
}

/// Generic POST function supporting arbitrary responses and timeouts.
async fn post_generic_with_consensus_version<T: Serialize, U: IntoUrl>(
&self,
Expand Down Expand Up @@ -1250,7 +1270,8 @@ impl BeaconNodeHttpClient {
.push("pool")
.push("attester_slashings");

self.post(path, slashing).await?;
self.post_generic_json_without_content_type_header(path, slashing, None)
.await?;

Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions common/warp_utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ beacon_chain = { workspace = true }
state_processing = { workspace = true }
safe_arith = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
headers = "0.3.2"
lighthouse_metrics = { workspace = true }
lazy_static = { workspace = true }
serde_array_query = "0.1.0"
bytes = { workspace = true }
22 changes: 22 additions & 0 deletions common/warp_utils/src/json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use bytes::Bytes;
use serde::de::DeserializeOwned;
use std::error::Error as StdError;
use warp::{Filter, Rejection};

use crate::reject;

struct Json;

type BoxError = Box<dyn StdError + Send + Sync>;

impl Json {
fn decode<T: DeserializeOwned>(bytes: Bytes) -> Result<T, BoxError> {
serde_json::from_slice(&bytes).map_err(Into::into)
}
}

pub fn json<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
warp::body::bytes().and_then(|bytes: Bytes| async move {
Json::decode(bytes).map_err(|err| reject::custom_deserialize_error(format!("{:?}", err)))
})
}
Copy link
Collaborator

@dapplion dapplion Jan 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Diff from warp::filters::body::json

pub fn json<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
-    is_content_type::<Json>()
        .and(bytes())
        .and_then(|buf| async move {
-           Json::decode(buf).map_err(|err| {
-               tracing::debug!("request json body error: {}", err);
-               reject::known(BodyDeserializeError { cause: err })
-           })
+           Json::decode(bytes).map_err(|err| reject::custom_bad_request(format!("{:?}", err)))
        })
}
impl Decode for Json {
-    fn decode<B: Buf, T: DeserializeOwned>(mut buf: B) -> Result<T, BoxError> {
+    fn decode<T: DeserializeOwned>(bytes: Bytes) -> Result<T, BoxError> {
-        serde_json::from_slice(&buf.copy_to_bytes(buf.remaining())).map_err(Into::into)
+        serde_json::from_slice(&bytes).map_err(Into::into)
    }
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Equivalent code with the suggestion to remove the copy from #4575 (comment)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eserilev consider appending some extra message context like

} else if let Some(e) = err.find::<warp::filters::body::BodyDeserializeError>() {
message = format!("BAD_REQUEST: body deserialize error: {}", e);

1 change: 1 addition & 0 deletions common/warp_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! Lighthouse project. E.g., the `http_api` and `http_metrics` crates.

pub mod cors;
pub mod json;
pub mod metrics;
pub mod query;
pub mod reject;
Expand Down
12 changes: 12 additions & 0 deletions common/warp_utils/src/reject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ pub fn custom_bad_request(msg: String) -> warp::reject::Rejection {
warp::reject::custom(CustomBadRequest(msg))
}

#[derive(Debug)]
pub struct CustomDeserializeError(pub String);

impl Reject for CustomDeserializeError {}

pub fn custom_deserialize_error(msg: String) -> warp::reject::Rejection {
warp::reject::custom(CustomDeserializeError(msg))
}

#[derive(Debug)]
pub struct CustomServerError(pub String);

Expand Down Expand Up @@ -161,6 +170,9 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply,
if err.is_not_found() {
code = StatusCode::NOT_FOUND;
message = "NOT_FOUND".to_string();
} else if let Some(e) = err.find::<crate::reject::CustomDeserializeError>() {
message = format!("BAD_REQUEST: body deserialize error: {}", e.0);
code = StatusCode::BAD_REQUEST;
} else if let Some(e) = err.find::<warp::filters::body::BodyDeserializeError>() {
message = format!("BAD_REQUEST: body deserialize error: {}", e);
code = StatusCode::BAD_REQUEST;
Expand Down
Loading