diff --git a/src/ckb/channel.rs b/src/ckb/channel.rs index e95c5780..98552f31 100644 --- a/src/ckb/channel.rs +++ b/src/ckb/channel.rs @@ -25,11 +25,7 @@ use tentacle::secio::PeerId; use thiserror::Error; use tokio::sync::oneshot; -use std::{ - borrow::Borrow, - collections::{hash_map, HashMap}, - fmt::Debug, -}; +use std::{borrow::Borrow, collections::BTreeMap}; use crate::{ ckb::types::Shutdown, @@ -290,12 +286,6 @@ impl ChannelActor { } CFNMessage::RevokeAndAck(revoke_and_ack) => { state.handle_revoke_and_ack_message(revoke_and_ack)?; - if let ChannelState::ChannelReady(flags) = state.state { - if flags.is_empty() { - self.handle_commitment_signed_command(state)?; - } - } - state.update_state(ChannelState::ChannelReady(ChannelReadyFlags::empty())); Ok(()) } CFNMessage::ChannelReady(channel_ready) => { @@ -333,32 +323,8 @@ impl ChannelActor { CFNMessage::AddTlc(add_tlc) => { state.check_state_for_tlc_update()?; - let tlc = state.create_inbounding_tlc(add_tlc); - let id = tlc.id; - - match state.pending_received_tlcs.get(&id) { - Some(current) if current == &tlc => { - debug!( - "Repeated processing of AddTlcCommand with id {:?}: current tlc {:?}", - id, current, - ); - return Ok(()); - } - Some(current) => { - return Err(ProcessingChannelError::RepeatedProcessing(format!( - "Repeated processing of AddTlcCommand with id {:?}: current tlc {:?}, tlc to be inserted {:?}", - id, - current, - &tlc - ))); - } - None => { - debug!("Adding tlc {:?} to channel {:?}", &tlc, state.get_id()); - } - } - - state.pending_received_tlcs.insert(tlc.id, tlc); - state.to_remote_amount -= tlc.amount; + let tlc = state.create_inbounding_tlc(add_tlc)?; + state.insert_tlc(tlc)?; // TODO: here we didn't send any ack message to the peer. // The peer may falsely believe that we have already processed this message, @@ -369,40 +335,13 @@ impl ChannelActor { CFNMessage::RemoveTlc(remove_tlc) => { state.check_state_for_tlc_update()?; - match state.pending_offered_tlcs.entry(remove_tlc.tlc_id) { - hash_map::Entry::Occupied(entry) => { - let current = entry.get(); - match remove_tlc.reason { - RemoveTlcReason::RemoveTlcFail(_fail) => { - state.to_local_amount += current.amount; - } - RemoveTlcReason::RemoveTlcFulfill(fulfill) => { - let preimage = fulfill.payment_preimage; - if current.payment_hash != blake2b_256(preimage).into() { - return Err(ProcessingChannelError::InvalidParameter( - "Payment preimage does not match the hash".to_string(), - )); - } - state.to_remote_amount += current.amount; - } - } - entry.remove(); - if state.pending_offered_tlcs.is_empty() { - state.maybe_transition_to_shutdown(&self.network)?; - } - Ok(()) - } - hash_map::Entry::Vacant(_) => { - Err(ProcessingChannelError::InvalidParameter(format!( - "TLC with id {:?} not found in pending_received_tlcs", - remove_tlc.tlc_id - ))) - } - } + state + .remove_tlc_with_reason(TLCId::Offered(remove_tlc.tlc_id), remove_tlc.reason)?; + Ok(()) } CFNMessage::Shutdown(shutdown) => { let flags = match state.state { - ChannelState::ChannelReady(_) => ShuttingDownFlags::empty(), + ChannelState::ChannelReady() => ShuttingDownFlags::empty(), ChannelState::ShuttingDown(flags) if flags.contains(ShuttingDownFlags::THEIR_SHUTDOWN_SENT) => { @@ -494,15 +433,7 @@ impl ChannelActor { ); CommitmentSignedFlags::SigningCommitment(flags) } - ChannelState::ChannelReady(flags) - if flags.contains(ChannelReadyFlags::AWAITING_REMOTE_REVOKE) => - { - return Err(ProcessingChannelError::InvalidState(format!( - "Unable to process commitment_signed command in state {:?}, which requires the remote to send a revoke message first.", - &state.state - ))); - } - ChannelState::ChannelReady(flags) => CommitmentSignedFlags::ChannelReady(flags), + ChannelState::ChannelReady() => CommitmentSignedFlags::ChannelReady(), ChannelState::ShuttingDown(flags) => { if flags.contains(ShuttingDownFlags::AWAITING_PENDING_TLCS) { debug!( @@ -525,15 +456,20 @@ impl ChannelActor { } }; + debug!( + "Building and signing commitment tx for state {:?}", + &state.state + ); let PartiallySignedCommitmentTransaction { tx, signature, + witnesses, msg: _, - version: _, + version, } = state.build_and_sign_commitment_tx()?; debug!( - "Built and signed a commitment tx ({:?}) with partial signature {:?}", - &tx, &signature + "Built and signed commitment tx #{}: transaction: ({:?}), partial signature: {:?}", + version, &tx, &signature ); let commitment_signed = CommitmentSigned { @@ -553,17 +489,29 @@ impl ChannelActor { }), )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); + self.network + .send_message(NetworkActorMessage::new_event( + NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::LocalCommitmentSigned( + state.peer_id.clone(), + state.get_id(), + version, + tx.clone(), + witnesses, + )), + )) + .expect("myself alive"); match flags { CommitmentSignedFlags::SigningCommitment(flags) => { let flags = flags | SigningCommitmentFlags::OUR_COMMITMENT_SIGNED_SENT; + // Normally commitment number will be incremented after received a RevokeAndAck message. + // But here channel has not been etablished yet, so we will not receive RevokeAndAck message. + // We increment the commitment number here instead. + state.increment_local_commitment_number(); state.update_state(ChannelState::SigningCommitment(flags)); state.maybe_transition_to_tx_signatures(flags, &self.network)?; } - CommitmentSignedFlags::ChannelReady(flags) => { - let flags = flags | ChannelReadyFlags::AWAITING_REMOTE_REVOKE; - state.update_state(ChannelState::ChannelReady(flags)); - } + CommitmentSignedFlags::ChannelReady() => {} CommitmentSignedFlags::PendingShutdown(_) => { state.maybe_transition_to_shutdown(&self.network)?; } @@ -579,7 +527,9 @@ impl ChannelActor { state.check_state_for_tlc_update()?; let tlc = state.create_outbounding_tlc(command); + state.insert_tlc(tlc)?; + debug!("Inserted tlc into channel state: {:?}", &tlc); // TODO: Note that since message sending is async, // we can't guarantee anything about the order of message sending // and state updating. And any of these may fail while the other succeeds. @@ -591,39 +541,20 @@ impl ChannelActor { peer_id: self.peer_id.clone(), message: CFNMessage::AddTlc(AddTlc { channel_id: state.get_id(), - tlc_id: tlc.id, + tlc_id: tlc.id.into(), amount: tlc.amount, payment_hash: tlc.payment_hash, expiry: tlc.lock_time, }), }; + debug!("Sending AddTlc message: {:?}", &msg); self.network .send_message(NetworkActorMessage::new_command( NetworkActorCommand::SendCFNMessage(msg), )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); - // Update the state for this tlc. - state.pending_offered_tlcs.insert(tlc.id, tlc); - state.to_local_amount -= tlc.amount; - state.next_offering_tlc_id += 1; - debug!( - "Added tlc with id {:?} to pending offered tlcs: {:?}", - tlc.id, &tlc - ); - debug!( - "Current pending offered tlcs: {:?}", - &state.pending_offered_tlcs - ); - debug!( - "Current pending received tlcs: {:?}", - &state.pending_received_tlcs - ); - debug!( - "Balance after addtlccommand: to_local_amount: {} to_remote_amount: {}", - state.to_local_amount, state.to_remote_amount - ); - Ok(tlc.id) + Ok(tlc.id.into()) } pub fn handle_remove_tlc_command( @@ -632,51 +563,27 @@ impl ChannelActor { command: RemoveTlcCommand, ) -> ProcessingChannelResult { state.check_state_for_tlc_update()?; - // Notes: state updating and message sending are not atomic. - match state.pending_received_tlcs.remove(&command.id) { - Some(tlc) => { - let msg = CFNMessageWithPeerId { - peer_id: self.peer_id.clone(), - message: CFNMessage::RemoveTlc(RemoveTlc { - channel_id: state.get_id(), - tlc_id: tlc.id, - reason: command.reason, - }), - }; - if let RemoveTlcReason::RemoveTlcFulfill(fulfill) = command.reason { - let filled_payment_hash: Hash256 = blake2b_256(fulfill.payment_preimage).into(); - if tlc.payment_hash != filled_payment_hash { - state.pending_received_tlcs.insert(tlc.id, tlc); - return Err(ProcessingChannelError::InvalidParameter(format!( - "Preimage {:?} is hashed to {}, which does not match payment hash {:?}", - fulfill.payment_preimage, filled_payment_hash, tlc.payment_hash, - ))); - } - } - self.network - .send_message(NetworkActorMessage::new_command( - NetworkActorCommand::SendCFNMessage(msg), - )) - .expect(ASSUME_NETWORK_ACTOR_ALIVE); - match command.reason { - RemoveTlcReason::RemoveTlcFail(_) => { - state.to_remote_amount += tlc.amount; - } - RemoveTlcReason::RemoveTlcFulfill(_) => { - state.to_local_amount += tlc.amount; - } - } - } - None => { - return Err(ProcessingChannelError::InvalidParameter(format!( - "Trying to remove tlc with id {:?} that is not in pending received tlcs", - command.id - ))); - } - } + let tlc = state.remove_tlc_with_reason(TLCId::Received(command.id), command.reason)?; + let msg = CFNMessageWithPeerId { + peer_id: self.peer_id.clone(), + message: CFNMessage::RemoveTlc(RemoveTlc { + channel_id: state.get_id(), + tlc_id: command.id, + reason: command.reason, + }), + }; + self.network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::SendCFNMessage(msg), + )) + .expect(ASSUME_NETWORK_ACTOR_ALIVE); + debug!( - "Balance after removetlccommand: to_local_amount: {} to_remote_amount: {}", - state.to_local_amount, state.to_remote_amount + "Channel ({:?}) balance after removing tlc {:?}: local balance: {}, remote balance: {}", + state.get_id(), + tlc, + state.to_local_amount, + state.to_remote_amount ); state.maybe_transition_to_shutdown(&self.network)?; @@ -690,7 +597,7 @@ impl ChannelActor { ) -> ProcessingChannelResult { debug!("Handling shutdown command: {:?}", &command); let flags = match state.state { - ChannelState::ChannelReady(_) => { + ChannelState::ChannelReady() => { debug!("Handling shutdown command in ChannelReady state"); ShuttingDownFlags::empty() } @@ -977,7 +884,7 @@ where *second_per_commitment_point, ); - let commitment_number = 0; + let commitment_number = INITIAL_COMMITMENT_NUMBER; let accept_channel = AcceptChannel { channel_id: *channel_id, @@ -1045,7 +952,7 @@ where LockTime::new(DEFAULT_TO_LOCAL_DELAY_BLOCKS), ); - let commitment_number = 0; + let commitment_number = INITIAL_COMMITMENT_NUMBER; let message = CFNMessage::OpenChannel(OpenChannel { chain_hash: Hash256::default(), channel_id: channel.get_id(), @@ -1202,6 +1109,126 @@ impl From for FundingTxInput { } } +#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct CommitmentNumbers { + pub local: u64, + pub remote: u64, +} + +impl Default for CommitmentNumbers { + fn default() -> Self { + Self::new() + } +} + +impl CommitmentNumbers { + pub fn new() -> Self { + Self { + local: INITIAL_COMMITMENT_NUMBER, + remote: INITIAL_COMMITMENT_NUMBER, + } + } + + pub fn get_local(&self) -> u64 { + self.local + } + + pub fn get_remote(&self) -> u64 { + self.remote + } + + pub fn increment_local(&mut self) { + self.local += 1; + } + + pub fn increment_remote(&mut self) { + self.remote += 1; + } + + pub fn flip(&self) -> Self { + Self { + local: self.remote, + remote: self.local, + } + } +} + +#[derive(Copy, Clone, Debug, Serialize, Deserialize)] +pub struct TLCIds { + pub offering: u64, + pub received: u64, +} + +impl Default for TLCIds { + fn default() -> Self { + Self::new() + } +} + +impl TLCIds { + pub fn new() -> Self { + Self { + offering: 0, + received: 0, + } + } + + pub fn get_next_offering(&self) -> u64 { + self.offering + } + + pub fn get_next_received(&self) -> u64 { + self.received + } + + pub fn increment_offering(&mut self) { + self.offering += 1; + } + + pub fn increment_received(&mut self) { + self.received += 1; + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Ord)] +pub enum TLCId { + Offered(u64), + Received(u64), +} + +impl From for u64 { + fn from(id: TLCId) -> u64 { + match id { + TLCId::Offered(id) => id, + TLCId::Received(id) => id, + } + } +} + +impl TLCId { + pub fn is_offered(&self) -> bool { + match self { + TLCId::Offered(_) => true, + _ => false, + } + } + + pub fn is_received(&self) -> bool { + !self.is_offered() + } + + pub fn flip(&self) -> Self { + match self { + TLCId::Offered(id) => TLCId::Received(*id), + TLCId::Received(id) => TLCId::Offered(*id), + } + } + + pub fn flip_mut(&mut self) { + *self = self.flip(); + } +} + #[serde_as] #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ChannelActorState { @@ -1219,12 +1246,29 @@ pub struct ChannelActorState { // An inbound channel is one where the counterparty is the funder of the channel. pub is_acceptor: bool, + // TODO: consider transaction fee while building the commitment transaction. + + // The invariant here is that the sum of `to_local_amount` and `to_remote_amount` + // should be equal to the total amount of the channel. + // The changes of both `to_local_amount` and `to_remote_amount` + // will always happen after a revoke_and_ack message is sent/received. + // This means that while calculating the amounts for commitment transactions, + // processing add_tlc command and messages, we need to take into account that + // the amounts are not decremented/incremented yet. + + // The amount of CKB/UDT that we own in the channel. + // This value will only change after we have resolved a tlc. pub to_local_amount: u128, + // The amount of CKB/UDT that the remote owns in the channel. + // This value will only change after we have resolved a tlc. pub to_remote_amount: u128, // only used for UDT scenario: - // `to_local_amount` and `to_remote_amount` are the amount of UDT - // we need to keep track of the CKB amount from partners in the channel. + // `to_local_amount` and `to_remote_amount` are the amount of UDT, + // while `local_ckb_amount` and `remote_ckb_amount` are the amount of CKB + // of the underlying cells that bear the UDT. + // We keep track of the CKB amount from partners in the channel in + // order to construct closing/commitment transactions. pub local_ckb_amount: u64, pub remote_ckb_amount: u64, @@ -1238,23 +1282,20 @@ pub struct ChannelActorState { pub local_shutdown_script: Option