Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove usage of FutureObj #134

Merged
merged 2 commits into from
Nov 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ matrix:
include:
- env: TARGET=x86_64-unknown-linux-gnu CC=gcc-6 CXX=g++-6 KCOV=1
os: linux
rust: nightly-2018-11-13
rust: nightly-2018-11-27
before_script:
- travis/trusty/before-script.sh
addons:
Expand All @@ -29,7 +29,7 @@ matrix:

- env: TARGET=x86_64-apple-darwin KCOV=0
os: osx
rust: nightly-2018-11-13
rust: nightly-2018-11-27
osx_image: xcode9.2
before_script:
- brew install capnp
Expand Down
337 changes: 200 additions & 137 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions components/channeler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
cargo-features = ["rename-dependency"]

[package]
name = "offst-channeler"
version = "0.1.0"
Expand All @@ -18,5 +16,6 @@ relay = { path = "../relay", version = "0.1.0" , package = "offst-relay" }
secure_channel = { path = "../secure_channel", version = "0.1.0" , package = "offst-secure-channel" }

log = "0.4"
futures-preview = "0.3.0-alpha.9"
# futures-preview = "0.3.0-alpha.9"
futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "5d87cce202" }

15 changes: 8 additions & 7 deletions components/channeler/src/connector_utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use futures::future::FutureObj;
use futures::FutureExt;
use core::pin::Pin;

use futures::{Future, FutureExt};
use futures::task::Spawn;
use futures::future;

Expand Down Expand Up @@ -37,8 +38,8 @@ where
type SendItem = C::SendItem;
type RecvItem = C::RecvItem;

fn connect(&mut self, address: ())
-> FutureObj<Option<ConnPair<C::SendItem, C::RecvItem>>> {
fn connect<'a>(&'a mut self, address: ())
-> Pin<Box<dyn Future<Output=Option<ConnPair<C::SendItem, C::RecvItem>>> + Send + 'a>> {
self.connector.connect(self.address.clone())
}
}
Expand Down Expand Up @@ -88,8 +89,8 @@ where
type SendItem = Vec<u8>;
type RecvItem = Vec<u8>;

fn connect(&mut self, full_address: (PublicKey, A))
-> FutureObj<Option<ConnPair<Vec<u8>, Vec<u8>>>> {
fn connect<'a>(&'a mut self, full_address: (PublicKey, A))
-> Pin<Box<dyn Future<Output=Option<ConnPair<C::SendItem, C::RecvItem>>> + Send + 'a>> {

let (public_key, address) = full_address;
let fut = async move {
Expand All @@ -108,6 +109,6 @@ where
receiver: secure_channel.receiver,
})
};
FutureObj::new(fut.boxed())
Box::pinned(fut)
}
}
10 changes: 5 additions & 5 deletions components/channeler/src/listener.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::marker::Unpin;
use futures::{select, future, Stream, StreamExt, Sink};
use futures::{select, future, FutureExt, Stream, StreamExt, Sink};
use futures::task::Spawn;

use proto::funder::messages::{FunderToChanneler, ChannelerToFunder};
Expand Down Expand Up @@ -100,20 +100,20 @@ where
// TODO: Possibly wait here in a smart way? Exponential backoff?
await!(sleep_ticks(backoff_ticks, timer_client.clone()))?;
Ok(())
});
}).fuse();

// TODO: Get rid of Box::pinned() later.
let mut new_address_fut = Box::pinned(async {
match await!(incoming_addresses.next()) {
Some(address) => ListenerSelect::IncomingAddress(address),
None => ListenerSelect::IncomingAddressClosed,
}
});
}).fuse();

// TODO: Could we possibly lose an incoming address with this select?
let listener_select = select! {
listener_fut => ListenerSelect::ListenerError(listener_fut.err().unwrap()),
new_address_fut => new_address_fut,
listener_fut = listener_fut => ListenerSelect::ListenerError(listener_fut.err().unwrap()),
new_address_fut = new_address_fut => new_address_fut,
};
// TODO: Make code nicer here somehow. Use scopes instead of drop?
drop(new_address_fut);
Expand Down
5 changes: 2 additions & 3 deletions components/funder/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
cargo-features = ["rename-dependency"]

[package]
name = "offst-funder"
version = "0.1.0"
Expand Down Expand Up @@ -27,7 +25,8 @@ bytes = "0.4"
# tokio-codec = "0.1"
# futures-await = "0.1"
futures-cpupool = "0.1.8"
futures-preview = "0.3.0-alpha.9"
# futures-preview = "0.3.0-alpha.9"
futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "5d87cce202" }

