Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize get transactions query #3496

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1625,7 +1625,7 @@ where
}

trace!(target: LOG_TARGET, "Restarting transaction broadcast protocols");
self.broadcast_all_completed_transactions(broadcast_join_handles)
self.broadcast_selected_completed_transactions(broadcast_join_handles)
philipr-za marked this conversation as resolved.
Show resolved Hide resolved
.await
.map_err(|resp| {
error!(
Expand Down Expand Up @@ -1682,22 +1682,21 @@ where
Ok(())
}

/// Go through all completed transactions that have not yet been broadcast and broadcast all of them to the base
/// Broadcast all valid and not cancelled completed transactions with status 'Completed' and 'Broadcast' to the base
/// node.
async fn broadcast_all_completed_transactions(
async fn broadcast_selected_completed_transactions(
&mut self,
join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<(), TransactionServiceError> {
trace!(target: LOG_TARGET, "Attempting to Broadcast all Completed Transactions");
let completed_txs = self.db.get_completed_transactions().await?;
for (_, completed_tx) in completed_txs {
if completed_tx.valid &&
(completed_tx.status == TransactionStatus::Completed ||
completed_tx.status == TransactionStatus::Broadcast) &&
!completed_tx.is_coinbase()
{
self.broadcast_completed_transaction(completed_tx, join_handles).await?;
}
trace!(
target: LOG_TARGET,
"Attempting to Broadcast all valid and not cancelled Completed Transactions with status 'Completed' and \
'Broadcast'"
);
let txn_list = self.db.get_transactions_to_be_broadcast().await?;
for completed_txn in txn_list {
self.broadcast_completed_transaction(completed_txn, join_handles)
.await?;
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub trait TransactionBackend: Send + Sync + Clone {

fn fetch_unconfirmed_transactions(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError>;

fn get_transactions_to_be_broadcast(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError>;

/// Check if a record with the provided key exists in the backend.
fn contains(&self, key: &DbKey) -> Result<bool, TransactionStorageError>;
/// Modify the state the of the backend with a write operation
Expand Down Expand Up @@ -424,6 +426,11 @@ where T: TransactionBackend + 'static
self.db.fetch_unconfirmed_transactions()
}

/// This method returns all completed transactions that must be re-broadcast
philipr-za marked this conversation as resolved.
Show resolved Hide resolved
pub async fn get_transactions_to_be_broadcast(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
self.db.get_transactions_to_be_broadcast()
}

pub async fn get_completed_transaction_cancelled_or_not(
&self,
tx_id: TxId,
Expand Down
106 changes: 106 additions & 0 deletions base_layer/wallet/src/transaction_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,42 @@ impl TransactionBackend for TransactionServiceSqliteDatabase {
Ok(result)
}

fn get_transactions_to_be_broadcast(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
let start = Instant::now();
let conn = self.database_connection.acquire_lock();
let acquire_lock = start.elapsed();
let txs = completed_transactions::table
.filter(completed_transactions::valid.eq(true as i32))
.filter(
completed_transactions::status
.eq(TransactionStatus::Completed as i32)
.or(completed_transactions::status.eq(TransactionStatus::Broadcast as i32)),
)
.filter(
completed_transactions::coinbase_block_height
.is_null()
.or(completed_transactions::coinbase_block_height.eq(0)),
)
.filter(completed_transactions::cancelled.eq(false as i32))
.order_by(completed_transactions::tx_id)
.load::<CompletedTransactionSql>(&*conn)?;

let mut result = vec![];
for mut tx in txs {
self.decrypt_if_necessary(&mut tx)?;
result.push(tx.try_into()?);
}
trace!(
target: LOG_TARGET,
"sqlite profile - get_transactions_to_be_broadcast: lock {} + db_op {} = {} ms",
acquire_lock.as_millis(),
(start.elapsed() - acquire_lock).as_millis(),
start.elapsed().as_millis()
);

Ok(result)
}

fn mark_all_transactions_as_unvalidated(&self) -> Result<(), TransactionStorageError> {
let start = Instant::now();
let conn = self.database_connection.acquire_lock();
Expand Down Expand Up @@ -2362,4 +2398,74 @@ mod test {
assert!(db3.fetch(&DbKey::PendingOutboundTransactions).is_ok());
assert!(db3.fetch(&DbKey::CompletedTransactions).is_ok());
}

#[test]
fn test_get_tranactions_to_be_rebroadcast() {
let db_name = format!("{}.sqlite3", string(8).as_str());
let temp_dir = tempdir().unwrap();
let db_folder = temp_dir.path().to_str().unwrap().to_string();
let db_path = format!("{}{}", db_folder, db_name);

embed_migrations!("./migrations");
let conn = SqliteConnection::establish(&db_path).unwrap_or_else(|_| panic!("Error connecting to {}", db_path));

embedded_migrations::run_with_output(&conn, &mut std::io::stdout()).expect("Migration failed");

for i in 0..1000 {
let (valid, cancelled, status, coinbase_block_height) = match i % 13 {
0 => (true, i % 3 == 0, TransactionStatus::Completed, None),
1 => (true, i % 5 == 0, TransactionStatus::Broadcast, None),
2 => (true, i % 7 == 0, TransactionStatus::Completed, Some(i % 2)),
3 => (true, i % 11 == 0, TransactionStatus::Broadcast, Some(i % 2)),
4 => (i % 13 == 0, false, TransactionStatus::Completed, None),
5 => (i % 17 == 0, false, TransactionStatus::Broadcast, None),
6 => (true, false, TransactionStatus::Pending, None),
7 => (true, false, TransactionStatus::Coinbase, None),
8 => (true, false, TransactionStatus::MinedUnconfirmed, None),
9 => (true, false, TransactionStatus::Imported, None),
10 => (true, false, TransactionStatus::MinedConfirmed, None),
_ => (true, false, TransactionStatus::Completed, Some(i)),
};
let completed_tx = CompletedTransaction {
tx_id: i,
source_public_key: PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)),
destination_public_key: PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)),
amount: MicroTari::from(100),
fee: MicroTari::from(100),
transaction: Transaction::new(
vec![],
vec![],
vec![],
PrivateKey::random(&mut OsRng),
PrivateKey::random(&mut OsRng),
),
status,
message: "Yo!".to_string(),
timestamp: Utc::now().naive_utc(),
cancelled,
direction: TransactionDirection::Unknown,
coinbase_block_height,
send_count: 0,
last_send_timestamp: None,
valid,
confirmations: None,
mined_height: None,
mined_in_block: None,
};
let completed_tx_sql = CompletedTransactionSql::try_from(completed_tx).unwrap();
completed_tx_sql.commit(&conn).unwrap();
}

let connection = WalletDbConnection::new(conn, None);
let db1 = TransactionServiceSqliteDatabase::new(connection, None);

let txn_list = db1.get_transactions_to_be_broadcast().unwrap();
assert_eq!(db1.get_transactions_to_be_broadcast().unwrap().len(), 185);
for txn in &txn_list {
assert!(txn.status == TransactionStatus::Completed || txn.status == TransactionStatus::Broadcast);
assert!(txn.valid);
assert!(!txn.cancelled);
assert!(txn.coinbase_block_height == None || txn.coinbase_block_height == Some(0));
}
}
}
17 changes: 17 additions & 0 deletions base_layer/wallet/tests/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,23 @@ async fn test_txo_validation() {
.await
.unwrap();

// This is needed on a fast computer, otherwise the balance have not been updated correctly yet with the next step
let mut event_stream = oms.get_event_stream();
hansieodendaal marked this conversation as resolved.
Show resolved Hide resolved
let delay = sleep(Duration::from_secs(10));
philipr-za marked this conversation as resolved.
Show resolved Hide resolved
tokio::pin!(delay);
loop {
tokio::select! {
event = event_stream.recv() => {
if let OutputManagerEvent::TxoValidationSuccess(_) = &*event.unwrap(){
break;
}
},
() = &mut delay => {
break;
},
}
}

let balance = oms.get_balance().await.unwrap();
assert_eq!(
balance.available_balance,
Expand Down