Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Swarm::next_extended #1374

Merged
merged 5 commits into from
Jan 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/ipfs-kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
//! peer ID will be generated randomly.

use async_std::task;
use futures::prelude::*;
use libp2p::{
Swarm,
PeerId,
Expand Down Expand Up @@ -90,7 +89,8 @@ fn main() -> Result<(), Box<dyn Error>> {

// Kick it off!
task::block_on(async move {
while let Some(event) = swarm.try_next().await? {
loop {
let event = swarm.next().await;
if let KademliaEvent::GetClosestPeersResult(result) = event {
match result {
Ok(ok) =>
Expand Down
35 changes: 22 additions & 13 deletions protocols/identify/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,20 +254,18 @@ pub enum IdentifyEvent {
#[cfg(test)]
mod tests {
use crate::{Identify, IdentifyEvent};
use futures::prelude::*;
use futures::{prelude::*, pin_mut};
use libp2p_core::{
identity,
PeerId,
muxing::StreamMuxer,
Multiaddr,
Transport,
upgrade
};
use libp2p_tcp::TcpConfig;
use libp2p_secio::SecioConfig;
use libp2p_swarm::Swarm;
use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_mplex::MplexConfig;
use rand::{Rng, thread_rng};
use std::{fmt, io};

fn transport() -> (identity::PublicKey, impl Transport<
Expand Down Expand Up @@ -303,30 +301,41 @@ mod tests {
(swarm, pubkey)
};

let addr: Multiaddr = {
let port = thread_rng().gen_range(49152, std::u16::MAX);
format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap()
};
Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

Swarm::listen_on(&mut swarm1, addr.clone()).unwrap();
Swarm::dial_addr(&mut swarm2, addr.clone()).unwrap();
let listen_addr = async_std::task::block_on(async {
loop {
let swarm1_fut = swarm1.next_event();
pin_mut!(swarm1_fut);
match swarm1_fut.await {
SwarmEvent::NewListenAddr(addr) => return addr,
_ => {}
}
}
});
Swarm::dial_addr(&mut swarm2, listen_addr).unwrap();

// nb. Either swarm may receive the `Identified` event first, upon which
// it will permit the connection to be closed, as defined by
// `IdentifyHandler::connection_keep_alive`. Hence the test succeeds if
// either `Identified` event arrives correctly.
async_std::task::block_on(async move {
loop {
match future::select(swarm1.next(), swarm2.next()).await.factor_second().0 {
future::Either::Left(Some(Ok(IdentifyEvent::Received { info, .. }))) => {
let swarm1_fut = swarm1.next();
pin_mut!(swarm1_fut);
let swarm2_fut = swarm2.next();
pin_mut!(swarm2_fut);

match future::select(swarm1_fut, swarm2_fut).await.factor_second().0 {
future::Either::Left(IdentifyEvent::Received { info, .. }) => {
assert_eq!(info.public_key, pubkey2);
assert_eq!(info.protocol_version, "c");
assert_eq!(info.agent_version, "d");
assert!(!info.protocols.is_empty());
assert!(info.listen_addrs.is_empty());
return;
}
future::Either::Right(Some(Ok(IdentifyEvent::Received { info, .. }))) => {
future::Either::Right(IdentifyEvent::Received { info, .. }) => {
assert_eq!(info.public_key, pubkey1);
assert_eq!(info.protocol_version, "a");
assert_eq!(info.agent_version, "b");
Expand Down
42 changes: 21 additions & 21 deletions protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ fn bootstrap() {
for (i, swarm) in swarms.iter_mut().enumerate() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::BootstrapResult(Ok(ok))))) => {
Poll::Ready(Some(KademliaEvent::BootstrapResult(Ok(ok)))) => {
assert_eq!(i, 0);
assert_eq!(ok.peer, swarm_ids[0]);
let known = swarm.kbuckets.iter()
Expand All @@ -138,7 +138,7 @@ fn bootstrap() {
return Poll::Ready(())
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -186,7 +186,7 @@ fn query_iter() {
for (i, swarm) in swarms.iter_mut().enumerate() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetClosestPeersResult(Ok(ok))))) => {
Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(swarm_ids[i], expected_swarm_id);
assert_eq!(swarm.queries.size(), 0);
Expand All @@ -196,7 +196,7 @@ fn query_iter() {
return Poll::Ready(());
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -234,13 +234,13 @@ fn unresponsive_not_returned_direct() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetClosestPeersResult(Ok(ok))))) => {
Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(ok.peers.len(), 0);
return Poll::Ready(());
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -278,14 +278,14 @@ fn unresponsive_not_returned_indirect() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetClosestPeersResult(Ok(ok))))) => {
Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(ok.peers.len(), 1);
assert_eq!(ok.peers[0], first_peer_id);
return Poll::Ready(());
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -314,7 +314,7 @@ fn get_record_not_found() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetRecordResult(Err(e))))) => {
Poll::Ready(Some(KademliaEvent::GetRecordResult(Err(e)))) => {
if let GetRecordError::NotFound { key, closest_peers, } = e {
assert_eq!(key, target_key);
assert_eq!(closest_peers.len(), 2);
Expand All @@ -326,7 +326,7 @@ fn get_record_not_found() {
}
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -375,8 +375,8 @@ fn put_record() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::PutRecordResult(res)))) |
Poll::Ready(Some(Ok(KademliaEvent::RepublishRecordResult(res)))) => {
Poll::Ready(Some(KademliaEvent::PutRecordResult(res))) |
Poll::Ready(Some(KademliaEvent::RepublishRecordResult(res))) => {
match res {
Err(e) => panic!(e),
Ok(ok) => {
Expand All @@ -387,7 +387,7 @@ fn put_record() {
}
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -474,13 +474,13 @@ fn get_value() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetRecordResult(Ok(ok))))) => {
Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => {
assert_eq!(ok.records.len(), 1);
assert_eq!(ok.records.first(), Some(&record));
return Poll::Ready(());
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -513,13 +513,13 @@ fn get_value_many() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetRecordResult(Ok(ok))))) => {
Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => {
assert_eq!(ok.records.len(), num_results);
assert_eq!(ok.records.first(), Some(&record));
return Poll::Ready(());
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -561,8 +561,8 @@ fn add_provider() {
for swarm in &mut swarms {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::StartProvidingResult(res)))) |
Poll::Ready(Some(Ok(KademliaEvent::RepublishProviderResult(res)))) => {
Poll::Ready(Some(KademliaEvent::StartProvidingResult(res))) |
Poll::Ready(Some(KademliaEvent::RepublishProviderResult(res))) => {
match res {
Err(e) => panic!(e),
Ok(ok) => {
Expand All @@ -572,7 +572,7 @@ fn add_provider() {
}
}
// Ignore any other event.
Poll::Ready(Some(Ok(_))) => (),
Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break,
}
Expand Down Expand Up @@ -669,7 +669,7 @@ fn exceed_jobs_max_queries() {
for _ in 0 .. num {
// There are no other nodes, so the queries finish instantly.
if let Poll::Ready(Some(e)) = swarms[0].poll_next_unpin(ctx) {
if let Ok(KademliaEvent::BootstrapResult(r)) = e {
if let KademliaEvent::BootstrapResult(r) = e {
assert!(r.is_ok(), "Unexpected error")
} else {
panic!("Unexpected event: {:?}", e)
Expand Down
4 changes: 2 additions & 2 deletions protocols/ping/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fn ping() {
}

loop {
match swarm1.next().await.unwrap().unwrap() {
match swarm1.next().await {
PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => {
return (pid1.clone(), peer, rtt)
},
Expand All @@ -74,7 +74,7 @@ fn ping() {
Swarm::dial_addr(&mut swarm2, rx.next().await.unwrap()).unwrap();

loop {
match swarm2.next().await.unwrap().unwrap() {
match swarm2.next().await {
PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => {
return (pid2.clone(), peer, rtt)
},
Expand Down
Loading