num-bigint = "0.2.0"
num-traits = "0.2.4"
Expand Down
5 changes: 2 additions & 3 deletions components/identity/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
cargo-features = ["rename-dependency"]

[package]
name = "offst-identity"
version = "0.1.0"
Expand All @@ -15,7 +13,8 @@ crypto = { path = "../crypto", version = "0.1.0" , package = "offst-crypto"}
# futures = "0.1.25"
# tokio-core = "0.1"
# futures-await = "0.1"
futures-preview = "0.3.0-alpha.9"
# futures-preview = "0.3.0-alpha.9"
futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "5d87cce202" }

[dev-dependencies]

5 changes: 2 additions & 3 deletions components/relay/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
cargo-features = ["rename-dependency"]

[package]
name = "offst-relay"
version = "0.1.0"
Expand All @@ -15,5 +13,6 @@ timer = { path = "../timer", version = "0.1.0" , package = "offst-timer" }
proto = { path = "../proto", version = "0.1.0" , package = "offst-proto" }

log = "0.4"
futures-preview = "0.3.0-alpha.9"
# futures-preview = "0.3.0-alpha.9"
futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "5d87cce202" }

10 changes: 6 additions & 4 deletions components/relay/src/client/client_connector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::pin::Pin;
use crypto::identity::PublicKey;
use futures::{future, FutureExt, TryFutureExt, StreamExt, SinkExt};
use futures::future::FutureObj;
use futures::{future, Future, FutureExt, TryFutureExt, StreamExt, SinkExt};
use futures::task::{Spawn, SpawnExt};
use futures::channel::mpsc;

Expand Down Expand Up @@ -106,11 +106,13 @@ where
type SendItem = Vec<u8>;
type RecvItem = Vec<u8>;

fn connect(&mut self, address: (A, PublicKey)) -> FutureObj<Option<ConnPair<Self::SendItem, Self::RecvItem>>> {
fn connect<'a>(&'a mut self, address: (A, PublicKey))
-> Pin<Box<dyn Future<Output=Option<ConnPair<Self::SendItem, Self::RecvItem>>> + Send + 'a>> {

let (relay_address, remote_public_key) = address;
let relay_connect = self.relay_connect(relay_address, remote_public_key)
.map(|res| res.ok());
FutureObj::new(relay_connect.boxed())
Box::pinned(relay_connect)
}
}

Expand Down
13 changes: 7 additions & 6 deletions components/relay/src/client/client_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,13 @@ where
let conn_timeout_ticks = usize_to_u64(conn_timeout_ticks).unwrap();
let mut fut_timeout = timer_stream
.take(conn_timeout_ticks)
.for_each(|_| future::ready(()));
let mut fut_connect = connector.connect(());
.for_each(|_| future::ready(()))
.fuse();
let mut fut_connect = connector.connect(()).fuse();

select! {
fut_timeout => None,
fut_connect => fut_connect,
fut_timeout = fut_timeout => None,
fut_connect = fut_connect => fut_connect,
}
}

