Skip to content

Commit

Permalink
http/server: Clean up async code
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed May 16, 2019
1 parent cfc8108 commit a43e200
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 120 deletions.
5 changes: 2 additions & 3 deletions http/http_bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ import { serve } from "./server.ts";

const addr = Deno.args[1] || "127.0.0.1:4500";
const server = serve(addr);

const body = new TextEncoder().encode("Hello World");

async function main(): Promise<void> {
console.log(`http://${addr}/`);
for await (const request of server) {
request.respond({ status: 200, body });
for await (const req of server) {
req.respond({ body });
}
}

Expand Down
154 changes: 63 additions & 91 deletions http/server.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,21 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
const { listen, copy, toAsyncIterator } = Deno;
type Listener = Deno.Listener;
type Conn = Deno.Conn;
type Reader = Deno.Reader;
type Writer = Deno.Writer;
import { BufReader, BufState, BufWriter } from "../io/bufio.ts";
import { TextProtoReader } from "../textproto/mod.ts";
import { STATUS_TEXT } from "./http_status.ts";
import { assert } from "../testing/asserts.ts";

interface Deferred {
promise: Promise<{}>;
resolve: () => void;
reject: () => void;
}

function deferred(isResolved = false): Deferred {
let resolve, reject;
const promise = new Promise(
(res, rej): void => {
resolve = res;
reject = rej;
}
);
if (isResolved) {
resolve();
}
return {
promise,
resolve,
reject
};
}
import { AsyncQueue, deferred, Deferred } from "../util/async.ts";

interface HttpConn extends Conn {
// When read by a newly created request B, lastId is the id pointing to a previous
// request A, such that we must wait for responses to A to complete before
// writing B's response.
lastPipelineId: number;
pendingDeferredMap: Map<number, Deferred>;
pendingDeferredMap: Map<number, Deferred<void>>;
}

function createHttpConn(c: Conn): HttpConn {
Expand All @@ -46,8 +24,10 @@ function createHttpConn(c: Conn): HttpConn {
pendingDeferredMap: new Map()
});

const resolvedDeferred = deferred(true);
httpConn.pendingDeferredMap.set(0, resolvedDeferred);
const d = deferred();
d.resolve(); // The first request is ready immediately.
httpConn.pendingDeferredMap.set(0, d);

return httpConn;
}

Expand All @@ -58,6 +38,7 @@ function bufWriter(w: Writer): BufWriter {
return new BufWriter(w);
}
}

export function setContentLength(r: Response): void {
if (!r.headers) {
r.headers = new Headers();
Expand All @@ -74,6 +55,7 @@ export function setContentLength(r: Response): void {
}
}
}

