Skip to content

Commit

Permalink
perf: improve async op santizer speed and accuracy (denoland#20501)
Browse files Browse the repository at this point in the history
This commit improves async op sanitizer speed by only delaying metrics
collection if there are pending ops. This
results in a speedup of around 30% for small CPU bound unit tests.

It performs this check and possible delay on every collection now,
fixing an issue with parent test leaks into steps.
  • Loading branch information
lucacasonato authored Sep 16, 2023
1 parent bf07604 commit 430b63c
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 37 deletions.
81 changes: 63 additions & 18 deletions cli/js/40_testing.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,39 @@ let hasSetOpSanitizerDelayMacrotask = false;
// that resolves when it's (probably) fine to run the op sanitizer.
//
// This is implemented by adding a macrotask callback that runs after the
// timer macrotasks, so we can guarantee that a currently running interval
// will have an associated op. An additional `setTimeout` of 0 is needed
// before that, though, in order to give time for worker message ops to finish
// (since timeouts of 0 don't queue tasks in the timer queue immediately).
function opSanitizerDelay() {
// all ready async ops resolve, and the timer macrotask. Using just a macrotask
// callback without delaying is sufficient, because when the macrotask callback
// runs after async op dispatch, we know that all async ops that can currently
// return `Poll::Ready` have done so, and have been dispatched to JS.
//
// Worker ops are an exception to this, because there is no way for the user to
// await shutdown of the worker from the thread calling `worker.terminate()`.
// Because of this, we give extra leeway for worker ops to complete, by waiting
// for a whole millisecond if there are pending worker ops.
function opSanitizerDelay(hasPendingWorkerOps) {
if (!hasSetOpSanitizerDelayMacrotask) {
core.setMacrotaskCallback(handleOpSanitizerDelayMacrotask);
hasSetOpSanitizerDelayMacrotask = true;
}
return new Promise((resolve) => {
const p = new Promise((resolve) => {
// Schedule an async op to complete immediately to ensure the macrotask is
// run. We rely on the fact that enqueueing the resolver callback during the
// timeout callback will mean that the resolver gets called in the same
// event loop tick as the timeout callback.
setTimeout(() => {
ArrayPrototypePush(opSanitizerDelayResolveQueue, resolve);
}, 1);
}, hasPendingWorkerOps ? 1 : 0);
});
return p;
}

function handleOpSanitizerDelayMacrotask() {
ArrayPrototypeShift(opSanitizerDelayResolveQueue)?.();
return opSanitizerDelayResolveQueue.length === 0;
const resolve = ArrayPrototypeShift(opSanitizerDelayResolveQueue);
if (resolve) {
resolve();
return opSanitizerDelayResolveQueue.length === 0;
}
return undefined; // we performed no work, so can skip microtasks checkpoint
}

// An async operation to $0 was started in this test, but never completed. This is often caused by not $1.
Expand Down Expand Up @@ -126,7 +140,8 @@ const OP_DETAILS = {
"op_tls_start": ["start a TLS connection", "awaiting a `Deno.startTls` call"],
"op_truncate_async": ["truncate a file", "awaiting the result of a `Deno.truncate` call"],
"op_utime_async": ["change file timestamps", "awaiting the result of a `Deno.utime` call"],
"op_worker_recv_message": ["receive a message from a web worker", "terminating a `Worker`"],
"op_host_recv_message": ["receive a message from a web worker", "terminating a `Worker`"],
"op_host_recv_ctrl": ["receive a message from a web worker", "terminating a `Worker`"],
"op_ws_close": ["close a WebSocket", "awaiting until the `close` event is emitted on a `WebSocket`, or the `WebSocketStream#closed` promise resolves"],
"op_ws_create": ["create a WebSocket", "awaiting until the `open` event is emitted on a `WebSocket`, or the result of a `WebSocketStream#connection` promise"],
"op_ws_next_event": ["receive the next message on a WebSocket", "closing a `WebSocket` or `WebSocketStream`"],
Expand All @@ -136,6 +151,28 @@ const OP_DETAILS = {
"op_ws_send_pong": ["send a message on a WebSocket", "closing a `WebSocket` or `WebSocketStream`"],
};

function collectReliableOpMetrics() {
let metrics = core.metrics();
if (metrics.opsDispatched > metrics.opsCompleted) {
// If there are still async ops pending, we drain the event loop to the
// point where all ops that can return `Poll::Ready` have done so, to ensure
// that any ops are ready because of user cleanup code are completed.
const hasPendingWorkerOps = metrics.ops.op_host_recv_message && (
metrics.ops.op_host_recv_message.opsDispatched >
metrics.ops.op_host_recv_message.opsCompleted ||
metrics.ops.op_host_recv_ctrl.opsDispatched >
metrics.ops.op_host_recv_ctrl.opsCompleted
);
return opSanitizerDelay(hasPendingWorkerOps).then(() => {
metrics = core.metrics();
const traces = new Map(core.opCallTraces);
return { metrics, traces };
});
}
const traces = new Map(core.opCallTraces);
return { metrics, traces };
}

// Wrap test function in additional assertion that makes sure
// the test case does not leak async "ops" - ie. number of async
// completed ops after the test is the same as number of dispatched
Expand All @@ -144,19 +181,26 @@ const OP_DETAILS = {
function assertOps(fn) {
/** @param desc {TestDescription | TestStepDescription} */
return async function asyncOpSanitizer(desc) {
const pre = core.metrics();
const preTraces = new Map(core.opCallTraces);
let metrics = collectReliableOpMetrics();
if (metrics.then) {
// We're delaying so await to get the result asynchronously.
metrics = await metrics;
}
const { metrics: pre, traces: preTraces } = metrics;
let post;
let postTraces;

try {
const innerResult = await fn(desc);
if (innerResult) return innerResult;
} finally {
// Defer until next event loop turn - that way timeouts and intervals
// cleared can actually be removed from resource table, otherwise
// false positives may occur (https://github.com/denoland/deno/issues/4591)
await opSanitizerDelay();
let metrics = collectReliableOpMetrics();
if (metrics.then) {
// We're delaying so await to get the result asynchronously.
metrics = await metrics;
}
({ metrics: post, traces: postTraces } = metrics);
}
const post = core.metrics();
const postTraces = new Map(core.opCallTraces);

// We're checking diff because one might spawn HTTP server in the background
// that will be a pending async op before test starts.
Expand Down Expand Up @@ -232,6 +276,7 @@ function assertOps(fn) {
ArrayPrototypePush(details, message);
}
}

return { failed: { leakedOps: [details, core.isOpCallTracingEnabled()] } };
};
}
Expand Down
10 changes: 10 additions & 0 deletions cli/tests/testdata/test/ops_sanitizer_step_leak.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Check [WILDCARD]/cli/tests/testdata/test/ops_sanitizer_step_leak.ts
running 1 test from ./cli/tests/testdata/test/ops_sanitizer_step_leak.ts
timeout ...
step ... ok [WILDCARD]
------- output -------
done
----- output end -----
timeout ... ok [WILDCARD]

ok | 1 passed (1 step) | 0 failed [WILDCARD]
10 changes: 10 additions & 0 deletions cli/tests/testdata/test/ops_sanitizer_step_leak.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Deno.test("timeout", async (t) => {
const timer = setTimeout(() => {
console.log("timeout");
}, 10000);
clearTimeout(timer);
await t.step("step", async () => {
await new Promise<void>((resolve) => setTimeout(() => resolve(), 10));
});
console.log("done");
});
23 changes: 17 additions & 6 deletions cli/tests/unit/fetch_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1752,14 +1752,16 @@ Deno.test(
// if transfer-encoding is sent, content-length is ignored
// even if it has an invalid value (content-length > totalLength)
const listener = invalidServer(addr, body);
const response = await fetch(`http://${addr}/`);
const client = Deno.createHttpClient({});
const response = await fetch(`http://${addr}/`, { client });

const res = await response.arrayBuffer();
const buf = new TextEncoder().encode(data);
assertEquals(res.byteLength, buf.byteLength);
assertEquals(new Uint8Array(res), buf);

listener.close();
client.close();
},
);

Expand All @@ -1781,21 +1783,23 @@ Deno.test(

// It should fail if multiple content-length headers with different values are sent
const listener = invalidServer(addr, body);
const client = Deno.createHttpClient({});
await assertRejects(
async () => {
await fetch(`http://${addr}/`);
await fetch(`http://${addr}/`, { client });
},
TypeError,
"invalid content-length parsed",
);

listener.close();
client.close();
},
);

Deno.test(
{ permissions: { net: true } },
async function fetchWithInvalidContentLength(): Promise<
async function fetchWithInvalidContentLength2(): Promise<
void
> {
const addr = `127.0.0.1:${listenPort}`;
Expand All @@ -1807,7 +1811,8 @@ Deno.test(
);

const listener = invalidServer(addr, body);
const response = await fetch(`http://${addr}/`);
const client = Deno.createHttpClient({});
const response = await fetch(`http://${addr}/`, { client });

// If content-length < totalLength, a maximum of content-length bytes
// should be returned.
Expand All @@ -1817,12 +1822,13 @@ Deno.test(
assertEquals(new Uint8Array(res), buf.subarray(contentLength));

listener.close();
client.close();
},
);

Deno.test(
{ permissions: { net: true } },
async function fetchWithInvalidContentLength(): Promise<
async function fetchWithInvalidContentLength3(): Promise<
void
> {
const addr = `127.0.0.1:${listenPort}`;
Expand All @@ -1834,7 +1840,8 @@ Deno.test(
);

const listener = invalidServer(addr, body);
const response = await fetch(`http://${addr}/`);
const client = Deno.createHttpClient({});
const response = await fetch(`http://${addr}/`, { client });
// If content-length > totalLength, a maximum of content-length bytes
// should be returned.
await assertRejects(
Expand All @@ -1846,6 +1853,7 @@ Deno.test(
);

listener.close();
client.close();
},
);

Expand Down Expand Up @@ -1935,10 +1943,12 @@ Deno.test(
},
});

const client = Deno.createHttpClient({});
const err = await assertRejects(() =>
fetch(`http://localhost:${listenPort}/`, {
body: stream,
method: "POST",
client,
})
);

Expand All @@ -1948,6 +1958,7 @@ Deno.test(
assertEquals(err.cause.message, "foo");

await server;
client.close();
},
);

