Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch tx requests #1759

Merged
merged 11 commits into from
Feb 20, 2023
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ members = [".", "test-bitcoincore-rpc"]
anyhow = { version = "1.0.56", features = ["backtrace"] }
axum = { version = "0.6.1", features = ["headers"] }
axum-server = "0.4.0"
base64 = "0.13.1"
raphjaph marked this conversation as resolved.
Show resolved Hide resolved
bech32 = "0.9.1"
bip39 = "1.0.1"
bitcoin = { version = "0.29.1", features = ["rand"] }
Expand All @@ -31,6 +32,7 @@ futures = "0.3.21"
hex = "0.4.3"
html-escaper = "0.2.0"
http = "0.2.6"
hyper = { version = "0.14.24", features = ["http1", "client"] }
indicatif = "0.17.1"
lazy_static = "1.4.0"
log = "0.4.14"
Expand Down
53 changes: 53 additions & 0 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use {
};

mod entry;
mod fetcher;
mod rtx;
mod updater;

Expand Down Expand Up @@ -1026,6 +1027,58 @@ mod tests {
}
}

#[test]
fn inscriptions_below_first_inscription_height_are_skipped() {
let inscription = inscription("text/plain;charset=utf-8", "hello");
let template = TransactionTemplate {
inputs: &[(1, 0, 0)],
witness: inscription.to_witness(),
..Default::default()
};

{
let context = Context::builder().build();
context.mine_blocks(1);
let txid = context.rpc_server.broadcast_tx(template.clone());
let inscription_id = InscriptionId::from(txid);
context.mine_blocks(1);

assert_eq!(
context.index.get_inscription_by_id(inscription_id).unwrap(),
Some(inscription)
);

assert_eq!(
context
.index
.get_inscription_satpoint_by_id(inscription_id)
.unwrap(),
Some(SatPoint {
outpoint: OutPoint { txid, vout: 0 },
offset: 0,
})
);
}

{
let context = Context::builder()
.arg("--first-inscription-height=3")
.build();
context.mine_blocks(1);
let txid = context.rpc_server.broadcast_tx(template);
let inscription_id = InscriptionId::from(txid);
context.mine_blocks(1);

assert_eq!(
context
.index
.get_inscription_satpoint_by_id(inscription_id)
.unwrap(),
None,
);
}
}

