From 131e5bfbd4bf59cd806c1df8d5e46bc97e8b3dd9 Mon Sep 17 00:00:00 2001 From: yse Date: Thu, 31 Oct 2024 11:52:37 +0100 Subject: [PATCH] feat: add database locking to make send atomic --- lib/core/src/persist/send.rs | 43 ++++++++++++++++++++++++++++++++++++ lib/core/src/sdk.rs | 3 +-- lib/core/src/send_swap.rs | 25 ++++++++++++++++----- 3 files changed, 63 insertions(+), 8 deletions(-) diff --git a/lib/core/src/persist/send.rs b/lib/core/src/persist/send.rs index eca2c4336..f584bae99 100644 --- a/lib/core/src/persist/send.rs +++ b/lib/core/src/persist/send.rs @@ -241,6 +241,49 @@ impl Persister { Ok(()) } + + pub(crate) fn set_send_swap_lockup_tx_id( + &self, + swap_id: &str, + lockup_tx_id: &str, + ) -> Result<(), PaymentError> { + let con = self.get_connection()?; + + let row_count = con + .execute( + "UPDATE send_swaps + SET lockup_tx_id = :lockup_tx_id + WHERE id = :id AND lockup_tx_id IS NULL", + named_params! { + ":id": swap_id, + ":lockup_tx_id": lockup_tx_id, + }, + ) + .map_err(|_| PaymentError::PersistError)?; + match row_count { + 1 => Ok(()), + _ => Err(PaymentError::AlreadyClaimed), + } + } + + pub(crate) fn unset_send_swap_lockup_tx_id( + &self, + swap_id: &str, + lockup_tx_id: &str, + ) -> Result<(), PaymentError> { + let con = self.get_connection()?; + con.execute( + "UPDATE send_swaps + SET lockup_tx_id = NULL + WHERE id = :id AND lockup_tx_id = :lockup_tx_id", + named_params! { + ":id": swap_id, + ":lockup_tx_id": lockup_tx_id, + }, + ) + .map_err(|_| PaymentError::PersistError)?; + Ok(()) + } } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index e7ae49df7..f3173477d 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -1081,8 +1081,7 @@ impl LiquidSdk { let create_response = swap.get_boltz_create_response()?; self.try_lockup(&swap, &create_response).await?; - let accept_zero_conf = create_response.accept_zero_conf; - self.wait_for_payment(Swap::Send(swap), accept_zero_conf) + self.wait_for_payment(Swap::Send(swap), create_response.accept_zero_conf) .await .map(|payment| SendPaymentResponse { payment }) } diff --git a/lib/core/src/send_swap.rs b/lib/core/src/send_swap.rs index b945c175c..23fbf61ca 100644 --- a/lib/core/src/send_swap.rs +++ b/lib/core/src/send_swap.rs @@ -192,18 +192,31 @@ impl SendSwapHandler { create_response.expected_amount, ) .await?; + let lockup_tx_id = lockup_tx.txid().to_string(); - info!("broadcasting lockup tx {}", lockup_tx.txid()); - let lockup_tx_id = self + self.persister + .set_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?; + + info!("Broadcasting lockup tx {lockup_tx_id} for Send swap {swap_id}",); + let broadcast_result = self .chain_service .lock() .await .broadcast(&lockup_tx, Some(swap_id)) - .await? - .to_string(); + .await; - info!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}"); - Ok(lockup_tx) + match broadcast_result { + Ok(_) => { + info!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}"); + Ok(lockup_tx) + } + Err(err) => { + debug!("Could not broadcast lockup tx for Send Swap {swap_id}: {err:?}"); + self.persister + .unset_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?; + Err(err.into()) + } + } } /// Transitions a Send swap to a new state