Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch tx requests #1759

Merged
merged 11 commits into from
Feb 20, 2023
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ members = [".", "test-bitcoincore-rpc"]
anyhow = { version = "1.0.56", features = ["backtrace"] }
axum = { version = "0.6.1", features = ["headers"] }
axum-server = "0.4.0"
base64 = "0.13.1"
raphjaph marked this conversation as resolved.
Show resolved Hide resolved
bech32 = "0.9.1"
bip39 = "1.0.1"
bitcoin = { version = "0.29.1", features = ["rand"] }
Expand All @@ -31,6 +32,7 @@ futures = "0.3.21"
hex = "0.4.3"
html-escaper = "0.2.0"
http = "0.2.6"
hyper = { version = "0.14.24", features = ["http1", "client"] }
indicatif = "0.17.1"
lazy_static = "1.4.0"
log = "0.4.14"
Expand Down
53 changes: 53 additions & 0 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use {

mod entry;
mod rtx;
mod tx_fetcher;
andrewtoth marked this conversation as resolved.
Show resolved Hide resolved
mod updater;

const SCHEMA_VERSION: u64 = 3;
Expand Down Expand Up @@ -1026,6 +1027,58 @@ mod tests {
}
}

#[test]
fn inscriptions_below_first_inscription_height_are_skipped() {
let inscription = inscription("text/plain", "hello");
raphjaph marked this conversation as resolved.
Show resolved Hide resolved
let template = TransactionTemplate {
inputs: &[(1, 0, 0)],
witness: inscription.to_witness(),
..Default::default()
};

{
let context = Context::builder().build();
context.mine_blocks(1);
let txid = context.rpc_server.broadcast_tx(template.clone());
let inscription_id = InscriptionId::from(txid);
context.mine_blocks(1);

assert_eq!(
context.index.get_inscription_by_id(inscription_id).unwrap(),
Some(inscription)
);

assert_eq!(
context
.index
.get_inscription_satpoint_by_id(inscription_id)
.unwrap(),
Some(SatPoint {
outpoint: OutPoint { txid, vout: 0 },
offset: 0,
})
);
}

{
let context = Context::builder()
.arg("--first-inscription-height=3")
.build();
context.mine_blocks(1);
let txid = context.rpc_server.broadcast_tx(template);
let inscription_id = InscriptionId::from(txid);
context.mine_blocks(1);

assert_eq!(
context
.index
.get_inscription_satpoint_by_id(inscription_id)
.unwrap(),
None,
);
}
}

#[test]
fn list_first_coinbase_transaction() {
let context = Context::builder().arg("--index-sats").build();
Expand Down
96 changes: 96 additions & 0 deletions src/index/tx_fetcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use {
anyhow::{anyhow, Result},
bitcoin::{consensus::deserialize, Transaction, Txid},
bitcoincore_rpc::Auth,
hyper::{client::HttpConnector, Body, Client, Method, Request},
serde::Deserialize,
};

pub(crate) struct TxFetcher {
andrewtoth marked this conversation as resolved.
Show resolved Hide resolved
client: Client<HttpConnector>,
url: String,
auth: String,
}

#[derive(Deserialize, Debug)]
struct JsonResponse {
result: Option<String>,
andrewtoth marked this conversation as resolved.
Show resolved Hide resolved
error: Option<JsonError>,
id: String,
andrewtoth marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Deserialize, Debug)]
struct JsonError {
code: i32,
message: String,
}

impl TxFetcher {
andrewtoth marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) fn new(url: &str, auth: Auth) -> Result<Self> {
if auth == Auth::None {
return Err(anyhow!("No authentication provided"));
}

let client = Client::new();

let url = if url.starts_with("http://") {
andrewtoth marked this conversation as resolved.
Show resolved Hide resolved
url.to_string()
} else {
"http://".to_string() + url
};

let (user, password) = auth.get_user_pass()?;
let auth = format!("{}:{}", user.unwrap(), password.unwrap());
let auth = format!("Basic {}", &base64::encode(auth));
Ok(TxFetcher { client, url, auth })
andrewtoth marked this conversation as resolved.
Show resolved Hide resolved
}

