Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up HTTP async iterator code #411

Merged
merged 22 commits into from
May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions http/http_bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ import { serve } from "./server.ts";

const addr = Deno.args[1] || "127.0.0.1:4500";
const server = serve(addr);

const body = new TextEncoder().encode("Hello World");

async function main(): Promise<void> {
console.log(`http://${addr}/`);
for await (const request of server) {
request.respond({ status: 200, body });
for await (const req of server) {
req.respond({ body });
}
}

Expand Down
231 changes: 81 additions & 150 deletions http/server.ts
Original file line number Diff line number Diff line change
@@ -1,55 +1,14 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
const { listen, copy, toAsyncIterator } = Deno;
type Listener = Deno.Listener;
type Conn = Deno.Conn;
type Reader = Deno.Reader;
type Writer = Deno.Writer;
import { BufReader, BufState, BufWriter } from "../io/bufio.ts";
import { TextProtoReader } from "../textproto/mod.ts";
import { STATUS_TEXT } from "./http_status.ts";
import { assert } from "../testing/asserts.ts";

interface Deferred {
promise: Promise<{}>;
resolve: () => void;
reject: () => void;
}

function deferred(isResolved = false): Deferred {
let resolve, reject;
const promise = new Promise(
(res, rej): void => {
resolve = res;
reject = rej;
}
);
if (isResolved) {
resolve();
}
return {
promise,
resolve,
reject
};
}

interface HttpConn extends Conn {
// When read by a newly created request B, lastId is the id pointing to a previous
// request A, such that we must wait for responses to A to complete before
// writing B's response.
lastPipelineId: number;
pendingDeferredMap: Map<number, Deferred>;
}

function createHttpConn(c: Conn): HttpConn {
const httpConn = Object.assign(c, {
lastPipelineId: 0,
pendingDeferredMap: new Map()
});

const resolvedDeferred = deferred(true);
httpConn.pendingDeferredMap.set(0, resolvedDeferred);
return httpConn;
}
import { assert, fail } from "../testing/asserts.ts";
import { deferred, Deferred, MuxAsyncIterator } from "../util/async.ts";

function bufWriter(w: Writer): BufWriter {
if (w instanceof BufWriter) {
Expand All @@ -58,6 +17,7 @@ function bufWriter(w: Writer): BufWriter {
return new BufWriter(w);
}
}

export function setContentLength(r: Response): void {
if (!r.headers) {
r.headers = new Headers();
Expand All @@ -74,6 +34,7 @@ export function setContentLength(r: Response): void {
}
}
}

async function writeChunkedBody(w: Writer, r: Reader): Promise<void> {
const writer = bufWriter(w);
const encoder = new TextEncoder();
Expand All @@ -90,6 +51,7 @@ async function writeChunkedBody(w: Writer, r: Reader): Promise<void> {
const endChunk = encoder.encode("0\r\n\r\n");
await writer.write(endChunk);
}

export async function writeResponse(w: Writer, r: Response): Promise<void> {
const protoMajor = 1;
const protoMinor = 1;
Expand Down Expand Up @@ -131,6 +93,7 @@ export async function writeResponse(w: Writer, r: Response): Promise<void> {
}
await writer.flush();
}

async function readAllIterator(
it: AsyncIterableIterator<Uint8Array>
): Promise<Uint8Array> {
Expand All @@ -154,14 +117,14 @@ async function readAllIterator(
}

export class ServerRequest {
pipelineId: number;
url: string;
method: string;
proto: string;
headers: Headers;
conn: HttpConn;
conn: Conn;
r: BufReader;
w: BufWriter;
done: Deferred<void> = deferred();

public async *bodyStream(): AsyncIterableIterator<Uint8Array> {
if (this.headers.has("content-length")) {
Expand Down Expand Up @@ -244,134 +207,102 @@ export class ServerRequest {
}

async respond(r: Response): Promise<void> {
// Check and wait if the previous request is done responding.
const lastPipelineId = this.pipelineId - 1;
const lastPipelineDeferred = this.conn.pendingDeferredMap.get(
lastPipelineId
);
assert(!!lastPipelineDeferred);
await lastPipelineDeferred.promise;
// If yes, delete old deferred and proceed with writing.
this.conn.pendingDeferredMap.delete(lastPipelineId);
// Write our response!
await writeResponse(this.w, r);
// Signal the next pending request that it can start writing.
const currPipelineDeferred = this.conn.pendingDeferredMap.get(
this.pipelineId
);
assert(!!currPipelineDeferred);
currPipelineDeferred.resolve();
// Signal that this request has been processed and the next pipelined
// request on the same connection can be accepted.
this.done.resolve();
}
}

interface ServeEnv {
reqQueue: ServerRequest[];
serveDeferred: Deferred;
}

/** Continuously read more requests from conn until EOF
* Calls maybeHandleReq.
* bufr is empty on a fresh TCP connection.
* Would be passed around and reused for later request on same conn
* TODO: make them async function after this change is done
* https://github.com/tc39/ecma262/pull/1250
* See https://v8.dev/blog/fast-async
*/
async function readRequest(
c: HttpConn,
bufr?: BufReader
conn: Conn,
bufr: BufReader
): Promise<[ServerRequest, BufState]> {
if (!bufr) {
bufr = new BufReader(c);
}
const bufw = new BufWriter(c);
const req = new ServerRequest();

// Set and incr pipeline id;
req.pipelineId = ++c.lastPipelineId;
// Set a new pipeline deferred associated with this request
// for future requests to wait for.
c.pendingDeferredMap.set(req.pipelineId, deferred());

req.conn = c;
req.r = bufr!;
req.w = bufw;
const tp = new TextProtoReader(bufr!);

let s: string;
req.conn = conn;
req.r = bufr;
req.w = new BufWriter(conn);
const tp = new TextProtoReader(bufr);
let err: BufState;

// First line: GET /index.html HTTP/1.0
[s, err] = await tp.readLine();
let firstLine: string;
[firstLine, err] = await tp.readLine();
if (err) {
return [null, err];
}
[req.method, req.url, req.proto] = s.split(" ", 3);

[req.method, req.url, req.proto] = firstLine.split(" ", 3);
[req.headers, err] = await tp.readMIMEHeader();

return [req, err];
}

function maybeHandleReq(
env: ServeEnv,
conn: Conn,
maybeReq: [ServerRequest, BufState]
): void {
const [req, _err] = maybeReq;
if (_err) {
conn.close(); // assume EOF for now...
return;
}
env.reqQueue.push(req); // push req to queue
env.serveDeferred.resolve(); // signal while loop to process it
}
export class Server implements AsyncIterable<ServerRequest> {
private closing = false;

function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void {
readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
}
constructor(public listener: Listener) {}

export async function* serve(
addr: string
): AsyncIterableIterator<ServerRequest> {
const listener = listen("tcp", addr);
const env: ServeEnv = {
reqQueue: [], // in case multiple promises are ready
serveDeferred: deferred()
};
close(): void {
this.closing = true;
this.listener.close();
}

// Routine that keeps calling accept
let handleConn = (_conn: Conn): void => {};
let scheduleAccept = (): void => {};
const acceptRoutine = (): void => {
scheduleAccept = (): void => {
listener.accept().then(handleConn);
};
handleConn = (conn: Conn): void => {
const httpConn = createHttpConn(conn);
serveConn(env, httpConn); // don't block
scheduleAccept(); // schedule next accept
};
// Yields all HTTP requests on a single TCP connection.
private async *iterateHttpRequests(
conn: Conn
): AsyncIterableIterator<ServerRequest> {
const bufr = new BufReader(conn);
let bufStateErr: BufState;
let req: ServerRequest;

while (!this.closing) {
[req, bufStateErr] = await readRequest(conn, bufr);
if (bufStateErr) break;
yield req;
// Wait for the request to be processed before we accept a new request on
// this connection.
await req.done;
}

scheduleAccept();
};
if (bufStateErr === "EOF") {
// The connection was gracefully closed.
} else if (bufStateErr instanceof Error) {
// TODO(ry): send something back like a HTTP 500 status.
} else if (this.closing) {
// There are more requests incoming but the server is closing.
// TODO(ry): send a back a HTTP 503 Service Unavailable status.
} else {
fail(`unexpected BufState: ${bufStateErr}`);
}

acceptRoutine();
conn.close();
}

// Loop hack to allow yield (yield won't work in callbacks)
while (true) {
await env.serveDeferred.promise;
env.serveDeferred = deferred(); // use a new deferred
let queueToProcess = env.reqQueue;
env.reqQueue = [];
for (const result of queueToProcess) {
yield result;
// Continue read more from conn when user is done with the current req
// Moving this here makes it easier to manage
serveConn(env, result.conn, result.r);
}
// Accepts a new TCP connection and yields all HTTP requests that arrive on
// it. When a connection is accepted, it also creates a new iterator of the
// same kind and adds it to the request multiplexer so that another TCP
// connection can be accepted.
private async *acceptConnAndIterateHttpRequests(
mux: MuxAsyncIterator<ServerRequest>
): AsyncIterableIterator<ServerRequest> {
if (this.closing) return;
// Wait for a new connection.
const conn = await this.listener.accept();
// Try to accept another connection and add it to the multiplexer.
mux.add(this.acceptConnAndIterateHttpRequests(mux));
// Yield the requests that arrive on the just-accepted connection.
yield* this.iterateHttpRequests(conn);
}
listener.close();

[Symbol.asyncIterator](): AsyncIterableIterator<ServerRequest> {
const mux: MuxAsyncIterator<ServerRequest> = new MuxAsyncIterator();
mux.add(this.acceptConnAndIterateHttpRequests(mux));
return mux.iterate();
}
}

export function serve(addr: string): Server {
const listener = listen("tcp", addr);
return new Server(listener);
}

export async function listenAndServe(
Expand Down
32 changes: 3 additions & 29 deletions http/server_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,6 @@ const dec = new TextDecoder();

type Handler = () => void;

interface Deferred {
promise: Promise<{}>;
resolve: Handler;
reject: Handler;
}

function deferred(isResolved = false): Deferred {
let resolve: Handler = (): void => void 0;
let reject: Handler = (): void => void 0;
const promise = new Promise(
(res, rej): void => {
resolve = res;
reject = rej;
}
);
if (isResolved) {
resolve();
}
return {
promise,
resolve,
reject
};
}

const responseTests: ResponseTest[] = [
// Default response
{
Expand All @@ -72,8 +47,8 @@ test(async function responseWrite(): Promise<void> {
const buf = new Buffer();
const bufw = new BufWriter(buf);
const request = new ServerRequest();
request.pipelineId = 1;
request.w = bufw;

request.conn = {
localAddr: "",
remoteAddr: "",
Expand All @@ -86,13 +61,12 @@ test(async function responseWrite(): Promise<void> {
write: async (): Promise<number> => {
return -1;
},
close: (): void => {},
lastPipelineId: 0,
pendingDeferredMap: new Map([[0, deferred(true)], [1, deferred()]])
close: (): void => {}
};

await request.respond(testCase.response);
assertEquals(buf.toString(), testCase.raw);
await request.done;
}
});

Expand Down
1 change: 1 addition & 0 deletions test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import "./strings/test.ts";
import "./testing/test.ts";
import "./textproto/test.ts";
import "./toml/test.ts";
import "./util/test.ts";
import "./ws/test.ts";

import "./testing/main.ts";
Loading