Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement arbitrage #2539

Merged
merged 9 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions app/src/dex/arb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use penumbra_chain::component::StateReadExt;
use penumbra_crypto::{asset, dex::execution::SwapExecution, Value, STAKING_TOKEN_ASSET_ID};
use penumbra_storage::{StateDelta, StateWrite};
use tracing::instrument;

use crate::dex::{
router::{RouteAndFill, RoutingParams},
StateWriteExt,
};

#[async_trait]
pub trait Arbitrage: StateWrite + Sized {
/// Attempts to extract as much as possible of the `arb_token` from the available
/// liquidity positions, and returns the amount of `arb_token` extracted.
#[instrument(skip(self, arb_token, fixed_candidates))]
async fn arbitrage(
self: &mut Arc<Self>,
arb_token: asset::Id,
fixed_candidates: Vec<asset::Id>,
) -> Result<Value>
where
Self: 'static,
{
tracing::debug!(?arb_token, ?fixed_candidates, "beginning arb search");

// Work in a new `StateDelta`, so we can transactionally apply any state
// changes, and roll them back if we fail (e.g., if for some reason we
// discover at the end that the arb wasn't profitable).
let mut this = Arc::new(StateDelta::new(self.clone()));

// TODO: Build an extended candidate set with:
// - both ends of all trading pairs for which there were swaps in the block
// - both ends of all trading pairs for which positions were opened
let params = RoutingParams {
max_hops: 5,
price_limit: Some(1u64.into()),
fixed_candidates: Arc::new(fixed_candidates),
};

// Create a flash-loan 2^64 of the arb token to ourselves.
let flash_loan = Value {
asset_id: arb_token,
amount: u64::MAX.into(),
};

let (output, unfilled_input) = this
.route_and_fill(arb_token, arb_token, flash_loan.amount, params)
.await?;

// Because we're trading the arb token to itself, the total output is the
// output from the route-and-fill, plus the unfilled input.
let total_output = output + unfilled_input;

// Now "repay" the flash loan by subtracting it from the total output.
let Some(arb_profit) = total_output.checked_sub(&flash_loan.amount) else {
// This shouldn't happen, but because route-and-fill prioritizes
// guarantees about forward progress over precise application of
// price limits, it technically could occur.
tracing::debug!("mis-estimation in route-and-fill led to unprofitable arb, discarding");
return Ok(Value { amount: 0u64.into(), asset_id: arb_token });
};

if arb_profit == 0u64.into() {
// If we didn't make any profit, we don't need to do anything,
// and we can just discard the state delta entirely.
tracing::debug!("found 0-profit arb, discarding");
return Ok(Value {
amount: 0u64.into(),
asset_id: arb_token,
});
}

tracing::debug!(
?arb_profit,
"successfully arbitraged positions, burning profit"
);

// TODO: this is a bit nasty, can it be simplified?
// should this even be done "inside" the method, or all the way at the top?
let (self2, cache) = Arc::try_unwrap(this)
.map_err(|_| ())
.expect("no more outstanding refs to state after routing")
.flatten();
std::mem::drop(self2);
// Now there is only one reference to self again
let mut self_mut = Arc::get_mut(self).expect("self was unique ref");
cache.apply_to(&mut self_mut);

// Finally, record the arb execution in the state:
let traces: im::Vector<Vec<Value>> = self_mut
.object_get("trade_traces")
.ok_or_else(|| anyhow::anyhow!("missing swap execution in object store2"))?;
let height = self_mut.get_block_height().await?;
self_mut.set_arb_execution(
height,
SwapExecution {
traces: traces.into_iter().collect(),
},
);

return Ok(Value {
amount: arb_profit,
asset_id: arb_token,
});
}
}

impl<T: StateWrite> Arbitrage for T {}
37 changes: 35 additions & 2 deletions app/src/dex/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use penumbra_chain::component::StateReadExt as _;
use penumbra_compact_block::component::{StateReadExt as _, StateWriteExt as _};
use penumbra_component::Component;
use penumbra_crypto::{
asset,
dex::{execution::SwapExecution, BatchSwapOutputData, TradingPair},
SwapFlow,
SwapFlow, STAKING_TOKEN_ASSET_ID,
};
use penumbra_proto::{StateReadProto, StateWriteProto};
use penumbra_storage::{StateRead, StateWrite};
Expand All @@ -16,7 +17,7 @@ use tracing::instrument;

use super::{
router::{HandleBatchSwaps, RoutingParams},
state_key, PositionManager,
state_key, Arbitrage, PositionManager,
};

pub struct Dex {}
Expand Down Expand Up @@ -60,6 +61,30 @@ impl Component for Dex {
.expect("unable to process batch swaps");
}

// Then, perform arbitrage:
let arb_burn = state
.arbitrage(
*STAKING_TOKEN_ASSET_ID,
vec![
*STAKING_TOKEN_ASSET_ID,
asset::REGISTRY.parse_unit("gm").id(),
asset::REGISTRY.parse_unit("gn").id(),
asset::REGISTRY.parse_unit("test_usd").id(),
asset::REGISTRY.parse_unit("test_btc").id(),
asset::REGISTRY.parse_unit("test_atom").id(),
asset::REGISTRY.parse_unit("test_osmo").id(),
],
)
.await
.expect("must be able to process arbitrage");

if arb_burn.amount != 0u64.into() {
// TODO: hack to avoid needing an asset cache for nice debug output
let unit = asset::REGISTRY.parse_unit("penumbra");
let burn = format!("{}{}", unit.format_value(arb_burn.amount), unit);
tracing::info!(%burn, "executed arbitrage opportunity");
}

