Skip to content

Commit

Permalink
feat(runtime/http): server side websocket support (#10359)
Browse files Browse the repository at this point in the history
Co-authored-by: Nayeem Rahman <nayeemrmn99@gmail.com>
Co-authored-by: Luca Casonato <hello@lcas.dev>
  • Loading branch information
3 people authored Jul 8, 2021
1 parent 215f6f2 commit 5e092b1
Show file tree
Hide file tree
Showing 11 changed files with 579 additions and 217 deletions.
302 changes: 161 additions & 141 deletions Cargo.lock

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions cli/tests/unit/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
assertThrowsAsync,
deferred,
delay,
fail,
unitTest,
} from "./test_util.ts";

Expand Down Expand Up @@ -631,3 +632,33 @@ unitTest(
await promise;
},
);

unitTest({ perms: { net: true } }, async function httpServerWebSocket() {
const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
for await (const conn of listener) {
const httpConn = Deno.serveHttp(conn);
const { request, respondWith } = (await httpConn.nextRequest())!;
const {
response,
websocket,
} = await Deno.upgradeWebSocket(request);
websocket.onerror = () => fail();
websocket.onmessage = (m) => {
websocket.send(m.data);
websocket.close();
};
await respondWith(response);
break;
}
})();

const def = deferred();
const ws = new WebSocket("ws://localhost:4501");
ws.onmessage = (m) => assertEquals(m.data, "foo");
ws.onerror = () => fail();
ws.onclose = () => def.resolve();
ws.onopen = () => ws.send("foo");
await def;
await promise;
});
1 change: 1 addition & 0 deletions extensions/fetch/23_response.js
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@

window.__bootstrap.fetch ??= {};
window.__bootstrap.fetch.Response = Response;
window.__bootstrap.fetch.newInnerResponse = newInnerResponse;
window.__bootstrap.fetch.toInnerResponse = toInnerResponse;
window.__bootstrap.fetch.fromInnerResponse = fromInnerResponse;
window.__bootstrap.fetch.redirectStatus = redirectStatus;
Expand Down
144 changes: 129 additions & 15 deletions extensions/net/03_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,34 @@
"use strict";

