diff --git a/base_layer/wallet/src/output_manager_service/storage/database/backend.rs b/base_layer/wallet/src/output_manager_service/storage/database/backend.rs index 03eb5c434b..d08e86df62 100644 --- a/base_layer/wallet/src/output_manager_service/storage/database/backend.rs +++ b/base_layer/wallet/src/output_manager_service/storage/database/backend.rs @@ -14,6 +14,7 @@ use crate::output_manager_service::{ storage::{ database::{DbKey, DbValue, OutputBackendQuery, WriteOperation}, models::DbWalletOutput, + sqlite_db::{ReceivedOutputInfoForBatch, SpentOutputInfoForBatch}, }, }; @@ -37,29 +38,32 @@ pub trait OutputManagerBackend: Send + Sync + Clone { /// Modify the state the of the backend with a write operation fn write(&self, op: WriteOperation) -> Result, OutputManagerStorageError>; fn fetch_pending_incoming_outputs(&self) -> Result, OutputManagerStorageError>; - - fn set_received_output_mined_height_and_status( + /// Perform a batch update of the received outputs' mined height and status + fn set_received_outputs_mined_height_and_status_batch_mode( &self, - hash: FixedHash, - mined_height: u64, - mined_in_block: FixedHash, - confirmed: bool, - mined_timestamp: u64, + updates: Vec, + ) -> Result<(), OutputManagerStorageError>; + /// Perform a batch update of the outputs' unmined and invalid state + fn set_output_to_unmined_and_invalid_batch_mode( + &self, + hashes: Vec, + ) -> Result<(), OutputManagerStorageError>; + /// Perform a batch update of the outputs' last validation timestamp + fn update_last_validation_timestamp_batch_mode( + &self, + hashes: Vec, ) -> Result<(), OutputManagerStorageError>; - - fn set_output_to_unmined_and_invalid(&self, hash: FixedHash) -> Result<(), OutputManagerStorageError>; - fn update_last_validation_timestamp(&self, hash: FixedHash) -> Result<(), OutputManagerStorageError>; fn set_outputs_to_be_revalidated(&self) -> Result<(), OutputManagerStorageError>; - - fn mark_output_as_spent( + /// Perform a batch update of the outputs' spent status + fn mark_output_as_spent_batch_mode( &self, - hash: FixedHash, - mark_deleted_at_height: u64, - mark_deleted_in_block: FixedHash, - confirmed: bool, + updates: Vec, + ) -> Result<(), OutputManagerStorageError>; + /// Perform a batch update of the outputs' unspent status + fn mark_output_as_unspent_batch_mode( + &self, + hashes: Vec<(FixedHash, bool)>, ) -> Result<(), OutputManagerStorageError>; - - fn mark_output_as_unspent(&self, hash: FixedHash, confirmed: bool) -> Result<(), OutputManagerStorageError>; /// This method encumbers the specified outputs into a `PendingTransactionOutputs` record. This is a short term /// encumberance in case the app is closed or crashes before transaction neogtiation is complete. These will be /// cleared on startup of the service. diff --git a/base_layer/wallet/src/output_manager_service/storage/database/mod.rs b/base_layer/wallet/src/output_manager_service/storage/database/mod.rs index 6029caa2c4..63d0bad21e 100644 --- a/base_layer/wallet/src/output_manager_service/storage/database/mod.rs +++ b/base_layer/wallet/src/output_manager_service/storage/database/mod.rs @@ -30,7 +30,7 @@ pub use backend::OutputManagerBackend; use log::*; use tari_common_types::{ transaction::TxId, - types::{Commitment, HashOutput}, + types::{Commitment, FixedHash, HashOutput}, }; use tari_core::transactions::{ tari_amount::MicroMinotari, @@ -44,6 +44,7 @@ use crate::output_manager_service::{ service::Balance, storage::{ models::{DbWalletOutput, KnownOneSidedPaymentScript}, + sqlite_db::{ReceivedOutputInfoForBatch, SpentOutputInfoForBatch}, OutputStatus, }, }; @@ -387,15 +388,35 @@ where T: OutputManagerBackend + 'static mined_in_block: HashOutput, confirmed: bool, mined_timestamp: u64, + ) -> Result<(), OutputManagerStorageError> { + self.set_received_outputs_mined_height_and_status_batch_mode(vec![ReceivedOutputInfoForBatch { + hash, + mined_height, + mined_in_block, + confirmed, + mined_timestamp, + }]) + } + + pub fn set_received_outputs_mined_height_and_status_batch_mode( + &self, + updates: Vec, ) -> Result<(), OutputManagerStorageError> { let db = self.db.clone(); - db.set_received_output_mined_height_and_status(hash, mined_height, mined_in_block, confirmed, mined_timestamp)?; + db.set_received_outputs_mined_height_and_status_batch_mode(updates)?; Ok(()) } pub fn set_output_to_unmined_and_invalid(&self, hash: HashOutput) -> Result<(), OutputManagerStorageError> { + self.set_output_to_unmined_and_invalid_batch_mode(vec![hash]) + } + + pub fn set_output_to_unmined_and_invalid_batch_mode( + &self, + hashes: Vec, + ) -> Result<(), OutputManagerStorageError> { let db = self.db.clone(); - db.set_output_to_unmined_and_invalid(hash)?; + db.set_output_to_unmined_and_invalid_batch_mode(hashes)?; Ok(()) } @@ -406,8 +427,15 @@ where T: OutputManagerBackend + 'static } pub fn update_last_validation_timestamp(&self, hash: HashOutput) -> Result<(), OutputManagerStorageError> { + self.update_last_validation_timestamp_batch_mode(vec![hash]) + } + + pub fn update_last_validation_timestamp_batch_mode( + &self, + hashes: Vec, + ) -> Result<(), OutputManagerStorageError> { let db = self.db.clone(); - db.update_last_validation_timestamp(hash)?; + db.update_last_validation_timestamp_batch_mode(hashes)?; Ok(()) } @@ -417,15 +445,34 @@ where T: OutputManagerBackend + 'static deleted_height: u64, deleted_in_block: HashOutput, confirmed: bool, + ) -> Result<(), OutputManagerStorageError> { + self.mark_output_as_spent_batch_mode(vec![SpentOutputInfoForBatch { + hash, + confirmed, + mark_deleted_at_height: deleted_height, + mark_deleted_in_block: deleted_in_block, + }]) + } + + pub fn mark_output_as_spent_batch_mode( + &self, + updates: Vec, ) -> Result<(), OutputManagerStorageError> { let db = self.db.clone(); - db.mark_output_as_spent(hash, deleted_height, deleted_in_block, confirmed)?; + db.mark_output_as_spent_batch_mode(updates)?; Ok(()) } pub fn mark_output_as_unspent(&self, hash: HashOutput, confirmed: bool) -> Result<(), OutputManagerStorageError> { + self.mark_output_as_unspent_batch_mode(vec![(hash, confirmed)]) + } + + pub fn mark_output_as_unspent_batch_mode( + &self, + hashes: Vec<(FixedHash, bool)>, + ) -> Result<(), OutputManagerStorageError> { let db = self.db.clone(); - db.mark_output_as_unspent(hash, confirmed)?; + db.mark_output_as_unspent_batch_mode(hashes)?; Ok(()) } diff --git a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs index 963c046408..b3d1b5aa35 100644 --- a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs +++ b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs @@ -25,6 +25,7 @@ use std::{convert::TryFrom, str::FromStr}; use chrono::{NaiveDateTime, Utc}; use derivative::Derivative; use diesel::{ + connection::SimpleConnection, prelude::*, r2d2::{ConnectionManager, PooledConnection}, result::Error as DieselError, @@ -415,64 +416,77 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { .collect::, _>>() } - fn set_received_output_mined_height_and_status( + // Perform a batch update of the received outputs. This is more efficient than updating each output individually. + // Note: + // `diesel` does not support batch updates, so we have to do it manually. For example, this + // `diesel::insert_into(...).values(&...).on_conflict(outputs::hash).do_update().set((...)).execute(&mut conn)?;` + // errors with + // `the trait bound `BatchInsert` is not satisfied` + fn set_received_outputs_mined_height_and_status_batch_mode( &self, - hash: FixedHash, - mined_height: u64, - mined_in_block: FixedHash, - confirmed: bool, - mined_timestamp: u64, + updates: Vec, ) -> Result<(), OutputManagerStorageError> { let start = Instant::now(); let mut conn = self.database_connection.get_pooled_connection()?; let acquire_lock = start.elapsed(); - let status = if confirmed { - OutputStatus::Unspent as i32 - } else { - OutputStatus::UnspentMinedUnconfirmed as i32 - }; + debug!( target: LOG_TARGET, - "`set_received_output_mined_height` status: {}", status + "`set_received_outputs_mined_height_and_status_batch_mode` for {} outputs", + updates.len() ); - let hash = hash.to_vec(); - let mined_in_block = mined_in_block.to_vec(); - let timestamp = NaiveDateTime::from_timestamp_opt(mined_timestamp as i64, 0).ok_or( - OutputManagerStorageError::ConversionError { - reason: format!("Could not create timestamp mined_timestamp: {}", mined_timestamp), - }, - )?; - diesel::update(outputs::table.filter(outputs::hash.eq(hash))) - .set(( - outputs::mined_height.eq(mined_height as i64), - outputs::mined_in_block.eq(mined_in_block), - outputs::status.eq(status), - outputs::mined_timestamp.eq(timestamp), - outputs::marked_deleted_at_height.eq::>(None), - outputs::marked_deleted_in_block.eq::>>(None), - outputs::last_validation_timestamp.eq::>(None), - )) - .execute(&mut conn) - .num_rows_affected_or_not_found(1)?; + let query = updates + .iter() + .map(|update| { + format!( + "UPDATE outputs SET mined_height = {}, mined_in_block = x'{}', status = {}, mined_timestamp = \ + '{}', marked_deleted_at_height = NULL, marked_deleted_in_block = NULL, last_validation_timestamp \ + = NULL WHERE hash = x'{}'; ", + update.mined_height as i64, + update.mined_in_block.to_hex(), + if update.confirmed { + OutputStatus::Unspent as i32 + } else { + OutputStatus::UnspentMinedUnconfirmed as i32 + }, + if let Some(val) = NaiveDateTime::from_timestamp_opt(update.mined_timestamp as i64, 0) { + val.to_string() + } else { + "NULL".to_string() + }, + update.hash.to_hex() + ) + }) + .collect::>() + .join(""); + let query = query.trim(); + + conn.batch_execute(query)?; + if start.elapsed().as_millis() > 0 { trace!( target: LOG_TARGET, - "sqlite profile - set_received_output_mined_height: lock {} + db_op {} = {} ms", + "sqlite profile - set_received_outputs_mined_height_and_status_batch_mode: lock {} + db_op {} = {} ms \ + ({} outputs)", acquire_lock.as_millis(), (start.elapsed() - acquire_lock).as_millis(), - start.elapsed().as_millis() + start.elapsed().as_millis(), + updates.len() ); } Ok(()) } - fn set_output_to_unmined_and_invalid(&self, hash: FixedHash) -> Result<(), OutputManagerStorageError> { + fn set_output_to_unmined_and_invalid_batch_mode( + &self, + hashes: Vec, + ) -> Result<(), OutputManagerStorageError> { let start = Instant::now(); let mut conn = self.database_connection.get_pooled_connection()?; let acquire_lock = start.elapsed(); - let hash = hash.to_vec(); - diesel::update(outputs::table.filter(outputs::hash.eq(hash))) + + diesel::update(outputs::table.filter(outputs::hash.eq_any(hashes.iter().map(|hash| hash.to_vec())))) .set(( outputs::mined_height.eq::>(None), outputs::mined_in_block.eq::>>(None), @@ -482,14 +496,16 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { outputs::marked_deleted_in_block.eq::>>(None), )) .execute(&mut conn) - .num_rows_affected_or_not_found(1)?; + .num_rows_affected_or_not_found(hashes.len())?; + if start.elapsed().as_millis() > 0 { trace!( target: LOG_TARGET, - "sqlite profile - set_output_to_unmined: lock {} + db_op {} = {} ms", + "sqlite profile - set_output_to_unmined_and_invalid_batch_mode: lock {} + db_op {} = {} ms ({} outputs)", acquire_lock.as_millis(), (start.elapsed() - acquire_lock).as_millis(), - start.elapsed().as_millis() + start.elapsed().as_millis(), + hashes.len() ); } @@ -525,93 +541,143 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { Ok(()) } - fn update_last_validation_timestamp(&self, hash: FixedHash) -> Result<(), OutputManagerStorageError> { + fn update_last_validation_timestamp_batch_mode( + &self, + hashes: Vec, + ) -> Result<(), OutputManagerStorageError> { let start = Instant::now(); let mut conn = self.database_connection.get_pooled_connection()?; let acquire_lock = start.elapsed(); - let hash = hash.to_vec(); - diesel::update(outputs::table.filter(outputs::hash.eq(hash))) - .set((outputs::last_validation_timestamp - .eq::>(NaiveDateTime::from_timestamp_opt(Utc::now().timestamp(), 0)),)) + + diesel::update(outputs::table.filter(outputs::hash.eq_any(hashes.iter().map(|hash| hash.to_vec())))) + .set(outputs::last_validation_timestamp.eq(Some(Utc::now().naive_utc()))) .execute(&mut conn) - .num_rows_affected_or_not_found(1)?; + .num_rows_affected_or_not_found(hashes.len())?; + if start.elapsed().as_millis() > 0 { trace!( target: LOG_TARGET, - "sqlite profile - set_output_to_be_revalidated_in_the_future: lock {} + db_op {} = {} ms", + "sqlite profile - update_last_validation_timestamp_batch_mode: lock {} + db_op {} = {} ms ({} outputs)", acquire_lock.as_millis(), (start.elapsed() - acquire_lock).as_millis(), - start.elapsed().as_millis() + start.elapsed().as_millis(), + hashes.len() ); } Ok(()) } - fn mark_output_as_spent( + // Perform a batch update of the spent outputs. This is more efficient than updating each output individually. + // Note: + // `diesel` does not support batch updates, so we have to do it manually. For example, this + // `diesel::insert_into(...).values(&...).on_conflict(outputs::hash).do_update().set((...)).execute(&mut conn)?;` + // errors with + // `the trait bound `BatchInsert` is not satisfied` + fn mark_output_as_spent_batch_mode( &self, - hash: FixedHash, - mark_deleted_at_height: u64, - mark_deleted_in_block: FixedHash, - confirmed: bool, + updates: Vec, ) -> Result<(), OutputManagerStorageError> { let start = Instant::now(); let mut conn = self.database_connection.get_pooled_connection()?; let acquire_lock = start.elapsed(); - let hash = hash.to_vec(); - let mark_deleted_in_block = mark_deleted_in_block.to_vec(); - let status = if confirmed { - OutputStatus::Spent as i32 - } else { - OutputStatus::SpentMinedUnconfirmed as i32 - }; - diesel::update(outputs::table.filter(outputs::hash.eq(hash))) - .set(( - outputs::marked_deleted_at_height.eq(mark_deleted_at_height as i64), - outputs::marked_deleted_in_block.eq(mark_deleted_in_block), - outputs::status.eq(status), - )) - .execute(&mut conn) - .num_rows_affected_or_not_found(1)?; + + debug!( + target: LOG_TARGET, + "`mark_output_as_spent_batch_mode` for {} outputs", + updates.len() + ); + let query = updates + .iter() + .map(|update| { + format!( + "UPDATE outputs SET marked_deleted_at_height = {}, marked_deleted_in_block = x'{}', status = {} \ + WHERE hash = x'{}'; ", + update.mark_deleted_at_height as i64, + update.mark_deleted_in_block.to_hex(), + if update.confirmed { + OutputStatus::Spent as i32 + } else { + OutputStatus::SpentMinedUnconfirmed as i32 + }, + update.hash.to_hex() + ) + }) + .collect::>() + .join(""); + let query = query.trim(); + trace!(target: LOG_TARGET, "mark_output_as_spent_batch_mode: `{}`", query); + + conn.batch_execute(query)?; + if start.elapsed().as_millis() > 0 { trace!( target: LOG_TARGET, - "sqlite profile - mark_output_as_spent: lock {} + db_op {} = {} ms", + "sqlite profile - mark_output_as_spent_batch_mode: lock {} + db_op {} = {} ms ({} outputs)", acquire_lock.as_millis(), (start.elapsed() - acquire_lock).as_millis(), - start.elapsed().as_millis() + start.elapsed().as_millis(), + updates.len() ); } Ok(()) } - fn mark_output_as_unspent(&self, hash: FixedHash, confirmed: bool) -> Result<(), OutputManagerStorageError> { + fn mark_output_as_unspent_batch_mode( + &self, + hashes: Vec<(FixedHash, bool)>, + ) -> Result<(), OutputManagerStorageError> { let start = Instant::now(); let mut conn = self.database_connection.get_pooled_connection()?; let acquire_lock = start.elapsed(); - let hash = hash.to_vec(); - let status = if confirmed { - OutputStatus::Unspent - } else { - OutputStatus::UnspentMinedUnconfirmed - }; - debug!(target: LOG_TARGET, "mark_output_as_unspent({})", hash.to_hex()); - diesel::update(outputs::table.filter(outputs::hash.eq(hash))) + // Split out the confirmed and unconfirmed outputs + let confirmed_hashes = hashes + .iter() + .filter(|(_hash, confirmed)| *confirmed) + .map(|(hash, _confirmed)| hash) + .collect::>(); + let unconfirmed_hashes = hashes + .iter() + .filter(|(_hash, confirmed)| !*confirmed) + .map(|(hash, _confirmed)| hash) + .collect::>(); + + if !confirmed_hashes.is_empty() { + diesel::update( + outputs::table.filter(outputs::hash.eq_any(confirmed_hashes.iter().map(|hash| hash.to_vec()))), + ) .set(( outputs::marked_deleted_at_height.eq::>(None), outputs::marked_deleted_in_block.eq::>>(None), - outputs::status.eq(status as i32), + outputs::status.eq(OutputStatus::Unspent as i32), )) .execute(&mut conn) - .num_rows_affected_or_not_found(1)?; + .num_rows_affected_or_not_found(confirmed_hashes.len())?; + } + + if !unconfirmed_hashes.is_empty() { + diesel::update( + outputs::table.filter(outputs::hash.eq_any(unconfirmed_hashes.iter().map(|hash| hash.to_vec()))), + ) + .set(( + outputs::marked_deleted_at_height.eq::>(None), + outputs::marked_deleted_in_block.eq::>>(None), + outputs::status.eq(OutputStatus::UnspentMinedUnconfirmed as i32), + )) + .execute(&mut conn) + .num_rows_affected_or_not_found(unconfirmed_hashes.len())?; + } + + debug!(target: LOG_TARGET, "mark_output_as_unspent_batch_mode: Unspent {}, UnspentMinedUnconfirmed {}", confirmed_hashes.len(), unconfirmed_hashes.len()); if start.elapsed().as_millis() > 0 { trace!( target: LOG_TARGET, - "sqlite profile - mark_output_as_unspent: lock {} + db_op {} = {} ms", + "sqlite profile - mark_output_as_unspent_batch_mode: lock {} + db_op {} = {} ms (Unspent {}, UnspentMinedUnconfirmed {})", acquire_lock.as_millis(), (start.elapsed() - acquire_lock).as_millis(), - start.elapsed().as_millis() + start.elapsed().as_millis(), + confirmed_hashes.len(), unconfirmed_hashes.len() ); } @@ -1056,6 +1122,34 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { } } +/// These are the fields to be set for the received outputs batch mode update +#[derive(Clone, Debug, Default)] +pub struct ReceivedOutputInfoForBatch { + /// The hash of the output + pub hash: FixedHash, + /// The height at which the output was mined + pub mined_height: u64, + /// The block hash in which the output was mined + pub mined_in_block: FixedHash, + /// Whether the output is confirmed + pub confirmed: bool, + /// The timestamp at which the output was mined + pub mined_timestamp: u64, +} + +/// These are the fields to be set for the spent outputs batch mode update +#[derive(Clone, Debug, Default)] +pub struct SpentOutputInfoForBatch { + /// The hash of the output + pub hash: FixedHash, + /// Whether the output is confirmed + pub confirmed: bool, + /// The height at which the output was marked as deleted + pub mark_deleted_at_height: u64, + /// The block hash in which the output was marked as deleted + pub mark_deleted_in_block: FixedHash, +} + fn update_outputs_with_tx_id_and_status_to_new_status( conn: &mut PooledConnection>, tx_id: TxId, diff --git a/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs b/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs index 0a2e0b4d36..7be18663ab 100644 --- a/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs +++ b/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs @@ -46,6 +46,7 @@ use crate::{ storage::{ database::{OutputManagerBackend, OutputManagerDatabase}, models::DbWalletOutput, + sqlite_db::{ReceivedOutputInfoForBatch, SpentOutputInfoForBatch}, OutputStatus, }, }, @@ -112,12 +113,13 @@ where let last_mined_header = self.check_for_reorgs(&mut base_node_client).await?; - self.update_unconfirmed_outputs(&mut base_node_client).await?; + self.update_unconfirmed_outputs_batch_mode(&mut base_node_client) + .await?; - self.update_spent_outputs(&mut base_node_client, last_mined_header) + self.update_spent_outputs_batch_mode(&mut base_node_client, last_mined_header) .await?; - self.update_invalid_outputs(&mut base_node_client).await?; + self.update_invalid_outputs_batch_mode(&mut base_node_client).await?; self.publish_event(OutputManagerEvent::TxoValidationSuccess(self.operation_id)); debug!( @@ -127,7 +129,7 @@ where Ok(self.operation_id) } - async fn update_invalid_outputs( + async fn update_invalid_outputs_batch_mode( &self, wallet_client: &mut BaseNodeWalletRpcClient, ) -> Result<(), OutputManagerProtocolError> { @@ -159,29 +161,36 @@ where unmined.len(), self.operation_id ); + + let mut mined_updates = Vec::with_capacity(mined.len()); for mined_info in &mined { info!( target: LOG_TARGET, - "Updating output comm:{}: hash {} as mined at height {} with current tip at {} (Operation ID: - {})", + "Updating output comm:{}: hash {} as mined at height {} with current tip at {} (Operation ID: {})", mined_info.output.commitment.to_hex(), mined_info.output.hash.to_hex(), mined_info.mined_at_height, tip_height, self.operation_id ); - self.update_output_as_mined( - &mined_info.output, - &mined_info.mined_block_hash, - mined_info.mined_at_height, - tip_height, - mined_info.mined_timestamp, - ) - .await?; + mined_updates.push(ReceivedOutputInfoForBatch { + hash: mined_info.output.hash, + mined_height: mined_info.mined_at_height, + mined_in_block: mined_info.mined_block_hash, + confirmed: (tip_height - mined_info.mined_at_height) >= self.config.num_confirmations_required, + mined_timestamp: mined_info.mined_timestamp, + }); } - for output in unmined { + if !mined_updates.is_empty() { self.db - .update_last_validation_timestamp(output.hash) + .set_received_outputs_mined_height_and_status_batch_mode(mined_updates) + .for_protocol(self.operation_id)?; + } + + let unmined_hashes: Vec<_> = unmined.iter().map(|o| o.hash).collect(); + if !unmined_hashes.is_empty() { + self.db + .update_last_validation_timestamp_batch_mode(unmined_hashes) .for_protocol(self.operation_id)?; } } @@ -189,7 +198,7 @@ where } #[allow(clippy::too_many_lines)] - async fn update_spent_outputs( + async fn update_spent_outputs_batch_mode( &self, wallet_client: &mut BaseNodeWalletRpcClient, last_mined_header_hash: Option, @@ -224,20 +233,19 @@ where )); } + let mut unmined_and_invalid = Vec::with_capacity(batch.len()); + let mut unspent = Vec::with_capacity(batch.len()); + let mut spent = Vec::with_capacity(batch.len()); for (output, data) in batch.iter().zip(response.data.iter()) { // when checking mined height, 0 can be valid so we need to check the hash if data.block_mined_in.is_empty() { // base node thinks this is unmined or does not know of it. - self.db - .set_output_to_unmined_and_invalid(output.hash) - .for_protocol(self.operation_id)?; + unmined_and_invalid.push(output.hash); continue; }; if data.height_deleted_at == 0 && output.marked_deleted_at_height.is_some() { // this is mined but not yet spent - self.db - .mark_output_as_unspent(output.hash, true) - .for_protocol(self.operation_id)?; + unspent.push((output.hash, true)); info!( target: LOG_TARGET, "Updating output comm:{}: hash {} as unspent at tip height {} (Operation ID: {})", @@ -258,9 +266,12 @@ where OutputManagerError::InconsistentBaseNodeDataError("Base node sent malformed hash"), ) })?; - self.db - .mark_output_as_spent(output.hash, data.height_deleted_at, block_hash, confirmed) - .for_protocol(self.operation_id)?; + spent.push(SpentOutputInfoForBatch { + hash: output.hash, + confirmed, + mark_deleted_at_height: data.height_deleted_at, + mark_deleted_in_block: block_hash, + }); info!( target: LOG_TARGET, "Updating output comm:{}: hash {} as spent at tip height {} (Operation ID: {})", @@ -271,11 +282,26 @@ where ); } } + if !unmined_and_invalid.is_empty() { + self.db + .set_output_to_unmined_and_invalid_batch_mode(unmined_and_invalid) + .for_protocol(self.operation_id)?; + } + if !unspent.is_empty() { + self.db + .mark_output_as_unspent_batch_mode(unspent) + .for_protocol(self.operation_id)?; + } + if !spent.is_empty() { + self.db + .mark_output_as_spent_batch_mode(spent) + .for_protocol(self.operation_id)?; + } } Ok(()) } - async fn update_unconfirmed_outputs( + async fn update_unconfirmed_outputs_batch_mode( &self, wallet_client: &mut BaseNodeWalletRpcClient, ) -> Result<(), OutputManagerProtocolError> { @@ -300,6 +326,8 @@ where unmined.len(), self.operation_id ); + + let mut mined_updates = Vec::with_capacity(mined.len()); for mined_info in &mined { info!( target: LOG_TARGET, @@ -310,28 +338,38 @@ where tip_height, self.operation_id ); - self.update_output_as_mined( - &mined_info.output, - &mined_info.mined_block_hash, - mined_info.mined_at_height, - tip_height, - mined_info.mined_timestamp, - ) - .await?; + mined_updates.push(ReceivedOutputInfoForBatch { + hash: mined_info.output.hash, + mined_height: mined_info.mined_at_height, + mined_in_block: mined_info.mined_block_hash, + confirmed: (tip_height - mined_info.mined_at_height) >= self.config.num_confirmations_required, + mined_timestamp: mined_info.mined_timestamp, + }); + } + if !mined_updates.is_empty() { + self.db + .set_received_outputs_mined_height_and_status_batch_mode(mined_updates) + .for_protocol(self.operation_id)?; } - for unmined_output in unmined { - if unmined_output.status == OutputStatus::UnspentMinedUnconfirmed { + + let unmined_and_invalid: Vec<_> = unmined + .iter() + .filter(|uo| uo.status == OutputStatus::UnspentMinedUnconfirmed) + .map(|uo| { info!( target: LOG_TARGET, - "Updating output comm:{}: hash {} as unmined(Operation ID: {})", - unmined_output.commitment.to_hex(), - unmined_output.hash.to_hex(), + "Updating output comm:{}: hash {} as unmined(Operation ID: {})", + uo.commitment.to_hex(), + uo.hash.to_hex(), self.operation_id ); - self.db - .set_output_to_unmined_and_invalid(unmined_output.hash) - .for_protocol(self.operation_id)?; - } + uo.hash + }) + .collect(); + if !unmined_and_invalid.is_empty() { + self.db + .set_output_to_unmined_and_invalid_batch_mode(unmined_and_invalid) + .for_protocol(self.operation_id)?; } } @@ -548,30 +586,6 @@ where Ok((mined, unmined, batch_response.best_block_height)) } - #[allow(clippy::ptr_arg)] - async fn update_output_as_mined( - &self, - tx: &DbWalletOutput, - mined_in_block: &BlockHash, - mined_height: u64, - tip_height: u64, - mined_timestamp: u64, - ) -> Result<(), OutputManagerProtocolError> { - let confirmed = (tip_height - mined_height) >= self.config.num_confirmations_required; - - self.db - .set_received_output_mined_height_and_status( - tx.hash, - mined_height, - *mined_in_block, - confirmed, - mined_timestamp, - ) - .for_protocol(self.operation_id)?; - - Ok(()) - } - fn publish_event(&self, event: OutputManagerEvent) { if let Err(e) = self.event_publisher.send(Arc::new(event)) { debug!( diff --git a/base_layer/wallet/tests/output_manager_service_tests/service.rs b/base_layer/wallet/tests/output_manager_service_tests/service.rs index c3107ced80..719fe06e6f 100644 --- a/base_layer/wallet/tests/output_manager_service_tests/service.rs +++ b/base_layer/wallet/tests/output_manager_service_tests/service.rs @@ -46,7 +46,7 @@ use rand::{rngs::OsRng, RngCore}; use tari_common::configuration::Network; use tari_common_types::{ transaction::TxId, - types::{ComAndPubSignature, PublicKey}, + types::{ComAndPubSignature, FixedHash, PublicKey}, }; use tari_comms::{ peer_manager::{NodeIdentity, PeerFeatures}, @@ -311,7 +311,7 @@ async fn fee_estimate() { .await; oms.output_manager_handle.add_output(uo.clone(), None).await.unwrap(); backend - .mark_output_as_unspent(uo.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); let fee_calc = Fee::new(*create_consensus_constants(0).transaction_weight_params()); @@ -417,6 +417,7 @@ async fn test_utxo_selection_no_chain_metadata() { assert!(matches!(err, OutputManagerError::NotEnoughFunds)); // create 10 utxos with maturity at heights from 1 to 10 + let mut unspent = Vec::with_capacity(10); for i in 1..=10 { let uo = make_input_with_features( &mut OsRng.clone(), @@ -429,10 +430,9 @@ async fn test_utxo_selection_no_chain_metadata() { ) .await; oms.add_output(uo.clone(), None).await.unwrap(); - backend - .mark_output_as_unspent(uo.hash(&key_manager).await.unwrap(), true) - .unwrap(); + unspent.push((uo.hash(&key_manager).await.unwrap(), true)); } + backend.mark_output_as_unspent_batch_mode(unspent).unwrap(); // but we have no chain state so the lowest maturity should be used let stp = oms @@ -549,6 +549,7 @@ async fn test_utxo_selection_with_chain_metadata() { assert!(matches!(err, OutputManagerError::NotEnoughFunds)); // create 10 utxos with maturity at heights from 1 to 10 + let mut unspent = Vec::with_capacity(10); for i in 1..=10 { let uo = make_input_with_features( &mut OsRng.clone(), @@ -561,10 +562,9 @@ async fn test_utxo_selection_with_chain_metadata() { ) .await; oms.add_output(uo.clone(), None).await.unwrap(); - backend - .mark_output_as_unspent(uo.hash(&key_manager).await.unwrap(), true) - .unwrap(); + unspent.push((uo.hash(&key_manager).await.unwrap(), true)); } + backend.mark_output_as_unspent_batch_mode(unspent).unwrap(); let utxos = oms.get_unspent_outputs().await.unwrap(); assert_eq!(utxos.len(), 10); @@ -710,7 +710,7 @@ async fn test_utxo_selection_with_tx_priority() { .await .unwrap(); backend - .mark_output_as_unspent(uo_high.hash(&key_manager).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo_high.hash(&key_manager).await.unwrap(), true)]) .unwrap(); // Low priority let uo_low_2 = make_input_with_features( @@ -725,7 +725,7 @@ async fn test_utxo_selection_with_tx_priority() { .await; oms.add_output(uo_low_2.clone(), None).await.unwrap(); backend - .mark_output_as_unspent(uo_low_2.hash(&key_manager).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo_low_2.hash(&key_manager).await.unwrap(), true)]) .unwrap(); let utxos = oms.get_unspent_outputs().await.unwrap(); @@ -769,7 +769,8 @@ async fn send_not_enough_funds() { let backend = OutputManagerSqliteDatabase::new(connection.clone()); let mut oms = setup_output_manager_service(backend.clone(), true).await; - let num_outputs = 20; + let num_outputs = 20usize; + let mut unspent: Vec<(FixedHash, bool)> = Vec::with_capacity(num_outputs); for _i in 0..num_outputs { let uo = make_input( &mut OsRng.clone(), @@ -779,16 +780,15 @@ async fn send_not_enough_funds() { ) .await; oms.output_manager_handle.add_output(uo.clone(), None).await.unwrap(); - backend - .mark_output_as_unspent(uo.hash(&oms.key_manager_handle).await.unwrap(), true) - .unwrap(); + unspent.push((uo.hash(&oms.key_manager_handle).await.unwrap(), true)); } + backend.mark_output_as_unspent_batch_mode(unspent).unwrap(); match oms .output_manager_handle .prepare_transaction_to_send( TxId::new_random(), - MicroMinotari::from(num_outputs * 2000), + MicroMinotari::from(num_outputs as u64 * 2000), UtxoSelectionCriteria::default(), OutputFeatures::default(), MicroMinotari::from(4), @@ -834,7 +834,7 @@ async fn send_no_change() { oms.output_manager_handle.add_output(uo_1.clone(), None).await.unwrap(); backend - .mark_output_as_unspent(uo_1.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo_1.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); let value2 = 8000; let uo_2 = create_wallet_output_with_data( @@ -848,7 +848,7 @@ async fn send_no_change() { .unwrap(); oms.output_manager_handle.add_output(uo_2.clone(), None).await.unwrap(); backend - .mark_output_as_unspent(uo_2.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo_2.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); let stp = oms @@ -900,7 +900,7 @@ async fn send_not_enough_for_change() { .unwrap(); oms.output_manager_handle.add_output(uo_1.clone(), None).await.unwrap(); backend - .mark_output_as_unspent(uo_1.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo_1.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); let value2 = MicroMinotari(800); let uo_2 = create_wallet_output_with_data( @@ -914,7 +914,7 @@ async fn send_not_enough_for_change() { .unwrap(); oms.output_manager_handle.add_output(uo_2.clone(), None).await.unwrap(); backend - .mark_output_as_unspent(uo_2.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo_2.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); match oms @@ -945,6 +945,7 @@ async fn cancel_transaction() { let mut oms = setup_output_manager_service(backend.clone(), true).await; let num_outputs = 20; + let mut unspent: Vec<(FixedHash, bool)> = Vec::with_capacity(num_outputs); for _i in 0..num_outputs { let uo = make_input( &mut OsRng.clone(), @@ -954,10 +955,9 @@ async fn cancel_transaction() { ) .await; oms.output_manager_handle.add_output(uo.clone(), None).await.unwrap(); - backend - .mark_output_as_unspent(uo.hash(&oms.key_manager_handle).await.unwrap(), true) - .unwrap(); + unspent.push((uo.hash(&oms.key_manager_handle).await.unwrap(), true)); } + backend.mark_output_as_unspent_batch_mode(unspent).unwrap(); let stp = oms .output_manager_handle .prepare_transaction_to_send( @@ -1046,7 +1046,7 @@ async fn test_get_balance() { total += uo.value; oms.output_manager_handle.add_output(uo.clone(), None).await.unwrap(); backend - .mark_output_as_unspent(uo.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); let uo = make_input( @@ -1059,7 +1059,7 @@ async fn test_get_balance() { total += uo.value; oms.output_manager_handle.add_output(uo.clone(), None).await.unwrap(); backend - .mark_output_as_unspent(uo.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); let send_value = MicroMinotari::from(1000); @@ -1114,7 +1114,7 @@ async fn sending_transaction_persisted_while_offline() { .await; oms.output_manager_handle.add_output(uo.clone(), None).await.unwrap(); backend - .mark_output_as_unspent(uo.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); let uo = make_input( &mut OsRng.clone(), @@ -1125,7 +1125,7 @@ async fn sending_transaction_persisted_while_offline() { .await; oms.output_manager_handle.add_output(uo.clone(), None).await.unwrap(); backend - .mark_output_as_unspent(uo.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); let balance = oms.output_manager_handle.get_balance().await.unwrap(); @@ -1215,13 +1215,13 @@ async fn coin_split_with_change() { assert!(oms.output_manager_handle.add_output(uo3.clone(), None).await.is_ok()); // lets mark them as unspent so we can use them backend - .mark_output_as_unspent(uo1.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); backend - .mark_output_as_unspent(uo2.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo2.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); backend - .mark_output_as_unspent(uo3.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo3.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); let fee_per_gram = MicroMinotari::from(5); @@ -1279,13 +1279,13 @@ async fn coin_split_no_change() { assert!(oms.output_manager_handle.add_output(uo3.clone(), None).await.is_ok()); // lets mark then as unspent so we can use them backend - .mark_output_as_unspent(uo1.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); backend - .mark_output_as_unspent(uo2.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo2.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); backend - .mark_output_as_unspent(uo3.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo3.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); let (_tx_id, coin_split_tx, amount) = oms .output_manager_handle @@ -1309,7 +1309,7 @@ async fn it_handles_large_coin_splits() { assert!(oms.output_manager_handle.add_output(uo.clone(), None).await.is_ok()); // lets mark them as unspent so we can use them backend - .mark_output_as_unspent(uo.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); let fee_per_gram = MicroMinotari::from(1); @@ -1355,7 +1355,7 @@ async fn test_txo_validation() { .await .unwrap(); oms_db - .mark_output_as_unspent(output1.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(output1.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); let output2_value = 2_000_000; @@ -1373,7 +1373,7 @@ async fn test_txo_validation() { .await .unwrap(); oms_db - .mark_output_as_unspent(output2.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(output2.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); let output3_value = 4_000_000; @@ -1391,7 +1391,7 @@ async fn test_txo_validation() { .unwrap(); oms_db - .mark_output_as_unspent(output3.hash(&oms.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(output3.hash(&oms.key_manager_handle).await.unwrap(), true)]) .unwrap(); let mut block1_header = BlockHeader::new(1); diff --git a/base_layer/wallet/tests/output_manager_service_tests/storage.rs b/base_layer/wallet/tests/output_manager_service_tests/storage.rs index 13f15366c1..0c43888ea6 100644 --- a/base_layer/wallet/tests/output_manager_service_tests/storage.rs +++ b/base_layer/wallet/tests/output_manager_service_tests/storage.rs @@ -26,7 +26,7 @@ use minotari_wallet::output_manager_service::{ storage::{ database::{OutputManagerBackend, OutputManagerDatabase}, models::DbWalletOutput, - sqlite_db::OutputManagerSqliteDatabase, + sqlite_db::{OutputManagerSqliteDatabase, ReceivedOutputInfoForBatch, SpentOutputInfoForBatch}, OutputSource, }, }; @@ -47,6 +47,7 @@ pub async fn test_db_backend(backend: T) { // Add some unspent outputs let mut unspent_outputs = Vec::new(); let key_manager = create_memory_db_key_manager(); + let mut unspent = Vec::with_capacity(5); for i in 0..5 { let uo = make_input( &mut OsRng, @@ -60,9 +61,10 @@ pub async fn test_db_backend(backend: T) { .unwrap(); kmo.wallet_output.features.maturity = i; db.add_unspent_output(kmo.clone()).unwrap(); - db.mark_output_as_unspent(kmo.hash, true).unwrap(); + unspent.push((kmo.hash, true)); unspent_outputs.push(kmo); } + db.mark_output_as_unspent_batch_mode(unspent).unwrap(); let time_locked_outputs = db.get_timelocked_outputs(3).unwrap(); assert_eq!(time_locked_outputs.len(), 1); @@ -182,13 +184,28 @@ pub async fn test_db_backend(backend: T) { }); // Set first pending tx to mined but unconfirmed + let mut updates = Vec::new(); for o in &pending_txs[0].outputs_to_be_received { - db.set_received_output_mined_height_and_status(o.hash, 2, FixedHash::zero(), false, 0) - .unwrap(); + updates.push(ReceivedOutputInfoForBatch { + hash: o.hash, + mined_height: 2, + mined_in_block: FixedHash::zero(), + confirmed: false, + mined_timestamp: 0, + }); } + db.set_received_outputs_mined_height_and_status_batch_mode(updates) + .unwrap(); + let mut spent = Vec::new(); for o in &pending_txs[0].outputs_to_be_spent { - db.mark_output_as_spent(o.hash, 3, FixedHash::zero(), false).unwrap(); + spent.push(SpentOutputInfoForBatch { + hash: o.hash, + confirmed: false, + mark_deleted_at_height: 3, + mark_deleted_in_block: FixedHash::zero(), + }); } + db.mark_output_as_spent_batch_mode(spent).unwrap(); // Balance shouldn't change let balance = db.get_balance(None).unwrap(); @@ -201,13 +218,28 @@ pub async fn test_db_backend(backend: T) { }); // Set second pending tx to mined and confirmed + let mut updates = Vec::new(); for o in &pending_txs[1].outputs_to_be_received { - db.set_received_output_mined_height_and_status(o.hash, 4, FixedHash::zero(), true, 0) - .unwrap(); + updates.push(ReceivedOutputInfoForBatch { + hash: o.hash, + mined_height: 4, + mined_in_block: FixedHash::zero(), + confirmed: true, + mined_timestamp: 0, + }); } + db.set_received_outputs_mined_height_and_status_batch_mode(updates) + .unwrap(); + let mut spent = Vec::new(); for o in &pending_txs[1].outputs_to_be_spent { - db.mark_output_as_spent(o.hash, 5, FixedHash::zero(), true).unwrap(); + spent.push(SpentOutputInfoForBatch { + hash: o.hash, + confirmed: true, + mark_deleted_at_height: 5, + mark_deleted_in_block: FixedHash::zero(), + }); } + db.mark_output_as_spent_batch_mode(spent).unwrap(); // Balance with confirmed second pending tx let mut available_balance = unspent_outputs @@ -288,12 +320,12 @@ pub async fn test_db_backend(backend: T) { assert_eq!(mined_unspent_outputs.len(), 4); // Spend a received and confirmed output - db.mark_output_as_spent( - pending_txs[1].outputs_to_be_received[0].hash, - 6, - FixedHash::zero(), - true, - ) + db.mark_output_as_spent_batch_mode(vec![SpentOutputInfoForBatch { + hash: pending_txs[1].outputs_to_be_received[0].hash, + confirmed: true, + mark_deleted_at_height: 6, + mark_deleted_in_block: FixedHash::zero(), + }]) .unwrap(); let mined_unspent_outputs = db.fetch_mined_unspent_outputs().unwrap(); @@ -330,6 +362,10 @@ pub async fn test_db_backend(backend: T) { #[tokio::test] pub async fn test_output_manager_sqlite_db() { + //` cargo test --color=always --test wallet_integration_tests + //` output_manager_service_tests::storage::test_output_manager_sqlite_db > .\target\output.txt 2>&1 + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` + let (connection, _tempdir) = get_temp_sqlite_database_connection(); test_db_backend(OutputManagerSqliteDatabase::new(connection)).await; @@ -418,7 +454,13 @@ pub async fn test_no_duplicate_outputs() { // add it to the database let result = db.add_unspent_output(kmo.clone()); assert!(result.is_ok()); - let result = db.set_received_output_mined_height_and_status(kmo.hash, 1, FixedHash::zero(), true, 0); + let result = db.set_received_outputs_mined_height_and_status_batch_mode(vec![ReceivedOutputInfoForBatch { + hash: kmo.hash, + mined_height: 1, + mined_in_block: FixedHash::zero(), + confirmed: true, + mined_timestamp: 0, + }]); assert!(result.is_ok()); let outputs = db.fetch_mined_unspent_outputs().unwrap(); assert_eq!(outputs.len(), 1); @@ -459,8 +501,14 @@ pub async fn test_mark_as_unmined() { // add it to the database db.add_unspent_output(kmo.clone()).unwrap(); - db.set_received_output_mined_height_and_status(kmo.hash, 1, FixedHash::zero(), true, 0) - .unwrap(); + db.set_received_outputs_mined_height_and_status_batch_mode(vec![ReceivedOutputInfoForBatch { + hash: kmo.hash, + mined_height: 1, + mined_in_block: FixedHash::zero(), + confirmed: true, + mined_timestamp: 0, + }]) + .unwrap(); let o = db.get_last_mined_output().unwrap().unwrap(); assert_eq!(o.hash, kmo.hash); db.set_output_to_unmined_and_invalid(kmo.hash).unwrap(); @@ -469,4 +517,55 @@ pub async fn test_mark_as_unmined() { assert_eq!(o.hash, kmo.hash); assert!(o.mined_height.is_none()); assert!(o.mined_in_block.is_none()); + + // Test batch mode operations + // - Add 5 outputs and remember the hashes + let batch_count = 7usize; + let mut batch_hashes = Vec::with_capacity(batch_count); + let mut batch_outputs = Vec::with_capacity(batch_count); + let mut batch_info = Vec::with_capacity(batch_count); + for i in 0..batch_count { + let uo = make_input( + &mut OsRng, + MicroMinotari::from(1000), + &OutputFeatures::default(), + &key_manager, + ) + .await; + let kmo = DbWalletOutput::from_wallet_output(uo, &key_manager, None, OutputSource::Standard, None, None) + .await + .unwrap(); + db.add_unspent_output(kmo.clone()).unwrap(); + batch_hashes.push(kmo.hash); + batch_info.push(ReceivedOutputInfoForBatch { + hash: kmo.hash, + mined_height: i as u64 + 1, + mined_in_block: FixedHash::zero(), + confirmed: true, + mined_timestamp: i as u64, + }); + batch_outputs.push(kmo); + } + + // - Perform batch mode operations + db.set_received_outputs_mined_height_and_status_batch_mode(batch_info) + .unwrap(); + + let last = db.get_last_mined_output().unwrap().unwrap(); + assert_eq!(last.hash, batch_outputs.last().unwrap().hash); + + db.set_output_to_unmined_and_invalid_batch_mode(batch_hashes).unwrap(); + assert!(db.get_last_mined_output().unwrap().is_none()); + + let invalid_outputs = db.get_invalid_outputs().unwrap(); + let mut batch_invalid_count = 0; + for invalid in invalid_outputs { + if let Some(kmo) = batch_outputs.iter().find(|wo| wo.hash == invalid.hash) { + assert_eq!(invalid.hash, kmo.hash); + assert!(invalid.mined_height.is_none()); + assert!(invalid.mined_in_block.is_none()); + batch_invalid_count += 1; + } + } + assert_eq!(batch_invalid_count, batch_count); } diff --git a/base_layer/wallet/tests/transaction_service_tests/service.rs b/base_layer/wallet/tests/transaction_service_tests/service.rs index f4a5ade587..21c4de5425 100644 --- a/base_layer/wallet/tests/transaction_service_tests/service.rs +++ b/base_layer/wallet/tests/transaction_service_tests/service.rs @@ -53,7 +53,7 @@ use minotari_wallet::{ storage::{ database::{OutputManagerBackend, OutputManagerDatabase}, models::KnownOneSidedPaymentScript, - sqlite_db::OutputManagerSqliteDatabase, + sqlite_db::{OutputManagerSqliteDatabase, ReceivedOutputInfoForBatch}, }, OutputManagerServiceInitializer, UtxoSelectionCriteria, @@ -90,7 +90,7 @@ use tari_common_types::{ chain_metadata::ChainMetadata, tari_address::TariAddress, transaction::{ImportStatus, TransactionDirection, TransactionStatus, TxId}, - types::{FixedHash, HashOutput, PrivateKey, PublicKey, Signature}, + types::{FixedHash, PrivateKey, PublicKey, Signature}, }; use tari_comms::{ message::EnvelopeBody, @@ -603,7 +603,7 @@ async fn manage_single_transaction() { alice_oms.add_output(uo1.clone(), None).await.unwrap(); alice_db - .mark_output_as_unspent(uo1.hash(&alice_key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1.hash(&alice_key_manager_handle).await.unwrap(), true)]) .unwrap(); let message = "TAKE MAH MONEYS!".to_string(); @@ -745,8 +745,9 @@ async fn large_interactive_transaction() { .unwrap(); // Alice prepares her large transaction - let outputs_count = 1250u64; + let outputs_count = 1250usize; let output_value = MicroMinotari(20000); + let mut unspent: Vec<(FixedHash, bool)> = Vec::with_capacity(outputs_count); for _ in 0..outputs_count { let uo = make_input( &mut OsRng, @@ -756,11 +757,10 @@ async fn large_interactive_transaction() { ) .await; alice_oms.add_output(uo.clone(), None).await.unwrap(); - alice_db - .mark_output_as_unspent(uo.hash(&alice_key_manager_handle).await.unwrap(), true) - .unwrap(); + unspent.push((uo.hash(&alice_key_manager_handle).await.unwrap(), true)); } - let transaction_value = output_value * (outputs_count - 1); + alice_db.mark_output_as_unspent_batch_mode(unspent).unwrap(); + let transaction_value = output_value * (outputs_count as u64 - 1); let bob_address = TariAddress::new(bob_node_identity.public_key().clone(), network); let message = "TAKE MAH MONEYS!".to_string(); @@ -837,10 +837,7 @@ async fn large_interactive_transaction() { .get_completed_transaction(tx_id) .await .expect("Could not find tx"); - assert_eq!( - bob_completed_tx.transaction.body.inputs().len(), - usize::try_from(outputs_count).unwrap() - ); + assert_eq!(bob_completed_tx.transaction.body.inputs().len(), outputs_count); assert_eq!( bob_oms.get_balance().await.unwrap().pending_incoming_balance, transaction_value @@ -910,14 +907,14 @@ async fn test_spend_dust_to_self_in_oversized_transaction() { &alice_key_manager_handle, ) .await; + let mut unspent: Vec<(FixedHash, bool)> = Vec::with_capacity(number_of_outputs); for _ in 0..number_of_outputs { let uo = make_fake_input_from_copy(&mut uo_reference, &alice_key_manager_handle).await; alice_oms.add_output(uo.clone(), None).await.unwrap(); - alice_db - .mark_output_as_unspent(uo.hash(&alice_key_manager_handle).await.unwrap(), true) - .unwrap(); + unspent.push((uo.hash(&alice_key_manager_handle).await.unwrap(), true)); } + alice_db.mark_output_as_unspent_batch_mode(unspent).unwrap(); let balance = alice_oms.get_balance().await.unwrap(); let initial_available_balance = balance.available_balance; @@ -1007,14 +1004,14 @@ async fn test_spend_dust_to_other_in_oversized_transaction() { &alice_key_manager_handle, ) .await; + let mut unspent: Vec<(FixedHash, bool)> = Vec::with_capacity(number_of_outputs); for _ in 0..number_of_outputs { let uo = make_fake_input_from_copy(&mut uo_reference, &alice_key_manager_handle).await; alice_oms.add_output(uo.clone(), None).await.unwrap(); - alice_db - .mark_output_as_unspent(uo.hash(&alice_key_manager_handle).await.unwrap(), true) - .unwrap(); + unspent.push((uo.hash(&alice_key_manager_handle).await.unwrap(), true)); } + alice_db.mark_output_as_unspent_batch_mode(unspent).unwrap(); let balance = alice_oms.get_balance().await.unwrap(); let initial_available_balance = balance.available_balance; @@ -1122,14 +1119,14 @@ async fn test_spend_dust_happy_path() { &alice_key_manager_handle, ) .await; + let mut unspent: Vec<(FixedHash, bool)> = Vec::with_capacity(number_of_outputs as usize); for _ in 0..number_of_outputs { let uo = make_fake_input_from_copy(&mut uo_reference, &alice_key_manager_handle).await; alice_oms.add_output(uo.clone(), None).await.unwrap(); - alice_db - .mark_output_as_unspent(uo.hash(&alice_key_manager_handle).await.unwrap(), true) - .unwrap(); + unspent.push((uo.hash(&alice_key_manager_handle).await.unwrap(), true)); } + alice_db.mark_output_as_unspent_batch_mode(unspent).unwrap(); let balance = alice_oms.get_balance().await.unwrap(); let initial_available_balance = balance.available_balance; @@ -1279,7 +1276,7 @@ async fn single_transaction_to_self() { alice_oms.add_output(uo1.clone(), None).await.unwrap(); alice_db - .mark_output_as_unspent(uo1.hash(&key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1.hash(&key_manager_handle).await.unwrap(), true)]) .unwrap(); let message = "TAKE MAH _OWN_ MONEYS!".to_string(); let value = 10000.into(); @@ -1363,7 +1360,7 @@ async fn large_coin_split_transaction() { alice_oms.add_output(uo1.clone(), None).await.unwrap(); alice_db - .mark_output_as_unspent(uo1.hash(&key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1.hash(&key_manager_handle).await.unwrap(), true)]) .unwrap(); let fee_per_gram = MicroMinotari::from(1); @@ -1450,7 +1447,7 @@ async fn single_transaction_burn_tari() { alice_oms.add_output(uo1.clone(), None).await.unwrap(); alice_db - .mark_output_as_unspent(uo1.hash(&key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1.hash(&key_manager_handle).await.unwrap(), true)]) .unwrap(); let message = "BURN MAH _OWN_ MONEYS!".to_string(); let burn_value = 10000.into(); @@ -1598,7 +1595,7 @@ async fn send_one_sided_transaction_to_other() { let mut alice_oms_clone = alice_oms.clone(); alice_oms_clone.add_output(uo1.clone(), None).await.unwrap(); alice_db - .mark_output_as_unspent(uo1.hash(&key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1.hash(&key_manager_handle).await.unwrap(), true)]) .unwrap(); let message = "SEE IF YOU CAN CATCH THIS ONE..... SIDED TX!".to_string(); @@ -1741,7 +1738,7 @@ async fn recover_one_sided_transaction() { let mut alice_oms_clone = alice_oms; alice_oms_clone.add_output(uo1.clone(), None).await.unwrap(); alice_db - .mark_output_as_unspent(uo1.hash(&alice_key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1.hash(&alice_key_manager_handle).await.unwrap(), true)]) .unwrap(); let message = "".to_string(); @@ -1846,7 +1843,7 @@ async fn test_htlc_send_and_claim() { .await; alice_oms.add_output(uo1.clone(), None).await.unwrap(); alice_db - .mark_output_as_unspent(uo1.hash(&key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1.hash(&key_manager_handle).await.unwrap(), true)]) .unwrap(); let message = "".to_string(); @@ -1970,7 +1967,7 @@ async fn send_one_sided_transaction_to_self() { let mut alice_oms_clone = alice_oms; alice_oms_clone.add_output(uo1.clone(), None).await.unwrap(); alice_db - .mark_output_as_unspent(uo1.hash(&key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1.hash(&key_manager_handle).await.unwrap(), true)]) .unwrap(); let message = "SEE IF YOU CAN CATCH THIS ONE..... SIDED TX!".to_string(); @@ -2114,7 +2111,7 @@ async fn manage_multiple_transactions() { .await; bob_oms.add_output(uo2.clone(), None).await.unwrap(); bob_db - .mark_output_as_unspent(uo2.hash(&bob_key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo2.hash(&bob_key_manager_handle).await.unwrap(), true)]) .unwrap(); let uo3 = make_input( &mut OsRng, @@ -2125,7 +2122,7 @@ async fn manage_multiple_transactions() { .await; carol_oms.add_output(uo3.clone(), None).await.unwrap(); carol_db - .mark_output_as_unspent(uo3.hash(&key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo3.hash(&key_manager_handle).await.unwrap(), true)]) .unwrap(); // Add some funds to Alices wallet @@ -2138,7 +2135,7 @@ async fn manage_multiple_transactions() { .await; alice_oms.add_output(uo1a.clone(), None).await.unwrap(); alice_db - .mark_output_as_unspent(uo1a.hash(&alice_key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1a.hash(&alice_key_manager_handle).await.unwrap(), true)]) .unwrap(); let uo1b = make_input( &mut OsRng, @@ -2149,7 +2146,7 @@ async fn manage_multiple_transactions() { .await; alice_oms.add_output(uo1b.clone(), None).await.unwrap(); alice_db - .mark_output_as_unspent(uo1b.hash(&alice_key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1b.hash(&alice_key_manager_handle).await.unwrap(), true)]) .unwrap(); let uo1c = make_input( &mut OsRng, @@ -2160,7 +2157,7 @@ async fn manage_multiple_transactions() { .await; alice_oms.add_output(uo1c.clone(), None).await.unwrap(); alice_db - .mark_output_as_unspent(uo1c.hash(&alice_key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1c.hash(&alice_key_manager_handle).await.unwrap(), true)]) .unwrap(); // A series of interleaved transactions. First with Bob and Carol offline and then two with them online @@ -2345,7 +2342,10 @@ async fn test_accepting_unknown_tx_id_and_malformed_reply() { .unwrap(); alice_ts_interface .oms_db - .mark_output_as_unspent(uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let bob_address = TariAddress::new(bob_node_identity.public_key().clone(), Network::LocalNet); @@ -2444,7 +2444,10 @@ async fn finalize_tx_with_incorrect_pubkey() { .unwrap(); bob_ts_interface .oms_db - .mark_output_as_unspent(uo.hash(&bob_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo.hash(&bob_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let mut stp = bob_ts_interface .output_manager_service_handle @@ -2569,7 +2572,10 @@ async fn finalize_tx_with_missing_output() { .unwrap(); bob_ts_interface .oms_db - .mark_output_as_unspent(uo.hash(&bob_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo.hash(&bob_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let mut stp = bob_ts_interface @@ -2745,7 +2751,7 @@ async fn discovery_async_return_test() { .await; alice_oms.add_output(uo1a.clone(), None).await.unwrap(); alice_db - .mark_output_as_unspent(uo1a.hash(&alice_key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1a.hash(&alice_key_manager_handle).await.unwrap(), true)]) .unwrap(); let uo1b = make_input( &mut OsRng, @@ -2756,7 +2762,7 @@ async fn discovery_async_return_test() { .await; alice_oms.add_output(uo1b.clone(), None).await.unwrap(); alice_db - .mark_output_as_unspent(uo1b.hash(&alice_key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1b.hash(&alice_key_manager_handle).await.unwrap(), true)]) .unwrap(); let uo1c = make_input( &mut OsRng, @@ -2767,7 +2773,7 @@ async fn discovery_async_return_test() { .await; alice_oms.add_output(uo1c.clone(), None).await.unwrap(); alice_db - .mark_output_as_unspent(uo1c.hash(&alice_key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![(uo1c.hash(&alice_key_manager_handle).await.unwrap(), true)]) .unwrap(); let initial_balance = alice_oms.get_balance().await.unwrap(); @@ -3100,7 +3106,10 @@ async fn test_transaction_cancellation() { .unwrap(); alice_ts_interface .oms_db - .mark_output_as_unspent(uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let amount_sent = 100000 * uT; @@ -3440,7 +3449,10 @@ async fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { .unwrap(); alice_ts_interface .oms_db - .mark_output_as_unspent(uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let amount_sent = 100000 * uT; @@ -3635,7 +3647,10 @@ async fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { .unwrap(); alice_ts_interface .oms_db - .mark_output_as_unspent(uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let amount_sent = 20000 * uT; @@ -3753,7 +3768,10 @@ async fn test_tx_direct_send_behaviour() { .unwrap(); alice_ts_interface .oms_db - .mark_output_as_unspent(uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let uo = make_input( &mut OsRng, @@ -3769,7 +3787,10 @@ async fn test_tx_direct_send_behaviour() { .unwrap(); alice_ts_interface .oms_db - .mark_output_as_unspent(uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let uo = make_input( &mut OsRng, @@ -3785,7 +3806,10 @@ async fn test_tx_direct_send_behaviour() { .unwrap(); alice_ts_interface .oms_db - .mark_output_as_unspent(uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let uo = make_input( &mut OsRng, @@ -3801,7 +3825,10 @@ async fn test_tx_direct_send_behaviour() { .unwrap(); alice_ts_interface .oms_db - .mark_output_as_unspent(uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let amount_sent = 100000 * uT; @@ -4261,7 +4288,10 @@ async fn test_transaction_resending() { .unwrap(); alice_ts_interface .oms_db - .mark_output_as_unspent(uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let amount_sent = 100000 * uT; @@ -4776,7 +4806,10 @@ async fn test_replying_to_cancelled_tx() { .unwrap(); alice_ts_interface .oms_db - .mark_output_as_unspent(uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let amount_sent = 100000 * uT; let bob_address = TariAddress::new(bob_node_identity.public_key().clone(), Network::LocalNet); @@ -4908,7 +4941,10 @@ async fn test_transaction_timeout_cancellation() { .unwrap(); alice_ts_interface .oms_db - .mark_output_as_unspent(uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let amount_sent = 10000 * uT; @@ -5176,7 +5212,10 @@ async fn transaction_service_tx_broadcast() { .unwrap(); alice_ts_interface .oms_db - .mark_output_as_unspent(uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let uo2 = make_input( @@ -5193,7 +5232,10 @@ async fn transaction_service_tx_broadcast() { .unwrap(); alice_ts_interface .oms_db - .mark_output_as_unspent(uo2.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), true) + .mark_output_as_unspent_batch_mode(vec![( + uo2.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + true, + )]) .unwrap(); let amount_sent1 = 100000 * uT; @@ -5739,18 +5781,19 @@ async fn test_update_faux_tx_on_oms_validation() { .add_output_with_tx_id(tx_id, uo.clone(), None) .await .unwrap(); - let _result = alice_ts_interface - .oms_db - .mark_output_as_unspent(uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), true); + let _result = alice_ts_interface.oms_db.mark_output_as_unspent_batch_mode(vec![( + uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + true, + )]); alice_ts_interface .oms_db - .set_received_output_mined_height_and_status( - uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), - 5, - HashOutput::zero(), - false, - 0, - ) + .set_received_outputs_mined_height_and_status_batch_mode(vec![ReceivedOutputInfoForBatch { + hash: uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + mined_height: 5, + mined_in_block: FixedHash::zero(), + confirmed: false, + mined_timestamp: 0, + }]) .unwrap(); } @@ -5916,13 +5959,13 @@ async fn test_update_coinbase_tx_on_oms_validation() { if uo.value != MicroMinotari::from(30000) { alice_ts_interface .oms_db - .set_received_output_mined_height_and_status( - uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), - 5, - HashOutput::zero(), - false, - 0, - ) + .set_received_outputs_mined_height_and_status_batch_mode(vec![ReceivedOutputInfoForBatch { + hash: uo.hash(&alice_ts_interface.key_manager_handle).await.unwrap(), + mined_height: 5, + mined_in_block: FixedHash::zero(), + confirmed: false, + mined_timestamp: 0, + }]) .unwrap(); } }