diff --git a/lib/core/src/persist/send.rs b/lib/core/src/persist/send.rs index eca2c433..0162c704 100644 --- a/lib/core/src/persist/send.rs +++ b/lib/core/src/persist/send.rs @@ -241,6 +241,49 @@ impl Persister { Ok(()) } + + pub(crate) fn set_send_swap_lockup_tx_id( + &self, + swap_id: &str, + lockup_tx_id: &str, + ) -> Result<(), PaymentError> { + let con = self.get_connection()?; + + let row_count = con + .execute( + "UPDATE send_swaps + SET lockup_tx_id = :lockup_tx_id + WHERE id = :id AND lockup_tx_id IS NULL", + named_params! { + ":id": swap_id, + ":lockup_tx_id": lockup_tx_id, + }, + ) + .map_err(|_| PaymentError::PersistError)?; + match row_count { + 1 => Ok(()), + _ => Err(PaymentError::PaymentInProgress), + } + } + + pub(crate) fn unset_send_swap_lockup_tx_id( + &self, + swap_id: &str, + lockup_tx_id: &str, + ) -> Result<(), PaymentError> { + let con = self.get_connection()?; + con.execute( + "UPDATE send_swaps + SET lockup_tx_id = NULL + WHERE id = :id AND lockup_tx_id = :lockup_tx_id", + named_params! { + ":id": swap_id, + ":lockup_tx_id": lockup_tx_id, + }, + ) + .map_err(|_| PaymentError::PersistError)?; + Ok(()) + } } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index c5e99223..f3876be6 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -1006,14 +1006,20 @@ impl LiquidSdk { let swap = match self.persister.fetch_send_swap_by_invoice(invoice)? { Some(swap) => match swap.state { + Created => swap, + TimedOut => { + self.send_swap_handler + .update_swap_info(&swap.id, PaymentState::Created, None, None, None) + .await?; + swap + } Pending => return Err(PaymentError::PaymentInProgress), Complete => return Err(PaymentError::AlreadyPaid), - RefundPending | Failed => { + RefundPending | Refundable | Failed => { return Err(PaymentError::invalid_invoice( "Payment has already failed. Please try with another invoice", )) } - _ => swap, }, None => { let keypair = utils::generate_keypair(); @@ -1072,8 +1078,12 @@ impl LiquidSdk { }; self.status_stream.track_swap_id(&swap.id)?; - let accept_zero_conf = swap.get_boltz_create_response()?.accept_zero_conf; - self.wait_for_payment(Swap::Send(swap), accept_zero_conf) + let create_response = swap.get_boltz_create_response()?; + self.send_swap_handler + .try_lockup(&swap, &create_response) + .await?; + + self.wait_for_payment(Swap::Send(swap), create_response.accept_zero_conf) .await .map(|payment| SendPaymentResponse { payment }) } @@ -1311,8 +1321,9 @@ impl LiquidSdk { webhook, })?; - let swap_id = &create_response.id; - let create_response_json = ChainSwap::from_boltz_struct_to_json(&create_response, swap_id)?; + let create_response_json = + ChainSwap::from_boltz_struct_to_json(&create_response, &create_response.id)?; + let swap_id = create_response.id; let accept_zero_conf = server_lockup_amount_sat <= pair.limits.maximal_zero_conf; let payer_amount_sat = req.prepare_response.total_fees_sat + receiver_amount_sat; @@ -1341,7 +1352,7 @@ impl LiquidSdk { state: PaymentState::Created, }; self.persister.insert_chain_swap(&swap)?; - self.status_stream.track_swap_id(&swap.id)?; + self.status_stream.track_swap_id(&swap_id)?; self.wait_for_payment(Swap::Chain(swap), accept_zero_conf) .await @@ -2665,20 +2676,6 @@ mod tests { assert_eq!(persisted_swap.state, PaymentState::Failed); } - // Verify that `InvoiceSet` correctly sets the state to `Pending` and - // assigns the `lockup_tx_id` to the payment - let persisted_swap = trigger_swap_update!( - "send", - NewSwapArgs::default(), - persister, - status_stream, - SubSwapStates::InvoiceSet, - None, - None - ); - assert_eq!(persisted_swap.state, PaymentState::Pending); - assert!(persisted_swap.lockup_tx_id.is_some()); - // Verify that `TransactionClaimPending` correctly sets the state to `Complete` // and stores the preimage let persisted_swap = trigger_swap_update!( diff --git a/lib/core/src/send_swap.rs b/lib/core/src/send_swap.rs index 94cc28d9..29b67060 100644 --- a/lib/core/src/send_swap.rs +++ b/lib/core/src/send_swap.rs @@ -15,13 +15,13 @@ use tokio::sync::{broadcast, Mutex}; use crate::chain::liquid::LiquidChainService; use crate::model::{Config, PaymentState::*, SendSwap}; -use crate::prelude::Swap; +use crate::prelude::{PaymentTxData, PaymentType, Swap}; use crate::swapper::Swapper; use crate::wallet::OnchainWallet; use crate::{ensure_sdk, utils}; use crate::{ error::PaymentError, - model::{PaymentState, PaymentTxData, PaymentType, Transaction as SdkTransaction}, + model::{PaymentState, Transaction as SdkTransaction}, persist::Persister, }; @@ -71,40 +71,9 @@ impl SendSwapHandler { // See https://docs.boltz.exchange/v/api/lifecycle#normal-submarine-swaps match SubSwapStates::from_str(swap_state) { - // Boltz has locked the HTLC, we proceed with locking up the funds + // Boltz has locked the HTLC Ok(SubSwapStates::InvoiceSet) => { - match (swap.state, swap.lockup_tx_id.clone()) { - (PaymentState::Created, None) | (PaymentState::TimedOut, None) => { - let create_response = swap.get_boltz_create_response()?; - let lockup_tx = self.lockup_funds(id, &create_response).await?; - let lockup_tx_id = lockup_tx.txid().to_string(); - let lockup_tx_fees_sat: u64 = lockup_tx.all_fees().values().sum(); - - // We insert a pseudo-lockup-tx in case LWK fails to pick up the new mempool tx for a while - // This makes the tx known to the SDK (get_info, list_payments) instantly - self.persister.insert_or_update_payment( - PaymentTxData { - tx_id: lockup_tx_id.clone(), - timestamp: Some(utils::now()), - amount_sat: swap.payer_amount_sat, - fees_sat: lockup_tx_fees_sat, - payment_type: PaymentType::Send, - is_confirmed: false, - }, - None, - None, - )?; - - self.update_swap_info(id, Pending, None, Some(&lockup_tx_id), None) - .await?; - } - (_, Some(lockup_tx_id)) => { - warn!("Lockup tx for Send Swap {id} was already broadcast: txid {lockup_tx_id}") - } - (state, _) => { - debug!("Send Swap {id} is in an invalid state for {swap_state}: {state:?}") - } - } + warn!("Received `invoice.set` state for Send Swap {id}"); Ok(()) } @@ -203,11 +172,17 @@ impl SendSwapHandler { } } - async fn lockup_funds( + pub(crate) async fn try_lockup( &self, - swap_id: &str, + swap: &SendSwap, create_response: &CreateSubmarineResponse, ) -> Result { + if swap.lockup_tx_id.is_some() { + debug!("Lockup tx was already broadcast for Send Swap {}", swap.id); + return Err(PaymentError::PaymentInProgress); + } + + let swap_id = &swap.id; debug!( "Initiated Send Swap: send {} sats to liquid address {}", create_response.expected_amount, create_response.address @@ -223,17 +198,48 @@ impl SendSwapHandler { create_response.expected_amount, ) .await?; + let lockup_tx_id = lockup_tx.txid().to_string(); + + self.persister + .set_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?; + + info!("Broadcasting lockup tx {lockup_tx_id} for Send swap {swap_id}",); - info!("broadcasting lockup tx {}", lockup_tx.txid()); - let lockup_tx_id = self + let broadcast_result = self .chain_service .lock() .await .broadcast(&lockup_tx, Some(swap_id)) - .await? - .to_string(); + .await; + + if let Err(err) = broadcast_result { + debug!("Could not broadcast lockup tx for Send Swap {swap_id}: {err:?}"); + self.persister + .unset_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?; + return Err(err.into()); + } info!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}"); + + // We insert a pseudo-lockup-tx in case LWK fails to pick up the new mempool tx for a while + // This makes the tx known to the SDK (get_info, list_payments) instantly + let lockup_tx_fees_sat: u64 = lockup_tx.all_fees().values().sum(); + self.persister.insert_or_update_payment( + PaymentTxData { + tx_id: lockup_tx_id.clone(), + timestamp: Some(utils::now()), + amount_sat: swap.payer_amount_sat, + fees_sat: lockup_tx_fees_sat, + payment_type: PaymentType::Send, + is_confirmed: false, + }, + None, + None, + )?; + + self.update_swap_info(swap_id, Pending, None, Some(&lockup_tx_id), None) + .await?; + Ok(lockup_tx) }