Skip to content

Commit

Permalink
Assume Content-Type is json for endpoints that require json (#4575)
Browse files Browse the repository at this point in the history
* added default content type filter

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into unstable

* create custom warp json filter that ignores content type header

* cargo fmt and linting

* updated test

* updated test

* merge unstable

* merge conflicts

* workspace=true

* use Bytes instead of Buf

* resolve merge conflict

* resolve merge conflicts

* add extra error message context

* merge conflicts

* lint
  • Loading branch information
eserilev authored Jan 31, 2024
1 parent b8db3e4 commit 1d87edb
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 28 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 27 additions & 27 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("validator_balances"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -726,7 +726,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("validators"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -1257,7 +1257,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("beacon"))
.and(warp::path("blocks"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
Expand Down Expand Up @@ -1327,7 +1327,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
Expand Down Expand Up @@ -1404,7 +1404,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("beacon"))
.and(warp::path("blinded_blocks"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
Expand Down Expand Up @@ -1472,7 +1472,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blinded_blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
Expand Down Expand Up @@ -1754,7 +1754,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
Expand Down Expand Up @@ -1930,7 +1930,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("attester_slashings"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -1988,7 +1988,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("proposer_slashings"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -2046,7 +2046,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("voluntary_exits"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -2102,7 +2102,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("sync_committees"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
Expand Down Expand Up @@ -2139,7 +2139,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("bls_to_execution_changes"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
Expand Down Expand Up @@ -2533,7 +2533,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("attestations"))
.and(warp::path::param::<Epoch>())
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
Expand Down Expand Up @@ -2583,7 +2583,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("sync_committee"))
.and(block_id_or_err)
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(log_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
Expand Down Expand Up @@ -3326,7 +3326,7 @@ pub fn serve<T: BeaconChainTypes>(
}))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand All @@ -3352,7 +3352,7 @@ pub fn serve<T: BeaconChainTypes>(
}))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand Down Expand Up @@ -3406,7 +3406,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(not_while_syncing_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
Expand Down Expand Up @@ -3519,7 +3519,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(not_while_syncing_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(network_tx_filter)
.and(log_filter.clone())
.then(
Expand All @@ -3545,7 +3545,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("validator"))
.and(warp::path("beacon_committee_subscriptions"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(validator_subscription_tx_filter.clone())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
Expand Down Expand Up @@ -3601,7 +3601,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
Expand Down Expand Up @@ -3652,7 +3652,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.and(warp::body::json())
.and(warp_utils::json::json())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
Expand Down Expand Up @@ -3826,7 +3826,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("validator"))
.and(warp::path("sync_committee_subscriptions"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(validator_subscription_tx_filter)
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
Expand Down Expand Up @@ -3872,7 +3872,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("liveness"))
.and(warp::path::param::<Epoch>())
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand Down Expand Up @@ -3913,7 +3913,7 @@ pub fn serve<T: BeaconChainTypes>(
let post_lighthouse_liveness = warp::path("lighthouse")
.and(warp::path("liveness"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand Down Expand Up @@ -4016,7 +4016,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("ui"))
.and(warp::path("validator_metrics"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand All @@ -4035,7 +4035,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("ui"))
.and(warp::path("validator_info"))
.and(warp::path::end())
.and(warp::body::json())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
Expand Down Expand Up @@ -4338,7 +4338,7 @@ pub fn serve<T: BeaconChainTypes>(
let post_lighthouse_block_rewards = warp::path("lighthouse")
.and(warp::path("analysis"))
.and(warp::path("block_rewards"))
.and(warp::body::json())
.and(warp_utils::json::json())
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
Expand Down
23 changes: 22 additions & 1 deletion common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,30 @@ impl BeaconNodeHttpClient {
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}

let response = builder.json(body).send().await?;
ok_or_error(response).await
}

/// Generic POST function supporting arbitrary responses and timeouts.
/// Does not include Content-Type application/json in the request header.
async fn post_generic_json_without_content_type_header<T: Serialize, U: IntoUrl>(
&self,
url: U,
body: &T,
timeout: Option<Duration>,
) -> Result<Response, Error> {
let mut builder = self.client.post(url);
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}

let serialized_body = serde_json::to_vec(body).map_err(Error::InvalidJson)?;

let response = builder.body(serialized_body).send().await?;
ok_or_error(response).await
}

/// Generic POST function supporting arbitrary responses and timeouts.
async fn post_generic_with_consensus_version<T: Serialize, U: IntoUrl>(
&self,
Expand Down Expand Up @@ -1250,7 +1270,8 @@ impl BeaconNodeHttpClient {
.push("pool")
.push("attester_slashings");

self.post(path, slashing).await?;
self.post_generic_json_without_content_type_header(path, slashing, None)
.await?;

Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions common/warp_utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ beacon_chain = { workspace = true }
state_processing = { workspace = true }
safe_arith = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
headers = "0.3.2"
lighthouse_metrics = { workspace = true }
lazy_static = { workspace = true }
serde_array_query = "0.1.0"
bytes = { workspace = true }
22 changes: 22 additions & 0 deletions common/warp_utils/src/json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use bytes::Bytes;
use serde::de::DeserializeOwned;
use std::error::Error as StdError;
use warp::{Filter, Rejection};

use crate::reject;

struct Json;

type BoxError = Box<dyn StdError + Send + Sync>;

impl Json {
fn decode<T: DeserializeOwned>(bytes: Bytes) -> Result<T, BoxError> {
serde_json::from_slice(&bytes).map_err(Into::into)
}
}

pub fn json<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
warp::body::bytes().and_then(|bytes: Bytes| async move {
Json::decode(bytes).map_err(|err| reject::custom_deserialize_error(format!("{:?}", err)))
})
}
1 change: 1 addition & 0 deletions common/warp_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! Lighthouse project. E.g., the `http_api` and `http_metrics` crates.

pub mod cors;
pub mod json;
pub mod metrics;
pub mod query;
pub mod reject;
Expand Down
12 changes: 12 additions & 0 deletions common/warp_utils/src/reject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ pub fn custom_bad_request(msg: String) -> warp::reject::Rejection {
warp::reject::custom(CustomBadRequest(msg))
}

#[derive(Debug)]
pub struct CustomDeserializeError(pub String);

impl Reject for CustomDeserializeError {}

pub fn custom_deserialize_error(msg: String) -> warp::reject::Rejection {
warp::reject::custom(CustomDeserializeError(msg))
}

#[derive(Debug)]
pub struct CustomServerError(pub String);

Expand Down Expand Up @@ -161,6 +170,9 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply,
if err.is_not_found() {
code = StatusCode::NOT_FOUND;
message = "NOT_FOUND".to_string();
} else if let Some(e) = err.find::<crate::reject::CustomDeserializeError>() {
message = format!("BAD_REQUEST: body deserialize error: {}", e.0);
code = StatusCode::BAD_REQUEST;
} else if let Some(e) = err.find::<warp::filters::body::BodyDeserializeError>() {
message = format!("BAD_REQUEST: body deserialize error: {}", e);
code = StatusCode::BAD_REQUEST;
Expand Down

0 comments on commit 1d87edb

Please sign in to comment.