Skip to content

Commit

Permalink
Add blockchain.outpoint.subscribe RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
romanz committed Aug 21, 2021
1 parent 31765e3 commit ea5a907
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 6 deletions.
51 changes: 47 additions & 4 deletions src/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result};
use bitcoin::{
consensus::{deserialize, serialize},
hashes::hex::{FromHex, ToHex},
BlockHash, Txid,
BlockHash, OutPoint, Txid,
};
use rayon::prelude::*;
use serde_derive::Deserialize;
Expand All @@ -17,7 +17,7 @@ use crate::{
daemon::{self, extract_bitcoind_error, Daemon},
merkle::Proof,
metrics::Histogram,
status::ScriptHashStatus,
status::{OutPointStatus, ScriptHashStatus},
tracker::Tracker,
types::ScriptHash,
};
Expand All @@ -32,6 +32,7 @@ const UNKNOWN_FEE: isize = -1; // (allowed by Electrum protocol)
pub struct Client {
tip: Option<BlockHash>,
scripthashes: HashMap<ScriptHash, ScriptHashStatus>,
outpoints: HashMap<OutPoint, OutPointStatus>,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -158,7 +159,25 @@ impl Rpc {
}
})
.collect::<Result<Vec<Value>>>()
.context("failed to update status")?;
.context("failed to update scripthash status")?;

notifications.extend(
client
.outpoints
.par_iter_mut()
.filter_map(|(outpoint, status)| -> Option<Result<Value>> {
match self.tracker.update_outpoint_status(status, &self.daemon) {
Ok(true) => Some(Ok(notification(
"blockchain.outpoint.subscribe",
&[json!([outpoint.txid, outpoint.vout]), json!(status)],
))),
Ok(false) => None, // outpoint status is the same
Err(e) => Some(Err(e)),
}
})
.collect::<Result<Vec<Value>>>()
.context("failed to update scripthash status")?,
);

if let Some(old_tip) = client.tip {
let new_tip = self.tracker.chain().tip();
Expand Down Expand Up @@ -270,6 +289,24 @@ impl Rpc {
Ok(json!(statushash))
}

fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result<Value> {
let outpoint = OutPoint::new(txid, vout);
let mut status = OutPointStatus::new(outpoint);
self.tracker
.update_outpoint_status(&mut status, &self.daemon)?;
let result = json!(status);
client.outpoints.insert(outpoint, status);
Ok(result)
}

fn outpoint_unsubscribe(
&self,
client: &mut Client,
(txid, vout): (Txid, u32),
) -> Result<Value> {
Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout))))
}

fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
let mut status = ScriptHashStatus::new(scripthash);
self.tracker
Expand Down Expand Up @@ -404,6 +441,8 @@ impl Rpc {
Call::HeadersSubscribe => self.headers_subscribe(client),
Call::MempoolFeeHistogram => self.get_fee_histogram(),
Call::PeersSubscribe => Ok(json!([])),
Call::OutPointSubscribe(args) => self.outpoint_subscribe(client, args),
Call::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, args),
Call::Ping => Ok(Value::Null),
Call::RelayFee => self.relayfee(),
Call::ScriptHashGetBalance(args) => self.scripthash_get_balance(client, args),
Expand Down Expand Up @@ -436,18 +475,20 @@ enum Call {
Banner,
BlockHeader((usize,)),
BlockHeaders((usize, usize)),
TransactionBroadcast((String,)),
Donation,
EstimateFee((u16,)),
Features,
HeadersSubscribe,
MempoolFeeHistogram,
OutPointSubscribe((Txid, u32)), // TODO: support spk_hint
OutPointUnsubscribe((Txid, u32)),
PeersSubscribe,
Ping,
RelayFee,
ScriptHashGetBalance((ScriptHash,)),
ScriptHashGetHistory((ScriptHash,)),
ScriptHashSubscribe((ScriptHash,)),
TransactionBroadcast((String,)),
TransactionGet(TxGetArgs),
TransactionGetMerkle((Txid, usize)),
Version((String, Version)),
Expand All @@ -464,6 +505,8 @@ impl Call {
"blockchain.scripthash.get_balance" => Call::ScriptHashGetBalance(convert(params)?),
"blockchain.scripthash.get_history" => Call::ScriptHashGetHistory(convert(params)?),
"blockchain.scripthash.subscribe" => Call::ScriptHashSubscribe(convert(params)?),
"blockchain.outpoint.subscribe" => Call::OutPointSubscribe(convert(params)?),
"blockchain.outpoint.unsubscribe" => Call::OutPointUnsubscribe(convert(params)?),
"blockchain.transaction.broadcast" => Call::TransactionBroadcast(convert(params)?),
"blockchain.transaction.get" => Call::TransactionGet(convert(params)?),
"blockchain.transaction.get_merkle" => Call::TransactionGetMerkle(convert(params)?),
Expand Down
141 changes: 140 additions & 1 deletion src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bitcoin::{
Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
};
use rayon::prelude::*;
use serde::ser::{Serialize, Serializer};
use serde::ser::{Serialize, SerializeMap, Serializer};

use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::TryFrom;
Expand Down Expand Up @@ -45,12 +45,26 @@ impl TxEntry {
}
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum Height {
Confirmed { height: usize },
Unconfirmed { has_unconfirmed_inputs: bool },
}

impl Height {
fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self {
let height = chain
.get_block_height(&blockhash)
.expect("missing block in chain");
Self::Confirmed { height }
}

fn unconfirmed(e: &crate::mempool::Entry) -> Self {
Self::Unconfirmed {
has_unconfirmed_inputs: e.has_unconfirmed_inputs,
}
}

fn as_i64(&self) -> i64 {
match self {
Self::Confirmed { height } => i64::try_from(*height).unwrap(),
Expand Down Expand Up @@ -449,6 +463,131 @@ fn filter_inputs(tx: &Transaction, outpoints: &HashSet<OutPoint>) -> Vec<OutPoin
.collect()
}

pub struct OutPointStatus {
outpoint: OutPoint,
funding: Option<Height>,
spending: Option<(Txid, Height)>,
tip: BlockHash,
}

impl Serialize for OutPointStatus {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(None)?;
if let Some(funding) = &self.funding {
map.serialize_entry("height", &funding)?;
}
if let Some((txid, height)) = &self.spending {
map.serialize_entry("spender_txhash", &txid)?;
map.serialize_entry("spender_height", &height)?;
}
map.end()
}
}

impl OutPointStatus {
pub(crate) fn new(outpoint: OutPoint) -> Self {
Self {
outpoint,
funding: None,
spending: None,
tip: BlockHash::default(),
}
}

pub(crate) fn sync(
&mut self,
index: &Index,
mempool: &Mempool,
daemon: &Daemon,
) -> Result<bool> {
let funding = self.sync_funding(index, daemon, mempool)?;
let spending = self.sync_spending(index, daemon, mempool)?;
let same_status = (self.funding == funding) && (self.spending == spending);
self.funding = funding;
self.spending = spending;
self.tip = index.chain().tip();
Ok(!same_status)
}

/// Return true iff current tip became unconfirmed
fn is_reorg(&self, chain: &Chain) -> bool {
chain.get_block_height(&self.tip).is_none()
}

fn sync_funding(
&self,
index: &Index,
daemon: &Daemon,
mempool: &Mempool,
) -> Result<Option<Height>> {
let chain = index.chain();
if !self.is_reorg(chain) {
if let Some(Height::Confirmed { .. }) = &self.funding {
return Ok(self.funding);
}
}
let mut confirmed = None;
daemon.for_blocks(
index.filter_by_txid(self.outpoint.txid),
|blockhash, block| {
for tx in block.txdata {
let txid = tx.txid();
let output_len = u32::try_from(tx.output.len()).unwrap();
if self.outpoint.txid == txid && self.outpoint.vout < output_len {
if confirmed.is_none() {
confirmed = Some(Height::from_blockhash(blockhash, chain));
return;
}
}
}
},
)?;
Ok(confirmed.or_else(|| {
mempool
.get(&self.outpoint.txid)
.map(|entry| Height::unconfirmed(entry))
}))
}

fn sync_spending(
&self,
index: &Index,
daemon: &Daemon,
mempool: &Mempool,
) -> Result<Option<(Txid, Height)>> {
let chain = index.chain();
if !self.is_reorg(chain) {
if let Some((_, Height::Confirmed { .. })) = &self.spending {
return Ok(self.spending);
}
}
let spending_blockhashes = index.filter_by_spending(self.outpoint);
let mut confirmed = None;
daemon.for_blocks(spending_blockhashes, |blockhash, block| {
for tx in block.txdata {
for txi in &tx.input {
if txi.previous_output == self.outpoint {
// TODO: there should be only one spending input
assert!(confirmed.is_none(), "double spend of {}", self.outpoint);
confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain)));
return;
}
}
}
})?;
Ok(confirmed.or_else(|| {
let entries = mempool.filter_by_spending(&self.outpoint);
assert!(entries.len() <= 1, "double spend of {}", self.outpoint);
entries
.first()
.map(|entry| (entry.txid, Height::unconfirmed(entry)))
}))
}
}

#[cfg(test)]
mod tests {
use super::HistoryEntry;
Expand Down
10 changes: 9 additions & 1 deletion src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
index::Index,
mempool::{Histogram, Mempool},
metrics::Metrics,
status::{Balance, HistoryEntry, ScriptHashStatus},
status::{Balance, HistoryEntry, OutPointStatus, ScriptHashStatus},
};

/// Electrum protocol subscriptions' tracker
Expand Down Expand Up @@ -76,6 +76,14 @@ impl Tracker {
Ok(prev_statushash != status.statushash())
}

pub fn update_outpoint_status(
&self,
status: &mut OutPointStatus,
daemon: &Daemon,
) -> Result<bool> {
status.sync(&self.index, &self.mempool, daemon)
}

pub fn get_balance(&self, status: &ScriptHashStatus, cache: &Cache) -> Balance {
let get_amount_fn = |outpoint: OutPoint| {
cache
Expand Down

0 comments on commit ea5a907

Please sign in to comment.