diff --git a/Cargo.lock b/Cargo.lock index 2771993f66..e3b2e2f587 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4728,6 +4728,7 @@ dependencies = [ "ark-relations", "ark-serialize", "ark-snark", + "async-stream 0.2.1", "async-trait", "base64 0.20.0", "blake2b_simd 0.5.11", diff --git a/crates/core/component/dex/Cargo.toml b/crates/core/component/dex/Cargo.toml index 3b40884322..461aa1de1e 100644 --- a/crates/core/component/dex/Cargo.toml +++ b/crates/core/component/dex/Cargo.toml @@ -42,6 +42,7 @@ ark-serialize = "0.4" ark-groth16 = {version = "0.4", default-features = false} ark-snark = "0.4" async-trait = "0.1.52" +async-stream = "0.2" tokio = {version = "1.3", features = ["full"], optional = true} hex = "0.4" thiserror = "1" diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index 6606844dab..bdab6dfc77 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -1,19 +1,25 @@ -use std::{collections::BTreeSet, iter::FromIterator, pin::Pin, sync::Arc}; +use std::future; +use std::{pin::Pin, sync::Arc}; use anyhow::Result; +use async_stream::try_stream; use async_trait::async_trait; use futures::Stream; use futures::StreamExt; use penumbra_asset::asset; use penumbra_num::Amount; +use penumbra_proto::DomainType; use penumbra_proto::{StateReadProto, StateWriteProto}; use penumbra_storage::{EscapedByteSlice, StateRead, StateWrite}; +use crate::lp::position::State; use crate::{ lp::position::{self, Position}, state_key, DirectedTradingPair, }; +const DYNAMIC_ASSET_LIMIT: usize = 10; + #[async_trait] pub trait PositionRead: StateRead { /// Return a stream of all [`position::Metadata`] available. @@ -138,12 +144,16 @@ pub trait PositionManager: StateWrite + PositionRead { // reserves or the position state might have invalidated them. self.deindex_position_by_price(&position); - let position = self.handle_limit_order(prev, position); + let position = self.handle_limit_order(&prev, position); // Only index the position's liquidity if it is active. if position.state == position::State::Opened { self.index_position_by_price(&position); } + + // Update the available liquidity for this position's trading pair. + self.update_available_liquidity(&position, &prev).await?; + self.put(state_key::position_by_id(&id), position); Ok(()) } @@ -153,7 +163,7 @@ pub trait PositionManager: StateWrite + PositionRead { /// not a limit order, or has not been filled, it is returned unchanged. fn handle_limit_order( &self, - prev_position: Option, + prev_position: &Option, position: Position, ) -> Position { let id = position.id(); @@ -187,16 +197,47 @@ pub trait PositionManager: StateWrite + PositionRead { /// Combines a list of fixed candidates with a list of liquidity-based candidates. /// This ensures that the fixed candidates are always considered, minimizing /// the risk of attacks on routing. - async fn candidate_set( + fn candidate_set( &self, - _from: asset::Id, + from: asset::Id, fixed_candidates: Arc>, - ) -> Result> { - let candidates = BTreeSet::from_iter(fixed_candidates.iter().cloned()); - // TODO: do dynamic candidate selection based on liquidity (tracked by #2750) - // Idea: each block, compute the per-asset candidate set and store it - // in the object store as a BTreeMap. - Ok(candidates.into_iter().collect()) + ) -> Pin> + Send>> { + // Clone the fixed candidates Arc so it can be moved into the stream filter's future. + let fc = fixed_candidates.clone(); + let mut dynamic_candidates = self + .ordered_routable_assets(&from) + .filter(move |c| { + future::ready(!fc.contains(c.as_ref().expect("failed to fetch candidate"))) + }) + .take(DYNAMIC_ASSET_LIMIT); + try_stream! { + // First stream the fixed candidates, so those can be processed while the dynamic candidates are fetched. + for candidate in fixed_candidates.iter() { + yield candidate.clone(); + } + + // Yield the liquidity-based candidates. Note that this _may_ include some assets already included in the fixed set. + while let Some(candidate) = dynamic_candidates + .next().await { + yield candidate.expect("failed to fetch candidate"); + } + } + .boxed() + } + + /// Returns a stream of [`asset::Id`] routable from a given asset, ordered by liquidity. + fn ordered_routable_assets( + &self, + from: &asset::Id, + ) -> Pin> + Send + 'static>> { + let prefix = state_key::internal::routable_assets::prefix(from); + tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for routable assets by liquidity"); + self.nonverifiable_prefix_raw(&prefix) + .map(|entry| match entry { + Ok((_, v)) => Ok(asset::Id::decode(&*v)?), + Err(e) => Err(e), + }) + .boxed() } } @@ -252,5 +293,167 @@ pub(super) trait Inner: StateWrite { self.nonverifiable_delete(state_key::internal::price_index::key(&pair12, &phi12, &id)); self.nonverifiable_delete(state_key::internal::price_index::key(&pair21, &phi21, &id)); } + + /// Updates the nonverifiable liquidity indices given a [`Position`] in the direction specified by the [`DirectedTradingPair`]. + /// An [`Option`] may be specified to allow for the case where a position is being updated. + async fn update_liquidity_index( + &mut self, + pair: DirectedTradingPair, + position: &Position, + prev: &Option, + ) -> Result<()> { + tracing::debug!(?pair, "updating available liquidity indices"); + + let (new_a_from_b, current_a_from_b) = match (position.state, prev) { + (State::Opened, None) => { + // Add the new position's contribution to the index, no cancellation of the previous version necessary. + + // Query the current available liquidity for this trading pair, or zero if the trading pair + // has no current liquidity. + let current_a_from_b = self + .nonverifiable_get_raw(&state_key::internal::routable_assets::a_from_b(&pair)) + .await? + .map(|bytes| { + Amount::from_be_bytes( + bytes + .try_into() + .expect("liquidity index amount can always be parsed"), + ) + }) + .unwrap_or_default(); + + // Use the new reserves to compute `new_position_contribution`, + // the amount of asset A contributed by the position (i.e. the reserves of asset A). + let new_position_contribution = position + .reserves_for(pair.start) + .expect("specified position should match provided trading pair"); + + // Compute `new_A_from_B`. + let new_a_from_b = + // Add the contribution from the updated version. + current_a_from_b.saturating_add(&new_position_contribution); + + tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, "newly opened position, adding contribution to existing available liquidity for trading pair"); + + (new_a_from_b, current_a_from_b) + } + (State::Opened, Some(prev)) => { + // Add the new position's contribution to the index, deleting the previous version's contribution. + + // Query the current available liquidity for this trading pair, or zero if the trading pair + // has no current liquidity. + let current_a_from_b = self + .nonverifiable_get_raw(&state_key::internal::routable_assets::a_from_b(&pair)) + .await? + .map(|bytes| { + Amount::from_be_bytes( + bytes + .try_into() + .expect("liquidity index amount can always be parsed"), + ) + }) + .unwrap_or_default(); + + // Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1). + let prev_position_contribution = prev + .reserves_for(pair.start) + .expect("specified position should match provided trading pair"); + + // Use the new reserves to compute `new_position_contribution`, + // the amount of asset A contributed by the position (i.e. the reserves of asset A). + let new_position_contribution = position + .reserves_for(pair.start) + .expect("specified position should match provided trading pair"); + + // Compute `new_A_from_B`. + let new_a_from_b = + // Subtract the previous version of the position's contribution to represent that position no longer + // being correct, and add the contribution from the updated version. + (current_a_from_b.saturating_sub(&prev_position_contribution)).saturating_add(&new_position_contribution); + + tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, ?prev_position_contribution, "updated position, adding new contribution and subtracting previous contribution to existing available liquidity for trading pair"); + + (new_a_from_b, current_a_from_b) + } + (State::Closed, Some(prev)) => { + // Compute the previous contribution and erase it from the current index + + // Query the current available liquidity for this trading pair, or zero if the trading pair + // has no current liquidity. + let current_a_from_b = self + .nonverifiable_get_raw(&state_key::internal::routable_assets::a_from_b(&pair)) + .await? + .map(|bytes| { + Amount::from_be_bytes( + bytes + .try_into() + .expect("liquidity index amount can always be parsed"), + ) + }) + .unwrap_or_default(); + + // Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1). + let prev_position_contribution = prev + .reserves_for(pair.start) + .expect("specified position should match provided trading pair"); + + // Compute `new_A_from_B`. + let new_a_from_b = + // Subtract the previous version of the position's contribution to represent that position no longer + // being correct, and since the updated version is Closed, it has no contribution. + current_a_from_b.saturating_sub(&prev_position_contribution); + + tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?prev_position_contribution, "closed position, subtracting previous contribution to existing available liquidity for trading pair"); + + (new_a_from_b, current_a_from_b) + } + (State::Withdrawn, _) | (State::Claimed, _) | (State::Closed, None) => { + // The position already went through the `Closed` state or was opened in the `Closed` state, so its contribution has already been subtracted. + return Ok(()); + } + }; + + // Delete the existing key for this position if the reserve amount has changed. + if new_a_from_b != current_a_from_b { + self.nonverifiable_delete( + state_key::internal::routable_assets::key(&pair.start, current_a_from_b).to_vec(), + ); + } + + // Write the new key indicating that asset B is routable from asset A with `new_a_from_b` liquidity. + self.nonverifiable_put_raw( + state_key::internal::routable_assets::key(&pair.start, new_a_from_b).to_vec(), + pair.end.encode_to_vec(), + ); + tracing::debug!(start = ?pair.start, end = ?pair.end, "marking routable from start -> end"); + + // Write the new lookup index storing `new_a_from_b` for this trading pair. + self.nonverifiable_put_raw( + state_key::internal::routable_assets::a_from_b(&pair).to_vec(), + new_a_from_b.to_be_bytes().to_vec(), + ); + tracing::debug!(available_liquidity = ?new_a_from_b, ?pair, "marking available liquidity for trading pair"); + + Ok(()) + } + + async fn update_available_liquidity( + &mut self, + position: &Position, + prev_position: &Option, + ) -> Result<()> { + // Since swaps may be performed in either direction, the available liquidity indices + // need to be calculated and stored for both the A -> B and B -> A directions. + let (a, b) = (position.phi.pair.asset_1(), position.phi.pair.asset_2()); + + // A -> B + self.update_liquidity_index(DirectedTradingPair::new(a, b), position, prev_position) + .await?; + // B -> A + self.update_liquidity_index(DirectedTradingPair::new(b, a), position, prev_position) + .await?; + + Ok(()) + } } impl Inner for T {} diff --git a/crates/core/component/dex/src/component/router/path_search.rs b/crates/core/component/dex/src/component/router/path_search.rs index c24aaa351c..dccc5d932d 100644 --- a/crates/core/component/dex/src/component/router/path_search.rs +++ b/crates/core/component/dex/src/component/router/path_search.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use anyhow::Result; use async_trait::async_trait; +use futures::StreamExt; use penumbra_asset::asset; use penumbra_num::fixpoint::U128x128; use penumbra_storage::{StateDelta, StateRead}; @@ -95,22 +96,23 @@ async fn relax_path( mut path: Path, fixed_candidates: Arc>, ) -> Result<()> { - let candidates = path + let mut candidates = path .state .candidate_set(*path.end(), fixed_candidates) - .instrument(path.span.clone()) - .await?; + .instrument(path.span.clone()); path.span.in_scope(|| { - tracing::debug!(degree = ?candidates.len(), ?candidates, "relaxing path"); + tracing::debug!("relaxing path"); }); let mut js = JoinSet::new(); - for new_end in candidates { + // while let Some(new_end) = candidates { + + while let Some(new_end) = candidates.inner_mut().next().await { let new_path = path.fork(); let cache2 = cache.clone(); js.spawn(async move { - if let Some(new_path) = new_path.extend_to(new_end).await? { + if let Some(new_path) = new_path.extend_to(new_end?).await? { cache2.lock().consider(new_path) } anyhow::Ok(()) diff --git a/crates/core/component/dex/src/lp/trading_function.rs b/crates/core/component/dex/src/lp/trading_function.rs index 3f4dd85f47..eb88d4b0c8 100644 --- a/crates/core/component/dex/src/lp/trading_function.rs +++ b/crates/core/component/dex/src/lp/trading_function.rs @@ -44,6 +44,7 @@ impl TradingFunction { input: Value, reserves: &Reserves, ) -> anyhow::Result<(Value, Reserves, Value)> { + tracing::debug!(?input, ?reserves, "filling trade"); if input.asset_id == self.pair.asset_1() { let (unfilled, new_reserves, output) = self.component.fill(input.amount, reserves)?; Ok(( @@ -412,7 +413,7 @@ impl BareTradingFunction { price_ratio.checked_div(&self.gamma()).expect("gamma != 0") } - /// Converts an amount `delta_1` into `lambda_2`, using the id effective price inverse. + /// Converts an amount `delta_1` into `lambda_2`, using the inverse of the effective price. pub fn convert_to_lambda_2(&self, delta_1: U128x128) -> anyhow::Result { let lambda_2 = self.effective_price_inv() * delta_1; Ok(lambda_2?) diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index 3a5b469b17..71f913ac87 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -64,6 +64,44 @@ pub(crate) mod internal { use super::*; use crate::lp::BareTradingFunction; + /// Find assets with liquidity positions from asset `from`, ordered by price. + pub mod routable_assets { + use penumbra_asset::asset; + use penumbra_num::Amount; + + use super::*; + + /// `A || be_bytes(A_from_B) => B` this will be an ordered encoding of every asset `B` directly routable to from `A`. + /// `a_from_b` represents the amount of `A` that can be bought with `B`. + /// The prefix query includes only the `A` portion, meaning the keys will be returned in order of liquidity. + pub fn prefix(from: &asset::Id) -> [u8; 39] { + let mut key = [0u8; 39]; + key[0..7].copy_from_slice(b"dex/ra/"); + key[7..7 + 32].copy_from_slice(&from.to_bytes()); + key + } + + /// `A || be_bytes(A_from_B) => B` this will be an ordered encoding of every asset `B` directly routable to from `A`. + /// `a_from_b` represents the amount of `A` that can be bought with `B`. + pub fn key(from: &asset::Id, a_from_b: Amount) -> [u8; 55] { + let mut key = [0u8; 55]; + key[0..7].copy_from_slice(b"dex/ra/"); + key[7..32 + 7].copy_from_slice(&from.to_bytes()); + key[32 + 7..32 + 7 + 16].copy_from_slice(&a_from_b.to_be_bytes()); + key + } + + /// `(A, B) => A_from_B` this will encode the current amount of `A` tradable into `B` for every directly routable trading pair. + /// This index can be used to determine the key values for the [`super::key`] ordered index to perform updates efficiently. + pub fn a_from_b(pair: &DirectedTradingPair) -> [u8; 71] { + let mut key = [0u8; 71]; + key[0..7].copy_from_slice(b"dex/ab/"); + key[7..7 + 32].copy_from_slice(&pair.start.to_bytes()); + key[7 + 32..7 + 32 + 32].copy_from_slice(&pair.end.to_bytes()); + key + } + } + pub mod price_index { use super::*; diff --git a/crates/core/component/ibc/src/component/transfer.rs b/crates/core/component/ibc/src/component/transfer.rs index 4fe0e7e066..f9754e1a09 100644 --- a/crates/core/component/ibc/src/component/transfer.rs +++ b/crates/core/component/ibc/src/component/transfer.rs @@ -75,7 +75,10 @@ pub trait Ics20TransferWriteExt: StateWrite { // create packet, assume it's already checked since the component caller contract calls `check` before `execute` let checked_packet = IBCPacket::::from(withdrawal.clone()).assume_checked(); - let prefix = format!("{}/{}/", &withdrawal.source_port, &withdrawal.source_channel); + let prefix = format!( + "{}/{}/", + &withdrawal.source_port, &withdrawal.source_channel + ); if !withdrawal.denom.starts_with(&prefix) { // we are the source. add the value balance to the escrow channel. let existing_value_balance: Amount = self diff --git a/crates/core/num/src/amount.rs b/crates/core/num/src/amount.rs index 74805b94ea..7c1036ee94 100644 --- a/crates/core/num/src/amount.rs +++ b/crates/core/num/src/amount.rs @@ -35,17 +35,39 @@ impl Amount { self.inner.to_le_bytes() } + pub fn to_be_bytes(&self) -> [u8; 16] { + self.inner.to_be_bytes() + } + pub fn from_le_bytes(bytes: [u8; 16]) -> Amount { Amount { inner: u128::from_le_bytes(bytes), } } + pub fn from_be_bytes(bytes: [u8; 16]) -> Amount { + Amount { + inner: u128::from_be_bytes(bytes), + } + } + pub fn checked_sub(&self, rhs: &Self) -> Option { self.inner .checked_sub(rhs.inner) .map(|inner| Self { inner }) } + + pub fn saturating_add(&self, rhs: &Self) -> Self { + Self { + inner: self.inner.saturating_add(rhs.inner), + } + } + + pub fn saturating_sub(&self, rhs: &Self) -> Self { + Self { + inner: self.inner.saturating_sub(rhs.inner), + } + } } #[derive(Clone)]