Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tokio support. #111

Merged
merged 52 commits into from
May 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
d52f69a
add websocket encode/decode codec for tokio
illegalprime Apr 4, 2017
09ad0a3
Added futures as an async dep.
illegalprime May 19, 2017
cd54128
Fixed bugs in codec, corrected masks.
illegalprime May 19, 2017
31c3199
Remove some async code when turned off.
illegalprime May 19, 2017
65ba247
Add custom buffer impl to be used with async.
illegalprime May 19, 2017
df1f87c
Fix writing large frames & fragmentation in codec.
illegalprime May 19, 2017
aff620e
Autobahn tests for client passes with tokio async.
illegalprime May 19, 2017
c8b5e07
Added async transformer for client.
illegalprime May 19, 2017
8ef9969
Added async client example.
illegalprime May 19, 2017
6a14d19
Removed Splittable for async, already exists.
illegalprime May 20, 2017
6295e7d
Added handshake codec and started on async builder.
illegalprime May 20, 2017
55f0081
Added semantic owned message type to deserialize to. Fixes #108
illegalprime May 21, 2017
004ddf7
Codec now has no lifetimes, better API.
illegalprime May 21, 2017
4079547
Simplified sender and receiver, received OwnedMessage now.
illegalprime May 21, 2017
a8e9b53
Added insecure async option to client builder.
illegalprime May 21, 2017
2f04ca9
Get rid of unused import warnings.
illegalprime May 21, 2017
94f69a2
Redid autobahn client test using new API.
illegalprime May 21, 2017
570d860
Added default constructors for codecs.
illegalprime May 22, 2017
08fff2e
Use tokio-io fork that allows acces to internal buffers.
illegalprime May 22, 2017
c6257d2
Can build async insecure clients in Builder.
illegalprime May 22, 2017
86e9229
Fully async-api-ed autobahn client harness.
illegalprime May 22, 2017
312c5a1
Updated async client example.
illegalprime May 22, 2017
5e6c03a
Fixed parsing bug in hyper codec.
illegalprime May 22, 2017
4077486
Reorganization into much cleaner structure.
illegalprime May 22, 2017
faf9350
Updated server example.
illegalprime May 22, 2017
2cd1381
Switched from openssl to native-tls.
illegalprime May 22, 2017
f89ab46
Added async-ssl feature.
illegalprime May 22, 2017
1b58dde
Updated all examples with new API.
illegalprime May 22, 2017
5e79dea
Created client/server http codecs, added async intows trait.
illegalprime May 22, 2017
3eec435
Added skeleton of async WsUpgrade.
illegalprime May 22, 2017
79f5856
Removed uneeded explicit lifetimes.
illegalprime May 22, 2017
db9ce91
Updated to use new upstream tokio-io.
illegalprime May 22, 2017
3998118
Implemented async IntoWs and WsUpgrade, only server is left.
illegalprime May 23, 2017
117c3f3
Split server into async and sync, implemented some server async.
illegalprime May 23, 2017
5238b56
Restructured to make crate prefer neither sync or async.
illegalprime May 23, 2017
80d6e4a
Test all feature combos on travis, added build script.
illegalprime May 24, 2017
b99efe6
Added async server impl.
illegalprime May 24, 2017
cbae391
Added async agnostic connect.
illegalprime May 24, 2017
39f6e55
Fix HTTP parsing error in codec.
illegalprime May 24, 2017
4e9c6f2
Matched sync server API with async, small fixes.
illegalprime May 24, 2017
7ffdf7e
Added async server example and updated examples.
illegalprime May 24, 2017
aacd6ab
Fixed multi client bug in async server example.
illegalprime May 25, 2017
7ee2a0d
Fixed http codec bug, server gives client IP in stream.
illegalprime May 25, 2017
a406e72
Added async autobahn server, better closing behaviour.
illegalprime May 25, 2017
7000b3a
Updated to published tokio-io crate.
illegalprime May 25, 2017
bc8a0a4
Updated doc examples and tests, added to travis.
illegalprime May 25, 2017
86aeb2a
Added async rw impl for rwpair.
illegalprime May 26, 2017
3782f0d
Fixed bug in async SSL connections.
illegalprime May 26, 2017
23aa593
Added more docs and examples.
illegalprime May 26, 2017
8764318
Added even more docs and examples.
illegalprime May 28, 2017
2307070
Finished docs and updated tests, removed warnings.
illegalprime May 28, 2017
b51db78
fixup
illegalprime May 28, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
/target
/Cargo.lock

