diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 4c101298a25..ccd796f66fc 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -14,7 +14,8 @@ arrayvec = "0.4.7" bytes = "0.4" either = "1.5" fnv = "1.0" -futures = "0.1" +futures_codec = "0.2" +futures-preview = "0.3.0-alpha.18" log = "0.4" libp2p-core = { version = "0.12.0", path = "../../core" } libp2p-swarm = { version = "0.2.0", path = "../../swarm" } @@ -24,9 +25,7 @@ protobuf = "2.3" rand = "0.6.0" sha2 = "0.8.0" smallvec = "0.6" -tokio-codec = "0.1" -tokio-io = "0.1" -wasm-timer = "0.1" +wasm-timer = "0.2" uint = "0.8" unsigned-varint = { git = "https://github.com/tomaka/unsigned-varint", branch = "futures-codec", features = ["codec"] } void = "1.0" @@ -37,4 +36,3 @@ libp2p-tcp = { version = "0.12.0", path = "../../transports/tcp" } libp2p-yamux = { version = "0.12.0", path = "../../muxers/yamux" } quickcheck = "0.8" rand = "0.6.0" -tokio = "0.1" diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 6b01cc86df3..db9363643a3 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -39,7 +39,7 @@ use smallvec::SmallVec; use std::{borrow::Cow, error, iter, marker::PhantomData, time::Duration}; use std::collections::VecDeque; use std::num::NonZeroUsize; -use tokio_io::{AsyncRead, AsyncWrite}; +use std::task::{Context, Poll}; use wasm_timer::Instant; /// Network behaviour that handles Kademlia. @@ -1010,7 +1010,7 @@ where impl NetworkBehaviour for Kademlia where - TSubstream: AsyncRead + AsyncWrite, + TSubstream: AsyncRead + AsyncWrite + Unpin, for<'a> TStore: RecordStore<'a>, { type ProtocolsHandler = KademliaHandler; @@ -1304,7 +1304,7 @@ where }; } - fn poll(&mut self, parameters: &mut impl PollParameters) -> Async< + fn poll(&mut self, cx: &mut Context, parameters: &mut impl PollParameters) -> Poll< NetworkBehaviourAction< ::InEvent, Self::OutEvent, @@ -1319,7 +1319,7 @@ where if let Some(mut job) = self.add_provider_job.take() { let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity); for _ in 0 .. num { - if let Async::Ready(r) = job.poll(&mut self.store, now) { + if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) { self.start_add_provider(r.key, AddProviderContext::Republish) } else { break @@ -1333,7 +1333,7 @@ where if let Some(mut job) = self.put_record_job.take() { let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity); for _ in 0 .. num { - if let Async::Ready(r) = job.poll(&mut self.store, now) { + if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) { let context = if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) { PutRecordContext::Republish } else { @@ -1350,7 +1350,7 @@ where loop { // Drain queued events first. if let Some(event) = self.queued_events.pop_front() { - return Async::Ready(event); + return Poll::Ready(event); } // Drain applied pending entries from the routing table. @@ -1361,7 +1361,7 @@ where addresses: value, old_peer: entry.evicted.map(|n| n.key.into_preimage()) }; - return Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) } // Look for a finished query. @@ -1369,12 +1369,12 @@ where match self.queries.poll(now) { QueryPoolState::Finished(q) => { if let Some(event) = self.query_finished(q, parameters) { - return Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) } } QueryPoolState::Timeout(q) => { if let Some(event) = self.query_timeout(q) { - return Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) } } QueryPoolState::Waiting(Some((query, peer_id))) => { @@ -1406,7 +1406,7 @@ where // If no new events have been queued either, signal `NotReady` to // be polled again later. if self.queued_events.is_empty() { - return Async::NotReady + return Poll::Pending } } } diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 137bc704ba0..87a5fabf099 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -36,8 +36,7 @@ use libp2p_core::{ upgrade::{self, InboundUpgrade, OutboundUpgrade, Negotiated} }; use log::trace; -use std::{borrow::Cow, error, fmt, io, time::Duration}; -use tokio_io::{AsyncRead, AsyncWrite}; +use std::{borrow::Cow, error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration}; use wasm_timer::Instant; /// Protocol handler that handles Kademlia communications with the remote. @@ -48,7 +47,7 @@ use wasm_timer::Instant; /// It also handles requests made by the remote. pub struct KademliaHandler where - TSubstream: AsyncRead + AsyncWrite, + TSubstream: AsyncRead + AsyncWrite + Unpin, { /// Configuration for the Kademlia protocol. config: KademliaProtocolConfig, @@ -69,7 +68,7 @@ where /// State of an active substream, opened either by us or by the remote. enum SubstreamState where - TSubstream: AsyncRead + AsyncWrite, + TSubstream: AsyncRead + AsyncWrite + Unpin, { /// We haven't started opening the outgoing substream yet. /// Contains the request we want to send, and the user data if we expect an answer. @@ -103,29 +102,29 @@ where impl SubstreamState where - TSubstream: AsyncRead + AsyncWrite, + TSubstream: AsyncRead + AsyncWrite + Unpin, { - /// Consumes this state and tries to close the substream. + /// Tries to close the substream. /// /// If the substream is not ready to be closed, returns it back. - fn try_close(self) -> AsyncSink { + fn try_close(&mut self, cx: &mut Context) -> Poll<()> { match self { SubstreamState::OutPendingOpen(_, _) - | SubstreamState::OutReportError(_, _) => AsyncSink::Ready, - SubstreamState::OutPendingSend(mut stream, _, _) - | SubstreamState::OutPendingFlush(mut stream, _) - | SubstreamState::OutWaitingAnswer(mut stream, _) - | SubstreamState::OutClosing(mut stream) => match stream.close() { - Ok(Async::Ready(())) | Err(_) => AsyncSink::Ready, - Ok(Async::NotReady) => AsyncSink::NotReady(SubstreamState::OutClosing(stream)), + | SubstreamState::OutReportError(_, _) => Poll::Ready(()), + SubstreamState::OutPendingSend(ref mut stream, _, _) + | SubstreamState::OutPendingFlush(ref mut stream, _) + | SubstreamState::OutWaitingAnswer(ref mut stream, _) + | SubstreamState::OutClosing(ref mut stream) => match Sink::poll_close(Pin::new(stream), cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, }, - SubstreamState::InWaitingMessage(_, mut stream) - | SubstreamState::InWaitingUser(_, mut stream) - | SubstreamState::InPendingSend(_, mut stream, _) - | SubstreamState::InPendingFlush(_, mut stream) - | SubstreamState::InClosing(mut stream) => match stream.close() { - Ok(Async::Ready(())) | Err(_) => AsyncSink::Ready, - Ok(Async::NotReady) => AsyncSink::NotReady(SubstreamState::InClosing(stream)), + SubstreamState::InWaitingMessage(_, ref mut stream) + | SubstreamState::InWaitingUser(_, ref mut stream) + | SubstreamState::InPendingSend(_, ref mut stream, _) + | SubstreamState::InPendingFlush(_, ref mut stream) + | SubstreamState::InClosing(ref mut stream) => match Sink::poll_close(Pin::new(stream), cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, }, } } @@ -382,7 +381,7 @@ struct UniqueConnecId(u64); impl KademliaHandler where - TSubstream: AsyncRead + AsyncWrite, + TSubstream: AsyncRead + AsyncWrite + Unpin, { /// Create a `KademliaHandler` that only allows sending messages to the remote but denying /// incoming connections. @@ -418,7 +417,7 @@ where impl Default for KademliaHandler where - TSubstream: AsyncRead + AsyncWrite, + TSubstream: AsyncRead + AsyncWrite + Unpin, { #[inline] fn default() -> Self { @@ -428,7 +427,7 @@ where impl ProtocolsHandler for KademliaHandler where - TSubstream: AsyncRead + AsyncWrite, + TSubstream: AsyncRead + AsyncWrite + Unpin, TUserData: Clone, { type InEvent = KademliaHandlerIn; @@ -485,7 +484,10 @@ where _ => false, }); if let Some(pos) = pos { - let _ = self.substreams.remove(pos).try_close(); + // TODO: we don't properly close down the substream + let waker = futures::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + let _ = self.substreams.remove(pos).try_close(&mut cx); } } KademliaHandlerIn::FindNodeReq { key, user_data } => { @@ -639,22 +641,22 @@ where fn poll( &mut self, + cx: &mut Context, ) -> Poll< ProtocolsHandlerEvent, - io::Error, > { // We remove each element from `substreams` one by one and add them back. for n in (0..self.substreams.len()).rev() { let mut substream = self.substreams.swap_remove(n); loop { - match advance_substream(substream, self.config.clone()) { + match advance_substream(substream, self.config.clone(), cx) { (Some(new_state), Some(event), _) => { self.substreams.push(new_state); - return Ok(Async::Ready(event)); + return Poll::Ready(event); } (None, Some(event), _) => { - return Ok(Async::Ready(event)); + return Poll::Ready(event); } (Some(new_state), None, false) => { self.substreams.push(new_state); @@ -677,7 +679,7 @@ where self.keep_alive = KeepAlive::Yes; } - Ok(Async::NotReady) + Poll::Pending } } @@ -688,6 +690,7 @@ where fn advance_substream( state: SubstreamState, upgrade: KademliaProtocolConfig, + cx: &mut Context, ) -> ( Option>, Option< @@ -695,12 +698,13 @@ fn advance_substream( KademliaProtocolConfig, (KadRequestMsg, Option), KademliaHandlerEvent, + io::Error, >, >, bool, ) where - TSubstream: AsyncRead + AsyncWrite, + TSubstream: AsyncRead + AsyncWrite + Unpin, { match state { SubstreamState::OutPendingOpen(msg, user_data) => { @@ -711,18 +715,34 @@ where (None, Some(ev), false) } SubstreamState::OutPendingSend(mut substream, msg, user_data) => { - match substream.start_send(msg) { - Ok(AsyncSink::Ready) => ( - Some(SubstreamState::OutPendingFlush(substream, user_data)), - None, - true, - ), - Ok(AsyncSink::NotReady(msg)) => ( + match Sink::poll_ready(Pin::new(&mut substream), cx) { + Poll::Ready(Ok(())) => { + match Sink::start_send(Pin::new(&mut substream), msg) { + Ok(()) => ( + Some(SubstreamState::OutPendingFlush(substream, user_data)), + None, + true, + ), + Err(error) => { + let event = if let Some(user_data) = user_data { + Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError { + error: KademliaHandlerQueryErr::Io(error), + user_data + })) + } else { + None + }; + + (None, event, false) + } + } + }, + Poll::Pending => ( Some(SubstreamState::OutPendingSend(substream, msg, user_data)), None, false, ), - Err(error) => { + Poll::Ready(Err(error)) => { let event = if let Some(user_data) = user_data { Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError { error: KademliaHandlerQueryErr::Io(error), @@ -737,8 +757,8 @@ where } } SubstreamState::OutPendingFlush(mut substream, user_data) => { - match substream.poll_complete() { - Ok(Async::Ready(())) => { + match Sink::poll_flush(Pin::new(&mut substream), cx) { + Poll::Ready(Ok(())) => { if let Some(user_data) = user_data { ( Some(SubstreamState::OutWaitingAnswer(substream, user_data)), @@ -749,12 +769,12 @@ where (Some(SubstreamState::OutClosing(substream)), None, true) } } - Ok(Async::NotReady) => ( + Poll::Pending => ( Some(SubstreamState::OutPendingFlush(substream, user_data)), None, false, ), - Err(error) => { + Poll::Ready(Err(error)) => { let event = if let Some(user_data) = user_data { Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError { error: KademliaHandlerQueryErr::Io(error), @@ -768,8 +788,8 @@ where } } } - SubstreamState::OutWaitingAnswer(mut substream, user_data) => match substream.poll() { - Ok(Async::Ready(Some(msg))) => { + SubstreamState::OutWaitingAnswer(mut substream, user_data) => match Stream::poll_next(Pin::new(&mut substream), cx) { + Poll::Ready(Some(Ok(msg))) => { let new_state = SubstreamState::OutClosing(substream); let event = process_kad_response(msg, user_data); ( @@ -778,19 +798,19 @@ where true, ) } - Ok(Async::NotReady) => ( + Poll::Pending => ( Some(SubstreamState::OutWaitingAnswer(substream, user_data)), None, false, ), - Err(error) => { + Poll::Ready(Some(Err(error))) => { let event = KademliaHandlerEvent::QueryError { error: KademliaHandlerQueryErr::Io(error), user_data, }; (None, Some(ProtocolsHandlerEvent::Custom(event)), false) } - Ok(Async::Ready(None)) => { + Poll::Ready(None) => { let event = KademliaHandlerEvent::QueryError { error: KademliaHandlerQueryErr::Io(io::ErrorKind::UnexpectedEof.into()), user_data, @@ -802,13 +822,13 @@ where let event = KademliaHandlerEvent::QueryError { error, user_data }; (None, Some(ProtocolsHandlerEvent::Custom(event)), false) } - SubstreamState::OutClosing(mut stream) => match stream.close() { - Ok(Async::Ready(())) => (None, None, false), - Ok(Async::NotReady) => (Some(SubstreamState::OutClosing(stream)), None, false), - Err(_) => (None, None, false), + SubstreamState::OutClosing(mut stream) => match Sink::poll_close(Pin::new(&mut stream), cx) { + Poll::Ready(Ok(())) => (None, None, false), + Poll::Pending => (Some(SubstreamState::OutClosing(stream)), None, false), + Poll::Ready(Err(_)) => (None, None, false), }, - SubstreamState::InWaitingMessage(id, mut substream) => match substream.poll() { - Ok(Async::Ready(Some(msg))) => { + SubstreamState::InWaitingMessage(id, mut substream) => match Stream::poll_next(Pin::new(&mut substream), cx) { + Poll::Ready(Some(Ok(msg))) => { if let Ok(ev) = process_kad_request(msg, id) { ( Some(SubstreamState::InWaitingUser(id, substream)), @@ -819,16 +839,16 @@ where (Some(SubstreamState::InClosing(substream)), None, true) } } - Ok(Async::NotReady) => ( + Poll::Pending => ( Some(SubstreamState::InWaitingMessage(id, substream)), None, false, ), - Ok(Async::Ready(None)) => { + Poll::Ready(None) => { trace!("Inbound substream: EOF"); (None, None, false) } - Err(e) => { + Poll::Ready(Some(Err(e))) => { trace!("Inbound substream error: {:?}", e); (None, None, false) }, @@ -838,36 +858,39 @@ where None, false, ), - SubstreamState::InPendingSend(id, mut substream, msg) => match substream.start_send(msg) { - Ok(AsyncSink::Ready) => ( - Some(SubstreamState::InPendingFlush(id, substream)), - None, - true, - ), - Ok(AsyncSink::NotReady(msg)) => ( + SubstreamState::InPendingSend(id, mut substream, msg) => match Sink::poll_ready(Pin::new(&mut substream), cx) { + Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) { + Ok(()) => ( + Some(SubstreamState::InPendingFlush(id, substream)), + None, + true, + ), + Err(_) => (None, None, false), + }, + Poll::Pending => ( Some(SubstreamState::InPendingSend(id, substream, msg)), None, false, ), - Err(_) => (None, None, false), - }, - SubstreamState::InPendingFlush(id, mut substream) => match substream.poll_complete() { - Ok(Async::Ready(())) => ( + Poll::Ready(Err(_)) => (None, None, false), + } + SubstreamState::InPendingFlush(id, mut substream) => match Sink::poll_flush(Pin::new(&mut substream), cx) { + Poll::Ready(Ok(())) => ( Some(SubstreamState::InWaitingMessage(id, substream)), None, true, ), - Ok(Async::NotReady) => ( + Poll::Pending => ( Some(SubstreamState::InPendingFlush(id, substream)), None, false, ), - Err(_) => (None, None, false), + Poll::Ready(Err(_)) => (None, None, false), }, - SubstreamState::InClosing(mut stream) => match stream.close() { - Ok(Async::Ready(())) => (None, None, false), - Ok(Async::NotReady) => (Some(SubstreamState::InClosing(stream)), None, false), - Err(_) => (None, None, false), + SubstreamState::InClosing(mut stream) => match Sink::poll_close(Pin::new(&mut stream), cx) { + Poll::Ready(Ok(())) => (None, None, false), + Poll::Pending => (Some(SubstreamState::InClosing(stream)), None, false), + Poll::Ready(Err(_)) => (None, None, false), }, } } diff --git a/protocols/kad/src/jobs.rs b/protocols/kad/src/jobs.rs index e7909c90bc0..6d9ed399ef5 100644 --- a/protocols/kad/src/jobs.rs +++ b/protocols/kad/src/jobs.rs @@ -65,6 +65,8 @@ use crate::record::{self, Record, ProviderRecord, store::RecordStore}; use libp2p_core::PeerId; use futures::prelude::*; use std::collections::HashSet; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; use std::vec; use wasm_timer::{Instant, Delay}; @@ -96,16 +98,18 @@ impl PeriodicJob { /// Cuts short the remaining delay, if the job is currently waiting /// for the delay to expire. fn asap(&mut self) { - if let PeriodicJobState::Waiting(delay) = &mut self.state { - delay.reset(Instant::now() - Duration::from_secs(1)) + if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state { + let new_deadline = Instant::now() - Duration::from_secs(1); + *deadline = new_deadline; + delay.reset_at(new_deadline); } } /// Returns `true` if the job is currently not running but ready /// to be run, `false` otherwise. - fn is_ready(&mut self, now: Instant) -> bool { - if let PeriodicJobState::Waiting(delay) = &mut self.state { - if now >= delay.deadline() || delay.poll().map(|a| a.is_ready()).unwrap_or(false) { + fn is_ready(&mut self, cx: &mut Context, now: Instant) -> bool { + if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state { + if now >= *deadline || !Future::poll(Pin::new(delay), cx).is_pending() { return true } } @@ -117,7 +121,7 @@ impl PeriodicJob { #[derive(Debug)] enum PeriodicJobState { Running(T), - Waiting(Delay) + Waiting(Delay, Instant) } ////////////////////////////////////////////////////////////////////////////// @@ -143,7 +147,8 @@ impl PutRecordJob { record_ttl: Option, ) -> Self { let now = Instant::now(); - let delay = Delay::new(now + replicate_interval); + let deadline = now + replicate_interval; + let delay = Delay::new_at(deadline); let next_publish = publish_interval.map(|i| now + i); Self { local_id, @@ -153,7 +158,7 @@ impl PutRecordJob { skipped: HashSet::new(), inner: PeriodicJob { interval: replicate_interval, - state: PeriodicJobState::Waiting(delay) + state: PeriodicJobState::Waiting(delay, deadline) } } } @@ -185,11 +190,11 @@ impl PutRecordJob { /// Must be called in the context of a task. When `NotReady` is returned, /// the current task is registered to be notified when the job is ready /// to be run. - pub fn poll(&mut self, store: &mut T, now: Instant) -> Async + pub fn poll(&mut self, cx: &mut Context, store: &mut T, now: Instant) -> Poll where for<'a> T: RecordStore<'a> { - if self.inner.is_ready(now) { + if self.inner.is_ready(cx, now) { let publish = self.next_publish.map_or(false, |t_pub| now >= t_pub); let records = store.records() .filter_map(|r| { @@ -224,7 +229,7 @@ impl PutRecordJob { if r.is_expired(now) { store.remove(&r.key) } else { - return Async::Ready(r) + return Poll::Ready(r) } } else { break @@ -232,12 +237,13 @@ impl PutRecordJob { } // Wait for the next run. - let delay = Delay::new(now + self.inner.interval); - self.inner.state = PeriodicJobState::Waiting(delay); - assert!(!self.inner.is_ready(now)); + let deadline = now + self.inner.interval; + let delay = Delay::new_at(deadline); + self.inner.state = PeriodicJobState::Waiting(delay, deadline); + assert!(!self.inner.is_ready(cx, now)); } - Async::NotReady + Poll::Pending } } @@ -256,7 +262,10 @@ impl AddProviderJob { Self { inner: PeriodicJob { interval, - state: PeriodicJobState::Waiting(Delay::new(now + interval)) + state: { + let deadline = now + interval; + PeriodicJobState::Waiting(Delay::new_at(deadline), deadline) + } } } } @@ -279,11 +288,11 @@ impl AddProviderJob { /// Must be called in the context of a task. When `NotReady` is returned, /// the current task is registered to be notified when the job is ready /// to be run. - pub fn poll(&mut self, store: &mut T, now: Instant) -> Async + pub fn poll(&mut self, cx: &mut Context, store: &mut T, now: Instant) -> Poll where for<'a> T: RecordStore<'a> { - if self.inner.is_ready(now) { + if self.inner.is_ready(cx, now) { let records = store.provided() .map(|r| r.into_owned()) .collect::>() @@ -297,19 +306,20 @@ impl AddProviderJob { if r.is_expired(now) { store.remove_provider(&r.key, &r.provider) } else { - return Async::Ready(r) + return Poll::Ready(r) } } else { break } } - let delay = Delay::new(now + self.inner.interval); - self.inner.state = PeriodicJobState::Waiting(delay); - assert!(!self.inner.is_ready(now)); + let deadline = now + self.inner.interval; + let delay = Delay::new_at(deadline); + self.inner.state = PeriodicJobState::Waiting(delay, deadline); + assert!(!self.inner.is_ready(cx, now)); } - Async::NotReady + Poll::Pending } } @@ -360,11 +370,11 @@ mod tests { // All (non-expired) records in the store must be yielded by the job. for r in store.records().map(|r| r.into_owned()).collect::>() { if !r.is_expired(now) { - assert_eq!(job.poll(&mut store, now), Async::Ready(r)); + assert_eq!(job.poll(&mut store, now), Poll::Ready(r)); assert!(job.is_running()); } } - assert_eq!(job.poll(&mut store, now), Async::NotReady); + assert_eq!(job.poll(&mut store, now), Poll::Pending); assert!(!job.is_running()); } @@ -390,11 +400,11 @@ mod tests { // All (non-expired) records in the store must be yielded by the job. for r in store.provided().map(|r| r.into_owned()).collect::>() { if !r.is_expired(now) { - assert_eq!(job.poll(&mut store, now), Async::Ready(r)); + assert_eq!(job.poll(&mut store, now), Poll::Ready(r)); assert!(job.is_running()); } } - assert_eq!(job.poll(&mut store, now), Async::NotReady); + assert_eq!(job.poll(&mut store, now), Poll::Pending); assert!(!job.is_running()); } diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index ad5b8894a30..68984a47ec1 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -34,14 +34,13 @@ use bytes::BytesMut; use codec::UviBytes; use crate::protobuf_structs::dht as proto; use crate::record::{self, Record}; -use futures::{future::{self, FutureResult}, sink, stream, Sink, Stream}; +use futures::prelude::*; +use futures_codec::Framed; use libp2p_core::{Multiaddr, PeerId}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}; use protobuf::{self, Message}; use std::{borrow::Cow, convert::TryFrom, time::Duration}; use std::{io, iter}; -use tokio_codec::Framed; -use tokio_io::{AsyncRead, AsyncWrite}; use unsigned_varint::codec; use wasm_timer::Instant; @@ -176,10 +175,10 @@ impl UpgradeInfo for KademliaProtocolConfig { impl InboundUpgrade for KademliaProtocolConfig where - C: AsyncRead + AsyncWrite, + C: AsyncRead + AsyncWrite + Unpin, { type Output = KadInStreamSink>; - type Future = FutureResult; + type Future = future::Ready>; type Error = io::Error; #[inline] @@ -189,14 +188,17 @@ where future::ok( Framed::new(incoming, codec) - .from_err() - .with::<_, fn(_) -> _, _>(|response| { + .err_into() + .with::<_, _, fn(_) -> _, _>(|response| { let proto_struct = resp_msg_to_proto(response); - proto_struct.write_to_bytes().map_err(invalid_data) + future::ready(proto_struct.write_to_bytes().map_err(invalid_data)) }) - .and_then:: _, _>(|bytes| { - let request = protobuf::parse_from_bytes(&bytes)?; - proto_to_req_msg(request) + .and_then::<_, fn(_) -> _>(|bytes| { + let request = match protobuf::parse_from_bytes(&bytes) { + Ok(r) => r, + Err(err) => return future::ready(Err(err.into())) + }; + future::ready(proto_to_req_msg(request)) }), ) } @@ -204,10 +206,10 @@ where impl OutboundUpgrade for KademliaProtocolConfig where - C: AsyncRead + AsyncWrite, + C: AsyncRead + AsyncWrite + Unpin, { type Output = KadOutStreamSink>; - type Future = FutureResult; + type Future = future::Ready>; type Error = io::Error; #[inline] @@ -217,14 +219,17 @@ where future::ok( Framed::new(incoming, codec) - .from_err() - .with::<_, fn(_) -> _, _>(|request| { + .err_into() + .with::<_, _, fn(_) -> _, _>(|request| { let proto_struct = req_msg_to_proto(request); - proto_struct.write_to_bytes().map_err(invalid_data) + future::ready(proto_struct.write_to_bytes().map_err(invalid_data)) }) - .and_then:: _, _>(|bytes| { - let response = protobuf::parse_from_bytes(&bytes)?; - proto_to_resp_msg(response) + .and_then::<_, fn(_) -> _>(|bytes| { + let response = match protobuf::parse_from_bytes(&bytes) { + Ok(r) => r, + Err(err) => return future::ready(Err(err.into())) + }; + future::ready(proto_to_resp_msg(response)) }), ) } @@ -238,13 +243,14 @@ pub type KadOutStreamSink = KadStreamSink; pub type KadStreamSink = stream::AndThen< sink::With< - stream::FromErr>>, io::Error>, + stream::ErrInto>>, io::Error>, + Vec, A, - fn(A) -> Result, io::Error>, - Result, io::Error>, + future::Ready, io::Error>>, + fn(A) -> future::Ready, io::Error>>, >, - fn(BytesMut) -> Result, - Result, + future::Ready>, + fn(BytesMut) -> future::Ready>, >; /// Request that we can send to a peer or that we received from a peer.