From 719dbe69b9f4803a29b373d02c412a7449700c52 Mon Sep 17 00:00:00 2001 From: Sander Bosma Date: Thu, 16 Nov 2023 21:36:12 +0100 Subject: [PATCH] fix: allow oracles composed of different feeds --- oracle/src/config.rs | 18 +++++++++--------- oracle/src/error.rs | 3 +-- oracle/src/feeds.rs | 19 +++++++++++-------- oracle/src/feeds/dia_fair_price.rs | 2 +- oracle/src/main.rs | 9 ++++++--- 5 files changed, 28 insertions(+), 23 deletions(-) diff --git a/oracle/src/config.rs b/oracle/src/config.rs index c8c706673..27cd46b31 100644 --- a/oracle/src/config.rs +++ b/oracle/src/config.rs @@ -37,8 +37,8 @@ pub struct PriceConfig { #[serde(default)] pub value: Option, // Feeds to consume to calculate this exchange rate. - #[serde(default = "BTreeMap::new")] - pub feeds: BTreeMap>>, + #[serde(default = "Vec::new")] + pub feeds: Vec)>>, } impl PriceConfig @@ -47,12 +47,15 @@ where { // TODO: validate currencies exist pub fn validate(&self) -> Result<(), PriceConfigError> { - for (name, path) in &self.feeds { + for path in self + .feeds + .iter() + .map(|x| x.iter().map(|(_name, pair)| pair.clone()).collect::>()) + { let end = &match &path.first() { Some(currency_pair) if currency_pair.contains(&self.pair.base) => Ok(self.pair.quote.clone()), Some(currency_pair) if currency_pair.contains(&self.pair.quote) => Ok(self.pair.base.clone()), _ => Err(PriceConfigError { - feed: name.clone(), pair: self.pair.clone(), error: ConfigError::NoStart, }), @@ -61,7 +64,6 @@ where match &path.last() { Some(currency_pair) if currency_pair.contains(end) => Ok(()), _ => Err(PriceConfigError { - feed: name.clone(), pair: self.pair.clone(), error: ConfigError::NoEnd, }), @@ -70,7 +72,6 @@ where for [left, right] in path.windows(2).flat_map(<&[CurrencyPair; 2]>::try_from) { if !left.has_shared(right) { return Err(PriceConfigError { - feed: name.clone(), pair: self.pair.clone(), error: ConfigError::NoPath(left.clone(), right.clone()), }); @@ -91,7 +92,7 @@ mod tests { PriceConfig { pair: $pair, value: None, - feeds: vec![(FeedName::Kraken, vec![$($path),*])].into_iter().collect() + feeds: vec![vec![$((FeedName::Kraken, $path)),*]] } .validate().expect("Config is valid") }}; @@ -102,14 +103,13 @@ mod tests { let result = PriceConfig { pair: $pair, value: None, - feeds: vec![(FeedName::Kraken, vec![$($path),*])].into_iter().collect() + feeds: vec![vec![$((FeedName::Kraken, $path)),*]] } .validate(); assert!( matches!( result, Err(PriceConfigError{ - feed: FeedName::Kraken, pair: _, error: $err }) diff --git a/oracle/src/error.rs b/oracle/src/error.rs index 03f8b4958..cca047249 100644 --- a/oracle/src/error.rs +++ b/oracle/src/error.rs @@ -24,9 +24,8 @@ pub enum ConfigError { } #[derive(Error, Debug)] -#[error("{feed}: {pair} => {error}")] +#[error("{pair} => {error}")] pub struct PriceConfigError { - pub(crate) feed: FeedName, pub(crate) pair: CurrencyPair, pub(crate) error: ConfigError, } diff --git a/oracle/src/feeds.rs b/oracle/src/feeds.rs index 79cbb209d..105c81856 100644 --- a/oracle/src/feeds.rs +++ b/oracle/src/feeds.rs @@ -118,21 +118,25 @@ impl PriceFeeds { price_config .feeds .into_iter() - .map(|(name, route)| { - self.feeds - .get(&name) - .map(|feed| (name.clone(), route, feed)) - .ok_or(Error::NotConfigured(name)) + .map(|x| { + x.into_iter() + .map(|(name, pair)| { + self.feeds + .get(&name) + .map(|feed| (name.clone(), pair, feed)) + .ok_or(Error::NotConfigured(name)) + }) + .collect::, Error>>() }) .collect::, Error>>()? .into_iter() - .map(|(name, route, feed)| { + .map(|route| { let currency_pair = currency_pair.clone(); async move { let mut currency_pair_and_price = if let Some(currency_pair_and_price) = join_all( route .into_iter() - .map(|currency_pair| feed.get_price(currency_pair, currency_store)), + .map(|(name, currency_pair, feed)| feed.get_price(currency_pair, currency_store)), ) .await .into_iter() @@ -149,7 +153,6 @@ impl PriceFeeds { currency_pair_and_price = currency_pair_and_price.invert() } - log::trace!("Using {:?}: {}", name, currency_pair_and_price); Ok(Some(currency_pair_and_price)) } }), diff --git a/oracle/src/feeds/dia_fair_price.rs b/oracle/src/feeds/dia_fair_price.rs index 80193f446..43b427576 100644 --- a/oracle/src/feeds/dia_fair_price.rs +++ b/oracle/src/feeds/dia_fair_price.rs @@ -1,6 +1,6 @@ #![allow(clippy::single_char_pattern)] use super::{get_http, PriceFeed}; -use crate::{config::CurrencyStore, currency::*, Error}; +use crate::{config::CurrencyStore, currency::*, feeds::DiaApi, Error}; use async_trait::async_trait; use clap::Parser; use reqwest::Url; diff --git a/oracle/src/main.rs b/oracle/src/main.rs index 713d7adb8..dfe4fa2d4 100644 --- a/oracle/src/main.rs +++ b/oracle/src/main.rs @@ -5,9 +5,10 @@ mod feeds; use backoff::{future::retry_notify, ExponentialBackoff}; use clap::Parser; -use config::{CurrencyStore, OracleConfig}; +use config::{CurrencyStore, OracleConfig, PriceConfig}; use currency::*; use error::Error; +use feeds::CoinGeckoApi; use futures::{future::join_all, stream::StreamExt}; use git_version::git_version; use runtime::{ @@ -16,7 +17,7 @@ use runtime::{ }; use signal_hook::consts::*; use signal_hook_tokio::Signals; -use std::{path::PathBuf, time::Duration}; +use std::{collections::BTreeMap, path::PathBuf, time::Duration}; use tokio::{join, time::sleep}; const VERSION: &str = git_version!(args = ["--tags"]); @@ -179,7 +180,7 @@ async fn _main() -> Result<(), Error> { let currency_store = &oracle_config.currencies; let mut price_feeds = feeds::PriceFeeds::new(currency_store.clone()); - price_feeds.maybe_add_coingecko(opts.coingecko); + price_feeds.maybe_add_coingecko(opts.coingecko.clone()); price_feeds.maybe_add_dia(opts.dia); price_feeds.maybe_add_dia_fair_price(opts.dia_fair_price); price_feeds.maybe_add_gateio(opts.gateio); @@ -206,6 +207,8 @@ async fn _main() -> Result<(), Error> { .into_iter() .collect::, _>>()?; + log::debug!("Collected prices: {:?}", prices); + // get prices above first to prevent websocket timeout let shutdown_tx = ShutdownSender::new(); let parachain_rpc = InterBtcParachain::from_url_with_retry(