# emacs
*.#*.rs

# Windows image file caches
Thumbs.db
ehthumbs.db
Expand Down
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ before_script:

script:
- cargo fmt -- --write-mode=diff
- cargo build --features nightly
- ./scripts/build-all.sh
- cargo test --features nightly
- cargo bench --features nightly

Expand Down
31 changes: 18 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@ license = "MIT"

[dependencies]
hyper = "^0.10.6"
unicase = "^1.0"
url = "^1.0"
bitflags = "^0.8"
rand = "^0.3"
byteorder = "^1.0"
sha1 = "^0.2"
openssl = { version = "^0.9.10", optional = true }
base64 = "^0.5"

[dev-dependencies]
serde_json = "^1.0"
unicase = "1.0"
url = "1.0"
bitflags = "0.8"
rand = "0.3"
byteorder = "1.0"
sha1 = "0.2"
base64 = "0.5"
futures = { version = "0.1", optional = true }
tokio-core = { version = "0.1", optional = true }
tokio-io = { version = "^0.1.2", optional = true }
tokio-tls = { version = "0.1", optional = true }
bytes = { version = "0.4", optional = true }
native-tls = { version = "^0.1.2", optional = true }

[features]
default = ["ssl"]
ssl = ["openssl"]
default = ["sync", "sync-ssl", "async", "async-ssl"]
sync = []
sync-ssl = ["native-tls", "sync"]
async = ["tokio-core", "tokio-io", "bytes", "futures"]
async-ssl = ["native-tls", "tokio-tls", "async"]
nightly = ["hyper/nightly"]
12 changes: 6 additions & 6 deletions autobahn/client-results.json
Original file line number Diff line number Diff line change
Expand Up @@ -1723,7 +1723,7 @@
"reportfile": "rust_websocket_case_3_2.json"
},
"3.3": {
"behavior": "NON-STRICT",
"behavior": "OK",
"behaviorClose": "OK",
"duration": 2,
"remoteCloseCode": null,
Expand Down Expand Up @@ -2773,28 +2773,28 @@
"reportfile": "rust_websocket_case_6_3_2.json"
},
"6.4.1": {
"behavior": "NON-STRICT",
"behavior": "OK",
"behaviorClose": "OK",
"duration": 2002,
"remoteCloseCode": null,
"reportfile": "rust_websocket_case_6_4_1.json"
},
"6.4.2": {
"behavior": "NON-STRICT",
"behavior": "OK",
"behaviorClose": "OK",
"duration": 2002,
"remoteCloseCode": null,
"reportfile": "rust_websocket_case_6_4_2.json"
},
"6.4.3": {
"behavior": "NON-STRICT",
"behavior": "OK",
"behaviorClose": "OK",
"duration": 2003,
"remoteCloseCode": null,
"reportfile": "rust_websocket_case_6_4_3.json"
},
"6.4.4": {
"behavior": "NON-STRICT",
"behavior": "OK",
"behaviorClose": "OK",
"duration": 2002,
"remoteCloseCode": null,
Expand Down Expand Up @@ -3634,4 +3634,4 @@
"reportfile": "rust_websocket_case_9_8_6.json"
}
}
}
}
10 changes: 5 additions & 5 deletions autobahn/server-results.json
Original file line number Diff line number Diff line change
Expand Up @@ -2773,28 +2773,28 @@
"reportfile": "rust_websocket_case_6_3_2.json"
},
"6.4.1": {
"behavior": "NON-STRICT",
"behavior": "OK",
"behaviorClose": "OK",
"duration": 2002,
"remoteCloseCode": null,
"reportfile": "rust_websocket_case_6_4_1.json"
},
"6.4.2": {
"behavior": "NON-STRICT",
"behavior": "OK",
"behaviorClose": "OK",
"duration": 2001,
"remoteCloseCode": null,
"reportfile": "rust_websocket_case_6_4_2.json"
},
"6.4.3": {
"behavior": "NON-STRICT",
"behavior": "OK",
"behaviorClose": "OK",
"duration": 2003,
"remoteCloseCode": null,
"reportfile": "rust_websocket_case_6_4_3.json"
},
"6.4.4": {
"behavior": "NON-STRICT",
"behavior": "OK",
"behaviorClose": "OK",
"duration": 2003,
"remoteCloseCode": null,
Expand Down Expand Up @@ -3634,4 +3634,4 @@
"reportfile": "rust_websocket_case_9_8_6.json"
}
}
}
}
109 changes: 109 additions & 0 deletions examples/async-autobahn-client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
extern crate websocket;
extern crate tokio_core;
extern crate futures;

