Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: improve async op santizer speed and accuracy #20501

Merged
merged 9 commits into from
Sep 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 63 additions & 18 deletions cli/js/40_testing.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,39 @@ let hasSetOpSanitizerDelayMacrotask = false;
// that resolves when it's (probably) fine to run the op sanitizer.
//
// This is implemented by adding a macrotask callback that runs after the
// timer macrotasks, so we can guarantee that a currently running interval
// will have an associated op. An additional `setTimeout` of 0 is needed
// before that, though, in order to give time for worker message ops to finish
// (since timeouts of 0 don't queue tasks in the timer queue immediately).
function opSanitizerDelay() {
// all ready async ops resolve, and the timer macrotask. Using just a macrotask
// callback without delaying is sufficient, because when the macrotask callback
// runs after async op dispatch, we know that all async ops that can currently
// return `Poll::Ready` have done so, and have been dispatched to JS.
//
// Worker ops are an exception to this, because there is no way for the user to
// await shutdown of the worker from the thread calling `worker.terminate()`.
// Because of this, we give extra leeway for worker ops to complete, by waiting
// for a whole millisecond if there are pending worker ops.
function opSanitizerDelay(hasPendingWorkerOps) {
if (!hasSetOpSanitizerDelayMacrotask) {
core.setMacrotaskCallback(handleOpSanitizerDelayMacrotask);
hasSetOpSanitizerDelayMacrotask = true;
}
return new Promise((resolve) => {
const p = new Promise((resolve) => {
// Schedule an async op to complete immediately to ensure the macrotask is
// run. We rely on the fact that enqueueing the resolver callback during the
// timeout callback will mean that the resolver gets called in the same
// event loop tick as the timeout callback.
setTimeout(() => {
ArrayPrototypePush(opSanitizerDelayResolveQueue, resolve);
}, 1);
}, hasPendingWorkerOps ? 1 : 0);
});
return p;
}

function handleOpSanitizerDelayMacrotask() {
ArrayPrototypeShift(opSanitizerDelayResolveQueue)?.();
return opSanitizerDelayResolveQueue.length === 0;
const resolve = ArrayPrototypeShift(opSanitizerDelayResolveQueue);
if (resolve) {
resolve();
return opSanitizerDelayResolveQueue.length === 0;
}
return undefined; // we performed no work, so can skip microtasks checkpoint
}

// An async operation to $0 was started in this test, but never completed. This is often caused by not $1.
Expand Down Expand Up @@ -126,7 +140,8 @@ const OP_DETAILS = {
"op_tls_start": ["start a TLS connection", "awaiting a `Deno.startTls` call"],
"op_truncate_async": ["truncate a file", "awaiting the result of a `Deno.truncate` call"],
"op_utime_async": ["change file timestamps", "awaiting the result of a `Deno.utime` call"],
"op_worker_recv_message": ["receive a message from a web worker", "terminating a `Worker`"],
"op_host_recv_message": ["receive a message from a web worker", "terminating a `Worker`"],
"op_host_recv_ctrl": ["receive a message from a web worker", "terminating a `Worker`"],
"op_ws_close": ["close a WebSocket", "awaiting until the `close` event is emitted on a `WebSocket`, or the `WebSocketStream#closed` promise resolves"],
"op_ws_create": ["create a WebSocket", "awaiting until the `open` event is emitted on a `WebSocket`, or the result of a `WebSocketStream#connection` promise"],
"op_ws_next_event": ["receive the next message on a WebSocket", "closing a `WebSocket` or `WebSocketStream`"],
Expand All @@ -136,6 +151,28 @@ const OP_DETAILS = {
"op_ws_send_pong": ["send a message on a WebSocket", "closing a `WebSocket` or `WebSocketStream`"],
};

function collectReliableOpMetrics() {
let metrics = core.metrics();
if (metrics.opsDispatched > metrics.opsCompleted) {
// If there are still async ops pending, we drain the event loop to the
// point where all ops that can return `Poll::Ready` have done so, to ensure
// that any ops are ready because of user cleanup code are completed.
const hasPendingWorkerOps = metrics.ops.op_host_recv_message && (
metrics.ops.op_host_recv_message.opsDispatched >
metrics.ops.op_host_recv_message.opsCompleted ||
metrics.ops.op_host_recv_ctrl.opsDispatched >
metrics.ops.op_host_recv_ctrl.opsCompleted
);
return opSanitizerDelay(hasPendingWorkerOps).then(() => {
metrics = core.metrics();
const traces = new Map(core.opCallTraces);
return { metrics, traces };
});
}
const traces = new Map(core.opCallTraces);
return { metrics, traces };
}

// Wrap test function in additional assertion that makes sure
// the test case does not leak async "ops" - ie. number of async
// completed ops after the test is the same as number of dispatched
Expand All @@ -144,19 +181,26 @@ const OP_DETAILS = {
function assertOps(fn) {
/** @param desc {TestDescription | TestStepDescription} */
return async function asyncOpSanitizer(desc) {
const pre = core.metrics();
const preTraces = new Map(core.opCallTraces);
let metrics = collectReliableOpMetrics();
if (metrics.then) {
// We're delaying so await to get the result asynchronously.
metrics = await metrics;
}
const { metrics: pre, traces: preTraces } = metrics;
let post;
let postTraces;

try {
const innerResult = await fn(desc);
if (innerResult) return innerResult;
} finally {
// Defer until next event loop turn - that way timeouts and intervals
// cleared can actually be removed from resource table, otherwise
// false positives may occur (https://github.com/denoland/deno/issues/4591)
await opSanitizerDelay();
let metrics = collectReliableOpMetrics();
if (metrics.then) {
// We're delaying so await to get the result asynchronously.
metrics = await metrics;
}
({ metrics: post, traces: postTraces } = metrics);
}
const post = core.metrics();
const postTraces = new Map(core.opCallTraces);

// We're checking diff because one might spawn HTTP server in the background
// that will be a pending async op before test starts.
Expand Down Expand Up @@ -232,6 +276,7 @@ function assertOps(fn) {
ArrayPrototypePush(details, message);
}
}

return { failed: { leakedOps: [details, core.isOpCallTracingEnabled()] } };
};
}
Expand Down
10 changes: 10 additions & 0 deletions cli/tests/testdata/test/ops_sanitizer_step_leak.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Check [WILDCARD]/cli/tests/testdata/test/ops_sanitizer_step_leak.ts
running 1 test from ./cli/tests/testdata/test/ops_sanitizer_step_leak.ts
timeout ...
step ... ok [WILDCARD]
------- output -------
done
----- output end -----
timeout ... ok [WILDCARD]

ok | 1 passed (1 step) | 0 failed [WILDCARD]
10 changes: 10 additions & 0 deletions cli/tests/testdata/test/ops_sanitizer_step_leak.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Deno.test("timeout", async (t) => {
const timer = setTimeout(() => {
console.log("timeout");
}, 10000);
clearTimeout(timer);
await t.step("step", async () => {
await new Promise<void>((resolve) => setTimeout(() => resolve(), 10));
});
console.log("done");
});
23 changes: 17 additions & 6 deletions cli/tests/unit/fetch_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1752,14 +1752,16 @@ Deno.test(
// if transfer-encoding is sent, content-length is ignored
// even if it has an invalid value (content-length > totalLength)
const listener = invalidServer(addr, body);
const response = await fetch(`http://${addr}/`);
const client = Deno.createHttpClient({});
const response = await fetch(`http://${addr}/`, { client });

const res = await response.arrayBuffer();
const buf = new TextEncoder().encode(data);
assertEquals(res.byteLength, buf.byteLength);
assertEquals(new Uint8Array(res), buf);

listener.close();
client.close();
},
);

Expand All @@ -1781,21 +1783,23 @@ Deno.test(

// It should fail if multiple content-length headers with different values are sent
const listener = invalidServer(addr, body);
const client = Deno.createHttpClient({});
await assertRejects(
async () => {
await fetch(`http://${addr}/`);
await fetch(`http://${addr}/`, { client });
},
TypeError,
"invalid content-length parsed",
);

listener.close();
client.close();
},
);

