Skip to content

Commit

Permalink
[WIP] concurrent balance loop controller
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <work@onurozkan.dev>
  • Loading branch information
onur-ozkan committed Sep 21, 2023
1 parent a7a2064 commit 41fa296
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mm2src/coins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ lazy_static = "1.4"
libc = "0.2"
mm2_core = { path = "../mm2_core" }
mm2_err_handle = { path = "../mm2_err_handle" }
mm2_event_stream = { path = "../mm2_event_stream" }
mm2_git = { path = "../mm2_git" }
mm2_io = { path = "../mm2_io" }
mm2_metrics = { path = "../mm2_metrics" }
Expand Down
7 changes: 7 additions & 0 deletions mm2src/coins/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use crate::MmCoin;
use async_trait::async_trait;

#[async_trait]
pub trait BalanceEvent: MmCoin {
async fn stream(&self);
}
65 changes: 65 additions & 0 deletions mm2src/coins/lp_coins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use async_trait::async_trait;
use base58::FromBase58Error;
use bip32::ExtendedPrivateKey;
use common::custom_futures::timeout::TimeoutError;
use common::executor::Timer;
use common::executor::{abortable_queue::{AbortableQueue, WeakSpawner},
AbortSettings, AbortedError, SpawnAbortable, SpawnFuture};
use common::log::{warn, LogOnError};
Expand Down Expand Up @@ -250,6 +251,8 @@ pub use test_coin::TestCoin;

pub mod tx_history_storage;

pub mod events;

#[doc(hidden)]
#[allow(unused_variables)]
#[cfg(all(
Expand Down Expand Up @@ -3411,6 +3414,68 @@ fn lp_spawn_tx_history(ctx: MmArc, coin: MmCoinEnum) -> Result<(), String> {
Ok(())
}

/// Concurrent balance event controller loop for active coins
pub async fn balance_event_loop(ctx: MmArc) {
let cctx = CoinsContext::from_ctx(&ctx).unwrap();

// Events that are already fired
let mut event_pool: Vec<String> = vec![];

loop {
let coins_mutex = cctx.coins.lock().await;

let coins: Vec<MmCoinEnum> = coins_mutex
.values()
.filter_map(|coin| {
if coin.is_available.load(AtomicOrdering::Relaxed) && coin.inner.is_platform_coin() {
Some(coin.inner.clone())
} else {
None
}
})
.collect();

drop(coins_mutex);

for coin in coins {
let ticker = coin.ticker().to_owned();

if event_pool.contains(&ticker) {
continue;
}

match coin {
MmCoinEnum::Tendermint(_) => {
println!("COIN IS HERE {:?}", ticker);
},
MmCoinEnum::TendermintToken(_) => {
unimplemented!();
},
MmCoinEnum::UtxoCoin(_) => todo!(),
MmCoinEnum::QtumCoin(_) => todo!(),
MmCoinEnum::Qrc20Coin(_) => todo!(),
MmCoinEnum::EthCoin(_) => todo!(),
MmCoinEnum::ZCoin(_) => todo!(),
MmCoinEnum::Bch(_) => todo!(),
MmCoinEnum::SlpToken(_) => todo!(),
MmCoinEnum::LightningCoin(_) => todo!(),
MmCoinEnum::Test(_) => todo!(),
#[cfg(all(
feature = "enable-solana",
not(target_os = "ios"),
not(target_os = "android"),
not(target_arch = "wasm32")
))]
MmCoinEnum::SolanaCoin(_) | MmCoinEnum::SplToken(_) => todo!(),
}

event_pool.push(ticker);
}

Timer::sleep(5.).await;
}
}

/// NB: Returns only the enabled (aka active) coins.
pub async fn lp_coinfind(ctx: &MmArc, ticker: &str) -> Result<Option<MmCoinEnum>, String> {
let cctx = try_s!(CoinsContext::from_ctx(ctx));
Expand Down
9 changes: 9 additions & 0 deletions mm2src/coins/tendermint/balance_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use async_trait::async_trait;

use super::TendermintCoin;
use crate::events::BalanceEvent;

#[async_trait]
impl BalanceEvent for TendermintCoin {
async fn stream(&self) {}
}
1 change: 1 addition & 0 deletions mm2src/coins/tendermint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Useful resources
// https://docs.cosmos.network/

pub mod balance_event;
mod ibc;
mod iris;
mod rpc;
Expand Down
4 changes: 4 additions & 0 deletions mm2src/mm2_main/src/lp_native_dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//

use bitcrypto::sha256;
use coins::balance_event_loop;
use coins::register_balance_update_handler;
use common::executor::{SpawnFuture, Timer};
use common::log::{info, warn};
Expand Down Expand Up @@ -382,10 +383,13 @@ fn migration_1(_ctx: &MmArc) {}
#[cfg(not(target_arch = "wasm32"))]
fn init_event_streaming(ctx: &MmArc) {
// This condition only executed if events were enabled in mm2 configuration.

if let Some(config) = &ctx.event_stream_configuration {
// Network event handling
NetworkEvent::new(ctx.clone()).spawn_if_active(config);
}

ctx.spawner().spawn(balance_event_loop(ctx.clone()));
}

pub async fn lp_init_continue(ctx: MmArc) -> MmInitResult<()> {
Expand Down

0 comments on commit 41fa296

Please sign in to comment.