Skip to content

Commit

Permalink
feat: smt code review (#5876)
Browse files Browse the repository at this point in the history
Description
---
Updates the SMT branch with code review

---------

Co-authored-by: Hansie Odendaal <39146854+hansieodendaal@users.noreply.github.com>
  • Loading branch information
SWvheerden and hansieodendaal committed Nov 2, 2023
1 parent 2302562 commit d7d61e0
Show file tree
Hide file tree
Showing 19 changed files with 169 additions and 249 deletions.
4 changes: 2 additions & 2 deletions base_layer/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ transactions = []
mempool_proto = []
base_node = ["tari_mmr", "transactions", "mempool_proto", "base_node_proto", "monero", "randomx-rs"]
base_node_proto = []
benches = ["base_node", "criterion"]
benches = ["base_node"]

[dependencies]
tari_common = { path = "../../common" }
Expand Down Expand Up @@ -44,7 +44,6 @@ borsh = { version = "0.10", features = ["const-generics"] }
bytes = "0.5"
chacha20poly1305 = "0.10.1"
chrono = { version = "0.4.19", default-features = false, features = ["serde"] }
criterion = { version = "0.4.0", optional = true }
decimal-rs = "0.1.42"
derivative = "2.2.0"
digest = "0.10"
Expand Down Expand Up @@ -77,6 +76,7 @@ uint = { version = "0.9", default-features = false }
zeroize = "1"

[dev-dependencies]
criterion = { version = "0.4.0" }
tari_p2p = { path = "../../base_layer/p2p", features = ["test-mocks"] }
tari_test_utils = { path = "../../infrastructure/test_utils" }
curve25519-dalek = { package = "tari-curve25519-dalek", version = "4.0.3" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ where B: BlockchainBackend + 'static
},
NodeCommsRequest::FetchMatchingUtxos(utxo_hashes) => {
let mut res = Vec::with_capacity(utxo_hashes.len());
for (output, spent) in (self.blockchain_db.fetch_utxos(utxo_hashes).await?)
for (output, spent) in (self
.blockchain_db
.fetch_outputs_with_spend_status_at_tip(utxo_hashes)
.await?)
.into_iter()
.flatten()
{
Expand Down
17 changes: 0 additions & 17 deletions base_layer/core/src/base_node/proto/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,6 @@ impl TryFrom<Block> for proto::BlockBodyResponse {
}
}

// impl TryFrom<PrunedOutput> for proto::SyncUtxo {
// type Error = String;
//
// fn try_from(output: PrunedOutput) -> Result<Self, Self::Error> {
// Ok(match output {
// PrunedOutput::Pruned { output_hash } => proto::SyncUtxo {
// utxo: Some(proto::sync_utxo::Utxo::PrunedOutput(proto::PrunedOutput {
// hash: output_hash.to_vec(),
// })),
// },
// PrunedOutput::NotPruned { output } => proto::SyncUtxo {
// utxo: Some(proto::sync_utxo::Utxo::Output(output.try_into()?)),
// },
// })
// }
// }

impl From<Vec<FeePerGramStat>> for proto::GetMempoolFeePerGramStatsResponse {
fn from(stats: Vec<FeePerGramStat>) -> Self {
Self {
Expand Down
14 changes: 7 additions & 7 deletions base_layer/core/src/base_node/proto/wallet_rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ message FetchUtxosResponse {

message QueryDeletedRequest {
repeated bytes hashes = 1;
google.protobuf.BytesValue chain_must_include_header = 2;
bytes chain_must_include_header = 2;
}

message QueryDeletedResponse {
repeated QueryDeletedData data = 1;
bytes best_block = 2;
uint64 height_of_longest_chain = 3;
bytes best_block_hash = 2;
uint64 best_block_height = 3;
}

message QueryDeletedData{
uint64 mined_height = 1;
uint64 mined_at_height = 1;
bytes block_mined_in = 2;
uint64 height_deleted_at = 3;
bytes block_deleted_in = 4;
Expand All @@ -92,13 +92,13 @@ message UtxoQueryRequest {

message UtxoQueryResponses {
repeated UtxoQueryResponse responses = 1;
bytes best_block = 3;
uint64 height_of_longest_chain = 4;
bytes best_block_hash = 3;
uint64 best_block_height = 4;
}

message UtxoQueryResponse {
tari.types.TransactionOutput output = 1;
uint64 mined_height = 2;
uint64 mined_at_height = 2;
bytes mined_in_block = 3;
bytes output_hash = 4;
uint64 mined_timestamp = 5;
Expand Down
31 changes: 14 additions & 17 deletions base_layer/core/src/base_node/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
.collect::<Result<_, _>>()
.map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
let utxos = db
.fetch_utxos(hashes)
.fetch_outputs_with_spend_status_at_tip(hashes)
.await
.rpc_status_internal_error(LOG_TARGET)?
.into_iter()
Expand Down Expand Up @@ -403,7 +403,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
);

let mined_info_resp = db
.fetch_utxos_and_mined_info(hashes)
.fetch_outputs_mined_info(hashes)
.await
.rpc_status_internal_error(LOG_TARGET)?;

Expand All @@ -421,14 +421,14 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
.rpc_status_internal_error(LOG_TARGET)?;

Ok(Response::new(UtxoQueryResponses {
height_of_longest_chain: metadata.height_of_longest_chain(),
best_block: metadata.best_block().to_vec(),
best_block_height: metadata.height_of_longest_chain(),
best_block_hash: metadata.best_block().to_vec(),
responses: mined_info_resp
.into_iter()
.flatten()
.map(|utxo| {
Ok(UtxoQueryResponse {
mined_height: utxo.mined_height,
mined_at_height: utxo.mined_height,
mined_in_block: utxo.header_hash.to_vec(),
output_hash: utxo.output.hash().to_vec(),
output: match utxo.output.try_into() {
Expand All @@ -445,9 +445,6 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
}))
}

/// Currently the wallet cannot use the deleted bitmap because it can't compile croaring
/// at some point in the future, it might be better to send the wallet the actual bitmap so
/// it can check itself
async fn query_deleted(
&self,
request: Request<QueryDeletedRequest>,
Expand All @@ -458,7 +455,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
&"Received more hashes than we allow".to_string(),
));
}
let chain_include_header = message.chain_must_include_header.unwrap_or_default();
let chain_include_header = message.chain_must_include_header;
if !chain_include_header.is_empty() {
let hash = chain_include_header
.try_into()
Expand All @@ -478,18 +475,18 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
let hashes: Vec<FixedHash> = message
.hashes
.into_iter()
.map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string()))
.map(|hash| hash.try_into())
.collect::<Result<_, _>>()
.map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
.map_err(|_| RpcStatus::bad_request(&"Malformed utxo hash received".to_string()))?;
let mut return_data = Vec::with_capacity(hashes.len());
let utxos = self
.db
.fetch_utxos_and_mined_info(hashes.clone())
.fetch_outputs_mined_info(hashes.clone())
.await
.rpc_status_internal_error(LOG_TARGET)?;
let txos = self
.db
.fetch_txos_and_mined_info(hashes)
.fetch_inputs_mined_info(hashes)
.await
.rpc_status_internal_error(LOG_TARGET)?;
if utxos.len() != txos.len() {
Expand All @@ -498,13 +495,13 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
for (utxo, txo) in utxos.iter().zip(txos.iter()) {
let mut data = match utxo {
None => QueryDeletedData {
mined_height: 0,
mined_at_height: 0,
block_mined_in: Vec::new(),
height_deleted_at: 0,
block_deleted_in: Vec::new(),
},
Some(u) => QueryDeletedData {
mined_height: u.mined_height,
mined_at_height: u.mined_height,
block_mined_in: u.header_hash.to_vec(),
height_deleted_at: 0,
block_deleted_in: Vec::new(),
Expand All @@ -523,8 +520,8 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
.rpc_status_internal_error(LOG_TARGET)?;

Ok(Response::new(QueryDeletedResponse {
height_of_longest_chain: metadata.height_of_longest_chain(),
best_block: metadata.best_block().to_vec(),
best_block_height: metadata.height_of_longest_chain(),
best_block_hash: metadata.best_block().to_vec(),
data: return_data,
}))
}
Expand Down
27 changes: 14 additions & 13 deletions base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,40 +122,41 @@ where B: BlockchainBackend + 'static
break;
}

let utxos = self
let outputs_with_statuses = self
.db
.fetch_utxos_in_block(current_header.hash(), None)
.fetch_outputs_in_block_with_spend_state(current_header.hash(), None)
.await
.rpc_status_internal_error(LOG_TARGET)?;
let utxos = utxos
.into_iter()
// Don't include pruned UTXOs
.map(|(utxo, _spent)| utxo.try_into()).collect::<Result<Vec<proto::types::TransactionOutput>, String>>().map_err(|err| RpcStatus::general(&err))?;
let outputs = outputs_with_statuses
.into_iter()
.map(|(output, _spent)| output.try_into())
.collect::<Result<Vec<proto::types::TransactionOutput>, String>>()
.map_err(|err| RpcStatus::general(&err))?;

debug!(
target: LOG_TARGET,
"Streaming {} UTXO(s) for block #{} (Hash: {})",
utxos.len(),
outputs.len(),
current_header.height,
current_header_hash.to_hex(),
);

for utxo_chunk in utxos.chunks(2000) {
let utxo_block_response = SyncUtxosByBlockResponse {
outputs: utxo_chunk.to_vec(),
for output_chunk in outputs.chunks(2000) {
let output_block_response = SyncUtxosByBlockResponse {
outputs: output_chunk.to_vec(),
height: current_header.height,
header_hash: current_header_hash.to_vec(),
mined_timestamp: current_header.timestamp.as_u64(),
};
// Ensure task stops if the peer prematurely stops their RPC session
if tx.send(Ok(utxo_block_response)).await.is_err() {
if tx.send(Ok(output_block_response)).await.is_err() {
break;
}
}
if utxos.is_empty() {
if outputs.is_empty() {
// if its empty, we need to send an empty vec of outputs.
let utxo_block_response = SyncUtxosByBlockResponse {
outputs: utxos,
outputs: Vec::new(),
height: current_header.height,
header_hash: current_header_hash.to_vec(),
mined_timestamp: current_header.timestamp.as_u64(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let remote_num_outputs = to_header.output_smt_size;
self.num_outputs = remote_num_outputs;

// todo we need to be able to pause and resume this
let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs {
current: 0,
total: self.num_outputs,
Expand Down Expand Up @@ -669,7 +668,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let mut kernel_sum = HomomorphicCommitment::default();
let mut burned_sum = HomomorphicCommitment::default();

let mut prev_mmr = 0;
let mut prev_kernel_mmr = 0;

let height = header.height();
Expand All @@ -680,13 +678,10 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let curr_header = db.fetch_chain_header(h)?;
trace!(
target: LOG_TARGET,
"Fetching utxos from db: height:{}, header.output_mmr:{}, prev_mmr:{}, end:{}",
"Fetching utxos from db: height:{}",
curr_header.height(),
curr_header.header().output_smt_size,
prev_mmr,
curr_header.header().output_smt_size - 1
);
let utxos = db.fetch_utxos_in_block(*curr_header.hash(), Some(header_hash))?;
let utxos = db.fetch_outputs_in_block_with_spend_state(*curr_header.hash(), Some(header_hash))?;
debug!(
target: LOG_TARGET,
"{} output(s) loaded for height {}",
Expand All @@ -708,7 +703,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
utxo_sum = &u.commitment + &utxo_sum;
}
}
prev_mmr = curr_header.header().output_smt_size;

let kernels = db.fetch_kernels_in_block(*curr_header.hash())?;
trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
Expand All @@ -720,7 +714,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
}
prev_kernel_mmr = curr_header.header().kernel_mmr_size;

if h % 1000 == 0 {
if h % 1000 == 0 && height != 0 {
debug!(
target: LOG_TARGET,
"Final Validation: {:.2}% complete. Height: {} sync",
Expand Down
20 changes: 14 additions & 6 deletions base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ where B: BlockchainBackend + 'static
.await
.rpc_status_internal_error(LOG_TARGET)?
.ok_or_else(|| RpcStatus::not_found("End header hash is was not found"))?;
if start_header.height > end_header.height {
return Err(RpcStatus::bad_request(&format!(
"Start header height({}) cannot be greater than the end header height({})",
start_header.height, end_header.height
)));
}

task::spawn(async move {
debug!(
Expand Down Expand Up @@ -138,9 +144,9 @@ where B: BlockchainBackend + 'static
break;
}

let utxos = self
let outputs_with_statuses = self
.db
.fetch_utxos_in_block(current_header.hash(), Some(end_header.hash()))
.fetch_outputs_in_block_with_spend_state(current_header.hash(), Some(end_header.hash()))
.await
.rpc_status_internal_error(LOG_TARGET)?;
debug!(
Expand All @@ -156,14 +162,14 @@ where B: BlockchainBackend + 'static
break;
}

let utxos = utxos
let utxos = outputs_with_statuses
.into_iter()
.filter_map(|(utxo, spent)| {
.filter_map(|(output, spent)| {
// We only send unspent utxos
if spent {
None
} else {
match utxo.try_into() {
match output.try_into() {
Ok(tx_ouput) => Some(Ok(SyncUtxosResponse {
output: Some(tx_ouput),
mined_header: current_header.hash().to_vec(),
Expand All @@ -178,13 +184,15 @@ where B: BlockchainBackend + 'static
.map(Ok);

// Ensure task stops if the peer prematurely stops their RPC session
let utxos_len = utxos.len();
if utils::mpsc::send_all(tx, utxos).await.is_err() {
break;
}

debug!(
target: LOG_TARGET,
"Streamed utxos in {:.2?} (including stream backpressure)",
"Streamed {} utxos in {:.2?} (including stream backpressure)",
utxos_len,
timer.elapsed()
);

Expand Down
Loading

0 comments on commit d7d61e0

Please sign in to comment.