Skip to content

Commit

Permalink
Merge pull request #134 from freedomlayer/real/fix/remove-future-obj
Browse files Browse the repository at this point in the history
Remove usage of FutureObj
  • Loading branch information
realcr authored Nov 27, 2018
2 parents c999e1e + d214289 commit 95b2221
Show file tree
Hide file tree
Showing 17 changed files with 255 additions and 193 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ matrix:
include:
- env: TARGET=x86_64-unknown-linux-gnu CC=gcc-6 CXX=g++-6 KCOV=1
os: linux
rust: nightly-2018-11-13
rust: nightly-2018-11-27
before_script:
- travis/trusty/before-script.sh
addons:
Expand All @@ -29,7 +29,7 @@ matrix:

- env: TARGET=x86_64-apple-darwin KCOV=0
os: osx
rust: nightly-2018-11-13
rust: nightly-2018-11-27
osx_image: xcode9.2
before_script:
- brew install capnp
Expand Down
337 changes: 200 additions & 137 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions components/channeler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
cargo-features = ["rename-dependency"]

[package]
name = "offst-channeler"
version = "0.1.0"
Expand All @@ -18,5 +16,6 @@ relay = { path = "../relay", version = "0.1.0" , package = "offst-relay" }
secure_channel = { path = "../secure_channel", version = "0.1.0" , package = "offst-secure-channel" }

log = "0.4"
futures-preview = "0.3.0-alpha.9"
# futures-preview = "0.3.0-alpha.9"
futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "5d87cce202" }

15 changes: 8 additions & 7 deletions components/channeler/src/connector_utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use futures::future::FutureObj;
use futures::FutureExt;
use core::pin::Pin;

use futures::{Future, FutureExt};
use futures::task::Spawn;
use futures::future;

Expand Down Expand Up @@ -37,8 +38,8 @@ where
type SendItem = C::SendItem;
type RecvItem = C::RecvItem;

fn connect(&mut self, address: ())
-> FutureObj<Option<ConnPair<C::SendItem, C::RecvItem>>> {
fn connect<'a>(&'a mut self, address: ())
-> Pin<Box<dyn Future<Output=Option<ConnPair<C::SendItem, C::RecvItem>>> + Send + 'a>> {
self.connector.connect(self.address.clone())
}
}
Expand Down Expand Up @@ -88,8 +89,8 @@ where
type SendItem = Vec<u8>;
type RecvItem = Vec<u8>;

fn connect(&mut self, full_address: (PublicKey, A))
-> FutureObj<Option<ConnPair<Vec<u8>, Vec<u8>>>> {
fn connect<'a>(&'a mut self, full_address: (PublicKey, A))
-> Pin<Box<dyn Future<Output=Option<ConnPair<C::SendItem, C::RecvItem>>> + Send + 'a>> {

let (public_key, address) = full_address;
let fut = async move {
Expand All @@ -108,6 +109,6 @@ where
receiver: secure_channel.receiver,
})
};
FutureObj::new(fut.boxed())
Box::pinned(fut)
}
}
10 changes: 5 additions & 5 deletions components/channeler/src/listener.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::marker::Unpin;
use futures::{select, future, Stream, StreamExt, Sink};
use futures::{select, future, FutureExt, Stream, StreamExt, Sink};
use futures::task::Spawn;

use proto::funder::messages::{FunderToChanneler, ChannelerToFunder};
Expand Down Expand Up @@ -100,20 +100,20 @@ where
// TODO: Possibly wait here in a smart way? Exponential backoff?
await!(sleep_ticks(backoff_ticks, timer_client.clone()))?;
Ok(())
});
}).fuse();

// TODO: Get rid of Box::pinned() later.
let mut new_address_fut = Box::pinned(async {
match await!(incoming_addresses.next()) {
Some(address) => ListenerSelect::IncomingAddress(address),
None => ListenerSelect::IncomingAddressClosed,
}
});
}).fuse();

// TODO: Could we possibly lose an incoming address with this select?
let listener_select = select! {
listener_fut => ListenerSelect::ListenerError(listener_fut.err().unwrap()),
new_address_fut => new_address_fut,
listener_fut = listener_fut => ListenerSelect::ListenerError(listener_fut.err().unwrap()),
new_address_fut = new_address_fut => new_address_fut,
};
// TODO: Make code nicer here somehow. Use scopes instead of drop?
drop(new_address_fut);
Expand Down
5 changes: 2 additions & 3 deletions components/funder/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
cargo-features = ["rename-dependency"]

[package]
name = "offst-funder"
version = "0.1.0"
Expand Down Expand Up @@ -27,7 +25,8 @@ bytes = "0.4"
# tokio-codec = "0.1"
# futures-await = "0.1"
futures-cpupool = "0.1.8"
futures-preview = "0.3.0-alpha.9"
# futures-preview = "0.3.0-alpha.9"
futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "5d87cce202" }

