Skip to content

Commit

Permalink
provide a sync api
Browse files Browse the repository at this point in the history
This is an attempt to provide a sync api, which makes much more sense
for expect. The idea is to run the event loop in a separate thread so
that we can call `Future::wait()` in the client handle without
preventing the event loop from making progress.

Note that this currently rely on
rust-lang/rust#47760

Fwiw, running our own even loop is not considered good practice, but it
the approach reqwest has taken too, so I guess it's not that bad.
  • Loading branch information
little-dude committed Jan 27, 2018
1 parent 83f79d0 commit d9cca33
Showing 1 changed file with 67 additions and 24 deletions.
91 changes: 67 additions & 24 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::fmt;
use std::io::{self, Read, Write};
use std::collections::VecDeque;
use std::process::Command;
use std::thread;
use std::time::Duration;

use futures::{Async, Canceled, Future, Poll, Stream};
Expand Down Expand Up @@ -79,6 +80,8 @@ pub struct Session {
/// FIFO storage for the matching requests. Requests are processed one after another, not
/// concurrently.
match_requests: VecDeque<ActiveMatchRequest>,

drop_rx: oneshot::Receiver<()>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -140,40 +143,48 @@ impl fmt::Display for MatchError {

type MatchOutcome = Result<(usize, Vec<u8>), MatchError>;

#[derive(Clone)]
pub struct Handle {
match_requests_tx: mpsc::UnboundedSender<MatchRequest>,
input_requests_tx: mpsc::UnboundedSender<InputRequest>,
thread: Option<thread::JoinHandle<()>>,
drop_tx: Option<oneshot::Sender<()>>,
}

impl Handle {
pub fn send(&self, bytes: Vec<u8>) -> Box<Future<Item = (), Error = Canceled>> {
pub fn send(&self, bytes: Vec<u8>) -> () {
let handle = self.clone();
let (response_tx, response_rx) = oneshot::channel::<()>();
handle
.input_requests_tx
.unbounded_send(InputRequest(bytes, response_tx))
.unwrap();
Box::new(response_rx)
response_rx.wait().unwrap()
}

pub fn expect(
&mut self,
matches: Vec<Match>,
timeout: Option<Duration>,
) -> Box<Future<Item = MatchOutcome, Error = ()>> {
) -> Result<(usize, std::vec::Vec<u8>), MatchError> {
let (response_tx, response_rx) = oneshot::channel::<MatchOutcome>();
let request = MatchRequest {
matches,
response_tx,
timeout,
};
let handle = self.clone();
handle.match_requests_tx.unbounded_send(request).unwrap();
Box::new(response_rx.map_err(|_| ()))
self.match_requests_tx.unbounded_send(request).unwrap();
response_rx.wait().unwrap()
}
}

impl Drop for Handle {
fn drop(&mut self) {
self.drop_tx.take().unwrap().send(()).unwrap();
self.thread.take().unwrap().join();
}
}


// TODO:
//
// - Make stdin evented PollEvented?
Expand All @@ -184,25 +195,46 @@ impl Handle {
// precedence over the MIN and TIME settings.

impl Session {
pub fn spawn(cmd: Command, handle: &TokioHandle) -> Result<Handle, ()> {
pub fn spawn(cmd: Command) -> Result<Handle, ()> {
debug!("spawning new command {:?}", cmd);
let (input_tx, input_rx) = mpsc::unbounded::<InputRequest>();
let (match_tx, match_rx) = mpsc::unbounded::<MatchRequest>();
let mut pty = Pty::new::<::std::fs::File>(None, handle).unwrap();
let mut _child = pty.spawn(cmd).unwrap();
let session = Session {
pty: pty,
handle: handle.clone(),
buffer: Vec::new(),
input_requests_rx: input_rx,
match_requests_rx: match_rx,
input_requests: VecDeque::new(),
match_requests: VecDeque::new(),
};
handle.spawn(session);
let (drop_tx, drop_rx) = oneshot::channel::<()>();

// spawn the core future in a separate thread.
//
// it's bad practice to spawn our own core but otherwise, it's complicated to provide a
// synchronous client, since calling `wait()` blocks the current thread, preventing the
// event loop from making progress... But running the event loop in a separate thread, we
// can call `wait()` in the client.
let thread = thread::Builder::new()
.name("expect-internal-core".into())
.spawn(move || {
use tokio_core::reactor::Core;
let mut core = Core::new().unwrap();

let mut pty = Pty::new::<::std::fs::File>(None, &core.handle()).unwrap();
// FIXME: I guess we should do something with the child?
let _child = pty.spawn(cmd).unwrap();

let session = Session {
pty: pty,
handle: core.handle(),
buffer: Vec::new(),
input_requests_rx: input_rx,
match_requests_rx: match_rx,
input_requests: VecDeque::new(),
match_requests: VecDeque::new(),
drop_rx: drop_rx,
};
core.run(session);
})
.unwrap();
Ok(Handle {
match_requests_tx: match_tx.clone(),
input_requests_tx: input_tx.clone(),
thread: Some(thread),
drop_tx: Some(drop_tx),
})
}

Expand All @@ -216,7 +248,7 @@ impl Session {
if size == req.0.len() {
return Ok(Async::Ready(()));
}
// FIXME: do we need to check if we wrote 0 bytes to avoid infinite looping?
// FIXME: do we need to check if we wrote 0 bytes to avoid infinite loops?
continue;
}
Err(e) => {
Expand Down Expand Up @@ -459,10 +491,21 @@ impl Future for Session {
type Error = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.get_input_requests().unwrap();
self.get_match_requests().unwrap();
if let Err(_e) = self.get_input_requests() {
return Err(());
}
if let Err(_e) = self.get_match_requests() {
return Err(());
}
self.process_input();
self.process_matches().unwrap();
if let Err(_e) = self.process_matches() {
return Err(());
}
match self.drop_rx.poll() {
Ok(Async::Ready(())) => return Ok(Async::Ready(())),
Ok(Async::NotReady) => {},
Err(Canceled) => return Err(()),
}
Ok(Async::NotReady)
}
}

0 comments on commit d9cca33

Please sign in to comment.