Skip to content

Commit

Permalink
Wait for closing transaction before deleting channel
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Jun 18, 2024
1 parent db66051 commit ab86b66
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 82 deletions.
18 changes: 12 additions & 6 deletions src/ckb/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pub struct ChannelActor<S> {
keep_on_closed: bool,
}

impl<S> ChannelActor<S> {
impl<S: ChannelActorStateStore> ChannelActor<S> {
pub fn new(
peer_id: PeerId,
network: ActorRef<NetworkActorMessage>,
Expand Down Expand Up @@ -824,6 +824,12 @@ impl<S> ChannelActor<S> {
ChannelEvent::PeerDisconnected => {
myself.stop(Some("PeerDisconnected".to_string()));
}
ChannelEvent::ClosingTransactionConfirmed => {
myself.stop(Some("ChannelClosed".to_string()));
if !self.keep_on_closed {
self.store.delete_channel_actor_state(&state.get_id());
}
}
}
Ok(())
}
Expand Down Expand Up @@ -1088,10 +1094,9 @@ where
}
match state.state {
ChannelState::Closed => {
myself.stop(Some("ChannelClosed".to_string()));
if self.keep_on_closed {
self.store.delete_channel_actor_state(&state.get_id());
}
debug!(
"The channel is closed, waiting for the closing transaction to be confirmed."
);
}
_ => {
self.store.insert_channel_actor_state(state.clone());
Expand Down Expand Up @@ -1332,8 +1337,9 @@ pub struct ClosedChannel {}

#[derive(Debug)]
pub enum ChannelEvent {
FundingTransactionConfirmed,
PeerDisconnected,
FundingTransactionConfirmed,
ClosingTransactionConfirmed,
}

pub type ProcessingChannelResult = Result<(), ProcessingChannelError>;
Expand Down
194 changes: 118 additions & 76 deletions src/ckb/network.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use ckb_jsonrpc_types::Status;
use ckb_types::core::TransactionView;
use ckb_types::packed::{OutPoint, Script, Transaction};
use ckb_types::packed::{Byte32, OutPoint, Script, Transaction};
use ckb_types::prelude::{IntoTransactionView, Pack, Unpack};
use log::{debug, error, info, warn};

use ractor::{
async_trait as rasync_trait, call_t, Actor, ActorCell, ActorProcessingErr, ActorRef,
async_trait as rasync_trait, call_t, Actor, ActorCell, ActorProcessingErr, ActorRef, RactorErr,
RpcReplyPort, SupervisionEvent,
};
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -199,6 +199,12 @@ pub enum NetworkActorEvent {
/// A commitment transaction is signed by us and has sent to the other party.
LocalCommitmentSigned(PeerId, Hash256, u64, TransactionView, Vec<u8>),

/// A closing transaction has been confirmed.
ClosingTransactionConfirmed(PeerId, Hash256, Byte32),

/// A closing transaction has failed (either because of invalid transaction or timeout)
ClosingTransactionFailed(PeerId, Hash256, Byte32),

/// Network service events to be sent to outside observers.
/// These events may be both present at `NetworkActorEvent` and
/// this branch of `NetworkActorEvent`. This is because some events
Expand Down Expand Up @@ -431,28 +437,7 @@ where
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}
NetworkActorEvent::ChannelClosed(channel_id, peer_id, tx) => {
state.on_channel_closed(&channel_id, &peer_id);
info!(
"Channel ({:?}) to peer {:?} is already closed. Closing transaction {:?} can be broacasted now.",
channel_id, peer_id, tx
);
call_t!(
self.chain_actor,
CkbChainMessage::SendTx,
DEFAULT_CHAIN_ACTOR_TIMEOUT,
tx.clone()
)
.expect(ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW)
.expect("valid closing tx");

// Notify outside observers.
myself
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelClosed(
peer_id, channel_id, tx,
)),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
state.on_channel_closed(channel_id, peer_id, tx).await;
}
NetworkActorEvent::PeerMessage(peer_id, session, message) => {
self.handle_peer_message(state, peer_id, session, message)
Expand All @@ -470,8 +455,27 @@ where
NetworkActorEvent::FundingTransactionConfirmed(outpoint) => {
state.on_funding_transaction_confirmed(outpoint).await;
}
NetworkActorEvent::FundingTransactionFailed(_outpoint) => {
unimplemented!("handling funding transaction failed");
NetworkActorEvent::FundingTransactionFailed(outpoint) => {
error!("Funding transaction failed: {:?}", outpoint);
}
NetworkActorEvent::ClosingTransactionConfirmed(peer_id, channel_id, _tx_hash) => {
// TODO: We should remove the channel from the session_channels_map.
state.channels.remove(&channel_id);
if let Some(session) = state.get_peer_session(&peer_id) {
state.session_channels_map.get_mut(&session).map(|set| {
set.remove(&channel_id);
});
}
state.send_message_to_channel_actor(
channel_id,
ChannelActorMessage::Event(ChannelEvent::ClosingTransactionConfirmed),
)
}
NetworkActorEvent::ClosingTransactionFailed(peer_id, tx_hash, channel_id) => {
error!(
"Closing transaction failed for channel {:?}, tx hash: {:?}, peer id: {:?}",
&channel_id, &tx_hash, &peer_id
);
}
NetworkActorEvent::LocalCommitmentSigned(
peer_id,
Expand Down Expand Up @@ -842,6 +846,44 @@ impl NetworkActorState {
Ok((channel, temp_channel_id, new_id))
}

async fn broadcast_tx_with_callback<F, R>(&self, transaction: TransactionView, callback: F)
where
F: Send + 'static + FnOnce(Result<Status, RactorErr<CkbChainMessage>>) -> R,
{
debug!("Trying to broadcast transaction {:?}", &transaction);
let chain = self.chain_actor.clone();
call_t!(
&chain,
CkbChainMessage::SendTx,
DEFAULT_CHAIN_ACTOR_TIMEOUT,
transaction.clone()
)
.expect(ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW)
.expect("valid tx to broadcast");

let tx_hash = transaction.hash();
info!("Transactoin sent to the network: {}", tx_hash);

// TODO: make number of confirmation to transaction configurable.
const NUM_CONFIRMATIONS: u64 = 4;
let request = TraceTxRequest {
tx_hash: tx_hash.clone().into(),
confirmations: NUM_CONFIRMATIONS,
};

// Spawn a new task to avoid blocking current actor message processing.
ractor::concurrency::tokio_primatives::spawn(async move {
debug!("Tracing transaction status {:?}", &request.tx_hash);
let result = call_t!(
chain,
CkbChainMessage::TraceTx,
DEFAULT_CHAIN_ACTOR_TIMEOUT,
request.clone()
);
callback(result);
});
}

fn get_peer_session(&self, peer_id: &PeerId) -> Option<SessionId> {
self.peer_session_map.get(peer_id).cloned()
}
Expand Down Expand Up @@ -942,13 +984,45 @@ impl NetworkActorState {
}
}

fn on_channel_closed(&mut self, id: &Hash256, peer_id: &PeerId) {
self.channels.remove(&id);
if let Some(session) = self.get_peer_session(peer_id) {
self.session_channels_map.get_mut(&session).map(|set| {
set.remove(&id);
});
}
async fn on_channel_closed(
&mut self,
channel_id: Hash256,
peer_id: PeerId,
transaction: TransactionView,
) {
let tx_hash: Byte32 = transaction.hash();
info!(
"Channel ({:?}) to peer {:?} is closed. Broadcasting closing transaction ({:?}) now.",
&channel_id, &peer_id, &tx_hash
);
let network: ActorRef<NetworkActorMessage> = self.network.clone();
self.broadcast_tx_with_callback(transaction, move |result| {
let message = match result {
Ok(status) if status == Status::Committed => {
info!("Cloisng transaction {:?} confirmed", &tx_hash);
NetworkActorEvent::ClosingTransactionConfirmed(
peer_id,
channel_id,
tx_hash.into(),
)
}
Ok(status) => {
error!(
"Closing transaction {:?} failed to be confirmed with final status {:?}",
&tx_hash, &status
);
NetworkActorEvent::ClosingTransactionFailed(peer_id, channel_id, tx_hash.into())
}
Err(err) => {
error!("Failed to trace transaction {:?}: {:?}", &tx_hash, &err);
NetworkActorEvent::ClosingTransactionFailed(peer_id, channel_id, tx_hash.into())
}
};
network
.send_message(NetworkActorMessage::new_event(message))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
})
.await;
}

pub async fn on_open_channel_msg(
Expand Down Expand Up @@ -1008,61 +1082,28 @@ impl NetworkActorState {
}
self.pending_channels.insert(outpoint.clone(), channel_id);
// TODO: try to broadcast the transaction to the network.
let transaction = transaction.into_view();
let tx_hash = transaction.hash();
debug!(
"Funding transaction (outpoint {:?}) for channel {:?} is now ready. We can broadcast transaction {:?} now.",
&outpoint, &channel_id, &transaction
"Funding transaction (outpoint {:?}) for channel {:?} is now ready. Broadcast it {:?} now.",
&outpoint, &channel_id, &tx_hash
);
let transaction = transaction.into_view();
debug!("Trying to broadcast funding transaction {:?}", &transaction);

call_t!(
self.chain_actor,
CkbChainMessage::SendTx,
DEFAULT_CHAIN_ACTOR_TIMEOUT,
transaction.clone()
)
.expect(ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW)
.expect("valid funding tx");

let hash = transaction.hash().into();

info!("Funding transactoin sent to the network: {}", hash);

// Trace the transaction status.

// TODO: make number of confirmation to transaction configurable.
const NUM_CONFIRMATIONS: u64 = 4;
let request = TraceTxRequest {
tx_hash: hash,
confirmations: NUM_CONFIRMATIONS,
};
let chain = self.chain_actor.clone();
let network = self.network.clone();
// Spawn a new task to avoid blocking current actor message processing.
ractor::concurrency::tokio_primatives::spawn(async move {
debug!("Tracing transaction status {:?}", &request.tx_hash);
let message = match call_t!(
chain,
CkbChainMessage::TraceTx,
DEFAULT_CHAIN_ACTOR_TIMEOUT,
request.clone()
) {
self.broadcast_tx_with_callback(transaction, move |result| {
let message = match result {
Ok(status) if status == Status::Committed => {
info!("Funding transaction {:?} confirmed", &request.tx_hash,);
info!("Funding transaction {:?} confirmed", &tx_hash);
NetworkActorEvent::FundingTransactionConfirmed(outpoint)
}
Ok(status) => {
error!(
"Funding transaction {:?} failed to be confirmed with final status {:?}",
&request.tx_hash, &status
&tx_hash, &status
);
NetworkActorEvent::FundingTransactionFailed(outpoint)
}
Err(err) => {
error!(
"Failed to trace transaction {:?}: {:?}",
&request.tx_hash, &err
);
error!("Failed to trace transaction {:?}: {:?}", &tx_hash, &err);
NetworkActorEvent::FundingTransactionFailed(outpoint)
}
};
Expand All @@ -1071,7 +1112,8 @@ impl NetworkActorState {
network
.send_message(NetworkActorMessage::new_event(message))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
});
})
.await;
}

async fn on_funding_transaction_confirmed(&mut self, outpoint: OutPoint) {
Expand Down

0 comments on commit ab86b66

Please sign in to comment.