Skip to content

Commit

Permalink
Use Result when handling web socket message.
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Sep 20, 2023
1 parent dfbfa2b commit 1639b9a
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 75 deletions.
21 changes: 5 additions & 16 deletions src/api/v2/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{
convert::Infallible,
sync::{Arc, Mutex},
};
use tracing::{error, info};
use tracing::error;
use uuid::Uuid;
use warp::{ws::Ws, Rejection, Reply};

Expand All @@ -40,7 +40,7 @@ pub async fn submit(

submitter.submit(transaction).await.map_err(|error| {
error!(%error, "Submit transaction failed");
Error::internal_server_error()
Error::internal_server_error(error)
})
}

Expand Down Expand Up @@ -73,20 +73,9 @@ pub async fn ws(
}))
}

pub async fn status(
config: RuntimeConfig,
node: Node,
state: Arc<Mutex<State>>,
) -> Result<impl Reply, impl Reply> {
let state = match state.lock() {
Ok(state) => state,
Err(error) => {
info!("Cannot acquire lock for last_block: {error}");
return Err(Error::internal_server_error());
},
};

Ok(Status::new(&config, &node, &state))
pub fn status(config: RuntimeConfig, node: Node, state: Arc<Mutex<State>>) -> impl Reply {
let state = state.lock().expect("Lock should be acquired");
Status::new(&config, &node, &state)
}

pub async fn handle_rejection(error: Rejection) -> Result<impl Reply, Rejection> {
Expand Down
9 changes: 3 additions & 6 deletions src/api/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ fn status_route(
.and(warp::any().map(move || config.clone()))
.and(warp::any().map(move || node.clone()))
.and(warp::any().map(move || state.clone()))
.then(handlers::status)
.map(types::handle_result)
.map(handlers::status)
}

fn submit_route(
Expand Down Expand Up @@ -135,7 +134,7 @@ mod tests {
use crate::{
api::v2::types::{
Clients, DataFields, ErrorCode, SubmitResponse, Subscription, SubscriptionId, Topics,
Version, WsResponse,
Version, WsError, WsResponse,
},
rpc::Node,
types::{RuntimeConfig, State},
Expand Down Expand Up @@ -472,9 +471,7 @@ mod tests {
let expected_request_id = expected_request_id.map(to_uuid);
let mut test = MockSetup::new(RuntimeConfig::default(), submitter).await;
let response = test.ws_send_text(request).await;
let WsResponse::Error(error) = serde_json::from_str(&response).unwrap() else {
panic!("Invalid response");
};
let WsError::Error(error) = serde_json::from_str(&response).unwrap();
assert_eq!(error.error_code, ErrorCode::BadRequest);
assert_eq!(error.request_id, expected_request_id);
assert!(error.message.contains(expected));
Expand Down
26 changes: 20 additions & 6 deletions src/api/v2/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,37 +293,46 @@ pub enum ErrorCode {
pub struct Error {
#[serde(skip_serializing_if = "Option::is_none")]
pub request_id: Option<Uuid>,
#[serde(skip)]
pub cause: Option<anyhow::Error>,
pub error_code: ErrorCode,
pub message: String,
}

impl Error {
fn new(request_id: Option<Uuid>, error_code: ErrorCode, message: &str) -> Self {
fn new(
request_id: Option<Uuid>,
cause: Option<anyhow::Error>,
error_code: ErrorCode,
message: &str,
) -> Self {
Error {
request_id,
cause,
error_code,
message: message.to_string(),
}
}

pub fn not_found() -> Self {
Self::new(None, ErrorCode::NotFound, "Not Found")
Self::new(None, None, ErrorCode::NotFound, "Not Found")
}

pub fn internal_server_error() -> Self {
pub fn internal_server_error(cause: anyhow::Error) -> Self {
Self::new(
None,
Some(cause),
ErrorCode::InternalServerError,
"Internal Server Error",
)
}

pub fn bad_request_unknown(message: &str) -> Self {
Self::new(None, ErrorCode::BadRequest, message)
Self::new(None, None, ErrorCode::BadRequest, message)
}

pub fn bad_request(request_id: Uuid, message: &str) -> Self {
Self::new(Some(request_id), ErrorCode::BadRequest, message)
Self::new(Some(request_id), None, ErrorCode::BadRequest, message)
}

fn status(&self) -> StatusCode {
Expand Down Expand Up @@ -361,8 +370,13 @@ pub fn handle_result(result: Result<impl Reply, impl Reply>) -> impl Reply {
#[derive(Serialize, Deserialize, From)]
#[serde(tag = "topic", rename_all = "kebab-case")]
pub enum WsResponse {
Error(Error),
Version(Response<Version>),
Status(Response<Status>),
DataTransactionSubmitted(Response<SubmitResponse>),
}

#[derive(Serialize, Deserialize, From)]
#[serde(tag = "topic", rename_all = "kebab-case")]
pub enum WsError {
Error(Error),
}
90 changes: 43 additions & 47 deletions src/api/v2/ws.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
use super::{
transactions,
types::{Clients, Payload, Request, Response, Status, Transaction, Version, WsResponse},
types::{
Clients, Payload, Request, Response, Status, Transaction, Version, WsError, WsResponse,
},
};
use crate::{
api::v2::types::Error,
api::v2::types::{Error, Sender},
rpc::Node,
types::{RuntimeConfig, State},
};
use anyhow::Context;
use futures::{FutureExt, StreamExt};
use serde::Serialize;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{error, info, warn};
use tracing::{error, info};
use warp::ws::{self, Message, WebSocket};

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -43,6 +46,16 @@ pub async fn connect(
}
}));

fn send<T: Serialize>(sender: Sender, message: T) -> anyhow::Result<()> {
let ws_message = serde_json::to_string(&message)
.map(ws::Message::text)
.context("Failed to serialize message")?;

sender
.send(Ok(ws_message))
.context("Failed to send message")
}

while let Some(result) = web_socket_receiver.next().await {
let message = match result {
Err(error) => {
Expand All @@ -53,58 +66,38 @@ pub async fn connect(
Ok(message) => message,
};

let request = match Request::try_from(message) {
Err(error) => {
warn!("Error handling web socket message: {error}");
let error =
Error::bad_request_unknown(&format!("Failed to parse request: {error}"));
if let Err(error) = send_response(sender.clone(), error.into()) {
error!("Failed to send error: {error}");
}
continue;
},
Ok(request) => request,
};

let submitter = submitter.clone();
let state = state.clone();

let response = handle_request(request, &version, &config, &node, submitter, state)
.await
.unwrap_or_else(|error| {
error!("Error handling web socket message: {error}");
Error::internal_server_error().into()
});
let send_result =
match handle_request(message, &version, &config, &node, submitter, state).await {
Ok(response) => send(sender.clone(), response),
Err(error) => {
if let Some(cause) = error.cause.as_ref() {
error!("Failed to handle request: {cause}");
};
send::<WsError>(sender.clone(), error.into())
},
};

if let Err(error) = send_response(sender.clone(), response) {
error!("Failed to send response: {error}");
if let Err(error) = send_result {
error!("Error sending message: {error}");
}
}
}

fn send_response(
sender: UnboundedSender<Result<Message, warp::Error>>,
response: WsResponse,
) -> anyhow::Result<()> {
let message = serde_json::to_string(&response)
.map(ws::Message::text)
.map(Ok)
.context("Failed to serialize response message")?;

sender
.send(message)
.context("Failed to send response message")?;
Ok(())
}

async fn handle_request(
request: Request,
message: Message,
version: &Version,
config: &RuntimeConfig,
node: &Node,
submitter: Option<Arc<impl transactions::Submit>>,
state: Arc<Mutex<State>>,
) -> anyhow::Result<WsResponse> {
) -> Result<WsResponse, Error> {
let request = Request::try_from(message).map_err(|error| {
Error::bad_request_unknown(&format!("Failed to parse request: {error}"))
})?;

let request_id = request.request_id;
match request.payload {
Payload::Version => Ok(Response::new(request_id, version.clone()).into()),
Expand All @@ -115,17 +108,20 @@ async fn handle_request(
},
Payload::Submit(transaction) => {
let Some(submitter) = submitter else {
return Ok(Error::bad_request(request_id, "Submit is not configured.").into());
return Err(Error::bad_request(request_id, "Submit is not configured."));
};
if transaction.is_empty() {
return Ok(Error::bad_request(request_id, "Transaction is empty.").into());
return Err(Error::bad_request(request_id, "Transaction is empty."));
}
if matches!(transaction, Transaction::Data(_)) && !submitter.has_signer() {
return Ok(Error::bad_request(request_id, "Signer is not configured.").into());
return Err(Error::bad_request(request_id, "Signer is not configured."));
};

let submit_response = submitter.submit(transaction).await?;
Ok(Response::new(request_id, submit_response).into())
submitter
.submit(transaction)
.await
.map(|response| Response::new(request_id, response).into())
.map_err(Error::internal_server_error)
},
}
}

0 comments on commit 1639b9a

Please sign in to comment.