Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(runtime/http): websocket support #10359

Merged
merged 44 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
c17a720
add websocket upgrade
crowlKats Apr 16, 2021
e63b02b
Merge branch 'main' into native_http_websocket
crowlKats Apr 24, 2021
6de42f0
add ws
crowlKats Apr 24, 2021
a313886
handle sec-websocket-key
crowlKats Apr 24, 2021
a94cbca
handle closing and add typings
crowlKats Apr 25, 2021
de350f5
basic error handling
crowlKats Apr 25, 2021
dc9056d
Update cli/dts/lib.deno.unstable.d.ts
crowlKats May 5, 2021
48e0e23
rename upgradeWebSocket
crowlKats May 5, 2021
2499a5a
Merge branch 'main' into native_http_websocket
crowlKats May 5, 2021
b494a44
clean up
crowlKats May 5, 2021
35d375a
clean up
crowlKats May 5, 2021
85639fb
clean up
crowlKats May 5, 2021
e855c63
Update runtime/js/40_http.js
crowlKats May 5, 2021
b8aa871
fix
crowlKats May 5, 2021
afe1d99
Merge branch 'main' into native_http_websocket
crowlKats Jun 16, 2021
e149890
Merge branch 'main' into native_http_websocket
crowlKats Jun 21, 2021
d4a51bf
work
crowlKats Jun 21, 2021
613636d
wip
lucacasonato Jun 16, 2021
9b32625
work
crowlKats Jun 21, 2021
fa193cf
fix
crowlKats Jun 22, 2021
dddfc29
work
crowlKats Jun 30, 2021
7de7397
fix
lucacasonato Jun 30, 2021
0a70386
cleanup
lucacasonato Jun 30, 2021
4fe1e79
it works!
lucacasonato Jun 30, 2021
aad5ee6
don't leak resources
lucacasonato Jun 30, 2021
087aed1
clean up & remove hyper_tungstenite
crowlKats Jul 1, 2021
15ae4c0
Merge branch 'main' into native_http_websocket
crowlKats Jul 1, 2021
9e2b416
clean up
crowlKats Jul 1, 2021
2dc7e8c
Merge branch 'main' into native_http_websocket
crowlKats Jul 1, 2021
66ba74d
clean up
crowlKats Jul 1, 2021
d940e7e
add test & lint
crowlKats Jul 1, 2021
c86e7bb
add protocol selection support
crowlKats Jul 1, 2021
79c324e
fix
crowlKats Jul 1, 2021
54941c1
fix
crowlKats Jul 1, 2021
588c563
errors
crowlKats Jul 1, 2021
8a1f533
adjust test
crowlKats Jul 1, 2021
8f99e87
adjust test
crowlKats Jul 1, 2021
46d6591
fix
crowlKats Jul 2, 2021
17d5038
use boxed dyn Future for connection
lucacasonato Jul 7, 2021
65536d1
Merge remote-tracking branch 'origin/main' into native_http_websocket
lucacasonato Jul 7, 2021
05b8166
lint + fmt
lucacasonato Jul 7, 2021
fc8b27f
Merge remote-tracking branch 'origin/main' into native_http_websocket
lucacasonato Jul 8, 2021
8a9e486
generate accept header sync
crowlKats Jul 8, 2021
ac56367
Merge branch 'main' into native_http_websocket
lucacasonato Jul 8, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions cli/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,13 @@ declare namespace Deno {
*/
export function serveHttp(conn: Conn): HttpConn;

export interface WebSocketUpgrade {
response: Response;
websocket: WebSocket;
}

export function upgradeWebSocket(request: Request): Promise<WebSocketUpgrade>;

/** **UNSTABLE**: New option, yet to be vetted. */
export interface TestDefinition {
/** Specifies the permissions that should be used to run the test.
Expand Down
1 change: 1 addition & 0 deletions extensions/fetch/23_response.js
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@

window.__bootstrap.fetch ??= {};
window.__bootstrap.fetch.Response = Response;
window.__bootstrap.fetch.newInnerResponse = newInnerResponse;
window.__bootstrap.fetch.toInnerResponse = toInnerResponse;
window.__bootstrap.fetch.fromInnerResponse = fromInnerResponse;
window.__bootstrap.fetch.redirectStatus = redirectStatus;
Expand Down
5 changes: 3 additions & 2 deletions extensions/websocket/01_websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
const _protocol = Symbol("[[protocol]]");
const _binaryType = Symbol("[[binaryType]]");
const _bufferedAmount = Symbol("[[bufferedAmount]]");
const _eventLoop = Symbol("[[eventLoop]]");
class WebSocket extends EventTarget {
[_rid];

Expand Down Expand Up @@ -255,7 +256,7 @@
event.target = this;
this.dispatchEvent(event);

this.#eventLoop();
this[_eventLoop]();
}
}).catch((err) => {
this[_readyState] = CLOSED;
Expand Down Expand Up @@ -379,7 +380,7 @@
}
}

async #eventLoop() {
async [_eventLoop]() {
while (this[_readyState] === OPEN) {
const { kind, value } = await core.opAsync(
"op_ws_next_event",
Expand Down
1 change: 1 addition & 0 deletions extensions/websocket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ serde = { version = "1.0.125", features = ["derive"] }
tokio = { version = "1.7.1", features = ["full"] }
tokio-rustls = "0.22.0"
tokio-tungstenite = { version = "0.14.0", features = ["rustls-tls"] }
hyper = { version = "0.14.9" }
webpki = "0.21.4"
webpki-roots = "0.21.1"
84 changes: 67 additions & 17 deletions extensions/websocket/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,67 @@ impl WebSocketPermissions for NoWebSocketPermissions {
}

type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
struct WsStreamResource {
tx: AsyncRefCell<SplitSink<WsStream, Message>>,
rx: AsyncRefCell<SplitStream<WsStream>>,
pub enum WebSocketStreamType {
Client {
tx: AsyncRefCell<SplitSink<WsStream, Message>>,
rx: AsyncRefCell<SplitStream<WsStream>>,
},
Server {
tx: AsyncRefCell<SplitSink<WebSocketStream<hyper::upgrade::Upgraded>, Message>>,
rx: AsyncRefCell<SplitStream<WebSocketStream<hyper::upgrade::Upgraded>>>,
},
}

pub struct WsStreamResource {
pub stream: WebSocketStreamType,
// When a `WsStreamResource` resource is closed, all pending 'read' ops are
// canceled, while 'write' ops are allowed to complete. Therefore only
// 'read' futures are attached to this cancel handle.
cancel: CancelHandle,
pub cancel: CancelHandle,
}

impl WsStreamResource {
async fn send(self: &Rc<Self>, message: Message) -> Result<(), AnyError> {
match self.stream {
WebSocketStreamType::Client { .. } => {
let mut tx = RcRef::map(self, |r| match &r.stream {
WebSocketStreamType::Client { tx, .. } => tx,
WebSocketStreamType::Server { .. } => unreachable!(),
}).borrow_mut().await;
tx.send(message).await?;
},
WebSocketStreamType::Server { .. } => {
let mut tx = RcRef::map(self, |r| match &r.stream {
WebSocketStreamType::Client { .. } => unreachable!(),
WebSocketStreamType::Server { tx, .. } => tx,
}).borrow_mut().await;
tx.send(message).await?;
},
}

Ok(())
}

async fn next_message(self: &Rc<Self>, cancel: RcRef<CancelHandle>) -> Result<Option<Result<Message, tokio_tungstenite::tungstenite::Error>>, AnyError> {
match &self.stream {
WebSocketStreamType::Client { .. } => {
let mut rx = RcRef::map(self, |r| match &r.stream {
WebSocketStreamType::Client { rx, .. } => rx,
WebSocketStreamType::Server { .. } => unreachable!(),
}).borrow_mut().await;
let message = rx.next().or_cancel(cancel).await?;
Ok(message)
},
WebSocketStreamType::Server { .. } => {
let mut rx = RcRef::map(self, |r| match &r.stream {
WebSocketStreamType::Client { .. } => unreachable!(),
WebSocketStreamType::Server { rx, .. } => rx,
}).borrow_mut().await;
let message = rx.next().or_cancel(cancel).await?;
Ok(message)
},
}
}
}

impl Resource for WsStreamResource {
Expand All @@ -79,8 +133,6 @@ impl Resource for WsStreamResource {
}
}

impl WsStreamResource {}

// This op is needed because creating a WS instance in JavaScript is a sync
// operation and should throw error when permissions are not fulfilled,
// but actual op that connects WS is async.
Expand Down Expand Up @@ -184,8 +236,10 @@ where

let (ws_tx, ws_rx) = stream.split();
let resource = WsStreamResource {
rx: AsyncRefCell::new(ws_rx),
tx: AsyncRefCell::new(ws_tx),
stream: WebSocketStreamType::Client {
rx: AsyncRefCell::new(ws_rx),
tx: AsyncRefCell::new(ws_tx),
},
cancel: Default::default(),
};
let mut state = state.borrow_mut();
Expand Down Expand Up @@ -227,15 +281,13 @@ pub async fn op_ws_send(
"pong" => Message::Pong(vec![]),
_ => unreachable!(),
};
let rid = args.rid;

let resource = state
.borrow_mut()
.resource_table
.get::<WsStreamResource>(rid)
.get::<WsStreamResource>(args.rid)
.ok_or_else(bad_resource_id)?;
let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await;
tx.send(msg).await?;
resource.send(msg).await?;
Ok(())
}

Expand Down Expand Up @@ -266,8 +318,7 @@ pub async fn op_ws_close(
.resource_table
.get::<WsStreamResource>(rid)
.ok_or_else(bad_resource_id)?;
let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await;
tx.send(msg).await?;
resource.send(msg).await?;
Ok(())
}

Expand All @@ -294,9 +345,8 @@ pub async fn op_ws_next_event(
.get::<WsStreamResource>(rid)
.ok_or_else(bad_resource_id)?;

let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await;
let cancel = RcRef::map(resource, |r| &r.cancel);
let val = rx.next().or_cancel(cancel).await?;
let cancel = RcRef::map(&resource, |r| &r.cancel);
let val = resource.next_message(cancel).await?;
let res = match val {
Some(Ok(Message::Text(text))) => NextEventResponse::String(text),
Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()),
Expand Down
1 change: 1 addition & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ deno_websocket = { version = "0.15.0", path = "../extensions/websocket" }
deno_webstorage = { version = "0.5.0", path = "../extensions/webstorage" }

atty = "0.2.14"
base64 = "0.13.0"
bytes = "1"
dlopen = "0.1.8"
encoding_rs = "0.8.28"
Expand Down
107 changes: 96 additions & 11 deletions runtime/js/40_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
"use strict";

((window) => {
const webidl = window.__bootstrap.webidl;
const { forgivingBase64Encode } = window.__bootstrap.infra;
const { InnerBody } = window.__bootstrap.fetchBody;
const { Response, fromInnerRequest, toInnerResponse, newInnerRequest } =
const { setEventTargetData } = window.__bootstrap.eventTarget;
const { Response, fromInnerRequest, toInnerResponse, newInnerRequest, newInnerResponse, fromInnerResponse } =
window.__bootstrap.fetch;
const errors = window.__bootstrap.errors.errors;
const core = window.Deno.core;
Expand Down Expand Up @@ -53,7 +56,7 @@
if (nextRequest === null) return null;

const [
requestBodyRid,
requestRid,
responseSenderRid,
method,
headersList,
Expand All @@ -62,8 +65,8 @@

/** @type {ReadableStream<Uint8Array> | undefined} */
let body = null;
if (typeof requestBodyRid === "number") {
body = createRequestBodyStream(requestBodyRid);
if (typeof requestRid === "number") {
body = createRequestBodyStream(requestRid);
}

const innerRequest = newInnerRequest(
Expand All @@ -74,7 +77,11 @@
);
const request = fromInnerRequest(innerRequest, null, "immutable");

const respondWith = createRespondWith(this, responseSenderRid);
const respondWith = createRespondWith(
this,
responseSenderRid,
requestRid,
);

return { request, respondWith };
}
Expand Down Expand Up @@ -105,7 +112,7 @@
);
}

