Skip to content

Commit

Permalink
perf: resync_membership_proofs is more concurrent
Browse files Browse the repository at this point in the history
addresses #183.
  • Loading branch information
dan-da committed Sep 17, 2024
1 parent a59bae6 commit ea1f317
Showing 1 changed file with 137 additions and 66 deletions.
203 changes: 137 additions & 66 deletions src/models/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod shared;
pub mod wallet;

use std::cmp::max;
use std::collections::BTreeMap;
use std::ops::Deref;
use std::ops::DerefMut;

Expand Down Expand Up @@ -191,8 +192,70 @@ impl GlobalStateLock {
}

/// resync membership proofs
///
/// concurrency:
///
/// This fn performs potentially lengthy operations. The work has been
/// split into two: an immutable fn that prepares a batch of updates and a
/// mutable fn that finalizes (writes) the updates as quickly as possible.
/// In this way, we do not hold the write-lock any longer than necessary.
///
/// Splitting the logic this way requires buffering updates in RAM. We
/// batch the updates to avoid running out of mem. The batch size is
/// initially set to 1000. This is just a wild-ass-guess. It seems high
/// enough that most wallets will only need a single batch, but low enough
/// that we shouldn't run out of mem if processing a huge wallet.
///
/// see: <https://github.com/Neptune-Crypto/neptune-core/issues/183>
pub async fn resync_membership_proofs(&mut self) -> Result<()> {
self.lock_guard_mut().await.resync_membership_proofs().await
// acquire read-lock
let gs = self.lock_guard().await;

// Do not fix memberhip proofs if node is in sync mode, as we would otherwise
// have to sync many times, instead of just *one* time once we have caught up.
if gs.net.syncing {
debug!("Not syncing MS membership proofs because we are syncing");
return Ok(());
}

// is it necessary?
let tip_hash = gs.chain.light_state().hash();
if gs.wallet_state.is_synced_to(tip_hash).await {
debug!("Membership proof syncing not needed");
return Ok(());
}

// do we have blocks?
if !gs.chain.is_archival_node() {
todo!("We don't yet support non-archival nodes");
// request blocks from peers, etc.
}

drop(gs); // release read-lock.

// Process monitored_utxos in batches of 1000. It seems unlikely many
// wallets would have more than 1000 mutxo that need resyncing but We do
// this as a safety valve to avoid blowing up memory too much.
const BATCH_SIZE: u64 = 1000;
for batch_idx in 0.. {
// perform lengthy op gathering updates with read-lock
let updates_batch = self
.lock_guard()
.await
.prepare_resync_membership_proofs(tip_hash, batch_idx, BATCH_SIZE)
.await?;
if updates_batch.len() == 0 {
// no more updates, we're done.
break;
}

// write updates as quickly as possible with write-lock.
self.lock_guard_mut()
.await
.finalize_resync_membership_proofs(tip_hash, updates_batch)
.await?;
}
Ok(())
}

