Skip to content

Commit

Permalink
dex: index available liquidity as part of put_position (#2843)
Browse files Browse the repository at this point in the history
This PR creates liquidity indices that track "inbound liquidity" for a given asset. Those indices are used to surface candidates during path search. The inbound liquidity of A is defined as the amount of asset A that can be purchased when routing from an adjacent asset B.
  • Loading branch information
zbuc authored Jul 20, 2023
1 parent 3b7e4b3 commit 3fe090e
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/component/dex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ ark-serialize = "0.4"
ark-groth16 = {version = "0.4", default-features = false}
ark-snark = "0.4"
async-trait = "0.1.52"
async-stream = "0.2"
tokio = {version = "1.3", features = ["full"], optional = true}
hex = "0.4"
thiserror = "1"
Expand Down
225 changes: 214 additions & 11 deletions crates/core/component/dex/src/component/position_manager.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
use std::{collections::BTreeSet, iter::FromIterator, pin::Pin, sync::Arc};
use std::future;
use std::{pin::Pin, sync::Arc};

use anyhow::Result;
use async_stream::try_stream;
use async_trait::async_trait;
use futures::Stream;
use futures::StreamExt;
use penumbra_asset::asset;
use penumbra_num::Amount;
use penumbra_proto::DomainType;
use penumbra_proto::{StateReadProto, StateWriteProto};
use penumbra_storage::{EscapedByteSlice, StateRead, StateWrite};

use crate::lp::position::State;
use crate::{
lp::position::{self, Position},
state_key, DirectedTradingPair,
};

const DYNAMIC_ASSET_LIMIT: usize = 10;

#[async_trait]
pub trait PositionRead: StateRead {
/// Return a stream of all [`position::Metadata`] available.
Expand Down Expand Up @@ -138,12 +144,16 @@ pub trait PositionManager: StateWrite + PositionRead {
// reserves or the position state might have invalidated them.
self.deindex_position_by_price(&position);

let position = self.handle_limit_order(prev, position);
let position = self.handle_limit_order(&prev, position);

// Only index the position's liquidity if it is active.
if position.state == position::State::Opened {
self.index_position_by_price(&position);
}

// Update the available liquidity for this position's trading pair.
self.update_available_liquidity(&position, &prev).await?;

self.put(state_key::position_by_id(&id), position);
Ok(())
}
Expand All @@ -153,7 +163,7 @@ pub trait PositionManager: StateWrite + PositionRead {
/// not a limit order, or has not been filled, it is returned unchanged.
fn handle_limit_order(
&self,
prev_position: Option<position::Position>,
prev_position: &Option<position::Position>,
position: Position,
) -> Position {
let id = position.id();
Expand Down Expand Up @@ -187,16 +197,47 @@ pub trait PositionManager: StateWrite + PositionRead {
/// Combines a list of fixed candidates with a list of liquidity-based candidates.
/// This ensures that the fixed candidates are always considered, minimizing
/// the risk of attacks on routing.
async fn candidate_set(
fn candidate_set(
&self,
_from: asset::Id,
from: asset::Id,
fixed_candidates: Arc<Vec<asset::Id>>,
) -> Result<Vec<asset::Id>> {
let candidates = BTreeSet::from_iter(fixed_candidates.iter().cloned());
// TODO: do dynamic candidate selection based on liquidity (tracked by #2750)
// Idea: each block, compute the per-asset candidate set and store it
// in the object store as a BTreeMap.
Ok(candidates.into_iter().collect())
) -> Pin<Box<dyn Stream<Item = Result<asset::Id>> + Send>> {
// Clone the fixed candidates Arc so it can be moved into the stream filter's future.
let fc = fixed_candidates.clone();
let mut dynamic_candidates = self
.ordered_routable_assets(&from)
.filter(move |c| {
future::ready(!fc.contains(c.as_ref().expect("failed to fetch candidate")))
})
.take(DYNAMIC_ASSET_LIMIT);
try_stream! {
// First stream the fixed candidates, so those can be processed while the dynamic candidates are fetched.
for candidate in fixed_candidates.iter() {
yield candidate.clone();
}

// Yield the liquidity-based candidates. Note that this _may_ include some assets already included in the fixed set.
while let Some(candidate) = dynamic_candidates
.next().await {
yield candidate.expect("failed to fetch candidate");
}
}
.boxed()
}

/// Returns a stream of [`asset::Id`] routable from a given asset, ordered by liquidity.
fn ordered_routable_assets(
&self,
from: &asset::Id,
) -> Pin<Box<dyn Stream<Item = Result<asset::Id>> + Send + 'static>> {
let prefix = state_key::internal::routable_assets::prefix(from);
tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for routable assets by liquidity");
self.nonverifiable_prefix_raw(&prefix)
.map(|entry| match entry {
Ok((_, v)) => Ok(asset::Id::decode(&*v)?),
Err(e) => Err(e),
})
.boxed()
}
}

Expand Down Expand Up @@ -252,5 +293,167 @@ pub(super) trait Inner: StateWrite {
self.nonverifiable_delete(state_key::internal::price_index::key(&pair12, &phi12, &id));
self.nonverifiable_delete(state_key::internal::price_index::key(&pair21, &phi21, &id));
}

/// Updates the nonverifiable liquidity indices given a [`Position`] in the direction specified by the [`DirectedTradingPair`].
/// An [`Option<Position>`] may be specified to allow for the case where a position is being updated.
async fn update_liquidity_index(
&mut self,
pair: DirectedTradingPair,
position: &Position,
prev: &Option<Position>,
) -> Result<()> {
tracing::debug!(?pair, "updating available liquidity indices");

let (new_a_from_b, current_a_from_b) = match (position.state, prev) {
(State::Opened, None) => {
// Add the new position's contribution to the index, no cancellation of the previous version necessary.

// Query the current available liquidity for this trading pair, or zero if the trading pair
// has no current liquidity.
let current_a_from_b = self
.nonverifiable_get_raw(&state_key::internal::routable_assets::a_from_b(&pair))
.await?
.map(|bytes| {
Amount::from_be_bytes(
bytes
.try_into()
.expect("liquidity index amount can always be parsed"),
)
})
.unwrap_or_default();

// Use the new reserves to compute `new_position_contribution`,
// the amount of asset A contributed by the position (i.e. the reserves of asset A).
let new_position_contribution = position
.reserves_for(pair.start)
.expect("specified position should match provided trading pair");

// Compute `new_A_from_B`.
let new_a_from_b =
// Add the contribution from the updated version.
current_a_from_b.saturating_add(&new_position_contribution);

tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, "newly opened position, adding contribution to existing available liquidity for trading pair");

(new_a_from_b, current_a_from_b)
}
(State::Opened, Some(prev)) => {
// Add the new position's contribution to the index, deleting the previous version's contribution.

// Query the current available liquidity for this trading pair, or zero if the trading pair
// has no current liquidity.
let current_a_from_b = self
.nonverifiable_get_raw(&state_key::internal::routable_assets::a_from_b(&pair))
.await?
.map(|bytes| {
Amount::from_be_bytes(
bytes
.try_into()
.expect("liquidity index amount can always be parsed"),
)
})
.unwrap_or_default();

// Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1).
let prev_position_contribution = prev
.reserves_for(pair.start)
.expect("specified position should match provided trading pair");

// Use the new reserves to compute `new_position_contribution`,
// the amount of asset A contributed by the position (i.e. the reserves of asset A).
let new_position_contribution = position
.reserves_for(pair.start)
.expect("specified position should match provided trading pair");

// Compute `new_A_from_B`.
let new_a_from_b =
// Subtract the previous version of the position's contribution to represent that position no longer
// being correct, and add the contribution from the updated version.
(current_a_from_b.saturating_sub(&prev_position_contribution)).saturating_add(&new_position_contribution);

tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, ?prev_position_contribution, "updated position, adding new contribution and subtracting previous contribution to existing available liquidity for trading pair");

(new_a_from_b, current_a_from_b)
}
(State::Closed, Some(prev)) => {
// Compute the previous contribution and erase it from the current index

// Query the current available liquidity for this trading pair, or zero if the trading pair
// has no current liquidity.
let current_a_from_b = self
.nonverifiable_get_raw(&state_key::internal::routable_assets::a_from_b(&pair))
.await?
.map(|bytes| {
Amount::from_be_bytes(
bytes
.try_into()
.expect("liquidity index amount can always be parsed"),
)
})
.unwrap_or_default();

// Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1).
let prev_position_contribution = prev
.reserves_for(pair.start)
.expect("specified position should match provided trading pair");

// Compute `new_A_from_B`.
let new_a_from_b =
// Subtract the previous version of the position's contribution to represent that position no longer
// being correct, and since the updated version is Closed, it has no contribution.
current_a_from_b.saturating_sub(&prev_position_contribution);

tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?prev_position_contribution, "closed position, subtracting previous contribution to existing available liquidity for trading pair");

(new_a_from_b, current_a_from_b)
}
(State::Withdrawn, _) | (State::Claimed, _) | (State::Closed, None) => {
// The position already went through the `Closed` state or was opened in the `Closed` state, so its contribution has already been subtracted.
return Ok(());
}
};

// Delete the existing key for this position if the reserve amount has changed.
if new_a_from_b != current_a_from_b {
self.nonverifiable_delete(
state_key::internal::routable_assets::key(&pair.start, current_a_from_b).to_vec(),
);
}

// Write the new key indicating that asset B is routable from asset A with `new_a_from_b` liquidity.
self.nonverifiable_put_raw(
state_key::internal::routable_assets::key(&pair.start, new_a_from_b).to_vec(),
pair.end.encode_to_vec(),
);
tracing::debug!(start = ?pair.start, end = ?pair.end, "marking routable from start -> end");

// Write the new lookup index storing `new_a_from_b` for this trading pair.
self.nonverifiable_put_raw(
state_key::internal::routable_assets::a_from_b(&pair).to_vec(),
new_a_from_b.to_be_bytes().to_vec(),
);
tracing::debug!(available_liquidity = ?new_a_from_b, ?pair, "marking available liquidity for trading pair");

Ok(())
}

async fn update_available_liquidity(
&mut self,
position: &Position,
prev_position: &Option<Position>,
) -> Result<()> {
// Since swaps may be performed in either direction, the available liquidity indices
// need to be calculated and stored for both the A -> B and B -> A directions.
let (a, b) = (position.phi.pair.asset_1(), position.phi.pair.asset_2());

// A -> B
self.update_liquidity_index(DirectedTradingPair::new(a, b), position, prev_position)
.await?;
// B -> A
self.update_liquidity_index(DirectedTradingPair::new(b, a), position, prev_position)
.await?;

Ok(())
}
}
impl<T: StateWrite + ?Sized> Inner for T {}
14 changes: 8 additions & 6 deletions crates/core/component/dex/src/component/router/path_search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use futures::StreamExt;
use penumbra_asset::asset;
use penumbra_num::fixpoint::U128x128;
use penumbra_storage::{StateDelta, StateRead};
Expand Down Expand Up @@ -95,22 +96,23 @@ async fn relax_path<S: StateRead + 'static>(
mut path: Path<S>,
fixed_candidates: Arc<Vec<asset::Id>>,
) -> Result<()> {
let candidates = path
let mut candidates = path
.state
.candidate_set(*path.end(), fixed_candidates)
.instrument(path.span.clone())
.await?;
.instrument(path.span.clone());

path.span.in_scope(|| {
tracing::debug!(degree = ?candidates.len(), ?candidates, "relaxing path");
tracing::debug!("relaxing path");
});

let mut js = JoinSet::new();
for new_end in candidates {
// while let Some(new_end) = candidates {

while let Some(new_end) = candidates.inner_mut().next().await {
let new_path = path.fork();
let cache2 = cache.clone();
js.spawn(async move {
if let Some(new_path) = new_path.extend_to(new_end).await? {
if let Some(new_path) = new_path.extend_to(new_end?).await? {
cache2.lock().consider(new_path)
}
anyhow::Ok(())
Expand Down
3 changes: 2 additions & 1 deletion crates/core/component/dex/src/lp/trading_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl TradingFunction {
input: Value,
reserves: &Reserves,
) -> anyhow::Result<(Value, Reserves, Value)> {
tracing::debug!(?input, ?reserves, "filling trade");
if input.asset_id == self.pair.asset_1() {
let (unfilled, new_reserves, output) = self.component.fill(input.amount, reserves)?;
Ok((
Expand Down Expand Up @@ -412,7 +413,7 @@ impl BareTradingFunction {
price_ratio.checked_div(&self.gamma()).expect("gamma != 0")
}

/// Converts an amount `delta_1` into `lambda_2`, using the id effective price inverse.
/// Converts an amount `delta_1` into `lambda_2`, using the inverse of the effective price.
pub fn convert_to_lambda_2(&self, delta_1: U128x128) -> anyhow::Result<U128x128> {
let lambda_2 = self.effective_price_inv() * delta_1;
Ok(lambda_2?)
Expand Down
38 changes: 38 additions & 0 deletions crates/core/component/dex/src/state_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,44 @@ pub(crate) mod internal {
use super::*;
use crate::lp::BareTradingFunction;

/// Find assets with liquidity positions from asset `from`, ordered by price.
pub mod routable_assets {
use penumbra_asset::asset;
use penumbra_num::Amount;

use super::*;

/// `A || be_bytes(A_from_B) => B` this will be an ordered encoding of every asset `B` directly routable to from `A`.
/// `a_from_b` represents the amount of `A` that can be bought with `B`.
/// The prefix query includes only the `A` portion, meaning the keys will be returned in order of liquidity.
pub fn prefix(from: &asset::Id) -> [u8; 39] {
let mut key = [0u8; 39];
key[0..7].copy_from_slice(b"dex/ra/");
key[7..7 + 32].copy_from_slice(&from.to_bytes());
key
}

/// `A || be_bytes(A_from_B) => B` this will be an ordered encoding of every asset `B` directly routable to from `A`.
/// `a_from_b` represents the amount of `A` that can be bought with `B`.
pub fn key(from: &asset::Id, a_from_b: Amount) -> [u8; 55] {
let mut key = [0u8; 55];
key[0..7].copy_from_slice(b"dex/ra/");
key[7..32 + 7].copy_from_slice(&from.to_bytes());
key[32 + 7..32 + 7 + 16].copy_from_slice(&a_from_b.to_be_bytes());
key
}

/// `(A, B) => A_from_B` this will encode the current amount of `A` tradable into `B` for every directly routable trading pair.
/// This index can be used to determine the key values for the [`super::key`] ordered index to perform updates efficiently.
pub fn a_from_b(pair: &DirectedTradingPair) -> [u8; 71] {
let mut key = [0u8; 71];
key[0..7].copy_from_slice(b"dex/ab/");
key[7..7 + 32].copy_from_slice(&pair.start.to_bytes());
key[7 + 32..7 + 32 + 32].copy_from_slice(&pair.end.to_bytes());
key
}
}

pub mod price_index {
use super::*;

Expand Down
Loading

0 comments on commit 3fe090e

Please sign in to comment.