pub(crate) async fn get_transactions(&self, txids: Vec<Txid>) -> Result<Vec<Transaction>> {
if txids.is_empty() {
return Ok(Vec::new());
}

let mut reqs = Vec::with_capacity(txids.len());
for (i, txid) in txids.iter().enumerate() {
let req =
format!("{{\"jsonrpc\":\"2.0\",\"id\":\"{i}\",\"method\":\"getrawtransaction\",\"params\":[\"{txid:x}\"]}}");
andrewtoth marked this conversation as resolved.
Show resolved Hide resolved
reqs.push(req);
}

let body = format!("[{}]", reqs.join(","));
let req = Request::builder()
.method(Method::POST)
.uri(&self.url)
.header(hyper::header::AUTHORIZATION, &self.auth)
.header(hyper::header::CONTENT_TYPE, "application/json")
.body(Body::from(body))?;

let response = self.client.request(req).await?;

let buf = hyper::body::to_bytes(response).await?;

let mut results: Vec<JsonResponse> = serde_json::from_slice(&buf)?;

if let Some(err) = results.iter().find_map(|res| res.error.as_ref()) {
return Err(anyhow!(
"Failed to fetch raw transaction: code {} message {}",
err.code,
err.message
));
}

results.sort_by(|a, b| {
raphjaph marked this conversation as resolved.
Show resolved Hide resolved
a.id
.parse::<usize>()
.unwrap()
.cmp(&b.id.parse::<usize>().unwrap())
});

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add an assert if it's critical that it's sorted

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, not sure what I would be asserting here? It needs to be sorted before we return, but this is sorting it so we know it's sorted?

Ok(
results
.into_iter()
.map(|res| deserialize(&hex::decode(res.result.unwrap()).unwrap()).unwrap())
andrewtoth marked this conversation as resolved.
Show resolved Hide resolved
andrewtoth marked this conversation as resolved.
Show resolved Hide resolved
.collect(),
)
}
}
113 changes: 107 additions & 6 deletions src/index/updater.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use {self::inscription_updater::InscriptionUpdater, super::*, std::sync::mpsc};
use {
self::inscription_updater::InscriptionUpdater,
super::{tx_fetcher::TxFetcher, *},
futures::future::try_join_all,
std::sync::mpsc,
tokio::sync::mpsc::{error::TryRecvError, Receiver, Sender},
};

mod inscription_updater;

