Skip to content

Commit

Permalink
Fix wallet sync not finding coins of addresses which are not cached
Browse files Browse the repository at this point in the history
Previously, electrum-based blockchain implementations only synced for
`scriptPubKey`s that are already cached in `Database`.

This PR introduces a feedback mechanism, that uses `stop_gap` and the
difference between "current index" and "last active index" to determine
whether we need to cache more `scriptPubKeys`.

The `WalletSync::wallet_setup` trait now returns a `usize` on success.
This represents the number of extra `scriptPubKey`s to cache, in order
to satisfy `stop_gap` for the next call.

`Wallet::sync` now calls `WalletSync` in a loop, cacheing inbetween
subsequent calls (if needed).
  • Loading branch information
evanlinjin committed Jul 17, 2022
1 parent 9165fae commit c699aea
Show file tree
Hide file tree
Showing 10 changed files with 210 additions and 64 deletions.
4 changes: 2 additions & 2 deletions src/blockchain/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl WalletSync for AnyBlockchain {
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
) -> Result<usize, Error> {
maybe_await!(impl_inner_method!(
self,
wallet_sync,
Expand All @@ -146,7 +146,7 @@ impl WalletSync for AnyBlockchain {
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
) -> Result<usize, Error> {
maybe_await!(impl_inner_method!(
self,
wallet_setup,
Expand Down
4 changes: 2 additions & 2 deletions src/blockchain/compact_filters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ impl WalletSync for CompactFiltersBlockchain {
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
) -> Result<usize, Error> {
let first_peer = &self.peers[0];

let skip_blocks = self.skip_blocks.unwrap_or(0);
Expand Down Expand Up @@ -474,7 +474,7 @@ impl WalletSync for CompactFiltersBlockchain {
.unwrap()
.update(100.0, Some("Done".into()))?;

Ok(())
Ok(0)
}
}

Expand Down
12 changes: 8 additions & 4 deletions src/blockchain/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl WalletSync for ElectrumBlockchain {
&self,
database: &mut D,
_progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
) -> Result<usize, Error> {
let mut request = script_sync::start(database, self.stop_gap)?;
let mut block_times = HashMap::<u32, u32>::new();
let mut txid_to_height = HashMap::<Txid, u32>::new();
Expand All @@ -124,7 +124,9 @@ impl WalletSync for ElectrumBlockchain {
// tranascations than in the request. This should never happen but we don't want to panic.
let electrum_goof = || Error::Generic("electrum server misbehaving".to_string());

let batch_update = loop {
// `missing_count` is the min number of scripts to cache, so that we may try to satisfy
// `stop_gap` with the next attempted sync
let (batch_update, missing_count) = loop {
request = match request {
Request::Script(script_req) => {
let scripts = script_req.request().take(chunk_size);
Expand Down Expand Up @@ -232,12 +234,14 @@ impl WalletSync for ElectrumBlockchain {

tx_req.satisfy(full_details)?
}
Request::Finish(batch_update) => break batch_update,
Request::Finish(batch_update, missing_count) => {
break (batch_update, missing_count)
}
}
};

database.commit_batch(batch_update)?;
Ok(())
Ok(missing_count)
}
}

Expand Down
11 changes: 6 additions & 5 deletions src/blockchain/esplora/reqwest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ impl WalletSync for EsploraBlockchain {
&self,
database: &mut D,
_progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
) -> Result<usize, Error> {
use crate::blockchain::script_sync::Request;
let mut request = script_sync::start(database, self.stop_gap)?;
let mut tx_index: HashMap<Txid, Tx> = HashMap::new();

let batch_update = loop {
let (batch_update, missing_count) = loop {
request = match request {
Request::Script(script_req) => {
let futures: FuturesOrdered<_> = script_req
Expand Down Expand Up @@ -208,13 +208,14 @@ impl WalletSync for EsploraBlockchain {
.collect::<Result<_, Error>>()?;
tx_req.satisfy(full_txs)?
}
Request::Finish(batch_update) => break batch_update,
Request::Finish(batch_update, missing_count) => {
break (batch_update, missing_count)
}
}
};

database.commit_batch(batch_update)?;

Ok(())
Ok(missing_count)
}
}

Expand Down
10 changes: 6 additions & 4 deletions src/blockchain/esplora/ureq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,11 @@ impl WalletSync for EsploraBlockchain {
&self,
database: &mut D,
_progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
) -> Result<usize, Error> {
use crate::blockchain::script_sync::Request;
let mut request = script_sync::start(database, self.stop_gap)?;
let mut tx_index: HashMap<Txid, Tx> = HashMap::new();
let batch_update = loop {
let (batch_update, missing_count) = loop {
request = match request {
Request::Script(script_req) => {
let scripts = script_req
Expand Down Expand Up @@ -206,13 +206,15 @@ impl WalletSync for EsploraBlockchain {
.collect::<Result<_, Error>>()?;
tx_req.satisfy(full_txs)?
}
Request::Finish(batch_update) => break batch_update,
Request::Finish(batch_update, missing_count) => {
break (batch_update, missing_count)
}
}
};

database.commit_batch(batch_update)?;

Ok(())
Ok(missing_count)
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub trait WalletSync {
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
) -> Result<(), Error>;
) -> Result<usize, Error>;

/// If not overridden, it defaults to calling [`Self::wallet_setup`] internally.
///
Expand All @@ -158,7 +158,7 @@ pub trait WalletSync {
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
) -> Result<usize, Error> {
maybe_await!(self.wallet_setup(database, progress_update))
}
}
Expand Down Expand Up @@ -379,15 +379,15 @@ impl<T: WalletSync> WalletSync for Arc<T> {
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
) -> Result<usize, Error> {
maybe_await!(self.deref().wallet_setup(database, progress_update))
}

fn wallet_sync<D: BatchDatabase>(
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
) -> Result<usize, Error> {
maybe_await!(self.deref().wallet_sync(database, progress_update))
}
}
6 changes: 3 additions & 3 deletions src/blockchain/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl WalletSync for RpcBlockchain {
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
) -> Result<usize, Error> {
let mut scripts_pubkeys = database.iter_script_pubkeys(Some(KeychainKind::External))?;
scripts_pubkeys.extend(database.iter_script_pubkeys(Some(KeychainKind::Internal))?);
debug!(
Expand Down Expand Up @@ -262,7 +262,7 @@ impl WalletSync for RpcBlockchain {
&self,
db: &mut D,
_progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
) -> Result<usize, Error> {
let mut indexes = HashMap::new();
for keykind in &[KeychainKind::External, KeychainKind::Internal] {
indexes.insert(*keykind, db.get_last_index(*keykind)?.unwrap_or(0));
Expand Down Expand Up @@ -395,7 +395,7 @@ impl WalletSync for RpcBlockchain {
db.set_last_index(keykind, index)?;
}

Ok(())
Ok(0)
}
}

Expand Down
95 changes: 65 additions & 30 deletions src/blockchain/script_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ pub enum Request<'a, D: BatchDatabase> {
/// A request for full transaction details of some transactions.
Tx(TxReq<'a, D>),
/// Requests are finished here's a batch database update to reflect data gathered.
Finish(D::Batch),
/// we also return a value which represents the minimum number of scripts that we should cache
/// for next round (or the missing cache count)
Finish(D::Batch, usize),
}

/// starts a sync
Expand All @@ -34,7 +36,8 @@ pub fn start<D: BatchDatabase>(db: &D, stop_gap: usize) -> Result<Request<'_, D>
let scripts_needed = db
.iter_script_pubkeys(Some(keychain))?
.into_iter()
.collect();
.collect::<VecDeque<_>>();
println!("scripts_needed count: {}", scripts_needed.len());
let state = State::new(db);

Ok(Request::Script(ScriptReq {
Expand Down Expand Up @@ -117,39 +120,57 @@ impl<'a, D: BatchDatabase> ScriptReq<'a, D> {
self.scripts_needed.pop_front();
}

let last_active_index = self
// last active index
// 0 => No last active
let last = self
.state
.last_active_index
.get(&self.keychain)
.map(|x| x + 1)
.unwrap_or(0); // so no addresses active maps to 0
.map(|&l| l + 1)
.unwrap_or(0);

Ok(
if self.script_index > last_active_index + self.stop_gap
|| self.scripts_needed.is_empty()
{
// difference between current index and last active index
let diff = self.script_index - last;

if diff <= self.stop_gap {
if !self.scripts_needed.is_empty() {
// we have not finished requesting txs with script batches, so continue
return Ok(Request::Script(self));
}

// if we obtained an active index in this sync, we have missing scripts in db cache,
// report it
if last > 0 {
debug!(
"finished scanning for transactions for keychain {:?} at index {}",
self.keychain, last_active_index
"reporting missing with: current={}, last={}, diff={}, gap={}",
self.script_index, last, diff, self.stop_gap
);
// we're done here -- check if we need to do the next keychain
if let Some(keychain) = self.next_keychains.pop() {
self.keychain = keychain;
self.script_index = 0;
self.scripts_needed = self
.state
.db
.iter_script_pubkeys(Some(keychain))?
.into_iter()
.collect();
Request::Script(self)
} else {
Request::Tx(TxReq { state: self.state })
}
} else {
Request::Script(self)
},
)
self.state
.missing_script_counts
.insert(self.keychain, self.stop_gap - diff);
}
}

debug!(
"finished scanning for txs of keychain {:?} at index {:?}",
self.keychain, last
);

if let Some(keychain) = self.next_keychains.pop() {
// we still have another keychain to request txs with
self.keychain = keychain;
self.script_index = 0;
self.scripts_needed = self
.state
.db
.iter_script_pubkeys(Some(keychain))?
.into_iter()
.collect();
return Ok(Request::Script(self));
}

// We have finished requesting txids, let's get the actual txs.
Ok(Request::Tx(TxReq { state: self.state }))
}
}

Expand Down Expand Up @@ -276,7 +297,18 @@ impl<'a, D: BatchDatabase> ConftimeReq<'a, D> {
}

if self.state.tx_missing_conftime.is_empty() {
Ok(Request::Finish(self.state.into_db_update()?))
// Obtain largest missing count (between external/internal) for simplicity
// The reasoning is that there is no point returning a map - in the future, a single
// database will only handle a single descriptor
let missing = self
.state
.missing_script_counts
.clone()
.into_values()
.reduce(std::cmp::max)
.unwrap_or(0);

Ok(Request::Finish(self.state.into_db_update()?, missing))
} else {
Ok(Request::Conftime(self))
}
Expand All @@ -294,6 +326,8 @@ struct State<'a, D> {
tx_missing_conftime: BTreeMap<Txid, TransactionDetails>,
/// The start of the sync
start_time: Instant,
/// Missing number of scripts to cache per keychain
missing_script_counts: HashMap<KeychainKind, usize>,
}

impl<'a, D: BatchDatabase> State<'a, D> {
Expand All @@ -305,6 +339,7 @@ impl<'a, D: BatchDatabase> State<'a, D> {
tx_needed: BTreeSet::default(),
tx_missing_conftime: BTreeMap::default(),
start_time: Instant::new(),
missing_script_counts: HashMap::default(),
}
}
fn into_db_update(self) -> Result<D::Batch, Error> {
Expand Down
Loading

0 comments on commit c699aea

Please sign in to comment.