num-bigint = "0.2.0"
num-traits = "0.2.4"
Expand Down
5 changes: 2 additions & 3 deletions components/identity/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
cargo-features = ["rename-dependency"]

[package]
name = "offst-identity"
version = "0.1.0"
Expand All @@ -15,7 +13,8 @@ crypto = { path = "../crypto", version = "0.1.0" , package = "offst-crypto"}
# futures = "0.1.25"
# tokio-core = "0.1"
# futures-await = "0.1"
futures-preview = "0.3.0-alpha.9"
# futures-preview = "0.3.0-alpha.9"
futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "5d87cce202" }

[dev-dependencies]

5 changes: 2 additions & 3 deletions components/relay/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
cargo-features = ["rename-dependency"]

[package]
name = "offst-relay"
version = "0.1.0"
Expand All @@ -15,5 +13,6 @@ timer = { path = "../timer", version = "0.1.0" , package = "offst-timer" }
proto = { path = "../proto", version = "0.1.0" , package = "offst-proto" }

log = "0.4"
futures-preview = "0.3.0-alpha.9"
# futures-preview = "0.3.0-alpha.9"
futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "5d87cce202" }

10 changes: 6 additions & 4 deletions components/relay/src/client/client_connector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::pin::Pin;
use crypto::identity::PublicKey;
use futures::{future, FutureExt, TryFutureExt, StreamExt, SinkExt};
use futures::future::FutureObj;
use futures::{future, Future, FutureExt, TryFutureExt, StreamExt, SinkExt};
use futures::task::{Spawn, SpawnExt};
use futures::channel::mpsc;

Expand Down Expand Up @@ -106,11 +106,13 @@ where
type SendItem = Vec<u8>;
type RecvItem = Vec<u8>;

fn connect(&mut self, address: (A, PublicKey)) -> FutureObj<Option<ConnPair<Self::SendItem, Self::RecvItem>>> {
fn connect<'a>(&'a mut self, address: (A, PublicKey))
-> Pin<Box<dyn Future<Output=Option<ConnPair<Self::SendItem, Self::RecvItem>>> + Send + 'a>> {

let (relay_address, remote_public_key) = address;
let relay_connect = self.relay_connect(relay_address, remote_public_key)
.map(|res| res.ok());
FutureObj::new(relay_connect.boxed())
Box::pinned(relay_connect)
}
}

Expand Down
13 changes: 7 additions & 6 deletions components/relay/src/client/client_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,13 @@ where
let conn_timeout_ticks = usize_to_u64(conn_timeout_ticks).unwrap();
let mut fut_timeout = timer_stream
.take(conn_timeout_ticks)
.for_each(|_| future::ready(()));
let mut fut_connect = connector.connect(());
.for_each(|_| future::ready(()))
.fuse();
let mut fut_connect = connector.connect(()).fuse();

select! {
fut_timeout => None,
fut_connect => fut_connect,
fut_timeout = fut_timeout => None,
fut_connect = fut_connect => fut_connect,
}
}

