Skip to content

Commit

Permalink
Merge pull request #301 from contrun/panic-identical-graph-syncer-name
Browse files Browse the repository at this point in the history
Fix panic because of identical graph syncer name
  • Loading branch information
quake authored Nov 13, 2024
2 parents 34e8b35 + 5d80f6d commit 185be24
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 26 deletions.
68 changes: 45 additions & 23 deletions src/fiber/graph_syncer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! This is the main module for the graph syncer. It is responsible for
//! syncing the graph with one specific peer.

use ractor::{async_trait as rasync_trait, call, Actor, ActorProcessingErr, ActorRef};
use ractor::{async_trait as rasync_trait, call, Actor, ActorCell, ActorProcessingErr, ActorRef};
use tentacle::secio::PeerId;
use tracing::{debug, error};

Expand All @@ -23,7 +23,6 @@ const ASSUME_MAX_MESSAGE_TIMESTAMP_GAP: u64 = 1000 * 3600 * 12;

#[derive(Debug)]
pub enum GraphSyncerMessage {
PeerDisConnected,
// The u64 is the starting height of the channels we want to sync.
// The ending height is left to the syncer actor to decide.
GetChannels(u64),
Expand All @@ -32,12 +31,10 @@ pub enum GraphSyncerMessage {
GetBroadcastMessages(u64),
}

pub struct GraphSyncerState {}

impl GraphSyncerState {
fn new() -> Self {
Self {}
}
#[derive(Default)]
pub struct GraphSyncerState {
// The exit status of the graph syncer, default to Failed.
exit_status: GraphSyncerExitStatus,
}

pub struct GraphSyncer {
Expand Down Expand Up @@ -72,13 +69,14 @@ impl GraphSyncer {
}

impl GraphSyncer {
fn tell_network_we_want_to_exit(&self, status: GraphSyncerExitStatus) {
let peer_id = self.peer_id.clone();
self.network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::GraphSyncerExited(peer_id, status),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
fn exit_with_status(
&self,
actor: ActorCell,
state: &mut GraphSyncerState,
status: GraphSyncerExitStatus,
) {
state.exit_status = status;
actor.stop(Some(format!("Actively exiting with status {:?}", status)));
}
}

Expand Down Expand Up @@ -107,20 +105,17 @@ impl Actor for GraphSyncer {
.into());
}
myself.send_message(GraphSyncerMessage::GetChannels(starting_height))?;
Ok(Self::State::new())
Ok(Self::State::default())
}

async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
debug!("Graph syncer handling message {:?}", &message);
match message {
GraphSyncerMessage::PeerDisConnected => {
self.tell_network_we_want_to_exit(GraphSyncerExitStatus::Failed);
}
GraphSyncerMessage::GetChannels(starting_height) => {
if starting_height > self.ending_height {
panic!("Starting height to high (starting height {}, ending height {}), should have exited syncing earlier", starting_height, self.ending_height);
Expand Down Expand Up @@ -153,7 +148,11 @@ impl Actor for GraphSyncer {
}
Err(e) => {
error!("Failed to get channels from peer: {:?}", e);
self.tell_network_we_want_to_exit(GraphSyncerExitStatus::Failed);
self.exit_with_status(
myself.get_cell(),
state,
GraphSyncerExitStatus::Failed,
);
}
}
}
Expand All @@ -174,7 +173,11 @@ impl Actor for GraphSyncer {
debug!("Get broadcast messages from peer successfully.");
if next_time > self.ending_time {
debug!("Graph syncer finished syncing with peer.");
self.tell_network_we_want_to_exit(GraphSyncerExitStatus::Succeeded);
self.exit_with_status(
myself.get_cell(),
state,
GraphSyncerExitStatus::Succeeded,
);
} else {
myself.send_message(GraphSyncerMessage::GetBroadcastMessages(
next_time,
Expand All @@ -183,11 +186,30 @@ impl Actor for GraphSyncer {
}
Err(e) => {
error!("Failed to get broadcast messages from peer: {:?}", e);
self.tell_network_we_want_to_exit(GraphSyncerExitStatus::Failed);
self.exit_with_status(
myself.get_cell(),
state,
GraphSyncerExitStatus::Failed,
);
}
}
}
}
Ok(())
}

async fn post_stop(
&self,
_myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
let peer_id = self.peer_id.clone();
debug!("Graph syncer to {:?} stopped", &peer_id);
self.network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::GraphSyncerExited(peer_id, state.exit_status),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
Ok(())
}
}
16 changes: 13 additions & 3 deletions src/fiber/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,12 @@ pub enum GraphSyncerExitStatus {
Failed,
}

impl Default for GraphSyncerExitStatus {
fn default() -> Self {
Self::Failed
}
}

#[derive(Debug)]
pub enum NetworkActorMessage {
Command(NetworkActorCommand),
Expand Down Expand Up @@ -2445,7 +2451,11 @@ impl NetworkSyncState {

if should_create {
let graph_syncer = Actor::spawn_linked(
Some(format!("Graph syncer to {}", peer_id)),
Some(format!(
"Graph syncer to {} started at {:?}",
peer_id,
SystemTime::now()
)),
GraphSyncer::new(
network.clone(),
peer_id.clone(),
Expand Down Expand Up @@ -3306,7 +3316,7 @@ where

debug!("Reestablishing channel {:x}", &channel_id);
let (channel, _) = Actor::spawn_linked(
None,
Some(generate_channel_actor_name(&self.peer_id, peer_id)),
ChannelActor::new(
self.get_public_key(),
remote_pubkey,
Expand Down Expand Up @@ -3440,7 +3450,7 @@ where
fn maybe_tell_syncer_peer_disconnected(&self, peer_id: &PeerId) {
if let NetworkSyncStatus::Running(ref state) = self.sync_status {
if let Some(syncer) = state.get_graph_syncer(peer_id) {
let _ = syncer.send_message(GraphSyncerMessage::PeerDisConnected);
syncer.stop(Some("Peer disconnected".to_string()));
}
}
}
Expand Down

0 comments on commit 185be24

Please sign in to comment.