From 3f5cb6997f8bf0c889747badb9498a402ecf96a4 Mon Sep 17 00:00:00 2001 From: LLFourn Date: Fri, 29 Oct 2021 17:41:02 +1100 Subject: [PATCH 01/13] Invert dependencies in electrum sync Blockchain calls sync logic rather than the other way around. Sync logic is captured in script_sync.rs. --- src/blockchain/electrum.rs | 228 +++++++++++++++---- src/blockchain/esplora/api.rs | 117 ++++++++++ src/blockchain/esplora/mod.rs | 62 +++-- src/blockchain/esplora/reqwest.rs | 277 ++++++++++------------ src/blockchain/esplora/ureq.rs | 301 +++++++++++------------- src/blockchain/mod.rs | 5 +- src/blockchain/script_sync.rs | 367 ++++++++++++++++++++++++++++++ src/blockchain/utils.rs | 363 ----------------------------- src/testutils/blockchain_tests.rs | 68 ++++++ src/wallet/utils.rs | 34 --- 10 files changed, 1042 insertions(+), 780 deletions(-) create mode 100644 src/blockchain/esplora/api.rs create mode 100644 src/blockchain/script_sync.rs diff --git a/src/blockchain/electrum.rs b/src/blockchain/electrum.rs index 53d4dabb9..f0c264256 100644 --- a/src/blockchain/electrum.rs +++ b/src/blockchain/electrum.rs @@ -24,20 +24,20 @@ //! # Ok::<(), bdk::Error>(()) //! ``` -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; #[allow(unused_imports)] use log::{debug, error, info, trace}; -use bitcoin::{BlockHeader, Script, Transaction, Txid}; +use bitcoin::{Transaction, Txid}; use electrum_client::{Client, ConfigBuilder, ElectrumApi, Socks5Config}; -use self::utils::{ElectrumLikeSync, ElsGetHistoryRes}; +use super::script_sync::Request; use super::*; -use crate::database::BatchDatabase; +use crate::database::{BatchDatabase, Database}; use crate::error::Error; -use crate::FeeRate; +use crate::{ConfirmationTime, FeeRate}; /// Wrapper over an Electrum Client that implements the required blockchain traits /// @@ -71,10 +71,151 @@ impl Blockchain for ElectrumBlockchain { fn setup( &self, database: &mut D, - progress_update: P, + _progress_update: P, ) -> Result<(), Error> { - self.client - .electrum_like_setup(self.stop_gap, database, progress_update) + let mut request = script_sync::start(database, self.stop_gap)?; + let mut block_times = HashMap::::new(); + let mut txid_to_height = HashMap::::new(); + let mut tx_cache = TxCache::new(database, &self.client); + let chunk_size = self.stop_gap; + // The electrum server has been inconsistent somehow in its responses during sync. For + // example, we do a batch request of transactions and the response contains less + // tranascations than in the request. This should never happen but we don't want to panic. + let electrum_goof = || Error::Generic("electrum server misbehaving".to_string()); + + let batch_update = loop { + request = match request { + Request::Script(script_req) => { + let scripts = script_req.request().take(chunk_size); + let txids_per_script: Vec> = self + .client + .batch_script_get_history(scripts) + .map_err(Error::Electrum)? + .into_iter() + .map(|txs| { + txs.into_iter() + .map(|tx| { + let tx_height = match tx.height { + none if none <= 0 => None, + height => { + txid_to_height.insert(tx.tx_hash, height as u32); + Some(height as u32) + } + }; + (tx.tx_hash, tx_height) + }) + .collect() + }) + .collect(); + + script_req.satisfy(txids_per_script)? 
+ } + + Request::Conftime(conftimereq) => { + let needs_block_height = conftimereq + .request() + .filter_map(|txid| txid_to_height.get(txid).cloned()) + .filter(|height| block_times.get(height).is_none()) + .take(chunk_size) + .collect::>(); + + let new_block_headers = + self.client.batch_block_header(needs_block_height.clone())?; + + for (height, header) in needs_block_height.into_iter().zip(new_block_headers) { + block_times.insert(height, header.time); + } + + let conftimes = conftimereq + .request() + .take(chunk_size) + .map(|txid| { + let confirmation_time = txid_to_height + .get(txid) + .map(|height| { + let timestamp = + *block_times.get(height).ok_or_else(electrum_goof)?; + Result::<_, Error>::Ok(ConfirmationTime { + height: *height, + timestamp: timestamp.into(), + }) + }) + .transpose()?; + Ok(confirmation_time) + }) + .collect::>()?; + + conftimereq.satisfy(conftimes)? + } + Request::Tx(txreq) => { + let needs_block_height = txreq + .request() + .filter_map(|txid| txid_to_height.get(txid).cloned()) + .filter(|height| block_times.get(height).is_none()) + .take(chunk_size) + .collect::>(); + + let new_block_headers = + self.client.batch_block_header(needs_block_height.clone())?; + for (height, header) in needs_block_height.into_iter().zip(new_block_headers) { + block_times.insert(height, header.time); + } + let needs_full = txreq.request().take(chunk_size); + + tx_cache.save_txs(needs_full.clone())?; + let full_transactions = needs_full + .map(|txid| tx_cache.get(*txid).ok_or_else(electrum_goof)) + .collect::, _>>()?; + let input_txs = full_transactions.iter().flat_map(|tx| { + tx.input + .iter() + .filter(|input| !input.previous_output.is_null()) + .map(|input| &input.previous_output.txid) + }); + tx_cache.save_txs(input_txs)?; + + let full_details = full_transactions + .into_iter() + .map(|tx| { + let confirmation_time = txid_to_height + .get(&tx.txid()) + .map(|height| { + let time = block_times.get(height).ok_or_else(electrum_goof)?; + Result::<_, Error>::Ok(ConfirmationTime { + height: *height, + timestamp: *time as u64, + }) + }) + .transpose()?; + let prev_outputs = tx + .input + .iter() + .map(|input| { + if input.previous_output.is_null() { + return Ok(None); + } + let prev_tx = tx_cache + .get(input.previous_output.txid) + .ok_or_else(electrum_goof)?; + let txout = prev_tx + .output + .get(input.previous_output.vout as usize) + .ok_or_else(electrum_goof)?; + Ok(Some(txout.clone())) + }) + .collect::, Error>>()?; + Ok((confirmation_time, prev_outputs, tx)) + }) + .collect::, Error>>()?; + + txreq.satisfy(full_details)? + } + Request::Finish(batch_update) => break batch_update, + } + }; + + database.commit_batch(batch_update)?; + Ok(()) } fn get_tx(&self, txid: &Txid) -> Result, Error> { @@ -101,43 +242,48 @@ impl Blockchain for ElectrumBlockchain { } } -impl ElectrumLikeSync for Client { - fn els_batch_script_get_history<'s, I: IntoIterator + Clone>( - &self, - scripts: I, - ) -> Result>, Error> { - self.batch_script_get_history(scripts) - .map(|v| { - v.into_iter() - .map(|v| { - v.into_iter() - .map( - |electrum_client::GetHistoryRes { - height, tx_hash, .. 
- }| ElsGetHistoryRes { - height, - tx_hash, - }, - ) - .collect() - }) - .collect() - }) - .map_err(Error::Electrum) +struct TxCache<'a, 'b, D> { + db: &'a D, + client: &'b Client, + cache: HashMap, +} + +impl<'a, 'b, D: Database> TxCache<'a, 'b, D> { + fn new(db: &'a D, client: &'b Client) -> Self { + TxCache { + db, + client, + cache: HashMap::default(), + } } + fn save_txs<'c>(&mut self, txids: impl Iterator) -> Result<(), Error> { + let mut need_fetch = vec![]; + for txid in txids { + if self.cache.get(txid).is_some() { + continue; + } else if let Some(transaction) = self.db.get_raw_tx(txid)? { + self.cache.insert(*txid, transaction); + } else { + need_fetch.push(txid); + } + } - fn els_batch_transaction_get<'s, I: IntoIterator + Clone>( - &self, - txids: I, - ) -> Result, Error> { - self.batch_transaction_get(txids).map_err(Error::Electrum) + if !need_fetch.is_empty() { + let txs = self + .client + .batch_transaction_get(need_fetch.clone()) + .map_err(Error::Electrum)?; + for (tx, _txid) in txs.into_iter().zip(need_fetch) { + debug_assert_eq!(*_txid, tx.txid()); + self.cache.insert(tx.txid(), tx); + } + } + + Ok(()) } - fn els_batch_block_header + Clone>( - &self, - heights: I, - ) -> Result, Error> { - self.batch_block_header(heights).map_err(Error::Electrum) + fn get(&self, txid: Txid) -> Option { + self.cache.get(&txid).map(Clone::clone) } } diff --git a/src/blockchain/esplora/api.rs b/src/blockchain/esplora/api.rs new file mode 100644 index 000000000..74c46c885 --- /dev/null +++ b/src/blockchain/esplora/api.rs @@ -0,0 +1,117 @@ +//! structs from the esplora API +//! +//! see: +use crate::ConfirmationTime; +use bitcoin::{OutPoint, Script, Transaction, TxIn, TxOut, Txid}; + +#[derive(serde::Deserialize, Clone, Debug)] +pub struct PrevOut { + pub value: u64, + pub scriptpubkey: Script, +} + +#[derive(serde::Deserialize, Clone, Debug)] +pub struct Vin { + pub txid: Txid, + pub vout: u32, + // None if coinbase + pub prevout: Option, + pub scriptsig: Script, + #[serde(deserialize_with = "deserialize_witness")] + pub witness: Vec>, + pub sequence: u32, + pub is_coinbase: bool, +} + +#[derive(serde::Deserialize, Clone, Debug)] +pub struct Vout { + pub value: u64, + pub scriptpubkey: Script, +} + +#[derive(serde::Deserialize, Clone, Debug)] +pub struct TxStatus { + pub confirmed: bool, + pub block_height: Option, + pub block_time: Option, +} + +#[derive(serde::Deserialize, Clone, Debug)] +pub struct Tx { + pub txid: Txid, + pub version: i32, + pub locktime: u32, + pub vin: Vec, + pub vout: Vec, + pub status: TxStatus, + pub fee: u64, +} + +impl Tx { + pub fn to_tx(&self) -> Transaction { + Transaction { + version: self.version, + lock_time: self.locktime, + input: self + .vin + .iter() + .cloned() + .map(|vin| TxIn { + previous_output: OutPoint { + txid: vin.txid, + vout: vin.vout, + }, + script_sig: vin.scriptsig, + sequence: vin.sequence, + witness: vin.witness, + }) + .collect(), + output: self + .vout + .iter() + .cloned() + .map(|vout| TxOut { + value: vout.value, + script_pubkey: vout.scriptpubkey, + }) + .collect(), + } + } + + pub fn confirmation_time(&self) -> Option { + match self.status { + TxStatus { + confirmed: true, + block_height: Some(height), + block_time: Some(timestamp), + } => Some(ConfirmationTime { timestamp, height }), + _ => None, + } + } + + pub fn previous_outputs(&self) -> Vec> { + self.vin + .iter() + .cloned() + .map(|vin| { + vin.prevout.map(|po| TxOut { + script_pubkey: po.scriptpubkey, + value: po.value, + }) + }) + .collect() + } +} + +fn 
deserialize_witness<'de, D>(d: D) -> Result>, D::Error> +where + D: serde::de::Deserializer<'de>, +{ + use crate::serde::Deserialize; + use bitcoin::hashes::hex::FromHex; + let list = Vec::::deserialize(d)?; + list.into_iter() + .map(|hex_str| Vec::::from_hex(&hex_str)) + .collect::>, _>>() + .map_err(serde::de::Error::custom) +} diff --git a/src/blockchain/esplora/mod.rs b/src/blockchain/esplora/mod.rs index 921a1e62c..d4a217b54 100644 --- a/src/blockchain/esplora/mod.rs +++ b/src/blockchain/esplora/mod.rs @@ -21,8 +21,6 @@ use std::collections::HashMap; use std::fmt; use std::io; -use serde::Deserialize; - use bitcoin::consensus; use bitcoin::{BlockHash, Txid}; @@ -41,6 +39,8 @@ mod ureq; #[cfg(feature = "ureq")] pub use self::ureq::*; +mod api; + fn into_fee_rate(target: usize, estimates: HashMap) -> Result { let fee_val = estimates .into_iter() @@ -56,18 +56,6 @@ fn into_fee_rate(target: usize, estimates: HashMap) -> Result, -} - /// Errors that can happen during a sync with [`EsploraBlockchain`] #[derive(Debug)] pub enum EsploraError { @@ -107,10 +95,50 @@ impl fmt::Display for EsploraError { } } +/// Configuration for an [`EsploraBlockchain`] +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)] +pub struct EsploraBlockchainConfig { + /// Base URL of the esplora service + /// + /// eg. `https://blockstream.info/api/` + pub base_url: String, + /// Optional URL of the proxy to use to make requests to the Esplora server + /// + /// The string should be formatted as: `://:@host:`. + /// + /// Note that the format of this value and the supported protocols change slightly between the + /// sync version of esplora (using `ureq`) and the async version (using `reqwest`). For more + /// details check with the documentation of the two crates. Both of them are compiled with + /// the `socks` feature enabled. + /// + /// The proxy is ignored when targeting `wasm32`. + #[serde(skip_serializing_if = "Option::is_none")] + pub proxy: Option, + /// Number of parallel requests sent to the esplora service (default: 4) + #[serde(skip_serializing_if = "Option::is_none")] + pub concurrency: Option, + /// Stop searching addresses for transactions after finding an unused gap of this length. + pub stop_gap: usize, + /// Socket timeout. + #[serde(skip_serializing_if = "Option::is_none")] + pub timeout: Option, +} + +impl EsploraBlockchainConfig { + /// create a config with default values given the base url and stop gap + pub fn new(base_url: String) -> Self { + Self { + base_url, + proxy: None, + timeout: None, + stop_gap: 20, + concurrency: None, + } + } +} + impl std::error::Error for EsploraError {} -#[cfg(feature = "ureq")] -impl_error!(::ureq::Error, Ureq, EsploraError); #[cfg(feature = "ureq")] impl_error!(::ureq::Transport, UreqTransport, EsploraError); #[cfg(feature = "reqwest")] @@ -127,3 +155,5 @@ crate::bdk_blockchain_tests! 
{ EsploraBlockchain::new(&format!("http://{}",test_client.electrsd.esplora_url.as_ref().unwrap()), 20) } } + +const DEFAULT_CONCURRENT_REQUESTS: u8 = 4; diff --git a/src/blockchain/esplora/reqwest.rs b/src/blockchain/esplora/reqwest.rs index 6e9c7aad9..a6024adbc 100644 --- a/src/blockchain/esplora/reqwest.rs +++ b/src/blockchain/esplora/reqwest.rs @@ -21,20 +21,16 @@ use bitcoin::{BlockHeader, Script, Transaction, Txid}; #[allow(unused_imports)] use log::{debug, error, info, trace}; -use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt}; - use ::reqwest::{Client, StatusCode}; +use futures::stream::{FuturesOrdered, TryStreamExt}; -use crate::blockchain::esplora::{EsploraError, EsploraGetHistory}; -use crate::blockchain::utils::{ElectrumLikeSync, ElsGetHistoryRes}; +use super::api::Tx; +use crate::blockchain::esplora::EsploraError; use crate::blockchain::*; use crate::database::BatchDatabase; use crate::error::Error; -use crate::wallet::utils::ChunksIterator; use crate::FeeRate; -const DEFAULT_CONCURRENT_REQUESTS: u8 = 4; - #[derive(Debug)] struct UrlClient { url: String, @@ -70,7 +66,7 @@ impl EsploraBlockchain { url_client: UrlClient { url: base_url.to_string(), client: Client::new(), - concurrency: DEFAULT_CONCURRENT_REQUESTS, + concurrency: super::DEFAULT_CONCURRENT_REQUESTS, }, stop_gap, } @@ -98,11 +94,91 @@ impl Blockchain for EsploraBlockchain { fn setup( &self, database: &mut D, - progress_update: P, + _progress_update: P, ) -> Result<(), Error> { - maybe_await!(self - .url_client - .electrum_like_setup(self.stop_gap, database, progress_update)) + use crate::blockchain::script_sync::Request; + let mut request = script_sync::start(database, self.stop_gap)?; + let mut tx_index: HashMap = HashMap::new(); + + let batch_update = loop { + request = match request { + Request::Script(script_req) => { + let futures: FuturesOrdered<_> = script_req + .request() + .take(self.url_client.concurrency as usize) + .map(|script| async move { + let mut related_txs: Vec = + self.url_client._scripthash_txs(script, None).await?; + + let n_confirmed = + related_txs.iter().filter(|tx| tx.status.confirmed).count(); + // esplora pages on 25 confirmed transactions. If there's more than + // 25 we need to keep requesting. + if n_confirmed >= 25 { + loop { + let new_related_txs: Vec = self + .url_client + ._scripthash_txs( + script, + Some(related_txs.last().unwrap().txid), + ) + .await?; + let n = new_related_txs.len(); + related_txs.extend(new_related_txs); + // we've reached the end + if n < 25 { + break; + } + } + } + Result::<_, Error>::Ok(related_txs) + }) + .collect(); + let txs_per_script: Vec> = await_or_block!(futures.try_collect())?; + let mut satisfaction = vec![]; + + for txs in txs_per_script { + satisfaction.push( + txs.iter() + .map(|tx| (tx.txid, tx.status.block_height)) + .collect(), + ); + for tx in txs { + tx_index.insert(tx.txid, tx); + } + } + + script_req.satisfy(satisfaction)? + } + Request::Conftime(conftimereq) => { + let conftimes = conftimereq + .request() + .map(|txid| { + tx_index + .get(txid) + .expect("must be in index") + .confirmation_time() + }) + .collect(); + conftimereq.satisfy(conftimes)? + } + Request::Tx(txreq) => { + let full_txs = txreq + .request() + .map(|txid| { + let tx = tx_index.get(txid).expect("must be in index"); + (tx.confirmation_time(), tx.previous_outputs(), tx.to_tx()) + }) + .collect(); + txreq.satisfy(full_txs)? 
+ } + Request::Finish(batch_update) => break batch_update, + } + }; + + database.commit_batch(batch_update)?; + + Ok(()) } fn get_tx(&self, txid: &Txid) -> Result, Error> { @@ -124,10 +200,6 @@ impl Blockchain for EsploraBlockchain { } impl UrlClient { - fn script_to_scripthash(script: &Script) -> String { - sha256::Hash::hash(script.as_bytes()).into_inner().to_hex() - } - async fn _get_tx(&self, txid: &Txid) -> Result, EsploraError> { let resp = self .client @@ -196,71 +268,27 @@ impl UrlClient { Ok(req.error_for_status()?.text().await?.parse()?) } - async fn _script_get_history( + async fn _scripthash_txs( &self, script: &Script, - ) -> Result, EsploraError> { - let mut result = Vec::new(); - let scripthash = Self::script_to_scripthash(script); - - // Add the unconfirmed transactions first - result.extend( - self.client - .get(&format!( - "{}/scripthash/{}/txs/mempool", - self.url, scripthash - )) - .send() - .await? - .error_for_status()? - .json::>() - .await? - .into_iter() - .map(|x| ElsGetHistoryRes { - tx_hash: x.txid, - height: x.status.block_height.unwrap_or(0) as i32, - }), - ); - - debug!( - "Found {} mempool txs for {} - {:?}", - result.len(), - scripthash, - script - ); - - // Then go through all the pages of confirmed transactions - let mut last_txid = String::new(); - loop { - let response = self - .client - .get(&format!( - "{}/scripthash/{}/txs/chain/{}", - self.url, scripthash, last_txid - )) - .send() - .await? - .error_for_status()? - .json::>() - .await?; - let len = response.len(); - if let Some(elem) = response.last() { - last_txid = elem.txid.to_hex(); - } - - debug!("... adding {} confirmed transactions", len); - - result.extend(response.into_iter().map(|x| ElsGetHistoryRes { - tx_hash: x.txid, - height: x.status.block_height.unwrap_or(0) as i32, - })); - - if len < 25 { - break; - } - } - - Ok(result) + last_seen: Option, + ) -> Result, EsploraError> { + let script_hash = sha256::Hash::hash(script.as_bytes()).into_inner().to_hex(); + let url = match last_seen { + Some(last_seen) => format!( + "{}/scripthash/{}/txs/chain/{}", + self.url, script_hash, last_seen + ), + None => format!("{}/scripthash/{}/txs", self.url, script_hash), + }; + Ok(self + .client + .get(url) + .send() + .await? + .error_for_status()? + .json::>() + .await?) 
} async fn _get_fee_estimates(&self) -> Result, EsploraError> { @@ -275,83 +303,8 @@ impl UrlClient { } } -#[maybe_async] -impl ElectrumLikeSync for UrlClient { - fn els_batch_script_get_history<'s, I: IntoIterator>( - &self, - scripts: I, - ) -> Result>, Error> { - let mut results = vec![]; - for chunk in ChunksIterator::new(scripts.into_iter(), self.concurrency as usize) { - let mut futs = FuturesOrdered::new(); - for script in chunk { - futs.push(self._script_get_history(script)); - } - let partial_results: Vec> = await_or_block!(futs.try_collect())?; - results.extend(partial_results); - } - Ok(await_or_block!(stream::iter(results).collect())) - } - - fn els_batch_transaction_get<'s, I: IntoIterator>( - &self, - txids: I, - ) -> Result, Error> { - let mut results = vec![]; - for chunk in ChunksIterator::new(txids.into_iter(), self.concurrency as usize) { - let mut futs = FuturesOrdered::new(); - for txid in chunk { - futs.push(self._get_tx_no_opt(txid)); - } - let partial_results: Vec = await_or_block!(futs.try_collect())?; - results.extend(partial_results); - } - Ok(await_or_block!(stream::iter(results).collect())) - } - - fn els_batch_block_header>( - &self, - heights: I, - ) -> Result, Error> { - let mut results = vec![]; - for chunk in ChunksIterator::new(heights.into_iter(), self.concurrency as usize) { - let mut futs = FuturesOrdered::new(); - for height in chunk { - futs.push(self._get_header(height)); - } - let partial_results: Vec = await_or_block!(futs.try_collect())?; - results.extend(partial_results); - } - Ok(await_or_block!(stream::iter(results).collect())) - } -} - -/// Configuration for an [`EsploraBlockchain`] -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)] -pub struct EsploraBlockchainConfig { - /// Base URL of the esplora service - /// - /// eg. `https://blockstream.info/api/` - pub base_url: String, - /// Optional URL of the proxy to use to make requests to the Esplora server - /// - /// The string should be formatted as: `://:@host:`. - /// - /// Note that the format of this value and the supported protocols change slightly between the - /// sync version of esplora (using `ureq`) and the async version (using `reqwest`). For more - /// details check with the documentation of the two crates. Both of them are compiled with - /// the `socks` feature enabled. - /// - /// The proxy is ignored when targeting `wasm32`. - pub proxy: Option, - /// Number of parallel requests sent to the esplora service (default: 4) - pub concurrency: Option, - /// Stop searching addresses for transactions after finding an unused gap of this length. - pub stop_gap: usize, -} - impl ConfigurableBlockchain for EsploraBlockchain { - type Config = EsploraBlockchainConfig; + type Config = super::EsploraBlockchainConfig; fn from_config(config: &Self::Config) -> Result { let map_e = |e: reqwest::Error| Error::Esplora(Box::new(e.into())); @@ -360,13 +313,19 @@ impl ConfigurableBlockchain for EsploraBlockchain { if let Some(concurrency) = config.concurrency { blockchain.url_client.concurrency = concurrency; } + let mut builder = Client::builder(); #[cfg(not(target_arch = "wasm32"))] if let Some(proxy) = &config.proxy { - blockchain.url_client.client = Client::builder() - .proxy(reqwest::Proxy::all(proxy).map_err(map_e)?) 
- .build() - .map_err(map_e)?; + builder = builder.proxy(reqwest::Proxy::all(proxy).map_err(map_e)?); } + + #[cfg(not(target_arch = "wasm32"))] + if let Some(timeout) = config.timeout { + builder = builder.timeout(core::time::Duration::from_secs(timeout)); + } + + blockchain.url_client.client = builder.build().map_err(map_e)?; + Ok(blockchain) } } diff --git a/src/blockchain/esplora/ureq.rs b/src/blockchain/esplora/ureq.rs index 177b773d6..365b3281d 100644 --- a/src/blockchain/esplora/ureq.rs +++ b/src/blockchain/esplora/ureq.rs @@ -26,14 +26,14 @@ use bitcoin::hashes::hex::{FromHex, ToHex}; use bitcoin::hashes::{sha256, Hash}; use bitcoin::{BlockHeader, Script, Transaction, Txid}; -use crate::blockchain::esplora::{EsploraError, EsploraGetHistory}; -use crate::blockchain::utils::{ElectrumLikeSync, ElsGetHistoryRes}; +use super::api::Tx; +use crate::blockchain::esplora::EsploraError; use crate::blockchain::*; use crate::database::BatchDatabase; use crate::error::Error; use crate::FeeRate; -#[derive(Debug)] +#[derive(Debug, Clone)] struct UrlClient { url: String, agent: Agent, @@ -47,15 +47,7 @@ struct UrlClient { pub struct EsploraBlockchain { url_client: UrlClient, stop_gap: usize, -} - -impl std::convert::From for EsploraBlockchain { - fn from(url_client: UrlClient) -> Self { - EsploraBlockchain { - url_client, - stop_gap: 20, - } - } + concurrency: u8, } impl EsploraBlockchain { @@ -66,6 +58,7 @@ impl EsploraBlockchain { url: base_url.to_string(), agent: Agent::new(), }, + concurrency: super::DEFAULT_CONCURRENT_REQUESTS, stop_gap, } } @@ -75,6 +68,12 @@ impl EsploraBlockchain { self.url_client.agent = agent; self } + + /// Set the number of parallel requests the client can make. + pub fn with_concurrency(mut self, concurrency: u8) -> Self { + self.concurrency = concurrency; + self + } } impl Blockchain for EsploraBlockchain { @@ -91,10 +90,94 @@ impl Blockchain for EsploraBlockchain { fn setup( &self, database: &mut D, - progress_update: P, + _progress_update: P, ) -> Result<(), Error> { - self.url_client - .electrum_like_setup(self.stop_gap, database, progress_update) + use crate::blockchain::script_sync::Request; + let mut request = script_sync::start(database, self.stop_gap)?; + let mut tx_index: HashMap = HashMap::new(); + let batch_update = loop { + request = match request { + Request::Script(script_req) => { + let scripts = script_req + .request() + .take(self.concurrency as usize) + .cloned(); + + let handles = scripts.map(move |script| { + let client = self.url_client.clone(); + // make each request in its own thread. + std::thread::spawn(move || { + let mut related_txs: Vec = client._scripthash_txs(&script, None)?; + + let n_confirmed = + related_txs.iter().filter(|tx| tx.status.confirmed).count(); + // esplora pages on 25 confirmed transactions. If there's more than + // 25 we need to keep requesting. + if n_confirmed >= 25 { + loop { + let new_related_txs: Vec = client._scripthash_txs( + &script, + Some(related_txs.last().unwrap().txid), + )?; + let n = new_related_txs.len(); + related_txs.extend(new_related_txs); + // we've reached the end + if n < 25 { + break; + } + } + } + Result::<_, Error>::Ok(related_txs) + }) + }); + + let txs_per_script: Vec> = handles + .map(|handle| handle.join().unwrap()) + .collect::>()?; + let mut satisfaction = vec![]; + + for txs in txs_per_script { + satisfaction.push( + txs.iter() + .map(|tx| (tx.txid, tx.status.block_height)) + .collect(), + ); + for tx in txs { + tx_index.insert(tx.txid, tx); + } + } + + script_req.satisfy(satisfaction)? 
+ } + Request::Conftime(conftimereq) => { + let conftimes = conftimereq + .request() + .map(|txid| { + tx_index + .get(txid) + .expect("must be in index") + .confirmation_time() + }) + .collect(); + conftimereq.satisfy(conftimes)? + } + Request::Tx(txreq) => { + let full_txs = txreq + .request() + .map(|txid| { + let tx = tx_index.get(txid).expect("must be in index"); + (tx.confirmation_time(), tx.previous_outputs(), tx.to_tx()) + }) + .collect(); + txreq.satisfy(full_txs)? + } + Request::Finish(batch_update) => break batch_update, + } + }; + + database.commit_batch(batch_update)?; + + Ok(()) } fn get_tx(&self, txid: &Txid) -> Result, Error> { @@ -117,10 +200,6 @@ impl Blockchain for EsploraBlockchain { } impl UrlClient { - fn script_to_scripthash(script: &Script) -> String { - sha256::Hash::hash(script.as_bytes()).into_inner().to_hex() - } - fn _get_tx(&self, txid: &Txid) -> Result, EsploraError> { let resp = self .agent @@ -200,81 +279,6 @@ impl UrlClient { } } - fn _script_get_history(&self, script: &Script) -> Result, EsploraError> { - let mut result = Vec::new(); - let scripthash = Self::script_to_scripthash(script); - - // Add the unconfirmed transactions first - - let resp = self - .agent - .get(&format!( - "{}/scripthash/{}/txs/mempool", - self.url, scripthash - )) - .call(); - - let v = match resp { - Ok(resp) => { - let v: Vec = resp.into_json()?; - Ok(v) - } - Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)), - Err(e) => Err(EsploraError::Ureq(e)), - }?; - - result.extend(v.into_iter().map(|x| ElsGetHistoryRes { - tx_hash: x.txid, - height: x.status.block_height.unwrap_or(0) as i32, - })); - - debug!( - "Found {} mempool txs for {} - {:?}", - result.len(), - scripthash, - script - ); - - // Then go through all the pages of confirmed transactions - let mut last_txid = String::new(); - loop { - let resp = self - .agent - .get(&format!( - "{}/scripthash/{}/txs/chain/{}", - self.url, scripthash, last_txid - )) - .call(); - - let v = match resp { - Ok(resp) => { - let v: Vec = resp.into_json()?; - Ok(v) - } - Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)), - Err(e) => Err(EsploraError::Ureq(e)), - }?; - - let len = v.len(); - if let Some(elem) = v.last() { - last_txid = elem.txid.to_hex(); - } - - debug!("... adding {} confirmed transactions", len); - - result.extend(v.into_iter().map(|x| ElsGetHistoryRes { - tx_hash: x.txid, - height: x.status.block_height.unwrap_or(0) as i32, - })); - - if len < 25 { - break; - } - } - - Ok(result) - } - fn _get_fee_estimates(&self) -> Result, EsploraError> { let resp = self .agent @@ -292,6 +296,22 @@ impl UrlClient { Ok(map) } + + fn _scripthash_txs( + &self, + script: &Script, + last_seen: Option, + ) -> Result, EsploraError> { + let script_hash = sha256::Hash::hash(script.as_bytes()).into_inner().to_hex(); + let url = match last_seen { + Some(last_seen) => format!( + "{}/scripthash/{}/txs/chain/{}", + self.url, script_hash, last_seen + ), + None => format!("{}/scripthash/{}/txs", self.url, script_hash), + }; + Ok(self.agent.get(&url).call()?.into_json()?) 
+ } } fn is_status_not_found(status: u16) -> bool { @@ -315,84 +335,37 @@ fn into_bytes(resp: Response) -> Result, io::Error> { Ok(buf) } -impl ElectrumLikeSync for UrlClient { - fn els_batch_script_get_history<'s, I: IntoIterator>( - &self, - scripts: I, - ) -> Result>, Error> { - let mut results = vec![]; - for script in scripts.into_iter() { - let v = self._script_get_history(script)?; - results.push(v); - } - Ok(results) - } - - fn els_batch_transaction_get<'s, I: IntoIterator>( - &self, - txids: I, - ) -> Result, Error> { - let mut results = vec![]; - for txid in txids.into_iter() { - let tx = self._get_tx_no_opt(txid)?; - results.push(tx); - } - Ok(results) - } - - fn els_batch_block_header>( - &self, - heights: I, - ) -> Result, Error> { - let mut results = vec![]; - for height in heights.into_iter() { - let header = self._get_header(height)?; - results.push(header); - } - Ok(results) - } -} - -/// Configuration for an [`EsploraBlockchain`] -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)] -pub struct EsploraBlockchainConfig { - /// Base URL of the esplora service eg. `https://blockstream.info/api/` - pub base_url: String, - /// Optional URL of the proxy to use to make requests to the Esplora server - /// - /// The string should be formatted as: `://:@host:`. - /// - /// Note that the format of this value and the supported protocols change slightly between the - /// sync version of esplora (using `ureq`) and the async version (using `reqwest`). For more - /// details check with the documentation of the two crates. Both of them are compiled with - /// the `socks` feature enabled. - /// - /// The proxy is ignored when targeting `wasm32`. - pub proxy: Option, - /// Socket read timeout. - pub timeout_read: u64, - /// Socket write timeout. - pub timeout_write: u64, - /// Stop searching addresses for transactions after finding an unused gap of this length. 
-    pub stop_gap: usize,
-}
-
 impl ConfigurableBlockchain for EsploraBlockchain {
-    type Config = EsploraBlockchainConfig;
+    type Config = super::EsploraBlockchainConfig;
 
     fn from_config(config: &Self::Config) -> Result<Self, Error> {
-        let mut agent_builder = ureq::AgentBuilder::new()
-            .timeout_read(Duration::from_secs(config.timeout_read))
-            .timeout_write(Duration::from_secs(config.timeout_write));
+        let mut agent_builder = ureq::AgentBuilder::new();
+
+        if let Some(timeout) = config.timeout {
+            agent_builder = agent_builder.timeout(Duration::from_secs(timeout));
+        }
 
         if let Some(proxy) = &config.proxy {
             agent_builder = agent_builder
                 .proxy(Proxy::new(proxy).map_err(|e| Error::Esplora(Box::new(e.into())))?);
         }
 
-        Ok(
-            EsploraBlockchain::new(config.base_url.as_str(), config.stop_gap)
-                .with_agent(agent_builder.build()),
-        )
+        let mut blockchain = EsploraBlockchain::new(config.base_url.as_str(), config.stop_gap)
+            .with_agent(agent_builder.build());
+
+        if let Some(concurrency) = config.concurrency {
+            blockchain = blockchain.with_concurrency(concurrency);
+        }
+
+        Ok(blockchain)
+    }
+}
+
+impl From<ureq::Error> for EsploraError {
+    fn from(e: ureq::Error) -> Self {
+        match e {
+            ureq::Error::Status(code, _) => EsploraError::HttpResponse(code),
+            e => EsploraError::Ureq(e),
+        }
     }
 }
diff --git a/src/blockchain/mod.rs b/src/blockchain/mod.rs
index 4d84caea7..bbf0303df 100644
--- a/src/blockchain/mod.rs
+++ b/src/blockchain/mod.rs
@@ -27,9 +27,6 @@ use crate::database::BatchDatabase;
 use crate::error::Error;
 use crate::FeeRate;
 
-#[cfg(any(feature = "electrum", feature = "esplora"))]
-pub(crate) mod utils;
-
 #[cfg(any(
     feature = "electrum",
     feature = "esplora",
@@ -37,6 +34,8 @@ pub(crate) mod utils;
     feature = "rpc"
 ))]
 pub mod any;
+mod script_sync;
+
 #[cfg(any(
     feature = "electrum",
     feature = "esplora",
diff --git a/src/blockchain/script_sync.rs b/src/blockchain/script_sync.rs
new file mode 100644
index 000000000..d9c0bcc88
--- /dev/null
+++ b/src/blockchain/script_sync.rs
@@ -0,0 +1,367 @@
+/*!
+This models how a sync happens when you have a server that you send your script pubkeys to and it
+returns the associated transactions, i.e. Electrum.
+*/
+#![allow(dead_code)]
+use crate::{
+    database::{BatchDatabase, BatchOperations, DatabaseUtils},
+    ConfirmationTime, Error, KeychainKind, LocalUtxo, TransactionDetails,
+};
+use bitcoin::{OutPoint, Script, Transaction, TxOut, Txid};
+use std::collections::{HashMap, HashSet, VecDeque};
+
+struct State<'a, D> {
+    db: &'a D,
+    last_active_index: HashMap<KeychainKind, usize>,
+    tx_needed: VecDeque<Txid>,
+    conftime_needed: VecDeque<Txid>,
+    observed_txs: Vec<TransactionDetails>,
+}
+
+/// A request for on-chain information.
+pub enum Request<'a, D: BatchDatabase> {
+    /// A request for transactions related to script pubkeys.
+    Script(ScriptReq<'a, D>),
+    /// A request for confirmation times for some transactions.
+    Conftime(ConftimeReq<'a, D>),
+    /// A request for full transaction details of some transactions.
+    Tx(TxReq<'a, D>),
+    /// Requests are finished; here's a batch database update reflecting the data gathered.
+    Finish(D::Batch),
+}
+
+/// Starts a sync.
+pub fn start<D: BatchDatabase>(db: &D, stop_gap: usize) -> Result<Request<'_, D>, Error> {
+    use rand::seq::SliceRandom;
+    let mut keychains = vec![KeychainKind::Internal, KeychainKind::External];
+    // Shuffling improves privacy: the server can't tell whether our first request is for internal or external addresses.
+    keychains.shuffle(&mut rand::thread_rng());
+    let keychain = keychains.pop().unwrap();
+    let scripts_needed = db
+        .iter_script_pubkeys(Some(keychain))?
+ .into_iter() + .collect(); + let state = State { + db, + last_active_index: HashMap::default(), + conftime_needed: VecDeque::default(), + observed_txs: vec![], + tx_needed: VecDeque::default(), + }; + + Ok(Request::Script(ScriptReq { + state, + scripts_needed, + script_index: 0, + stop_gap, + keychain, + next_keychains: keychains, + tx_interested: HashSet::default(), + tx_conftime_interested: HashSet::default(), + })) +} + +pub struct ScriptReq<'a, D: BatchDatabase> { + state: State<'a, D>, + script_index: usize, + scripts_needed: VecDeque
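
// The sketch below is not part of this patch: it shows roughly how a backend is now
// expected to drive the `Request` state machine returned by `script_sync::start`,
// mirroring the loops added to electrum.rs and the esplora clients above. It assumes
// it lives inside src/blockchain/ like those backends; the three `fetch_*` helpers
// are hypothetical stand-ins for real server calls (e.g. Electrum's
// `batch_script_get_history`), and the argument shapes passed to `satisfy` are taken
// from the implementations in this patch.

use bitcoin::{Script, Transaction, TxOut, Txid};

use crate::blockchain::script_sync::{self, Request};
use crate::database::BatchDatabase;
use crate::{ConfirmationTime, Error};

// Hypothetical: for each script pubkey, the txids that touch it and their confirmation heights.
fn fetch_history<'a>(
    _scripts: impl Iterator<Item = &'a Script>,
) -> Result<Vec<Vec<(Txid, Option<u32>)>>, Error> {
    unimplemented!("query the server, e.g. via batch_script_get_history")
}

// Hypothetical: the confirmation time (height and timestamp) of each txid, if confirmed.
fn fetch_confirmation_times<'a>(
    _txids: impl Iterator<Item = &'a Txid>,
) -> Result<Vec<Option<ConfirmationTime>>, Error> {
    unimplemented!("look up block headers or transaction status")
}

// Hypothetical: each full transaction plus its confirmation time and previous outputs.
fn fetch_full_transactions<'a>(
    _txids: impl Iterator<Item = &'a Txid>,
) -> Result<Vec<(Option<ConfirmationTime>, Vec<Option<TxOut>>, Transaction)>, Error> {
    unimplemented!("download the raw transactions and their prevouts")
}

fn sync_with_server<D: BatchDatabase>(database: &mut D, stop_gap: usize) -> Result<(), Error> {
    // The sync logic asks; the backend answers until it is handed a `Finish`.
    let mut request = script_sync::start(database, stop_gap)?;

    let batch_update = loop {
        request = match request {
            Request::Script(script_req) => {
                let txids_per_script = fetch_history(script_req.request())?;
                script_req.satisfy(txids_per_script)?
            }
            Request::Conftime(conftime_req) => {
                let conftimes = fetch_confirmation_times(conftime_req.request())?;
                conftime_req.satisfy(conftimes)?
            }
            Request::Tx(tx_req) => {
                let full_txs = fetch_full_transactions(tx_req.request())?;
                tx_req.satisfy(full_txs)?
            }
            // Once every request is answered, a single batch of database changes comes back.
            Request::Finish(batch_update) => break batch_update,
        }
    };

    database.commit_batch(batch_update)?;
    Ok(())
}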