Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(runtime/http): websocket support #10359

Merged
merged 44 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
c17a720
add websocket upgrade
crowlKats Apr 16, 2021
e63b02b
Merge branch 'main' into native_http_websocket
crowlKats Apr 24, 2021
6de42f0
add ws
crowlKats Apr 24, 2021
a313886
handle sec-websocket-key
crowlKats Apr 24, 2021
a94cbca
handle closing and add typings
crowlKats Apr 25, 2021
de350f5
basic error handling
crowlKats Apr 25, 2021
dc9056d
Update cli/dts/lib.deno.unstable.d.ts
crowlKats May 5, 2021
48e0e23
rename upgradeWebSocket
crowlKats May 5, 2021
2499a5a
Merge branch 'main' into native_http_websocket
crowlKats May 5, 2021
b494a44
clean up
crowlKats May 5, 2021
35d375a
clean up
crowlKats May 5, 2021
85639fb
clean up
crowlKats May 5, 2021
e855c63
Update runtime/js/40_http.js
crowlKats May 5, 2021
b8aa871
fix
crowlKats May 5, 2021
afe1d99
Merge branch 'main' into native_http_websocket
crowlKats Jun 16, 2021
e149890
Merge branch 'main' into native_http_websocket
crowlKats Jun 21, 2021
d4a51bf
work
crowlKats Jun 21, 2021
613636d
wip
lucacasonato Jun 16, 2021
9b32625
work
crowlKats Jun 21, 2021
fa193cf
fix
crowlKats Jun 22, 2021
dddfc29
work
crowlKats Jun 30, 2021
7de7397
fix
lucacasonato Jun 30, 2021
0a70386
cleanup
lucacasonato Jun 30, 2021
4fe1e79
it works!
lucacasonato Jun 30, 2021
aad5ee6
don't leak resources
lucacasonato Jun 30, 2021
087aed1
clean up & remove hyper_tungstenite
crowlKats Jul 1, 2021
15ae4c0
Merge branch 'main' into native_http_websocket
crowlKats Jul 1, 2021
9e2b416
clean up
crowlKats Jul 1, 2021
2dc7e8c
Merge branch 'main' into native_http_websocket
crowlKats Jul 1, 2021
66ba74d
clean up
crowlKats Jul 1, 2021
d940e7e
add test & lint
crowlKats Jul 1, 2021
c86e7bb
add protocol selection support
crowlKats Jul 1, 2021
79c324e
fix
crowlKats Jul 1, 2021
54941c1
fix
crowlKats Jul 1, 2021
588c563
errors
crowlKats Jul 1, 2021
8a1f533
adjust test
crowlKats Jul 1, 2021
8f99e87
adjust test
crowlKats Jul 1, 2021
46d6591
fix
crowlKats Jul 2, 2021
17d5038
use boxed dyn Future for connection
lucacasonato Jul 7, 2021
65536d1
Merge remote-tracking branch 'origin/main' into native_http_websocket
lucacasonato Jul 7, 2021
05b8166
lint + fmt
lucacasonato Jul 7, 2021
fc8b27f
Merge remote-tracking branch 'origin/main' into native_http_websocket
lucacasonato Jul 8, 2021
8a9e486
generate accept header sync
crowlKats Jul 8, 2021
ac56367
Merge branch 'main' into native_http_websocket
lucacasonato Jul 8, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions cli/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1153,9 +1153,44 @@ declare namespace Deno {

export function memoryUsage(): MemoryUsage;

export interface WebSocketConnEventString {
kind: "text";
value: string;
}

export interface WebSocketConnEventUint8Array {
kind: "binary" | "pong" | "ping";
value: Uint8Array;
}

export interface WebSocketConnEventClose {
kind: "close";
value: { code: number; reason: string };
}

export type WebSocketConnEvent =
| WebSocketConnEventString
| WebSocketConnEventUint8Array
| WebSocketConnEventClose;

export interface WebSocketConn extends AsyncIterable<WebSocketConnEvent> {
send(kind: "text", data: string): Promise<void>;
send(kind: "binary", data: Uint8Array): Promise<void>;
send(kind: "pong", data: Uint8Array): Promise<void>;
send(kind: "ping", data: Uint8Array): Promise<void>;

close(code?: number, reason?: string): Promise<void>;
}

export interface WebSocketUpgrade {
response: Response;
ws: WebSocketConn;
}

export interface RequestEvent {
readonly request: Request;
respondWith(r: Response | Promise<Response>): Promise<void>;
upgradeWebsocket(): Promise<WebSocketUpgrade>;
crowlKats marked this conversation as resolved.
Show resolved Hide resolved
}

export interface HttpConn extends AsyncIterable<RequestEvent> {
Expand Down
45 changes: 23 additions & 22 deletions op_crates/websocket/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use std::io::Cursor;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::net::TcpStream;
use tokio_rustls::{rustls::ClientConfig, TlsConnector};
use tokio_tungstenite::tungstenite::Error as TungsteniteError;
Expand Down Expand Up @@ -60,24 +62,22 @@ impl WebSocketPermissions for NoWebSocketPermissions {
}
}

type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
struct WsStreamResource {
tx: AsyncRefCell<SplitSink<WsStream, Message>>,
rx: AsyncRefCell<SplitStream<WsStream>>,
pub type WsStream = MaybeTlsStream<TcpStream>;
pub struct WsStreamResource<T: AsyncRead + AsyncWrite> {
pub tx: AsyncRefCell<SplitSink<WebSocketStream<T>, Message>>,
pub rx: AsyncRefCell<SplitStream<WebSocketStream<T>>>,
// When a `WsStreamResource` resource is closed, all pending 'read' ops are
// canceled, while 'write' ops are allowed to complete. Therefore only
// 'read' futures are attached to this cancel handle.
cancel: CancelHandle,
pub cancel: CancelHandle,
}

impl Resource for WsStreamResource {
impl<T: AsyncRead + AsyncWrite> Resource for WsStreamResource<T> {
fn name(&self) -> Cow<str> {
"webSocketStream".into()
}
}

impl WsStreamResource {}

// This op is needed because creating a WS instance in JavaScript is a sync
// operation and should throw error when permissions are not fulfilled,
// but actual op that connects WS is async.
Expand Down Expand Up @@ -183,7 +183,7 @@ where
_ => unreachable!(),
};

let (stream, response): (WsStream, Response) =
let (stream, response): (WebSocketStream<WsStream>, Response) =
client_async(request, socket).await.map_err(|err| {
type_error(format!(
"failed to connect to WebSocket: {}",
Expand Down Expand Up @@ -226,23 +226,24 @@ pub struct SendArgs {
text: Option<String>,
}

pub async fn op_ws_send(
pub async fn op_ws_send<T: AsyncRead + AsyncWrite>(
state: Rc<RefCell<OpState>>,
args: SendArgs,
buf: Option<ZeroCopyBuf>,
) -> Result<(), AnyError> {
let msg = match args.kind.as_str() {
"text" => Message::Text(args.text.unwrap()),
"binary" => Message::Binary(buf.ok_or_else(null_opbuf)?.to_vec()),
"pong" => Message::Pong(vec![]),
"pong" => Message::Pong(buf.ok_or_else(null_opbuf)?.to_vec()),
"ping" => Message::Ping(buf.ok_or_else(null_opbuf)?.to_vec()),
_ => unreachable!(),
};
let rid = args.rid;

let resource = state
.borrow_mut()
.resource_table
.get::<WsStreamResource>(rid)
.get::<WsStreamResource<T>>(rid)
.ok_or_else(bad_resource_id)?;
let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await;
tx.send(msg).await?;
Expand All @@ -257,7 +258,7 @@ pub struct CloseArgs {
reason: Option<String>,
}

pub async fn op_ws_close(
pub async fn op_ws_close<T: AsyncRead + AsyncWrite>(
state: Rc<RefCell<OpState>>,
args: CloseArgs,
_bufs: Option<ZeroCopyBuf>,
Expand All @@ -274,7 +275,7 @@ pub async fn op_ws_close(
let resource = state
.borrow_mut()
.resource_table
.get::<WsStreamResource>(rid)
.get::<WsStreamResource<T>>(rid)
.ok_or_else(bad_resource_id)?;
let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await;
tx.send(msg).await?;
Expand All @@ -287,21 +288,21 @@ pub enum NextEventResponse {
String(String),
Binary(Vec<u8>),
Close { code: u16, reason: String },
Ping,
Pong,
Error,
Ping(Vec<u8>),
Pong(Vec<u8>),
Error(String),
Closed,
}

pub async fn op_ws_next_event(
pub async fn op_ws_next_event<T: AsyncRead + AsyncWrite>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
_bufs: Option<ZeroCopyBuf>,
) -> Result<NextEventResponse, AnyError> {
let resource = state
.borrow_mut()
.resource_table
.get::<WsStreamResource>(rid)
.get::<WsStreamResource<T>>(rid)
.ok_or_else(bad_resource_id)?;

let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await;
Expand All @@ -321,9 +322,9 @@ pub async fn op_ws_next_event(
code: 1005,
reason: String::new(),
},
Some(Ok(Message::Ping(_))) => NextEventResponse::Ping,
Some(Ok(Message::Pong(_))) => NextEventResponse::Pong,
Some(Err(_)) => NextEventResponse::Error,
Some(Ok(Message::Ping(v))) => NextEventResponse::Ping(v),
Some(Ok(Message::Pong(v))) => NextEventResponse::Pong(v),
Some(Err(e)) => NextEventResponse::Error(e.to_string()),
None => {
state.borrow_mut().resource_table.close(rid).unwrap();
NextEventResponse::Closed
Expand Down
1 change: 1 addition & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ deno_webidl = { path = "../op_crates/webidl", version = "0.5.0" }
deno_websocket = { path = "../op_crates/websocket", version = "0.10.0" }

atty = "0.2.14"
base64 = "0.13.0"
bytes = "1"
dlopen = "0.1.8"
encoding_rs = "0.8.28"
Expand Down
70 changes: 69 additions & 1 deletion runtime/js/40_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@
const request = fromInnerRequest(innerRequest, "immutable");

const respondWith = createRespondWith(responseSenderRid, this.#rid);
const upgradeWebsocket = createUpgradeWebsocket(requestBodyRid);

return { request, respondWith };
return { request, respondWith, upgradeWebsocket };
Copy link

@ije ije Apr 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use upgrade instead of upgradeWebsocket? a simple API can get better dev experience.

Copy link
Member Author

@crowlKats crowlKats Apr 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, upgrading isn't something exclusive to websockets. that would end up being misleading and confusing.
9 more characters in a function name used in a single place wont affect dev experience.
Of course that would make sense if we would support upgrading other kind of conns out of the box

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course that would make sense if we would support upgrading other kind of conns out of the box

Other hypothetical upgrades would have different return types, so they should just have individual names anyway.

}

/** @returns {void} */
Expand Down Expand Up @@ -219,6 +220,73 @@
});
}

function createUpgradeWebsocket(rid) {
crowlKats marked this conversation as resolved.
Show resolved Hide resolved
return async function upgradeWebsocket() {
const { key, rid } = await core.opAsync(
"op_http_upgrade_websocket",
rid,
);

const response = new Response(undefined, {
status: 101,
headers: {
"Upgrade": "websocket",
"Connection": "Upgrade",
"Sec-WebSocket-Accept": key,
},
});

const ws = {
async *[Symbol.asyncIterator]() {
let data = await core.opAsync(
"op_http_ws_next_event",
rid,
);

if (data.kind === "error") {
throw new Error(data.value); // TODO
crowlKats marked this conversation as resolved.
Show resolved Hide resolved
} else if (data.kind === "close") {
return data;
} else {
if (Array.isArray(data.value)) {
data.value = new Uint8Array(data.value);
}
crowlKats marked this conversation as resolved.
Show resolved Hide resolved

yield data;
}
},
async send(kind, data) {
switch (kind) {
case "text":
await core.opAsync("op_http_ws_close", {
rid,
kind,
text: data,
});
break;
case "binary":
case "pong":
case "ping":
await core.opAsync("op_http_ws_close", {
rid,
kind,
}, data);
break;
}
crowlKats marked this conversation as resolved.
Show resolved Hide resolved
},
async close(code, reason) {
await core.opAsync("op_http_ws_close", {
rid,
code,
reason,
});
},
};

return { response, ws };
};
}

window.__bootstrap.http = {
serveHttp,
};
Expand Down
76 changes: 76 additions & 0 deletions runtime/ops/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use deno_core::error::null_opbuf;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::future::poll_fn;
use deno_core::futures::stream::SplitStream;
use deno_core::futures::FutureExt;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
Expand All @@ -19,11 +20,15 @@ use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use deno_websocket::tokio_tungstenite::tungstenite::Message;
use deno_websocket::tokio_tungstenite::WebSocketStream;
use deno_websocket::WsStreamResource;
use hyper::body::HttpBody;
use hyper::http;
use hyper::server::conn::Connection;
use hyper::server::conn::Http;
use hyper::service::Service as HyperService;
use hyper::upgrade::Upgraded;
use hyper::Body;
use hyper::Request;
use hyper::Response;
Expand Down Expand Up @@ -52,6 +57,23 @@ pub fn init(rt: &mut deno_core::JsRuntime) {
super::reg_async(rt, "op_http_response", op_http_response);
super::reg_async(rt, "op_http_response_write", op_http_response_write);
super::reg_async(rt, "op_http_response_close", op_http_response_close);

super::reg_async(rt, "op_http_upgrade_websocket", op_http_upgrade_websocket);
super::reg_async(
rt,
"op_http_ws_send",
deno_websocket::op_ws_send::<WebSocketStream<Upgraded>>,
);
super::reg_async(
rt,
"op_http_ws_close",
deno_websocket::op_ws_close::<WebSocketStream<Upgraded>>,
);
super::reg_async(
rt,
"op_http_ws_next_event",
deno_websocket::op_ws_next_event::<WebSocketStream<Upgraded>>,
);
}

struct ServiceInner {
Expand Down Expand Up @@ -502,6 +524,60 @@ async fn op_http_response_write(
Ok(())
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct HttpUpgradeWebsocketResponse {
rid: ResourceId,
key: String,
}

async fn op_http_upgrade_websocket(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
_data: Option<ZeroCopyBuf>,
) -> Result<HttpUpgradeWebsocketResponse, AnyError> {
let conn_resource = state
.borrow()
.resource_table
.get::<ConnResource>(rid)
.ok_or_else(bad_resource_id)?;

let service = conn_resource.deno_service.inner.borrow_mut().ok_or("")?; // TODO

let key = service
.request
.headers()
.get(http::header::SEC_WEBSOCKET_KEY)
.ok_or("failed to read ws key from headers")?; // TODO
let digest = ring::digest::digest(
&ring::digest::SHA1_FOR_LEGACY_USE_ONLY,
format!("{}258EAFA5-E914-47DA-95CA-C5AB0DC85B11", key.to_str()?).as_bytes(),
);
let return_key = base64::encode(digest);

let upgraded = service.request.on_upgrade().await?;
let stream =
deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
upgraded,
deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server,
None,
)
.await;

let (ws_tx, ws_rx) = stream.split();

let rid = state.borrow_mut().resource_table.add(WsStreamResource {
rx: AsyncRefCell::new(ws_rx),
tx: AsyncRefCell::new(ws_tx),
cancel: Default::default(),
});

Ok(HttpUpgradeWebsocketResponse {
key: return_key,
rid,
})
}

type BytesStream =
Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>;

Expand Down
Loading