diff --git a/Cargo.lock b/Cargo.lock index 5bf69cbb31..1f7f1ee9f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4115,6 +4115,7 @@ dependencies = [ "gstuff", "hex 0.4.3", "lazy_static", + "mm2_event_stream", "mm2_metrics", "mm2_rpc", "primitives", @@ -4183,6 +4184,19 @@ dependencies = [ "web3", ] +[[package]] +name = "mm2_event_stream" +version = "0.1.0" +dependencies = [ + "async-trait", + "cfg-if 1.0.0", + "common", + "parking_lot 0.12.0", + "serde", + "tokio", + "wasm-bindgen-test", +] + [[package]] name = "mm2_git" version = "0.1.0" @@ -4278,6 +4292,7 @@ dependencies = [ "mm2_core", "mm2_db", "mm2_err_handle", + "mm2_event_stream", "mm2_gui_storage", "mm2_io", "mm2_metrics", @@ -4373,6 +4388,7 @@ dependencies = [ name = "mm2_net" version = "0.1.0" dependencies = [ + "async-stream", "async-trait", "bytes 1.1.0", "cfg-if 1.0.0", @@ -4386,9 +4402,13 @@ dependencies = [ "hyper", "js-sys", "lazy_static", + "mm2-libp2p", "mm2_core", "mm2_err_handle", + "mm2_event_stream", "mm2_state_machine", + "mocktopus", + "parking_lot 0.12.0", "prost", "rand 0.7.3", "rustls 0.20.4", diff --git a/Cargo.toml b/Cargo.toml index a3f3e98803..a7cf3badf3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,44 +1,45 @@ [workspace] members = [ + "mm2src/coins_activation", "mm2src/coins", "mm2src/coins/utxo_signer", - "mm2src/coins_activation", "mm2src/common/shared_ref_counter", "mm2src/crypto", "mm2src/db_common", "mm2src/derives/enum_from", - "mm2src/derives/ser_error", "mm2src/derives/ser_error_derive", + "mm2src/derives/ser_error", "mm2src/floodsub", "mm2src/gossipsub", - "mm2src/mm2_gui_storage", "mm2src/hw_common", "mm2src/mm2_bin_lib", - "mm2src/mm2_bitcoin/crypto", "mm2src/mm2_bitcoin/chain", + "mm2src/mm2_bitcoin/crypto", "mm2src/mm2_bitcoin/keys", - "mm2src/mm2_bitcoin/rpc", "mm2src/mm2_bitcoin/primitives", + "mm2src/mm2_bitcoin/rpc", "mm2src/mm2_bitcoin/script", - "mm2src/mm2_bitcoin/serialization", "mm2src/mm2_bitcoin/serialization_derive", + "mm2src/mm2_bitcoin/serialization", "mm2src/mm2_bitcoin/test_helpers", "mm2src/mm2_core", "mm2src/mm2_db", "mm2src/mm2_err_handle", "mm2src/mm2_eth", + "mm2src/mm2_event_stream", "mm2src/mm2_git", + "mm2src/mm2_gui_storage", "mm2src/mm2_io", "mm2src/mm2_libp2p", + "mm2src/mm2_main", "mm2src/mm2_metamask", "mm2src/mm2_metrics", - "mm2src/mm2_main", "mm2src/mm2_net", "mm2src/mm2_number", "mm2src/mm2_rpc", "mm2src/mm2_state_machine", - "mm2src/rpc_task", "mm2src/mm2_test_helpers", + "mm2src/rpc_task", "mm2src/trezor", ] diff --git a/examples/sse/README.md b/examples/sse/README.md new file mode 100644 index 0000000000..b43c213d02 --- /dev/null +++ b/examples/sse/README.md @@ -0,0 +1,14 @@ +# Listening event-stream from komodo-defi-framework + +1. Start komodo-defi-framework with event streaming activated +2. Run a local HTTP server + - if you use Python 3, run: + ``` + python3 -m http.server 8000 + ``` + - if you use Python 2, run: + ``` + python -m SimpleHTTPServer 8000 + ``` + +You should now be able to observe events from the komodo-defi-framework through the SSE. diff --git a/examples/sse/index.html b/examples/sse/index.html new file mode 100644 index 0000000000..e780004ccb --- /dev/null +++ b/examples/sse/index.html @@ -0,0 +1,26 @@ + + + + + +

Events

+
+ + + + + + diff --git a/examples/wasm/README.md b/examples/wasm/README.md index 6f2faff136..849ff63773 100644 --- a/examples/wasm/README.md +++ b/examples/wasm/README.md @@ -15,4 +15,4 @@ via [WebAssembly](https://developer.mozilla.org/en-US/docs/WebAssembly) ``` Read more about [running a simple local HTTP server](https://developer.mozilla.org/en-US/docs/Learn/Common_questions/set_up_a_local_testing_server#running_a_simple_local_http_server) -3. Open webpage in your browser http://localhost:8000/wasm_build/index.html +3. Open webpage in your browser http://localhost:8000/wasm_build/index.html \ No newline at end of file diff --git a/examples/wasm/index.html b/examples/wasm/index.html index bd19234020..b9243b9bf8 100644 --- a/examples/wasm/index.html +++ b/examples/wasm/index.html @@ -1,27 +1,32 @@ + MM2 example + -
- -
- -
- - -
-
- -
- -
- -
+
+ +
+ +
+ + +
+
+ +
+ +
+ +
- + + \ No newline at end of file diff --git a/mm2src/mm2_core/Cargo.toml b/mm2src/mm2_core/Cargo.toml index b8e34b34e4..566d778b8b 100644 --- a/mm2src/mm2_core/Cargo.toml +++ b/mm2src/mm2_core/Cargo.toml @@ -7,8 +7,8 @@ edition = "2021" doctest = false [dependencies] -async-trait = "0.1" arrayref = "0.3" +async-trait = "0.1" cfg-if = "1.0" common = { path = "../common" } db_common = { path = "../db_common" } @@ -16,6 +16,7 @@ derive_more = "0.99" futures = { version = "0.3", package = "futures", features = ["compat", "async-await", "thread-pool"] } hex = "0.4.2" lazy_static = "1.4" +mm2_event_stream = { path = "../mm2_event_stream" } mm2_metrics = { path = "../mm2_metrics" } primitives = { path = "../mm2_bitcoin/primitives" } rand = { version = "0.7", features = ["std", "small_rng", "wasm-bindgen"] } diff --git a/mm2src/mm2_core/src/mm_ctx.rs b/mm2src/mm2_core/src/mm_ctx.rs index d2bd527d2b..fc7fc5f0d9 100644 --- a/mm2src/mm2_core/src/mm_ctx.rs +++ b/mm2src/mm2_core/src/mm_ctx.rs @@ -6,6 +6,7 @@ use common::log::{self, LogLevel, LogOnError, LogState}; use common::{cfg_native, cfg_wasm32, small_rng}; use gstuff::{try_s, Constructible, ERR, ERRL}; use lazy_static::lazy_static; +use mm2_event_stream::{controller::Controller, Event, EventStreamConfiguration}; use mm2_metrics::{MetricsArc, MetricsOps}; use primitives::hash::H160; use rand::Rng; @@ -72,6 +73,10 @@ pub struct MmCtx { pub initialized: Constructible, /// True if the RPC HTTP server was started. pub rpc_started: Constructible, + /// Controller for continuously streaming data using streaming channels of `mm2_event_stream`. + pub stream_channel_controller: Controller, + /// Configuration of event streaming used for SSE. + pub event_stream_configuration: Option, /// True if the MarketMaker instance needs to stop. pub stop: Constructible, /// Unique context identifier, allowing us to more easily pass the context through the FFI boundaries. @@ -133,6 +138,8 @@ impl MmCtx { metrics: MetricsArc::new(), initialized: Constructible::default(), rpc_started: Constructible::default(), + stream_channel_controller: Controller::new(), + event_stream_configuration: None, stop: Constructible::default(), ffi_handle: Constructible::default(), ordermatch_ctx: Mutex::new(None), @@ -680,8 +687,17 @@ impl MmCtxBuilder { let mut ctx = MmCtx::with_log_state(log); ctx.mm_version = self.version; ctx.datetime = self.datetime; + if let Some(conf) = self.conf { - ctx.conf = conf + ctx.conf = conf; + + let event_stream_configuration = &ctx.conf["event_stream_configuration"]; + if !event_stream_configuration.is_null() { + let event_stream_configuration: EventStreamConfiguration = + json::from_value(event_stream_configuration.clone()) + .expect("Invalid json value in 'event_stream_configuration'."); + ctx.event_stream_configuration = Some(event_stream_configuration); + } } #[cfg(target_arch = "wasm32")] diff --git a/mm2src/mm2_event_stream/Cargo.toml b/mm2src/mm2_event_stream/Cargo.toml new file mode 100644 index 0000000000..2865e0a01f --- /dev/null +++ b/mm2src/mm2_event_stream/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "mm2_event_stream" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-trait = "0.1" +cfg-if = "1.0" +common = { path = "../common" } +parking_lot = "0.12" +serde = { version = "1", features = ["derive", "rc"] } +tokio = { version = "1", features = ["sync"] } + +[dev-dependencies] +tokio = { version = "1", features = ["sync", "macros", "time", "rt"] } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm-bindgen-test = { version = "0.3.2" } diff --git a/mm2src/mm2_event_stream/src/behaviour.rs b/mm2src/mm2_event_stream/src/behaviour.rs new file mode 100644 index 0000000000..bb905af3fc --- /dev/null +++ b/mm2src/mm2_event_stream/src/behaviour.rs @@ -0,0 +1,15 @@ +use crate::EventStreamConfiguration; +use async_trait::async_trait; + +#[async_trait] +pub trait EventBehaviour { + /// Unique name of the event. + const EVENT_NAME: &'static str; + + /// Event handler that is responsible for broadcasting event data to the streaming channels. + async fn handle(self, interval: f64); + + /// Spawns the `Self::handle` in a separate thread if the event is active according to the mm2 configuration. + /// Does nothing if the event is not active. + fn spawn_if_active(self, config: &EventStreamConfiguration); +} diff --git a/mm2src/mm2_event_stream/src/controller.rs b/mm2src/mm2_event_stream/src/controller.rs new file mode 100644 index 0000000000..098c6e4bb7 --- /dev/null +++ b/mm2src/mm2_event_stream/src/controller.rs @@ -0,0 +1,210 @@ +use parking_lot::Mutex; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::mpsc::{self, Receiver, Sender}; + +type ChannelId = u64; + +/// Root controller of streaming channels +pub struct Controller(Arc>>); + +impl Clone for Controller { + fn clone(&self) -> Self { Self(Arc::clone(&self.0)) } +} + +/// Inner part of the controller +pub struct ChannelsInner { + last_id: u64, + channels: HashMap>, +} + +struct Channel { + tx: Sender>, +} + +/// guard to trace channels disconnection +pub struct ChannelGuard { + channel_id: ChannelId, + controller: Controller, +} + +/// Receiver to cleanup resources on `Drop` +pub struct GuardedReceiver { + rx: Receiver>, + #[allow(dead_code)] + guard: ChannelGuard, +} + +impl Controller { + /// Creates a new channels controller + pub fn new() -> Self { Default::default() } + + /// Creates a new channel and returns it's events receiver + pub fn create_channel(&mut self, concurrency: usize) -> GuardedReceiver { + let (tx, rx) = mpsc::channel::>(concurrency); + let channel = Channel { tx }; + + let mut inner = self.0.lock(); + let channel_id = inner.last_id.overflowing_add(1).0; + inner.channels.insert(channel_id, channel); + inner.last_id = channel_id; + + let guard = ChannelGuard::new(channel_id, self.clone()); + GuardedReceiver { rx, guard } + } + + /// Returns number of active channels + pub fn num_connections(&self) -> usize { self.0.lock().channels.len() } + + /// Broadcast message to all channels + pub async fn broadcast(&self, message: M) { + let msg = Arc::new(message); + for rx in self.all_senders() { + rx.send(Arc::clone(&msg)).await.ok(); + } + } + + /// Removes the channel from the controller + fn remove_channel(&mut self, channel_id: &ChannelId) { + let mut inner = self.0.lock(); + inner.channels.remove(channel_id); + } + + /// Returns all the active channels + fn all_senders(&self) -> Vec>> { self.0.lock().channels.values().map(|c| c.tx.clone()).collect() } +} + +impl Default for Controller { + fn default() -> Self { + let inner = ChannelsInner { + last_id: 0, + channels: HashMap::new(), + }; + Self(Arc::new(Mutex::new(inner))) + } +} + +impl ChannelGuard { + fn new(channel_id: ChannelId, controller: Controller) -> Self { Self { channel_id, controller } } +} + +impl Drop for ChannelGuard { + fn drop(&mut self) { + common::log::debug!("Dropping event channel with id: {}", self.channel_id); + + self.controller.remove_channel(&self.channel_id); + } +} + +impl GuardedReceiver { + /// Receives the next event from the channel + pub async fn recv(&mut self) -> Option> { self.rx.recv().await } +} + +#[cfg(any(test, target_arch = "wasm32"))] +mod tests { + use super::*; + + common::cfg_wasm32! { + use wasm_bindgen_test::*; + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + } + + macro_rules! cross_test { + ($test_name:ident, $test_code:block) => { + #[cfg(not(target_arch = "wasm32"))] + #[tokio::test(flavor = "multi_thread")] + async fn $test_name() { $test_code } + + #[cfg(target_arch = "wasm32")] + #[wasm_bindgen_test] + async fn $test_name() { $test_code } + }; + } + + cross_test!(test_create_channel_and_broadcast, { + let mut controller = Controller::new(); + let mut guard_receiver = controller.create_channel(1); + + controller.broadcast("Message".to_string()).await; + + let received_msg = guard_receiver.recv().await.unwrap(); + assert_eq!(*received_msg, "Message".to_string()); + }); + + cross_test!(test_multiple_channels_and_broadcast, { + let mut controller = Controller::new(); + + let mut receivers = Vec::new(); + for _ in 0..3 { + receivers.push(controller.create_channel(1)); + } + + controller.broadcast("Message".to_string()).await; + + for receiver in &mut receivers { + let received_msg = receiver.recv().await.unwrap(); + assert_eq!(*received_msg, "Message".to_string()); + } + }); + + cross_test!(test_channel_cleanup_on_drop, { + let mut controller: Controller<()> = Controller::new(); + let guard_receiver = controller.create_channel(1); + + assert_eq!(controller.num_connections(), 1); + + drop(guard_receiver); + + common::executor::Timer::sleep(0.1).await; // Give time for the drop to execute + + assert_eq!(controller.num_connections(), 0); + }); + + cross_test!(test_broadcast_across_channels, { + let mut controller = Controller::new(); + + let mut receivers = Vec::new(); + for _ in 0..3 { + receivers.push(controller.create_channel(1)); + } + + controller.broadcast("Message".to_string()).await; + + for receiver in &mut receivers { + let received_msg = receiver.recv().await.unwrap(); + assert_eq!(*received_msg, "Message".to_string()); + } + }); + + cross_test!(test_multiple_messages_and_drop, { + let mut controller = Controller::new(); + let mut guard_receiver = controller.create_channel(6); + + controller.broadcast("Message 1".to_string()).await; + controller.broadcast("Message 2".to_string()).await; + controller.broadcast("Message 3".to_string()).await; + controller.broadcast("Message 4".to_string()).await; + controller.broadcast("Message 5".to_string()).await; + controller.broadcast("Message 6".to_string()).await; + + let mut received_msgs = Vec::new(); + for _ in 0..6 { + let received_msg = guard_receiver.recv().await.unwrap(); + received_msgs.push(received_msg); + } + + assert_eq!(*received_msgs[0], "Message 1".to_string()); + assert_eq!(*received_msgs[1], "Message 2".to_string()); + assert_eq!(*received_msgs[2], "Message 3".to_string()); + assert_eq!(*received_msgs[3], "Message 4".to_string()); + assert_eq!(*received_msgs[4], "Message 5".to_string()); + assert_eq!(*received_msgs[5], "Message 6".to_string()); + + // Consume the GuardedReceiver to trigger drop and channel cleanup + drop(guard_receiver); + + common::executor::Timer::sleep(0.1).await; // Give time for the drop to execute + + assert_eq!(controller.num_connections(), 0); + }); +} diff --git a/mm2src/mm2_event_stream/src/lib.rs b/mm2src/mm2_event_stream/src/lib.rs new file mode 100644 index 0000000000..afa5c7e1be --- /dev/null +++ b/mm2src/mm2_event_stream/src/lib.rs @@ -0,0 +1,66 @@ +use serde::Deserialize; +use std::collections::HashMap; + +/// Multi-purpose/generic event type that can easily be used over the event streaming +pub struct Event { + _type: String, + message: String, +} + +impl Event { + /// Creates a new `Event` instance with the specified event type and message. + #[inline] + pub fn new(event_type: String, message: String) -> Self { + Self { + _type: event_type, + message, + } + } + + /// Gets the event type. + #[inline] + pub fn event_type(&self) -> &str { &self._type } + + /// Gets the event message. + #[inline] + pub fn message(&self) -> &str { &self.message } +} + +/// Configuration for event streaming +#[derive(Deserialize)] +pub struct EventStreamConfiguration { + /// The value to set for the `Access-Control-Allow-Origin` header. + #[serde(default)] + pub access_control_allow_origin: String, + #[serde(default)] + active_events: HashMap, +} + +/// Represents the configuration for a specific event within the event stream. +#[derive(Clone, Default, Deserialize)] +pub struct EventConfig { + /// The interval in seconds at which the event should be streamed. + pub stream_interval_seconds: f64, +} + +impl Default for EventStreamConfiguration { + fn default() -> Self { + Self { + access_control_allow_origin: String::from("*"), + active_events: Default::default(), + } + } +} + +impl EventStreamConfiguration { + /// Retrieves the configuration for a specific event by its name. + #[inline] + pub fn get_event(&self, event_name: &str) -> Option { self.active_events.get(event_name).cloned() } + + /// Gets the total number of active events in the configuration. + #[inline] + pub fn total_active_events(&self) -> usize { self.active_events.len() } +} + +pub mod behaviour; +pub mod controller; diff --git a/mm2src/mm2_main/Cargo.toml b/mm2src/mm2_main/Cargo.toml index 48976dda2b..2416e93a58 100644 --- a/mm2src/mm2_main/Cargo.toml +++ b/mm2src/mm2_main/Cargo.toml @@ -57,6 +57,7 @@ lazy_static = "1.4" libc = "0.2" mm2_core = { path = "../mm2_core" } mm2_err_handle = { path = "../mm2_err_handle" } +mm2_event_stream = { path = "../mm2_event_stream" } mm2_gui_storage = { path = "../mm2_gui_storage" } mm2_io = { path = "../mm2_io" } mm2-libp2p = { path = "../mm2_libp2p" } diff --git a/mm2src/mm2_main/src/lp_native_dex.rs b/mm2src/mm2_main/src/lp_native_dex.rs index 8401f2a0fd..45f7a2f126 100644 --- a/mm2src/mm2_main/src/lp_native_dex.rs +++ b/mm2src/mm2_main/src/lp_native_dex.rs @@ -31,6 +31,7 @@ use mm2_err_handle::prelude::*; use mm2_libp2p::{spawn_gossipsub, AdexBehaviourError, NodeType, RelayAddress, RelayAddressError, SwarmRuntime, WssCerts}; use mm2_metrics::mm_gauge; +use mm2_net::p2p::P2PContext; use rpc_task::RpcTaskError; use serde_json::{self as json}; use std::fs; @@ -42,7 +43,7 @@ use std::time::Duration; #[cfg(not(target_arch = "wasm32"))] use crate::mm2::database::init_and_migrate_db; use crate::mm2::lp_message_service::{init_message_service, InitMessageServiceError}; -use crate::mm2::lp_network::{lp_network_ports, p2p_event_process_loop, NetIdError, P2PContext}; +use crate::mm2::lp_network::{lp_network_ports, p2p_event_process_loop, NetIdError}; use crate::mm2::lp_ordermatch::{broadcast_maker_orders_keep_alive_loop, clean_memory_loop, init_ordermatch_context, lp_ordermatch_loop, orders_kick_start, BalanceUpdateOrdermatchHandler, OrdermatchInitError}; @@ -50,9 +51,11 @@ use crate::mm2::lp_swap::{running_swaps_num, swap_kick_starts}; use crate::mm2::rpc::spawn_rpc; cfg_native! { + use db_common::sqlite::rusqlite::Error as SqlError; + use mm2_event_stream::behaviour::EventBehaviour; use mm2_io::fs::{ensure_dir_is_writable, ensure_file_is_writable}; use mm2_net::ip_addr::myipaddr; - use db_common::sqlite::rusqlite::Error as SqlError; + use mm2_net::network_event::NetworkEvent; } #[path = "lp_init/init_context.rs"] mod init_context; @@ -376,6 +379,15 @@ fn migrate_db(ctx: &MmArc) -> MmInitResult<()> { #[cfg(not(target_arch = "wasm32"))] fn migration_1(_ctx: &MmArc) {} +#[cfg(not(target_arch = "wasm32"))] +fn init_event_streaming(ctx: &MmArc) { + // This condition only executed if events were enabled in mm2 configuration. + if let Some(config) = &ctx.event_stream_configuration { + // Network event handling + NetworkEvent::new(ctx.clone()).spawn_if_active(config); + } +} + pub async fn lp_init_continue(ctx: MmArc) -> MmInitResult<()> { init_ordermatch_context(&ctx)?; init_p2p(ctx.clone()).await?; @@ -406,11 +418,15 @@ pub async fn lp_init_continue(ctx: MmArc) -> MmInitResult<()> { // an order and start new swap that might get started 2 times because of kick-start kick_start(ctx.clone()).await?; + #[cfg(not(target_arch = "wasm32"))] + init_event_streaming(&ctx); + ctx.spawner().spawn(lp_ordermatch_loop(ctx.clone())); ctx.spawner().spawn(broadcast_maker_orders_keep_alive_loop(ctx.clone())); ctx.spawner().spawn(clean_memory_loop(ctx.weak())); + Ok(()) } @@ -439,11 +455,13 @@ pub async fn lp_init(ctx: MmArc, version: String, datetime: String) -> MmInitRes spawn_rpc(ctx_id); let ctx_c = ctx.clone(); + ctx.spawner().spawn(async move { if let Err(err) = ctx_c.init_metrics() { warn!("Couldn't initialize metrics system: {}", err); } }); + // In the mobile version we might depend on `lp_init` staying around until the context stops. loop { if ctx.is_stopping() { diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index ab4232e07e..859434026b 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -1,3 +1,5 @@ +// TODO: a lof of these implementations should be handled in `mm2_net` + /****************************************************************************** * Copyright © 2022 Atomic Private Limited and its contributors * * * @@ -27,17 +29,15 @@ use instant::Instant; use keys::KeyPair; use mm2_core::mm_ctx::{MmArc, MmWeak}; use mm2_err_handle::prelude::*; -use mm2_libp2p::atomicdex_behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, AdexCmdTx, AdexEventRx, AdexResponse, +use mm2_libp2p::atomicdex_behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, AdexEventRx, AdexResponse, AdexResponseChannel}; use mm2_libp2p::peers_exchange::PeerAddresses; use mm2_libp2p::{decode_message, encode_message, DecodingError, GossipsubMessage, Libp2pPublic, Libp2pSecpPublic, MessageId, NetworkPorts, PeerId, TopicHash, TOPIC_SEPARATOR}; use mm2_metrics::{mm_label, mm_timing}; -#[cfg(test)] use mocktopus::macros::*; -use parking_lot::Mutex as PaMutex; +use mm2_net::p2p::P2PContext; use serde::de; use std::net::ToSocketAddrs; -use std::sync::Arc; use crate::mm2::lp_ordermatch; use crate::mm2::{lp_stats, lp_swap}; @@ -89,33 +89,6 @@ pub enum P2PRequest { NetworkInfo(lp_stats::NetworkInfoRequest), } -pub struct P2PContext { - /// Using Mutex helps to prevent cloning which can actually result to channel being unbounded in case of using 1 tx clone per 1 message. - pub cmd_tx: PaMutex, -} - -#[cfg_attr(test, mockable)] -impl P2PContext { - pub fn new(cmd_tx: AdexCmdTx) -> Self { - P2PContext { - cmd_tx: PaMutex::new(cmd_tx), - } - } - - pub fn store_to_mm_arc(self, ctx: &MmArc) { *ctx.p2p_ctx.lock().unwrap() = Some(Arc::new(self)) } - - pub fn fetch_from_mm_arc(ctx: &MmArc) -> Arc { - ctx.p2p_ctx - .lock() - .unwrap() - .as_ref() - .unwrap() - .clone() - .downcast() - .unwrap() - } -} - pub async fn p2p_event_process_loop(ctx: MmWeak, mut rx: AdexEventRx, i_am_relay: bool) { loop { let adex_event = rx.next().await; diff --git a/mm2src/mm2_main/src/ordermatch_tests.rs b/mm2src/mm2_main/src/ordermatch_tests.rs index 6f2af8a757..ec96d8f484 100644 --- a/mm2src/mm2_main/src/ordermatch_tests.rs +++ b/mm2src/mm2_main/src/ordermatch_tests.rs @@ -1,5 +1,4 @@ use super::*; -use crate::mm2::lp_network::P2PContext; use crate::mm2::lp_ordermatch::new_protocol::{MakerOrderUpdated, PubkeyKeepAlive}; use coins::{MmCoin, TestCoin}; use common::{block_on, executor::spawn}; @@ -9,6 +8,7 @@ use futures::{channel::mpsc, StreamExt}; use mm2_core::mm_ctx::{MmArc, MmCtx}; use mm2_libp2p::atomicdex_behaviour::AdexBehaviourCmd; use mm2_libp2p::{decode_message, PeerId}; +use mm2_net::p2p::P2PContext; use mm2_test_helpers::for_tests::mm_ctx_with_iguana; use mocktopus::mocking::*; use rand::{seq::SliceRandom, thread_rng, Rng}; diff --git a/mm2src/mm2_main/src/rpc.rs b/mm2src/mm2_main/src/rpc.rs index 8ca80d5274..001f8bea28 100644 --- a/mm2src/mm2_main/src/rpc.rs +++ b/mm2src/mm2_main/src/rpc.rs @@ -28,8 +28,6 @@ use futures::future::{join_all, FutureExt}; use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, CONTENT_TYPE}; use http::request::Parts; use http::{Method, Request, Response, StatusCode}; -#[cfg(not(target_arch = "wasm32"))] -use hyper::{self, Body, Server}; use lazy_static::lazy_static; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; @@ -40,6 +38,11 @@ use serde_json::{self as json, Value as Json}; use std::borrow::Cow; use std::net::SocketAddr; +cfg_native! { + use hyper::{self, Body, Server}; + use mm2_net::sse_handler::{handle_sse, SSE_ENDPOINT}; +} + #[path = "rpc/dispatcher/dispatcher.rs"] mod dispatcher; #[path = "rpc/dispatcher/dispatcher_legacy.rs"] mod dispatcher_legacy; @@ -301,6 +304,8 @@ async fn rpc_service(req: Request, ctx_h: u32, client: SocketAddr) -> Resp Response::from_parts(parts, Body::from(body_escaped)) } +// TODO: This should exclude TCP internals, as including them results in having to +// handle various protocols within this function. #[cfg(not(target_arch = "wasm32"))] pub extern "C" fn spawn_rpc(ctx_h: u32) { use common::now_sec; @@ -351,8 +356,17 @@ pub extern "C" fn spawn_rpc(ctx_h: u32) { // then we might want to refactor into starting it ideomatically in order to benefit from a more graceful shutdown, // cf. https://github.com/hyperium/hyper/pull/1640. + let ctx = MmArc::from_ffi_handle(ctx_h).expect("No context"); + + let is_event_stream_enabled = ctx.event_stream_configuration.is_some(); + let make_svc_fut = move |remote_addr: SocketAddr| async move { Ok::<_, Infallible>(service_fn(move |req: Request| async move { + if is_event_stream_enabled && req.uri().path() == SSE_ENDPOINT { + let res = handle_sse(req, ctx_h).await?; + return Ok::<_, Infallible>(res); + } + let res = rpc_service(req, ctx_h, remote_addr).await; Ok::<_, Infallible>(res) })) @@ -417,8 +431,6 @@ pub extern "C" fn spawn_rpc(ctx_h: u32) { }; } - let ctx = MmArc::from_ffi_handle(ctx_h).expect("No context"); - let rpc_ip_port = ctx .rpc_ip_port() .unwrap_or_else(|err| panic!("Invalid RPC port: {}", err)); diff --git a/mm2src/mm2_main/src/rpc/lp_commands/lp_commands_legacy.rs b/mm2src/mm2_main/src/rpc/lp_commands/lp_commands_legacy.rs index 7b5d68b8d0..e060175858 100644 --- a/mm2src/mm2_main/src/rpc/lp_commands/lp_commands_legacy.rs +++ b/mm2src/mm2_main/src/rpc/lp_commands/lp_commands_legacy.rs @@ -27,6 +27,7 @@ use futures::compat::Future01CompatExt; use http::Response; use mm2_core::mm_ctx::MmArc; use mm2_metrics::MetricsOps; +use mm2_net::p2p::P2PContext; use mm2_number::construct_detailed; use mm2_rpc::data::legacy::{BalanceResponse, CoinInitResponse, Mm2RpcResult, MmVersionResponse, Status}; use serde_json::{self as json, Value as Json}; @@ -306,7 +307,6 @@ pub fn version(ctx: MmArc) -> HyRes { } pub async fn get_peers_info(ctx: MmArc) -> Result>, String> { - use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_peers_info; let ctx = P2PContext::fetch_from_mm_arc(&ctx); let cmd_tx = ctx.cmd_tx.lock().clone(); @@ -319,7 +319,6 @@ pub async fn get_peers_info(ctx: MmArc) -> Result>, String> { } pub async fn get_gossip_mesh(ctx: MmArc) -> Result>, String> { - use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_gossip_mesh; let ctx = P2PContext::fetch_from_mm_arc(&ctx); let cmd_tx = ctx.cmd_tx.lock().clone(); @@ -332,7 +331,6 @@ pub async fn get_gossip_mesh(ctx: MmArc) -> Result>, String> { } pub async fn get_gossip_peer_topics(ctx: MmArc) -> Result>, String> { - use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_gossip_peer_topics; let ctx = P2PContext::fetch_from_mm_arc(&ctx); let cmd_tx = ctx.cmd_tx.lock().clone(); @@ -345,7 +343,6 @@ pub async fn get_gossip_peer_topics(ctx: MmArc) -> Result>, Str } pub async fn get_gossip_topic_peers(ctx: MmArc) -> Result>, String> { - use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_gossip_topic_peers; let ctx = P2PContext::fetch_from_mm_arc(&ctx); let cmd_tx = ctx.cmd_tx.lock().clone(); @@ -358,7 +355,6 @@ pub async fn get_gossip_topic_peers(ctx: MmArc) -> Result>, Str } pub async fn get_relay_mesh(ctx: MmArc) -> Result>, String> { - use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_relay_mesh; let ctx = P2PContext::fetch_from_mm_arc(&ctx); let cmd_tx = ctx.cmd_tx.lock().clone(); diff --git a/mm2src/mm2_net/Cargo.toml b/mm2src/mm2_net/Cargo.toml index c2da301fa6..62da4d0446 100644 --- a/mm2src/mm2_net/Cargo.toml +++ b/mm2src/mm2_net/Cargo.toml @@ -7,22 +7,26 @@ edition = "2018" doctest = false [dependencies] +async-stream = "0.3" async-trait = "0.1" -serde = "1" -serde_json = { version = "1", features = ["preserve_order", "raw_value"] } bytes = "1.1" cfg-if = "1.0" common = { path = "../common" } +derive_more = "0.99" ethkey = { git = "https://github.com/KomodoPlatform/mm2-parity-ethereum.git" } +futures = { version = "0.3", package = "futures", features = ["compat", "async-await", "thread-pool"] } +http = "0.2" +lazy_static = "1.4" mm2_core = { path = "../mm2_core" } mm2_err_handle = { path = "../mm2_err_handle" } +mm2_event_stream = { path = "../mm2_event_stream"} +mm2-libp2p = { path = "../mm2_libp2p" } mm2_state_machine = { path = "../mm2_state_machine" } -derive_more = "0.99" -http = "0.2" -rand = { version = "0.7", features = ["std", "small_rng", "wasm-bindgen"] } -futures = { version = "0.3", package = "futures", features = ["compat", "async-await", "thread-pool"] } -lazy_static = "1.4" +parking_lot = { version = "0.12.0", features = ["nightly"] } prost = "0.10" +rand = { version = "0.7", features = ["std", "small_rng", "wasm-bindgen"] } +serde = "1" +serde_json = { version = "1", features = ["preserve_order", "raw_value"] } [target.'cfg(target_arch = "wasm32")'.dependencies] gstuff = { version = "0.7", features = ["nightly"] } @@ -34,8 +38,11 @@ js-sys = "0.3.27" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] futures-util = { version = "0.3" } -hyper = { version = "0.14.26", features = ["client", "http2", "server", "tcp"] } +hyper = { version = "0.14.26", features = ["client", "http2", "server", "tcp", "stream"] } gstuff = { version = "0.7", features = ["nightly"] } rustls = { version = "0.20", default-features = false } tokio = { version = "1.20" } tokio-rustls = { version = "0.23", default-features = false } + +[dev-dependencies] +mocktopus = "0.8.0" diff --git a/mm2src/mm2_net/src/lib.rs b/mm2src/mm2_net/src/lib.rs index 99935bd25b..ed70e093fa 100644 --- a/mm2src/mm2_net/src/lib.rs +++ b/mm2src/mm2_net/src/lib.rs @@ -1,8 +1,11 @@ pub mod grpc_web; +pub mod p2p; pub mod transport; #[cfg(not(target_arch = "wasm32"))] pub mod ip_addr; #[cfg(not(target_arch = "wasm32"))] pub mod native_http; #[cfg(not(target_arch = "wasm32"))] pub mod native_tls; +#[cfg(not(target_arch = "wasm32"))] pub mod network_event; +#[cfg(not(target_arch = "wasm32"))] pub mod sse_handler; #[cfg(target_arch = "wasm32")] pub mod wasm_http; #[cfg(target_arch = "wasm32")] pub mod wasm_ws; diff --git a/mm2src/mm2_net/src/network_event.rs b/mm2src/mm2_net/src/network_event.rs new file mode 100644 index 0000000000..6dd5ee4396 --- /dev/null +++ b/mm2src/mm2_net/src/network_event.rs @@ -0,0 +1,61 @@ +use crate::p2p::P2PContext; +use async_trait::async_trait; +use common::{executor::{SpawnFuture, Timer}, + log::info}; +use mm2_core::mm_ctx::MmArc; +pub use mm2_event_stream::behaviour::EventBehaviour; +use mm2_event_stream::{Event, EventStreamConfiguration}; +use mm2_libp2p::atomicdex_behaviour; +use serde_json::json; + +pub struct NetworkEvent { + ctx: MmArc, +} + +impl NetworkEvent { + pub fn new(ctx: MmArc) -> Self { Self { ctx } } +} + +#[async_trait] +impl EventBehaviour for NetworkEvent { + const EVENT_NAME: &'static str = "NETWORK"; + + async fn handle(self, interval: f64) { + let p2p_ctx = P2PContext::fetch_from_mm_arc(&self.ctx); + + loop { + let p2p_cmd_tx = p2p_ctx.cmd_tx.lock().clone(); + + let peers_info = atomicdex_behaviour::get_peers_info(p2p_cmd_tx.clone()).await; + let gossip_mesh = atomicdex_behaviour::get_gossip_mesh(p2p_cmd_tx.clone()).await; + let gossip_peer_topics = atomicdex_behaviour::get_gossip_peer_topics(p2p_cmd_tx.clone()).await; + let gossip_topic_peers = atomicdex_behaviour::get_gossip_topic_peers(p2p_cmd_tx.clone()).await; + let relay_mesh = atomicdex_behaviour::get_relay_mesh(p2p_cmd_tx).await; + + let event_data = json!({ + "peers_info": peers_info, + "gossip_mesh": gossip_mesh, + "gossip_peer_topics": gossip_peer_topics, + "gossip_topic_peers": gossip_topic_peers, + "relay_mesh": relay_mesh, + }); + + self.ctx + .stream_channel_controller + .broadcast(Event::new(Self::EVENT_NAME.to_string(), event_data.to_string())) + .await; + + Timer::sleep(interval).await; + } + } + + fn spawn_if_active(self, config: &EventStreamConfiguration) { + if let Some(event) = config.get_event(Self::EVENT_NAME) { + info!( + "NETWORK event is activated with {} seconds interval.", + event.stream_interval_seconds + ); + self.ctx.spawner().spawn(self.handle(event.stream_interval_seconds)); + } + } +} diff --git a/mm2src/mm2_net/src/p2p.rs b/mm2src/mm2_net/src/p2p.rs new file mode 100644 index 0000000000..74ee9dd9f1 --- /dev/null +++ b/mm2src/mm2_net/src/p2p.rs @@ -0,0 +1,40 @@ +use mm2_core::mm_ctx::MmArc; +use mm2_libp2p::atomicdex_behaviour::AdexCmdTx; +#[cfg(test)] use mocktopus::macros::*; +use parking_lot::Mutex; +use std::sync::Arc; + +pub struct P2PContext { + /// Using Mutex helps to prevent cloning which can actually result to channel being unbounded in case of using 1 tx clone per 1 message. + pub cmd_tx: Mutex, +} + +// `mockable` violates these +#[allow( + clippy::forget_ref, + clippy::forget_copy, + clippy::swap_ptr_to_ref, + clippy::forget_non_drop, + clippy::let_unit_value +)] +#[cfg_attr(test, mockable)] +impl P2PContext { + pub fn new(cmd_tx: AdexCmdTx) -> Self { + P2PContext { + cmd_tx: Mutex::new(cmd_tx), + } + } + + pub fn store_to_mm_arc(self, ctx: &MmArc) { *ctx.p2p_ctx.lock().unwrap() = Some(Arc::new(self)) } + + pub fn fetch_from_mm_arc(ctx: &MmArc) -> Arc { + ctx.p2p_ctx + .lock() + .unwrap() + .as_ref() + .unwrap() + .clone() + .downcast() + .unwrap() + } +} diff --git a/mm2src/mm2_net/src/sse_handler.rs b/mm2src/mm2_net/src/sse_handler.rs new file mode 100644 index 0000000000..f57018ca04 --- /dev/null +++ b/mm2src/mm2_net/src/sse_handler.rs @@ -0,0 +1,78 @@ +use hyper::{body::Bytes, Body, Request, Response}; +use mm2_core::mm_ctx::MmArc; +use serde_json::json; +use std::convert::Infallible; + +pub const SSE_ENDPOINT: &str = "/event-stream"; + +/// Handles broadcasted messages from `mm2_event_stream` continuously. +pub async fn handle_sse(request: Request, ctx_h: u32) -> Result, Infallible> { + // This is only called once for per client on the initialization, + // meaning this is not a resource intensive computation. + let ctx = match MmArc::from_ffi_handle(ctx_h) { + Ok(ctx) => ctx, + Err(err) => return handle_internal_error(err).await, + }; + + let config = match &ctx.event_stream_configuration { + Some(config) => config, + None => { + return handle_internal_error( + "Event stream configuration couldn't be found. This should never happen.".to_string(), + ) + .await + }, + }; + + let filtered_events = request + .uri() + .query() + .and_then(|query| { + query + .split('&') + .find(|param| param.starts_with("filter=")) + .map(|param| param.trim_start_matches("filter=")) + }) + .map_or(Vec::new(), |events_param| { + events_param.split(',').map(|event| event.to_string()).collect() + }); + + let mut channel_controller = ctx.stream_channel_controller.clone(); + let mut rx = channel_controller.create_channel(config.total_active_events()); + let body = Body::wrap_stream(async_stream::stream! { + while let Some(event) = rx.recv().await { + // If there are no filtered events, that means we want to + // stream out all the events. + if filtered_events.is_empty() || filtered_events.contains(&event.event_type().to_owned()) { + let data = json!({ + "_type": event.event_type(), + "message": event.message(), + }); + + yield Ok::<_, hyper::Error>(Bytes::from(format!("data: {data} \n\n"))); + } + } + }); + + let response = Response::builder() + .status(200) + .header("Content-Type", "text/event-stream") + .header("Cache-Control", "no-cache") + .header("Access-Control-Allow-Origin", &config.access_control_allow_origin) + .body(body); + + match response { + Ok(res) => Ok(res), + Err(err) => return handle_internal_error(err.to_string()).await, + } +} + +/// Fallback function for handling errors in SSE connections +async fn handle_internal_error(message: String) -> Result, Infallible> { + let response = Response::builder() + .status(500) + .body(Body::from(message)) + .expect("Returning 500 should never fail."); + + Ok(response) +}