#[test]
fn list_first_coinbase_transaction() {
let context = Context::builder().arg("--index-sats").build();
Expand Down
112 changes: 112 additions & 0 deletions src/index/fetcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use {
anyhow::{anyhow, Result},
bitcoin::{Transaction, Txid},
bitcoincore_rpc::Auth,
hyper::{client::HttpConnector, Body, Client, Method, Request, Uri},
serde::Deserialize,
serde_json::{json, Value},
};

pub(crate) struct Fetcher {
client: Client<HttpConnector>,
url: Uri,
auth: String,
}

#[derive(Deserialize, Debug)]
struct JsonResponse<T> {
result: Option<T>,
error: Option<JsonError>,
id: usize,
}

#[derive(Deserialize, Debug)]
struct JsonError {
code: i32,
message: String,
}

impl Fetcher {
pub(crate) fn new(url: &str, auth: Auth) -> Result<Self> {
if auth == Auth::None {
return Err(anyhow!("No rpc authentication provided"));
}

let client = Client::new();

let url = if url.starts_with("http://") {
url.to_string()
} else {
"http://".to_string() + url
};

let url = Uri::try_from(&url).map_err(|e| anyhow!("Invalid rpc url {url}: {e}"))?;

let (user, password) = auth.get_user_pass()?;
let auth = format!("{}:{}", user.unwrap(), password.unwrap());
let auth = format!("Basic {}", &base64::encode(auth));
Ok(Fetcher { client, url, auth })
}

pub(crate) async fn get_transactions(&self, txids: Vec<Txid>) -> Result<Vec<Transaction>> {
if txids.is_empty() {
return Ok(Vec::new());
}

let mut reqs = Vec::with_capacity(txids.len());
for (i, txid) in txids.iter().enumerate() {
let req = json!({
"jsonrpc": "2.0",
"id": i, // Use the index as id, so we can quickly sort the response
"method": "getrawtransaction",
"params": [ txid ]
});
reqs.push(req);
}

let body = Value::Array(reqs).to_string();
let req = Request::builder()
.method(Method::POST)
.uri(&self.url)
.header(hyper::header::AUTHORIZATION, &self.auth)
.header(hyper::header::CONTENT_TYPE, "application/json")
.body(Body::from(body))?;

let response = self.client.request(req).await?;

let buf = hyper::body::to_bytes(response).await?;

let mut results: Vec<JsonResponse<String>> = serde_json::from_slice(&buf)?;

// Return early on any error, because we need all results to proceed
if let Some(err) = results.iter().find_map(|res| res.error.as_ref()) {
return Err(anyhow!(
"Failed to fetch raw transaction: code {} message {}",
err.code,
err.message
));
}

// Results from batched JSON-RPC requests can come back in any order, so we must sort them by id
results.sort_by(|a, b| a.id.cmp(&b.id));

let txs = results
.into_iter()
.map(|res| {
res
.result
.ok_or_else(|| anyhow!("Missing result for batched JSON-RPC response"))
.and_then(|str| {
hex::decode(str)
.map_err(|e| anyhow!("Result for batched JSON-RPC response not valid hex: {e}"))
})
.and_then(|hex| {
bitcoin::consensus::deserialize(&hex).map_err(|e| {
anyhow!("Result for batched JSON-RPC response not valid bitcoin tx: {e}")
})
})
})
.collect::<Result<Vec<Transaction>>>()?;
Ok(txs)
}
}
135 changes: 129 additions & 6 deletions src/index/updater.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use {self::inscription_updater::InscriptionUpdater, super::*, std::sync::mpsc};
use {
self::inscription_updater::InscriptionUpdater,
super::{fetcher::Fetcher, *},
futures::future::try_join_all,
std::sync::mpsc,
tokio::sync::mpsc::{error::TryRecvError, Receiver, Sender},
};

mod inscription_updater;

Expand Down Expand Up @@ -92,6 +98,8 @@ impl Updater {

let rx = Self::fetch_blocks_from(index, self.height, self.index_sats)?;

let (mut outpoint_sender, mut value_receiver) = Self::spawn_fetcher(index)?;

let mut uncommitted = 0;
let mut value_cache = HashMap::new();
loop {
Expand All @@ -100,7 +108,14 @@ impl Updater {
Err(mpsc::RecvError) => break,
};

self.index_block(index, &mut wtx, block, &mut value_cache)?;
self.index_block(
index,
&mut outpoint_sender,
&mut value_receiver,
&mut wtx,
block,
&mut value_cache,
)?;

if let Some(progress_bar) = &mut progress_bar {
progress_bar.inc(1);
Expand Down Expand Up @@ -168,8 +183,7 @@ impl Updater {
let client =
Client::new(&index.rpc_url, index.auth.clone()).context("failed to connect to RPC URL")?;

// NB: We temporarily always fetch transactions, to avoid expensive cache misses.
let first_inscription_height = index.first_inscription_height.min(0);
let first_inscription_height = index.first_inscription_height;

thread::spawn(move || loop {
if let Some(height_limit) = height_limit {
Expand Down Expand Up @@ -243,13 +257,123 @@ impl Updater {
}
}

fn spawn_fetcher(index: &Index) -> Result<(Sender<OutPoint>, Receiver<u64>)> {
let fetcher = Fetcher::new(&index.rpc_url, index.auth.clone())?;

// Not sure if any block has more than 20k inputs, but none so far after first inscription block
const CHANNEL_BUFFER_SIZE: usize = 20_000;
let (outpoint_sender, mut outpoint_receiver) =
tokio::sync::mpsc::channel::<OutPoint>(CHANNEL_BUFFER_SIZE);
let (value_sender, value_receiver) = tokio::sync::mpsc::channel::<u64>(CHANNEL_BUFFER_SIZE);

// Batch 2048 missing inputs at a time. Arbitrarily chosen for now, maybe higher or lower can be faster?
// Did rudimentary benchmarks with 1024 and 4096 and time was roughly the same.
const BATCH_SIZE: usize = 2048;
// Default rpcworkqueue in bitcoind is 16, meaning more than 16 concurrent requests will be rejected.
// Since we are already requesting blocks on a separate thread, and we don't want to break if anything
// else runs a request, we keep this to 12.
const PARALLEL_REQUESTS: usize = 12;

std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
loop {
let Some(outpoint) = outpoint_receiver.recv().await else {
log::debug!("Outpoint channel closed");
return;
};
// There's no try_iter on tokio::sync::mpsc::Receiver like std::sync::mpsc::Receiver.
// So we just loop until BATCH_SIZE doing try_recv until it returns None.
let mut outpoints = vec![outpoint];
for _ in 0..BATCH_SIZE-1 {
let Ok(outpoint) = outpoint_receiver.try_recv() else {
break;
};
outpoints.push(outpoint);
}
// Break outpoints into chunks for parallel requests
let chunk_size = (outpoints.len() / PARALLEL_REQUESTS) + 1;
let mut futs = Vec::with_capacity(PARALLEL_REQUESTS);
for chunk in outpoints.chunks(chunk_size) {
let txids = chunk.iter().map(|outpoint| outpoint.txid).collect();
let fut = fetcher.get_transactions(txids);
futs.push(fut);
}
let txs = match try_join_all(futs).await {
Ok(txs) => txs,
Err(e) => {
log::error!("Couldn't receive txs {e}");
return;
}
};
// Send all tx output values back in order
for (i, tx) in txs.iter().flatten().enumerate() {
let Ok(_) = value_sender.send(tx.output[usize::try_from(outpoints[i].vout).unwrap()].value).await else {
log::error!("Value channel closed unexpectedly");
return;
};
}
}
})
});

Ok((outpoint_sender, value_receiver))
}

fn index_block(
&mut self,
index: &Index,
outpoint_sender: &mut Sender<OutPoint>,
value_receiver: &mut Receiver<u64>,
wtx: &mut WriteTransaction,
block: BlockData,
value_cache: &mut HashMap<OutPoint, u64>,
) -> Result<()> {
// If value_receiver still has values something went wrong with the last block
// Could be an assert, shouldn't recover from this and commit the last block
let Err(TryRecvError::Empty) = value_receiver.try_recv() else {
return Err(anyhow!("Previous block did not consume all input values"));
};

let mut outpoint_to_value = wtx.open_table(OUTPOINT_TO_VALUE)?;

if !self.index_sats {
// Send all missing input outpoints to be fetched right away
let txids = block
.txdata
.iter()
.map(|(_, txid)| txid)
.collect::<HashSet<_>>();
for (tx, _) in &block.txdata {
for input in &tx.input {
let prev_output = input.previous_output;
// We don't need coinbase input value
if prev_output.is_null() {
continue;
}
// We don't need input values from txs earlier in the block, since they'll be added to value_cache
// when the tx is indexed
if txids.contains(&prev_output.txid) {
continue;
}
// We don't need input values we already have in our value_cache from earlier blocks
if value_cache.contains_key(&prev_output) {
continue;
}
// We don't need input values we already have in our outpoint_to_value table from earlier blocks that
// were committed to db already
if outpoint_to_value.get(&prev_output.store())?.is_some() {
continue;
}
// We don't know the value of this tx input. Send this outpoint to background thread to be fetched
outpoint_sender.blocking_send(prev_output)?;
}
}
}

let mut height_to_block_hash = wtx.open_table(HEIGHT_TO_BLOCK_HASH)?;

let start = Instant::now();
Expand Down Expand Up @@ -279,7 +403,6 @@ impl Updater {
let mut inscription_id_to_satpoint = wtx.open_table(INSCRIPTION_ID_TO_SATPOINT)?;
let mut inscription_number_to_inscription_id =
wtx.open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)?;
let mut outpoint_to_value = wtx.open_table(OUTPOINT_TO_VALUE)?;
let mut sat_to_inscription_id = wtx.open_table(SAT_TO_INSCRIPTION_ID)?;
let mut satpoint_to_inscription_id = wtx.open_table(SATPOINT_TO_INSCRIPTION_ID)?;
let mut statistic_to_count = wtx.open_table(STATISTIC_TO_COUNT)?;
Expand All @@ -292,7 +415,7 @@ impl Updater {
let mut inscription_updater = InscriptionUpdater::new(
self.height,
&mut inscription_id_to_satpoint,
index,
value_receiver,
&mut inscription_id_to_inscription_entry,
lost_sats,
&mut inscription_number_to_inscription_id,
Expand Down
Loading