From 072f23ec3ac44430b22e65a51cc60af916800399 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 21 Nov 2019 14:09:26 +0100 Subject: [PATCH] Fix mplex tests --- muxers/mplex/Cargo.toml | 1 + muxers/mplex/tests/async_write.rs | 71 ++++++-------- muxers/mplex/tests/two_peers.rs | 149 +++++++++++------------------- 3 files changed, 86 insertions(+), 135 deletions(-) diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index f4656e8de1a..6dc5bbaaeb8 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -20,4 +20,5 @@ parking_lot = "0.9" unsigned-varint = { version = "0.2.3", features = ["futures-codec"] } [dev-dependencies] +async-std = "1.0" libp2p-tcp = { version = "0.13.0", path = "../../transports/tcp" } diff --git a/muxers/mplex/tests/async_write.rs b/muxers/mplex/tests/async_write.rs index 4fe3c319cb0..e0b708e340f 100644 --- a/muxers/mplex/tests/async_write.rs +++ b/muxers/mplex/tests/async_write.rs @@ -18,20 +18,18 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use libp2p_core::{muxing, upgrade, Transport, transport::ListenerEvent}; +use libp2p_core::{muxing, upgrade, Transport}; use libp2p_tcp::TcpConfig; -use futures::prelude::*; -use std::sync::{Arc, mpsc}; -use std::thread; -use tokio::runtime::current_thread::Runtime; +use futures::{prelude::*, channel::oneshot}; +use std::sync::Arc; #[test] fn async_write() { - // Tests that `AsyncWrite::shutdown` implies flush. + // Tests that `AsyncWrite::close` implies flush. - let (tx, rx) = mpsc::channel(); + let (tx, rx) = oneshot::channel(); - let bg_thread = thread::spawn(move || { + let bg_thread = async_std::task::spawn(async move { let mplex = libp2p_mplex::MplexConfig::new(); let transport = TcpConfig::new().and_then(move |c, e| @@ -41,8 +39,7 @@ fn async_write() { .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) .unwrap(); - let addr = listener.by_ref().wait() - .next() + let addr = listener.next().await .expect("some event") .expect("no error") .into_new_address() @@ -50,41 +47,31 @@ fn async_write() { tx.send(addr).unwrap(); - let future = listener - .filter_map(ListenerEvent::into_upgrade) - .into_future() - .map_err(|(err, _)| panic!("{:?}", err)) - .and_then(|(client, _)| client.unwrap().0) - .map_err(|err| panic!("{:?}", err)) - .and_then(|client| muxing::outbound_from_ref_and_wrap(Arc::new(client))) - .and_then(|client| { - tokio::io::read_to_end(client, vec![]) - }) - .and_then(|(_, msg)| { - assert_eq!(msg, b"hello world"); - Ok(()) - }); + let client = listener + .next().await + .unwrap() + .unwrap() + .into_upgrade().unwrap().0.await.unwrap(); + + let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)).await.unwrap(); - let mut rt = Runtime::new().unwrap(); - let _ = rt.block_on(future).unwrap(); + let mut buf = Vec::new(); + outbound.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, b"hello world"); }); - let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| - upgrade::apply(c, mplex, e, upgrade::Version::V1)); + async_std::task::block_on(async { + let mplex = libp2p_mplex::MplexConfig::new(); + let transport = TcpConfig::new().and_then(move |c, e| + upgrade::apply(c, mplex, e, upgrade::Version::V1)); + + let client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); + let mut inbound = muxing::inbound_from_ref_and_wrap(Arc::new(client)).await.unwrap(); + inbound.write_all(b"hello world").await.unwrap(); - let future = transport - .dial(rx.recv().unwrap()) - .unwrap() - .map_err(|err| panic!("{:?}", err)) - .and_then(|client| muxing::inbound_from_ref_and_wrap(Arc::new(client))) - .and_then(|server| tokio::io::write_all(server, b"hello world")) - .and_then(|(server, _)| { - tokio::io::shutdown(server) - }) - .map(|_| ()); + // The test consists in making sure that this flushes the substream. + inbound.close().await.unwrap(); - let mut rt = Runtime::new().unwrap(); - let _ = rt.block_on(future).unwrap(); - bg_thread.join().unwrap(); + bg_thread.await; + }); } diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index e3e7d5d7fbc..51293a37cfc 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -18,23 +18,18 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use libp2p_core::{muxing, upgrade, Transport, transport::ListenerEvent}; +use libp2p_core::{muxing, upgrade, Transport}; use libp2p_tcp::TcpConfig; -use futures::prelude::*; -use std::sync::{Arc, mpsc}; -use std::thread; -use tokio::{ - codec::length_delimited::Builder, - runtime::current_thread::Runtime -}; +use futures::{channel::oneshot, prelude::*}; +use std::sync::Arc; #[test] fn client_to_server_outbound() { // Simulate a client sending a message to a server through a multiplex upgrade. - let (tx, rx) = mpsc::channel(); + let (tx, rx) = oneshot::channel(); - let bg_thread = thread::spawn(move || { + let bg_thread = async_std::task::spawn(async move { let mplex = libp2p_mplex::MplexConfig::new(); let transport = TcpConfig::new().and_then(move |c, e| @@ -44,8 +39,7 @@ fn client_to_server_outbound() { .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) .unwrap(); - let addr = listener.by_ref().wait() - .next() + let addr = listener.next().await .expect("some event") .expect("no error") .into_new_address() @@ -53,56 +47,42 @@ fn client_to_server_outbound() { tx.send(addr).unwrap(); - let future = listener - .filter_map(ListenerEvent::into_upgrade) - .into_future() - .map_err(|(err, _)| panic!("{:?}", err)) - .and_then(|(client, _)| client.unwrap().0) - .map_err(|err| panic!("{:?}", err)) - .and_then(|client| muxing::outbound_from_ref_and_wrap(Arc::new(client))) - .map(|client| Builder::new().new_read(client)) - .and_then(|client| { - client - .into_future() - .map_err(|(err, _)| err) - .map(|(msg, _)| msg) - }) - .and_then(|msg| { - let msg = msg.unwrap(); - assert_eq!(msg, "hello world"); - Ok(()) - }); - - let mut rt = Runtime::new().unwrap(); - let _ = rt.block_on(future).unwrap(); + let client = listener + .next().await + .unwrap() + .unwrap() + .into_upgrade().unwrap().0.await.unwrap(); + + let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)).await.unwrap(); + + let mut buf = Vec::new(); + outbound.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, b"hello world"); }); - let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| - upgrade::apply(c, mplex, e, upgrade::Version::V1)); - - let future = transport - .dial(rx.recv().unwrap()) - .unwrap() - .map_err(|err| panic!("{:?}", err)) - .and_then(|client| muxing::inbound_from_ref_and_wrap(Arc::new(client))) - .map(|server| Builder::new().new_write(server)) - .and_then(|server| server.send("hello world".into())) - .map(|_| ()); - - let mut rt = Runtime::new().unwrap(); - let _ = rt.block_on(future).unwrap(); - bg_thread.join().unwrap(); + async_std::task::block_on(async { + let mplex = libp2p_mplex::MplexConfig::new(); + let transport = TcpConfig::new().and_then(move |c, e| + upgrade::apply(c, mplex, e, upgrade::Version::V1)); + + let client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); + let mut inbound = muxing::inbound_from_ref_and_wrap(Arc::new(client)).await.unwrap(); + inbound.write_all(b"hello world").await.unwrap(); + inbound.close().await.unwrap(); + + bg_thread.await; + }); } #[test] fn client_to_server_inbound() { // Simulate a client sending a message to a server through a multiplex upgrade. - let (tx, rx) = mpsc::channel(); + let (tx, rx) = oneshot::channel(); - let bg_thread = thread::spawn(move || { + let bg_thread = async_std::task::spawn(async move { let mplex = libp2p_mplex::MplexConfig::new(); + let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); @@ -110,54 +90,37 @@ fn client_to_server_inbound() { .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) .unwrap(); - let addr = listener.by_ref().wait() - .next() + let addr = listener.next().await .expect("some event") .expect("no error") .into_new_address() .expect("listen address"); - tx.send(addr).unwrap(); - let future = listener - .filter_map(ListenerEvent::into_upgrade) - .into_future() - .map_err(|(err, _)| panic!("{:?}", err)) - .and_then(|(client, _)| client.unwrap().0) - .map_err(|err| panic!("{:?}", err)) - .and_then(|client| muxing::inbound_from_ref_and_wrap(Arc::new(client))) - .map(|client| Builder::new().new_read(client)) - .and_then(|client| { - client - .into_future() - .map_err(|(err, _)| err) - .map(|(msg, _)| msg) - }) - .and_then(|msg| { - let msg = msg.unwrap(); - assert_eq!(msg, "hello world"); - Ok(()) - }); - - let mut rt = Runtime::new().unwrap(); - let _ = rt.block_on(future).unwrap(); + let client = listener + .next().await + .unwrap() + .unwrap() + .into_upgrade().unwrap().0.await.unwrap(); + + let mut inbound = muxing::inbound_from_ref_and_wrap(Arc::new(client)).await.unwrap(); + + let mut buf = Vec::new(); + inbound.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, b"hello world"); }); - let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new().and_then(move |c, e| - upgrade::apply(c, mplex, e, upgrade::Version::V1)); - - let future = transport - .dial(rx.recv().unwrap()) - .unwrap() - .map_err(|err| panic!("{:?}", err)) - .and_then(|client| muxing::outbound_from_ref_and_wrap(Arc::new(client))) - .map(|server| Builder::new().new_write(server)) - .and_then(|server| server.send("hello world".into())) - .map(|_| ()); - - let mut rt = Runtime::new().unwrap(); - let _ = rt.block_on(future).unwrap(); - bg_thread.join().unwrap(); + async_std::task::block_on(async { + let mplex = libp2p_mplex::MplexConfig::new(); + let transport = TcpConfig::new().and_then(move |c, e| + upgrade::apply(c, mplex, e, upgrade::Version::V1)); + + let client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); + let mut outbound = muxing::outbound_from_ref_and_wrap(Arc::new(client)).await.unwrap(); + outbound.write_all(b"hello world").await.unwrap(); + outbound.close().await.unwrap(); + + bg_thread.await; + }); }