From 554f941713e97128f96c4eb24d64a2c9b06565ab Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 16 May 2019 13:24:32 -0400 Subject: [PATCH] http/server: Clean up async code --- http/http_bench.ts | 5 +- http/server.ts | 155 ++++++++++++++++++-------------------------- http/server_test.ts | 33 ++-------- 3 files changed, 73 insertions(+), 120 deletions(-) diff --git a/http/http_bench.ts b/http/http_bench.ts index 6d72d4be6ab98..06043f9e4e82c 100644 --- a/http/http_bench.ts +++ b/http/http_bench.ts @@ -3,13 +3,12 @@ import { serve } from "./server.ts"; const addr = Deno.args[1] || "127.0.0.1:4500"; const server = serve(addr); - const body = new TextEncoder().encode("Hello World"); async function main(): Promise { console.log(`http://${addr}/`); - for await (const request of server) { - request.respond({ status: 200, body }); + for await (const req of server) { + req.respond({ body }); } } diff --git a/http/server.ts b/http/server.ts index 484ecf808d1ed..30e3146ce0c81 100644 --- a/http/server.ts +++ b/http/server.ts @@ -1,5 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. const { listen, copy, toAsyncIterator } = Deno; +type Listener = Deno.Listener; type Conn = Deno.Conn; type Reader = Deno.Reader; type Writer = Deno.Writer; @@ -7,37 +8,14 @@ import { BufReader, BufState, BufWriter } from "../io/bufio.ts"; import { TextProtoReader } from "../textproto/mod.ts"; import { STATUS_TEXT } from "./http_status.ts"; import { assert } from "../testing/asserts.ts"; - -interface Deferred { - promise: Promise<{}>; - resolve: () => void; - reject: () => void; -} - -function deferred(isResolved = false): Deferred { - let resolve, reject; - const promise = new Promise( - (res, rej): void => { - resolve = res; - reject = rej; - } - ); - if (isResolved) { - resolve(); - } - return { - promise, - resolve, - reject - }; -} +import { Latch, deferred, Deferred } from "../util/async.ts"; interface HttpConn extends Conn { // When read by a newly created request B, lastId is the id pointing to a previous // request A, such that we must wait for responses to A to complete before // writing B's response. lastPipelineId: number; - pendingDeferredMap: Map; + pendingDeferredMap: Map>; } function createHttpConn(c: Conn): HttpConn { @@ -46,8 +24,10 @@ function createHttpConn(c: Conn): HttpConn { pendingDeferredMap: new Map() }); - const resolvedDeferred = deferred(true); - httpConn.pendingDeferredMap.set(0, resolvedDeferred); + const d = deferred(); + d.resolve(); // The first request is ready immediately. + httpConn.pendingDeferredMap.set(0, d); + return httpConn; } @@ -58,6 +38,7 @@ function bufWriter(w: Writer): BufWriter { return new BufWriter(w); } } + export function setContentLength(r: Response): void { if (!r.headers) { r.headers = new Headers(); @@ -74,6 +55,7 @@ export function setContentLength(r: Response): void { } } } + async function writeChunkedBody(w: Writer, r: Reader): Promise { const writer = bufWriter(w); const encoder = new TextEncoder(); @@ -90,6 +72,7 @@ async function writeChunkedBody(w: Writer, r: Reader): Promise { const endChunk = encoder.encode("0\r\n\r\n"); await writer.write(endChunk); } + export async function writeResponse(w: Writer, r: Response): Promise { const protoMajor = 1; const protoMinor = 1; @@ -131,6 +114,7 @@ export async function writeResponse(w: Writer, r: Response): Promise { } await writer.flush(); } + async function readAllIterator( it: AsyncIterableIterator ): Promise { @@ -250,7 +234,7 @@ export class ServerRequest { lastPipelineId ); assert(!!lastPipelineDeferred); - await lastPipelineDeferred.promise; + await lastPipelineDeferred; // If yes, delete old deferred and proceed with writing. this.conn.pendingDeferredMap.delete(lastPipelineId); // Write our response! @@ -264,11 +248,6 @@ export class ServerRequest { } } -interface ServeEnv { - reqQueue: ServerRequest[]; - serveDeferred: Deferred; -} - /** Continuously read more requests from conn until EOF * Calls maybeHandleReq. * bufr is empty on a fresh TCP connection. @@ -279,11 +258,8 @@ interface ServeEnv { */ async function readRequest( c: HttpConn, - bufr?: BufReader + bufr: BufReader ): Promise<[ServerRequest, BufState]> { - if (!bufr) { - bufr = new BufReader(c); - } const bufw = new BufWriter(c); const req = new ServerRequest(); @@ -313,65 +289,62 @@ async function readRequest( return [req, err]; } -function maybeHandleReq( - env: ServeEnv, - conn: Conn, - maybeReq: [ServerRequest, BufState] -): void { - const [req, _err] = maybeReq; - if (_err) { - conn.close(); // assume EOF for now... - return; +export class Server implements AsyncIterableIterator { + private closing = false; + private looping = false; + private latch = new Latch(); + + constructor(public listener: Listener) {} + + async acceptLoop(): Promise { + assert(!this.looping); + this.looping = true; + try { + while (!this.closing) { + const conn = await this.listener.accept(); + this.serveConn(conn); // async! + } + } finally { + this.looping = false; + } } - env.reqQueue.push(req); // push req to queue - env.serveDeferred.resolve(); // signal while loop to process it -} -function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void { - readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn)); -} + close(): void { + this.closing = true; + this.listener.close(); + } -export async function* serve( - addr: string -): AsyncIterableIterator { - const listener = listen("tcp", addr); - const env: ServeEnv = { - reqQueue: [], // in case multiple promises are ready - serveDeferred: deferred() - }; - - // Routine that keeps calling accept - let handleConn = (_conn: Conn): void => {}; - let scheduleAccept = (): void => {}; - const acceptRoutine = (): void => { - scheduleAccept = (): void => { - listener.accept().then(handleConn); - }; - handleConn = (conn: Conn): void => { - const httpConn = createHttpConn(conn); - serveConn(env, httpConn); // don't block - scheduleAccept(); // schedule next accept - }; - - scheduleAccept(); - }; - - acceptRoutine(); - - // Loop hack to allow yield (yield won't work in callbacks) - while (true) { - await env.serveDeferred.promise; - env.serveDeferred = deferred(); // use a new deferred - let queueToProcess = env.reqQueue; - env.reqQueue = []; - for (const result of queueToProcess) { - yield result; - // Continue read more from conn when user is done with the current req - // Moving this here makes it easier to manage - serveConn(env, result.conn, result.r); + async serveConn(conn: Conn): Promise { + const httpConn = createHttpConn(conn); + const bufr = new BufReader(httpConn); + while (true) { + const [req, err] = await readRequest(httpConn, bufr); + if (err) { + // TODO(ry) This should be more granular. Perhaps return back a 400 or + // 500 error? + httpConn.close(); + break; + } + + await this.latch.send(req); } } - listener.close(); + + async next(): Promise> { + const req = await this.latch.recv(); + return { done: false, value: req }; + } + + [Symbol.asyncIterator](): AsyncIterableIterator { + return this; + } +} + +export function serve(addr: string): Server { + const listener = listen("tcp", addr); + const server = new Server(listener); + server.acceptLoop(); + return server; } export async function listenAndServe( diff --git a/http/server_test.ts b/http/server_test.ts index 82a368395e0d1..904a667c7f9da 100644 --- a/http/server_test.ts +++ b/http/server_test.ts @@ -11,6 +11,7 @@ import { assertEquals } from "../testing/asserts.ts"; import { Response, ServerRequest, writeResponse } from "./server.ts"; import { BufReader, BufWriter } from "../io/bufio.ts"; import { StringReader } from "../io/readers.ts"; +import { deferred } from "../util/async.ts"; interface ResponseTest { response: Response; @@ -22,31 +23,6 @@ const dec = new TextDecoder(); type Handler = () => void; -interface Deferred { - promise: Promise<{}>; - resolve: Handler; - reject: Handler; -} - -function deferred(isResolved = false): Deferred { - let resolve: Handler = (): void => void 0; - let reject: Handler = (): void => void 0; - const promise = new Promise( - (res, rej): void => { - resolve = res; - reject = rej; - } - ); - if (isResolved) { - resolve(); - } - return { - promise, - resolve, - reject - }; -} - const responseTests: ResponseTest[] = [ // Default response { @@ -74,6 +50,11 @@ test(async function responseWrite(): Promise { const request = new ServerRequest(); request.pipelineId = 1; request.w = bufw; + + const d0 = deferred(); + d0.resolve(); + const d1 = deferred(); + request.conn = { localAddr: "", remoteAddr: "", @@ -88,7 +69,7 @@ test(async function responseWrite(): Promise { }, close: (): void => {}, lastPipelineId: 0, - pendingDeferredMap: new Map([[0, deferred(true)], [1, deferred()]]) + pendingDeferredMap: new Map([[0, d0], [1, d1]]) }; await request.respond(testCase.response);