Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: provide topic in pubsub event #337

Merged
merged 7 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# 0.13.0
- refactor: provide topic in pubsub event. If a topic filter is supplied, the topic will be excluded from the event. [PR 337](https://github.com/dariusc93/rust-ipfs/pull/337)

# 0.12.2
- feat: Reimplement ConnectionEvents and PeerConnectionEvents stream via `Ipfs::{connection_events, peer_connection_events}`. [PR 320](https://github.com/dariusc93/rust-ipfs/pull/320)

Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "rust-ipfs"
readme = "README.md"
repository = "https://github.com/dariusc93/rust-ipfs"
description = "IPFS node implementation"
version = "0.12.2"
version = "0.13.0"

[features]
default = []
Expand Down
153 changes: 132 additions & 21 deletions examples/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use clap::Parser;
use futures::{pin_mut, FutureExt};
use futures::FutureExt;
use ipld_core::ipld;
use libp2p::futures::StreamExt;
use libp2p::Multiaddr;
use rust_ipfs::p2p::MultiaddrExt;
use rust_ipfs::{Ipfs, Keypair, PubsubEvent, UninitializedIpfs};
use rust_ipfs::{ConnectionEvents, Ipfs, Keypair, PubsubEvent, UninitializedIpfs};

use parking_lot::Mutex;
use pollable_map::stream::StreamMap;
use rustyline_async::Readline;
use std::time::Duration;
use std::{io::Write, sync::Arc};
Expand Down Expand Up @@ -41,6 +43,8 @@ async fn main() -> anyhow::Result<()> {

let topic = opt.topic.unwrap_or_else(|| String::from("ipfs-chat"));

let main_topic = Arc::new(Mutex::new(topic.clone()));

let keypair = Keypair::generate_ed25519();

let peer_id = keypair.public().to_peer_id();
Expand Down Expand Up @@ -95,6 +99,16 @@ async fn main() -> anyhow::Result<()> {

let mut st = ipfs.connection_events().await?;

let mut main_events = StreamMap::new();

let mut listener_st = StreamMap::new();

let mut main_event_st = ipfs.pubsub_events(None).await?;

let stream = ipfs.pubsub_subscribe(topic.clone()).await?;

listener_st.insert(topic.clone(), stream);

for addr in opt.connect {
let Some(peer_id) = addr.peer_id() else {
writeln!(stdout, ">{addr} does not contain a p2p protocol. skipping")?;
Expand All @@ -109,41 +123,138 @@ async fn main() -> anyhow::Result<()> {
writeln!(stdout, "Connected to {}", peer_id)?;
}

let mut event_stream = ipfs.pubsub_events(&topic).await?;

let stream = ipfs.pubsub_subscribe(&topic).await?;

pin_mut!(stream);

tokio::spawn(topic_discovery(ipfs.clone(), topic.clone()));
let owned_topic = topic.to_string();
tokio::spawn(topic_discovery(ipfs.clone(), owned_topic));

tokio::task::yield_now().await;

loop {
tokio::select! {
data = stream.next() => {
if let Some(msg) = data {
writeln!(stdout, "{}: {}", msg.source.expect("Message should contain a source peer_id"), String::from_utf8_lossy(&msg.data))?;
Some((topic, msg)) = listener_st.next() => {
writeln!(stdout, "> {topic}: {}: {}", msg.source.expect("Message should contain a source peer_id"), String::from_utf8_lossy(&msg.data))?;
}
Some(conn_ev) = st.next() => {
match conn_ev {
ConnectionEvents::IncomingConnection{ peer_id, .. } => {
writeln!(stdout, "> {peer_id} connected")?;
}
ConnectionEvents::OutgoingConnection{ peer_id, .. } => {
writeln!(stdout, "> {peer_id} connected")?;
}
ConnectionEvents::ClosedConnection{ peer_id, .. } => {
writeln!(stdout, "> {peer_id} disconnected")?;
}
}
}
conn_ev = st.next() => {
if let Some(ev) = conn_ev {
writeln!(stdout, "connection event: {ev:?}")?;
Some(event) = main_event_st.next() => {
match event {
PubsubEvent::Subscribe { peer_id, topic: Some(topic) } => writeln!(stdout, "{} subscribed to {}", peer_id, topic)?,
PubsubEvent::Unsubscribe { peer_id, topic: Some(topic) } => writeln!(stdout, "{} unsubscribed from {}", peer_id, topic)?,
_ => unreachable!(),
}
}
Some(event) = event_stream.next() => {
Some((topic, event)) = main_events.next() => {
match event {
PubsubEvent::Subscribe { peer_id } => writeln!(stdout, "{} subscribed", peer_id)?,
PubsubEvent::Unsubscribe { peer_id } => writeln!(stdout, "{} unsubscribed", peer_id)?,
PubsubEvent::Subscribe { peer_id, topic: None } => writeln!(stdout, "{} subscribed to {}", peer_id, topic)?,
PubsubEvent::Unsubscribe { peer_id, topic: None } => writeln!(stdout, "{} unsubscribed from {}", peer_id, topic)?,
_ => unreachable!()
}
}
line = rl.readline().fuse() => match line {
Ok(rustyline_async::ReadlineEvent::Line(line)) => {
if let Err(e) = ipfs.pubsub_publish(topic.clone(), line.as_bytes().to_vec()).await {
writeln!(stdout, "Error publishing message: {e}")?;
let line = line.trim();
if !line.starts_with('/') {
if !line.is_empty() {
let topic_to_publish = &*main_topic.lock();
if let Err(e) = ipfs.pubsub_publish(topic_to_publish.clone(), line.as_bytes().to_vec()).await {
writeln!(stdout, "> error publishing message: {e}")?;
continue;
}
writeln!(stdout, "{peer_id}: {line}")?;
}
continue;
}
writeln!(stdout, "{peer_id}: {line}")?;

let mut command = line.split(' ');

match command.next() {
Some("/subscribe") => {
let topic = match command.next() {
Some(topic) => topic.to_string(),
None => {
writeln!(stdout, "> topic must be provided")?;
continue;
}
};
let event_st = ipfs.pubsub_events(topic.clone()).await?;
let Ok(st) = ipfs.pubsub_subscribe(topic.clone()).await else {
writeln!(stdout, "> already subscribed to topic")?;
continue;
};

listener_st.insert(topic.clone(), st);
main_events.insert(topic.clone(), event_st);
writeln!(stdout, "> subscribed to {}", topic)?;
*main_topic.lock() = topic;
continue;
}
Some("/unsubscribe") => {
let topic = match command.next() {
Some(topic) => topic.to_string(),
None => main_topic.lock().clone()
};

listener_st.remove(&topic);
main_events.remove(&topic);

if !ipfs.pubsub_unsubscribe(&topic).await.unwrap_or_default() {
writeln!(stdout, "> unable to unsubscribe from {}", topic)?;
continue;
}

writeln!(stdout, "> unsubscribe from {}", topic)?;
if let Some(some_topic) = main_events.keys().next() {
*main_topic.lock() = some_topic.clone();
writeln!(stdout, "> setting current topic to {}", some_topic)?;
}
continue;
}
Some("/list-topics") => {
let topics = ipfs.pubsub_subscribed().await.unwrap_or_default();
if topics.is_empty() {
writeln!(stdout, "> not subscribed to any topics")?;
continue;
}

let current_topic = main_topic.lock().clone();

writeln!(stdout, "> list of topics")?;
for topic in topics {
writeln!(stdout, "\t{topic} {}", if current_topic == topic { "- current" } else { "" } )?;
}
}
Some("/set-current-topic") => {
let topic = match command.next() {
Some(topic) if !topic.is_empty() => topic.to_string(),
None | _ => {
writeln!(stdout, "> topic must be provided")?;
continue;
}
};

let topics = ipfs.pubsub_subscribed().await.unwrap_or_default();
if topics.is_empty() || !topics.contains(&topic) {
writeln!(stdout, "> not subscribed to topic \"{topic}\"")?;
continue;
}

*main_topic.lock() = topic.clone();

writeln!(stdout, "> topic set to {topic}")?;
}
_ => continue
}

}
Ok(rustyline_async::ReadlineEvent::Eof) => {
cancel.notify_one();
Expand Down
68 changes: 41 additions & 27 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,13 +449,19 @@ impl From<DhtMode> for Option<Mode> {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum PubsubEvent {
/// Subscription event to a given topic
Subscribe { peer_id: PeerId },
Subscribe {
peer_id: PeerId,
topic: Option<String>,
},

/// Unsubscribing event to a given topic
Unsubscribe { peer_id: PeerId },
Unsubscribe {
peer_id: PeerId,
topic: Option<String>,
},
}

#[derive(Debug, Clone)]
Expand All @@ -467,15 +473,6 @@ pub(crate) enum InnerPubsubEvent {
Unsubscribe { topic: String, peer_id: PeerId },
}

impl From<InnerPubsubEvent> for PubsubEvent {
fn from(event: InnerPubsubEvent) -> Self {
match event {
InnerPubsubEvent::Subscribe { peer_id, .. } => PubsubEvent::Subscribe { peer_id },
InnerPubsubEvent::Unsubscribe { peer_id, .. } => PubsubEvent::Unsubscribe { peer_id },
}
}
}

type TSwarmEvent<C> = <TSwarm<C> as Stream>::Item;
type TSwarmEventFn<C> = Arc<dyn Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send>;
type TTransportFn = Box<
Expand Down Expand Up @@ -1527,38 +1524,55 @@ impl Ipfs {
.await
}

/// Stream that returns [`PubsubEvent`] for a given topic
/// Stream that returns [`PubsubEvent`] for a given topic. if a topic is not supplied, it will provide all events emitted for any topic.
pub async fn pubsub_events(
&self,
topic: impl Into<String>,
topic: impl Into<Option<String>>,
) -> Result<BoxStream<'static, PubsubEvent>, Error> {
async move {
let topic = topic.into();
let (tx, rx) = oneshot_channel();

self.to_task
.clone()
.send(IpfsEvent::PubsubEventStream(tx))
.await?;

let mut receiver = rx
.await?;
let receiver = rx.await?;

let defined_topic = topic.to_string();
let defined_topic = topic.into();

let stream = async_stream::stream! {
while let Some(event) = receiver.next().await {
match &event {
InnerPubsubEvent::Subscribe { topic, .. } | InnerPubsubEvent::Unsubscribe { topic, .. } if topic.eq(&defined_topic) => yield event.into(),
_ => {}
}
let stream = receiver.filter_map(move |event| {
let defined_topic = defined_topic.clone();
async move {
let ev = match event {
InnerPubsubEvent::Subscribe { topic, peer_id } => {
let topic = match defined_topic {
Some(defined_topic) if defined_topic.eq(&topic) => None,
Some(defined_topic) if defined_topic.ne(&topic) => return None,
Some(_) => return None,
None => Some(topic),
};
PubsubEvent::Subscribe { peer_id, topic }
}
InnerPubsubEvent::Unsubscribe { topic, peer_id } => {
let topic = match defined_topic {
Some(defined_topic) if defined_topic.eq(&topic) => None,
Some(defined_topic) if defined_topic.ne(&topic) => return None,
Some(_) => return None,
None => Some(topic),
};
PubsubEvent::Unsubscribe { peer_id, topic }
}
};

Some(ev)
}
};
});

Ok(stream.boxed())
}
.instrument(self.span.clone())
.await
.instrument(self.span.clone())
.await
}

/// Publishes to the topic which may have been subscribed to earlier
Expand Down
Loading