Skip to content

Commit

Permalink
Merge pull request #4017 from anoma/mergify/bp/maint-0.45/pr-4016
Browse files Browse the repository at this point in the history
Improve the performance of the shielded sync ledger client (backport #4016)
  • Loading branch information
mergify[bot] authored Nov 12, 2024
2 parents 8b486b4 + aec0664 commit 430a97c
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Improve the shielded sync's ledger client performance and user experience.
([\#4016](https://github.com/anoma/namada/pull/4016))
9 changes: 9 additions & 0 deletions crates/apps_lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3386,6 +3386,8 @@ pub mod args {
}),
);
pub const BIRTHDAY: ArgOpt<BlockHeight> = arg_opt("birthday");
pub const BLOCK_BATCH: ArgDefault<usize> =
arg_default("block-batch", DefaultFn(|| 10));
pub const BLOCK_HEIGHT: Arg<BlockHeight> = arg("block-height");
pub const BLOCK_HEIGHT_OPT: ArgOpt<BlockHeight> = arg_opt("height");
pub const BLOCK_HEIGHT_TO_OPT: ArgOpt<BlockHeight> = arg_opt("to-height");
Expand Down Expand Up @@ -6803,6 +6805,7 @@ pub mod args {
Some(times) => RetryStrategy::Times(times),
None => RetryStrategy::Forever,
};
let block_batch_size = BLOCK_BATCH.parse(matches);
Self {
ledger_address,
last_query_height,
Expand All @@ -6812,6 +6815,7 @@ pub mod args {
wait_for_last_query_height,
max_concurrent_fetches,
retry_strategy,
block_batch_size,
}
}

Expand Down Expand Up @@ -6849,6 +6853,10 @@ pub mod args {
"Maximum number of times to retry fetching. If no \
argument is provided, defaults to retrying forever."
)))
.arg(BLOCK_BATCH.def().help(wrap!(
"Number of blocks fetched per concurrent fetch job. The \
default is 10."
)))
}
}

Expand All @@ -6862,6 +6870,7 @@ pub mod args {
let chain_ctx = ctx.borrow_mut_chain_or_exit();

Ok(ShieldedSync {
block_batch_size: self.block_batch_size,
max_concurrent_fetches: self.max_concurrent_fetches,
wait_for_last_query_height: self.wait_for_last_query_height,
ledger_address: chain_ctx.get(&self.ledger_address),
Expand Down
2 changes: 2 additions & 0 deletions crates/apps_lib/src/client/masp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub async fn syncing<
.shutdown_signal(install_shutdown_signal(false))
.wait_for_last_query_height(args.wait_for_last_query_height)
.retry_strategy(args.retry_strategy)
.block_batch_size(args.block_batch_size)
.build();

let env = MaspLocalTaskEnv::new(500)
Expand Down Expand Up @@ -141,6 +142,7 @@ pub async fn syncing<
dispatch_client!(LedgerMaspClient::new(
client,
args.max_concurrent_fetches,
Duration::from_millis(5),
))?
};

Expand Down
36 changes: 33 additions & 3 deletions crates/core/src/control_flow/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ pub trait SleepStrategy {
/// Calculate a duration from a sleep strategy state.
fn backoff(&self, state: &Self::State) -> Duration;

/// Update the state of the sleep strategy.
/// Move to the next state of the sleep strategy.
fn next_state(&self, state: &mut Self::State);

/// Move to the previous state of the sleep strategy.
fn prev_state(&self, state: &mut Self::State);

/// Map a function to the duration returned from a
/// sleep strategy.
fn map<M>(self, map: M) -> Map<Self, M>
Expand Down Expand Up @@ -73,6 +76,11 @@ where
fn next_state(&self, state: &mut S::State) {
self.strategy.next_state(state)
}

#[inline]
fn prev_state(&self, state: &mut S::State) {
self.strategy.prev_state(state)
}
}

/// Constant sleep strategy.
Expand All @@ -93,6 +101,10 @@ impl SleepStrategy for Constant {
fn next_state(&self, _: &mut ()) {
// NOOP
}

fn prev_state(&self, _: &mut ()) {
// NOOP
}
}

/// Linear backoff sleep strategy.
Expand All @@ -114,7 +126,11 @@ impl SleepStrategy for LinearBackoff {
}

fn next_state(&self, state: &mut Duration) {
*state += self.delta;
*state = state.saturating_add(self.delta);
}

fn prev_state(&self, state: &mut Duration) {
*state = state.saturating_sub(self.delta);
}
}

Expand Down Expand Up @@ -144,6 +160,10 @@ where
fn next_state(&self, state: &mut Self::State) {
*state = state.saturating_add(1);
}

fn prev_state(&self, state: &mut Self::State) {
*state = state.saturating_sub(1);
}
}

/// A [`SleepStrategy`] adaptor, to run async tasks with custom
Expand Down Expand Up @@ -236,7 +256,17 @@ impl<S: SleepStrategy> Sleep<S> {
/// Update the sleep strategy state, and sleep for the given backoff.
async fn sleep_update(&self, state: &mut S::State) {
self.strategy.next_state(state);
sleep(self.strategy.backoff(state)).await;
self.sleep_with_current_backoff(state).await;
}

/// Sleep for a [`Duration`] equivalent to the value of
/// the current backoff.
pub fn sleep_with_current_backoff(
&self,
state: &S::State,
) -> impl Future<Output = ()> + 'static {
let backoff_duration = self.strategy.backoff(state);
sleep(backoff_duration)
}

