Skip to content

Commit

Permalink
Wait for worker response before opening WebSocket in client, closes #88
Browse files Browse the repository at this point in the history
  • Loading branch information
mrbbot committed Jan 7, 2022
1 parent 30495ba commit 86bb1f9
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 51 deletions.
72 changes: 31 additions & 41 deletions packages/http-server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
import { randomHex } from "@miniflare/shared";
import { coupleWebSocket } from "@miniflare/web-sockets";
import { BodyInit, Headers } from "undici";
import StandardWebSocket, { WebSocketServer } from "ws";
import { WebSocketServer } from "ws";
import { getAccessibleHosts } from "./helpers";
import { HTTPPlugin, RequestMeta } from "./plugin";

Expand Down Expand Up @@ -305,38 +305,6 @@ export function createRequestListener<Plugins extends HTTPPluginSignatures>(
};
}

export type WebSocketUpgradeListener = (
ws: StandardWebSocket,
req: http.IncomingMessage
) => void;

export function createWebSocketUpgradeListener<
Plugins extends CorePluginSignatures
>(
mf: MiniflareCore<Plugins>,
listener: RequestListener
): WebSocketUpgradeListener {
return async (ws, req) => {
// Handle request in worker
const response = await listener(req);

// Check web socket response was returned
const webSocket = response?.webSocket;
if (response?.status !== 101 || !webSocket) {
ws.close(1002, "Protocol Error");
mf.log.error(
new TypeError(
"Web Socket request did not return status 101 Switching Protocols response with Web Socket"
)
);
return;
}

// Couple the web socket here
await coupleWebSocket(ws, webSocket);
};
}

export async function createServer<Plugins extends HTTPPluginSignatures>(
mf: MiniflareCore<Plugins>,
options?: http.ServerOptions & https.ServerOptions
Expand All @@ -355,17 +323,39 @@ export async function createServer<Plugins extends HTTPPluginSignatures>(
}

// Setup WebSocket servers
const upgrader = createWebSocketUpgradeListener(mf, listener);
const webSocketServer = new WebSocketServer({ noServer: true });
webSocketServer.on("connection", upgrader);
const liveReloadServer = new WebSocketServer({ noServer: true });
server.on("upgrade", (request, socket, head) => {
server.on("upgrade", async (request, socket, head) => {
// Only interested in pathname so base URL doesn't matter
const { pathname } = new URL(request.url ?? "", "http://localhost");
const server =
pathname === "/cdn-cgi/mf/reload" ? liveReloadServer : webSocketServer;
server.handleUpgrade(request, socket as any, head, (ws) =>
server.emit("connection", ws, request)
);
if (pathname === "/cdn-cgi/mf/reload") {
// If this is the for live-reload, handle the request ourselves
liveReloadServer.handleUpgrade(request, socket as any, head, (ws) => {
liveReloadServer.emit("connection", ws, request);
});
} else {
// Otherwise, handle the request in the worker
const response = await listener(request);

// Check web socket response was returned
const webSocket = response?.webSocket;
if (response?.status !== 101 || !webSocket) {
socket.write("HTTP/1.1 500 Internal Server Error\r\n\r\n");
socket.destroy();
mf.log.error(
new TypeError(
"Web Socket request did not return status 101 Switching Protocols response with Web Socket"
)
);
return;
}

// Accept and couple the Web Socket
webSocketServer.handleUpgrade(request, socket as any, head, (ws) => {
void coupleWebSocket(ws, webSocket);
webSocketServer.emit("connection", ws, request);
});
}
});
const reloadListener = () => {
// Reload all connected live reload clients
Expand Down
28 changes: 18 additions & 10 deletions packages/http-server/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ import {
} from "@miniflare/shared-test";
import { MessageEvent, WebSocketPlugin } from "@miniflare/web-sockets";
import test, { ExecutionContext, Macro } from "ava";
import StandardWebSocket, { Data, Event as WebSocketEvent } from "ws";
import StandardWebSocket, {
Data,
CloseEvent as WebSocketCloseEvent,
ErrorEvent as WebSocketErrorEvent,
Event as WebSocketEvent,
} from "ws";

function listen(
t: ExecutionContext,
Expand Down Expand Up @@ -487,7 +492,10 @@ test("createServer: handles web socket upgrades", async (t) => {
const mf = useMiniflareWithHandler(
{ HTTPPlugin, WebSocketPlugin },
{},
(globals) => {
async (globals) => {
// Simulate slow response, WebSocket must not open until worker responds
await new Promise((resolve) => globals.setTimeout(resolve, 1000));

const [client, worker] = Object.values(new globals.WebSocketPair());
worker.accept();
worker.addEventListener("message", (e: MessageEvent) => {
Expand Down Expand Up @@ -523,18 +531,18 @@ test("createServer: expects status 101 and web socket response for upgrades", as
const port = await listen(t, await createServer(mf));

const ws = new StandardWebSocket(`ws://localhost:${port}`);
const [eventTrigger, eventPromise] = triggerPromise<{
code: number;
reason: string;
}>();
ws.addEventListener("close", eventTrigger);
const event = await eventPromise;
const [closeTrigger, closePromise] = triggerPromise<WebSocketCloseEvent>();
const [errorTrigger, errorPromise] = triggerPromise<WebSocketErrorEvent>();
ws.addEventListener("close", closeTrigger);
ws.addEventListener("error", errorTrigger);
const closeEvent = await closePromise;
const errorEvent = await errorPromise;

t.deepEqual(log.logsAtLevel(LogLevel.ERROR), [
"TypeError: Web Socket request did not return status 101 Switching Protocols response with Web Socket",
]);
t.is(event.code, 1002);
t.is(event.reason, "Protocol Error");
t.is(closeEvent.code, 1006);
t.is(errorEvent.message, "Unexpected server response: 500");
});
test("createServer: notifies connected live reload clients on reload", async (t) => {
const mf = useMiniflareWithHandler({ HTTPPlugin }, {}, (globals) => {
Expand Down

0 comments on commit 86bb1f9

Please sign in to comment.