Expand Down
22 changes: 20 additions & 2 deletions cli/tests/unit/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,14 @@ Deno.test(
await respondWith(new Response(stream.readable));
})();

const resp = await fetch(`http://127.0.0.1:${listenPort}/`);
const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${listenPort}/`, { client });
const respBody = await resp.text();
assertEquals("hello world", respBody);
await promise;
httpConn!.close();
listener.close();
client.close();
},
);

Expand All @@ -216,8 +218,8 @@ Deno.test(
writer.write(new TextEncoder().encode("world"));
writer.close();

const listener = Deno.listen({ port: listenPort });
const promise = (async () => {
const listener = Deno.listen({ port: listenPort });
const conn = await listener.accept();
const httpConn = Deno.serveHttp(conn);
const evt = await httpConn.nextRequest();
Expand All @@ -235,14 +237,17 @@ Deno.test(
listener.close();
})();

const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${listenPort}/`, {
body: stream.readable,
method: "POST",
headers: { "connection": "close" },
client,
});

await resp.arrayBuffer();
await promise;
client.close();
},
);

Expand Down Expand Up @@ -375,16 +380,19 @@ Deno.test(
await respondWith(new Response("response"));
})();

const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${listenPort}/`, {
method: "POST",
body: "request",
client,
});
const respBody = await resp.text();
assertEquals("response", respBody);
await promise;

httpConn!.close();
listener.close();
client.close();
},
);

Expand Down Expand Up @@ -427,9 +435,11 @@ Deno.test(
listener.close();
})();

const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${listenPort}/`);
await resp.body!.cancel();
await promise;
client.close();
},
);

