Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(runtime/http): websocket support #10359

Merged
merged 44 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
c17a720
add websocket upgrade
crowlKats Apr 16, 2021
e63b02b
Merge branch 'main' into native_http_websocket
crowlKats Apr 24, 2021
6de42f0
add ws
crowlKats Apr 24, 2021
a313886
handle sec-websocket-key
crowlKats Apr 24, 2021
a94cbca
handle closing and add typings
crowlKats Apr 25, 2021
de350f5
basic error handling
crowlKats Apr 25, 2021
dc9056d
Update cli/dts/lib.deno.unstable.d.ts
crowlKats May 5, 2021
48e0e23
rename upgradeWebSocket
crowlKats May 5, 2021
2499a5a
Merge branch 'main' into native_http_websocket
crowlKats May 5, 2021
b494a44
clean up
crowlKats May 5, 2021
35d375a
clean up
crowlKats May 5, 2021
85639fb
clean up
crowlKats May 5, 2021
e855c63
Update runtime/js/40_http.js
crowlKats May 5, 2021
b8aa871
fix
crowlKats May 5, 2021
afe1d99
Merge branch 'main' into native_http_websocket
crowlKats Jun 16, 2021
e149890
Merge branch 'main' into native_http_websocket
crowlKats Jun 21, 2021
d4a51bf
work
crowlKats Jun 21, 2021
613636d
wip
lucacasonato Jun 16, 2021
9b32625
work
crowlKats Jun 21, 2021
fa193cf
fix
crowlKats Jun 22, 2021
dddfc29
work
crowlKats Jun 30, 2021
7de7397
fix
lucacasonato Jun 30, 2021
0a70386
cleanup
lucacasonato Jun 30, 2021
4fe1e79
it works!
lucacasonato Jun 30, 2021
aad5ee6
don't leak resources
lucacasonato Jun 30, 2021
087aed1
clean up & remove hyper_tungstenite
crowlKats Jul 1, 2021
15ae4c0
Merge branch 'main' into native_http_websocket
crowlKats Jul 1, 2021
9e2b416
clean up
crowlKats Jul 1, 2021
2dc7e8c
Merge branch 'main' into native_http_websocket
crowlKats Jul 1, 2021
66ba74d
clean up
crowlKats Jul 1, 2021
d940e7e
add test & lint
crowlKats Jul 1, 2021
c86e7bb
add protocol selection support
crowlKats Jul 1, 2021
79c324e
fix
crowlKats Jul 1, 2021
54941c1
fix
crowlKats Jul 1, 2021
588c563
errors
crowlKats Jul 1, 2021
8a1f533
adjust test
crowlKats Jul 1, 2021
8f99e87
adjust test
crowlKats Jul 1, 2021
46d6591
fix
crowlKats Jul 2, 2021
17d5038
use boxed dyn Future for connection
lucacasonato Jul 7, 2021
65536d1
Merge remote-tracking branch 'origin/main' into native_http_websocket
lucacasonato Jul 7, 2021
05b8166
lint + fmt
lucacasonato Jul 7, 2021
fc8b27f
Merge remote-tracking branch 'origin/main' into native_http_websocket
lucacasonato Jul 8, 2021
8a9e486
generate accept header sync
crowlKats Jul 8, 2021
ac56367
Merge branch 'main' into native_http_websocket
lucacasonato Jul 8, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
304 changes: 161 additions & 143 deletions Cargo.lock

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions cli/tests/unit/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
assertThrowsAsync,
deferred,
delay,
fail,
unitTest,
} from "./test_util.ts";

Expand Down Expand Up @@ -631,3 +632,33 @@ unitTest(
await promise;
},
);

