Skip to content

Commit

Permalink
refactor: provide topic in pubsub event (#337)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Nov 22, 2024
1 parent 66f9909 commit 40fc428
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 51 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# 0.13.0
- refactor: provide topic in pubsub event. If a topic filter is supplied, the topic will be excluded from the event. [PR 337](https://github.com/dariusc93/rust-ipfs/pull/337)

# 0.12.2
- feat: Reimplement ConnectionEvents and PeerConnectionEvents stream via `Ipfs::{connection_events, peer_connection_events}`. [PR 320](https://github.com/dariusc93/rust-ipfs/pull/320)

Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "rust-ipfs"
readme = "README.md"
repository = "https://github.com/dariusc93/rust-ipfs"
description = "IPFS node implementation"
version = "0.12.2"
version = "0.13.0"

[features]
default = []
Expand Down
153 changes: 132 additions & 21 deletions examples/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use clap::Parser;
use futures::{pin_mut, FutureExt};
use futures::FutureExt;
use ipld_core::ipld;
use libp2p::futures::StreamExt;
use libp2p::Multiaddr;
use rust_ipfs::p2p::MultiaddrExt;
use rust_ipfs::{Ipfs, Keypair, PubsubEvent, UninitializedIpfs};
use rust_ipfs::{ConnectionEvents, Ipfs, Keypair, PubsubEvent, UninitializedIpfs};

use parking_lot::Mutex;
use pollable_map::stream::StreamMap;
use rustyline_async::Readline;
use std::time::Duration;
use std::{io::Write, sync::Arc};
Expand Down Expand Up @@ -41,6 +43,8 @@ async fn main() -> anyhow::Result<()> {

let topic = opt.topic.unwrap_or_else(|| String::from("ipfs-chat"));

let main_topic = Arc::new(Mutex::new(topic.clone()));

let keypair = Keypair::generate_ed25519();

let peer_id = keypair.public().to_peer_id();
Expand Down Expand Up @@ -95,6 +99,16 @@ async fn main() -> anyhow::Result<()> {

let mut st = ipfs.connection_events().await?;

let mut main_events = StreamMap::new();

let mut listener_st = StreamMap::new();

let mut main_event_st = ipfs.pubsub_events(None).await?;

let stream = ipfs.pubsub_subscribe(topic.clone()).await?;

listener_st.insert(topic.clone(), stream);

for addr in opt.connect {
let Some(peer_id) = addr.peer_id() else {
writeln!(stdout, ">{addr} does not contain a p2p protocol. skipping")?;
Expand All @@ -109,41 +123,138 @@ async fn main() -> anyhow::Result<()> {
writeln!(stdout, "Connected to {}", peer_id)?;
}

let mut event_stream = ipfs.pubsub_events(&topic).await?;

let stream = ipfs.pubsub_subscribe(&topic).await?;

pin_mut!(stream);

tokio::spawn(topic_discovery(ipfs.clone(), topic.clone()));
let owned_topic = topic.to_string();
tokio::spawn(topic_discovery(ipfs.clone(), owned_topic));

tokio::task::yield_now().await;

loop {
tokio::select! {
data = stream.next() => {
if let Some(msg) = data {
writeln!(stdout, "{}: {}", msg.source.expect("Message should contain a source peer_id"), String::from_utf8_lossy(&msg.data))?;
Some((topic, msg)) = listener_st.next() => {
writeln!(stdout, "> {topic}: {}: {}", msg.source.expect("Message should contain a source peer_id"), String::from_utf8_lossy(&msg.data))?;
}
Some(conn_ev) = st.next() => {
match conn_ev {
ConnectionEvents::IncomingConnection{ peer_id, .. } => {
writeln!(stdout, "> {peer_id} connected")?;
}
ConnectionEvents::OutgoingConnection{ peer_id, .. } => {
writeln!(stdout, "> {peer_id} connected")?;
}
ConnectionEvents::ClosedConnection{ peer_id, .. } => {
writeln!(stdout, "> {peer_id} disconnected")?;
}
}
}
conn_ev = st.next() => {
if let Some(ev) = conn_ev {
writeln!(stdout, "connection event: {ev:?}")?;
Some(event) = main_event_st.next() => {
match event {
PubsubEvent::Subscribe { peer_id, topic: Some(topic) } => writeln!(stdout, "{} subscribed to {}", peer_id, topic)?,
PubsubEvent::Unsubscribe { peer_id, topic: Some(topic) } => writeln!(stdout, "{} unsubscribed from {}", peer_id, topic)?,
_ => unreachable!(),
}
}
Some(event) = event_stream.next() => {
Some((topic, event)) = main_events.next() => {
match event {
PubsubEvent::Subscribe { peer_id } => writeln!(stdout, "{} subscribed", peer_id)?,
PubsubEvent::Unsubscribe { peer_id } => writeln!(stdout, "{} unsubscribed", peer_id)?,
PubsubEvent::Subscribe { peer_id, topic: None } => writeln!(stdout, "{} subscribed to {}", peer_id, topic)?,
PubsubEvent::Unsubscribe { peer_id, topic: None } => writeln!(stdout, "{} unsubscribed from {}", peer_id, topic)?,
_ => unreachable!()
}
}
line = rl.readline().fuse() => match line {
Ok(rustyline_async::ReadlineEvent::Line(line)) => {
if let Err(e) = ipfs.pubsub_publish(topic.clone(), line.as_bytes().to_vec()).await {
writeln!(stdout, "Error publishing message: {e}")?;
let line = line.trim();
if !line.starts_with('/') {
if !line.is_empty() {
let topic_to_publish = &*main_topic.lock();
if let Err(e) = ipfs.pubsub_publish(topic_to_publish.clone(), line.as_bytes().to_vec()).await {
writeln!(stdout, "> error publishing message: {e}")?;
continue;
}
writeln!(stdout, "{peer_id}: {line}")?;
}
continue;
}
writeln!(stdout, "{peer_id}: {line}")?;

let mut command = line.split(' ');

match command.next() {
Some("/subscribe") => {
let topic = match command.next() {
Some(topic) => topic.to_string(),
None => {
writeln!(stdout, "> topic must be provided")?;
continue;
}
};
let event_st = ipfs.pubsub_events(topic.clone()).await?;
let Ok(st) = ipfs.pubsub_subscribe(topic.clone()).await else {
writeln!(stdout, "> already subscribed to topic")?;
continue;
};

listener_st.insert(topic.clone(), st);
main_events.insert(topic.clone(), event_st);
writeln!(stdout, "> subscribed to {}", topic)?;
*main_topic.lock() = topic;
continue;
}
Some("/unsubscribe") => {
let topic = match command.next() {
Some(topic) => topic.to_string(),
None => main_topic.lock().clone()
};

listener_st.remove(&topic);
main_events.remove(&topic);

if !ipfs.pubsub_unsubscribe(&topic).await.unwrap_or_default() {
writeln!(stdout, "> unable to unsubscribe from {}", topic)?;
continue;
}

writeln!(stdout, "> unsubscribe from {}", topic)?;
if let Some(some_topic) = main_events.keys().next() {
*main_topic.lock() = some_topic.clone();
writeln!(stdout, "> setting current topic to {}", some_topic)?;
}
continue;
}
Some("/list-topics") => {
let topics = ipfs.pubsub_subscribed().await.unwrap_or_default();
if topics.is_empty() {
writeln!(stdout, "> not subscribed to any topics")?;
continue;
}

let current_topic = main_topic.lock().clone();

writeln!(stdout, "> list of topics")?;
for topic in topics {
writeln!(stdout, "\t{topic} {}", if current_topic == topic { "- current" } else { "" } )?;
}
}
Some("/set-current-topic") => {
let topic = match command.next() {
Some(topic) if !topic.is_empty() => topic.to_string(),
None | _ => {
writeln!(stdout, "> topic must be provided")?;
continue;
}
};

let topics = ipfs.pubsub_subscribed().await.unwrap_or_default();
if topics.is_empty() || !topics.contains(&topic) {
writeln!(stdout, "> not subscribed to topic \"{topic}\"")?;
continue;
}

*main_topic.lock() = topic.clone();

writeln!(stdout, "> topic set to {topic}")?;
}
_ => continue
}

}
Ok(rustyline_async::ReadlineEvent::Eof) => {
cancel.notify_one();
Expand Down
68 changes: 41 additions & 27 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,13 +449,19 @@ impl From<DhtMode> for Option<Mode> {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum PubsubEvent {
/// Subscription event to a given topic
Subscribe { peer_id: PeerId },
Subscribe {
peer_id: PeerId,
topic: Option<String>,
},

/// Unsubscribing event to a given topic
Unsubscribe { peer_id: PeerId },
Unsubscribe {
peer_id: PeerId,
topic: Option<String>,
},
}

#[derive(Debug, Clone)]
Expand All @@ -467,15 +473,6 @@ pub(crate) enum InnerPubsubEvent {
Unsubscribe { topic: String, peer_id: PeerId },
}

impl From<InnerPubsubEvent> for PubsubEvent {
fn from(event: InnerPubsubEvent) -> Self {
match event {
InnerPubsubEvent::Subscribe { peer_id, .. } => PubsubEvent::Subscribe { peer_id },
InnerPubsubEvent::Unsubscribe { peer_id, .. } => PubsubEvent::Unsubscribe { peer_id },
}
}
}

type TSwarmEvent<C> = <TSwarm<C> as Stream>::Item;
type TSwarmEventFn<C> = Arc<dyn Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send>;
type TTransportFn = Box<
Expand Down Expand Up @@ -1527,38 +1524,55 @@ impl Ipfs {
.await
}

/// Stream that returns [`PubsubEvent`] for a given topic
/// Stream that returns [`PubsubEvent`] for a given topic. if a topic is not supplied, it will provide all events emitted for any topic.
pub async fn pubsub_events(
&self,
topic: impl Into<String>,
topic: impl Into<Option<String>>,
) -> Result<BoxStream<'static, PubsubEvent>, Error> {
async move {
let topic = topic.into();
let (tx, rx) = oneshot_channel();

self.to_task
.clone()
.send(IpfsEvent::PubsubEventStream(tx))
.await?;

let mut receiver = rx
.await?;
let receiver = rx.await?;

let defined_topic = topic.to_string();
let defined_topic = topic.into();

let stream = async_stream::stream! {
while let Some(event) = receiver.next().await {
match &event {
InnerPubsubEvent::Subscribe { topic, .. } | InnerPubsubEvent::Unsubscribe { topic, .. } if topic.eq(&defined_topic) => yield event.into(),
_ => {}
}
let stream = receiver.filter_map(move |event| {
let defined_topic = defined_topic.clone();
async move {
let ev = match event {
InnerPubsubEvent::Subscribe { topic, peer_id } => {
let topic = match defined_topic {
Some(defined_topic) if defined_topic.eq(&topic) => None,
Some(defined_topic) if defined_topic.ne(&topic) => return None,
Some(_) => return None,
None => Some(topic),
};
PubsubEvent::Subscribe { peer_id, topic }
}
InnerPubsubEvent::Unsubscribe { topic, peer_id } => {
let topic = match defined_topic {
Some(defined_topic) if defined_topic.eq(&topic) => None,
Some(defined_topic) if defined_topic.ne(&topic) => return None,
Some(_) => return None,
None => Some(topic),
};
PubsubEvent::Unsubscribe { peer_id, topic }
}
};

Some(ev)
}
};
});

Ok(stream.boxed())
}
.instrument(self.span.clone())
.await
.instrument(self.span.clone())
.await
}

/// Publishes to the topic which may have been subscribed to earlier
Expand Down
Loading

0 comments on commit 40fc428

Please sign in to comment.