function createRespondWith(httpConn, responseSenderRid) {
function createRespondWith(httpConn, responseSenderRid, requestRid) {
return async function respondWith(resp) {
if (resp instanceof Promise) {
resp = await resp;
Expand Down Expand Up @@ -206,10 +213,50 @@
} catch { /* pass */ }
}
}

const ws = resp[_ws];
if (ws) {
const _readyState = Symbol.for("[[readyState]]");

core.opAsync("op_http_upgrade_websocket", requestRid).then((rid) => {
ws[Symbol.for("[[rid]]")] = rid;
// TODO: protocols & extensions

if (ws[_readyState] === WebSocket.CLOSING) {
core.opAsync("op_ws_close", {
rid,
}).then(() => {
ws[_readyState] = WebSocket.CLOSED;

const errEvent = new ErrorEvent("error");
errEvent.target = ws;
ws.dispatchEvent(errEvent);

const event = new CloseEvent("close");
event.target = ws;
ws.dispatchEvent(event);

try {
core.close(rid);
} catch (err) {
// Ignore error if the socket has already been closed.
if (!(err instanceof Deno.errors.BadResource)) throw err;
}
});
} else {
ws[_readyState] = WebSocket.OPEN;
const event = new Event("open");
event.target = ws;
ws.dispatchEvent(event);

ws[Symbol.for("[[eventLoop]]")]();
}
});
}
};
}