Expand Down Expand Up @@ -507,7 +508,7 @@ mod tests {
async fn task_client_listener_basic(mut spawner: impl Spawn + Clone + Send + 'static) {
let (req_sender, mut req_receiver) = mpsc::channel(0);
let connector = DummyConnector::new(req_sender);
let (connections_sender, mut connections_receiver) = mpsc::channel(0);
let (connections_sender, connections_receiver) = mpsc::channel(0);
let conn_timeout_ticks = 8;
let keepalive_ticks = 16;
let (tick_sender, tick_receiver) = mpsc::channel(0);
Expand Down Expand Up @@ -587,7 +588,7 @@ mod tests {
// Listener will accept the connection:

// Listener will open a connection to the relay:
let (mut remote_sender, local_receiver) = mpsc::channel(0);
let (remote_sender, local_receiver) = mpsc::channel(0);
let (local_sender, mut remote_receiver) = mpsc::channel(0);
let conn_pair = ConnPair {
sender: local_sender,
Expand Down
7 changes: 4 additions & 3 deletions components/relay/src/client/connector.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use futures::future::FutureObj;
use core::pin::Pin;
use futures::channel::mpsc;
use futures::Future;

pub struct ConnPair<SendItem, RecvItem> {
pub sender: mpsc::Sender<SendItem>,
Expand All @@ -10,6 +11,6 @@ pub trait Connector {
type Address;
type SendItem;
type RecvItem;
fn connect(&mut self, address: Self::Address)
-> FutureObj<Option<ConnPair<Self::SendItem, Self::RecvItem>>>;
fn connect<'a>(&'a mut self, address: Self::Address)
-> Pin<Box<dyn Future<Output=Option<ConnPair<Self::SendItem, Self::RecvItem>>> + Send + 'a>>;
}
9 changes: 4 additions & 5 deletions components/relay/src/client/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::pin::Pin;
use futures::channel::{mpsc, oneshot};
use futures::future::FutureObj;
use futures::{FutureExt, SinkExt};
use futures::{Future, FutureExt, SinkExt};
use super::connector::{Connector, ConnPair};


Expand Down Expand Up @@ -39,7 +39,7 @@ where
type SendItem = SI;
type RecvItem = RI;

fn connect(&mut self, address: A) -> FutureObj<Option<ConnPair<Self::SendItem, Self::RecvItem>>> {
fn connect<'a>(&'a mut self, address: A) -> Pin<Box<dyn Future<Output=Option<ConnPair<Self::SendItem, Self::RecvItem>>> + Send + 'a>> {
let (response_sender, response_receiver) = oneshot::channel();
let conn_request = ConnRequest {
address,
Expand All @@ -50,8 +50,7 @@ where
await!(self.req_sender.send(conn_request)).unwrap();
await!(response_receiver).ok()
};
let future_obj = FutureObj::new(fut_conn_pair.boxed());
future_obj
Box::pinned(fut_conn_pair)
}
}

4 changes: 2 additions & 2 deletions components/relay/src/server/conn_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ where
// NOTE: This select is probably not Unpin. Maybe we need to implement our own?
async move {
select! {
fut_receiver => fut_receiver,
fut_time => fut_time,
fut_receiver = fut_receiver => fut_receiver,
fut_time = fut_time => fut_time,
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions components/secure_channel/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
cargo-features = ["rename-dependency"]

[package]
name = "offst-secure-channel"
version = "0.1.0"
Expand All @@ -18,7 +16,8 @@ proto = { path = "../proto", version = "0.1.0" , package = "offst-proto" }
log = "0.4"
pretty_env_logger = "0.2"

futures-preview = "0.3.0-alpha.9"
# futures-preview = "0.3.0-alpha.9"
futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "5d87cce202" }

# futures = "0.1.25"
# tokio-core = "0.1"
Expand Down
5 changes: 2 additions & 3 deletions components/timer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
cargo-features = ["rename-dependency"]

[package]
name = "offst-timer"
version = "0.1.0"
Expand All @@ -13,7 +11,8 @@ log = "0.4"
bytes = "0.4"
# futures-preview = "0.3.0-alpha.7"
# futures-util-preview = {version = "0.3.0-alpha.7", features = ["compat"]}
futures-preview = "0.3.0-alpha.9"
# futures-preview = "0.3.0-alpha.9"
futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "5d87cce202" }

utils = { path = "../utils", version = "0.1.0" , package = "offst-utils"}

Expand Down
6 changes: 3 additions & 3 deletions components/timer/src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

// #![deny(warnings)]

use core::pin::Pin;
use std::time::Duration;
use futures::prelude::*;
use futures::channel::{mpsc, oneshot};
Expand Down Expand Up @@ -163,7 +164,6 @@ mod tests {
use super::*;
use std::time::{Duration, Instant};
use futures::executor::LocalPool;
use futures::future::FutureObj;
// use core::pin::Pin;

#[test]
Expand Down Expand Up @@ -225,7 +225,7 @@ mod tests {
const TIMER_CLIENT_NUM: usize = 2;

let mut senders = Vec::new();
let mut joined_receivers = FutureObj::from(future::ready(()).boxed());
let mut joined_receivers = Box::pinned(future::ready(())) as Pin<Box<dyn Future<Output=()> + Send>>;

for _ in 0 .. TIMER_CLIENT_NUM {
let (sender, receiver) = oneshot::channel::<()>();
Expand All @@ -236,7 +236,7 @@ mod tests {
});

let new_join = joined_receivers.join(receiver).map(|_| ());
joined_receivers = FutureObj::from(new_join.boxed());
joined_receivers = Box::pinned(new_join) as Pin<Box<Future<Output=()> + Send>>;
}

let (sender_done, receiver_done) = oneshot::channel::<()>();
Expand Down
3 changes: 2 additions & 1 deletion components/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ log = "0.4"


bytes = "0.4"
futures-preview = { version = "0.3.0-alpha.9", features=["compat"]}
# futures-preview = { version = "0.3.0-alpha.9", features=["compat"]}
futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "5d87cce202", features = ["compat"] }

tokio-io = "0.1"
tokio-core = "0.1"
Expand Down