Skip to content

Commit

Permalink
Merge pull request #479 from gregdhill/refactor/harden-faucet
Browse files Browse the repository at this point in the history
refactor: better caching, optional auth check
  • Loading branch information
sander2 authored Jun 26, 2023
2 parents a807e18 + 612d4d2 commit 8fed888
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 60 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
event-logs
faucet/kv
keyfile.json
config.json
.deploy/monitoring/data
.deploy/monitoring/prometheus
.deploy/monitoring/alertmanager
Expand Down
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions faucet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ kv = { version = "0.22.0", features = ["json-value"] }
async-trait = "0.1.40"
futures = "0.3.5"
git-version = "0.3.4"
lazy_static = "1.4.0"

reqwest = "0.11.11"
url = "2.2.2"

# Workspace dependencies
runtime = { path = "../runtime" }
Expand Down
21 changes: 15 additions & 6 deletions faucet/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#![allow(clippy::enum_variant_names)]

use chrono::ParseError;
use chrono::ParseError as ChronoParseError;
use jsonrpc_http_server::jsonrpc_core::Error as JsonRpcError;
use kv::Error as KvError;
use parity_scale_codec::Error as CodecError;
use reqwest::Error as ReqwestError;
use runtime::Error as RuntimeError;
use serde_json::Error as SerdeJsonError;
use std::{io::Error as IoError, net::AddrParseError};
use thiserror::Error;
use url::ParseError as UrlParseError;

