Skip to content

Commit

Permalink
Wait for req body to be read and simplify further
Browse files Browse the repository at this point in the history
  • Loading branch information
piscisaureus committed May 18, 2019
1 parent 635e6b6 commit eab25c5
Showing 1 changed file with 41 additions and 90 deletions.
131 changes: 41 additions & 90 deletions http/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,6 @@ import { STATUS_TEXT } from "./http_status.ts";
import { assert, fail } from "../testing/asserts.ts";
import { deferred, Deferred, MuxAsyncIterator } from "../util/async.ts";

// TODO(ry) This should be a class, not an interface.
interface HttpConn extends Conn {
// When read by a newly created request B, lastId is the id pointing to a previous
// request A, such that we must wait for responses to A to complete before
// writing B's response.
lastPipelineId: number;
pendingDeferredMap: Map<number, Deferred<void>>;
}

function createHttpConn(c: Conn): HttpConn {
const httpConn = Object.assign(c, {
lastPipelineId: 0,
pendingDeferredMap: new Map()
});

const d = deferred();
d.resolve(); // The first request is ready immediately.
httpConn.pendingDeferredMap.set(0, d);

return httpConn;
}

function bufWriter(w: Writer): BufWriter {
if (w instanceof BufWriter) {
return w;
Expand Down Expand Up @@ -139,14 +117,14 @@ async function readAllIterator(
}

export class ServerRequest {
pipelineId: number;
url: string;
method: string;
proto: string;
headers: Headers;
conn: HttpConn;
conn: Conn;
r: BufReader;
w: BufWriter;
done: Deferred<void> = deferred();

public async *bodyStream(): AsyncIterableIterator<Uint8Array> {
if (this.headers.has("content-length")) {
Expand Down Expand Up @@ -229,47 +207,22 @@ export class ServerRequest {
}

async respond(r: Response): Promise<void> {
// Check and wait if the previous request is done responding.
const lastPipelineId = this.pipelineId - 1;
const lastPipelineDeferred = this.conn.pendingDeferredMap.get(
lastPipelineId
);
assert(!!lastPipelineDeferred);
await lastPipelineDeferred;
// If yes, delete old deferred and proceed with writing.
this.conn.pendingDeferredMap.delete(lastPipelineId);
// Write our response!
await writeResponse(this.w, r);
// Signal the next pending request that it can start writing.
const currPipelineDeferred = this.conn.pendingDeferredMap.get(
this.pipelineId
);
assert(!!currPipelineDeferred);
currPipelineDeferred.resolve();
// Signal that this request has been processed and the next pipelined
// request on the same connection can be accepted.
this.done.resolve();
}
}

async function readRequest(
httpConn: HttpConn,
conn: Conn,
bufr: BufReader
): Promise<[ServerRequest, BufState]> {
const req = new ServerRequest();

// Set and incr pipeline id;
req.pipelineId = ++httpConn.lastPipelineId;
// Set a new pipeline deferred associated with this request
// for future requests to wait for.
httpConn.pendingDeferredMap.set(req.pipelineId, deferred());

// TODO(ry) Let's say the request has a body which is processed by the consumer
// of this iterator. We'd want to wait for this processing to be complete
// before reading a new set of headers. Therefore we might need an await
// statement after this yield, e.g. await req.done where done is a Promise
// that's resolved when the request has been processed.

req.conn = httpConn;
req.conn = conn;
req.r = bufr;
req.w = new BufWriter(httpConn);
req.w = new BufWriter(conn);
const tp = new TextProtoReader(bufr);
let err: BufState;
// First line: GET /index.html HTTP/1.0
Expand All @@ -283,37 +236,6 @@ async function readRequest(
return [req, err];
}

/** Continuously read more requests from conn until EOF
* bufr is empty on a fresh TCP connection.
* Would be passed around and reused for later request on same conn
* TODO: make them async function after this change is done
* https://github.com/tc39/ecma262/pull/1250
* See https://v8.dev/blog/fast-async
*/
async function* iterateHttpRequests(
conn: Conn
): AsyncIterableIterator<ServerRequest> {
const httpConn = createHttpConn(conn);
const bufr = new BufReader(httpConn);
let bufStateErr: BufState;
let req: ServerRequest;
for (;;) {
[req, bufStateErr] = await readRequest(httpConn, bufr);
if (bufStateErr) break;
yield req;
}

if (bufStateErr === "EOF") {
// The connection was gracefully closed.
} else if (bufStateErr instanceof Error) {
// TODO(ry): send something back like a HTTP 500 status.
} else {
fail(`unexpected BufState: ${bufStateErr}`);
}

httpConn.close();
}

export class Server implements AsyncIterable<ServerRequest> {
private closing = false;

Expand All @@ -324,21 +246,50 @@ export class Server implements AsyncIterable<ServerRequest> {
this.listener.close();
}

async *iterateRequestsOnNewConnection(
/** Yield all HTTP requests on a single TCP connection. */
async *iterateHttpRequests(conn: Conn): AsyncIterableIterator<ServerRequest> {
const bufr = new BufReader(conn);
let bufStateErr: BufState;
let req: ServerRequest;

while (!this.closing) {
[req, bufStateErr] = await readRequest(conn, bufr);
if (bufStateErr) break;
yield req;
// Wait for the request to be processed before we accept a new request on
// this connection.
await req.done;
}

if (bufStateErr === "EOF") {
// The connection was gracefully closed.
} else if (bufStateErr instanceof Error) {
// TODO(ry): send something back like a HTTP 500 status.
} else if (this.closing) {
// There are more requests incoming but the server is closing.
// TODO(ry): send a back a HTTP 503 Service Unavailable status.
} else {
fail(`unexpected BufState: ${bufStateErr}`);
}

conn.close();
}

async *acceptConnAndIterateHttpRequests(
mux: MuxAsyncIterator<ServerRequest>
): AsyncIterableIterator<ServerRequest> {
if (this.closing) return;
// Wait for a new connection.
const conn = await this.listener.accept();
// Try to accept another connection and add it to the multiplexer.
mux.add(this.iterateRequestsOnNewConnection(mux));
mux.add(this.acceptConnAndIterateHttpRequests(mux));
// Yield the requests that arrive on the just-accepted connection.
yield* iterateHttpRequests(conn);
yield* this.iterateHttpRequests(conn);
}

[Symbol.asyncIterator](): AsyncIterableIterator<ServerRequest> {
const mux: MuxAsyncIterator<ServerRequest> = new MuxAsyncIterator();
mux.add(this.iterateRequestsOnNewConnection(mux));
mux.add(this.acceptConnAndIterateHttpRequests(mux));
return mux;
}
}
Expand Down

0 comments on commit eab25c5

Please sign in to comment.