Skip to content

Commit

Permalink
Optimize get transactions query
Browse files Browse the repository at this point in the history
Optimized the get transactiosn query for transactions that
need to be broadcast/rebroadcast by sending a single diesel
sql query that only returns the result, isntead of multiple
queries that return all the transactions in the database
with filtering and selection in the Rust code.
  • Loading branch information
hansieodendaal committed Oct 26, 2021
1 parent 762cb9a commit ac42d5e
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 13 deletions.
25 changes: 12 additions & 13 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1625,7 +1625,7 @@ where
}

trace!(target: LOG_TARGET, "Restarting transaction broadcast protocols");
self.broadcast_all_completed_transactions(broadcast_join_handles)
self.broadcast_selected_completed_transactions(broadcast_join_handles)
.await
.map_err(|resp| {
error!(
Expand Down Expand Up @@ -1682,22 +1682,21 @@ where
Ok(())
}

/// Go through all completed transactions that have not yet been broadcast and broadcast all of them to the base
/// Broadcast all valid and not cancelled completed transactions with status 'Completed' and 'Broadcast' to the base
/// node.
async fn broadcast_all_completed_transactions(
async fn broadcast_selected_completed_transactions(
&mut self,
join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<(), TransactionServiceError> {
trace!(target: LOG_TARGET, "Attempting to Broadcast all Completed Transactions");
let completed_txs = self.db.get_completed_transactions().await?;
for (_, completed_tx) in completed_txs {
if completed_tx.valid &&
(completed_tx.status == TransactionStatus::Completed ||
completed_tx.status == TransactionStatus::Broadcast) &&
!completed_tx.is_coinbase()
{
self.broadcast_completed_transaction(completed_tx, join_handles).await?;
}
trace!(
target: LOG_TARGET,
"Attempting to Broadcast all valid and not cancelled Completed Transactions with status 'Completed' and \
'Broadcast'"
);
let txn_list = self.db.get_transactions_to_be_broadcast().await?;
for completed_txn in txn_list {
self.broadcast_completed_transaction(completed_txn, join_handles)
.await?;
}

Ok(())
Expand Down
7 changes: 7 additions & 0 deletions base_layer/wallet/src/transaction_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub trait TransactionBackend: Send + Sync + Clone {

fn fetch_unconfirmed_transactions(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError>;

fn get_transactions_to_be_broadcast(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError>;

/// Check if a record with the provided key exists in the backend.
fn contains(&self, key: &DbKey) -> Result<bool, TransactionStorageError>;
/// Modify the state the of the backend with a write operation
Expand Down Expand Up @@ -424,6 +426,11 @@ where T: TransactionBackend + 'static
self.db.fetch_unconfirmed_transactions()
}

/// This method returns all completed transactions that must be re-broadcast
pub async fn get_transactions_to_be_broadcast(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
self.db.get_transactions_to_be_broadcast()
}

pub async fn get_completed_transaction_cancelled_or_not(
&self,
tx_id: TxId,
Expand Down
106 changes: 106 additions & 0 deletions base_layer/wallet/src/transaction_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,42 @@ impl TransactionBackend for TransactionServiceSqliteDatabase {
Ok(result)
}

fn get_transactions_to_be_broadcast(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
let start = Instant::now();
let conn = self.database_connection.acquire_lock();
let acquire_lock = start.elapsed();
let txs = completed_transactions::table
.filter(completed_transactions::valid.eq(true as i32))
.filter(
completed_transactions::status
.eq(TransactionStatus::Completed as i32)
.or(completed_transactions::status.eq(TransactionStatus::Broadcast as i32)),
)
.filter(
completed_transactions::coinbase_block_height
.is_null()
.or(completed_transactions::coinbase_block_height.eq(0)),
)
.filter(completed_transactions::cancelled.eq(false as i32))
.order_by(completed_transactions::tx_id)
.load::<CompletedTransactionSql>(&*conn)?;

let mut result = vec![];
for mut tx in txs {
self.decrypt_if_necessary(&mut tx)?;
result.push(tx.try_into()?);
}
trace!(
target: LOG_TARGET,
"sqlite profile - get_transactions_to_be_broadcast: lock {} + db_op {} = {} ms",
acquire_lock.as_millis(),
(start.elapsed() - acquire_lock).as_millis(),
start.elapsed().as_millis()
);

Ok(result)
}

fn mark_all_transactions_as_unvalidated(&self) -> Result<(), TransactionStorageError> {
let start = Instant::now();
let conn = self.database_connection.acquire_lock();
Expand Down Expand Up @@ -2362,4 +2398,74 @@ mod test {
assert!(db3.fetch(&DbKey::PendingOutboundTransactions).is_ok());
assert!(db3.fetch(&DbKey::CompletedTransactions).is_ok());
}

#[test]
fn test_get_tranactions_to_be_rebroadcast() {
let db_name = format!("{}.sqlite3", string(8).as_str());
let temp_dir = tempdir().unwrap();
let db_folder = temp_dir.path().to_str().unwrap().to_string();
let db_path = format!("{}{}", db_folder, db_name);

embed_migrations!("./migrations");
let conn = SqliteConnection::establish(&db_path).unwrap_or_else(|_| panic!("Error connecting to {}", db_path));

embedded_migrations::run_with_output(&conn, &mut std::io::stdout()).expect("Migration failed");

for i in 0..1000 {
let (valid, cancelled, status, coinbase_block_height) = match i % 13 {
0 => (true, i % 3 == 0, TransactionStatus::Completed, None),
1 => (true, i % 5 == 0, TransactionStatus::Broadcast, None),
2 => (true, i % 7 == 0, TransactionStatus::Completed, Some(i % 2)),
3 => (true, i % 11 == 0, TransactionStatus::Broadcast, Some(i % 2)),
4 => (i % 13 == 0, false, TransactionStatus::Completed, None),
5 => (i % 17 == 0, false, TransactionStatus::Broadcast, None),
6 => (true, false, TransactionStatus::Pending, None),
7 => (true, false, TransactionStatus::Coinbase, None),
8 => (true, false, TransactionStatus::MinedUnconfirmed, None),
9 => (true, false, TransactionStatus::Imported, None),
10 => (true, false, TransactionStatus::MinedConfirmed, None),
_ => (true, false, TransactionStatus::Completed, Some(i)),
};
let completed_tx = CompletedTransaction {
tx_id: i,
source_public_key: PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)),
destination_public_key: PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)),
amount: MicroTari::from(100),
fee: MicroTari::from(100),
transaction: Transaction::new(
vec![],
vec![],
vec![],
PrivateKey::random(&mut OsRng),
PrivateKey::random(&mut OsRng),
),
status,
message: "Yo!".to_string(),
timestamp: Utc::now().naive_utc(),
cancelled,
direction: TransactionDirection::Unknown,
coinbase_block_height,
send_count: 0,
last_send_timestamp: None,
valid,
confirmations: None,
mined_height: None,
mined_in_block: None,
};
let completed_tx_sql = CompletedTransactionSql::try_from(completed_tx).unwrap();
completed_tx_sql.commit(&conn).unwrap();
}

let connection = WalletDbConnection::new(conn, None);
let db1 = TransactionServiceSqliteDatabase::new(connection, None);

let txn_list = db1.get_transactions_to_be_broadcast().unwrap();
assert_eq!(db1.get_transactions_to_be_broadcast().unwrap().len(), 185);
for txn in &txn_list {
assert!(txn.status == TransactionStatus::Completed || txn.status == TransactionStatus::Broadcast);
assert!(txn.valid);
assert!(!txn.cancelled);
assert!(txn.coinbase_block_height == None || txn.coinbase_block_height == Some(0));
}
}
}
17 changes: 17 additions & 0 deletions base_layer/wallet/tests/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,23 @@ async fn test_txo_validation() {
.await
.unwrap();

// This is needed on a fast computer, otherwise the balance have not been updated correctly yet with the next step
let mut event_stream = oms.get_event_stream();
let delay = sleep(Duration::from_secs(10));
tokio::pin!(delay);
loop {
tokio::select! {
event = event_stream.recv() => {
if let OutputManagerEvent::TxoValidationSuccess(_) = &*event.unwrap(){
break;
}
},
() = &mut delay => {
break;
},
}
}

let balance = oms.get_balance().await.unwrap();
assert_eq!(
balance.available_balance,
Expand Down

0 comments on commit ac42d5e

Please sign in to comment.