Skip to content

Commit

Permalink
Upgrade libp2p-kad to stable futures (#1254)
Browse files Browse the repository at this point in the history
* Upgrade libp2p-kad to stable futures

* Fix comment
  • Loading branch information
tomaka authored Sep 26, 2019
1 parent d7e9ba4 commit 7f58684
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 140 deletions.
8 changes: 3 additions & 5 deletions protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ arrayvec = "0.4.7"
bytes = "0.4"
either = "1.5"
fnv = "1.0"
futures = "0.1"
futures_codec = "0.2"
futures-preview = "0.3.0-alpha.18"
log = "0.4"
libp2p-core = { version = "0.12.0", path = "../../core" }
libp2p-swarm = { version = "0.2.0", path = "../../swarm" }
Expand All @@ -24,9 +25,7 @@ protobuf = "2.3"
rand = "0.6.0"
sha2 = "0.8.0"
smallvec = "0.6"
tokio-codec = "0.1"
tokio-io = "0.1"
wasm-timer = "0.1"
wasm-timer = "0.2"
uint = "0.8"
unsigned-varint = { git = "https://github.com/tomaka/unsigned-varint", branch = "futures-codec", features = ["codec"] }
void = "1.0"
Expand All @@ -37,4 +36,3 @@ libp2p-tcp = { version = "0.12.0", path = "../../transports/tcp" }
libp2p-yamux = { version = "0.12.0", path = "../../muxers/yamux" }
quickcheck = "0.8"
rand = "0.6.0"
tokio = "0.1"
20 changes: 10 additions & 10 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use smallvec::SmallVec;
use std::{borrow::Cow, error, iter, marker::PhantomData, time::Duration};
use std::collections::VecDeque;
use std::num::NonZeroUsize;
use tokio_io::{AsyncRead, AsyncWrite};
use std::task::{Context, Poll};
use wasm_timer::Instant;

/// Network behaviour that handles Kademlia.
Expand Down Expand Up @@ -1010,7 +1010,7 @@ where

impl<TSubstream, TStore> NetworkBehaviour for Kademlia<TSubstream, TStore>
where
TSubstream: AsyncRead + AsyncWrite,
TSubstream: AsyncRead + AsyncWrite + Unpin,
for<'a> TStore: RecordStore<'a>,
{
type ProtocolsHandler = KademliaHandler<TSubstream, QueryId>;
Expand Down Expand Up @@ -1304,7 +1304,7 @@ where
};
}

fn poll(&mut self, parameters: &mut impl PollParameters) -> Async<
fn poll(&mut self, cx: &mut Context, parameters: &mut impl PollParameters) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
Expand All @@ -1319,7 +1319,7 @@ where
if let Some(mut job) = self.add_provider_job.take() {
let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
for _ in 0 .. num {
if let Async::Ready(r) = job.poll(&mut self.store, now) {
if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
self.start_add_provider(r.key, AddProviderContext::Republish)
} else {
break
Expand All @@ -1333,7 +1333,7 @@ where
if let Some(mut job) = self.put_record_job.take() {
let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
for _ in 0 .. num {
if let Async::Ready(r) = job.poll(&mut self.store, now) {
if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
let context = if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
PutRecordContext::Republish
} else {
Expand All @@ -1350,7 +1350,7 @@ where
loop {
// Drain queued events first.
if let Some(event) = self.queued_events.pop_front() {
return Async::Ready(event);
return Poll::Ready(event);
}

// Drain applied pending entries from the routing table.
Expand All @@ -1361,20 +1361,20 @@ where
addresses: value,
old_peer: entry.evicted.map(|n| n.key.into_preimage())
};
return Async::Ready(NetworkBehaviourAction::GenerateEvent(event))
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
}

// Look for a finished query.
loop {
match self.queries.poll(now) {
QueryPoolState::Finished(q) => {
if let Some(event) = self.query_finished(q, parameters) {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(event))
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
}
}
QueryPoolState::Timeout(q) => {
if let Some(event) = self.query_timeout(q) {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(event))
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
}
}
QueryPoolState::Waiting(Some((query, peer_id))) => {
Expand Down Expand Up @@ -1406,7 +1406,7 @@ where
// If no new events have been queued either, signal `NotReady` to
// be polled again later.
if self.queued_events.is_empty() {
return Async::NotReady
return Poll::Pending
}
}
}
Expand Down
Loading

0 comments on commit 7f58684

Please sign in to comment.