Deno.test(
{ permissions: { net: true } },
async function fetchWithInvalidContentLength(): Promise<
async function fetchWithInvalidContentLength2(): Promise<
void
> {
const addr = `127.0.0.1:${listenPort}`;
Expand All @@ -1807,7 +1811,8 @@ Deno.test(
);

const listener = invalidServer(addr, body);
const response = await fetch(`http://${addr}/`);
const client = Deno.createHttpClient({});
const response = await fetch(`http://${addr}/`, { client });

// If content-length < totalLength, a maximum of content-length bytes
// should be returned.
Expand All @@ -1817,12 +1822,13 @@ Deno.test(
assertEquals(new Uint8Array(res), buf.subarray(contentLength));

listener.close();
client.close();
},
);

Deno.test(
{ permissions: { net: true } },
async function fetchWithInvalidContentLength(): Promise<
async function fetchWithInvalidContentLength3(): Promise<
void
> {
const addr = `127.0.0.1:${listenPort}`;
Expand All @@ -1834,7 +1840,8 @@ Deno.test(
);

const listener = invalidServer(addr, body);
const response = await fetch(`http://${addr}/`);
const client = Deno.createHttpClient({});
const response = await fetch(`http://${addr}/`, { client });
// If content-length > totalLength, a maximum of content-length bytes
// should be returned.
await assertRejects(
Expand All @@ -1846,6 +1853,7 @@ Deno.test(
);

listener.close();
client.close();
},
);

Expand Down Expand Up @@ -1935,10 +1943,12 @@ Deno.test(
},
});

