Skip to content

Commit

Permalink
fix: Return events from gossipsub stream (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored May 3, 2023
1 parent d2acd0b commit f7b0f74
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 19 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 0.3.16 [unreleased]
- fix: Return events from gossipsub stream [PR 68]

[PR 68]: https://github.com/dariusc93/rust-ipfs/pull/68

# 0.3.15
- fix: Remove item from want list [PR 64]
- chore: Impl sled datastore, split stores into own modules [PR 63]
Expand Down
9 changes: 8 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use futures::{

use p2p::{
IdentifyConfiguration, KadConfig, KadStoreConfig, PeerInfo, ProviderStream, RecordStream,
RelayConfig,
RelayConfig, PubsubConfig,
};
use repo::{BlockStore, DataStore, Lock};
use tokio::{sync::Notify, task::JoinHandle};
Expand Down Expand Up @@ -477,6 +477,13 @@ impl UninitializedIpfs {
self
}

/// Set pubsub configuration
pub fn set_pubsub_configuration(mut self, config: PubsubConfig) -> Self {
self.options.pubsub_config = Some(config);
self
}


/// Set keypair
pub fn set_keypair(mut self, keypair: Keypair) -> Self {
self.keys = keypair;
Expand Down
28 changes: 10 additions & 18 deletions src/p2p/gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tracing::{debug, warn};
use tracing::debug;

use libp2p::core::{Endpoint, Multiaddr};
use libp2p::identity::PeerId;
Expand Down Expand Up @@ -359,32 +359,24 @@ impl NetworkBehaviour for GossipsubStream {
peer_id,
topic,
}) => {
if self.subscribed_peers(&topic.to_string()).contains(&peer_id) {
warn!("Peer is already subscribed to {}", topic);
continue;
}

self.add_explicit_peer(&peer_id);
continue;
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Subscribed { peer_id, topic },
));
}
NetworkBehaviourAction::GenerateEvent(GossipsubEvent::Unsubscribed {
peer_id,
topic,
}) => {
if !self.subscribed_peers(&topic.to_string()).contains(&peer_id) {
warn!("Peer is not subscribed to {}", topic);
continue;
};

self.remove_explicit_peer(&peer_id);

continue;
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Unsubscribed { peer_id, topic },
));
}
NetworkBehaviourAction::GenerateEvent(GossipsubEvent::GossipsubNotSupported {
peer_id,
}) => {
warn!("Not supported for {}", peer_id);
continue;
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::GossipsubNotSupported { peer_id },
));
}
action @ NetworkBehaviourAction::Dial { .. } => {
return Poll::Ready(action);
Expand Down

0 comments on commit f7b0f74

Please sign in to comment.