Skip to content

Commit

Permalink
Adapted websocket adapter for RemoteAddr
Browse files Browse the repository at this point in the history
  • Loading branch information
lemunozm committed Feb 16, 2021
1 parent 24ea9f5 commit e725412
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 10 deletions.
39 changes: 30 additions & 9 deletions src/adapters/web_socket.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
use crate::adapter::{Resource, Adapter, ActionHandler, EventHandler, SendStatus, AcceptedType, ReadStatus};
use crate::adapter::{
Resource, Adapter, ActionHandler, EventHandler, SendStatus, AcceptedType, ReadStatus,
};
use crate::remote_addr::{RemoteAddr};
use crate::util::{OTHER_THREAD_ERR};

use mio::event::{Source};
use mio::net::{TcpStream, TcpListener};

use tungstenite::protocol::{WebSocket, Message};
use tungstenite::server::{accept as ws_accept};
use tungstenite::client::{client as ws_client};
use tungstenite::client::{client as ws_connect};
use tungstenite::handshake::{HandshakeError};
use tungstenite::error::{Error};

use url::Url;

use std::sync::{Mutex};
use std::net::{SocketAddr, TcpStream as StdTcpStream};
use std::io::{self, ErrorKind};

/// Max message size
// From https://docs.rs/tungstenite/0.13.0/src/tungstenite/protocol/mod.rs.html#65
pub const MAX_WS_PAYLOAD_LEN: usize = 64 << 20;

pub struct ClientResource(Mutex<WebSocket<TcpStream>>);
impl Resource for ClientResource {
fn source(&mut self) -> &mut dyn Source {
Expand Down Expand Up @@ -45,7 +54,18 @@ impl ActionHandler for WsActionHandler {
type Remote = ClientResource;
type Listener = ServerResource;

fn connect(&mut self, addr: SocketAddr) -> io::Result<ClientResource> {
fn connect(&mut self, remote_addr: RemoteAddr) -> io::Result<(ClientResource, SocketAddr)> {
let (addr, url) = match remote_addr {
RemoteAddr::SocketAddr(addr) => (addr, Url::parse(&format!("ws://{}/message-io-default", addr)).unwrap()),
RemoteAddr::Url(url) => {
let addr = url.socket_addrs(|| match url.scheme() {
"ws" => Some(80), // Plain
"wss" => Some(443), //Tls
_ => None,
}).unwrap()[0];
(addr, url)
}
};
// Synchronous tcp handshake
let stream = StdTcpStream::connect(addr)?;

Expand All @@ -54,10 +74,10 @@ impl ActionHandler for WsActionHandler {
let stream = TcpStream::from_std(stream);

// Synchronous waiting for web socket handshake
let mut handshake_result = ws_client(format!("ws://{}/socket", addr), stream);
let mut handshake_result = ws_connect(url, stream);
loop {
match handshake_result {
Ok((ws_socket, _)) => break Ok(ClientResource(Mutex::new(ws_socket))),
Ok((ws_socket, _)) => break Ok((ClientResource(Mutex::new(ws_socket)), addr)),
Err(HandshakeError::Interrupted(mid_handshake)) => {
handshake_result = mid_handshake.handshake();
}
Expand All @@ -79,7 +99,7 @@ impl ActionHandler for WsActionHandler {
loop {
match result {
Ok(_) => break SendStatus::Sent,
Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => {
Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => {
result = socket.write_pending();
}
Err(_) => break SendStatus::ResourceNotFound,
Expand Down Expand Up @@ -115,15 +135,16 @@ impl EventHandler for WsEventHandler {
fn read_event(
&mut self,
resource: &ClientResource,
process_data: &dyn Fn(&[u8])
) -> ReadStatus {
process_data: &dyn Fn(&[u8]),
) -> ReadStatus
{
loop {
match resource.0.lock().expect(OTHER_THREAD_ERR).read_message() {
Ok(message) => match message {
Message::Binary(data) => process_data(&data),
Message::Close(_) => break ReadStatus::Disconnected,
_ => continue,
}
},
Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => {
break ReadStatus::WaitNextEvent
}
Expand Down
5 changes: 4 additions & 1 deletion tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub const BIG_MESSAGE_SIZE: usize = 1 << 20; // 1MB

// 8 is the Vec head offset added in serialization.
pub const MAX_SIZE_BY_UDP: usize = MAX_UDP_PAYLOAD_LEN - 8;
pub const MAX_SIZE_BY_WS: usize = MAX_UDP_PAYLOAD_LEN - 8;

lazy_static::lazy_static! {
pub static ref TIMEOUT: Duration = Duration::from_millis(5000);
Expand Down Expand Up @@ -181,6 +182,7 @@ fn ping_pong_client_manager_handle(
#[test_case(Transport::Tcp, 1)]
#[test_case(Transport::Tcp, 100)]
#[test_case(Transport::Ws, 1)]
#[test_case(Transport::Ws, 100)]
// NOTE: A medium-high `clients` value can exceeds the "open file" limits of an OS in CI
// with a very obfuscated error message.
fn ping_pong(transport: Transport, clients: usize) {
Expand All @@ -193,8 +195,9 @@ fn ping_pong(transport: Transport, clients: usize) {
client_handle.join().unwrap();
}

#[test_case(Transport::Udp, MAX_SIZE_BY_UDP)]
#[test_case(Transport::Tcp, BIG_MESSAGE_SIZE)]
#[test_case(Transport::Udp, MAX_SIZE_BY_UDP)]
#[test_case(Transport::Ws, MAX_SIZE_BY_WS + 10000)]
fn message_size(transport: Transport, message_size: usize) {
//util::init_logger();

Expand Down

0 comments on commit e725412

Please sign in to comment.