Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize get transactions query #3496

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1625,12 +1625,14 @@ where
}

trace!(target: LOG_TARGET, "Restarting transaction broadcast protocols");
self.broadcast_all_completed_transactions(broadcast_join_handles)
self.broadcast_completed_and_broadcast_transactions(broadcast_join_handles)
.await
.map_err(|resp| {
error!(
target: LOG_TARGET,
"Error broadcasting all completed transactions: {:?}", resp
"Error broadcasting all valid and not cancelled Completed Transactions with status 'Completed' \
and 'Broadcast': {:?}",
resp
);
resp
})?;
Expand Down Expand Up @@ -1682,22 +1684,21 @@ where
Ok(())
}

/// Go through all completed transactions that have not yet been broadcast and broadcast all of them to the base
/// Broadcast all valid and not cancelled completed transactions with status 'Completed' and 'Broadcast' to the base
/// node.
async fn broadcast_all_completed_transactions(
async fn broadcast_completed_and_broadcast_transactions(
&mut self,
join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<(), TransactionServiceError> {
trace!(target: LOG_TARGET, "Attempting to Broadcast all Completed Transactions");
let completed_txs = self.db.get_completed_transactions().await?;
for (_, completed_tx) in completed_txs {
if completed_tx.valid &&
(completed_tx.status == TransactionStatus::Completed ||
completed_tx.status == TransactionStatus::Broadcast) &&
!completed_tx.is_coinbase()
{
self.broadcast_completed_transaction(completed_tx, join_handles).await?;
}
trace!(
target: LOG_TARGET,
"Attempting to Broadcast all valid and not cancelled Completed Transactions with status 'Completed' and \
'Broadcast'"
);
let txn_list = self.db.get_transactions_to_be_broadcast().await?;
for completed_txn in txn_list {
self.broadcast_completed_transaction(completed_txn, join_handles)
.await?;
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub trait TransactionBackend: Send + Sync + Clone {

fn fetch_unconfirmed_transactions(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError>;

fn get_transactions_to_be_broadcast(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError>;

/// Check if a record with the provided key exists in the backend.
fn contains(&self, key: &DbKey) -> Result<bool, TransactionStorageError>;
/// Modify the state the of the backend with a write operation
Expand Down Expand Up @@ -424,6 +426,11 @@ where T: TransactionBackend + 'static
self.db.fetch_unconfirmed_transactions()
}

/// This method returns all completed transactions that must be broadcast
pub async fn get_transactions_to_be_broadcast(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
self.db.get_transactions_to_be_broadcast()
}

pub async fn get_completed_transaction_cancelled_or_not(
&self,
tx_id: TxId,
Expand Down
106 changes: 106 additions & 0 deletions base_layer/wallet/src/transaction_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,42 @@ impl TransactionBackend for TransactionServiceSqliteDatabase {
Ok(result)
}

fn get_transactions_to_be_broadcast(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
let start = Instant::now();
let conn = self.database_connection.acquire_lock();
let acquire_lock = start.elapsed();
let txs = completed_transactions::table
.filter(completed_transactions::valid.eq(true as i32))
.filter(
completed_transactions::status
.eq(TransactionStatus::Completed as i32)
.or(completed_transactions::status.eq(TransactionStatus::Broadcast as i32)),
)
.filter(
completed_transactions::coinbase_block_height
.is_null()
.or(completed_transactions::coinbase_block_height.eq(0)),
)
.filter(completed_transactions::cancelled.eq(false as i32))
.order_by(completed_transactions::tx_id)
.load::<CompletedTransactionSql>(&*conn)?;

let mut result = vec![];
for mut tx in txs {
self.decrypt_if_necessary(&mut tx)?;
result.push(tx.try_into()?);
}
trace!(
target: LOG_TARGET,
"sqlite profile - get_transactions_to_be_broadcast: lock {} + db_op {} = {} ms",
acquire_lock.as_millis(),
(start.elapsed() - acquire_lock).as_millis(),
start.elapsed().as_millis()
);

Ok(result)
}

fn mark_all_transactions_as_unvalidated(&self) -> Result<(), TransactionStorageError> {
let start = Instant::now();
let conn = self.database_connection.acquire_lock();
Expand Down Expand Up @@ -2362,4 +2398,74 @@ mod test {
assert!(db3.fetch(&DbKey::PendingOutboundTransactions).is_ok());
assert!(db3.fetch(&DbKey::CompletedTransactions).is_ok());
}

#[test]
fn test_get_tranactions_to_be_rebroadcast() {
let db_name = format!("{}.sqlite3", string(8).as_str());
let temp_dir = tempdir().unwrap();
let db_folder = temp_dir.path().to_str().unwrap().to_string();
let db_path = format!("{}{}", db_folder, db_name);

embed_migrations!("./migrations");
let conn = SqliteConnection::establish(&db_path).unwrap_or_else(|_| panic!("Error connecting to {}", db_path));

embedded_migrations::run_with_output(&conn, &mut std::io::stdout()).expect("Migration failed");

for i in 0..1000 {
let (valid, cancelled, status, coinbase_block_height) = match i % 13 {
0 => (true, i % 3 == 0, TransactionStatus::Completed, None),
1 => (true, i % 5 == 0, TransactionStatus::Broadcast, None),
2 => (true, i % 7 == 0, TransactionStatus::Completed, Some(i % 2)),
3 => (true, i % 11 == 0, TransactionStatus::Broadcast, Some(i % 2)),
4 => (i % 13 == 0, false, TransactionStatus::Completed, None),
5 => (i % 17 == 0, false, TransactionStatus::Broadcast, None),
6 => (true, false, TransactionStatus::Pending, None),
7 => (true, false, TransactionStatus::Coinbase, None),
8 => (true, false, TransactionStatus::MinedUnconfirmed, None),
9 => (true, false, TransactionStatus::Imported, None),
10 => (true, false, TransactionStatus::MinedConfirmed, None),
_ => (true, false, TransactionStatus::Completed, Some(i)),
};
let completed_tx = CompletedTransaction {
tx_id: i,
source_public_key: PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)),
destination_public_key: PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)),
amount: MicroTari::from(100),
fee: MicroTari::from(100),
transaction: Transaction::new(
vec![],
vec![],
vec![],
PrivateKey::random(&mut OsRng),
PrivateKey::random(&mut OsRng),
),
status,
message: "Yo!".to_string(),
timestamp: Utc::now().naive_utc(),
cancelled,
direction: TransactionDirection::Unknown,
coinbase_block_height,
send_count: 0,
last_send_timestamp: None,
valid,
confirmations: None,
mined_height: None,
mined_in_block: None,
};
let completed_tx_sql = CompletedTransactionSql::try_from(completed_tx).unwrap();
completed_tx_sql.commit(&conn).unwrap();
}

let connection = WalletDbConnection::new(conn, None);
let db1 = TransactionServiceSqliteDatabase::new(connection, None);

let txn_list = db1.get_transactions_to_be_broadcast().unwrap();
assert_eq!(db1.get_transactions_to_be_broadcast().unwrap().len(), 185);
for txn in &txn_list {
assert!(txn.status == TransactionStatus::Completed || txn.status == TransactionStatus::Broadcast);
assert!(txn.valid);
assert!(!txn.cancelled);
assert!(txn.coinbase_block_height == None || txn.coinbase_block_height == Some(0));
}
}
}
17 changes: 17 additions & 0 deletions base_layer/wallet/tests/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,23 @@ async fn test_txo_validation() {
.await
.unwrap();

// This is needed on a fast computer, otherwise the balance have not been updated correctly yet with the next step
let mut event_stream = oms.get_event_stream();
hansieodendaal marked this conversation as resolved.
Show resolved Hide resolved
let delay = sleep(Duration::from_secs(10));
philipr-za marked this conversation as resolved.
Show resolved Hide resolved
tokio::pin!(delay);
loop {
tokio::select! {
event = event_stream.recv() => {
if let OutputManagerEvent::TxoValidationSuccess(_) = &*event.unwrap(){
break;
}
},
() = &mut delay => {
break;
},
}
}

let balance = oms.get_balance().await.unwrap();
assert_eq!(
balance.available_balance,
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/features/WalletCli.feature
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Feature: Wallet CLI
# TODO: base node connection.
And I wait 30 seconds
And I stop wallet SENDER
And I make it rain from wallet SENDER 1 tx / sec 10 sec 8000 uT 100 increment to RECEIVER via command line
And I make it rain from wallet SENDER 1 tx per sec 10 sec 8000 uT 100 increment to RECEIVER via command line
Then wallet SENDER has at least 10 transactions that are all TRANSACTION_STATUS_BROADCAST and valid
Then wallet RECEIVER has at least 10 transactions that are all TRANSACTION_STATUS_BROADCAST and valid
And mining node MINE mines 5 blocks
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/features/support/steps.js
Original file line number Diff line number Diff line change
Expand Up @@ -3547,7 +3547,7 @@ When(
);

Then(
"I make it rain from wallet {word} {int} tx / sec {int} sec {int} uT {int} increment to {word} via command line",
"I make it rain from wallet {word} {int} tx per sec {int} sec {int} uT {int} increment to {word} via command line",
{ timeout: 300 * 1000 },
async function (sender, freq, duration, amount, amount_inc, receiver) {
let wallet = this.getWallet(sender);
Expand Down