diff --git a/mm2src/coins/eth/fee_estimation/eth_fee_events.rs b/mm2src/coins/eth/fee_estimation/eth_fee_events.rs index 852988ef9e..8d91790374 100644 --- a/mm2src/coins/eth/fee_estimation/eth_fee_events.rs +++ b/mm2src/coins/eth/fee_estimation/eth_fee_events.rs @@ -43,6 +43,7 @@ pub struct EthFeeEventStreamer { } impl EthFeeEventStreamer { + #[inline(always)] pub fn new(config: EthFeeStreamingConfig, coin: EthCoin) -> Self { Self { config, coin } } } diff --git a/mm2src/coins/eth/fee_estimation/rpc.rs b/mm2src/coins/eth/fee_estimation/rpc.rs index 111d8c7979..6fb3b84498 100644 --- a/mm2src/coins/eth/fee_estimation/rpc.rs +++ b/mm2src/coins/eth/fee_estimation/rpc.rs @@ -3,7 +3,7 @@ use super::ser::FeePerGasEstimated; use crate::{lp_coinfind, MmCoinEnum}; use common::HttpStatusCode; use mm2_core::mm_ctx::MmArc; -use mm2_err_handle::mm_error::MmResult; +use mm2_err_handle::mm_error::{MmError, MmResult}; use http::StatusCode; use std::convert::TryFrom; @@ -36,13 +36,8 @@ pub async fn get_eth_estimated_fee_per_gas( ctx: MmArc, req: GetFeeEstimationRequest, ) -> MmResult { - let coin = lp_coinfind(&ctx, &req.coin) - .await - .map_err(GetFeeEstimationRequestError::Internal)? - .ok_or(GetFeeEstimationRequestError::CoinNotFound)?; - - match coin { - MmCoinEnum::EthCoin(coin) => { + match lp_coinfind(&ctx, &req.coin).await { + Ok(Some(MmCoinEnum::EthCoin(coin))) => { let use_simple = matches!(req.estimator_type, EstimatorType::Simple); let fee = coin .get_eip1559_gas_fee(use_simple) @@ -52,6 +47,8 @@ pub async fn get_eth_estimated_fee_per_gas( FeePerGasEstimated::try_from(fee).map_err(|e| GetFeeEstimationRequestError::Internal(e.to_string()))?; Ok(ser_fee) }, - _ => Err(GetFeeEstimationRequestError::CoinNotSupported)?, + Ok(Some(_)) => MmError::err(GetFeeEstimationRequestError::CoinNotSupported), + Ok(None) => MmError::err(GetFeeEstimationRequestError::CoinNotFound), + Err(e) => MmError::err(GetFeeEstimationRequestError::Internal(e)), } } diff --git a/mm2src/coins/tendermint/tendermint_tx_history_v2.rs b/mm2src/coins/tendermint/tendermint_tx_history_v2.rs index ece9b6a89c..9e83f707cc 100644 --- a/mm2src/coins/tendermint/tendermint_tx_history_v2.rs +++ b/mm2src/coins/tendermint/tendermint_tx_history_v2.rs @@ -588,6 +588,7 @@ where "could not get rpc client" ); + let streaming_manager = coin.get_ctx().unwrap().event_stream_manager.clone(); loop { let response = try_or_return_stopped_as_err!( client @@ -755,9 +756,7 @@ where log::debug!("Tx '{}' successfully parsed.", tx.hash); } - coin.get_ctx() - .unwrap() - .event_stream_manager + streaming_manager .send_fn(&TxHistoryEventStreamer::derive_streamer_id(coin.ticker()), || { tx_details.clone() }) diff --git a/mm2src/coins/utxo/tx_history_events.rs b/mm2src/coins/utxo/tx_history_events.rs index 902fb5baeb..c336e6fbb0 100644 --- a/mm2src/coins/utxo/tx_history_events.rs +++ b/mm2src/coins/utxo/tx_history_events.rs @@ -10,8 +10,10 @@ pub struct TxHistoryEventStreamer { } impl TxHistoryEventStreamer { + #[inline(always)] pub fn new(coin: String) -> Self { Self { coin } } + #[inline(always)] pub fn derive_streamer_id(coin: &str) -> String { format!("TX_HISTORY:{coin}") } } diff --git a/mm2src/coins/utxo/utxo_tx_history_v2.rs b/mm2src/coins/utxo/utxo_tx_history_v2.rs index 8670ceaac8..c60588e226 100644 --- a/mm2src/coins/utxo/utxo_tx_history_v2.rs +++ b/mm2src/coins/utxo/utxo_tx_history_v2.rs @@ -590,6 +590,7 @@ where let my_addresses = try_or_stop_unknown!(ctx.coin.my_addresses().await, "Error on getting my addresses"); + let streaming_manager = ctx.coin.get_ctx().unwrap().event_stream_manager.clone(); for (tx_hash, height) in self.all_tx_ids_with_height { let tx_hash_string = format!("{:02x}", tx_hash); match ctx.storage.history_has_tx_hash(&wallet_id, &tx_hash_string).await { @@ -621,10 +622,7 @@ where }, }; - ctx.coin - .get_ctx() - .unwrap() - .event_stream_manager + streaming_manager .send_fn(&TxHistoryEventStreamer::derive_streamer_id(ctx.coin.ticker()), || { tx_details.clone() }) diff --git a/mm2src/coins/z_coin/tx_history_events.rs b/mm2src/coins/z_coin/tx_history_events.rs index 9097d581c7..beb57a2b1a 100644 --- a/mm2src/coins/z_coin/tx_history_events.rs +++ b/mm2src/coins/z_coin/tx_history_events.rs @@ -19,8 +19,10 @@ pub struct ZCoinTxHistoryEventStreamer { } impl ZCoinTxHistoryEventStreamer { + #[inline(always)] pub fn new(coin: ZCoin) -> Self { Self { coin } } + #[inline(always)] pub fn derive_streamer_id(coin: &str) -> String { format!("TX_HISTORY:{coin}") } } diff --git a/mm2src/coins/z_coin/z_balance_streaming.rs b/mm2src/coins/z_coin/z_balance_streaming.rs index 2b29f093ed..18b2c6ef38 100644 --- a/mm2src/coins/z_coin/z_balance_streaming.rs +++ b/mm2src/coins/z_coin/z_balance_streaming.rs @@ -19,8 +19,10 @@ pub struct ZCoinBalanceEventStreamer { } impl ZCoinBalanceEventStreamer { + #[inline(always)] pub fn new(coin: ZCoin) -> Self { Self { coin } } + #[inline(always)] pub fn derive_streamer_id(coin: &str) -> String { format!("BALANCE:{coin}") } } diff --git a/mm2src/mm2_event_stream/src/event.rs b/mm2src/mm2_event_stream/src/event.rs index 93b1918d6a..57931b9817 100644 --- a/mm2src/mm2_event_stream/src/event.rs +++ b/mm2src/mm2_event_stream/src/event.rs @@ -2,9 +2,9 @@ use serde_json::Value as Json; // Note `Event` shouldn't be `Clone`able, but rather Arc/Rc wrapped and then shared. // This is only for testing. +/// Multi-purpose/generic event type that can easily be used over the event streaming #[cfg_attr(test, derive(Clone, Debug, PartialEq))] #[derive(Default)] -/// Multi-purpose/generic event type that can easily be used over the event streaming pub struct Event { /// The type of the event (balance, network, swap, etc...). event_type: String, @@ -16,7 +16,7 @@ pub struct Event { impl Event { /// Creates a new `Event` instance with the specified event type and message. - #[inline] + #[inline(always)] pub fn new(streamer_id: String, message: Json) -> Self { Self { event_type: streamer_id, @@ -26,7 +26,7 @@ impl Event { } /// Create a new error `Event` instance with the specified error event type and message. - #[inline] + #[inline(always)] pub fn err(streamer_id: String, message: Json) -> Self { Self { event_type: streamer_id, @@ -36,8 +36,10 @@ impl Event { } /// Returns the `event_type` (the ID of the streamer firing this event). + #[inline(always)] pub fn origin(&self) -> &str { &self.event_type } + /// Returns the event type and message as a pair. pub fn get(&self) -> (String, Json) { let prefix = if self.error { "ERROR:" } else { "" }; (format!("{prefix}{}", self.event_type), self.message.clone()) diff --git a/mm2src/mm2_main/src/lp_ordermatch/order_events.rs b/mm2src/mm2_main/src/lp_ordermatch/order_events.rs index 0ba4dc3905..547ee7df4e 100644 --- a/mm2src/mm2_main/src/lp_ordermatch/order_events.rs +++ b/mm2src/mm2_main/src/lp_ordermatch/order_events.rs @@ -8,9 +8,11 @@ use futures::StreamExt; pub struct OrderStatusStreamer; impl OrderStatusStreamer { + #[inline(always)] pub fn new() -> Self { Self } - pub fn derive_streamer_id() -> &'static str { "ORDER_STATUS" } + #[inline(always)] + pub const fn derive_streamer_id() -> &'static str { "ORDER_STATUS" } } #[derive(Serialize)] diff --git a/mm2src/mm2_main/src/lp_swap/swap_events.rs b/mm2src/mm2_main/src/lp_swap/swap_events.rs index a7dcfef3b8..7f4aaa90eb 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_events.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_events.rs @@ -12,9 +12,11 @@ use uuid::Uuid; pub struct SwapStatusStreamer; impl SwapStatusStreamer { + #[inline(always)] pub fn new() -> Self { Self } - pub fn derive_streamer_id() -> &'static str { "SWAP_STATUS" } + #[inline(always)] + pub const fn derive_streamer_id() -> &'static str { "SWAP_STATUS" } } #[derive(Serialize)] diff --git a/mm2src/mm2_main/src/rpc/streaming_activations/balance.rs b/mm2src/mm2_main/src/rpc/streaming_activations/balance.rs index fa49f0a8d6..76f9d594e1 100644 --- a/mm2src/mm2_main/src/rpc/streaming_activations/balance.rs +++ b/mm2src/mm2_main/src/rpc/streaming_activations/balance.rs @@ -49,19 +49,32 @@ pub async fn enable_balance( .map_err(BalanceStreamingRequestError::Internal)? .ok_or(BalanceStreamingRequestError::CoinNotFound)?; + match coin { + MmCoinEnum::EthCoin(_) => (), + MmCoinEnum::ZCoin(_) + | MmCoinEnum::UtxoCoin(_) + | MmCoinEnum::Bch(_) + | MmCoinEnum::QtumCoin(_) + | MmCoinEnum::Tendermint(_) => { + if req.config.is_some() { + Err(BalanceStreamingRequestError::EnableError( + "Invalid config provided. No config needed".to_string(), + ))? + } + }, + _ => Err(BalanceStreamingRequestError::CoinNotSupported)?, + } + let enable_result = match coin { MmCoinEnum::UtxoCoin(coin) => { - check_empty_config(&req.config)?; let streamer = UtxoBalanceEventStreamer::new(coin.clone().into()); ctx.event_stream_manager.add(client_id, streamer, coin.spawner()).await }, MmCoinEnum::Bch(coin) => { - check_empty_config(&req.config)?; let streamer = UtxoBalanceEventStreamer::new(coin.clone().into()); ctx.event_stream_manager.add(client_id, streamer, coin.spawner()).await }, MmCoinEnum::QtumCoin(coin) => { - check_empty_config(&req.config)?; let streamer = UtxoBalanceEventStreamer::new(coin.clone().into()); ctx.event_stream_manager.add(client_id, streamer, coin.spawner()).await }, @@ -71,12 +84,10 @@ pub async fn enable_balance( ctx.event_stream_manager.add(client_id, streamer, coin.spawner()).await }, MmCoinEnum::ZCoin(coin) => { - check_empty_config(&req.config)?; let streamer = ZCoinBalanceEventStreamer::new(coin.clone()); ctx.event_stream_manager.add(client_id, streamer, coin.spawner()).await }, MmCoinEnum::Tendermint(coin) => { - check_empty_config(&req.config)?; let streamer = TendermintBalanceEventStreamer::new(coin.clone()); ctx.event_stream_manager.add(client_id, streamer, coin.spawner()).await }, @@ -87,12 +98,3 @@ pub async fn enable_balance( .map(EnableStreamingResponse::new) .map_to_mm(|e| BalanceStreamingRequestError::EnableError(format!("{e:?}"))) } - -fn check_empty_config(config: &Option) -> MmResult<(), BalanceStreamingRequestError> { - if config.is_some() { - Err(BalanceStreamingRequestError::EnableError( - "Invalid config provided. No config needed".to_string(), - ))? - } - Ok(()) -} diff --git a/mm2src/mm2_main/src/rpc/streaming_activations/disable.rs b/mm2src/mm2_main/src/rpc/streaming_activations/disable.rs index ab1155378e..9643e9e652 100644 --- a/mm2src/mm2_main/src/rpc/streaming_activations/disable.rs +++ b/mm2src/mm2_main/src/rpc/streaming_activations/disable.rs @@ -8,15 +8,15 @@ use mm2_err_handle::{map_to_mm::MapToMmResult, mm_error::MmResult}; use http::StatusCode; -#[derive(Deserialize)] /// The request used for any event streaming deactivation. +#[derive(Deserialize)] pub struct DisableStreamingRequest { pub client_id: u64, pub streamer_id: String, } -#[derive(Serialize)] /// The success/ok response for any event streaming deactivation request. +#[derive(Serialize)] pub struct DisableStreamingResponse { result: &'static str, } diff --git a/mm2src/mm2_main/src/rpc/streaming_activations/mod.rs b/mm2src/mm2_main/src/rpc/streaming_activations/mod.rs index c0a9bdf800..607876725f 100644 --- a/mm2src/mm2_main/src/rpc/streaming_activations/mod.rs +++ b/mm2src/mm2_main/src/rpc/streaming_activations/mod.rs @@ -17,9 +17,9 @@ pub use orders::*; pub use swaps::*; pub use tx_history::*; -#[derive(Deserialize)] /// The general request for enabling any streamer. /// `client_id` is common in each request, other data is request-specific. +#[derive(Deserialize)] pub struct EnableStreamingRequest { // If the client ID isn't included, assume it's 0. #[serde(default)] @@ -28,8 +28,8 @@ pub struct EnableStreamingRequest { inner: T, } -#[derive(Serialize)] /// The success/ok response for any event streaming activation request. +#[derive(Serialize)] pub struct EnableStreamingResponse { pub streamer_id: String, // TODO: If the the streamer was already running, it is probably running with different configuration. diff --git a/mm2src/mm2_main/src/rpc/streaming_activations/swaps.rs b/mm2src/mm2_main/src/rpc/streaming_activations/swaps.rs index 3c3cb5d248..3d4aa2b93e 100644 --- a/mm2src/mm2_main/src/rpc/streaming_activations/swaps.rs +++ b/mm2src/mm2_main/src/rpc/streaming_activations/swaps.rs @@ -14,16 +14,19 @@ pub enum SwapStatusStreamingRequestError { } impl HttpStatusCode for SwapStatusStreamingRequestError { - fn status_code(&self) -> StatusCode { StatusCode::BAD_REQUEST } + fn status_code(&self) -> StatusCode { + match self { + SwapStatusStreamingRequestError::EnableError(_) => StatusCode::BAD_REQUEST, + } + } } pub async fn enable_swap_status( ctx: MmArc, req: EnableStreamingRequest<()>, ) -> MmResult { - let swap_status_streamer = SwapStatusStreamer::new(); ctx.event_stream_manager - .add(req.client_id, swap_status_streamer, ctx.spawner()) + .add(req.client_id, SwapStatusStreamer::new(), ctx.spawner()) .await .map(EnableStreamingResponse::new) .map_to_mm(|e| SwapStatusStreamingRequestError::EnableError(format!("{e:?}"))) diff --git a/mm2src/mm2_net/src/event_streaming/sse_handler.rs b/mm2src/mm2_net/src/event_streaming/sse_handler.rs index 3fc1c5c333..3dfa6264d7 100644 --- a/mm2src/mm2_net/src/event_streaming/sse_handler.rs +++ b/mm2src/mm2_net/src/event_streaming/sse_handler.rs @@ -1,3 +1,4 @@ +use http::header::{ACCESS_CONTROL_ALLOW_ORIGIN, CACHE_CONTROL, CONTENT_TYPE}; use hyper::{body::Bytes, Body, Request, Response}; use mm2_core::mm_ctx::MmArc; use serde_json::json; @@ -49,10 +50,10 @@ pub async fn handle_sse(request: Request, ctx_h: u32) -> Result) -> Result>; } -#[derive(Deserialize)] /// The general request for initializing an RPC Task. /// /// `client_id` is used to identify the client to which the task should stream out update events /// to and is common in each request. Other data is request-specific. +#[derive(Deserialize)] pub struct RpcInitReq { // If the client ID isn't included, assume it's 0. #[serde(default)]