use websocket::{ClientBuilder, OwnedMessage};
use websocket::result::WebSocketError;
use tokio_core::reactor::Core;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::Future;
use futures::future::{self, Loop};

fn main() {
let addr = "ws://127.0.0.1:9001".to_string();
let agent = "rust-websocket";
let mut core = Core::new().unwrap();
let handle = core.handle();

println!("Using fuzzingserver {}", addr);
println!("Using agent {}", agent);

let case_count = get_case_count(addr.clone(), &mut core);
println!("We will be running {} test cases!", case_count);

println!("Running test suite...");
for case_id in 1..(case_count + 1) {
let url = addr.clone() + "/runCase?case=" + &case_id.to_string()[..] + "&agent=" + agent;

let test_case = ClientBuilder::new(&url)
.unwrap()
.async_connect_insecure(&handle)
.and_then(move |(duplex, _)| {
println!("Executing test case: {}/{}", case_id, case_count);
future::loop_fn(duplex, |stream| {
stream.into_future()
.or_else(|(err, stream)| {
println!("Could not receive message: {:?}", err);
stream.send(OwnedMessage::Close(None)).map(|s| (None, s))
})
.and_then(|(msg, stream)| match msg {
Some(OwnedMessage::Text(txt)) => {
stream.send(OwnedMessage::Text(txt))
.map(|s| Loop::Continue(s))
.boxed()
}
Some(OwnedMessage::Binary(bin)) => {
stream.send(OwnedMessage::Binary(bin))
.map(|s| Loop::Continue(s))
.boxed()
}
Some(OwnedMessage::Ping(data)) => {
stream.send(OwnedMessage::Pong(data))
.map(|s| Loop::Continue(s))
.boxed()
}
Some(OwnedMessage::Close(_)) => {
stream.send(OwnedMessage::Close(None))
.map(|_| Loop::Break(()))
.boxed()
}
Some(OwnedMessage::Pong(_)) => {
future::ok(Loop::Continue(stream)).boxed()
}
None => future::ok(Loop::Break(())).boxed(),
})
})
})
.map(move |_| {
println!("Test case {} is finished!", case_id);
})
.or_else(move |err| {
println!("Test case {} ended with an error: {:?}", case_id, err);
Ok(()) as Result<(), ()>
});

core.run(test_case).ok();
}

update_reports(addr.clone(), agent, &mut core);
println!("Test suite finished!");
}

fn get_case_count(addr: String, core: &mut Core) -> usize {
let url = addr + "/getCaseCount";
let err = "Unsupported message in /getCaseCount";

let counter = ClientBuilder::new(&url)
.unwrap()
.async_connect_insecure(&core.handle())
.and_then(|(s, _)| s.into_future().map_err(|e| e.0))
.and_then(|(msg, _)| match msg {
Some(OwnedMessage::Text(txt)) => Ok(txt.parse().unwrap()),
_ => Err(WebSocketError::ProtocolError(err)),
});
core.run(counter).unwrap()
}

