Skip to content

Commit

Permalink
Merge pull request #534 from interlay/interlay-oracle-fix
Browse files Browse the repository at this point in the history
fix: allow oracles composed of different feeds
  • Loading branch information
sander2 authored Nov 17, 2023
2 parents 93a3330 + 719dbe6 commit ac9dee9
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 23 deletions.
18 changes: 9 additions & 9 deletions oracle/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ pub struct PriceConfig<Currency> {
#[serde(default)]
pub value: Option<f64>,
// Feeds to consume to calculate this exchange rate.
#[serde(default = "BTreeMap::new")]
pub feeds: BTreeMap<FeedName, Vec<CurrencyPair<Currency>>>,
#[serde(default = "Vec::new")]
pub feeds: Vec<Vec<(FeedName, CurrencyPair<Currency>)>>,
}

impl<Currency> PriceConfig<Currency>
Expand All @@ -47,12 +47,15 @@ where
{
// TODO: validate currencies exist
pub fn validate(&self) -> Result<(), PriceConfigError<Currency>> {
for (name, path) in &self.feeds {
for path in self
.feeds
.iter()
.map(|x| x.iter().map(|(_name, pair)| pair.clone()).collect::<Vec<_>>())
{
let end = &match &path.first() {
Some(currency_pair) if currency_pair.contains(&self.pair.base) => Ok(self.pair.quote.clone()),
Some(currency_pair) if currency_pair.contains(&self.pair.quote) => Ok(self.pair.base.clone()),
_ => Err(PriceConfigError {
feed: name.clone(),
pair: self.pair.clone(),
error: ConfigError::NoStart,
}),
Expand All @@ -61,7 +64,6 @@ where
match &path.last() {
Some(currency_pair) if currency_pair.contains(end) => Ok(()),
_ => Err(PriceConfigError {
feed: name.clone(),
pair: self.pair.clone(),
error: ConfigError::NoEnd,
}),
Expand All @@ -70,7 +72,6 @@ where
for [left, right] in path.windows(2).flat_map(<&[CurrencyPair<Currency>; 2]>::try_from) {
if !left.has_shared(right) {
return Err(PriceConfigError {
feed: name.clone(),
pair: self.pair.clone(),
error: ConfigError::NoPath(left.clone(), right.clone()),
});
Expand All @@ -91,7 +92,7 @@ mod tests {
PriceConfig {
pair: $pair,
value: None,
feeds: vec![(FeedName::Kraken, vec![$($path),*])].into_iter().collect()
feeds: vec![vec![$((FeedName::Kraken, $path)),*]]
}
.validate().expect("Config is valid")
}};
Expand All @@ -102,14 +103,13 @@ mod tests {
let result = PriceConfig {
pair: $pair,
value: None,
feeds: vec![(FeedName::Kraken, vec![$($path),*])].into_iter().collect()
feeds: vec![vec![$((FeedName::Kraken, $path)),*]]
}
.validate();
assert!(
matches!(
result,
Err(PriceConfigError{
feed: FeedName::Kraken,
pair: _,
error: $err
})
Expand Down
3 changes: 1 addition & 2 deletions oracle/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ pub enum ConfigError<Currency> {
}

#[derive(Error, Debug)]
#[error("{feed}: {pair} => {error}")]
#[error("{pair} => {error}")]
pub struct PriceConfigError<Currency> {
pub(crate) feed: FeedName,
pub(crate) pair: CurrencyPair<Currency>,
pub(crate) error: ConfigError<Currency>,
}
Expand Down
19 changes: 11 additions & 8 deletions oracle/src/feeds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,25 @@ impl PriceFeeds {
price_config
.feeds
.into_iter()
.map(|(name, route)| {
self.feeds
.get(&name)
.map(|feed| (name.clone(), route, feed))
.ok_or(Error::NotConfigured(name))
.map(|x| {
x.into_iter()
.map(|(name, pair)| {
self.feeds
.get(&name)
.map(|feed| (name.clone(), pair, feed))
.ok_or(Error::NotConfigured(name))
})
.collect::<Result<Vec<_>, Error>>()
})
.collect::<Result<Vec<_>, Error>>()?
.into_iter()
.map(|(name, route, feed)| {
.map(|route| {
let currency_pair = currency_pair.clone();
async move {
let mut currency_pair_and_price = if let Some(currency_pair_and_price) = join_all(
route
.into_iter()
.map(|currency_pair| feed.get_price(currency_pair, currency_store)),
.map(|(name, currency_pair, feed)| feed.get_price(currency_pair, currency_store)),
)
.await
.into_iter()
Expand All @@ -149,7 +153,6 @@ impl PriceFeeds {
currency_pair_and_price = currency_pair_and_price.invert()
}

log::trace!("Using {:?}: {}", name, currency_pair_and_price);
Ok(Some(currency_pair_and_price))
}
}),
Expand Down
2 changes: 1 addition & 1 deletion oracle/src/feeds/dia_fair_price.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(clippy::single_char_pattern)]
use super::{get_http, PriceFeed};
use crate::{config::CurrencyStore, currency::*, Error};
use crate::{config::CurrencyStore, currency::*, feeds::DiaApi, Error};
use async_trait::async_trait;
use clap::Parser;
use reqwest::Url;
Expand Down
9 changes: 6 additions & 3 deletions oracle/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ mod feeds;

use backoff::{future::retry_notify, ExponentialBackoff};
use clap::Parser;
use config::{CurrencyStore, OracleConfig};
use config::{CurrencyStore, OracleConfig, PriceConfig};
use currency::*;
use error::Error;
use feeds::CoinGeckoApi;
use futures::{future::join_all, stream::StreamExt};
use git_version::git_version;
use runtime::{
Expand All @@ -16,7 +17,7 @@ use runtime::{
};
use signal_hook::consts::*;
use signal_hook_tokio::Signals;
use std::{path::PathBuf, time::Duration};
use std::{collections::BTreeMap, path::PathBuf, time::Duration};
use tokio::{join, time::sleep};

const VERSION: &str = git_version!(args = ["--tags"]);
Expand Down Expand Up @@ -179,7 +180,7 @@ async fn _main() -> Result<(), Error> {

let currency_store = &oracle_config.currencies;
let mut price_feeds = feeds::PriceFeeds::new(currency_store.clone());
price_feeds.maybe_add_coingecko(opts.coingecko);
price_feeds.maybe_add_coingecko(opts.coingecko.clone());
price_feeds.maybe_add_dia(opts.dia);
price_feeds.maybe_add_dia_fair_price(opts.dia_fair_price);
price_feeds.maybe_add_gateio(opts.gateio);
Expand All @@ -206,6 +207,8 @@ async fn _main() -> Result<(), Error> {
.into_iter()
.collect::<Result<Vec<_>, _>>()?;

log::debug!("Collected prices: {:?}", prices);

// get prices above first to prevent websocket timeout
let shutdown_tx = ShutdownSender::new();
let parachain_rpc = InterBtcParachain::from_url_with_retry(
Expand Down

0 comments on commit ac9dee9

Please sign in to comment.