Expand Down Expand Up @@ -92,6 +98,8 @@ impl Updater {

let rx = Self::fetch_blocks_from(index, self.height, self.index_sats)?;

let (mut outpoint_sender, mut value_receiver) = Self::spawn_tx_fetcher(index)?;
andrewtoth marked this conversation as resolved.
Show resolved Hide resolved

let mut uncommitted = 0;
let mut value_cache = HashMap::new();
loop {
Expand All @@ -100,7 +108,14 @@ impl Updater {
Err(mpsc::RecvError) => break,
};

self.index_block(index, &mut wtx, block, &mut value_cache)?;
self.index_block(
index,
&mut outpoint_sender,
&mut value_receiver,
&mut wtx,
block,
&mut value_cache,
)?;

if let Some(progress_bar) = &mut progress_bar {
progress_bar.inc(1);
Expand Down Expand Up @@ -168,8 +183,7 @@ impl Updater {
let client =
Client::new(&index.rpc_url, index.auth.clone()).context("failed to connect to RPC URL")?;

// NB: We temporarily always fetch transactions, to avoid expensive cache misses.
let first_inscription_height = index.first_inscription_height.min(0);
let first_inscription_height = index.first_inscription_height;

thread::spawn(move || loop {
if let Some(height_limit) = height_limit {
Expand Down Expand Up @@ -243,13 +257,101 @@ impl Updater {
}
}

fn spawn_tx_fetcher(index: &Index) -> Result<(Sender<OutPoint>, Receiver<u64>)> {
let tx_fetcher = TxFetcher::new(&index.rpc_url, index.auth.clone())?;

const CHANNEL_BUFFER_SIZE: usize = 20_000;
let (outpoint_sender, mut outpoint_receiver) =
tokio::sync::mpsc::channel::<OutPoint>(CHANNEL_BUFFER_SIZE);
let (value_sender, value_receiver) = tokio::sync::mpsc::channel::<u64>(CHANNEL_BUFFER_SIZE);

const BATCH_SIZE: usize = 2048;
const PARALLEL_REQUESTS: usize = 12;

std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
loop {
let Some(outpoint) = outpoint_receiver.recv().await else {
log::info!("Outpoint channel closed");
return;
};
let mut outpoints = vec![outpoint];
for _ in 0..BATCH_SIZE-1 {
let Ok(outpoint) = outpoint_receiver.try_recv() else {
break;
};
outpoints.push(outpoint);
}
let parts = (outpoints.len() / PARALLEL_REQUESTS) + 1;
let mut futs = Vec::with_capacity(PARALLEL_REQUESTS);
for chunk in outpoints.chunks(parts) {
let txids = chunk.iter().map(|outpoint| outpoint.txid).collect();
let fut = tx_fetcher.get_transactions(txids);
futs.push(fut);
}
let txs = match try_join_all(futs).await {
Ok(txs) => txs,
Err(e) => {
log::warn!("Couldn't receive txs {e}");
return;
}
};
for (i, tx) in txs.iter().flatten().enumerate() {
let Ok(_) = value_sender.send(tx.output[usize::try_from(outpoints[i].vout).unwrap()].value).await else {
log::warn!("Value channel closed unexpectedly");
return;
};
}
}
})
});

Ok((outpoint_sender, value_receiver))
}

fn index_block(
&mut self,
index: &Index,
outpoint_sender: &mut Sender<OutPoint>,
value_receiver: &mut Receiver<u64>,
wtx: &mut WriteTransaction,
block: BlockData,
value_cache: &mut HashMap<OutPoint, u64>,
) -> Result<()> {
let Err(TryRecvError::Empty) = value_receiver.try_recv() else { return Err(anyhow!("Previous block did not consume all input values")); };

let mut outpoint_to_value = wtx.open_table(OUTPOINT_TO_VALUE)?;

if !self.index_sats {
let txids = block
.txdata
.iter()
.map(|(_, txid)| txid)
.collect::<HashSet<_>>();
for (tx, _) in &block.txdata {
for input in &tx.input {
let prev_output = input.previous_output;
if prev_output.is_null() {
continue;
}
if txids.contains(&prev_output.txid) {
continue;
}
if value_cache.contains_key(&prev_output) {
continue;
}
if outpoint_to_value.get(&prev_output.store())?.is_some() {
continue;
}
outpoint_sender.blocking_send(prev_output)?;
}
}
}

let mut height_to_block_hash = wtx.open_table(HEIGHT_TO_BLOCK_HASH)?;

let start = Instant::now();
Expand Down Expand Up @@ -279,7 +381,6 @@ impl Updater {
let mut inscription_id_to_satpoint = wtx.open_table(INSCRIPTION_ID_TO_SATPOINT)?;
let mut inscription_number_to_inscription_id =
wtx.open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)?;
let mut outpoint_to_value = wtx.open_table(OUTPOINT_TO_VALUE)?;
let mut sat_to_inscription_id = wtx.open_table(SAT_TO_INSCRIPTION_ID)?;
let mut satpoint_to_inscription_id = wtx.open_table(SATPOINT_TO_INSCRIPTION_ID)?;
let mut statistic_to_count = wtx.open_table(STATISTIC_TO_COUNT)?;
Expand All @@ -292,7 +393,7 @@ impl Updater {
let mut inscription_updater = InscriptionUpdater::new(
self.height,
&mut inscription_id_to_satpoint,
index,
value_receiver,
&mut inscription_id_to_inscription_entry,
lost_sats,
&mut inscription_number_to_inscription_id,
Expand Down
Loading