Improve logging for RpcBlockchain
evanlinjin committed Jul 30, 2022
1 parent 783b975 commit 45caf45
Showing 1 changed file with 48 additions and 20 deletions.
src/blockchain/rpc.rs (+48 −20)
@@ -47,7 +47,7 @@ use bitcoincore_rpc::json::{
 use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
 use bitcoincore_rpc::Auth as RpcAuth;
 use bitcoincore_rpc::{Client, RpcApi};
-use log::debug;
+use log::{debug, info};
 use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
 use std::path::PathBuf;
@@ -185,30 +185,39 @@ impl WalletSync for RpcBlockchain {
         db: &mut D,
         progress_update: Box<dyn Progress>,
     ) -> Result<(), Error> {
-        let db_scripts = db.iter_script_pubkeys(None)?;
+        let db_spks = db.iter_script_pubkeys(None)?;
+        let db_spk_count = db_spks.len();
 
         // this is a hack to check whether the scripts are coming from a derivable descriptor
-        // we assume for non-derivable descriptors, the initial script count is always 1
-        let is_derivable = db_scripts.len() > 1;
+        // we assume for non-derivable descriptors, the initial script count is always 1 and
+        // since we are working with two descriptors, we use 2 to determine derivability
+        let is_derivable = db_spk_count > 2;
 
         // ensure db scripts meet start script count requirements
-        if is_derivable && db_scripts.len() < self.sync_params.start_script_count {
-            return Err(Error::MissingCachedScripts(MissingCachedScripts {
-                last_count: db_scripts.len(),
-                missing_count: self.sync_params.start_script_count - db_scripts.len(),
-            }));
+        if is_derivable && db_spks.len() < self.sync_params.start_script_count {
+            let err = MissingCachedScripts {
+                last_count: db_spk_count,
+                missing_count: self.sync_params.start_script_count - db_spk_count,
+            };
+            debug!("requesting more scriptPubKeys with: {:?}", err);
+            return Err(Error::MissingCachedScripts(err));
         }
 
         // this tells Core wallet where to sync from for imported scripts
         let start_epoch = db
             .get_sync_time()?
             .map_or(self.sync_params.start_time, |st| st.block_time.timestamp);
 
+        info!(
+            "start import with: db_scripts={}, derivable={}, use_descriptors={} start={}",
+            db_spk_count, is_derivable, self.is_descriptors, start_epoch
+        );
+
         // import all scripts from db into Core wallet
         if self.is_descriptors {
-            import_descriptors(&self.client, start_epoch, db_scripts.iter())?;
+            import_descriptors(&self.client, start_epoch, db_spks.iter())?;
         } else {
-            import_multi(&self.client, start_epoch, db_scripts.iter())?;
+            import_multi(&self.client, start_epoch, db_spks.iter())?;
         }
 
         // await sync (TODO: Maybe make this async)
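
For context, `MissingCachedScripts` is a signal to the caller rather than a hard failure: the wallet layer is expected to derive and cache more scriptPubKeys, then retry. A minimal sketch of such a retry loop (the function and the `cache_scripts_up_to` closure are illustrative, not this crate's API):

```rust
use bdk::blockchain::{Progress, WalletSync};
use bdk::database::BatchDatabase;
use bdk::Error;

// Illustrative only: retry the sync, caching more scriptPubKeys each time
// the blockchain layer reports `MissingCachedScripts`.
fn sync_with_retry<B, D, P, F>(
    blockchain: &B,
    db: &mut D,
    progress: P,
    mut cache_scripts_up_to: F, // derives + stores scriptPubKeys up to a count
) -> Result<(), Error>
where
    B: WalletSync,
    D: BatchDatabase,
    P: Progress + Clone + 'static,
    F: FnMut(&mut D, usize) -> Result<(), Error>,
{
    loop {
        match blockchain.wallet_sync(db, Box::new(progress.clone())) {
            Err(Error::MissingCachedScripts(m)) => {
                // the error reports how many scripts were cached and how many
                // more the sync wanted before importing into Core
                cache_scripts_up_to(db, m.last_count + m.missing_count)?;
            }
            other => return other,
        }
    }
}
```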
@@ -352,6 +361,8 @@ impl DbState {
             })
             .collect::<Result<HashMap<_, _>, Error>>()?;
 
+        info!("initial db state: txs={} utxos={}", txs.len(), utxos.len());
+
         let retained_txs = HashSet::with_capacity(txs.len());
         let updated_txs = HashSet::with_capacity(txs.len());
         let updated_utxos = HashSet::with_capacity(utxos.len());
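
Note that these `info!`/`debug!` lines only appear if the consuming binary installs a `log` backend. A minimal sketch using `env_logger` (an assumption; any `log`-compatible logger works):

```rust
fn main() {
    // run with e.g. `RUST_LOG=bdk=debug` to see the sync logging added here
    env_logger::init();
    // ... construct RpcBlockchain and run the wallet sync as usual ...
}
```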
@@ -373,11 +384,8 @@
     where
         D: BatchDatabase,
     {
-        let tx_iter = CoreTxIter::new(client, 10);
-
-        for tx_res in tx_iter {
+        for tx_res in CoreTxIter::new(client, 100) {
             let tx_res = tx_res?;
-
             let mut updated = false;
 
             let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| {
@@ -527,7 +535,7 @@ impl DbState {
         if res.info.confirmations > 0 || client.get_mempool_entry(&res.info.txid).is_ok() {
             Some(res)
         } else {
-            debug!("tx filtered: {}", res.info.txid);
+            debug!("tx filtered out: {}", res.info.txid);
             None
         }
     }
@@ -570,6 +578,7 @@ impl DbState {
         prev_tx.output.get(outpoint.vout as usize)
     }
 
+    // updates the db state's last_index for the given keychain (if larger than current last_index)
     fn _update_last_index(&mut self, keychain: KeychainKind, index: u32) {
         let mut updated = false;
 
@@ -587,18 +596,25 @@
         });
 
         if updated {
+            debug!(
+                "db state: last_index updated for {}: {}",
+                keychain.as_byte(),
+                index
+            );
             self.updated_last_indexes.insert(keychain);
         }
     }
 
     /// Prepare db batch operations.
     fn update_batch<D: BatchDatabase>(&self, batch: &mut D::Batch) -> Result<(), Error> {
+        let mut del_txs = 0_u32;
+
         // delete stale txs from db
         // stale = not retained
        self.txs
             .keys()
             .filter(|&txid| !self.retained_txs.contains(txid))
-            .try_for_each(|txid| batch.del_tx(txid, false).map(|_| ()))?;
+            .try_for_each(|txid| batch.del_tx(txid, false).map(|_| del_txs += 1))?;
 
         // update txs
         self.updated_txs
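
The pattern in `_update_last_index` is a per-keychain high-water mark. A standalone sketch of the same `entry`/`and_modify`/`or_insert_with` shape, with a simplified key type (illustrative, not this file's code):

```rust
use std::collections::{HashMap, HashSet};

// Raise a per-key high-water mark, remembering which keys actually changed.
fn raise_high_water_mark(
    last_indexes: &mut HashMap<u8, u32>,
    updated_keys: &mut HashSet<u8>,
    key: u8,
    index: u32,
) {
    let mut updated = false;
    last_indexes
        .entry(key)
        .and_modify(|last| {
            if index > *last {
                *last = index;
                updated = true;
            }
        })
        .or_insert_with(|| {
            updated = true;
            index
        });
    if updated {
        updated_keys.insert(key);
    }
}
```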
@@ -618,6 +634,13 @@
             .map(|keychain| self.last_indexes.get_key_value(keychain).unwrap())
             .try_for_each(|(&keychain, &index)| batch.set_last_index(keychain, index))?;
 
+        info!(
+            "db batch updates: del_txs={}, update_txs={}, update_utxos={}",
+            del_txs,
+            self.updated_txs.len(),
+            self.updated_utxos.len()
+        );
+
         Ok(())
     }
 }
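
`update_batch` only stages operations; nothing reaches the database until the batch is committed. A sketch of the surrounding flow, assuming BDK's `BatchDatabase::begin_batch`/`commit_batch` pair (`DbState` is private to this module, so the helper below is illustrative):

```rust
use bdk::database::BatchDatabase;
use bdk::Error;

// Illustrative: stage all of DbState's updates into one batch, then commit
// atomically so a failed sync never leaves the db half-written.
fn flush_db_state<D: BatchDatabase>(state: &DbState, db: &mut D) -> Result<(), Error> {
    let mut batch = db.begin_batch();
    state.update_batch::<D>(&mut batch)?;
    db.commit_batch(batch)
}
```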
@@ -678,6 +701,7 @@ where
     Ok(())
 }
 
+/// Iterates through results of multiple `listtransactions` calls.
 struct CoreTxIter<'a> {
     client: &'a Client,
     page_size: usize,
@@ -688,7 +712,11 @@ struct CoreTxIter<'a> {
 }
 
 impl<'a> CoreTxIter<'a> {
-    fn new(client: &'a Client, page_size: usize) -> Self {
+    fn new(client: &'a Client, mut page_size: usize) -> Self {
+        if page_size > 1000 {
+            page_size = 1000;
+        }
+
         Self {
             client,
             page_size,
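
`CoreTxIter` pages through `listtransactions` with a `(count, skip)` window, and the constructor now clamps oversized pages to 1000. Roughly the same loop written as a plain function over the `bitcoincore_rpc` client (a sketch, not the iterator's exact code):

```rust
use bitcoincore_rpc::json::ListTransactionResult;
use bitcoincore_rpc::{Client, RpcApi};

// Fetch all wallet transactions by paging until a short page is returned.
fn all_wallet_txs(
    client: &Client,
    page_size: usize,
) -> Result<Vec<ListTransactionResult>, bitcoincore_rpc::Error> {
    let mut out = Vec::new();
    let mut skip = 0;
    loop {
        let page = client.list_transactions(None, Some(page_size), Some(skip), Some(true))?;
        let page_len = page.len();
        out.extend(page);
        if page_len < page_size {
            return Ok(out); // short page means we reached the end
        }
        skip += page_size;
    }
}
```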
@@ -768,14 +796,14 @@ fn await_wallet_scan(
     loop {
         match get_scanning_details(client)? {
             ScanningDetails::Scanning { duration, progress } => {
-                println!("scanning: duration={}, progress={}", duration, progress);
+                debug!("scanning: duration={}, progress={}", duration, progress);
                 progress_update
                     .update(progress, Some(format!("elapsed for {} seconds", duration)))?;
                 thread::sleep(dur);
             }
             ScanningDetails::NotScanning(_) => {
                 progress_update.update(1.0, None)?;
-                println!("scanning: done!");
+                info!("scanning: done!");
                 return Ok(());
             }
         };
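
`await_wallet_scan` polls `get_scanning_details` until Core stops reporting an active rescan. A plausible shape for that helper, assuming the `scanning` field `bitcoincore_rpc` exposes on `getwalletinfo` (the real helper in this file may differ):

```rust
use bitcoincore_rpc::json::ScanningDetails;
use bitcoincore_rpc::{Client, RpcApi};

// Ask Core whether the wallet is mid-rescan; `getwalletinfo` reports either
// { duration, progress } or `false` in its `scanning` field.
fn get_scanning_details(client: &Client) -> Result<ScanningDetails, bitcoincore_rpc::Error> {
    let info = client.get_wallet_info()?;
    Ok(info.scanning.unwrap_or(ScanningDetails::NotScanning(false)))
}
```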