Expand Down Expand Up @@ -788,7 +798,11 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() {
socket.send(m.data);
socket.close(1001);
};
const close = new Promise<void>((resolve) => {
socket.onclose = () => resolve();
});
await respondWith(response);
await close;
})();

const def = deferred();
Expand Down Expand Up @@ -1228,11 +1242,15 @@ Deno.test(
async function client() {
const socket = new WebSocket(`ws://${hostname}:${port}/`);
socket.onopen = () => socket.send("bla bla");
const closed = new Promise<void>((resolve) => {
socket.onclose = () => resolve();
});
const { data } = await new Promise<MessageEvent<string>>((res) =>
socket.onmessage = res
);
assertStrictEquals(data, "bla bla");
socket.close();
await closed;
}

await Promise.all([server(), client()]);
Expand Down
5 changes: 3 additions & 2 deletions cli/tests/unit/net_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ Deno.test(
);

Deno.test({ permissions: { net: true } }, async function whatwgStreams() {
(async () => {
const server = (async () => {
const listener = Deno.listen({ hostname: "127.0.0.1", port: listenPort });
const conn = await listener.accept();
await conn.readable.pipeTo(conn.writable);
Expand All @@ -920,6 +920,7 @@ Deno.test({ permissions: { net: true } }, async function whatwgStreams() {
assert(!done);
assertEquals(decoder.decode(value), "Hello World");
await reader.cancel();
await server;
});

Deno.test(
Expand Down Expand Up @@ -973,7 +974,7 @@ Deno.test(

Deno.test(
{ permissions: { read: true, run: true } },
async function netListenUnref() {
async function netListenUnref2() {
const [statusCode, _output] = await execCode(`
async function main() {
const listener = Deno.listen({ port: ${listenPort} });
Expand Down
Loading

0 comments on commit 430b63c

Please sign in to comment.