const client = Deno.createHttpClient({});
const err = await assertRejects(() =>
fetch(`http://localhost:${listenPort}/`, {
body: stream,
method: "POST",
client,
})
);

Expand All @@ -1948,6 +1958,7 @@ Deno.test(
assertEquals(err.cause.message, "foo");

await server;
client.close();
},
);

Expand Down
22 changes: 20 additions & 2 deletions cli/tests/unit/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,14 @@ Deno.test(
await respondWith(new Response(stream.readable));
})();

const resp = await fetch(`http://127.0.0.1:${listenPort}/`);
const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${listenPort}/`, { client });
const respBody = await resp.text();
assertEquals("hello world", respBody);
await promise;
httpConn!.close();
listener.close();
client.close();
},
);

Expand All @@ -216,8 +218,8 @@ Deno.test(
writer.write(new TextEncoder().encode("world"));
writer.close();

const listener = Deno.listen({ port: listenPort });
const promise = (async () => {
const listener = Deno.listen({ port: listenPort });
const conn = await listener.accept();
const httpConn = Deno.serveHttp(conn);
const evt = await httpConn.nextRequest();
Expand All @@ -235,14 +237,17 @@ Deno.test(
listener.close();
})();

const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${listenPort}/`, {
body: stream.readable,
method: "POST",
headers: { "connection": "close" },
client,
});

await resp.arrayBuffer();
await promise;
client.close();
},
);

Expand Down Expand Up @@ -375,16 +380,19 @@ Deno.test(
await respondWith(new Response("response"));
})();

const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${listenPort}/`, {
method: "POST",
body: "request",
client,
});
const respBody = await resp.text();
assertEquals("response", respBody);
await promise;

httpConn!.close();
listener.close();
client.close();
},
);

Expand Down Expand Up @@ -427,9 +435,11 @@ Deno.test(
listener.close();
})();

const client = Deno.createHttpClient({});
const resp = await fetch(`http://127.0.0.1:${listenPort}/`);
await resp.body!.cancel();
await promise;
client.close();
},
);

Expand Down Expand Up @@ -788,7 +798,11 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() {
socket.send(m.data);
socket.close(1001);
};
const close = new Promise<void>((resolve) => {
socket.onclose = () => resolve();
});
await respondWith(response);
await close;
})();

const def = deferred();
Expand Down Expand Up @@ -1228,11 +1242,15 @@ Deno.test(
async function client() {
const socket = new WebSocket(`ws://${hostname}:${port}/`);
socket.onopen = () => socket.send("bla bla");
const closed = new Promise<void>((resolve) => {
socket.onclose = () => resolve();
});
const { data } = await new Promise<MessageEvent<string>>((res) =>
socket.onmessage = res
);
assertStrictEquals(data, "bla bla");
socket.close();
await closed;
}

await Promise.all([server(), client()]);
Expand Down
5 changes: 3 additions & 2 deletions cli/tests/unit/net_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ Deno.test(
);

Deno.test({ permissions: { net: true } }, async function whatwgStreams() {
(async () => {
const server = (async () => {
const listener = Deno.listen({ hostname: "127.0.0.1", port: listenPort });
const conn = await listener.accept();
await conn.readable.pipeTo(conn.writable);
Expand All @@ -920,6 +920,7 @@ Deno.test({ permissions: { net: true } }, async function whatwgStreams() {
assert(!done);
assertEquals(decoder.decode(value), "Hello World");
await reader.cancel();
await server;
});

Deno.test(
Expand Down Expand Up @@ -973,7 +974,7 @@ Deno.test(

Deno.test(
{ permissions: { read: true, run: true } },
async function netListenUnref() {
async function netListenUnref2() {
const [statusCode, _output] = await execCode(`
async function main() {
const listener = Deno.listen({ port: ${listenPort} });
Expand Down
Loading