// Next, close all positions queued for closure at the end of the block.
// It's important to do this after execution, to allow block-scoped JIT liquidity.
Arc::get_mut(state)
Expand Down Expand Up @@ -96,6 +121,10 @@ pub trait StateReadExt: StateRead {
.await
}

async fn arb_execution(&self, height: u64) -> Result<Option<SwapExecution>> {
self.get(&state_key::arb_execution(height)).await
}

// Get the swap flow for the given trading pair accumulated in this block so far.
fn swap_flow(&self, pair: &TradingPair) -> SwapFlow {
self.swap_flows().get(pair).cloned().unwrap_or_default()
Expand Down Expand Up @@ -128,6 +157,10 @@ pub trait StateWriteExt: StateWrite + StateReadExt {
self.stub_put_compact_block(compact_block);
}

fn set_arb_execution(&mut self, height: u64, execution: SwapExecution) {
self.put(state_key::arb_execution(height), execution);
}

fn put_swap_flow(&mut self, trading_pair: &TradingPair, swap_flow: SwapFlow) {
// TODO: replace with IM struct later
let mut swap_flows = self.swap_flows();
Expand Down
2 changes: 2 additions & 0 deletions app/src/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ pub mod state_key;

pub mod router;

mod arb;
mod position_manager;

pub use self::metrics::register_metrics;
pub use arb::Arbitrage;
pub use component::{Dex, StateReadExt, StateWriteExt};
pub use position_manager::{PositionManager, PositionRead};

Expand Down
4 changes: 4 additions & 0 deletions app/src/dex/router/route_and_fill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ pub trait RouteAndFill: StateWrite + Sized {
tracing::debug!("no path found, exiting route_and_fill");
break;
};
if path.is_empty() {
tracing::debug!("empty path found, exiting route_and_fill");
break;
}

(outer_lambda_2, outer_unfilled_1) = {
// path found, fill as much as we can
Expand Down
4 changes: 4 additions & 0 deletions app/src/dex/state_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ pub fn swap_execution(height: u64, trading_pair: TradingPair) -> String {
)
}

pub fn arb_execution(height: u64) -> String {
format!("dex/arb_execution/{height:020}")
}

pub fn swap_flows() -> &'static str {
"dex/swap_flows"
}
Expand Down
57 changes: 56 additions & 1 deletion app/src/dex/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use penumbra_crypto::Value;
use crate::dex::{
position_manager::PositionManager,
router::{limit_buy, limit_sell, HandleBatchSwaps, RoutingParams},
StateWriteExt,
Arbitrage, StateWriteExt,
};
use crate::dex::{position_manager::PositionRead, StateReadExt};
use crate::TempStorageExt;
Expand Down Expand Up @@ -631,3 +631,58 @@ async fn swap_execution_tests() -> anyhow::Result<()> {

Ok(())
}

#[tokio::test]
/// Test that a basic cycle arb is detected and filled.
async fn basic_cycle_arb() -> anyhow::Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let storage = TempStorage::new().await?.apply_default_genesis().await?;
let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
let mut state_tx = state.try_begin_transaction().unwrap();

let penumbra = asset::REGISTRY.parse_unit("penumbra");
let gm = asset::REGISTRY.parse_unit("gm");
let gn = asset::REGISTRY.parse_unit("gn");

tracing::info!(gm_id = ?gm.id());
tracing::info!(gn_id = ?gn.id());
tracing::info!(penumbra_id = ?penumbra.id());

// Sell 10 gn at 1 penumbra each.
state_tx.put_position(limit_sell(
DirectedUnitPair::new(gn.clone(), penumbra.clone()),
10u64.into(),
1u64.into(),
));
// Buy 100 gn at 2 gm each.
state_tx.put_position(limit_buy(
DirectedUnitPair::new(gn.clone(), gm.clone()),
100u64.into(),
2u64.into(),
));
// Sell 100 penumbra at 1 gm each.
state_tx.put_position(limit_sell(
DirectedUnitPair::new(penumbra.clone(), gm.clone()),
100u64.into(),
1u64.into(),
));
state_tx.apply();

// Now we should be able to arb 10penumbra => 10gn => 20gm => 20penumbra.
state
.arbitrage(penumbra.id(), vec![penumbra.id(), gm.id(), gn.id()])
.await?;

let arb_execution = state.arb_execution(0).await?.expect("arb was performed");
assert_eq!(
arb_execution.traces,
vec![vec![
penumbra.value(10u32.into()),
gn.value(10u32.into()),
gm.value(20u32.into()),
penumbra.value(20u32.into()),
],]
);

Ok(())
}
4 changes: 2 additions & 2 deletions proto/proto/buf.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ deps:
- remote: buf.build
owner: cosmos
repository: cosmos-sdk
commit: 1706a742a274436cb0eabf4950ed85cf
digest: shake256:0fa7809d8fcab1da96ea2a19f8f13a81cd673b820fbed1addf122911db4de6c143679f6033186d4448afe4c408445054f48a129b2fee2cf79f814f012ba8ef10
commit: 07205de1b4354a9eb61010f9e6640150
digest: shake256:ed2737b2a8fa2169bb2b82b44b8707ac8d98271ea9c5bcd575a84534d4d2269253d2451a9698942c8bb70ec69c869a49509d5d6693e5d2ae25018879f6731cbd
- remote: buf.build
owner: cosmos
repository: gogo-proto
Expand Down