function createRequestBodyStream(requestBodyRid) {
function createRequestBodyStream(requestRid) {
return new ReadableStream({
type: "bytes",
async pull(controller) {
Expand All @@ -218,7 +265,7 @@
// stream.
const chunk = new Uint8Array(16 * 1024 + 256);
const read = await readRequest(
requestBodyRid,
requestRid,
chunk,
);
if (read > 0) {
Expand All @@ -227,23 +274,61 @@
} else {
// We have reached the end of the body, so we close the stream.
controller.close();
core.close(requestBodyRid);
core.close(requestRid);
}
} catch (err) {
// There was an error while reading a chunk of the body, so we
// error.
controller.error(err);
controller.close();
core.close(requestBodyRid);
core.close(requestRid);
}
},
cancel() {
core.close(requestBodyRid);
core.close(requestRid);
},
});
}

const _ws = Symbol("[[associated_ws]]");

async function upgradeWebSocket(request) {
if (request.headers["Upgrade"] !== "websocket") {
// Throw
}

if (request.headers["Connection"] !== "Upgrade") {
// Throw
}

if (!request.headers["Sec-WebSocket-Key"]) {
// Throw
}

const key = new TextEncoder().encode(
request.headers["Sec-WebSocket-Key"] +
"258EAFA5-E914-47DA-95CA-C5AB0DC85B11",
);
const accept = await crypto.subtle.digest("SHA-1", key);

const r = newInnerResponse(101);
r.headerList = [
["Upgrade", "websocket"],
["Connection", "Upgrade"],
["Sec-WebSocket-Accept", forgivingBase64Encode(new Uint8Array(accept))]
crowlKats marked this conversation as resolved.
Show resolved Hide resolved
];

const response = fromInnerResponse(r, "immutable");

const websocket = webidl.createBranded(WebSocket);
setEventTargetData(websocket);
response[_ws] = websocket;

return { response, websocket };
}

window.__bootstrap.http = {
serveHttp,
upgradeWebSocket,
};
})(this);
Loading