/// Run a future as many times as `iter_times`
Expand Down
1 change: 1 addition & 0 deletions crates/node/src/bench_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,7 @@ impl BenchShieldedCtx {
wait_for_last_query_height: false,
max_concurrent_fetches: 100,
retry_strategy: RetryStrategy::Forever,
block_batch_size: 10,
},
&StdIo,
))
Expand Down
2 changes: 2 additions & 0 deletions crates/sdk/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2158,6 +2158,8 @@ pub struct ShieldedSync<C: NamadaTypes = SdkTypes> {
/// Maximum number of fetch jobs that will ever
/// execute concurrently during the shielded sync.
pub max_concurrent_fetches: usize,
/// Number of blocks fetched per concurrent fetch job.
pub block_batch_size: usize,
/// Maximum number of times to retry fetching. If `None`
/// is provided, defaults to "forever".
pub retry_strategy: RetryStrategy,
Expand Down
73 changes: 58 additions & 15 deletions crates/sdk/src/masp/utilities.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
//! Helper functions and types

use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use borsh::BorshDeserialize;
use masp_primitives::merkle_tree::{CommitmentTree, IncrementalWitness};
use masp_primitives::sapling::Node;
use masp_primitives::transaction::Transaction as MaspTx;
use namada_core::chain::BlockHeight;
use namada_core::collections::HashMap;
use namada_core::control_flow::time::{
Duration, LinearBackoff, Sleep, SleepStrategy,
};
use namada_core::storage::TxIndex;
use namada_events::extend::IndexedMaspData;
use namada_io::Client;
Expand All @@ -24,6 +27,8 @@ use crate::masp::{extract_masp_tx, get_indexed_masp_events_at_height};
struct LedgerMaspClientInner<C> {
client: C,
semaphore: Semaphore,
backoff: RwLock<Duration>,
sleep: Sleep<LinearBackoff>,
}

/// An inefficient MASP client which simply uses a
Expand All @@ -43,36 +48,39 @@ impl<C> Clone for LedgerMaspClient<C> {
impl<C> LedgerMaspClient<C> {
/// Create a new [`MaspClient`] given an rpc client.
#[inline(always)]
pub fn new(client: C, max_concurrent_fetches: usize) -> Self {
pub fn new(
client: C,
max_concurrent_fetches: usize,
linear_backoff_delta: Duration,
) -> Self {
Self {
inner: Arc::new(LedgerMaspClientInner {
client,
semaphore: Semaphore::new(max_concurrent_fetches),
backoff: RwLock::new(Duration::from_secs(0)),
sleep: Sleep {
strategy: LinearBackoff {
delta: linear_backoff_delta,
},
},
}),
}
}
}

impl<C: Client + Send + Sync> MaspClient for LedgerMaspClient<C> {
type Error = Error;

async fn last_block_height(&self) -> Result<Option<BlockHeight>, Error> {
let maybe_block = crate::rpc::query_block(&self.inner.client).await?;
Ok(maybe_block.map(|b| b.height))
}

async fn fetch_shielded_transfers(
impl<C: Client + Send + Sync> LedgerMaspClient<C> {
async fn fetch_shielded_transfers_inner(
&self,
from: BlockHeight,
to: BlockHeight,
) -> Result<Vec<IndexedNoteEntry>, Error> {
let _permit = self.inner.semaphore.acquire().await.unwrap();

// Fetch all the transactions we do not have yet
let mut txs = vec![];

for height in from.0..=to.0 {
let maybe_txs_results = async {
let _permit = self.inner.semaphore.acquire().await.unwrap();

get_indexed_masp_events_at_height(
&self.inner.client,
height.into(),
Expand All @@ -86,8 +94,6 @@ impl<C: Client + Send + Sync> MaspClient for LedgerMaspClient<C> {
};

let block = {
let _permit = self.inner.semaphore.acquire().await.unwrap();

// Query the actual block to get the txs bytes. If we only need
// one tx it might be slightly better to query
// the /tx endpoint to reduce the amount of data
Expand Down Expand Up @@ -127,6 +133,43 @@ impl<C: Client + Send + Sync> MaspClient for LedgerMaspClient<C> {

Ok(txs)
}
}

impl<C: Client + Send + Sync> MaspClient for LedgerMaspClient<C> {
type Error = Error;

async fn last_block_height(&self) -> Result<Option<BlockHeight>, Error> {
let maybe_block = crate::rpc::query_block(&self.inner.client).await?;
Ok(maybe_block.map(|b| b.height))
}

async fn fetch_shielded_transfers(
&self,
from: BlockHeight,
to: BlockHeight,
) -> Result<Vec<IndexedNoteEntry>, Error> {
const ZERO: Duration = Duration::from_secs(0);
let current_backoff = { *self.inner.backoff.read().unwrap() };

if current_backoff > ZERO {
self.inner
.sleep
.sleep_with_current_backoff(&current_backoff)
.await;
}

let result = self.fetch_shielded_transfers_inner(from, to).await;

if result.is_err() {
let mut backoff = self.inner.backoff.write().unwrap();
self.inner.sleep.strategy.next_state(&mut *backoff);
} else if current_backoff > ZERO {
let mut backoff = self.inner.backoff.write().unwrap();
self.inner.sleep.strategy.prev_state(&mut *backoff);
}

result
}

#[inline(always)]
fn capabilities(&self) -> MaspClientCapabilities {
Expand Down
1 change: 0 additions & 1 deletion crates/shielded_token/src/masp/shielded_sync/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,6 @@ where
}

if self.config.retry_strategy.may_retry() {
self.config.fetched_tracker.message(format!("{error}"));
true
} else {
// NB: store last encountered error
Expand Down

0 comments on commit 430a97c

Please sign in to comment.