fn update_reports(addr: String, agent: &str, core: &mut Core) {
println!("Updating reports...");
let url = addr + "/updateReports?agent=" + agent;

let updater = ClientBuilder::new(&url)
.unwrap()
.async_connect_insecure(&core.handle())
.and_then(|(sink, _)| sink.send(OwnedMessage::Close(None)));
core.run(updater).unwrap();

println!("Reports updated.");
}
54 changes: 54 additions & 0 deletions examples/async-autobahn-server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
extern crate websocket;
extern crate futures;
extern crate tokio_core;

use websocket::message::OwnedMessage;
use websocket::server::InvalidConnection;
use websocket::async::Server;

use tokio_core::reactor::Core;
use futures::{Future, Sink, Stream};

fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
// bind to the server
let server = Server::bind("127.0.0.1:9002", &handle).unwrap();

// time to build the server's future
// this will be a struct containing everything the server is going to do

// a stream of incoming connections
let f = server.incoming()
// we don't wanna save the stream if it drops
.map_err(|InvalidConnection { error, .. }| error)
.for_each(|(upgrade, addr)| {
// accept the request to be a ws connection
println!("Got a connection from: {}", addr);
let f = upgrade
.accept()
.and_then(|(s, _)| {
// simple echo server impl
let (sink, stream) = s.split();
stream
.take_while(|m| Ok(!m.is_close()))
.filter_map(|m| {
match m {
OwnedMessage::Ping(p) => Some(OwnedMessage::Pong(p)),
OwnedMessage::Pong(_) => None,
_ => Some(m),
}
})
.forward(sink)
.and_then(|(_, sink)| {
sink.send(OwnedMessage::Close(None))
})
});

handle.spawn(f.map_err(move |e| println!("{}: '{:?}'", addr, e))
.map(move |_| println!("{} closed.", addr)));
Ok(())
});

core.run(f).unwrap();
}
65 changes: 65 additions & 0 deletions examples/async-client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
extern crate websocket;
extern crate futures;
extern crate tokio_core;

use std::thread;
use std::io::stdin;
use tokio_core::reactor::Core;
use futures::future::Future;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::sync::mpsc;
use websocket::result::WebSocketError;
use websocket::{ClientBuilder, OwnedMessage};

const CONNECTION: &'static str = "ws://127.0.0.1:2794";

fn main() {
println!("Connecting to {}", CONNECTION);
let mut core = Core::new().unwrap();

// standard in isn't supported in mio yet, so we use a thread
// see https://github.com/carllerche/mio/issues/321
let (usr_msg, stdin_ch) = mpsc::channel(0);
thread::spawn(move || {
let mut input = String::new();
let mut stdin_sink = usr_msg.wait();
loop {
input.clear();
stdin().read_line(&mut input).unwrap();
let trimmed = input.trim();

let (close, msg) = match trimmed {
"/close" => (true, OwnedMessage::Close(None)),
"/ping" => (false, OwnedMessage::Ping(b"PING".to_vec())),
_ => (false, OwnedMessage::Text(trimmed.to_string())),
};

stdin_sink.send(msg)
.expect("Sending message across stdin channel.");

if close {
break;
}
}
});

let runner = ClientBuilder::new(CONNECTION)
.unwrap()
.add_protocol("rust-websocket")
.async_connect_insecure(&core.handle())
.and_then(|(duplex, _)| {
let (sink, stream) = duplex.split();
stream.filter_map(|message| {
println!("Received Message: {:?}", message);
match message {
OwnedMessage::Close(e) => Some(OwnedMessage::Close(e)),
OwnedMessage::Ping(d) => Some(OwnedMessage::Pong(d)),
_ => None,
}
})
.select(stdin_ch.map_err(|_| WebSocketError::NoDataAvailable))
.forward(sink)
});
core.run(runner).unwrap();
}
Loading