From 9f74dcf58cdbf5059cef15951355f1cfae3eb954 Mon Sep 17 00:00:00 2001 From: ozkanonur Date: Wed, 23 Aug 2023 13:49:14 +0300 Subject: [PATCH 01/19] implement stremaing infrastructure Signed-off-by: ozkanonur --- Cargo.lock | 14 ++ Cargo.toml | 17 +- mm2src/mm2_core/Cargo.toml | 3 +- mm2src/mm2_core/src/mm_ctx.rs | 4 + mm2src/mm2_event_stream/Cargo.toml | 14 ++ mm2src/mm2_event_stream/src/controller.rs | 196 ++++++++++++++++++++++ mm2src/mm2_event_stream/src/lib.rs | 19 +++ mm2src/mm2_main/Cargo.toml | 2 + mm2src/mm2_main/src/rpc.rs | 9 + mm2src/mm2_main/src/sse.rs | 47 ++++++ 10 files changed, 316 insertions(+), 9 deletions(-) create mode 100644 mm2src/mm2_event_stream/Cargo.toml create mode 100644 mm2src/mm2_event_stream/src/controller.rs create mode 100644 mm2src/mm2_event_stream/src/lib.rs create mode 100644 mm2src/mm2_main/src/sse.rs diff --git a/Cargo.lock b/Cargo.lock index 8bdd4d52cd..236fffa37a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4114,6 +4114,7 @@ dependencies = [ "gstuff", "hex 0.4.3", "lazy_static", + "mm2_event_stream", "mm2_metrics", "mm2_rpc", "primitives", @@ -4182,6 +4183,17 @@ dependencies = [ "web3", ] +[[package]] +name = "mm2_event_stream" +version = "0.1.0" +dependencies = [ + "async-stream", + "parking_lot 0.12.0", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "mm2_git" version = "0.1.0" @@ -4237,6 +4249,7 @@ name = "mm2_main" version = "0.1.0" dependencies = [ "async-std", + "async-stream", "async-trait", "bitcrypto", "blake2", @@ -4277,6 +4290,7 @@ dependencies = [ "mm2_core", "mm2_db", "mm2_err_handle", + "mm2_event_stream", "mm2_gui_storage", "mm2_io", "mm2_metrics", diff --git a/Cargo.toml b/Cargo.toml index 7a92ac1426..7da3bcb1a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,43 +1,44 @@ [workspace] members = [ + "mm2src/coins_activation", "mm2src/coins", "mm2src/coins/utxo_signer", - "mm2src/coins_activation", "mm2src/common/shared_ref_counter", "mm2src/crypto", "mm2src/db_common", "mm2src/derives/enum_from", - "mm2src/derives/ser_error", "mm2src/derives/ser_error_derive", + "mm2src/derives/ser_error", "mm2src/floodsub", "mm2src/gossipsub", - "mm2src/mm2_gui_storage", "mm2src/hw_common", "mm2src/mm2_bin_lib", - "mm2src/mm2_bitcoin/crypto", "mm2src/mm2_bitcoin/chain", + "mm2src/mm2_bitcoin/crypto", "mm2src/mm2_bitcoin/keys", - "mm2src/mm2_bitcoin/rpc", "mm2src/mm2_bitcoin/primitives", + "mm2src/mm2_bitcoin/rpc", "mm2src/mm2_bitcoin/script", - "mm2src/mm2_bitcoin/serialization", "mm2src/mm2_bitcoin/serialization_derive", + "mm2src/mm2_bitcoin/serialization", "mm2src/mm2_bitcoin/test_helpers", "mm2src/mm2_core", "mm2src/mm2_db", "mm2src/mm2_err_handle", "mm2src/mm2_eth", + "mm2src/mm2_event_stream", "mm2src/mm2_git", + "mm2src/mm2_gui_storage", "mm2src/mm2_io", "mm2src/mm2_libp2p", + "mm2src/mm2_main", "mm2src/mm2_metamask", "mm2src/mm2_metrics", - "mm2src/mm2_main", "mm2src/mm2_net", "mm2src/mm2_number", "mm2src/mm2_rpc", - "mm2src/rpc_task", "mm2src/mm2_test_helpers", + "mm2src/rpc_task", "mm2src/trezor", ] diff --git a/mm2src/mm2_core/Cargo.toml b/mm2src/mm2_core/Cargo.toml index b8e34b34e4..566d778b8b 100644 --- a/mm2src/mm2_core/Cargo.toml +++ b/mm2src/mm2_core/Cargo.toml @@ -7,8 +7,8 @@ edition = "2021" doctest = false [dependencies] -async-trait = "0.1" arrayref = "0.3" +async-trait = "0.1" cfg-if = "1.0" common = { path = "../common" } db_common = { path = "../db_common" } @@ -16,6 +16,7 @@ derive_more = "0.99" futures = { version = "0.3", package = "futures", features = ["compat", "async-await", "thread-pool"] } hex = "0.4.2" lazy_static = "1.4" +mm2_event_stream = { path = "../mm2_event_stream" } mm2_metrics = { path = "../mm2_metrics" } primitives = { path = "../mm2_bitcoin/primitives" } rand = { version = "0.7", features = ["std", "small_rng", "wasm-bindgen"] } diff --git a/mm2src/mm2_core/src/mm_ctx.rs b/mm2src/mm2_core/src/mm_ctx.rs index 3717b76606..49b570d21c 100644 --- a/mm2src/mm2_core/src/mm_ctx.rs +++ b/mm2src/mm2_core/src/mm_ctx.rs @@ -6,6 +6,7 @@ use common::log::{self, LogLevel, LogOnError, LogState}; use common::{cfg_native, cfg_wasm32, small_rng}; use gstuff::{try_s, Constructible, ERR, ERRL}; use lazy_static::lazy_static; +use mm2_event_stream::{controller::Controller, Event}; use mm2_metrics::{MetricsArc, MetricsOps}; use primitives::hash::H160; use rand::Rng; @@ -72,6 +73,8 @@ pub struct MmCtx { pub initialized: Constructible, /// True if the RPC HTTP server was started. pub rpc_started: Constructible, + /// Channels for continuously streaming data to clients via SSE. + pub stream_channel_controller: Controller, /// True if the MarketMaker instance needs to stop. pub stop: Constructible, /// Unique context identifier, allowing us to more easily pass the context through the FFI boundaries. @@ -133,6 +136,7 @@ impl MmCtx { metrics: MetricsArc::new(), initialized: Constructible::default(), rpc_started: Constructible::default(), + stream_channel_controller: Controller::new(), stop: Constructible::default(), ffi_handle: Constructible::default(), ordermatch_ctx: Mutex::new(None), diff --git a/mm2src/mm2_event_stream/Cargo.toml b/mm2src/mm2_event_stream/Cargo.toml new file mode 100644 index 0000000000..6a16a7829f --- /dev/null +++ b/mm2src/mm2_event_stream/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "mm2_event_stream" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-stream = "0.3" +parking_lot = "0.12" +serde = { version = "1", features = ["derive", "rc"] } +serde_json = "1" +tokio = { version = "1", features = ["sync"] } + +[dev-dependencies] +tokio = { version = "1", features = ["sync", "macros", "time", "rt"] } diff --git a/mm2src/mm2_event_stream/src/controller.rs b/mm2src/mm2_event_stream/src/controller.rs new file mode 100644 index 0000000000..e5fa105966 --- /dev/null +++ b/mm2src/mm2_event_stream/src/controller.rs @@ -0,0 +1,196 @@ +use parking_lot::Mutex; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::mpsc::{self, Receiver, Sender}; + +type ChannelId = u64; + +/// Root controller of streaming channels +pub struct Controller(Arc>>); + +impl Clone for Controller { + fn clone(&self) -> Self { Self(Arc::clone(&self.0)) } +} + +/// Inner part of the controller +pub struct ChannelsInner { + last_id: u64, + channels: HashMap>, +} + +struct Channel { + tx: Sender>, +} + +/// guard to trace channels disconnection +pub struct ChannelGuard { + channel_id: ChannelId, + controller: Controller, +} + +/// Receiver to cleanup resources on `Drop` +pub struct GuardedReceiver { + rx: Receiver>, + #[allow(dead_code)] + guard: ChannelGuard, +} + +impl Controller { + /// Creates a new channels controller + pub fn new() -> Self { Default::default() } + + /// Creates a new channel and returns it's events receiver + pub fn create_channel(&mut self, concurrency: usize) -> GuardedReceiver { + let (tx, rx) = mpsc::channel::>(concurrency); + let channel = Channel { tx }; + + let mut inner = self.0.lock(); + let channel_id = inner.last_id.overflowing_add(1).0; + inner.channels.insert(channel_id, channel); + inner.last_id = channel_id; + + let guard = ChannelGuard::new(channel_id, self.clone()); + GuardedReceiver { rx, guard } + } + + /// Returns number of active channels + pub fn num_connections(&self) -> usize { self.0.lock().channels.len() } + + /// Broadcast message to all channels + pub async fn broadcast(&self, message: M) { + let msg = Arc::new(message); + for rx in self.all_senders() { + rx.send(Arc::clone(&msg)).await.ok(); + } + } + + /// Removes the channel from the controller + fn remove_channel(&mut self, channel_id: &ChannelId) { + let mut inner = self.0.lock(); + inner.channels.remove(channel_id); + } + + /// Returns all the active channels + fn all_senders(&self) -> Vec>> { self.0.lock().channels.values().map(|c| c.tx.clone()).collect() } +} + +impl Default for Controller { + fn default() -> Self { + let inner = ChannelsInner { + last_id: 0, + channels: HashMap::new(), + }; + Self(Arc::new(Mutex::new(inner))) + } +} + +impl ChannelGuard { + fn new(channel_id: ChannelId, controller: Controller) -> Self { Self { channel_id, controller } } +} + +impl Drop for ChannelGuard { + fn drop(&mut self) { self.controller.remove_channel(&self.channel_id); } +} + +impl GuardedReceiver { + /// Receives the next event from the channel + pub async fn recv(&mut self) -> Option> { self.rx.recv().await } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::time::{sleep, Duration}; + + #[tokio::test] + async fn test_create_channel_and_broadcast() { + let mut controller = Controller::new(); + let mut guard_receiver = controller.create_channel(1); + + controller.broadcast("Message".to_string()).await; + + let received_msg = guard_receiver.recv().await.unwrap(); + assert_eq!(*received_msg, "Message".to_string()); + } + + #[tokio::test] + async fn test_multiple_channels_and_broadcast() { + let mut controller = Controller::new(); + + let mut receivers = Vec::new(); + for _ in 0..3 { + receivers.push(controller.create_channel(1)); + } + + controller.broadcast("Message".to_string()).await; + + for receiver in &mut receivers { + let received_msg = receiver.recv().await.unwrap(); + assert_eq!(*received_msg, "Message".to_string()); + } + } + + #[tokio::test] + async fn test_channel_cleanup_on_drop() { + let mut controller: Controller<()> = Controller::new(); + let guard_receiver = controller.create_channel(1); + + assert_eq!(controller.num_connections(), 1); + + drop(guard_receiver); + + sleep(Duration::from_millis(10)).await; // Give time for the drop to execute + + assert_eq!(controller.num_connections(), 0); + } + + #[tokio::test] + async fn test_broadcast_across_channels() { + let mut controller = Controller::new(); + + let mut receivers = Vec::new(); + for _ in 0..3 { + receivers.push(controller.create_channel(1)); + } + + controller.broadcast("Message".to_string()).await; + + for receiver in &mut receivers { + let received_msg = receiver.recv().await.unwrap(); + assert_eq!(*received_msg, "Message".to_string()); + } + } + + #[tokio::test] + async fn test_multiple_messages_and_drop() { + let mut controller = Controller::new(); + let mut guard_receiver = controller.create_channel(6); + + controller.broadcast("Message 1".to_string()).await; + controller.broadcast("Message 2".to_string()).await; + controller.broadcast("Message 3".to_string()).await; + controller.broadcast("Message 4".to_string()).await; + controller.broadcast("Message 5".to_string()).await; + controller.broadcast("Message 6".to_string()).await; + + let mut received_msgs = Vec::new(); + for _ in 0..6 { + let received_msg = guard_receiver.recv().await.unwrap(); + received_msgs.push(received_msg); + } + + assert_eq!(*received_msgs[0], "Message 1".to_string()); + assert_eq!(*received_msgs[1], "Message 2".to_string()); + assert_eq!(*received_msgs[2], "Message 3".to_string()); + assert_eq!(*received_msgs[3], "Message 4".to_string()); + assert_eq!(*received_msgs[4], "Message 5".to_string()); + assert_eq!(*received_msgs[5], "Message 6".to_string()); + + // Consume the GuardedReceiver to trigger drop and channel cleanup + drop(guard_receiver); + + // Sleep for a short time to allow cleanup to complete + sleep(Duration::from_millis(10)).await; + + assert_eq!(controller.num_connections(), 0); + } +} diff --git a/mm2src/mm2_event_stream/src/lib.rs b/mm2src/mm2_event_stream/src/lib.rs new file mode 100644 index 0000000000..23b7c9478f --- /dev/null +++ b/mm2src/mm2_event_stream/src/lib.rs @@ -0,0 +1,19 @@ +use serde::{Deserialize, Serialize}; + +/// multi-purpose/generic event type that can easily be used over the event streaming +#[derive(Debug, Deserialize, Serialize)] +pub struct Event { + _type: String, + message: String, +} + +impl Event { + pub fn new(event_type: String, message: String) -> Self { + Self { + _type: event_type, + message, + } + } +} + +pub mod controller; diff --git a/mm2src/mm2_main/Cargo.toml b/mm2src/mm2_main/Cargo.toml index f1258d1827..f97253ca12 100644 --- a/mm2src/mm2_main/Cargo.toml +++ b/mm2src/mm2_main/Cargo.toml @@ -24,6 +24,7 @@ default = [] [dependencies] async-std = { version = "1.5", features = ["unstable"] } async-trait = "0.1" +async-stream = "0.3" bitcrypto = { path = "../mm2_bitcoin/crypto" } blake2 = "0.10.6" bytes = "0.4" @@ -57,6 +58,7 @@ lazy_static = "1.4" libc = "0.2" mm2_core = { path = "../mm2_core" } mm2_err_handle = { path = "../mm2_err_handle" } +mm2_event_stream = { path = "../mm2_event_stream"} mm2_gui_storage = { path = "../mm2_gui_storage" } mm2_io = { path = "../mm2_io" } mm2-libp2p = { path = "../mm2_libp2p" } diff --git a/mm2src/mm2_main/src/rpc.rs b/mm2src/mm2_main/src/rpc.rs index 8ca80d5274..456f99be1e 100644 --- a/mm2src/mm2_main/src/rpc.rs +++ b/mm2src/mm2_main/src/rpc.rs @@ -21,6 +21,7 @@ // use crate::mm2::rpc::rate_limiter::RateLimitError; +use crate::mm2::rpc::sse::{handle_sse_events, SSE_ENDPOINT}; use common::log::{error, info}; use common::{err_to_rpc_json_string, err_tp_rpc_json, HttpStatusCode, APPLICATION_JSON}; use derive_more::Display; @@ -47,6 +48,7 @@ mod dispatcher_legacy; #[path = "rpc/lp_commands/lp_commands_legacy.rs"] pub mod lp_commands_legacy; #[path = "rpc/rate_limiter.rs"] mod rate_limiter; +mod sse; /// Lists the RPC method not requiring the "userpass" authentication. /// None is also public to skip auth and display proper error in case of method is missing @@ -301,6 +303,8 @@ async fn rpc_service(req: Request, ctx_h: u32, client: SocketAddr) -> Resp Response::from_parts(parts, Body::from(body_escaped)) } +// TODO: This should exclude TCP internals, as including them results in having to +// handle various protocols within this function. #[cfg(not(target_arch = "wasm32"))] pub extern "C" fn spawn_rpc(ctx_h: u32) { use common::now_sec; @@ -353,6 +357,11 @@ pub extern "C" fn spawn_rpc(ctx_h: u32) { let make_svc_fut = move |remote_addr: SocketAddr| async move { Ok::<_, Infallible>(service_fn(move |req: Request| async move { + if req.uri().path() == SSE_ENDPOINT { + let res = handle_sse_events(ctx_h).await?; + return Ok::<_, Infallible>(res); + } + let res = rpc_service(req, ctx_h, remote_addr).await; Ok::<_, Infallible>(res) })) diff --git a/mm2src/mm2_main/src/sse.rs b/mm2src/mm2_main/src/sse.rs new file mode 100644 index 0000000000..3678333d32 --- /dev/null +++ b/mm2src/mm2_main/src/sse.rs @@ -0,0 +1,47 @@ +// TODO: handle this module inside the `mm2_event_stream` crate. + +use hyper::{body::Bytes, Body, Response}; +use mm2_core::mm_ctx::MmArc; +use std::convert::Infallible; + +pub(crate) const SSE_ENDPOINT: &str = "/event-stream"; + +/// Handles broadcasted messages from `mm2_event_stream` continuously. +pub async fn handle_sse_events(ctx_h: u32) -> Result, Infallible> { + // TODO: Query events from request and only stream the requested ones. + + let ctx = match MmArc::from_ffi_handle(ctx_h) { + Ok(ctx) => ctx, + Err(err) => return handle_internal_error(err).await, + }; + + let mut channel_controller = ctx.stream_channel_controller.clone(); + let mut rx = channel_controller.create_channel(1); // TODO: read this from configuration + let body = Body::wrap_stream(async_stream::stream! { + while let Some(msg) = rx.recv().await { + let Ok(json) = serde_json::to_string(&msg) else { continue }; // TODO: This is not a good idea. Refactor the event type. + yield Ok::<_, hyper::Error>(Bytes::from(format!("data: {json} \n\n"))); + } + }); + + let response = Response::builder() + .status(200) + .header("Content-Type", "text/event-stream") + .header("Cache-Control", "no-cache") + .header("Access-Control-Allow-Origin", "*") // TODO: read this from configuration + .body(body); + + match response { + Ok(res) => Ok(res), + Err(err) => return handle_internal_error(err.to_string()).await, + } +} + +async fn handle_internal_error(message: String) -> Result, Infallible> { + let response = Response::builder() + .status(500) + .body(Body::from(message)) + .expect("Returning 500 should never fail."); + + Ok(response) +} From 8e688c18f363fc62118d68a28eb4adf2ff820e43 Mon Sep 17 00:00:00 2001 From: ozkanonur Date: Wed, 23 Aug 2023 13:49:41 +0300 Subject: [PATCH 02/19] create basic client for SSE testing Signed-off-by: ozkanonur --- examples/sse/index.html | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 examples/sse/index.html diff --git a/examples/sse/index.html b/examples/sse/index.html new file mode 100644 index 0000000000..1c17c4db01 --- /dev/null +++ b/examples/sse/index.html @@ -0,0 +1,26 @@ + + + + + +

Events

+
+ + + + + + \ No newline at end of file From 6c1998d46f4444327eb7ef05777043e7bd01d36e Mon Sep 17 00:00:00 2001 From: ozkanonur Date: Wed, 23 Aug 2023 13:58:52 +0300 Subject: [PATCH 03/19] update examples README Signed-off-by: ozkanonur --- examples/wasm/README.md | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/examples/wasm/README.md b/examples/wasm/README.md index 6f2faff136..30b3e1146e 100644 --- a/examples/wasm/README.md +++ b/examples/wasm/README.md @@ -1,4 +1,5 @@ -# AtomicDEX-API WASM example +# Examples +## komodo-defi-framework WASM example **wasm_build** is an example of using **MarketMaker2** in webpages via [WebAssembly](https://developer.mozilla.org/en-US/docs/WebAssembly) @@ -16,3 +17,19 @@ via [WebAssembly](https://developer.mozilla.org/en-US/docs/WebAssembly) Read more about [running a simple local HTTP server](https://developer.mozilla.org/en-US/docs/Learn/Common_questions/set_up_a_local_testing_server#running_a_simple_local_http_server) 3. Open webpage in your browser http://localhost:8000/wasm_build/index.html + +## Listening event-stream from komodo-defi-framework + +1. Sart komodo-defi-framework with event streaming activated +2. Change directory to `sse`. +3. Run a local HTTP server + - if you use Python 3, run: + ``` + python3 -m http.server 8000 + ``` + - if you use Python 2, run: + ``` + python -m SimpleHTTPServer 8000 + ``` + +You should now be able to observe events from the komodo-defi-framework through the SSE. \ No newline at end of file From 5ee822b28d72690ea361b1d8198bb557a24961ce Mon Sep 17 00:00:00 2001 From: ozkanonur Date: Wed, 23 Aug 2023 22:29:30 +0300 Subject: [PATCH 04/19] implement client-side event filtering Signed-off-by: ozkanonur --- mm2src/mm2_event_stream/src/lib.rs | 4 ++++ mm2src/mm2_main/src/rpc.rs | 2 +- mm2src/mm2_main/src/sse.rs | 25 ++++++++++++++++++++----- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/mm2src/mm2_event_stream/src/lib.rs b/mm2src/mm2_event_stream/src/lib.rs index 23b7c9478f..b2fbfdee6a 100644 --- a/mm2src/mm2_event_stream/src/lib.rs +++ b/mm2src/mm2_event_stream/src/lib.rs @@ -14,6 +14,10 @@ impl Event { message, } } + + pub fn event_type(&self) -> &str { &self._type } + + pub fn message(&self) -> &str { &self.message } } pub mod controller; diff --git a/mm2src/mm2_main/src/rpc.rs b/mm2src/mm2_main/src/rpc.rs index 456f99be1e..5ade053771 100644 --- a/mm2src/mm2_main/src/rpc.rs +++ b/mm2src/mm2_main/src/rpc.rs @@ -358,7 +358,7 @@ pub extern "C" fn spawn_rpc(ctx_h: u32) { let make_svc_fut = move |remote_addr: SocketAddr| async move { Ok::<_, Infallible>(service_fn(move |req: Request| async move { if req.uri().path() == SSE_ENDPOINT { - let res = handle_sse_events(ctx_h).await?; + let res = handle_sse_events(req, ctx_h).await?; return Ok::<_, Infallible>(res); } diff --git a/mm2src/mm2_main/src/sse.rs b/mm2src/mm2_main/src/sse.rs index 3678333d32..4dfaeca5d6 100644 --- a/mm2src/mm2_main/src/sse.rs +++ b/mm2src/mm2_main/src/sse.rs @@ -1,26 +1,41 @@ // TODO: handle this module inside the `mm2_event_stream` crate. -use hyper::{body::Bytes, Body, Response}; +use hyper::{body::Bytes, Body, Request, Response}; use mm2_core::mm_ctx::MmArc; use std::convert::Infallible; pub(crate) const SSE_ENDPOINT: &str = "/event-stream"; /// Handles broadcasted messages from `mm2_event_stream` continuously. -pub async fn handle_sse_events(ctx_h: u32) -> Result, Infallible> { - // TODO: Query events from request and only stream the requested ones. +pub async fn handle_sse_events(request: Request, ctx_h: u32) -> Result, Infallible> { + fn get_filtered_events(request: Request) -> Vec { + let query = request.uri().query().unwrap_or(""); + let events_param = query + .split('&') + .find(|param| param.starts_with("filter=")) + .map(|param| param.trim_start_matches("filter=")) + .unwrap_or(""); + + events_param.split(',').map(|event| event.to_string()).collect() + } let ctx = match MmArc::from_ffi_handle(ctx_h) { Ok(ctx) => ctx, Err(err) => return handle_internal_error(err).await, }; + let filtered_events = get_filtered_events(request); + let mut channel_controller = ctx.stream_channel_controller.clone(); let mut rx = channel_controller.create_channel(1); // TODO: read this from configuration let body = Body::wrap_stream(async_stream::stream! { while let Some(msg) = rx.recv().await { - let Ok(json) = serde_json::to_string(&msg) else { continue }; // TODO: This is not a good idea. Refactor the event type. - yield Ok::<_, hyper::Error>(Bytes::from(format!("data: {json} \n\n"))); + // If there are no filtered events, that means we want to + // stream out all the events. + if filtered_events.is_empty() || filtered_events.contains(&msg.event_type().to_owned()) { + let Ok(json) = serde_json::to_string(&msg) else { continue }; // TODO: This is not a good idea. Refactor the event type. + yield Ok::<_, hyper::Error>(Bytes::from(format!("data: {json} \n\n"))); + } } }); From b8e997aa4eebe511c768b18cec3fcc17261dfe57 Mon Sep 17 00:00:00 2001 From: ozkanonur Date: Wed, 23 Aug 2023 22:38:35 +0300 Subject: [PATCH 05/19] add explanatory comments in sse module Signed-off-by: ozkanonur --- mm2src/mm2_main/src/sse.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mm2src/mm2_main/src/sse.rs b/mm2src/mm2_main/src/sse.rs index 4dfaeca5d6..68426b9dff 100644 --- a/mm2src/mm2_main/src/sse.rs +++ b/mm2src/mm2_main/src/sse.rs @@ -19,6 +19,8 @@ pub async fn handle_sse_events(request: Request, ctx_h: u32) -> Result ctx, Err(err) => return handle_internal_error(err).await, @@ -52,6 +54,7 @@ pub async fn handle_sse_events(request: Request, ctx_h: u32) -> Result Result, Infallible> { let response = Response::builder() .status(500) From 04df80c1c68aba1f2431029e7cffc1bfecce9d09 Mon Sep 17 00:00:00 2001 From: ozkanonur Date: Wed, 23 Aug 2023 22:44:44 +0300 Subject: [PATCH 06/19] update SSE client example documentation Signed-off-by: ozkanonur --- examples/sse/README.md | 14 ++++++++++++++ examples/sse/index.html | 2 +- examples/wasm/README.md | 21 ++------------------- 3 files changed, 17 insertions(+), 20 deletions(-) create mode 100644 examples/sse/README.md diff --git a/examples/sse/README.md b/examples/sse/README.md new file mode 100644 index 0000000000..b43c213d02 --- /dev/null +++ b/examples/sse/README.md @@ -0,0 +1,14 @@ +# Listening event-stream from komodo-defi-framework + +1. Start komodo-defi-framework with event streaming activated +2. Run a local HTTP server + - if you use Python 3, run: + ``` + python3 -m http.server 8000 + ``` + - if you use Python 2, run: + ``` + python -m SimpleHTTPServer 8000 + ``` + +You should now be able to observe events from the komodo-defi-framework through the SSE. diff --git a/examples/sse/index.html b/examples/sse/index.html index 1c17c4db01..c064b7068d 100644 --- a/examples/sse/index.html +++ b/examples/sse/index.html @@ -23,4 +23,4 @@

Events

- \ No newline at end of file + diff --git a/examples/wasm/README.md b/examples/wasm/README.md index 30b3e1146e..849ff63773 100644 --- a/examples/wasm/README.md +++ b/examples/wasm/README.md @@ -1,5 +1,4 @@ -# Examples -## komodo-defi-framework WASM example +# AtomicDEX-API WASM example **wasm_build** is an example of using **MarketMaker2** in webpages via [WebAssembly](https://developer.mozilla.org/en-US/docs/WebAssembly) @@ -16,20 +15,4 @@ via [WebAssembly](https://developer.mozilla.org/en-US/docs/WebAssembly) ``` Read more about [running a simple local HTTP server](https://developer.mozilla.org/en-US/docs/Learn/Common_questions/set_up_a_local_testing_server#running_a_simple_local_http_server) -3. Open webpage in your browser http://localhost:8000/wasm_build/index.html - -## Listening event-stream from komodo-defi-framework - -1. Sart komodo-defi-framework with event streaming activated -2. Change directory to `sse`. -3. Run a local HTTP server - - if you use Python 3, run: - ``` - python3 -m http.server 8000 - ``` - - if you use Python 2, run: - ``` - python -m SimpleHTTPServer 8000 - ``` - -You should now be able to observe events from the komodo-defi-framework through the SSE. \ No newline at end of file +3. Open webpage in your browser http://localhost:8000/wasm_build/index.html \ No newline at end of file From fae50e43d8fba51533b5c89e0ba8a3853382e785 Mon Sep 17 00:00:00 2001 From: ozkanonur Date: Wed, 23 Aug 2023 22:48:41 +0300 Subject: [PATCH 07/19] format wasm html example Signed-off-by: ozkanonur --- examples/wasm/index.html | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/examples/wasm/index.html b/examples/wasm/index.html index bd19234020..b9243b9bf8 100644 --- a/examples/wasm/index.html +++ b/examples/wasm/index.html @@ -1,27 +1,32 @@ + MM2 example + -
- -
- -
- - -
-
- -
- -
- -
+
+ +
+ +
+ + +
+
+ +
+ +
+ +
- + + \ No newline at end of file From b8eca5e4963c2fe4f6490edcef0b9860b8691761 Mon Sep 17 00:00:00 2001 From: ozkanonur Date: Thu, 24 Aug 2023 14:58:15 +0300 Subject: [PATCH 08/19] stream network events Signed-off-by: ozkanonur --- Cargo.lock | 9 +++-- examples/sse/index.html | 2 +- mm2src/mm2_event_stream/Cargo.toml | 2 - mm2src/mm2_main/Cargo.toml | 2 - mm2src/mm2_main/src/lp_native_dex.rs | 13 +++++- mm2src/mm2_main/src/lp_network.rs | 35 ++-------------- mm2src/mm2_main/src/ordermatch_tests.rs | 2 +- mm2src/mm2_main/src/rpc.rs | 11 ++--- .../src/rpc/lp_commands/lp_commands_legacy.rs | 6 +-- mm2src/mm2_net/Cargo.toml | 21 ++++++---- mm2src/mm2_net/src/lib.rs | 3 ++ mm2src/mm2_net/src/network_event.rs | 37 +++++++++++++++++ mm2src/mm2_net/src/p2p.rs | 40 +++++++++++++++++++ .../src/sse.rs => mm2_net/src/sse_handler.rs} | 14 ++++--- 14 files changed, 132 insertions(+), 65 deletions(-) create mode 100644 mm2src/mm2_net/src/network_event.rs create mode 100644 mm2src/mm2_net/src/p2p.rs rename mm2src/{mm2_main/src/sse.rs => mm2_net/src/sse_handler.rs} (85%) diff --git a/Cargo.lock b/Cargo.lock index 236fffa37a..ec2adc5a6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4187,10 +4187,8 @@ dependencies = [ name = "mm2_event_stream" version = "0.1.0" dependencies = [ - "async-stream", "parking_lot 0.12.0", "serde", - "serde_json", "tokio", ] @@ -4249,7 +4247,6 @@ name = "mm2_main" version = "0.1.0" dependencies = [ "async-std", - "async-stream", "async-trait", "bitcrypto", "blake2", @@ -4290,7 +4287,6 @@ dependencies = [ "mm2_core", "mm2_db", "mm2_err_handle", - "mm2_event_stream", "mm2_gui_storage", "mm2_io", "mm2_metrics", @@ -4383,6 +4379,7 @@ dependencies = [ name = "mm2_net" version = "0.1.0" dependencies = [ + "async-stream", "async-trait", "bytes 1.1.0", "cfg-if 1.0.0", @@ -4396,8 +4393,12 @@ dependencies = [ "hyper", "js-sys", "lazy_static", + "mm2-libp2p", "mm2_core", "mm2_err_handle", + "mm2_event_stream", + "mocktopus", + "parking_lot 0.12.0", "prost", "rand 0.7.3", "rustls 0.20.4", diff --git a/examples/sse/index.html b/examples/sse/index.html index c064b7068d..e780004ccb 100644 --- a/examples/sse/index.html +++ b/examples/sse/index.html @@ -13,7 +13,7 @@

Events

}); source.onmessage = function (event) { var currentDatetime = new Date().toLocaleString(); - var eventData = currentDatetime + ": " + event.data + "
"; + var eventData = currentDatetime + ": " + event.data + "
"; document.getElementById("result").insertAdjacentHTML("afterbegin", eventData); }; } else { diff --git a/mm2src/mm2_event_stream/Cargo.toml b/mm2src/mm2_event_stream/Cargo.toml index 6a16a7829f..8329355c96 100644 --- a/mm2src/mm2_event_stream/Cargo.toml +++ b/mm2src/mm2_event_stream/Cargo.toml @@ -4,10 +4,8 @@ version = "0.1.0" edition = "2021" [dependencies] -async-stream = "0.3" parking_lot = "0.12" serde = { version = "1", features = ["derive", "rc"] } -serde_json = "1" tokio = { version = "1", features = ["sync"] } [dev-dependencies] diff --git a/mm2src/mm2_main/Cargo.toml b/mm2src/mm2_main/Cargo.toml index f97253ca12..f1258d1827 100644 --- a/mm2src/mm2_main/Cargo.toml +++ b/mm2src/mm2_main/Cargo.toml @@ -24,7 +24,6 @@ default = [] [dependencies] async-std = { version = "1.5", features = ["unstable"] } async-trait = "0.1" -async-stream = "0.3" bitcrypto = { path = "../mm2_bitcoin/crypto" } blake2 = "0.10.6" bytes = "0.4" @@ -58,7 +57,6 @@ lazy_static = "1.4" libc = "0.2" mm2_core = { path = "../mm2_core" } mm2_err_handle = { path = "../mm2_err_handle" } -mm2_event_stream = { path = "../mm2_event_stream"} mm2_gui_storage = { path = "../mm2_gui_storage" } mm2_io = { path = "../mm2_io" } mm2-libp2p = { path = "../mm2_libp2p" } diff --git a/mm2src/mm2_main/src/lp_native_dex.rs b/mm2src/mm2_main/src/lp_native_dex.rs index d11a47a2ca..30da1d6b82 100644 --- a/mm2src/mm2_main/src/lp_native_dex.rs +++ b/mm2src/mm2_main/src/lp_native_dex.rs @@ -31,6 +31,7 @@ use mm2_err_handle::prelude::*; use mm2_libp2p::{spawn_gossipsub, AdexBehaviourError, NodeType, RelayAddress, RelayAddressError, SwarmRuntime, WssCerts}; use mm2_metrics::mm_gauge; +use mm2_net::p2p::P2PContext; use rpc_task::RpcTaskError; use serde_json::{self as json}; use std::fs; @@ -42,7 +43,7 @@ use std::time::Duration; #[cfg(not(target_arch = "wasm32"))] use crate::mm2::database::init_and_migrate_db; use crate::mm2::lp_message_service::{init_message_service, InitMessageServiceError}; -use crate::mm2::lp_network::{lp_network_ports, p2p_event_process_loop, NetIdError, P2PContext}; +use crate::mm2::lp_network::{lp_network_ports, p2p_event_process_loop, NetIdError}; use crate::mm2::lp_ordermatch::{broadcast_maker_orders_keep_alive_loop, clean_memory_loop, init_ordermatch_context, lp_ordermatch_loop, orders_kick_start, BalanceUpdateOrdermatchHandler, OrdermatchInitError}; @@ -52,6 +53,7 @@ use crate::mm2::rpc::spawn_rpc; cfg_native! { use mm2_io::fs::{ensure_dir_is_writable, ensure_file_is_writable}; use mm2_net::ip_addr::myipaddr; + use mm2_net::network_event::start_network_event_stream; use db_common::sqlite::rusqlite::Error as SqlError; } @@ -410,11 +412,18 @@ pub async fn lp_init_continue(ctx: MmArc) -> MmInitResult<()> { // an order and start new swap that might get started 2 times because of kick-start kick_start(ctx.clone()).await?; + #[cfg(not(target_arch = "wasm32"))] + { + // TODO: this should be configurable from MM2 the config + ctx.spawner().spawn(start_network_event_stream(ctx.clone())); + } + ctx.spawner().spawn(lp_ordermatch_loop(ctx.clone())); ctx.spawner().spawn(broadcast_maker_orders_keep_alive_loop(ctx.clone())); ctx.spawner().spawn(clean_memory_loop(ctx.weak())); + Ok(()) } @@ -442,11 +451,13 @@ pub async fn lp_init(ctx: MmArc, version: String, datetime: String) -> MmInitRes spawn_rpc(ctx_id); let ctx_c = ctx.clone(); + ctx.spawner().spawn(async move { if let Err(err) = ctx_c.init_metrics() { warn!("Couldn't initialize metrics system: {}", err); } }); + // In the mobile version we might depend on `lp_init` staying around until the context stops. loop { if ctx.is_stopping() { diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index 7616429bf0..b5b97a91ac 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -1,3 +1,5 @@ +// TODO: a lof of these implementations should be handled in `mm2_net` + /****************************************************************************** * Copyright © 2022 Atomic Private Limited and its contributors * * * @@ -27,17 +29,15 @@ use instant::Instant; use keys::KeyPair; use mm2_core::mm_ctx::{MmArc, MmWeak}; use mm2_err_handle::prelude::*; -use mm2_libp2p::atomicdex_behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, AdexCmdTx, AdexEventRx, AdexResponse, +use mm2_libp2p::atomicdex_behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, AdexEventRx, AdexResponse, AdexResponseChannel}; use mm2_libp2p::peers_exchange::PeerAddresses; use mm2_libp2p::{decode_message, encode_message, DecodingError, GossipsubMessage, Libp2pPublic, Libp2pSecpPublic, MessageId, NetworkPorts, PeerId, TopicHash, TOPIC_SEPARATOR}; use mm2_metrics::{mm_label, mm_timing}; -#[cfg(test)] use mocktopus::macros::*; -use parking_lot::Mutex as PaMutex; +use mm2_net::p2p::P2PContext; use serde::de; use std::net::ToSocketAddrs; -use std::sync::Arc; use crate::mm2::lp_ordermatch; use crate::mm2::{lp_stats, lp_swap}; @@ -78,33 +78,6 @@ pub enum P2PRequest { NetworkInfo(lp_stats::NetworkInfoRequest), } -pub struct P2PContext { - /// Using Mutex helps to prevent cloning which can actually result to channel being unbounded in case of using 1 tx clone per 1 message. - pub cmd_tx: PaMutex, -} - -#[cfg_attr(test, mockable)] -impl P2PContext { - pub fn new(cmd_tx: AdexCmdTx) -> Self { - P2PContext { - cmd_tx: PaMutex::new(cmd_tx), - } - } - - pub fn store_to_mm_arc(self, ctx: &MmArc) { *ctx.p2p_ctx.lock().unwrap() = Some(Arc::new(self)) } - - pub fn fetch_from_mm_arc(ctx: &MmArc) -> Arc { - ctx.p2p_ctx - .lock() - .unwrap() - .as_ref() - .unwrap() - .clone() - .downcast() - .unwrap() - } -} - pub async fn p2p_event_process_loop(ctx: MmWeak, mut rx: AdexEventRx, i_am_relay: bool) { loop { let adex_event = rx.next().await; diff --git a/mm2src/mm2_main/src/ordermatch_tests.rs b/mm2src/mm2_main/src/ordermatch_tests.rs index 6f2af8a757..ec96d8f484 100644 --- a/mm2src/mm2_main/src/ordermatch_tests.rs +++ b/mm2src/mm2_main/src/ordermatch_tests.rs @@ -1,5 +1,4 @@ use super::*; -use crate::mm2::lp_network::P2PContext; use crate::mm2::lp_ordermatch::new_protocol::{MakerOrderUpdated, PubkeyKeepAlive}; use coins::{MmCoin, TestCoin}; use common::{block_on, executor::spawn}; @@ -9,6 +8,7 @@ use futures::{channel::mpsc, StreamExt}; use mm2_core::mm_ctx::{MmArc, MmCtx}; use mm2_libp2p::atomicdex_behaviour::AdexBehaviourCmd; use mm2_libp2p::{decode_message, PeerId}; +use mm2_net::p2p::P2PContext; use mm2_test_helpers::for_tests::mm_ctx_with_iguana; use mocktopus::mocking::*; use rand::{seq::SliceRandom, thread_rng, Rng}; diff --git a/mm2src/mm2_main/src/rpc.rs b/mm2src/mm2_main/src/rpc.rs index 5ade053771..1c06d76352 100644 --- a/mm2src/mm2_main/src/rpc.rs +++ b/mm2src/mm2_main/src/rpc.rs @@ -21,7 +21,6 @@ // use crate::mm2::rpc::rate_limiter::RateLimitError; -use crate::mm2::rpc::sse::{handle_sse_events, SSE_ENDPOINT}; use common::log::{error, info}; use common::{err_to_rpc_json_string, err_tp_rpc_json, HttpStatusCode, APPLICATION_JSON}; use derive_more::Display; @@ -29,8 +28,6 @@ use futures::future::{join_all, FutureExt}; use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, CONTENT_TYPE}; use http::request::Parts; use http::{Method, Request, Response, StatusCode}; -#[cfg(not(target_arch = "wasm32"))] -use hyper::{self, Body, Server}; use lazy_static::lazy_static; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; @@ -41,6 +38,11 @@ use serde_json::{self as json, Value as Json}; use std::borrow::Cow; use std::net::SocketAddr; +cfg_native! { + use hyper::{self, Body, Server}; + use mm2_net::sse_handler::{handle_sse, SSE_ENDPOINT}; +} + #[path = "rpc/dispatcher/dispatcher.rs"] mod dispatcher; #[path = "rpc/dispatcher/dispatcher_legacy.rs"] mod dispatcher_legacy; @@ -48,7 +50,6 @@ mod dispatcher_legacy; #[path = "rpc/lp_commands/lp_commands_legacy.rs"] pub mod lp_commands_legacy; #[path = "rpc/rate_limiter.rs"] mod rate_limiter; -mod sse; /// Lists the RPC method not requiring the "userpass" authentication. /// None is also public to skip auth and display proper error in case of method is missing @@ -358,7 +359,7 @@ pub extern "C" fn spawn_rpc(ctx_h: u32) { let make_svc_fut = move |remote_addr: SocketAddr| async move { Ok::<_, Infallible>(service_fn(move |req: Request| async move { if req.uri().path() == SSE_ENDPOINT { - let res = handle_sse_events(req, ctx_h).await?; + let res = handle_sse(req, ctx_h).await?; return Ok::<_, Infallible>(res); } diff --git a/mm2src/mm2_main/src/rpc/lp_commands/lp_commands_legacy.rs b/mm2src/mm2_main/src/rpc/lp_commands/lp_commands_legacy.rs index 7b5d68b8d0..e060175858 100644 --- a/mm2src/mm2_main/src/rpc/lp_commands/lp_commands_legacy.rs +++ b/mm2src/mm2_main/src/rpc/lp_commands/lp_commands_legacy.rs @@ -27,6 +27,7 @@ use futures::compat::Future01CompatExt; use http::Response; use mm2_core::mm_ctx::MmArc; use mm2_metrics::MetricsOps; +use mm2_net::p2p::P2PContext; use mm2_number::construct_detailed; use mm2_rpc::data::legacy::{BalanceResponse, CoinInitResponse, Mm2RpcResult, MmVersionResponse, Status}; use serde_json::{self as json, Value as Json}; @@ -306,7 +307,6 @@ pub fn version(ctx: MmArc) -> HyRes { } pub async fn get_peers_info(ctx: MmArc) -> Result>, String> { - use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_peers_info; let ctx = P2PContext::fetch_from_mm_arc(&ctx); let cmd_tx = ctx.cmd_tx.lock().clone(); @@ -319,7 +319,6 @@ pub async fn get_peers_info(ctx: MmArc) -> Result>, String> { } pub async fn get_gossip_mesh(ctx: MmArc) -> Result>, String> { - use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_gossip_mesh; let ctx = P2PContext::fetch_from_mm_arc(&ctx); let cmd_tx = ctx.cmd_tx.lock().clone(); @@ -332,7 +331,6 @@ pub async fn get_gossip_mesh(ctx: MmArc) -> Result>, String> { } pub async fn get_gossip_peer_topics(ctx: MmArc) -> Result>, String> { - use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_gossip_peer_topics; let ctx = P2PContext::fetch_from_mm_arc(&ctx); let cmd_tx = ctx.cmd_tx.lock().clone(); @@ -345,7 +343,6 @@ pub async fn get_gossip_peer_topics(ctx: MmArc) -> Result>, Str } pub async fn get_gossip_topic_peers(ctx: MmArc) -> Result>, String> { - use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_gossip_topic_peers; let ctx = P2PContext::fetch_from_mm_arc(&ctx); let cmd_tx = ctx.cmd_tx.lock().clone(); @@ -358,7 +355,6 @@ pub async fn get_gossip_topic_peers(ctx: MmArc) -> Result>, Str } pub async fn get_relay_mesh(ctx: MmArc) -> Result>, String> { - use crate::mm2::lp_network::P2PContext; use mm2_libp2p::atomicdex_behaviour::get_relay_mesh; let ctx = P2PContext::fetch_from_mm_arc(&ctx); let cmd_tx = ctx.cmd_tx.lock().clone(); diff --git a/mm2src/mm2_net/Cargo.toml b/mm2src/mm2_net/Cargo.toml index e567546f6c..44c22ce177 100644 --- a/mm2src/mm2_net/Cargo.toml +++ b/mm2src/mm2_net/Cargo.toml @@ -7,21 +7,25 @@ edition = "2018" doctest = false [dependencies] +async-stream = "0.3" async-trait = "0.1" -serde = "1" -serde_json = { version = "1", features = ["preserve_order", "raw_value"] } bytes = "1.1" cfg-if = "1.0" common = { path = "../common" } -ethkey = { git = "https://github.com/KomodoPlatform/mm2-parity-ethereum.git" } -mm2_err_handle = { path = "../mm2_err_handle" } -mm2_core = { path = "../mm2_core" } derive_more = "0.99" -http = "0.2" -rand = { version = "0.7", features = ["std", "small_rng", "wasm-bindgen"] } +ethkey = { git = "https://github.com/KomodoPlatform/mm2-parity-ethereum.git" } futures = { version = "0.3", package = "futures", features = ["compat", "async-await", "thread-pool"] } +http = "0.2" lazy_static = "1.4" +mm2_core = { path = "../mm2_core" } +mm2_err_handle = { path = "../mm2_err_handle" } +mm2_event_stream = { path = "../mm2_event_stream"} +mm2-libp2p = { path = "../mm2_libp2p" } +parking_lot = { version = "0.12.0", features = ["nightly"] } prost = "0.10" +rand = { version = "0.7", features = ["std", "small_rng", "wasm-bindgen"] } +serde = "1" +serde_json = { version = "1", features = ["preserve_order", "raw_value"] } [target.'cfg(target_arch = "wasm32")'.dependencies] gstuff = { version = "0.7", features = ["nightly"] } @@ -38,3 +42,6 @@ gstuff = { version = "0.7", features = ["nightly"] } rustls = { version = "0.20", default-features = false } tokio = { version = "1.20" } tokio-rustls = { version = "0.23", default-features = false } + +[dev-dependencies] +mocktopus = "0.8.0" diff --git a/mm2src/mm2_net/src/lib.rs b/mm2src/mm2_net/src/lib.rs index 99935bd25b..ed70e093fa 100644 --- a/mm2src/mm2_net/src/lib.rs +++ b/mm2src/mm2_net/src/lib.rs @@ -1,8 +1,11 @@ pub mod grpc_web; +pub mod p2p; pub mod transport; #[cfg(not(target_arch = "wasm32"))] pub mod ip_addr; #[cfg(not(target_arch = "wasm32"))] pub mod native_http; #[cfg(not(target_arch = "wasm32"))] pub mod native_tls; +#[cfg(not(target_arch = "wasm32"))] pub mod network_event; +#[cfg(not(target_arch = "wasm32"))] pub mod sse_handler; #[cfg(target_arch = "wasm32")] pub mod wasm_http; #[cfg(target_arch = "wasm32")] pub mod wasm_ws; diff --git a/mm2src/mm2_net/src/network_event.rs b/mm2src/mm2_net/src/network_event.rs new file mode 100644 index 0000000000..bc8fbb1ed5 --- /dev/null +++ b/mm2src/mm2_net/src/network_event.rs @@ -0,0 +1,37 @@ +use common::executor::Timer; +use mm2_core::mm_ctx::MmArc; +use mm2_event_stream::Event; +use mm2_libp2p::atomicdex_behaviour; +use serde_json::json; + +use crate::p2p::P2PContext; + +const NETWORK_EVENT_TYPE: &str = "NETWORK"; + +pub async fn start_network_event_stream(ctx: MmArc) { + let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx); + + loop { + let p2p_cmd_tx = p2p_ctx.cmd_tx.lock().clone(); + + let peers_info = atomicdex_behaviour::get_peers_info(p2p_cmd_tx.clone()).await; + let gossip_mesh = atomicdex_behaviour::get_gossip_mesh(p2p_cmd_tx.clone()).await; + let gossip_peer_topics = atomicdex_behaviour::get_gossip_peer_topics(p2p_cmd_tx.clone()).await; + let gossip_topic_peers = atomicdex_behaviour::get_gossip_topic_peers(p2p_cmd_tx.clone()).await; + let relay_mesh = atomicdex_behaviour::get_relay_mesh(p2p_cmd_tx).await; + + let event_data = json!({ + "peers_info": peers_info, + "gossip_mesh": gossip_mesh, + "gossip_peer_topics": gossip_peer_topics, + "gossip_topic_peers": gossip_topic_peers, + "relay_mesh": relay_mesh, + }); + + ctx.stream_channel_controller + .broadcast(Event::new(NETWORK_EVENT_TYPE.to_string(), event_data.to_string())) + .await; + + Timer::sleep(1.).await; // TODO: read this from configuration + } +} diff --git a/mm2src/mm2_net/src/p2p.rs b/mm2src/mm2_net/src/p2p.rs new file mode 100644 index 0000000000..74ee9dd9f1 --- /dev/null +++ b/mm2src/mm2_net/src/p2p.rs @@ -0,0 +1,40 @@ +use mm2_core::mm_ctx::MmArc; +use mm2_libp2p::atomicdex_behaviour::AdexCmdTx; +#[cfg(test)] use mocktopus::macros::*; +use parking_lot::Mutex; +use std::sync::Arc; + +pub struct P2PContext { + /// Using Mutex helps to prevent cloning which can actually result to channel being unbounded in case of using 1 tx clone per 1 message. + pub cmd_tx: Mutex, +} + +// `mockable` violates these +#[allow( + clippy::forget_ref, + clippy::forget_copy, + clippy::swap_ptr_to_ref, + clippy::forget_non_drop, + clippy::let_unit_value +)] +#[cfg_attr(test, mockable)] +impl P2PContext { + pub fn new(cmd_tx: AdexCmdTx) -> Self { + P2PContext { + cmd_tx: Mutex::new(cmd_tx), + } + } + + pub fn store_to_mm_arc(self, ctx: &MmArc) { *ctx.p2p_ctx.lock().unwrap() = Some(Arc::new(self)) } + + pub fn fetch_from_mm_arc(ctx: &MmArc) -> Arc { + ctx.p2p_ctx + .lock() + .unwrap() + .as_ref() + .unwrap() + .clone() + .downcast() + .unwrap() + } +} diff --git a/mm2src/mm2_main/src/sse.rs b/mm2src/mm2_net/src/sse_handler.rs similarity index 85% rename from mm2src/mm2_main/src/sse.rs rename to mm2src/mm2_net/src/sse_handler.rs index 68426b9dff..37b340654c 100644 --- a/mm2src/mm2_main/src/sse.rs +++ b/mm2src/mm2_net/src/sse_handler.rs @@ -1,13 +1,11 @@ -// TODO: handle this module inside the `mm2_event_stream` crate. - use hyper::{body::Bytes, Body, Request, Response}; use mm2_core::mm_ctx::MmArc; use std::convert::Infallible; -pub(crate) const SSE_ENDPOINT: &str = "/event-stream"; +pub const SSE_ENDPOINT: &str = "/event-stream"; /// Handles broadcasted messages from `mm2_event_stream` continuously. -pub async fn handle_sse_events(request: Request, ctx_h: u32) -> Result, Infallible> { +pub async fn handle_sse(request: Request, ctx_h: u32) -> Result, Infallible> { fn get_filtered_events(request: Request) -> Vec { let query = request.uri().query().unwrap_or(""); let events_param = query @@ -16,7 +14,11 @@ pub async fn handle_sse_events(request: Request, ctx_h: u32) -> Result, ctx_h: u32) -> Result Date: Mon, 28 Aug 2023 18:35:23 +0300 Subject: [PATCH 09/19] manage event streaming dynamically Signed-off-by: onur-ozkan --- mm2src/mm2_core/src/mm_ctx.rs | 17 +++++++++++++- mm2src/mm2_event_stream/src/lib.rs | 33 ++++++++++++++++++++++++++++ mm2src/mm2_main/src/lp_native_dex.rs | 21 ++++++++++++++---- mm2src/mm2_main/src/rpc.rs | 8 ++++--- mm2src/mm2_net/src/network_event.rs | 8 ++++--- mm2src/mm2_net/src/sse_handler.rs | 14 ++++++++++-- 6 files changed, 88 insertions(+), 13 deletions(-) diff --git a/mm2src/mm2_core/src/mm_ctx.rs b/mm2src/mm2_core/src/mm_ctx.rs index 49b570d21c..52799ce6d6 100644 --- a/mm2src/mm2_core/src/mm_ctx.rs +++ b/mm2src/mm2_core/src/mm_ctx.rs @@ -6,7 +6,7 @@ use common::log::{self, LogLevel, LogOnError, LogState}; use common::{cfg_native, cfg_wasm32, small_rng}; use gstuff::{try_s, Constructible, ERR, ERRL}; use lazy_static::lazy_static; -use mm2_event_stream::{controller::Controller, Event}; +use mm2_event_stream::{controller::Controller, Event, EventStreamConfiguration}; use mm2_metrics::{MetricsArc, MetricsOps}; use primitives::hash::H160; use rand::Rng; @@ -341,6 +341,20 @@ impl MmCtx { .lock() .unwrap() } + + /// Reads 'event_stream_configuration' from the mm2 configuration. If the config wasn't given, + /// returns `None`. + pub fn event_stream_configuration(&self) -> Option { + let value = &self.conf["event_stream_configuration"]; + if value.is_null() { + return None; + } + + let config: EventStreamConfiguration = + json::from_value(value.clone()).expect("Invalid json value in 'event_stream_configuration'."); + + Some(config) + } } impl Default for MmCtx { @@ -681,6 +695,7 @@ impl MmCtxBuilder { let mut ctx = MmCtx::with_log_state(log); ctx.mm_version = self.version; ctx.datetime = self.datetime; + if let Some(conf) = self.conf { ctx.conf = conf } diff --git a/mm2src/mm2_event_stream/src/lib.rs b/mm2src/mm2_event_stream/src/lib.rs index b2fbfdee6a..fdc725f03b 100644 --- a/mm2src/mm2_event_stream/src/lib.rs +++ b/mm2src/mm2_event_stream/src/lib.rs @@ -20,4 +20,37 @@ impl Event { pub fn message(&self) -> &str { &self.message } } +/// Configuration for event streaming +#[derive(Deserialize)] +pub struct EventStreamConfiguration { + #[serde(default)] + pub access_control_allow_origin: String, + #[serde(default)] + pub active_events: Vec, +} + +#[derive(Clone, Default, Deserialize)] +pub struct EventStatus { + name: String, + pub stream_interval_seconds: f64, +} + +impl Default for EventStreamConfiguration { + fn default() -> Self { + Self { + access_control_allow_origin: String::from("*"), + active_events: vec![], + } + } +} + +impl EventStreamConfiguration { + pub fn get_event(&self, event_name: &str) -> Option { + self.active_events + .iter() + .find(|event| event.name == event_name) + .cloned() + } +} + pub mod controller; diff --git a/mm2src/mm2_main/src/lp_native_dex.rs b/mm2src/mm2_main/src/lp_native_dex.rs index 30da1d6b82..5cc7131931 100644 --- a/mm2src/mm2_main/src/lp_native_dex.rs +++ b/mm2src/mm2_main/src/lp_native_dex.rs @@ -31,6 +31,7 @@ use mm2_err_handle::prelude::*; use mm2_libp2p::{spawn_gossipsub, AdexBehaviourError, NodeType, RelayAddress, RelayAddressError, SwarmRuntime, WssCerts}; use mm2_metrics::mm_gauge; +use mm2_net::network_event::NETWORK_EVENT_TYPE; use mm2_net::p2p::P2PContext; use rpc_task::RpcTaskError; use serde_json::{self as json}; @@ -382,6 +383,21 @@ fn migrate_db(ctx: &MmArc) -> MmInitResult<()> { #[cfg(not(target_arch = "wasm32"))] fn migration_1(_ctx: &MmArc) {} +#[cfg(not(target_arch = "wasm32"))] +fn init_event_streaming(ctx: &MmArc) { + if let Some(config) = ctx.event_stream_configuration() { + if let Some(event) = config.get_event(NETWORK_EVENT_TYPE) { + info!( + "Event {NETWORK_EVENT_TYPE} is activated with {} seconds interval.", + event.stream_interval_seconds + ); + + ctx.spawner() + .spawn(start_network_event_stream(ctx.clone(), event.stream_interval_seconds)); + } + } +} + pub async fn lp_init_continue(ctx: MmArc) -> MmInitResult<()> { init_ordermatch_context(&ctx)?; init_p2p(ctx.clone()).await?; @@ -413,10 +429,7 @@ pub async fn lp_init_continue(ctx: MmArc) -> MmInitResult<()> { kick_start(ctx.clone()).await?; #[cfg(not(target_arch = "wasm32"))] - { - // TODO: this should be configurable from MM2 the config - ctx.spawner().spawn(start_network_event_stream(ctx.clone())); - } + init_event_streaming(&ctx); ctx.spawner().spawn(lp_ordermatch_loop(ctx.clone())); diff --git a/mm2src/mm2_main/src/rpc.rs b/mm2src/mm2_main/src/rpc.rs index 1c06d76352..e59327e811 100644 --- a/mm2src/mm2_main/src/rpc.rs +++ b/mm2src/mm2_main/src/rpc.rs @@ -356,9 +356,13 @@ pub extern "C" fn spawn_rpc(ctx_h: u32) { // then we might want to refactor into starting it ideomatically in order to benefit from a more graceful shutdown, // cf. https://github.com/hyperium/hyper/pull/1640. + let ctx = MmArc::from_ffi_handle(ctx_h).expect("No context"); + + let is_event_stream_enabled = ctx.event_stream_configuration().is_some(); + let make_svc_fut = move |remote_addr: SocketAddr| async move { Ok::<_, Infallible>(service_fn(move |req: Request| async move { - if req.uri().path() == SSE_ENDPOINT { + if is_event_stream_enabled && req.uri().path() == SSE_ENDPOINT { let res = handle_sse(req, ctx_h).await?; return Ok::<_, Infallible>(res); } @@ -427,8 +431,6 @@ pub extern "C" fn spawn_rpc(ctx_h: u32) { }; } - let ctx = MmArc::from_ffi_handle(ctx_h).expect("No context"); - let rpc_ip_port = ctx .rpc_ip_port() .unwrap_or_else(|err| panic!("Invalid RPC port: {}", err)); diff --git a/mm2src/mm2_net/src/network_event.rs b/mm2src/mm2_net/src/network_event.rs index bc8fbb1ed5..44a0d6fa11 100644 --- a/mm2src/mm2_net/src/network_event.rs +++ b/mm2src/mm2_net/src/network_event.rs @@ -6,9 +6,11 @@ use serde_json::json; use crate::p2p::P2PContext; -const NETWORK_EVENT_TYPE: &str = "NETWORK"; +// TODO: Create Event trait to enforce same design for all events. -pub async fn start_network_event_stream(ctx: MmArc) { +pub const NETWORK_EVENT_TYPE: &str = "NETWORK"; + +pub async fn start_network_event_stream(ctx: MmArc, event_interval: f64) { let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx); loop { @@ -32,6 +34,6 @@ pub async fn start_network_event_stream(ctx: MmArc) { .broadcast(Event::new(NETWORK_EVENT_TYPE.to_string(), event_data.to_string())) .await; - Timer::sleep(1.).await; // TODO: read this from configuration + Timer::sleep(event_interval).await; } } diff --git a/mm2src/mm2_net/src/sse_handler.rs b/mm2src/mm2_net/src/sse_handler.rs index 37b340654c..9d550c012d 100644 --- a/mm2src/mm2_net/src/sse_handler.rs +++ b/mm2src/mm2_net/src/sse_handler.rs @@ -28,10 +28,20 @@ pub async fn handle_sse(request: Request, ctx_h: u32) -> Result return handle_internal_error(err).await, }; + let config = match ctx.event_stream_configuration() { + Some(config) => config, + None => { + return handle_internal_error( + "Event stream configuration couldn't be found. This should never happen.".to_string(), + ) + .await + }, + }; + let filtered_events = get_filtered_events(request); let mut channel_controller = ctx.stream_channel_controller.clone(); - let mut rx = channel_controller.create_channel(4); // TODO: read this from configuration + let mut rx = channel_controller.create_channel(config.active_events.len()); let body = Body::wrap_stream(async_stream::stream! { while let Some(msg) = rx.recv().await { // If there are no filtered events, that means we want to @@ -47,7 +57,7 @@ pub async fn handle_sse(request: Request, ctx_h: u32) -> Result Date: Mon, 28 Aug 2023 18:41:11 +0300 Subject: [PATCH 10/19] leave perf improvement TODO note Signed-off-by: onur-ozkan --- mm2src/mm2_core/src/mm_ctx.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mm2src/mm2_core/src/mm_ctx.rs b/mm2src/mm2_core/src/mm_ctx.rs index 52799ce6d6..8ed054dc0e 100644 --- a/mm2src/mm2_core/src/mm_ctx.rs +++ b/mm2src/mm2_core/src/mm_ctx.rs @@ -344,6 +344,8 @@ impl MmCtx { /// Reads 'event_stream_configuration' from the mm2 configuration. If the config wasn't given, /// returns `None`. + /// + /// TODO: Move this value to `MmCtx`, so deserialization will be executed only once pub fn event_stream_configuration(&self) -> Option { let value = &self.conf["event_stream_configuration"]; if value.is_null() { From c58a7d5736d889a703351ef1acb8a1a28a726cd6 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Tue, 29 Aug 2023 10:43:34 +0300 Subject: [PATCH 11/19] avoid serializing events on http side Signed-off-by: onur-ozkan --- mm2src/mm2_event_stream/src/lib.rs | 3 +-- mm2src/mm2_net/src/sse_handler.rs | 7 +++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/mm2src/mm2_event_stream/src/lib.rs b/mm2src/mm2_event_stream/src/lib.rs index fdc725f03b..9f0494235d 100644 --- a/mm2src/mm2_event_stream/src/lib.rs +++ b/mm2src/mm2_event_stream/src/lib.rs @@ -1,7 +1,6 @@ -use serde::{Deserialize, Serialize}; +use serde::Deserialize; /// multi-purpose/generic event type that can easily be used over the event streaming -#[derive(Debug, Deserialize, Serialize)] pub struct Event { _type: String, message: String, diff --git a/mm2src/mm2_net/src/sse_handler.rs b/mm2src/mm2_net/src/sse_handler.rs index 9d550c012d..b054916b60 100644 --- a/mm2src/mm2_net/src/sse_handler.rs +++ b/mm2src/mm2_net/src/sse_handler.rs @@ -43,12 +43,11 @@ pub async fn handle_sse(request: Request, ctx_h: u32) -> Result(Bytes::from(format!("data: {json} \n\n"))); + if filtered_events.is_empty() || filtered_events.contains(&event.event_type().to_owned()) { + yield Ok::<_, hyper::Error>(Bytes::from(format!("data: {} \n\n", event.message()))); } } }); From 79245d18ed37fd21a20f965763750e47c81afec4 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Tue, 29 Aug 2023 11:01:03 +0300 Subject: [PATCH 12/19] fix wasm error Signed-off-by: onur-ozkan --- mm2src/mm2_main/src/lp_native_dex.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mm2src/mm2_main/src/lp_native_dex.rs b/mm2src/mm2_main/src/lp_native_dex.rs index 5cc7131931..980d9abf64 100644 --- a/mm2src/mm2_main/src/lp_native_dex.rs +++ b/mm2src/mm2_main/src/lp_native_dex.rs @@ -31,7 +31,6 @@ use mm2_err_handle::prelude::*; use mm2_libp2p::{spawn_gossipsub, AdexBehaviourError, NodeType, RelayAddress, RelayAddressError, SwarmRuntime, WssCerts}; use mm2_metrics::mm_gauge; -use mm2_net::network_event::NETWORK_EVENT_TYPE; use mm2_net::p2p::P2PContext; use rpc_task::RpcTaskError; use serde_json::{self as json}; @@ -385,6 +384,8 @@ fn migration_1(_ctx: &MmArc) {} #[cfg(not(target_arch = "wasm32"))] fn init_event_streaming(ctx: &MmArc) { + use mm2_net::network_event::NETWORK_EVENT_TYPE; + if let Some(config) = ctx.event_stream_configuration() { if let Some(event) = config.get_event(NETWORK_EVENT_TYPE) { info!( From 99f87c820afc1ed8cb873e4f46ca6373d7173c12 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Tue, 29 Aug 2023 11:28:02 +0300 Subject: [PATCH 13/19] update SSE data type Signed-off-by: onur-ozkan --- mm2src/mm2_net/src/sse_handler.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/mm2src/mm2_net/src/sse_handler.rs b/mm2src/mm2_net/src/sse_handler.rs index b054916b60..2c7338cdc2 100644 --- a/mm2src/mm2_net/src/sse_handler.rs +++ b/mm2src/mm2_net/src/sse_handler.rs @@ -1,5 +1,6 @@ use hyper::{body::Bytes, Body, Request, Response}; use mm2_core::mm_ctx::MmArc; +use serde_json::json; use std::convert::Infallible; pub const SSE_ENDPOINT: &str = "/event-stream"; @@ -47,7 +48,12 @@ pub async fn handle_sse(request: Request, ctx_h: u32) -> Result(Bytes::from(format!("data: {} \n\n", event.message()))); + let data = json!({ + "_type": event.event_type(), + "message": event.message(), + }); + + yield Ok::<_, hyper::Error>(Bytes::from(format!("data: {data} \n\n"))); } } }); From 90996c5f46b858a6d6790fc549a4664b26508a53 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Tue, 29 Aug 2023 13:14:10 +0300 Subject: [PATCH 14/19] implement `EventBehaviour` Signed-off-by: onur-ozkan --- Cargo.lock | 2 + mm2src/mm2_event_stream/Cargo.toml | 1 + mm2src/mm2_event_stream/src/behaviour.rs | 15 +++++ mm2src/mm2_event_stream/src/lib.rs | 1 + mm2src/mm2_main/Cargo.toml | 1 + mm2src/mm2_main/src/lp_native_dex.rs | 19 ++---- mm2src/mm2_net/src/network_event.rs | 82 +++++++++++++++--------- 7 files changed, 78 insertions(+), 43 deletions(-) create mode 100644 mm2src/mm2_event_stream/src/behaviour.rs diff --git a/Cargo.lock b/Cargo.lock index ec2adc5a6c..600daa8785 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4187,6 +4187,7 @@ dependencies = [ name = "mm2_event_stream" version = "0.1.0" dependencies = [ + "async-trait", "parking_lot 0.12.0", "serde", "tokio", @@ -4287,6 +4288,7 @@ dependencies = [ "mm2_core", "mm2_db", "mm2_err_handle", + "mm2_event_stream", "mm2_gui_storage", "mm2_io", "mm2_metrics", diff --git a/mm2src/mm2_event_stream/Cargo.toml b/mm2src/mm2_event_stream/Cargo.toml index 8329355c96..ce18ef8143 100644 --- a/mm2src/mm2_event_stream/Cargo.toml +++ b/mm2src/mm2_event_stream/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +async-trait = "0.1" parking_lot = "0.12" serde = { version = "1", features = ["derive", "rc"] } tokio = { version = "1", features = ["sync"] } diff --git a/mm2src/mm2_event_stream/src/behaviour.rs b/mm2src/mm2_event_stream/src/behaviour.rs new file mode 100644 index 0000000000..bb905af3fc --- /dev/null +++ b/mm2src/mm2_event_stream/src/behaviour.rs @@ -0,0 +1,15 @@ +use crate::EventStreamConfiguration; +use async_trait::async_trait; + +#[async_trait] +pub trait EventBehaviour { + /// Unique name of the event. + const EVENT_NAME: &'static str; + + /// Event handler that is responsible for broadcasting event data to the streaming channels. + async fn handle(self, interval: f64); + + /// Spawns the `Self::handle` in a separate thread if the event is active according to the mm2 configuration. + /// Does nothing if the event is not active. + fn spawn_if_active(self, config: &EventStreamConfiguration); +} diff --git a/mm2src/mm2_event_stream/src/lib.rs b/mm2src/mm2_event_stream/src/lib.rs index 9f0494235d..de95ed5105 100644 --- a/mm2src/mm2_event_stream/src/lib.rs +++ b/mm2src/mm2_event_stream/src/lib.rs @@ -52,4 +52,5 @@ impl EventStreamConfiguration { } } +pub mod behaviour; pub mod controller; diff --git a/mm2src/mm2_main/Cargo.toml b/mm2src/mm2_main/Cargo.toml index f1258d1827..cd5a33fc25 100644 --- a/mm2src/mm2_main/Cargo.toml +++ b/mm2src/mm2_main/Cargo.toml @@ -57,6 +57,7 @@ lazy_static = "1.4" libc = "0.2" mm2_core = { path = "../mm2_core" } mm2_err_handle = { path = "../mm2_err_handle" } +mm2_event_stream = { path = "../mm2_event_stream" } mm2_gui_storage = { path = "../mm2_gui_storage" } mm2_io = { path = "../mm2_io" } mm2-libp2p = { path = "../mm2_libp2p" } diff --git a/mm2src/mm2_main/src/lp_native_dex.rs b/mm2src/mm2_main/src/lp_native_dex.rs index 980d9abf64..7ace35fe3a 100644 --- a/mm2src/mm2_main/src/lp_native_dex.rs +++ b/mm2src/mm2_main/src/lp_native_dex.rs @@ -51,10 +51,11 @@ use crate::mm2::lp_swap::{running_swaps_num, swap_kick_starts}; use crate::mm2::rpc::spawn_rpc; cfg_native! { + use db_common::sqlite::rusqlite::Error as SqlError; + use mm2_event_stream::behaviour::EventBehaviour; use mm2_io::fs::{ensure_dir_is_writable, ensure_file_is_writable}; use mm2_net::ip_addr::myipaddr; - use mm2_net::network_event::start_network_event_stream; - use db_common::sqlite::rusqlite::Error as SqlError; + use mm2_net::network_event::NetworkEvent; } #[path = "lp_init/init_context.rs"] mod init_context; @@ -384,18 +385,10 @@ fn migration_1(_ctx: &MmArc) {} #[cfg(not(target_arch = "wasm32"))] fn init_event_streaming(ctx: &MmArc) { - use mm2_net::network_event::NETWORK_EVENT_TYPE; - + // This condition only executed if events were enabled in mm2 configuration. if let Some(config) = ctx.event_stream_configuration() { - if let Some(event) = config.get_event(NETWORK_EVENT_TYPE) { - info!( - "Event {NETWORK_EVENT_TYPE} is activated with {} seconds interval.", - event.stream_interval_seconds - ); - - ctx.spawner() - .spawn(start_network_event_stream(ctx.clone(), event.stream_interval_seconds)); - } + // Network event handling + NetworkEvent::new(ctx.clone()).spawn_if_active(&config); } } diff --git a/mm2src/mm2_net/src/network_event.rs b/mm2src/mm2_net/src/network_event.rs index 44a0d6fa11..6dd5ee4396 100644 --- a/mm2src/mm2_net/src/network_event.rs +++ b/mm2src/mm2_net/src/network_event.rs @@ -1,39 +1,61 @@ -use common::executor::Timer; +use crate::p2p::P2PContext; +use async_trait::async_trait; +use common::{executor::{SpawnFuture, Timer}, + log::info}; use mm2_core::mm_ctx::MmArc; -use mm2_event_stream::Event; +pub use mm2_event_stream::behaviour::EventBehaviour; +use mm2_event_stream::{Event, EventStreamConfiguration}; use mm2_libp2p::atomicdex_behaviour; use serde_json::json; -use crate::p2p::P2PContext; - -// TODO: Create Event trait to enforce same design for all events. - -pub const NETWORK_EVENT_TYPE: &str = "NETWORK"; - -pub async fn start_network_event_stream(ctx: MmArc, event_interval: f64) { - let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx); - - loop { - let p2p_cmd_tx = p2p_ctx.cmd_tx.lock().clone(); - - let peers_info = atomicdex_behaviour::get_peers_info(p2p_cmd_tx.clone()).await; - let gossip_mesh = atomicdex_behaviour::get_gossip_mesh(p2p_cmd_tx.clone()).await; - let gossip_peer_topics = atomicdex_behaviour::get_gossip_peer_topics(p2p_cmd_tx.clone()).await; - let gossip_topic_peers = atomicdex_behaviour::get_gossip_topic_peers(p2p_cmd_tx.clone()).await; - let relay_mesh = atomicdex_behaviour::get_relay_mesh(p2p_cmd_tx).await; +pub struct NetworkEvent { + ctx: MmArc, +} - let event_data = json!({ - "peers_info": peers_info, - "gossip_mesh": gossip_mesh, - "gossip_peer_topics": gossip_peer_topics, - "gossip_topic_peers": gossip_topic_peers, - "relay_mesh": relay_mesh, - }); +impl NetworkEvent { + pub fn new(ctx: MmArc) -> Self { Self { ctx } } +} - ctx.stream_channel_controller - .broadcast(Event::new(NETWORK_EVENT_TYPE.to_string(), event_data.to_string())) - .await; +#[async_trait] +impl EventBehaviour for NetworkEvent { + const EVENT_NAME: &'static str = "NETWORK"; + + async fn handle(self, interval: f64) { + let p2p_ctx = P2PContext::fetch_from_mm_arc(&self.ctx); + + loop { + let p2p_cmd_tx = p2p_ctx.cmd_tx.lock().clone(); + + let peers_info = atomicdex_behaviour::get_peers_info(p2p_cmd_tx.clone()).await; + let gossip_mesh = atomicdex_behaviour::get_gossip_mesh(p2p_cmd_tx.clone()).await; + let gossip_peer_topics = atomicdex_behaviour::get_gossip_peer_topics(p2p_cmd_tx.clone()).await; + let gossip_topic_peers = atomicdex_behaviour::get_gossip_topic_peers(p2p_cmd_tx.clone()).await; + let relay_mesh = atomicdex_behaviour::get_relay_mesh(p2p_cmd_tx).await; + + let event_data = json!({ + "peers_info": peers_info, + "gossip_mesh": gossip_mesh, + "gossip_peer_topics": gossip_peer_topics, + "gossip_topic_peers": gossip_topic_peers, + "relay_mesh": relay_mesh, + }); + + self.ctx + .stream_channel_controller + .broadcast(Event::new(Self::EVENT_NAME.to_string(), event_data.to_string())) + .await; + + Timer::sleep(interval).await; + } + } - Timer::sleep(event_interval).await; + fn spawn_if_active(self, config: &EventStreamConfiguration) { + if let Some(event) = config.get_event(Self::EVENT_NAME) { + info!( + "NETWORK event is activated with {} seconds interval.", + event.stream_interval_seconds + ); + self.ctx.spawner().spawn(self.handle(event.stream_interval_seconds)); + } } } From 8445f35a8363a5c141106c25eab83888a949e883 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Tue, 29 Aug 2023 13:25:28 +0300 Subject: [PATCH 15/19] save configuration in mm2 context Signed-off-by: onur-ozkan --- mm2src/mm2_core/src/mm_ctx.rs | 29 ++++++++++++---------------- mm2src/mm2_main/src/lp_native_dex.rs | 4 ++-- mm2src/mm2_main/src/rpc.rs | 2 +- mm2src/mm2_net/src/sse_handler.rs | 4 ++-- 4 files changed, 17 insertions(+), 22 deletions(-) diff --git a/mm2src/mm2_core/src/mm_ctx.rs b/mm2src/mm2_core/src/mm_ctx.rs index 8ed054dc0e..7f16613f50 100644 --- a/mm2src/mm2_core/src/mm_ctx.rs +++ b/mm2src/mm2_core/src/mm_ctx.rs @@ -75,6 +75,8 @@ pub struct MmCtx { pub rpc_started: Constructible, /// Channels for continuously streaming data to clients via SSE. pub stream_channel_controller: Controller, + /// Configuration of event streaming used for SSE. + pub event_stream_configuration: Option, /// True if the MarketMaker instance needs to stop. pub stop: Constructible, /// Unique context identifier, allowing us to more easily pass the context through the FFI boundaries. @@ -137,6 +139,7 @@ impl MmCtx { initialized: Constructible::default(), rpc_started: Constructible::default(), stream_channel_controller: Controller::new(), + event_stream_configuration: None, stop: Constructible::default(), ffi_handle: Constructible::default(), ordermatch_ctx: Mutex::new(None), @@ -341,22 +344,6 @@ impl MmCtx { .lock() .unwrap() } - - /// Reads 'event_stream_configuration' from the mm2 configuration. If the config wasn't given, - /// returns `None`. - /// - /// TODO: Move this value to `MmCtx`, so deserialization will be executed only once - pub fn event_stream_configuration(&self) -> Option { - let value = &self.conf["event_stream_configuration"]; - if value.is_null() { - return None; - } - - let config: EventStreamConfiguration = - json::from_value(value.clone()).expect("Invalid json value in 'event_stream_configuration'."); - - Some(config) - } } impl Default for MmCtx { @@ -699,7 +686,15 @@ impl MmCtxBuilder { ctx.datetime = self.datetime; if let Some(conf) = self.conf { - ctx.conf = conf + ctx.conf = conf; + + let event_stream_configuration = &ctx.conf["event_stream_configuration"]; + if !event_stream_configuration.is_null() { + let event_stream_configuration: EventStreamConfiguration = + json::from_value(event_stream_configuration.clone()) + .expect("Invalid json value in 'event_stream_configuration'."); + ctx.event_stream_configuration = Some(event_stream_configuration); + } } #[cfg(target_arch = "wasm32")] diff --git a/mm2src/mm2_main/src/lp_native_dex.rs b/mm2src/mm2_main/src/lp_native_dex.rs index 7ace35fe3a..4dd2a00480 100644 --- a/mm2src/mm2_main/src/lp_native_dex.rs +++ b/mm2src/mm2_main/src/lp_native_dex.rs @@ -386,9 +386,9 @@ fn migration_1(_ctx: &MmArc) {} #[cfg(not(target_arch = "wasm32"))] fn init_event_streaming(ctx: &MmArc) { // This condition only executed if events were enabled in mm2 configuration. - if let Some(config) = ctx.event_stream_configuration() { + if let Some(config) = &ctx.event_stream_configuration { // Network event handling - NetworkEvent::new(ctx.clone()).spawn_if_active(&config); + NetworkEvent::new(ctx.clone()).spawn_if_active(config); } } diff --git a/mm2src/mm2_main/src/rpc.rs b/mm2src/mm2_main/src/rpc.rs index e59327e811..001f8bea28 100644 --- a/mm2src/mm2_main/src/rpc.rs +++ b/mm2src/mm2_main/src/rpc.rs @@ -358,7 +358,7 @@ pub extern "C" fn spawn_rpc(ctx_h: u32) { let ctx = MmArc::from_ffi_handle(ctx_h).expect("No context"); - let is_event_stream_enabled = ctx.event_stream_configuration().is_some(); + let is_event_stream_enabled = ctx.event_stream_configuration.is_some(); let make_svc_fut = move |remote_addr: SocketAddr| async move { Ok::<_, Infallible>(service_fn(move |req: Request| async move { diff --git a/mm2src/mm2_net/src/sse_handler.rs b/mm2src/mm2_net/src/sse_handler.rs index 2c7338cdc2..cb75785cd4 100644 --- a/mm2src/mm2_net/src/sse_handler.rs +++ b/mm2src/mm2_net/src/sse_handler.rs @@ -29,7 +29,7 @@ pub async fn handle_sse(request: Request, ctx_h: u32) -> Result return handle_internal_error(err).await, }; - let config = match ctx.event_stream_configuration() { + let config = match &ctx.event_stream_configuration { Some(config) => config, None => { return handle_internal_error( @@ -62,7 +62,7 @@ pub async fn handle_sse(request: Request, ctx_h: u32) -> Result Date: Tue, 29 Aug 2023 19:50:25 +0300 Subject: [PATCH 16/19] update doc-comment of `stream_channel_controller` in `MmCtx` Signed-off-by: onur-ozkan --- mm2src/mm2_core/src/mm_ctx.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mm2src/mm2_core/src/mm_ctx.rs b/mm2src/mm2_core/src/mm_ctx.rs index 7f16613f50..ca66a979df 100644 --- a/mm2src/mm2_core/src/mm_ctx.rs +++ b/mm2src/mm2_core/src/mm_ctx.rs @@ -73,7 +73,7 @@ pub struct MmCtx { pub initialized: Constructible, /// True if the RPC HTTP server was started. pub rpc_started: Constructible, - /// Channels for continuously streaming data to clients via SSE. + /// Controller for continuously streaming data using streaming channels of `mm2_event_stream`. pub stream_channel_controller: Controller, /// Configuration of event streaming used for SSE. pub event_stream_configuration: Option, From 2e942af652723a8521928547879bcd390db5fe75 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Mon, 11 Sep 2023 07:54:44 +0300 Subject: [PATCH 17/19] optimize `mm2src/mm2_event_stream/src/lib.rs` Signed-off-by: onur-ozkan --- mm2src/mm2_event_stream/src/lib.rs | 32 ++++++++++++++++++++---------- mm2src/mm2_net/src/sse_handler.rs | 30 ++++++++++++---------------- 2 files changed, 34 insertions(+), 28 deletions(-) diff --git a/mm2src/mm2_event_stream/src/lib.rs b/mm2src/mm2_event_stream/src/lib.rs index de95ed5105..afa5c7e1be 100644 --- a/mm2src/mm2_event_stream/src/lib.rs +++ b/mm2src/mm2_event_stream/src/lib.rs @@ -1,12 +1,15 @@ use serde::Deserialize; +use std::collections::HashMap; -/// multi-purpose/generic event type that can easily be used over the event streaming +/// Multi-purpose/generic event type that can easily be used over the event streaming pub struct Event { _type: String, message: String, } impl Event { + /// Creates a new `Event` instance with the specified event type and message. + #[inline] pub fn new(event_type: String, message: String) -> Self { Self { _type: event_type, @@ -14,23 +17,29 @@ impl Event { } } + /// Gets the event type. + #[inline] pub fn event_type(&self) -> &str { &self._type } + /// Gets the event message. + #[inline] pub fn message(&self) -> &str { &self.message } } /// Configuration for event streaming #[derive(Deserialize)] pub struct EventStreamConfiguration { + /// The value to set for the `Access-Control-Allow-Origin` header. #[serde(default)] pub access_control_allow_origin: String, #[serde(default)] - pub active_events: Vec, + active_events: HashMap, } +/// Represents the configuration for a specific event within the event stream. #[derive(Clone, Default, Deserialize)] -pub struct EventStatus { - name: String, +pub struct EventConfig { + /// The interval in seconds at which the event should be streamed. pub stream_interval_seconds: f64, } @@ -38,18 +47,19 @@ impl Default for EventStreamConfiguration { fn default() -> Self { Self { access_control_allow_origin: String::from("*"), - active_events: vec![], + active_events: Default::default(), } } } impl EventStreamConfiguration { - pub fn get_event(&self, event_name: &str) -> Option { - self.active_events - .iter() - .find(|event| event.name == event_name) - .cloned() - } + /// Retrieves the configuration for a specific event by its name. + #[inline] + pub fn get_event(&self, event_name: &str) -> Option { self.active_events.get(event_name).cloned() } + + /// Gets the total number of active events in the configuration. + #[inline] + pub fn total_active_events(&self) -> usize { self.active_events.len() } } pub mod behaviour; diff --git a/mm2src/mm2_net/src/sse_handler.rs b/mm2src/mm2_net/src/sse_handler.rs index cb75785cd4..f57018ca04 100644 --- a/mm2src/mm2_net/src/sse_handler.rs +++ b/mm2src/mm2_net/src/sse_handler.rs @@ -7,21 +7,6 @@ pub const SSE_ENDPOINT: &str = "/event-stream"; /// Handles broadcasted messages from `mm2_event_stream` continuously. pub async fn handle_sse(request: Request, ctx_h: u32) -> Result, Infallible> { - fn get_filtered_events(request: Request) -> Vec { - let query = request.uri().query().unwrap_or(""); - let events_param = query - .split('&') - .find(|param| param.starts_with("filter=")) - .map(|param| param.trim_start_matches("filter=")) - .unwrap_or(""); - - if events_param.is_empty() { - Vec::new() - } else { - events_param.split(',').map(|event| event.to_string()).collect() - } - } - // This is only called once for per client on the initialization, // meaning this is not a resource intensive computation. let ctx = match MmArc::from_ffi_handle(ctx_h) { @@ -39,10 +24,21 @@ pub async fn handle_sse(request: Request, ctx_h: u32) -> Result { } /// guard to trace channels disconnection -pub struct ChannelGuard { +pub struct ChannelGuard { channel_id: ChannelId, controller: Controller, } /// Receiver to cleanup resources on `Drop` -pub struct GuardedReceiver { +pub struct GuardedReceiver { rx: Receiver>, #[allow(dead_code)] guard: ChannelGuard, } -impl Controller { +impl Controller { /// Creates a new channels controller pub fn new() -> Self { Default::default() } @@ -83,15 +83,19 @@ impl Default for Controller { } } -impl ChannelGuard { +impl ChannelGuard { fn new(channel_id: ChannelId, controller: Controller) -> Self { Self { channel_id, controller } } } -impl Drop for ChannelGuard { - fn drop(&mut self) { self.controller.remove_channel(&self.channel_id); } +impl Drop for ChannelGuard { + fn drop(&mut self) { + common::log::debug!("Dropping event channel with id: {}", self.channel_id); + + self.controller.remove_channel(&self.channel_id); + } } -impl GuardedReceiver { +impl GuardedReceiver { /// Receives the next event from the channel pub async fn recv(&mut self) -> Option> { self.rx.recv().await } } @@ -99,10 +103,25 @@ impl GuardedReceiver { #[cfg(test)] mod tests { use super::*; - use tokio::time::{sleep, Duration}; - #[tokio::test] - async fn test_create_channel_and_broadcast() { + common::cfg_wasm32! { + use wasm_bindgen_test::*; + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + } + + macro_rules! cross_test { + ($test_name:ident, $test_code:block) => { + #[cfg(not(target_arch = "wasm32"))] + #[tokio::test(flavor = "multi_thread")] + async fn $test_name() { $test_code } + + #[cfg(target_arch = "wasm32")] + #[wasm_bindgen_test] + async fn $test_name() { $test_code } + }; + } + + cross_test!(test_create_channel_and_broadcast, { let mut controller = Controller::new(); let mut guard_receiver = controller.create_channel(1); @@ -110,10 +129,9 @@ mod tests { let received_msg = guard_receiver.recv().await.unwrap(); assert_eq!(*received_msg, "Message".to_string()); - } + }); - #[tokio::test] - async fn test_multiple_channels_and_broadcast() { + cross_test!(test_multiple_channels_and_broadcast, { let mut controller = Controller::new(); let mut receivers = Vec::new(); @@ -127,10 +145,9 @@ mod tests { let received_msg = receiver.recv().await.unwrap(); assert_eq!(*received_msg, "Message".to_string()); } - } + }); - #[tokio::test] - async fn test_channel_cleanup_on_drop() { + cross_test!(test_channel_cleanup_on_drop, { let mut controller: Controller<()> = Controller::new(); let guard_receiver = controller.create_channel(1); @@ -138,13 +155,12 @@ mod tests { drop(guard_receiver); - sleep(Duration::from_millis(10)).await; // Give time for the drop to execute + common::executor::Timer::sleep(0.1).await; // Give time for the drop to execute assert_eq!(controller.num_connections(), 0); - } + }); - #[tokio::test] - async fn test_broadcast_across_channels() { + cross_test!(test_broadcast_across_channels, { let mut controller = Controller::new(); let mut receivers = Vec::new(); @@ -158,10 +174,9 @@ mod tests { let received_msg = receiver.recv().await.unwrap(); assert_eq!(*received_msg, "Message".to_string()); } - } + }); - #[tokio::test] - async fn test_multiple_messages_and_drop() { + cross_test!(test_multiple_messages_and_drop, { let mut controller = Controller::new(); let mut guard_receiver = controller.create_channel(6); @@ -188,9 +203,8 @@ mod tests { // Consume the GuardedReceiver to trigger drop and channel cleanup drop(guard_receiver); - // Sleep for a short time to allow cleanup to complete - sleep(Duration::from_millis(10)).await; + common::executor::Timer::sleep(0.1).await; // Give time for the drop to execute assert_eq!(controller.num_connections(), 0); - } + }); } diff --git a/mm2src/mm2_net/Cargo.toml b/mm2src/mm2_net/Cargo.toml index 44c22ce177..2eeb39071c 100644 --- a/mm2src/mm2_net/Cargo.toml +++ b/mm2src/mm2_net/Cargo.toml @@ -37,7 +37,7 @@ js-sys = "0.3.27" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] futures-util = { version = "0.3" } -hyper = { version = "0.14.26", features = ["client", "http2", "server", "tcp"] } +hyper = { version = "0.14.26", features = ["client", "http2", "server", "tcp", "stream"] } gstuff = { version = "0.7", features = ["nightly"] } rustls = { version = "0.20", default-features = false } tokio = { version = "1.20" } From a7a2064986bc178dadb769f084963a6dfed386cc Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Fri, 15 Sep 2023 09:22:05 +0300 Subject: [PATCH 19/19] enable wasm tests to be executed with wasm-pack Signed-off-by: onur-ozkan --- mm2src/mm2_event_stream/Cargo.toml | 2 +- mm2src/mm2_event_stream/src/controller.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mm2src/mm2_event_stream/Cargo.toml b/mm2src/mm2_event_stream/Cargo.toml index 51654fbbf5..2865e0a01f 100644 --- a/mm2src/mm2_event_stream/Cargo.toml +++ b/mm2src/mm2_event_stream/Cargo.toml @@ -5,13 +5,13 @@ edition = "2021" [dependencies] async-trait = "0.1" +cfg-if = "1.0" common = { path = "../common" } parking_lot = "0.12" serde = { version = "1", features = ["derive", "rc"] } tokio = { version = "1", features = ["sync"] } [dev-dependencies] -cfg-if = "1.0" tokio = { version = "1", features = ["sync", "macros", "time", "rt"] } [target.'cfg(target_arch = "wasm32")'.dependencies] diff --git a/mm2src/mm2_event_stream/src/controller.rs b/mm2src/mm2_event_stream/src/controller.rs index 5b6effd7fa..098c6e4bb7 100644 --- a/mm2src/mm2_event_stream/src/controller.rs +++ b/mm2src/mm2_event_stream/src/controller.rs @@ -100,7 +100,7 @@ impl GuardedReceiver { pub async fn recv(&mut self) -> Option> { self.rx.recv().await } } -#[cfg(test)] +#[cfg(any(test, target_arch = "wasm32"))] mod tests { use super::*;