diff --git a/base_layer/core/Cargo.toml b/base_layer/core/Cargo.toml index 083d5d7a31..f7ff9793bc 100644 --- a/base_layer/core/Cargo.toml +++ b/base_layer/core/Cargo.toml @@ -15,7 +15,7 @@ transactions = [] mempool_proto = [] base_node = ["tari_mmr", "transactions", "mempool_proto", "base_node_proto", "monero", "randomx-rs"] base_node_proto = [] -benches = ["base_node", "criterion"] +benches = ["base_node"] [dependencies] tari_common = { path = "../../common" } @@ -44,7 +44,6 @@ borsh = { version = "0.10", features = ["const-generics"] } bytes = "0.5" chacha20poly1305 = "0.10.1" chrono = { version = "0.4.19", default-features = false, features = ["serde"] } -criterion = { version = "0.4.0", optional = true } decimal-rs = "0.1.42" derivative = "2.2.0" digest = "0.10" @@ -77,6 +76,7 @@ uint = { version = "0.9", default-features = false } zeroize = "1" [dev-dependencies] +criterion = { version = "0.4.0" } tari_p2p = { path = "../../base_layer/p2p", features = ["test-mocks"] } tari_test_utils = { path = "../../infrastructure/test_utils" } curve25519-dalek = { package = "tari-curve25519-dalek", version = "4.0.3" } diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs index 7765c055fc..2346022546 100644 --- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs +++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs @@ -163,7 +163,10 @@ where B: BlockchainBackend + 'static }, NodeCommsRequest::FetchMatchingUtxos(utxo_hashes) => { let mut res = Vec::with_capacity(utxo_hashes.len()); - for (output, spent) in (self.blockchain_db.fetch_utxos(utxo_hashes).await?) + for (output, spent) in (self + .blockchain_db + .fetch_outputs_with_spend_status_at_tip(utxo_hashes) + .await?) .into_iter() .flatten() { diff --git a/base_layer/core/src/base_node/proto/rpc.rs b/base_layer/core/src/base_node/proto/rpc.rs index 6110f2890f..be4a049ebd 100644 --- a/base_layer/core/src/base_node/proto/rpc.rs +++ b/base_layer/core/src/base_node/proto/rpc.rs @@ -37,23 +37,6 @@ impl TryFrom for proto::BlockBodyResponse { } } -// impl TryFrom for proto::SyncUtxo { -// type Error = String; -// -// fn try_from(output: PrunedOutput) -> Result { -// Ok(match output { -// PrunedOutput::Pruned { output_hash } => proto::SyncUtxo { -// utxo: Some(proto::sync_utxo::Utxo::PrunedOutput(proto::PrunedOutput { -// hash: output_hash.to_vec(), -// })), -// }, -// PrunedOutput::NotPruned { output } => proto::SyncUtxo { -// utxo: Some(proto::sync_utxo::Utxo::Output(output.try_into()?)), -// }, -// }) -// } -// } - impl From> for proto::GetMempoolFeePerGramStatsResponse { fn from(stats: Vec) -> Self { Self { diff --git a/base_layer/core/src/base_node/proto/wallet_rpc.proto b/base_layer/core/src/base_node/proto/wallet_rpc.proto index d70f3ee566..e00dcce3c3 100644 --- a/base_layer/core/src/base_node/proto/wallet_rpc.proto +++ b/base_layer/core/src/base_node/proto/wallet_rpc.proto @@ -70,17 +70,17 @@ message FetchUtxosResponse { message QueryDeletedRequest { repeated bytes hashes = 1; - google.protobuf.BytesValue chain_must_include_header = 2; + bytes chain_must_include_header = 2; } message QueryDeletedResponse { repeated QueryDeletedData data = 1; - bytes best_block = 2; - uint64 height_of_longest_chain = 3; + bytes best_block_hash = 2; + uint64 best_block_height = 3; } message QueryDeletedData{ - uint64 mined_height = 1; + uint64 mined_at_height = 1; bytes block_mined_in = 2; uint64 height_deleted_at = 3; bytes block_deleted_in = 4; @@ -92,13 +92,13 @@ message UtxoQueryRequest { message UtxoQueryResponses { repeated UtxoQueryResponse responses = 1; - bytes best_block = 3; - uint64 height_of_longest_chain = 4; + bytes best_block_hash = 3; + uint64 best_block_height = 4; } message UtxoQueryResponse { tari.types.TransactionOutput output = 1; - uint64 mined_height = 2; + uint64 mined_at_height = 2; bytes mined_in_block = 3; bytes output_hash = 4; uint64 mined_timestamp = 5; diff --git a/base_layer/core/src/base_node/rpc/service.rs b/base_layer/core/src/base_node/rpc/service.rs index d3c431ef6a..6718f328ff 100644 --- a/base_layer/core/src/base_node/rpc/service.rs +++ b/base_layer/core/src/base_node/rpc/service.rs @@ -349,7 +349,7 @@ impl BaseNodeWalletService for BaseNodeWalletRpc .collect::>() .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?; let utxos = db - .fetch_utxos(hashes) + .fetch_outputs_with_spend_status_at_tip(hashes) .await .rpc_status_internal_error(LOG_TARGET)? .into_iter() @@ -403,7 +403,7 @@ impl BaseNodeWalletService for BaseNodeWalletRpc ); let mined_info_resp = db - .fetch_utxos_and_mined_info(hashes) + .fetch_outputs_mined_info(hashes) .await .rpc_status_internal_error(LOG_TARGET)?; @@ -421,14 +421,14 @@ impl BaseNodeWalletService for BaseNodeWalletRpc .rpc_status_internal_error(LOG_TARGET)?; Ok(Response::new(UtxoQueryResponses { - height_of_longest_chain: metadata.height_of_longest_chain(), - best_block: metadata.best_block().to_vec(), + best_block_height: metadata.height_of_longest_chain(), + best_block_hash: metadata.best_block().to_vec(), responses: mined_info_resp .into_iter() .flatten() .map(|utxo| { Ok(UtxoQueryResponse { - mined_height: utxo.mined_height, + mined_at_height: utxo.mined_height, mined_in_block: utxo.header_hash.to_vec(), output_hash: utxo.output.hash().to_vec(), output: match utxo.output.try_into() { @@ -445,9 +445,6 @@ impl BaseNodeWalletService for BaseNodeWalletRpc })) } - /// Currently the wallet cannot use the deleted bitmap because it can't compile croaring - /// at some point in the future, it might be better to send the wallet the actual bitmap so - /// it can check itself async fn query_deleted( &self, request: Request, @@ -458,7 +455,7 @@ impl BaseNodeWalletService for BaseNodeWalletRpc &"Received more hashes than we allow".to_string(), )); } - let chain_include_header = message.chain_must_include_header.unwrap_or_default(); + let chain_include_header = message.chain_must_include_header; if !chain_include_header.is_empty() { let hash = chain_include_header .try_into() @@ -478,18 +475,18 @@ impl BaseNodeWalletService for BaseNodeWalletRpc let hashes: Vec = message .hashes .into_iter() - .map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string())) + .map(|hash| hash.try_into()) .collect::>() - .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?; + .map_err(|_| RpcStatus::bad_request(&"Malformed utxo hash received".to_string()))?; let mut return_data = Vec::with_capacity(hashes.len()); let utxos = self .db - .fetch_utxos_and_mined_info(hashes.clone()) + .fetch_outputs_mined_info(hashes.clone()) .await .rpc_status_internal_error(LOG_TARGET)?; let txos = self .db - .fetch_txos_and_mined_info(hashes) + .fetch_inputs_mined_info(hashes) .await .rpc_status_internal_error(LOG_TARGET)?; if utxos.len() != txos.len() { @@ -498,13 +495,13 @@ impl BaseNodeWalletService for BaseNodeWalletRpc for (utxo, txo) in utxos.iter().zip(txos.iter()) { let mut data = match utxo { None => QueryDeletedData { - mined_height: 0, + mined_at_height: 0, block_mined_in: Vec::new(), height_deleted_at: 0, block_deleted_in: Vec::new(), }, Some(u) => QueryDeletedData { - mined_height: u.mined_height, + mined_at_height: u.mined_height, block_mined_in: u.header_hash.to_vec(), height_deleted_at: 0, block_deleted_in: Vec::new(), @@ -523,8 +520,8 @@ impl BaseNodeWalletService for BaseNodeWalletRpc .rpc_status_internal_error(LOG_TARGET)?; Ok(Response::new(QueryDeletedResponse { - height_of_longest_chain: metadata.height_of_longest_chain(), - best_block: metadata.best_block().to_vec(), + best_block_height: metadata.height_of_longest_chain(), + best_block_hash: metadata.best_block().to_vec(), data: return_data, })) } diff --git a/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs b/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs index 49a1ebc121..41bccc8cfc 100644 --- a/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs +++ b/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs @@ -122,40 +122,41 @@ where B: BlockchainBackend + 'static break; } - let utxos = self + let outputs_with_statuses = self .db - .fetch_utxos_in_block(current_header.hash(), None) + .fetch_outputs_in_block_with_spend_state(current_header.hash(), None) .await .rpc_status_internal_error(LOG_TARGET)?; - let utxos = utxos - .into_iter() - // Don't include pruned UTXOs - .map(|(utxo, _spent)| utxo.try_into()).collect::, String>>().map_err(|err| RpcStatus::general(&err))?; + let outputs = outputs_with_statuses + .into_iter() + .map(|(output, _spent)| output.try_into()) + .collect::, String>>() + .map_err(|err| RpcStatus::general(&err))?; debug!( target: LOG_TARGET, "Streaming {} UTXO(s) for block #{} (Hash: {})", - utxos.len(), + outputs.len(), current_header.height, current_header_hash.to_hex(), ); - for utxo_chunk in utxos.chunks(2000) { - let utxo_block_response = SyncUtxosByBlockResponse { - outputs: utxo_chunk.to_vec(), + for output_chunk in outputs.chunks(2000) { + let output_block_response = SyncUtxosByBlockResponse { + outputs: output_chunk.to_vec(), height: current_header.height, header_hash: current_header_hash.to_vec(), mined_timestamp: current_header.timestamp.as_u64(), }; // Ensure task stops if the peer prematurely stops their RPC session - if tx.send(Ok(utxo_block_response)).await.is_err() { + if tx.send(Ok(output_block_response)).await.is_err() { break; } } - if utxos.is_empty() { + if outputs.is_empty() { // if its empty, we need to send an empty vec of outputs. let utxo_block_response = SyncUtxosByBlockResponse { - outputs: utxos, + outputs: Vec::new(), height: current_header.height, header_hash: current_header_hash.to_vec(), mined_timestamp: current_header.timestamp.as_u64(), diff --git a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs index 9de647b8ee..0725309461 100644 --- a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs @@ -484,7 +484,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { let remote_num_outputs = to_header.output_smt_size; self.num_outputs = remote_num_outputs; - // todo we need to be able to pause and resume this let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs { current: 0, total: self.num_outputs, @@ -669,7 +668,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { let mut kernel_sum = HomomorphicCommitment::default(); let mut burned_sum = HomomorphicCommitment::default(); - let mut prev_mmr = 0; let mut prev_kernel_mmr = 0; let height = header.height(); @@ -680,13 +678,10 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { let curr_header = db.fetch_chain_header(h)?; trace!( target: LOG_TARGET, - "Fetching utxos from db: height:{}, header.output_mmr:{}, prev_mmr:{}, end:{}", + "Fetching utxos from db: height:{}", curr_header.height(), - curr_header.header().output_smt_size, - prev_mmr, - curr_header.header().output_smt_size - 1 ); - let utxos = db.fetch_utxos_in_block(*curr_header.hash(), Some(header_hash))?; + let utxos = db.fetch_outputs_in_block_with_spend_state(*curr_header.hash(), Some(header_hash))?; debug!( target: LOG_TARGET, "{} output(s) loaded for height {}", @@ -708,7 +703,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { utxo_sum = &u.commitment + &utxo_sum; } } - prev_mmr = curr_header.header().output_smt_size; let kernels = db.fetch_kernels_in_block(*curr_header.hash())?; trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len()); @@ -720,7 +714,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { } prev_kernel_mmr = curr_header.header().kernel_mmr_size; - if h % 1000 == 0 { + if h % 1000 == 0 && height != 0 { debug!( target: LOG_TARGET, "Final Validation: {:.2}% complete. Height: {} sync", diff --git a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs index 779d41fecc..24945676f8 100644 --- a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs +++ b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs @@ -83,6 +83,12 @@ where B: BlockchainBackend + 'static .await .rpc_status_internal_error(LOG_TARGET)? .ok_or_else(|| RpcStatus::not_found("End header hash is was not found"))?; + if start_header.height > end_header.height { + return Err(RpcStatus::bad_request(&format!( + "Start header height({}) cannot be greater than the end header height({})", + start_header.height, end_header.height + ))); + } task::spawn(async move { debug!( @@ -138,9 +144,9 @@ where B: BlockchainBackend + 'static break; } - let utxos = self + let outputs_with_statuses = self .db - .fetch_utxos_in_block(current_header.hash(), Some(end_header.hash())) + .fetch_outputs_in_block_with_spend_state(current_header.hash(), Some(end_header.hash())) .await .rpc_status_internal_error(LOG_TARGET)?; debug!( @@ -156,14 +162,14 @@ where B: BlockchainBackend + 'static break; } - let utxos = utxos + let utxos = outputs_with_statuses .into_iter() - .filter_map(|(utxo, spent)| { + .filter_map(|(output, spent)| { // We only send unspent utxos if spent { None } else { - match utxo.try_into() { + match output.try_into() { Ok(tx_ouput) => Some(Ok(SyncUtxosResponse { output: Some(tx_ouput), mined_header: current_header.hash().to_vec(), @@ -178,13 +184,15 @@ where B: BlockchainBackend + 'static .map(Ok); // Ensure task stops if the peer prematurely stops their RPC session + let utxos_len = utxos.len(); if utils::mpsc::send_all(tx, utxos).await.is_err() { break; } debug!( target: LOG_TARGET, - "Streamed utxos in {:.2?} (including stream backpressure)", + "Streamed {} utxos in {:.2?} (including stream backpressure)", + utxos_len, timer.elapsed() ); diff --git a/base_layer/core/src/chain_storage/async_db.rs b/base_layer/core/src/chain_storage/async_db.rs index 7e74828660..76b68b338c 100644 --- a/base_layer/core/src/chain_storage/async_db.rs +++ b/base_layer/core/src/chain_storage/async_db.rs @@ -44,7 +44,7 @@ use crate::{ }, chain_storage::{ blockchain_database::MmrRoots, - utxo_mined_info::{TxoMinedInfo, UtxoMinedInfo}, + utxo_mined_info::{InputMinedInfo, OutputMinedInfo}, BlockAddResult, BlockchainBackend, BlockchainDatabase, @@ -152,15 +152,14 @@ impl AsyncBlockchainDb { make_async_fn!(fetch_horizon_data() -> HorizonData, "fetch_horizon_data"); //---------------------------------- TXO --------------------------------------------// - make_async_fn!(fetch_utxo(hash: HashOutput) -> Option, "fetch_utxo"); - make_async_fn!(fetch_utxos(hashes: Vec) -> Vec>, "fetch_utxos"); + make_async_fn!(fetch_outputs_with_spend_status_at_tip(hashes: Vec) -> Vec>, "fetch_outputs_with_spend_status_at_tip"); - make_async_fn!(fetch_utxos_and_mined_info(hashes: Vec) -> Vec>, "fetch_utxos_and_mined_info"); + make_async_fn!(fetch_outputs_mined_info(hashes: Vec) -> Vec>, "fetch_outputs_mined_info"); - make_async_fn!(fetch_txos_and_mined_info(hashes: Vec) -> Vec>, "fetch_txos_and_mined_info"); + make_async_fn!(fetch_inputs_mined_info(hashes: Vec) -> Vec>, "fetch_inputs_mined_info"); - make_async_fn!(fetch_utxos_in_block(hash: HashOutput, spend_header: Option) -> Vec<(TransactionOutput, bool)>, "fetch_utxos_in_block"); + make_async_fn!(fetch_outputs_in_block_with_spend_state(hash: HashOutput, spend_header: Option) -> Vec<(TransactionOutput, bool)>, "fetch_outputs_in_block_with_spend_state"); make_async_fn!(fetch_outputs_in_block(hash: HashOutput) -> Vec, "fetch_outputs_in_block"); @@ -377,11 +376,6 @@ impl<'a, B: BlockchainBackend + 'static> AsyncDbTransaction<'a, B> { self } - pub fn prune_outputs_at_positions(&mut self, block_hash: BlockHash) -> &mut Self { - self.transaction.prune_outputs_spent_at_hash(block_hash); - self - } - pub async fn commit(&mut self) -> Result<(), ChainStorageError> { let transaction = mem::take(&mut self.transaction); self.db.write(transaction).await diff --git a/base_layer/core/src/chain_storage/blockchain_backend.rs b/base_layer/core/src/chain_storage/blockchain_backend.rs index aef4cb6519..875ba9b3a5 100644 --- a/base_layer/core/src/chain_storage/blockchain_backend.rs +++ b/base_layer/core/src/chain_storage/blockchain_backend.rs @@ -17,10 +17,10 @@ use crate::{ DbTransaction, DbValue, HorizonData, + InputMinedInfo, MmrTree, + OutputMinedInfo, Reorg, - TxoMinedInfo, - UtxoMinedInfo, }, transactions::transaction_components::{TransactionInput, TransactionKernel, TransactionOutput}, OutputSmt, @@ -88,17 +88,17 @@ pub trait BlockchainBackend: Send + Sync { ) -> Result, ChainStorageError>; /// Fetch all UTXOs and spends in the block - fn fetch_utxos_in_block( + fn fetch_outputs_in_block_with_spend_state( &self, header_hash: &HashOutput, - spend_header: Option, + spend_status_at_header: Option, ) -> Result, ChainStorageError>; /// Fetch a specific output. Returns the output and the leaf index in the output MMR - fn fetch_output(&self, output_hash: &HashOutput) -> Result, ChainStorageError>; + fn fetch_output(&self, output_hash: &HashOutput) -> Result, ChainStorageError>; /// Fetch a specific input. Returns the output and the leaf index in the output MMR - fn fetch_input(&self, input_hash: &HashOutput) -> Result, ChainStorageError>; + fn fetch_input(&self, input_hash: &HashOutput) -> Result, ChainStorageError>; /// Returns the unspent TransactionOutput output that matches the given commitment if it exists in the current UTXO /// set, otherwise None is returned. diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index 895396411c..b53f285d83 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -67,18 +67,18 @@ use crate::{ }, db_transaction::{DbKey, DbTransaction, DbValue}, error::ChainStorageError, - utxo_mined_info::UtxoMinedInfo, + utxo_mined_info::OutputMinedInfo, BlockAddResult, BlockchainBackend, DbBasicStats, DbTotalSizeStats, HorizonData, + InputMinedInfo, MmrTree, Optional, OrNotFound, Reorg, TargetDifficulties, - TxoMinedInfo, }, common::rolling_vec::RollingVec, consensus::{ @@ -382,12 +382,6 @@ where B: BlockchainBackend db.fetch_chain_metadata() } - // Fetch the utxo - pub fn fetch_utxo(&self, hash: HashOutput) -> Result, ChainStorageError> { - let db = self.db_read_access()?; - Ok(db.fetch_output(&hash)?.map(|mined_info| mined_info.output)) - } - pub fn fetch_unspent_output_by_commitment( &self, commitment: &Commitment, @@ -398,7 +392,7 @@ where B: BlockchainBackend /// Return a list of matching utxos, with each being `None` if not found. If found, the transaction /// output, and a boolean indicating if the UTXO was spent as of the current tip. - pub fn fetch_utxos( + pub fn fetch_outputs_with_spend_status_at_tip( &self, hashes: Vec, ) -> Result>, ChainStorageError> { @@ -418,10 +412,10 @@ where B: BlockchainBackend Ok(result) } - pub fn fetch_utxos_and_mined_info( + pub fn fetch_outputs_mined_info( &self, hashes: Vec, - ) -> Result>, ChainStorageError> { + ) -> Result>, ChainStorageError> { let db = self.db_read_access()?; let mut result = Vec::with_capacity(hashes.len()); @@ -432,10 +426,10 @@ where B: BlockchainBackend Ok(result) } - pub fn fetch_txos_and_mined_info( + pub fn fetch_inputs_mined_info( &self, hashes: Vec, - ) -> Result>, ChainStorageError> { + ) -> Result>, ChainStorageError> { let db = self.db_read_access()?; let mut result = Vec::with_capacity(hashes.len()); @@ -459,13 +453,13 @@ where B: BlockchainBackend db.fetch_kernels_in_block(&hash) } - pub fn fetch_utxos_in_block( + pub fn fetch_outputs_in_block_with_spend_state( &self, hash: HashOutput, - spend_header: Option, + spend_status_at_header: Option, ) -> Result, ChainStorageError> { let db = self.db_read_access()?; - db.fetch_utxos_in_block(&hash, spend_header) + db.fetch_outputs_in_block_with_spend_state(&hash, spend_status_at_header) } pub fn fetch_outputs_in_block(&self, hash: HashOutput) -> Result, ChainStorageError> { diff --git a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs index afb6883c8a..3c91cafd8e 100644 --- a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs +++ b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs @@ -80,16 +80,16 @@ use crate::{ TransactionOutputRowData, }, stats::DbTotalSizeStats, - utxo_mined_info::UtxoMinedInfo, + utxo_mined_info::OutputMinedInfo, BlockchainBackend, ChainTipData, DbBasicStats, DbSize, HorizonData, + InputMinedInfo, MmrTree, Reorg, TemplateRegistrationEntry, - TxoMinedInfo, ValidatorNodeEntry, }, consensus::{ConsensusConstants, ConsensusManager}, @@ -1457,7 +1457,7 @@ impl LMDBDatabase { &self, txn: &ConstTransaction<'_>, output_hash: &[u8], - ) -> Result, ChainStorageError> { + ) -> Result, ChainStorageError> { if let Some(key) = lmdb_get::<_, Vec>(txn, &self.txos_hash_to_index_db, output_hash)? { debug!( target: LOG_TARGET, @@ -1472,7 +1472,7 @@ impl LMDBDatabase { header_hash, mined_timestamp, .. - }) => Ok(Some(UtxoMinedInfo { + }) => Ok(Some(OutputMinedInfo { output: o, mined_height, header_hash, @@ -1495,7 +1495,7 @@ impl LMDBDatabase { &self, txn: &ConstTransaction<'_>, input_hash: &[u8], - ) -> Result, ChainStorageError> { + ) -> Result, ChainStorageError> { if let Some(key) = lmdb_get::<_, Vec>(txn, &self.deleted_txo_hash_to_header_index, input_hash)? { debug!( target: LOG_TARGET, @@ -1510,7 +1510,7 @@ impl LMDBDatabase { header_hash, mined_timestamp, .. - }) => Ok(Some(TxoMinedInfo { + }) => Ok(Some(InputMinedInfo { input: i, spent_height: height, header_hash, @@ -1828,10 +1828,10 @@ impl BlockchainBackend for LMDBDatabase { } } - fn fetch_utxos_in_block( + fn fetch_outputs_in_block_with_spend_state( &self, header_hash: &HashOutput, - spend_header: Option, + spend_status_at_header: Option, ) -> Result, ChainStorageError> { let txn = self.read_transaction()?; @@ -1840,7 +1840,7 @@ impl BlockchainBackend for LMDBDatabase { .into_iter() .map(|row| (row.output, false)) .collect(); - if let Some(header) = spend_header { + if let Some(header) = spend_status_at_header { let header_height = self.fetch_height_from_hash(&txn, header_hash)? .ok_or(ChainStorageError::ValueNotFound { @@ -1864,13 +1864,13 @@ impl BlockchainBackend for LMDBDatabase { Ok(utxos) } - fn fetch_output(&self, output_hash: &HashOutput) -> Result, ChainStorageError> { + fn fetch_output(&self, output_hash: &HashOutput) -> Result, ChainStorageError> { debug!(target: LOG_TARGET, "Fetch output: {}", output_hash.to_hex()); let txn = self.read_transaction()?; self.fetch_output_in_txn(&txn, output_hash.as_slice()) } - fn fetch_input(&self, input_hash: &HashOutput) -> Result, ChainStorageError> { + fn fetch_input(&self, input_hash: &HashOutput) -> Result, ChainStorageError> { debug!(target: LOG_TARGET, "Fetch input: {}", input_hash.to_hex()); let txn = self.read_transaction()?; self.fetch_input_in_txn(&txn, input_hash.as_slice()) diff --git a/base_layer/core/src/chain_storage/utxo_mined_info.rs b/base_layer/core/src/chain_storage/utxo_mined_info.rs index c8777d13d7..789a821d3a 100644 --- a/base_layer/core/src/chain_storage/utxo_mined_info.rs +++ b/base_layer/core/src/chain_storage/utxo_mined_info.rs @@ -26,7 +26,7 @@ use tari_common_types::types::BlockHash; use crate::transactions::transaction_components::{TransactionInput, TransactionOutput}; #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct UtxoMinedInfo { +pub struct OutputMinedInfo { pub output: TransactionOutput, pub mined_height: u64, pub header_hash: BlockHash, @@ -34,7 +34,7 @@ pub struct UtxoMinedInfo { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TxoMinedInfo { +pub struct InputMinedInfo { pub input: TransactionInput, pub spent_height: u64, pub header_hash: BlockHash, diff --git a/base_layer/core/src/test_helpers/blockchain.rs b/base_layer/core/src/test_helpers/blockchain.rs index c1836f3dd8..9e10947acd 100644 --- a/base_layer/core/src/test_helpers/blockchain.rs +++ b/base_layer/core/src/test_helpers/blockchain.rs @@ -52,12 +52,12 @@ use crate::{ DbTransaction, DbValue, HorizonData, + InputMinedInfo, LMDBDatabase, MmrTree, + OutputMinedInfo, Reorg, TemplateRegistrationEntry, - TxoMinedInfo, - UtxoMinedInfo, Validators, }, consensus::{chain_strength_comparer::ChainStrengthComparerBuilder, ConsensusConstantsBuilder, ConsensusManager}, @@ -267,22 +267,22 @@ impl BlockchainBackend for TempDatabase { self.db.as_ref().unwrap().fetch_kernel_by_excess_sig(excess_sig) } - fn fetch_utxos_in_block( + fn fetch_outputs_in_block_with_spend_state( &self, header_hash: &HashOutput, - spend_header: Option, + spend_status_at_header: Option, ) -> Result, ChainStorageError> { self.db .as_ref() .unwrap() - .fetch_utxos_in_block(header_hash, spend_header) + .fetch_outputs_in_block_with_spend_state(header_hash, spend_status_at_header) } - fn fetch_output(&self, output_hash: &HashOutput) -> Result, ChainStorageError> { + fn fetch_output(&self, output_hash: &HashOutput) -> Result, ChainStorageError> { self.db.as_ref().unwrap().fetch_output(output_hash) } - fn fetch_input(&self, input_hash: &HashOutput) -> Result, ChainStorageError> { + fn fetch_input(&self, input_hash: &HashOutput) -> Result, ChainStorageError> { self.db.as_ref().unwrap().fetch_input(input_hash) } diff --git a/base_layer/core/tests/chain_storage_tests/chain_backend.rs b/base_layer/core/tests/chain_storage_tests/chain_backend.rs index 596655f74e..abb92a8df6 100644 --- a/base_layer/core/tests/chain_storage_tests/chain_backend.rs +++ b/base_layer/core/tests/chain_storage_tests/chain_backend.rs @@ -176,7 +176,7 @@ fn test_utxo_order() { db.write(tx).unwrap(); - let read_utxos = db.fetch_utxos_in_block(&block_hash).unwrap(); + let read_utxos = db.fetch_outputs_in_block_with_spend_state(&block_hash).unwrap(); assert_eq!(utxos.len(), read_utxos.len()); for i in 0..2000 { assert_eq!(&utxos[i], read_utxos[i].as_transaction_output().unwrap()); diff --git a/base_layer/core/tests/tests/async_db.rs b/base_layer/core/tests/tests/async_db.rs index 5b95613a4e..2a263cbda9 100644 --- a/base_layer/core/tests/tests/async_db.rs +++ b/base_layer/core/tests/tests/async_db.rs @@ -26,12 +26,7 @@ use tari_common::configuration::Network; use tari_core::{ blocks::Block, chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult}, - transactions::{ - key_manager::TransactionKeyManagerInterface, - tari_amount::T, - test_helpers::{schema_to_transaction, TestKeyManager}, - transaction_components::{TransactionOutput, WalletOutput}, - }, + transactions::{tari_amount::T, test_helpers::schema_to_transaction}, txn_schema, }; use tari_test_utils::runtime::test_async; @@ -42,21 +37,6 @@ use crate::helpers::{ sample_blockchains::{create_blockchain_db_no_cut_through, create_new_blockchain}, }; -/// Finds the UTXO in a block corresponding to the wallet output. We have to search for outputs because UTXOs get -/// sorted in blocks, and so the order they were inserted in can change. -async fn find_utxo(output: &WalletOutput, block: &Block, key_manager: &TestKeyManager) -> Option { - let commitment = key_manager - .get_commitment(&output.spending_key_id, &output.value.into()) - .await - .unwrap(); - for utxo in block.body.outputs().iter() { - if commitment == utxo.commitment { - return Some(utxo.clone()); - } - } - None -} - #[test] fn fetch_async_headers() { test_async(move |rt| { @@ -92,29 +72,6 @@ fn async_rewind_to_height() { }); } -#[test] -fn fetch_async_utxo() { - test_async(move |rt| { - rt.spawn(async move { - let (adb, blocks, outputs, _, key_manager) = create_blockchain_db_no_cut_through().await; - // Retrieve a UTXO and an STXO - let utxo = find_utxo(&outputs[4][0], blocks[4].block(), &key_manager) - .await - .unwrap(); - let stxo = find_utxo(&outputs[1][0], blocks[1].block(), &key_manager) - .await - .unwrap(); - let db = AsyncBlockchainDb::new(adb.clone()); - let db2 = AsyncBlockchainDb::new(adb); - - let utxo_check = db.fetch_utxo(utxo.hash()).await.unwrap().unwrap(); - assert_eq!(utxo_check, utxo); - let stxo_check = db2.fetch_utxo(stxo.hash()).await.unwrap().unwrap(); - assert_eq!(stxo_check, stxo); - }); - }); -} - #[test] fn fetch_async_block() { test_async(move |rt| { diff --git a/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs b/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs index 4c8efda1e6..0d9d231c66 100644 --- a/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs +++ b/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs @@ -198,7 +198,7 @@ where // This assumes that the base node has not reorged since the last time we asked. let response = wallet_client .query_deleted(QueryDeletedRequest { - chain_must_include_header: last_mined_header_hash.map(|v| v.to_vec()), + chain_must_include_header: last_mined_header_hash.map(|v| v.to_vec()).unwrap_or_default(), hashes: batch.iter().map(|o| o.hash.to_vec()).collect(), }) .await @@ -214,7 +214,8 @@ where } for (output, data) in batch.iter().zip(response.data.iter()) { - if data.mined_height == 0 { + // when checking mined height, 0 can be valid so we need to check the hash + if data.block_mined_in.is_empty() { // base node thinks this is unmined or does not know of it. self.db .set_output_to_unmined_and_invalid(output.hash) @@ -231,14 +232,14 @@ where "Updating output comm:{}: hash {} as unspent at tip height {} (Operation ID: {})", output.commitment.to_hex(), output.hash.to_hex(), - response.height_of_longest_chain, + response.best_block_height, self.operation_id ); continue; }; if data.height_deleted_at > 0 { - let confirmed = (response.height_of_longest_chain.saturating_sub(data.height_deleted_at)) >= + let confirmed = (response.best_block_height.saturating_sub(data.height_deleted_at)) >= self.config.num_confirmations_required; let block_hash = data.block_deleted_in.clone().try_into().map_err(|_| { OutputManagerProtocolError::new( @@ -254,7 +255,7 @@ where "Updating output comm:{}: hash {} as spent at tip height {} (Operation ID: {})", output.commitment.to_hex(), output.hash.to_hex(), - response.height_of_longest_chain, + response.best_block_height, self.operation_id ); } @@ -495,7 +496,7 @@ where match returned_output.mined_in_block.clone().try_into() { Ok(block_hash) => mined.push(( output.clone(), - returned_output.mined_height, + returned_output.mined_at_height, block_hash, returned_output.mined_timestamp, )), @@ -511,7 +512,7 @@ where } } - Ok((mined, unmined, batch_response.height_of_longest_chain)) + Ok((mined, unmined, batch_response.best_block_height)) } #[allow(clippy::ptr_arg)] diff --git a/base_layer/wallet/tests/output_manager_service_tests/service.rs b/base_layer/wallet/tests/output_manager_service_tests/service.rs index 1a6f337bba..b8e4da968c 100644 --- a/base_layer/wallet/tests/output_manager_service_tests/service.rs +++ b/base_layer/wallet/tests/output_manager_service_tests/service.rs @@ -1378,14 +1378,14 @@ async fn test_txo_validation() { let responses = vec![ UtxoQueryResponse { output: Some(output1_tx_output.clone().try_into().unwrap()), - mined_height: 1, + mined_at_height: 1, mined_in_block: block1_header.hash().to_vec(), output_hash: output1_tx_output.hash().to_vec(), mined_timestamp: 0, }, UtxoQueryResponse { output: Some(output2_tx_output.clone().try_into().unwrap()), - mined_height: 1, + mined_at_height: 1, mined_in_block: block1_header.hash().to_vec(), output_hash: output2_tx_output.hash().to_vec(), mined_timestamp: 0, @@ -1393,8 +1393,8 @@ async fn test_txo_validation() { ]; let utxo_query_responses = UtxoQueryResponses { - best_block: block4_header.hash().to_vec(), - height_of_longest_chain: 4, + best_block_hash: block4_header.hash().to_vec(), + best_block_height: 4, responses, }; @@ -1403,17 +1403,17 @@ async fn test_txo_validation() { // This response sets output1 and output2 as mined, not spent let query_deleted_response = QueryDeletedResponse { - best_block: block4_header.hash().to_vec(), - height_of_longest_chain: 4, + best_block_hash: block4_header.hash().to_vec(), + best_block_height: 4, data: vec![ QueryDeletedData { - mined_height: 1, + mined_at_height: 1, block_mined_in: block1_header.hash().to_vec(), height_deleted_at: 0, block_deleted_in: Vec::new(), }, QueryDeletedData { - mined_height: 1, + mined_at_height: 1, block_mined_in: block1_header.hash().to_vec(), height_deleted_at: 0, block_deleted_in: Vec::new(), @@ -1533,35 +1533,35 @@ async fn test_txo_validation() { let responses = vec![ UtxoQueryResponse { output: Some(output1_tx_output.clone().try_into().unwrap()), - mined_height: 1, + mined_at_height: 1, mined_in_block: block1_header.hash().to_vec(), output_hash: output1_tx_output.hash().to_vec(), mined_timestamp: 0, }, UtxoQueryResponse { output: Some(output2_tx_output.clone().try_into().unwrap()), - mined_height: 1, + mined_at_height: 1, mined_in_block: block1_header.hash().to_vec(), output_hash: output2_tx_output.hash().to_vec(), mined_timestamp: 0, }, UtxoQueryResponse { output: Some(output4_tx_output.clone().try_into().unwrap()), - mined_height: 5, + mined_at_height: 5, mined_in_block: block5_header.hash().to_vec(), output_hash: output4_tx_output.hash().to_vec(), mined_timestamp: 0, }, UtxoQueryResponse { output: Some(output5_tx_output.clone().try_into().unwrap()), - mined_height: 5, + mined_at_height: 5, mined_in_block: block5_header.hash().to_vec(), output_hash: output5_tx_output.hash().to_vec(), mined_timestamp: 0, }, UtxoQueryResponse { output: Some(output6_tx_output.clone().try_into().unwrap()), - mined_height: 5, + mined_at_height: 5, mined_in_block: block5_header.hash().to_vec(), output_hash: output6_tx_output.hash().to_vec(), mined_timestamp: 0, @@ -1569,8 +1569,8 @@ async fn test_txo_validation() { ]; let mut utxo_query_responses = UtxoQueryResponses { - best_block: block5_header.hash().to_vec(), - height_of_longest_chain: 5, + best_block_hash: block5_header.hash().to_vec(), + best_block_height: 5, responses, }; @@ -1579,35 +1579,35 @@ async fn test_txo_validation() { // This response sets output1 as spent in the transaction that produced output4 let mut query_deleted_response = QueryDeletedResponse { - best_block: block5_header.hash().to_vec(), - height_of_longest_chain: 5, + best_block_hash: block5_header.hash().to_vec(), + best_block_height: 5, data: vec![ QueryDeletedData { - mined_height: 1, + mined_at_height: 1, block_mined_in: block1_header.hash().to_vec(), height_deleted_at: 5, block_deleted_in: block5_header.hash().to_vec(), }, QueryDeletedData { - mined_height: 1, + mined_at_height: 1, block_mined_in: block1_header.hash().to_vec(), height_deleted_at: 0, block_deleted_in: Vec::new(), }, QueryDeletedData { - mined_height: 5, + mined_at_height: 5, block_mined_in: block5_header.hash().to_vec(), height_deleted_at: 0, block_deleted_in: Vec::new(), }, QueryDeletedData { - mined_height: 5, + mined_at_height: 5, block_mined_in: block5_header.hash().to_vec(), height_deleted_at: 0, block_deleted_in: Vec::new(), }, QueryDeletedData { - mined_height: 5, + mined_at_height: 5, block_mined_in: block5_header.hash().to_vec(), height_deleted_at: 0, block_deleted_in: Vec::new(), @@ -1654,13 +1654,13 @@ async fn test_txo_validation() { // Output 5: Received in Block 5 - Confirmed // Output 6: Coinbase from Block 5 - Confirmed - utxo_query_responses.height_of_longest_chain = 8; - utxo_query_responses.best_block = [8u8; 16].to_vec(); + utxo_query_responses.best_block_height = 8; + utxo_query_responses.best_block_hash = [8u8; 16].to_vec(); oms.base_node_wallet_rpc_mock_state .set_utxo_query_response(utxo_query_responses); - query_deleted_response.height_of_longest_chain = 8; - query_deleted_response.best_block = [8u8; 16].to_vec(); + query_deleted_response.best_block_height = 8; + query_deleted_response.best_block_hash = [8u8; 16].to_vec(); oms.base_node_wallet_rpc_mock_state .set_query_deleted_response(query_deleted_response); @@ -1735,21 +1735,21 @@ async fn test_txo_validation() { let responses = vec![ UtxoQueryResponse { output: Some(output1_tx_output.clone().try_into().unwrap()), - mined_height: 1, + mined_at_height: 1, mined_in_block: block1_header.hash().to_vec(), output_hash: output1_tx_output.hash().to_vec(), mined_timestamp: 0, }, UtxoQueryResponse { output: Some(output2_tx_output.clone().try_into().unwrap()), - mined_height: 1, + mined_at_height: 1, mined_in_block: block1_header.hash().to_vec(), output_hash: output2_tx_output.hash().to_vec(), mined_timestamp: 0, }, UtxoQueryResponse { output: Some(output4_tx_output.clone().try_into().unwrap()), - mined_height: 5, + mined_at_height: 5, mined_in_block: block5_header_reorg.hash().to_vec(), output_hash: output4_tx_output.hash().to_vec(), mined_timestamp: 0, @@ -1757,8 +1757,8 @@ async fn test_txo_validation() { ]; let mut utxo_query_responses = UtxoQueryResponses { - best_block: block5_header_reorg.hash().to_vec(), - height_of_longest_chain: 5, + best_block_hash: block5_header_reorg.hash().to_vec(), + best_block_height: 5, responses, }; @@ -1767,23 +1767,23 @@ async fn test_txo_validation() { // This response sets output1 as spent in the transaction that produced output4 let mut query_deleted_response = QueryDeletedResponse { - best_block: block5_header_reorg.hash().to_vec(), - height_of_longest_chain: 5, + best_block_hash: block5_header_reorg.hash().to_vec(), + best_block_height: 5, data: vec![ QueryDeletedData { - mined_height: 1, + mined_at_height: 1, block_mined_in: block1_header.hash().to_vec(), height_deleted_at: 5, block_deleted_in: block5_header_reorg.hash().to_vec(), }, QueryDeletedData { - mined_height: 1, + mined_at_height: 1, block_mined_in: block1_header.hash().to_vec(), height_deleted_at: 0, block_deleted_in: Vec::new(), }, QueryDeletedData { - mined_height: 5, + mined_at_height: 5, block_mined_in: block5_header_reorg.hash().to_vec(), height_deleted_at: 0, block_deleted_in: Vec::new(), @@ -1858,13 +1858,13 @@ async fn test_txo_validation() { // Output 5: Reorged out // Output 6: Reorged out - utxo_query_responses.height_of_longest_chain = 8; - utxo_query_responses.best_block = [8u8; 16].to_vec(); + utxo_query_responses.best_block_height = 8; + utxo_query_responses.best_block_hash = [8u8; 16].to_vec(); oms.base_node_wallet_rpc_mock_state .set_utxo_query_response(utxo_query_responses); - query_deleted_response.height_of_longest_chain = 8; - query_deleted_response.best_block = [8u8; 16].to_vec(); + query_deleted_response.best_block_height = 8; + query_deleted_response.best_block_hash = [8u8; 16].to_vec(); oms.base_node_wallet_rpc_mock_state .set_query_deleted_response(query_deleted_response); @@ -1979,14 +1979,14 @@ async fn test_txo_revalidation() { let responses = vec![ UtxoQueryResponse { output: Some(output1_tx_output.clone().try_into().unwrap()), - mined_height: 1, + mined_at_height: 1, mined_in_block: block1_header.hash().to_vec(), output_hash: output1_tx_output.hash().to_vec(), mined_timestamp: 0, }, UtxoQueryResponse { output: Some(output2_tx_output.clone().try_into().unwrap()), - mined_height: 1, + mined_at_height: 1, mined_in_block: block1_header.hash().to_vec(), output_hash: output2_tx_output.hash().to_vec(), mined_timestamp: 0, @@ -1994,8 +1994,8 @@ async fn test_txo_revalidation() { ]; let utxo_query_responses = UtxoQueryResponses { - best_block: block4_header.hash().to_vec(), - height_of_longest_chain: 4, + best_block_hash: block4_header.hash().to_vec(), + best_block_height: 4, responses, }; @@ -2004,26 +2004,22 @@ async fn test_txo_revalidation() { // This response sets output1 as spent let query_deleted_response = QueryDeletedResponse { - best_block: block4_header.hash().to_vec(), - height_of_longest_chain: 4, + best_block_hash: block4_header.hash().to_vec(), + best_block_height: 4, data: vec![ QueryDeletedData { - mined_height: 1, + mined_at_height: 1, block_mined_in: block1_header.hash().to_vec(), height_deleted_at: 0, block_deleted_in: Vec::new(), }, QueryDeletedData { - mined_height: 1, + mined_at_height: 1, block_mined_in: block1_header.hash().to_vec(), height_deleted_at: 0, block_deleted_in: Vec::new(), }, ], - // deleted_positions: vec![], - // not_deleted_positions: vec![1, 2], - // heights_deleted_at: vec![], - // blocks_deleted_in: vec![], }; oms.base_node_wallet_rpc_mock_state @@ -2045,26 +2041,22 @@ async fn test_txo_revalidation() { // This response sets output1 as spent let query_deleted_response = QueryDeletedResponse { - best_block: block4_header.hash().to_vec(), - height_of_longest_chain: 4, + best_block_hash: block4_header.hash().to_vec(), + best_block_height: 4, data: vec![ QueryDeletedData { - mined_height: 1, + mined_at_height: 1, block_mined_in: block1_header.hash().to_vec(), height_deleted_at: 4, block_deleted_in: block4_header.hash().to_vec(), }, QueryDeletedData { - mined_height: 1, + mined_at_height: 1, block_mined_in: block1_header.hash().to_vec(), height_deleted_at: 0, block_deleted_in: Vec::new(), }, ], - // deleted_positions: vec![1], - // not_deleted_positions: vec![2], - // heights_deleted_at: vec![4], - // blocks_deleted_in: vec![block4_header.hash().to_vec()], }; oms.base_node_wallet_rpc_mock_state @@ -2086,26 +2078,22 @@ async fn test_txo_revalidation() { // This response sets output1 and 2 as spent let query_deleted_response = QueryDeletedResponse { - best_block: block4_header.hash().to_vec(), - height_of_longest_chain: 4, + best_block_hash: block4_header.hash().to_vec(), + best_block_height: 4, data: vec![ QueryDeletedData { - mined_height: 1, + mined_at_height: 1, block_mined_in: block1_header.hash().to_vec(), height_deleted_at: 4, block_deleted_in: block4_header.hash().to_vec(), }, QueryDeletedData { - mined_height: 1, + mined_at_height: 1, block_mined_in: block1_header.hash().to_vec(), height_deleted_at: 4, block_deleted_in: block4_header.hash().to_vec(), }, ], - // deleted_positions: vec![1, 2], - // not_deleted_positions: vec![], - // heights_deleted_at: vec![4, 4], - // blocks_deleted_in: vec![block4_header.hash().to_vec(), block4_header.hash().to_vec()], }; oms.base_node_wallet_rpc_mock_state diff --git a/base_layer/wallet/tests/support/comms_rpc.rs b/base_layer/wallet/tests/support/comms_rpc.rs index 765388bb82..6d4a41e4b6 100644 --- a/base_layer/wallet/tests/support/comms_rpc.rs +++ b/base_layer/wallet/tests/support/comms_rpc.rs @@ -159,12 +159,12 @@ impl BaseNodeWalletRpcMockState { })), utxo_query_response: Arc::new(Mutex::new(UtxoQueryResponses { responses: vec![], - best_block: vec![], - height_of_longest_chain: 1, + best_block_hash: vec![], + best_block_height: 1, })), query_deleted_response: Arc::new(Mutex::new(QueryDeletedResponse { - best_block: vec![], - height_of_longest_chain: 1, + best_block_hash: vec![], + best_block_height: 1, data: Vec::new(), })), fetch_utxos_calls: Arc::new(Mutex::new(Vec::new())),