pub async fn prune_abandoned_monitored_utxos(
Expand Down Expand Up @@ -959,16 +1022,36 @@ impl GlobalState {
Ok(())
}

/// Locking:
/// * acquires `monitored_utxos_lock` for write
pub async fn resync_membership_proofs_from_stored_blocks(
&mut self,
/// perf: this fn in o(n) with the number of monitored_utxos in the wallet.
///
/// ```text
/// lengthy, i/o calls, foreach mutxo:
/// monitored_utxos.get()
/// ArchivalState::find_path()
/// ArchivalState::get_block()
/// called twice per block in a backwards loop along path
/// called twice per block in a forwards loop along path
///
/// it should be possible to reduce this to 1 call per block
/// for each loop by storing prev block in a temp var.
/// ```
///
/// see <https://github.com/Neptune-Crypto/neptune-core/issues/183>
async fn prepare_resync_membership_proofs(
&self,
tip_hash: Digest,
) -> Result<()> {
batch_idx: u64,
batch_size: u64,
) -> Result<impl ExactSizeIterator<Item = (Index, MonitoredUtxo)> + Send> {
// loop over all monitored utxos
let monitored_utxos = self.wallet_state.wallet_db.monitored_utxos_mut();
let monitored_utxos = self.wallet_state.wallet_db.monitored_utxos();

let mut mutxo_updates: BTreeMap<Index, MonitoredUtxo> = Default::default();

'outer: for i in 0..monitored_utxos.len().await {
let start = batch_idx * batch_size;
let end = std::cmp::min(start + batch_size, monitored_utxos.len().await);

'outer: for i in start..end {
let i = i as Index;
let monitored_utxo = monitored_utxos.get(i).await;

Expand Down Expand Up @@ -1126,10 +1209,26 @@ impl GlobalState {
// store updated membership proof
monitored_utxo.add_membership_proof_for_tip(tip_hash, membership_proof);

// update storage.
monitored_utxos.set(i, monitored_utxo).await
mutxo_updates.insert(i, monitored_utxo);
}

Ok(mutxo_updates.into_iter())
}

/// writes/persists a list of [MonitoredUtxo].
///
/// see <https://github.com/Neptune-Crypto/neptune-core/issues/183>
async fn finalize_resync_membership_proofs(
&mut self,
tip_hash: Digest,
monitored_utxos: impl IntoIterator<Item = (Index, MonitoredUtxo)> + Send,
) -> Result<()> {
self.wallet_state
.wallet_db
.monitored_utxos_mut()
.set_many(monitored_utxos)
.await;

// Update sync label and persist
self.wallet_state.wallet_db.set_sync_label(tip_hash).await;
self.wallet_state.wallet_db.persist().await;
Expand Down Expand Up @@ -1349,35 +1448,6 @@ impl GlobalState {
))
}

/// resync membership proofs
pub async fn resync_membership_proofs(&mut self) -> Result<()> {
// Do not fix memberhip proofs if node is in sync mode, as we would otherwise
// have to sync many times, instead of just *one* time once we have caught up.
if self.net.syncing {
debug!("Not syncing MS membership proofs because we are syncing");
return Ok(());
}

// is it necessary?
let current_tip_digest = self.chain.light_state().hash();
if self.wallet_state.is_synced_to(current_tip_digest).await {
debug!("Membership proof syncing not needed");
return Ok(());
}

// do we have blocks?
if self.chain.is_archival_node() {
return self
.resync_membership_proofs_from_stored_blocks(current_tip_digest)
.await;
}

// request blocks from peers
todo!("We don't yet support non-archival nodes");

// Ok(())
}

#[inline]
pub fn cli(&self) -> &cli_args::Args {
&self.cli
Expand Down Expand Up @@ -1700,8 +1770,12 @@ mod global_state_tests {
);

// Call resync
let mutxo_updates = global_state
.prepare_resync_membership_proofs(mock_block_1a.hash(), 0, 1000)
.await
.unwrap();
global_state
.resync_membership_proofs_from_stored_blocks(mock_block_1a.hash())
.finalize_resync_membership_proofs(mock_block_1a.hash(), mutxo_updates)
.await
.unwrap();

Expand Down Expand Up @@ -1771,15 +1845,15 @@ mod global_state_tests {
global_state.set_new_tip(next_block.clone()).await.unwrap();
parent_block = next_block;
}
drop(global_state);

// Call resync which fails to sync the UTXO that was abandoned when block 1a was abandoned
global_state
.resync_membership_proofs_from_stored_blocks(parent_block.hash())
.await
.unwrap();
global_state_lock.resync_membership_proofs().await.unwrap();

let gs = global_state_lock.lock_guard().await;

// Verify that one MUTXO is unsynced, and that 1 (from genesis) is synced
let wallet_status_after_forking = global_state
let wallet_status_after_forking = gs
.wallet_state
.get_wallet_status_from_lock(parent_block.hash())
.await;
Expand All @@ -1788,19 +1862,19 @@ mod global_state_tests {

// Verify that the MUTXO from block 1a is considered abandoned, and that the one from
// genesis block is not.
let monitored_utxos = global_state.wallet_state.wallet_db.monitored_utxos();
let monitored_utxos = gs.wallet_state.wallet_db.monitored_utxos();
assert!(
!monitored_utxos
.get(0)
.await
.was_abandoned(parent_block.hash(), global_state.chain.archival_state())
.was_abandoned(parent_block.hash(), gs.chain.archival_state())
.await
);
assert!(
monitored_utxos
.get(1)
.await
.was_abandoned(parent_block.hash(), global_state.chain.archival_state())
.was_abandoned(parent_block.hash(), gs.chain.archival_state())
.await
);

Expand Down Expand Up @@ -1896,12 +1970,13 @@ mod global_state_tests {
wallet_status_on_b_fork_before_resync.unsynced_unspent.len()
);

drop(global_state);

// Run the resync and verify that MPs are synced
global_state
.resync_membership_proofs_from_stored_blocks(fork_b_block.hash())
.await
.unwrap();
let wallet_status_on_b_fork_after_resync = global_state
global_state_lock.resync_membership_proofs().await.unwrap();

let mut gsm = global_state_lock.lock_guard_mut().await;
let wallet_status_on_b_fork_after_resync = gsm
.wallet_state
.get_wallet_status_from_lock(fork_b_block.hash())
.await;
Expand All @@ -1918,15 +1993,12 @@ mod global_state_tests {
for _ in 0..100 {
let (next_c_block, _, _) =
make_mock_block(&fork_c_block, None, other_receiving_address, rng.gen());
global_state
.set_new_tip(next_c_block.clone())
.await
.unwrap();
gsm.set_new_tip(next_c_block.clone()).await.unwrap();
fork_c_block = next_c_block;
}

// Verify that there are zero MUTXOs with synced MPs
let wallet_status_on_c_fork_before_resync = global_state
let wallet_status_on_c_fork_before_resync = gsm
.wallet_state
.get_wallet_status_from_lock(fork_c_block.hash())
.await;
Expand All @@ -1938,14 +2010,13 @@ mod global_state_tests {
2,
wallet_status_on_c_fork_before_resync.unsynced_unspent.len()
);
drop(gsm);

// Run the resync and verify that UTXO from genesis is synced, but that
// UTXO from 1a is not synced.
global_state
.resync_membership_proofs_from_stored_blocks(fork_c_block.hash())
.await
.unwrap();
let wallet_status_on_c_fork_after_resync = global_state
global_state_lock.resync_membership_proofs().await.unwrap();
let gs = global_state_lock.lock_guard().await;
let wallet_status_on_c_fork_after_resync = gs
.wallet_state
.get_wallet_status_from_lock(fork_c_block.hash())
.await;
Expand All @@ -1956,19 +2027,19 @@ mod global_state_tests {
);

// Also check that UTXO from 1a is considered abandoned
let monitored_utxos = global_state.wallet_state.wallet_db.monitored_utxos();
let monitored_utxos = gs.wallet_state.wallet_db.monitored_utxos();
assert!(
!monitored_utxos
.get(0)
.await
.was_abandoned(fork_c_block.hash(), global_state.chain.archival_state())
.was_abandoned(fork_c_block.hash(), gs.chain.archival_state())
.await
);
assert!(
monitored_utxos
.get(1)
.await
.was_abandoned(fork_c_block.hash(), global_state.chain.archival_state())
.was_abandoned(fork_c_block.hash(), gs.chain.archival_state())
.await
);

Expand Down

0 comments on commit ea1f317

Please sign in to comment.