unitTest({ perms: { net: true } }, async function httpServerWebSocket() {
const promise = (async () => {
const listener = Deno.listen({ port: 4501 });
for await (const conn of listener) {
const httpConn = Deno.serveHttp(conn);
const { request, respondWith } = (await httpConn.nextRequest())!;
const {
response,
websocket,
} = await Deno.upgradeWebSocket(request);
websocket.onerror = () => fail();
websocket.onmessage = (m) => {
websocket.send(m.data);
websocket.close();
};
await respondWith(response);
break;
}
})();

const def = deferred();
const ws = new WebSocket("ws://localhost:4501");
ws.onmessage = (m) => assertEquals(m.data, "foo");
ws.onerror = () => fail();
ws.onclose = () => def.resolve();
ws.onopen = () => ws.send("foo");
await def;
await promise;
});
1 change: 1 addition & 0 deletions extensions/fetch/23_response.js
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@

window.__bootstrap.fetch ??= {};
window.__bootstrap.fetch.Response = Response;
window.__bootstrap.fetch.newInnerResponse = newInnerResponse;
window.__bootstrap.fetch.toInnerResponse = toInnerResponse;
window.__bootstrap.fetch.fromInnerResponse = fromInnerResponse;
window.__bootstrap.fetch.redirectStatus = redirectStatus;
Expand Down
147 changes: 132 additions & 15 deletions extensions/net/03_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,35 @@
"use strict";