Expand Down Expand Up @@ -507,7 +508,7 @@ mod tests {
async fn task_client_listener_basic(mut spawner: impl Spawn + Clone + Send + 'static) {
let (req_sender, mut req_receiver) = mpsc::channel(0);
let connector = DummyConnector::new(req_sender);
let (connections_sender, mut connections_receiver) = mpsc::channel(0);
let (connections_sender, connections_receiver) = mpsc::channel(0);
let conn_timeout_ticks = 8;
let keepalive_ticks = 16;
let (tick_sender, tick_receiver) = mpsc::channel(0);
Expand Down Expand Up @@ -587,7 +588,7 @@ mod tests {
// Listener will accept the connection:

// Listener will open a connection to the relay:
let (mut remote_sender, local_receiver) = mpsc::channel(0);
let (remote_sender, local_receiver) = mpsc::channel(0);
let (local_sender, mut remote_receiver) = mpsc::channel(0);
let conn_pair = ConnPair {
sender: local_sender,
Expand Down
7 changes: 4 additions & 3 deletions components/relay/src/client/connector.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use futures::future::FutureObj;
use core::pin::Pin;
use futures::channel::mpsc;
use futures::Future;

pub struct ConnPair<SendItem, RecvItem> {
pub sender: mpsc::Sender<SendItem>,
Expand All @@ -10,6 +11,6 @@ pub trait Connector {
type Address;
type SendItem;
type RecvItem;
fn connect(&mut self, address: Self::Address)
-> FutureObj<Option<ConnPair<Self::SendItem, Self::RecvItem>>>;
fn connect<'a>(&'a mut self, address: Self::Address)
-> Pin<Box<dyn Future<Output=Option<ConnPair<Self::SendItem, Self::RecvItem>>> + Send + 'a>>;
}
9 changes: 4 additions & 5 deletions components/relay/src/client/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::pin::Pin;
use futures::channel::{mpsc, oneshot};
use futures::future::FutureObj;
use futures::{FutureExt, SinkExt};
use futures::{Future, FutureExt, SinkExt};
use super::connector::{Connector, ConnPair};


Expand Down Expand Up @@ -39,7 +39,7 @@ where
type SendItem = SI;
type RecvItem = RI;

fn connect(&mut self, address: A) -> FutureObj<Option<ConnPair<Self::SendItem, Self::RecvItem>>> {
fn connect<'a>(&'a mut self, address: A) -> Pin<Box<dyn Future<Output=Option<ConnPair<Self::SendItem, Self::RecvItem>>> + Send + 'a>> {
let (response_sender, response_receiver) = oneshot::channel();
let conn_request = ConnRequest {
address,
Expand All @@ -50,8 +50,7 @@ where
await!(self.req_sender.send(conn_request)).unwrap();
await!(response_receiver).ok()
};
let future_obj = FutureObj::new(fut_conn_pair.boxed());
future_obj
Box::pinned(fut_conn_pair)
}
}

4 changes: 2 additions & 2 deletions components/relay/src/server/conn_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ where
// NOTE: This select is probably not Unpin. Maybe we need to implement our own?
async move {
select! {
fut_receiver => fut_receiver,
fut_time => fut_time,
fut_receiver = fut_receiver => fut_receiver,
fut_time = fut_time => fut_time,
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions components/secure_channel/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
cargo-features = ["rename-dependency"]

[package]
name = "offst-secure-channel"
version = "0.1.0"
Expand All @@ -18,7 +16,8 @@ proto = { path = "../proto", version = "0.1.0" , package = "offst-proto" }
log = "0.4"
pretty_env_logger = "0.2"

futures-preview = "0.3.0-alpha.9"
# futures-preview = "0.3.0-alpha.9"
futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "5d87cce202" }

# futures = "0.1.25"
# tokio-core = "0.1"
Expand Down
5 changes: 2 additions & 3 deletions components/timer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
cargo-features = ["rename-dependency"]

[package]
name = "offst-timer"
version = "0.1.0"
Expand All @@ -13,7 +11,8 @@ log = "0.4"
bytes = "0.4"
# futures-preview = "0.3.0-alpha.7"
# futures-util-preview = {version = "0.3.0-alpha.7", features = ["compat"]}
futures-preview = "0.3.0-alpha.9"
# futures-preview = "0.3.0-alpha.9"
futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "5d87cce202" }

utils = { path = "../utils", version = "0.1.0" , package = "offst-utils"}

Expand Down
6 changes: 3 additions & 3 deletions components/timer/src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

// #![deny(warnings)]

use core::pin::Pin;
use std::time::Duration;
use futures::prelude::*;
use futures::channel::{mpsc, oneshot};
Expand Down Expand Up @@ -163,7 +164,6 @@ mod tests {
use super::*;
use std::time::{Duration, Instant};
use futures::executor::LocalPool;
use futures::future::FutureObj;
// use core::pin::Pin;

#[test]
Expand Down Expand Up @@ -225,7 +225,7 @@ mod tests {
const TIMER_CLIENT_NUM: usize = 2;

let mut senders = Vec::new();
let mut joined_receivers = FutureObj::from(future::ready(()).boxed());
let mut joined_receivers = Box::pinned(future::ready(())) as Pin<Box<dyn Future<Output=()> + Send>>;

for _ in 0 .. TIMER_CLIENT_NUM {
let (sender, receiver) = oneshot::channel::<()>();
Expand All @@ -236,7 +236,7 @@ mod tests {
});

let new_join = joined_receivers.join(receiver).map(|_| ());
joined_receivers = FutureObj::from(new_join.boxed());
joined_receivers = Box::pinned(new_join) as Pin<Box<Future<Output=()> + Send>>;
}

let (sender_done, receiver_done) = oneshot::channel::<()>();
Expand Down
3 changes: 2 additions & 1 deletion components/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ log = "0.4"


bytes = "0.4"
futures-preview = { version = "0.3.0-alpha.9", features=["compat"]}
# futures-preview = { version = "0.3.0-alpha.9", features=["compat"]}
futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "5d87cce202", features = ["compat"] }

tokio-io = "0.1"
tokio-core = "0.1"
Expand Down

0 comments on commit 95b2221

Please sign in to comment.