((window) => {
const webidl = window.__bootstrap.webidl;
const { InnerBody } = window.__bootstrap.fetchBody;
const { Response, fromInnerRequest, toInnerResponse, newInnerRequest } =
window.__bootstrap.fetch;
const { setEventTargetData } = window.__bootstrap.eventTarget;
const {
Response,
fromInnerRequest,
toInnerResponse,
newInnerRequest,
newInnerResponse,
fromInnerResponse,
} = window.__bootstrap.fetch;
const core = window.Deno.core;
const { BadResource, Interrupted } = core;
const { ReadableStream } = window.__bootstrap.streams;
const abortSignal = window.__bootstrap.abortSignal;
const { WebSocket, _rid, _readyState, _eventLoop, _protocol } =
window.__bootstrap.webSocket;
const {
Symbol,
Uint8Array,
ArrayPrototypeIncludes,
ArrayPrototypePush,
Promise,
StringPrototypeIncludes,
StringPrototypeSplit,
Symbol,
SymbolAsyncIterator,
TypeError,
TypedArrayPrototypeSubarray,
TypeError,
Uint8Array,
} = window.__bootstrap.primordials;

function serveHttp(conn) {
Expand Down Expand Up @@ -65,7 +78,7 @@
if (nextRequest === null) return null;

const [
requestBodyRid,
requestRid,
responseSenderRid,
method,
headersList,
Expand All @@ -74,8 +87,8 @@

/** @type {ReadableStream<Uint8Array> | undefined} */
let body = null;
if (typeof requestBodyRid === "number") {
body = createRequestBodyStream(requestBodyRid);
if (typeof requestRid === "number") {
body = createRequestBodyStream(requestRid);
}

const innerRequest = newInnerRequest(
Expand All @@ -87,7 +100,11 @@
const signal = abortSignal.newSignal();
const request = fromInnerRequest(innerRequest, signal, "immutable");

const respondWith = createRespondWith(this, responseSenderRid);
const respondWith = createRespondWith(
this,
responseSenderRid,
requestRid,
);

return { request, respondWith };
}
Expand Down Expand Up @@ -118,7 +135,7 @@
);
}

function createRespondWith(httpConn, responseSenderRid) {
function createRespondWith(httpConn, responseSenderRid, requestRid) {
return async function respondWith(resp) {
if (resp instanceof Promise) {
resp = await resp;
Expand Down Expand Up @@ -222,10 +239,51 @@
} catch { /* pass */ }
}
}

const ws = resp[_ws];
if (ws) {
if (typeof requestRid !== "number") {
throw new TypeError(
"This request can not be upgraded to a websocket connection.",
);
}

const wsRid = await core.opAsync(
"op_http_upgrade_websocket",
requestRid,
);
ws[_rid] = wsRid;
ws[_protocol] = resp.headers.get("sec-websocket-protocol");

if (ws[_readyState] === WebSocket.CLOSING) {
await core.opAsync("op_ws_close", { rid: wsRid });

ws[_readyState] = WebSocket.CLOSED;

const errEvent = new ErrorEvent("error");
ws.dispatchEvent(errEvent);

const event = new CloseEvent("close");
ws.dispatchEvent(event);

try {
core.close(wsRid);
} catch (err) {
// Ignore error if the socket has already been closed.
if (!(err instanceof Deno.errors.BadResource)) throw err;
}
} else {
ws[_readyState] = WebSocket.OPEN;
const event = new Event("open");
ws.dispatchEvent(event);

ws[_eventLoop]();
}
}
};
}

function createRequestBodyStream(requestBodyRid) {
function createRequestBodyStream(requestRid) {
return new ReadableStream({
type: "bytes",
async pull(controller) {
Expand All @@ -234,7 +292,7 @@
// stream.
const chunk = new Uint8Array(16 * 1024 + 256);
const read = await readRequest(
requestBodyRid,
requestRid,
chunk,
);
if (read > 0) {
Expand All @@ -243,23 +301,79 @@
} else {
// We have reached the end of the body, so we close the stream.
controller.close();
core.close(requestBodyRid);
core.close(requestRid);
}
} catch (err) {
// There was an error while reading a chunk of the body, so we
// error.
controller.error(err);
controller.close();
core.close(requestBodyRid);
core.close(requestRid);
}
},
cancel() {
core.close(requestBodyRid);
core.close(requestRid);
},
});
}

const _ws = Symbol("[[associated_ws]]");

function upgradeWebSocket(request, options = {}) {
if (request.headers.get("upgrade") !== "websocket") {
throw new TypeError(
"Invalid Header: 'upgrade' header must be 'websocket'",
);
}

if (request.headers.get("connection") !== "Upgrade") {
throw new TypeError(
"Invalid Header: 'connection' header must be 'Upgrade'",
);
}

const websocketKey = request.headers.get("sec-websocket-key");
if (websocketKey === null) {
throw new TypeError(
"Invalid Header: 'sec-websocket-key' header must be set",
);
}

const accept = core.opSync("op_http_websocket_accept_header", websocketKey);

const r = newInnerResponse(101);
r.headerList = [
["upgrade", "websocket"],
["connection", "Upgrade"],
["sec-websocket-accept", accept],
];

const protocolsStr = request.headers.get("sec-websocket-protocol") || "";
const protocols = StringPrototypeSplit(protocolsStr, ", ");
if (protocols && options.protocol) {
if (ArrayPrototypeIncludes(protocols, options.protocol)) {
ArrayPrototypePush(r.headerList, [
"sec-websocket-protocol",
options.protocol,
]);
} else {
throw new TypeError(
`Protocol '${options.protocol}' not in the request's protocol list (non negotiable)`,
);
}
}

const response = fromInnerResponse(r, "immutable");

const websocket = webidl.createBranded(WebSocket);
setEventTargetData(websocket);
response[_ws] = websocket;

return { response, websocket };
}

window.__bootstrap.http = {
serveHttp,
upgradeWebSocket,
};
})(this);
3 changes: 3 additions & 0 deletions extensions/net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ path = "lib.rs"

[dependencies]
deno_core = { version = "0.92.0", path = "../../core" }
deno_websocket = { version = "0.15.0", path = "../websocket" }

base64 = "0.13.0"
bytes = "1"
log = "0.4.14"
lazy_static = "1.4.0"
http = "0.2.3"
hyper = { version = "0.14.9", features = ["server", "stream", "http1", "http2", "runtime"] }
ring = "0.16.20"
rustls = "0.19.0"
serde = { version = "1.0.125", features = ["derive"] }
tokio = { version = "1.8.0", features = ["full"] }
Expand Down
14 changes: 14 additions & 0 deletions extensions/net/lib.deno_net.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,18 @@ declare namespace Deno {
* then the underlying HttpConn resource is closed automatically.
*/
export function serveHttp(conn: Conn): HttpConn;

export interface WebSocketUpgrade {
response: Response;
websocket: WebSocket;
}

export interface UpgradeWebSocketOptions {
protocol?: string;
}

export function upgradeWebSocket(
request: Request,
options?: UpgradeWebSocketOptions,
): WebSocketUpgrade;
}
Loading

0 comments on commit 5e092b1

Please sign in to comment.