#[derive(Error, Debug)]
pub enum Error {
Expand All @@ -21,16 +23,23 @@ pub enum Error {
AddrParseError(#[from] AddrParseError),
#[error("Kv store error: {0}")]
KvError(#[from] KvError),
#[error("ReqwestError: {0}")]
ReqwestError(#[from] ReqwestError),
#[error("UrlParseError: {0}")]
UrlParseError(#[from] UrlParseError),
#[error("Error parsing datetime string: {0}")]
DatetimeParsingError(#[from] ParseError),
DatetimeParsingError(#[from] ChronoParseError),
#[error("IoError: {0}")]
IoError(#[from] IoError),
#[error("SerdeJsonError: {0}")]
SerdeJsonError(#[from] SerdeJsonError),

#[error("Requester balance already sufficient")]
AccountBalanceExceedsMaximum,
#[error("Requester was recently funded")]
AccountAlreadyFunded,
#[error("Mathematical operation error")]
MathError,
#[error("IoError: {0}")]
IoError(#[from] IoError),
#[error("SerdeJsonError: {0}")]
SerdeJsonError(#[from] SerdeJsonError),
#[error("Terms and conditions not signed")]
SignatureMissing,
}
137 changes: 90 additions & 47 deletions faucet/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,28 @@ use jsonrpc_http_server::{
DomainsValidation, ServerBuilder,
};
use kv::*;
use lazy_static::lazy_static;
use parity_scale_codec::{Decode, Encode};
use reqwest::Url;
use runtime::{
AccountId, CollateralBalancesPallet, CurrencyId, Error as RuntimeError, InterBtcParachain, RuntimeCurrencyInfo,
TryFromSymbol, VaultRegistryPallet,
Ss58Codec, TryFromSymbol, VaultRegistryPallet, SS58_PREFIX,
};
use serde::{Deserialize, Deserializer};
use serde::{Deserialize, Deserializer, Serialize};
use std::{net::SocketAddr, time::Duration};
use tokio::time::timeout;
use tokio::{
sync::{Mutex, MutexGuard},
time::timeout,
};

const HEALTH_DURATION: Duration = Duration::from_millis(5000);
const KV_STORE_NAME: &str = "store";

#[derive(serde::Serialize, serde::Deserialize, PartialEq)]
lazy_static! {
static ref LOCK: Mutex<()> = Mutex::new(());
}

#[derive(Serialize, Deserialize, PartialEq)]
struct FaucetRequest {
datetime: String,
account_type: FundingRequestAccountType,
Expand Down Expand Up @@ -135,21 +144,26 @@ fn has_request_expired(
)
}

async fn ensure_funding_allowed(
async fn ensure_funding_allowed<'a>(
kv: &'a Bucket<'a, String, Json<FaucetRequest>>,
parachain_rpc: &InterBtcParachain,
account_id: AccountId,
account_id: &AccountId,
allowance_config: AllowanceConfig,
last_request_json: Option<Json<FaucetRequest>>,
account_type: FundingRequestAccountType,
) -> Result<(), Error> {
) -> Result<MutexGuard<'static, ()>, Error> {
if let Some(auth_url) = allowance_config.auth_url {
ensure_signature_exists(&auth_url, account_id).await?;
}

let account_allowances = match account_type {
FundingRequestAccountType::User => allowance_config.user_allowances,
FundingRequestAccountType::Vault => allowance_config.vault_allowances,
FundingRequestAccountType::User => &allowance_config.user_allowances,
FundingRequestAccountType::Vault => &allowance_config.vault_allowances,
};
let currency_ids: Result<Vec<_>, _> = account_allowances
.iter()
.map(|x| CurrencyId::try_from_symbol(x.symbol.clone()))
.collect();

for currency_id in currency_ids?.iter() {
let free_balance = parachain_rpc
.get_free_balance_for_id(account_id.clone(), *currency_id)
Expand All @@ -176,6 +190,11 @@ async fn ensure_funding_allowed(
.checked_sub_signed(ISO8601::hours(allowance_config.faucet_cooldown_hours))
.ok_or(Error::MathError)?;

// aquire lock after auth and balance checks since they may be slow and we only
// want to guard writes to the local key store
let mutex_guard = LOCK.lock().await;

let last_request_json = kv.get(account_id.to_string())?;
match last_request_json {
Some(last_request_json) => {
let last_request_expired = has_request_expired(
Expand All @@ -188,58 +207,69 @@ async fn ensure_funding_allowed(
log::warn!("Already funded {} at {:?}", account_id, last_request_json.0.datetime);
Err(Error::AccountAlreadyFunded)
} else {
Ok(())
Ok(mutex_guard)
}
}
None => Ok(()),
None => Ok(mutex_guard),
}
}

#[derive(Deserialize)]
struct GetSignatureData {
exists: bool,
}

async fn ensure_signature_exists(auth_url: &str, account_id: &AccountId) -> Result<(), Error> {
reqwest::get(Url::parse(auth_url)?.join(&account_id.to_ss58check_with_version(SS58_PREFIX.into()))?)
.await?
.json::<GetSignatureData>()
.await?
.exists
.then(|| ())
.ok_or(Error::SignatureMissing)
}

async fn atomic_faucet_funding(
parachain_rpc: &InterBtcParachain,
kv: Bucket<'_, String, Json<FaucetRequest>>,
kv: &Bucket<'_, String, Json<FaucetRequest>>,
account_id: AccountId,
allowance_config: AllowanceConfig,
) -> Result<(), Error> {
let account_str = account_id.to_string();
let last_request_json = kv.get(account_str.clone())?;
let account_type = get_account_type(parachain_rpc, account_id.clone()).await?;
let amounts: Allowance = match account_type {
FundingRequestAccountType::User => allowance_config.user_allowances.clone(),
FundingRequestAccountType::Vault => allowance_config.vault_allowances.clone(),
};

ensure_funding_allowed(
parachain_rpc,
account_id.clone(),
allowance_config,
last_request_json,
account_type.clone(),
)
.await?;

let mut transfers = vec![];
for AllowanceAmount { symbol, amount } in amounts.iter() {
let currency_id = CurrencyId::try_from_symbol(symbol.clone())?;
log::info!(
"AccountId: {}, Currency: {:?} Type: {:?}, Amount: {}",
account_id,
currency_id.symbol().unwrap_or_default(),
account_type,
amount
);
transfers.push(parachain_rpc.transfer_to(&account_id, *amount, currency_id));
}
let mutex_guard =
ensure_funding_allowed(kv, parachain_rpc, &account_id, allowance_config, account_type.clone()).await?;

let result = futures::future::join_all(transfers).await;
// replace the previous (expired) claim datetime with the datetime of the current claim
update_kv_store(
kv,
account_id.to_string(),
Utc::now().to_rfc2822(),
account_type.clone(),
)?;
// don't block other threads for transfer since we updated the store
drop(mutex_guard);

if let Some(err) = result.into_iter().find_map(|x| x.err()) {
return Err(err.into());
}
let transfers = amounts
.into_iter()
.map(|AllowanceAmount { symbol, amount }| {
let currency_id = CurrencyId::try_from_symbol(symbol.clone())?;
log::info!(
"AccountId: {}, Currency: {:?} Type: {:?}, Amount: {}",
account_id,
currency_id.symbol().unwrap_or_default(),
account_type,
amount
);
Ok((amount, currency_id))
})
.collect::<Result<Vec<_>, Error>>()?;

// Replace the previous (expired) claim datetime with the datetime of the current claim, only update
// this after successfully transferring funds to ensure that this can be called again on error
update_kv_store(&kv, account_str, Utc::now().to_rfc2822(), account_type.clone())?;
parachain_rpc.transfer_to(&account_id, transfers).await?;
Ok(())
}

Expand All @@ -251,8 +281,21 @@ async fn fund_account(
) -> Result<(), Error> {
let parachain_rpc = parachain_rpc.clone();
let kv = open_kv_store(store)?;
atomic_faucet_funding(&parachain_rpc, kv, req.account_id.clone(), allowance_config).await?;
Ok(())
match atomic_faucet_funding(&parachain_rpc, &kv, req.account_id.clone(), allowance_config).await {
Err(Error::RuntimeError(err))
if err.is_any_module_err()
|| err.is_invalid_transaction().is_some()
|| matches!(err, RuntimeError::AssetNotFound) =>
{
let account_str = req.account_id.to_string();
log::error!("Failed to fund {}", account_str);
// transfer failed, reset the db so this can be called again
kv.remove(account_str)?;
Err(Error::RuntimeError(err))
}
Err(err) => Err(err),
Ok(_) => Ok(()),
}
}

pub async fn start_http(
Expand All @@ -262,7 +305,7 @@ pub async fn start_http(
allowance_config: AllowanceConfig,
) -> jsonrpc_http_server::CloseHandle {
let mut io = IoHandler::default();
let store = Store::new(Config::new("./kv")).expect("Unable to open kv store");
let store = Store::new(Config::new("./kv").flush_every_ms(100)).expect("Unable to open kv store");
let user_allowances_clone = allowance_config.user_allowances.clone();
let vault_allowances_clone = allowance_config.vault_allowances.clone();
io.add_sync_method("user_allowance", move |_| handle_resp(Ok(&user_allowances_clone)));
Expand Down Expand Up @@ -397,7 +440,7 @@ mod tests {
join_all(balance.iter().map(|(amount, currency)| {
let leftover = leftover_units * 10u128.pow(currency.decimals().unwrap());
let amount_to_transfer = if *amount > leftover { amount - leftover } else { 0 };
provider.transfer_to(&drain_account_id, amount_to_transfer, currency.clone())
provider.transfer_to(&drain_account_id, vec![(amount_to_transfer, currency.clone())])
}))
.await
.into_iter()
Expand Down
2 changes: 2 additions & 0 deletions faucet/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use parity_scale_codec::{Decode, Encode};
use serde::Deserialize;

#[derive(Deserialize, Debug, Clone, Encode, Decode)]
pub struct AllowanceAmount {
pub symbol: String,
pub amount: u128,
}

impl AllowanceAmount {
pub fn new(symbol: String, amount: u128) -> Self {
Self { symbol, amount }
Expand Down
2 changes: 2 additions & 0 deletions faucet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct AllowanceConfig {
pub faucet_cooldown_hours: i64,
pub user_allowances: Allowance,
pub vault_allowances: Allowance,
pub auth_url: Option<String>,
}

impl AllowanceConfig {
Expand All @@ -54,6 +55,7 @@ impl AllowanceConfig {
faucet_cooldown_hours,
user_allowances,
vault_allowances,
auth_url: None,
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions runtime/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ impl From<module_bitcoin::Error> for Error {
}

impl Error {
pub fn is_any_module_err(&self) -> bool {
matches!(
self,
Error::SubxtRuntimeError(SubxtError::Runtime(DispatchError::Module(_))),
)
}

fn is_module_err(&self, pallet_name: &str, error_name: &str) -> bool {
matches!(
self,
Expand Down
20 changes: 15 additions & 5 deletions runtime/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ pub trait CollateralBalancesPallet {

async fn get_reserved_balance_for_id(&self, id: AccountId, currency_id: CurrencyId) -> Result<Balance, Error>;

async fn transfer_to(&self, recipient: &AccountId, amount: u128, currency_id: CurrencyId) -> Result<(), Error>;
async fn transfer_to(&self, recipient: &AccountId, amounts: Vec<(u128, CurrencyId)>) -> Result<(), Error>;
}

#[async_trait]
Expand All @@ -859,10 +859,20 @@ impl CollateralBalancesPallet for InterBtcParachain {
Ok(self.query_finalized_or_default(storage_key).await?.reserved)
}

async fn transfer_to(&self, recipient: &AccountId, amount: u128, currency_id: CurrencyId) -> Result<(), Error> {
self.with_unique_signer(metadata::tx().tokens().transfer(recipient.clone(), currency_id, amount))
.await?;
Ok(())
async fn transfer_to(&self, recipient: &AccountId, amounts: Vec<(u128, CurrencyId)>) -> Result<(), Error> {
self.batch(
amounts
.into_iter()
.map(|(amount, currency_id)| {
EncodedCall::Tokens(metadata::runtime_types::orml_tokens::module::Call::transfer {
dest: recipient.clone(),
currency_id,
amount,
})
})
.collect(),
)
.await
}
}

Expand Down
2 changes: 1 addition & 1 deletion vault/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ mod tests {
async fn get_free_balance_for_id(&self, id: AccountId, currency_id: CurrencyId) -> Result<Balance, RuntimeError>;
async fn get_reserved_balance(&self, currency_id: CurrencyId) -> Result<Balance, RuntimeError>;
async fn get_reserved_balance_for_id(&self, id: AccountId, currency_id: CurrencyId) -> Result<Balance, RuntimeError>;
async fn transfer_to(&self, recipient: &AccountId, amount: u128, currency_id: CurrencyId) -> Result<(), RuntimeError>;
async fn transfer_to(&self, recipient: &AccountId, amounts: Vec<(u128, CurrencyId)>) -> Result<(), RuntimeError>;
}

#[async_trait]
Expand Down
Loading

0 comments on commit 8fed888

Please sign in to comment.