Skip to content

Commit

Permalink
style & code suggestions by onur 1
Browse files Browse the repository at this point in the history
  • Loading branch information
mariocynicys committed Oct 12, 2024
1 parent d2e3734 commit 547fc2b
Show file tree
Hide file tree
Showing 16 changed files with 59 additions and 46 deletions.
1 change: 1 addition & 0 deletions mm2src/coins/eth/fee_estimation/eth_fee_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub struct EthFeeEventStreamer {
}

impl EthFeeEventStreamer {
#[inline(always)]
pub fn new(config: EthFeeStreamingConfig, coin: EthCoin) -> Self { Self { config, coin } }
}

Expand Down
15 changes: 6 additions & 9 deletions mm2src/coins/eth/fee_estimation/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::ser::FeePerGasEstimated;
use crate::{lp_coinfind, MmCoinEnum};
use common::HttpStatusCode;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::mm_error::MmResult;
use mm2_err_handle::mm_error::{MmError, MmResult};

use http::StatusCode;
use std::convert::TryFrom;
Expand Down Expand Up @@ -36,13 +36,8 @@ pub async fn get_eth_estimated_fee_per_gas(
ctx: MmArc,
req: GetFeeEstimationRequest,
) -> MmResult<FeePerGasEstimated, GetFeeEstimationRequestError> {
let coin = lp_coinfind(&ctx, &req.coin)
.await
.map_err(GetFeeEstimationRequestError::Internal)?
.ok_or(GetFeeEstimationRequestError::CoinNotFound)?;

match coin {
MmCoinEnum::EthCoin(coin) => {
match lp_coinfind(&ctx, &req.coin).await {
Ok(Some(MmCoinEnum::EthCoin(coin))) => {
let use_simple = matches!(req.estimator_type, EstimatorType::Simple);
let fee = coin
.get_eip1559_gas_fee(use_simple)
Expand All @@ -52,6 +47,8 @@ pub async fn get_eth_estimated_fee_per_gas(
FeePerGasEstimated::try_from(fee).map_err(|e| GetFeeEstimationRequestError::Internal(e.to_string()))?;
Ok(ser_fee)
},
_ => Err(GetFeeEstimationRequestError::CoinNotSupported)?,
Ok(Some(_)) => MmError::err(GetFeeEstimationRequestError::CoinNotSupported),
Ok(None) => MmError::err(GetFeeEstimationRequestError::CoinNotFound),
Err(e) => MmError::err(GetFeeEstimationRequestError::Internal(e)),
}
}
5 changes: 2 additions & 3 deletions mm2src/coins/tendermint/tendermint_tx_history_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ where
"could not get rpc client"
);

let streaming_manager = coin.get_ctx().unwrap().event_stream_manager.clone();
loop {
let response = try_or_return_stopped_as_err!(
client
Expand Down Expand Up @@ -755,9 +756,7 @@ where
log::debug!("Tx '{}' successfully parsed.", tx.hash);
}

coin.get_ctx()
.unwrap()
.event_stream_manager
streaming_manager
.send_fn(&TxHistoryEventStreamer::derive_streamer_id(coin.ticker()), || {
tx_details.clone()
})
Expand Down
2 changes: 2 additions & 0 deletions mm2src/coins/utxo/tx_history_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ pub struct TxHistoryEventStreamer {
}

impl TxHistoryEventStreamer {
#[inline(always)]
pub fn new(coin: String) -> Self { Self { coin } }

#[inline(always)]
pub fn derive_streamer_id(coin: &str) -> String { format!("TX_HISTORY:{coin}") }
}

Expand Down
6 changes: 2 additions & 4 deletions mm2src/coins/utxo/utxo_tx_history_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ where

let my_addresses = try_or_stop_unknown!(ctx.coin.my_addresses().await, "Error on getting my addresses");

let streaming_manager = ctx.coin.get_ctx().unwrap().event_stream_manager.clone();
for (tx_hash, height) in self.all_tx_ids_with_height {
let tx_hash_string = format!("{:02x}", tx_hash);
match ctx.storage.history_has_tx_hash(&wallet_id, &tx_hash_string).await {
Expand Down Expand Up @@ -621,10 +622,7 @@ where
},
};

ctx.coin
.get_ctx()
.unwrap()
.event_stream_manager
streaming_manager
.send_fn(&TxHistoryEventStreamer::derive_streamer_id(ctx.coin.ticker()), || {
tx_details.clone()
})
Expand Down
2 changes: 2 additions & 0 deletions mm2src/coins/z_coin/tx_history_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ pub struct ZCoinTxHistoryEventStreamer {
}

impl ZCoinTxHistoryEventStreamer {
#[inline(always)]
pub fn new(coin: ZCoin) -> Self { Self { coin } }

#[inline(always)]
pub fn derive_streamer_id(coin: &str) -> String { format!("TX_HISTORY:{coin}") }
}

Expand Down
2 changes: 2 additions & 0 deletions mm2src/coins/z_coin/z_balance_streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ pub struct ZCoinBalanceEventStreamer {
}

impl ZCoinBalanceEventStreamer {
#[inline(always)]
pub fn new(coin: ZCoin) -> Self { Self { coin } }

#[inline(always)]
pub fn derive_streamer_id(coin: &str) -> String { format!("BALANCE:{coin}") }
}

Expand Down
8 changes: 5 additions & 3 deletions mm2src/mm2_event_stream/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use serde_json::Value as Json;

// Note `Event` shouldn't be `Clone`able, but rather Arc/Rc wrapped and then shared.
// This is only for testing.
/// Multi-purpose/generic event type that can easily be used over the event streaming
#[cfg_attr(test, derive(Clone, Debug, PartialEq))]
#[derive(Default)]
/// Multi-purpose/generic event type that can easily be used over the event streaming
pub struct Event {
/// The type of the event (balance, network, swap, etc...).
event_type: String,
Expand All @@ -16,7 +16,7 @@ pub struct Event {

impl Event {
/// Creates a new `Event` instance with the specified event type and message.
#[inline]
#[inline(always)]
pub fn new(streamer_id: String, message: Json) -> Self {
Self {
event_type: streamer_id,
Expand All @@ -26,7 +26,7 @@ impl Event {
}

/// Create a new error `Event` instance with the specified error event type and message.
#[inline]
#[inline(always)]
pub fn err(streamer_id: String, message: Json) -> Self {
Self {
event_type: streamer_id,
Expand All @@ -36,8 +36,10 @@ impl Event {
}

/// Returns the `event_type` (the ID of the streamer firing this event).
#[inline(always)]
pub fn origin(&self) -> &str { &self.event_type }

/// Returns the event type and message as a pair.
pub fn get(&self) -> (String, Json) {
let prefix = if self.error { "ERROR:" } else { "" };
(format!("{prefix}{}", self.event_type), self.message.clone())
Expand Down
4 changes: 3 additions & 1 deletion mm2src/mm2_main/src/lp_ordermatch/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ use futures::StreamExt;
pub struct OrderStatusStreamer;

impl OrderStatusStreamer {
#[inline(always)]
pub fn new() -> Self { Self }

pub fn derive_streamer_id() -> &'static str { "ORDER_STATUS" }
#[inline(always)]
pub const fn derive_streamer_id() -> &'static str { "ORDER_STATUS" }
}

#[derive(Serialize)]
Expand Down
4 changes: 3 additions & 1 deletion mm2src/mm2_main/src/lp_swap/swap_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ use uuid::Uuid;
pub struct SwapStatusStreamer;

impl SwapStatusStreamer {
#[inline(always)]
pub fn new() -> Self { Self }

pub fn derive_streamer_id() -> &'static str { "SWAP_STATUS" }
#[inline(always)]
pub const fn derive_streamer_id() -> &'static str { "SWAP_STATUS" }
}

#[derive(Serialize)]
Expand Down
30 changes: 16 additions & 14 deletions mm2src/mm2_main/src/rpc/streaming_activations/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,32 @@ pub async fn enable_balance(
.map_err(BalanceStreamingRequestError::Internal)?
.ok_or(BalanceStreamingRequestError::CoinNotFound)?;

match coin {
MmCoinEnum::EthCoin(_) => (),
MmCoinEnum::ZCoin(_)
| MmCoinEnum::UtxoCoin(_)
| MmCoinEnum::Bch(_)
| MmCoinEnum::QtumCoin(_)
| MmCoinEnum::Tendermint(_) => {
if req.config.is_some() {
Err(BalanceStreamingRequestError::EnableError(
"Invalid config provided. No config needed".to_string(),
))?
}
},
_ => Err(BalanceStreamingRequestError::CoinNotSupported)?,
}

let enable_result = match coin {
MmCoinEnum::UtxoCoin(coin) => {
check_empty_config(&req.config)?;
let streamer = UtxoBalanceEventStreamer::new(coin.clone().into());
ctx.event_stream_manager.add(client_id, streamer, coin.spawner()).await
},
MmCoinEnum::Bch(coin) => {
check_empty_config(&req.config)?;
let streamer = UtxoBalanceEventStreamer::new(coin.clone().into());
ctx.event_stream_manager.add(client_id, streamer, coin.spawner()).await
},
MmCoinEnum::QtumCoin(coin) => {
check_empty_config(&req.config)?;
let streamer = UtxoBalanceEventStreamer::new(coin.clone().into());
ctx.event_stream_manager.add(client_id, streamer, coin.spawner()).await
},
Expand All @@ -71,12 +84,10 @@ pub async fn enable_balance(
ctx.event_stream_manager.add(client_id, streamer, coin.spawner()).await
},
MmCoinEnum::ZCoin(coin) => {
check_empty_config(&req.config)?;
let streamer = ZCoinBalanceEventStreamer::new(coin.clone());
ctx.event_stream_manager.add(client_id, streamer, coin.spawner()).await
},
MmCoinEnum::Tendermint(coin) => {
check_empty_config(&req.config)?;
let streamer = TendermintBalanceEventStreamer::new(coin.clone());
ctx.event_stream_manager.add(client_id, streamer, coin.spawner()).await
},
Expand All @@ -87,12 +98,3 @@ pub async fn enable_balance(
.map(EnableStreamingResponse::new)
.map_to_mm(|e| BalanceStreamingRequestError::EnableError(format!("{e:?}")))
}

fn check_empty_config(config: &Option<Json>) -> MmResult<(), BalanceStreamingRequestError> {
if config.is_some() {
Err(BalanceStreamingRequestError::EnableError(
"Invalid config provided. No config needed".to_string(),
))?
}
Ok(())
}
4 changes: 2 additions & 2 deletions mm2src/mm2_main/src/rpc/streaming_activations/disable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use mm2_err_handle::{map_to_mm::MapToMmResult, mm_error::MmResult};

use http::StatusCode;

#[derive(Deserialize)]
/// The request used for any event streaming deactivation.
#[derive(Deserialize)]
pub struct DisableStreamingRequest {
pub client_id: u64,
pub streamer_id: String,
}

#[derive(Serialize)]
/// The success/ok response for any event streaming deactivation request.
#[derive(Serialize)]
pub struct DisableStreamingResponse {
result: &'static str,
}
Expand Down
4 changes: 2 additions & 2 deletions mm2src/mm2_main/src/rpc/streaming_activations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ pub use orders::*;
pub use swaps::*;
pub use tx_history::*;

#[derive(Deserialize)]
/// The general request for enabling any streamer.
/// `client_id` is common in each request, other data is request-specific.
#[derive(Deserialize)]
pub struct EnableStreamingRequest<T> {
// If the client ID isn't included, assume it's 0.
#[serde(default)]
Expand All @@ -28,8 +28,8 @@ pub struct EnableStreamingRequest<T> {
inner: T,
}

#[derive(Serialize)]
/// The success/ok response for any event streaming activation request.
#[derive(Serialize)]
pub struct EnableStreamingResponse {
pub streamer_id: String,
// TODO: If the the streamer was already running, it is probably running with different configuration.
Expand Down
9 changes: 6 additions & 3 deletions mm2src/mm2_main/src/rpc/streaming_activations/swaps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ pub enum SwapStatusStreamingRequestError {
}

impl HttpStatusCode for SwapStatusStreamingRequestError {
fn status_code(&self) -> StatusCode { StatusCode::BAD_REQUEST }
fn status_code(&self) -> StatusCode {
match self {
SwapStatusStreamingRequestError::EnableError(_) => StatusCode::BAD_REQUEST,
}
}
}

pub async fn enable_swap_status(
ctx: MmArc,
req: EnableStreamingRequest<()>,
) -> MmResult<EnableStreamingResponse, SwapStatusStreamingRequestError> {
let swap_status_streamer = SwapStatusStreamer::new();
ctx.event_stream_manager
.add(req.client_id, swap_status_streamer, ctx.spawner())
.add(req.client_id, SwapStatusStreamer::new(), ctx.spawner())
.await
.map(EnableStreamingResponse::new)
.map_to_mm(|e| SwapStatusStreamingRequestError::EnableError(format!("{e:?}")))
Expand Down
7 changes: 4 additions & 3 deletions mm2src/mm2_net/src/event_streaming/sse_handler.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use http::header::{ACCESS_CONTROL_ALLOW_ORIGIN, CACHE_CONTROL, CONTENT_TYPE};
use hyper::{body::Bytes, Body, Request, Response};
use mm2_core::mm_ctx::MmArc;
use serde_json::json;
Expand Down Expand Up @@ -49,10 +50,10 @@ pub async fn handle_sse(request: Request<Body>, ctx_h: u32) -> Result<Response<B

let response = Response::builder()
.status(200)
.header("Content-Type", "text/event-stream")
.header("Cache-Control", "no-cache")
.header(CONTENT_TYPE, "text/event-stream")
.header(CACHE_CONTROL, "no-cache")
.header(
"Access-Control-Allow-Origin",
ACCESS_CONTROL_ALLOW_ORIGIN,
event_streaming_config.access_control_allow_origin,
)
.body(body);
Expand Down
2 changes: 1 addition & 1 deletion mm2src/rpc_task/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ pub trait RpcTask: RpcTaskTypes + Sized + Send + 'static {
async fn run(&mut self, task_handle: RpcTaskHandleShared<Self>) -> Result<Self::Item, MmError<Self::Error>>;
}

#[derive(Deserialize)]
/// The general request for initializing an RPC Task.
///
/// `client_id` is used to identify the client to which the task should stream out update events
/// to and is common in each request. Other data is request-specific.
#[derive(Deserialize)]
pub struct RpcInitReq<T> {
// If the client ID isn't included, assume it's 0.
#[serde(default)]
Expand Down

0 comments on commit 547fc2b

Please sign in to comment.