((window) => {
const webidl = window.__bootstrap.webidl;
const { forgivingBase64Encode } = window.__bootstrap.infra;
const { InnerBody } = window.__bootstrap.fetchBody;
const { Response, fromInnerRequest, toInnerResponse, newInnerRequest } =
window.__bootstrap.fetch;
const { setEventTargetData } = window.__bootstrap.eventTarget;
const {
Response,
fromInnerRequest,
toInnerResponse,
newInnerRequest,
newInnerResponse,
fromInnerResponse,
} = window.__bootstrap.fetch;
const core = window.Deno.core;
const { BadResource, Interrupted } = core;
const { ReadableStream } = window.__bootstrap.streams;
const abortSignal = window.__bootstrap.abortSignal;
const { WebSocket, _rid, _readyState, _eventLoop, _protocol } =
window.__bootstrap.webSocket;
const {
Symbol,
Uint8Array,
ArrayPrototypeIncludes,
ArrayPrototypePush,
Promise,
StringPrototypeIncludes,
StringPrototypeSplit,
Symbol,
SymbolAsyncIterator,
TypeError,
TypedArrayPrototypeSubarray,
TypeError,
Uint8Array,
} = window.__bootstrap.primordials;

function serveHttp(conn) {
Expand Down Expand Up @@ -65,7 +79,7 @@
if (nextRequest === null) return null;

const [
requestBodyRid,
requestRid,
responseSenderRid,
method,
headersList,
Expand All @@ -74,8 +88,8 @@

/** @type {ReadableStream<Uint8Array> | undefined} */
let body = null;
if (typeof requestBodyRid === "number") {
body = createRequestBodyStream(requestBodyRid);
if (typeof requestRid === "number") {
body = createRequestBodyStream(requestRid);
}

const innerRequest = newInnerRequest(
Expand All @@ -87,7 +101,11 @@
const signal = abortSignal.newSignal();
const request = fromInnerRequest(innerRequest, signal, "immutable");

const respondWith = createRespondWith(this, responseSenderRid);
const respondWith = createRespondWith(
this,
responseSenderRid,
requestRid,
);

return { request, respondWith };
}
Expand Down Expand Up @@ -118,7 +136,7 @@
);
}

function createRespondWith(httpConn, responseSenderRid) {
function createRespondWith(httpConn, responseSenderRid, requestRid) {
return async function respondWith(resp) {
if (resp instanceof Promise) {
resp = await resp;
Expand Down Expand Up @@ -222,10 +240,51 @@
} catch { /* pass */ }
}
}

const ws = resp[_ws];
if (ws) {
if (typeof requestRid !== "number") {
throw new TypeError(
"This request can not be upgraded to a websocket connection.",
);
}

const wsRid = await core.opAsync(
"op_http_upgrade_websocket",
requestRid,
);
ws[_rid] = wsRid;
ws[_protocol] = resp.headers.get("sec-websocket-protocol");

if (ws[_readyState] === WebSocket.CLOSING) {
await core.opAsync("op_ws_close", { rid: wsRid });

ws[_readyState] = WebSocket.CLOSED;

const errEvent = new ErrorEvent("error");
ws.dispatchEvent(errEvent);

const event = new CloseEvent("close");
ws.dispatchEvent(event);

try {
core.close(wsRid);
} catch (err) {
// Ignore error if the socket has already been closed.
if (!(err instanceof Deno.errors.BadResource)) throw err;
}
} else {
ws[_readyState] = WebSocket.OPEN;
const event = new Event("open");
ws.dispatchEvent(event);

ws[_eventLoop]();
}
}
};
}

function createRequestBodyStream(requestBodyRid) {
function createRequestBodyStream(requestRid) {
return new ReadableStream({
type: "bytes",
async pull(controller) {
Expand All @@ -234,7 +293,7 @@
// stream.
const chunk = new Uint8Array(16 * 1024 + 256);
const read = await readRequest(
requestBodyRid,
requestRid,
chunk,
);
if (read > 0) {
Expand All @@ -243,23 +302,81 @@
} else {
// We have reached the end of the body, so we close the stream.
controller.close();
core.close(requestBodyRid);
core.close(requestRid);
}
} catch (err) {
// There was an error while reading a chunk of the body, so we
// error.
controller.error(err);
controller.close();
core.close(requestBodyRid);
core.close(requestRid);
}
},
cancel() {
core.close(requestBodyRid);
core.close(requestRid);
},
});
}

const _ws = Symbol("[[associated_ws]]");

async function upgradeWebSocket(request, options = {}) {
if (request.headers.get("upgrade") !== "websocket") {
throw new TypeError(
"Invalid Header: 'upgrade' header must be 'websocket'",
);
}

if (request.headers.get("connection") !== "Upgrade") {
throw new TypeError(
"Invalid Header: 'connection' header must be 'Upgrade'",
);
}

const websocketKey = request.headers.get("sec-websocket-key");
if (websocketKey === null) {
throw new TypeError(
"Invalid Header: 'sec-websocket-key' header must be set",
);
}

const key = new TextEncoder()
.encode(websocketKey + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
const accept = await crypto.subtle.digest("SHA-1", key);

const r = newInnerResponse(101);
r.headerList = [
["upgrade", "websocket"],
["connection", "Upgrade"],
["sec-websocket-accept", forgivingBase64Encode(new Uint8Array(accept))],
];

const protocolsStr = request.headers.get("sec-websocket-protocol") || "";
const protocols = StringPrototypeSplit(protocolsStr, ", ");
if (protocols && options.protocol) {
if (ArrayPrototypeIncludes(protocols, options.protocol)) {
ArrayPrototypePush(r.headerList, [
"sec-websocket-protocol",
options.protocol,
]);
} else {
throw new TypeError(
`Protocol '${options.protocol}' not in the request's protocol list (non negotiable)`,
);
}
}

const response = fromInnerResponse(r, "immutable");

const websocket = webidl.createBranded(WebSocket);
setEventTargetData(websocket);
response[_ws] = websocket;

return { response, websocket };
}

window.__bootstrap.http = {
serveHttp,
upgradeWebSocket,
};
})(this);
1 change: 1 addition & 0 deletions extensions/net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ path = "lib.rs"

[dependencies]
deno_core = { version = "0.92.0", path = "../../core" }
deno_websocket = { version = "0.15.0", path = "../websocket" }

bytes = "1"
log = "0.4.14"
Expand Down
14 changes: 14 additions & 0 deletions extensions/net/lib.deno_net.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,18 @@ declare namespace Deno {
* then the underlying HttpConn resource is closed automatically.
*/
export function serveHttp(conn: Conn): HttpConn;

export interface WebSocketUpgrade {
response: Response;
websocket: WebSocket;
}

export interface UpgradeWebSocketOptions {
protocol?: string;
}

export function upgradeWebSocket(
request: Request,
options?: UpgradeWebSocketOptions,
): Promise<WebSocketUpgrade>;
}
Loading