async function writeChunkedBody(w: Writer, r: Reader): Promise<void> {
const writer = bufWriter(w);
const encoder = new TextEncoder();
Expand All @@ -90,6 +72,7 @@ async function writeChunkedBody(w: Writer, r: Reader): Promise<void> {
const endChunk = encoder.encode("0\r\n\r\n");
await writer.write(endChunk);
}

export async function writeResponse(w: Writer, r: Response): Promise<void> {
const protoMajor = 1;
const protoMinor = 1;
Expand Down Expand Up @@ -131,6 +114,7 @@ export async function writeResponse(w: Writer, r: Response): Promise<void> {
}
await writer.flush();
}

async function readAllIterator(
it: AsyncIterableIterator<Uint8Array>
): Promise<Uint8Array> {
Expand Down Expand Up @@ -250,7 +234,7 @@ export class ServerRequest {
lastPipelineId
);
assert(!!lastPipelineDeferred);
await lastPipelineDeferred.promise;
await lastPipelineDeferred;
// If yes, delete old deferred and proceed with writing.
this.conn.pendingDeferredMap.delete(lastPipelineId);
// Write our response!
Expand All @@ -264,11 +248,6 @@ export class ServerRequest {
}
}

interface ServeEnv {
reqQueue: ServerRequest[];
serveDeferred: Deferred;
}

/** Continuously read more requests from conn until EOF
* Calls maybeHandleReq.
* bufr is empty on a fresh TCP connection.
Expand All @@ -279,11 +258,8 @@ interface ServeEnv {
*/
async function readRequest(
c: HttpConn,
bufr?: BufReader
bufr: BufReader
): Promise<[ServerRequest, BufState]> {
if (!bufr) {
bufr = new BufReader(c);
}
const bufw = new BufWriter(c);
const req = new ServerRequest();

Expand Down Expand Up @@ -313,65 +289,61 @@ async function readRequest(
return [req, err];
}

function maybeHandleReq(
env: ServeEnv,
conn: Conn,
maybeReq: [ServerRequest, BufState]
): void {
const [req, _err] = maybeReq;
if (_err) {
conn.close(); // assume EOF for now...
return;
export class Server implements AsyncIterableIterator<ServerRequest> {
private closing = false;
private looping = false;
private queue: AsyncQueue<ServerRequest> = new AsyncQueue();

constructor(public listener: Listener) {}

async acceptLoop(): Promise<void> {
assert(!this.looping);
this.looping = true;
try {
while (!this.closing) {
const conn = await this.listener.accept();
this.serveConn(conn);
}
} finally {
this.looping = false;
}
}
env.reqQueue.push(req); // push req to queue
env.serveDeferred.resolve(); // signal while loop to process it
}

function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void {
readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
}
close(): void {
this.listener.close();
this.closing = true;
}

export async function* serve(
addr: string
): AsyncIterableIterator<ServerRequest> {
const listener = listen("tcp", addr);
const env: ServeEnv = {
reqQueue: [], // in case multiple promises are ready
serveDeferred: deferred()
};

// Routine that keeps calling accept
let handleConn = (_conn: Conn): void => {};
let scheduleAccept = (): void => {};
const acceptRoutine = (): void => {
scheduleAccept = (): void => {
listener.accept().then(handleConn);
};
handleConn = (conn: Conn): void => {
const httpConn = createHttpConn(conn);
serveConn(env, httpConn); // don't block
scheduleAccept(); // schedule next accept
};

scheduleAccept();
};

acceptRoutine();

// Loop hack to allow yield (yield won't work in callbacks)
while (true) {
await env.serveDeferred.promise;
env.serveDeferred = deferred(); // use a new deferred
let queueToProcess = env.reqQueue;
env.reqQueue = [];
for (const result of queueToProcess) {
yield result;
// Continue read more from conn when user is done with the current req
// Moving this here makes it easier to manage
serveConn(env, result.conn, result.r);
async serveConn(conn: Conn): Promise<void> {
const httpConn = createHttpConn(conn);
const bufr = new BufReader(httpConn);
while (true) {
const [req, err] = await readRequest(httpConn, bufr);
if (err) {
// TODO(ry) This should be more granular. Perhaps return back a 400 or
// 500 error?
httpConn.close();
break;
}
this.queue.push(req);
}
}
listener.close();

async next(): Promise<IteratorResult<ServerRequest>> {
const req = await this.queue.shift();
return { done: false, value: req };
}

[Symbol.asyncIterator](): AsyncIterableIterator<ServerRequest> {
return this;
}
}

export function serve(addr: string): Server {
const listener = listen("tcp", addr);
const server = new Server(listener);
server.acceptLoop();
return server;
}

export async function listenAndServe(
Expand Down
33 changes: 7 additions & 26 deletions http/server_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { assertEquals } from "../testing/asserts.ts";
import { Response, ServerRequest, writeResponse } from "./server.ts";
import { BufReader, BufWriter } from "../io/bufio.ts";
import { StringReader } from "../io/readers.ts";
import { deferred } from "../util/async.ts";

interface ResponseTest {
response: Response;
Expand All @@ -22,31 +23,6 @@ const dec = new TextDecoder();

type Handler = () => void;

interface Deferred {
promise: Promise<{}>;
resolve: Handler;
reject: Handler;
}

function deferred(isResolved = false): Deferred {
let resolve: Handler = (): void => void 0;
let reject: Handler = (): void => void 0;
const promise = new Promise(
(res, rej): void => {
resolve = res;
reject = rej;
}
);
if (isResolved) {
resolve();
}
return {
promise,
resolve,
reject
};
}

const responseTests: ResponseTest[] = [
// Default response
{
Expand Down Expand Up @@ -74,6 +50,11 @@ test(async function responseWrite(): Promise<void> {
const request = new ServerRequest();
request.pipelineId = 1;
request.w = bufw;

const d0 = deferred<void>();
d0.resolve();
const d1 = deferred<void>();

request.conn = {
localAddr: "",
remoteAddr: "",
Expand All @@ -88,7 +69,7 @@ test(async function responseWrite(): Promise<void> {
},
close: (): void => {},
lastPipelineId: 0,
pendingDeferredMap: new Map([[0, deferred(true)], [1, deferred()]])
pendingDeferredMap: new Map([[0, d0], [1, d1]])
};

await request.respond(testCase.response);
Expand Down

0 comments on commit a43e200

Please sign in to comment.