Skip to content

Commit

Permalink
Introduce API to subscribe to peer connection events
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha committed Jun 15, 2024
1 parent 6da24b0 commit 33cd67c
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 2 deletions.
29 changes: 29 additions & 0 deletions aquadoggo/src/api/api.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use anyhow::{bail, Result};
use tokio::sync::mpsc::Receiver;

use crate::api::{migrate, LockFile};
use crate::bus::{ServiceMessage, ServiceSender};
use crate::context::Context;

#[derive(Debug, Clone)]
pub enum NodeEvent {
PeerConnected,
PeerDisconnected,
}

/// Interface to interact with the node in a programmatic, "low-level" way.
#[derive(Debug)]
pub struct NodeInterface {
Expand Down Expand Up @@ -42,4 +49,26 @@ impl NodeInterface {

Ok(did_migration_happen)
}

pub async fn subscribe(&self) -> Receiver<NodeEvent> {
let mut rx = self.tx.subscribe();
let (events_tx, events_rx) = tokio::sync::mpsc::channel::<NodeEvent>(256);

tokio::task::spawn(async move {
loop {
match rx.recv().await {
Ok(ServiceMessage::PeerConnected(_)) => {
let _ = events_tx.send(NodeEvent::PeerConnected).await;
}
Ok(ServiceMessage::PeerDisconnected(_)) => {
let _ = events_tx.send(NodeEvent::PeerDisconnected).await;
}
Ok(_) => continue,
Err(_) => break,
}
}
});

events_rx
}
}
2 changes: 1 addition & 1 deletion aquadoggo/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod config_file;
mod lock_file;
mod migration;

pub use api::NodeInterface;
pub use api::{NodeEvent, NodeInterface};
pub use config_file::ConfigFile;
pub use lock_file::LockFile;
pub use migration::migrate;
9 changes: 8 additions & 1 deletion aquadoggo/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

use anyhow::Result;
use p2panda_rs::identity::KeyPair;
use tokio::sync::mpsc::Receiver;

use crate::api::NodeInterface;
use crate::api::{NodeEvent, NodeInterface};
use crate::bus::ServiceMessage;
use crate::config::Configuration;
use crate::context::Context;
Expand Down Expand Up @@ -131,4 +132,10 @@ impl Node {
pub async fn migrate(&self, lock_file: LockFile) -> Result<bool> {
self.api.migrate(lock_file).await
}

/// Subscribe to channel reporting on significant node events which can be interesting for
/// clients, for example when peers connect or disconnect.
pub async fn subscribe(&self) -> Receiver<NodeEvent> {
self.api.subscribe().await
}
}

0 comments on commit 33cd67c

Please sign in to comment.