From f81455c6ec083d9920b479d159636f448b86891e Mon Sep 17 00:00:00 2001 From: Hendrik Liebau Date: Fri, 11 Oct 2024 22:48:20 +0200 Subject: [PATCH] Unflake "streaming responses cancel inner stream after disconnect" test [flakiness metrics](https://app.datadoghq.com/ci/test-runs?query=test_level%3Atest%20env%3Aci%20%40git.repository.id%3Agithub.com%2Fvercel%2Fnext.js%20%40test.service%3Anextjs%20%40test.status%3Afail%20%40test.suite%3A%22streaming%20responses%20cancel%20inner%20stream%20after%20disconnect%22&agg_m=count&agg_m_source=base&agg_t=count¤tTab=overview&eventStack=&fromUser=false&index=citest&start=1720903715972&end=1728679715972&paused=false) The flakiness of the test is underreported because due to masked errors, the test sometimes yields false-positive results. Due to a slight increase in compilation times for route handlers in #70897, the test started to fail consistently in that PR. This PR fixes the flakiness by handling the case where the request might already be aborted before the response was sent. This leads to the stream not being consumed, and subsequently its `finished` promise is never resolved, which finally leads to test timeouts. --- .../cancel-request/app/edge-route/route.ts | 31 +++++++++++++--- .../cancel-request/app/node-route/route.ts | 31 +++++++++++++--- test/e2e/cancel-request/middleware.ts | 30 +++++++++++++--- test/e2e/cancel-request/pages/api/edge-api.ts | 31 +++++++++++++--- test/e2e/cancel-request/pages/api/node-api.ts | 36 ++++++++++++++----- test/e2e/cancel-request/stream-cancel.test.ts | 21 ++++++----- test/e2e/cancel-request/streamable.ts | 7 ++++ 7 files changed, 153 insertions(+), 34 deletions(-) diff --git a/test/e2e/cancel-request/app/edge-route/route.ts b/test/e2e/cancel-request/app/edge-route/route.ts index 0f1fde9a0d5fd..639745fe3ff11 100644 --- a/test/e2e/cancel-request/app/edge-route/route.ts +++ b/test/e2e/cancel-request/app/edge-route/route.ts @@ -1,26 +1,49 @@ +import { NextRequest } from 'next/server' import { Streamable } from '../../streamable' export const runtime = 'edge' let streamable: ReturnType | undefined -export async function GET(req: Request): Promise { +export async function GET(req: NextRequest): Promise { + if (req.nextUrl.searchParams.has('compile')) { + // The request just wants to trigger compilation. + return new Response(null, { status: 204 }) + } + // Consume the entire request body. // This is so we don't confuse the request close with the connection close. await req.text() - const write = new URL(req.url!, 'http://localhost/').searchParams.get('write') + const write = req.nextUrl.searchParams.get('write') + if (write) { - const s = (streamable = Streamable(+write!)) + const s = (streamable = Streamable(+write)) + + // The request was aborted before the response was returned. + if (req.signal.aborted) { + s.abort() + return new Response(null, { status: 204 }) + } + req.signal.onabort = () => { s.abort() } + return new Response(s.stream) } // The 2nd request should render the stats. We don't use a query param // because edge rendering will create a different bundle for that. - const old = streamable! + const old = streamable + + if (!old) { + return new Response( + 'The streamable from the prime request is unexpectedly not available', + { status: 500 } + ) + } + streamable = undefined const i = await old.finished return new Response(`${i}`) diff --git a/test/e2e/cancel-request/app/node-route/route.ts b/test/e2e/cancel-request/app/node-route/route.ts index e10adb30c21d9..bde6736f63bd3 100644 --- a/test/e2e/cancel-request/app/node-route/route.ts +++ b/test/e2e/cancel-request/app/node-route/route.ts @@ -1,4 +1,5 @@ import { Streamable } from '../../streamable' +import { NextRequest } from 'next/server' export const runtime = 'nodejs' // Next thinks it can statically compile this route, which breaks the test. @@ -6,23 +7,45 @@ export const dynamic = 'force-dynamic' let streamable: ReturnType | undefined -export async function GET(req: Request): Promise { +export async function GET(req: NextRequest): Promise { + if (req.nextUrl.searchParams.has('compile')) { + // The request just wants to trigger compilation. + return new Response(null, { status: 204 }) + } + // Consume the entire request body. // This is so we don't confuse the request close with the connection close. await req.text() - const write = new URL(req.url!, 'http://localhost/').searchParams.get('write') + const write = req.nextUrl.searchParams.get('write') + if (write) { - const s = (streamable = Streamable(+write!)) + const s = (streamable = Streamable(+write)) + + // The request was aborted before the response was returned. + if (req.signal.aborted) { + s.abort() + return new Response(null, { status: 204 }) + } + req.signal.onabort = () => { s.abort() } + return new Response(s.stream) } // The 2nd request should render the stats. We don't use a query param // because edge rendering will create a different bundle for that. - const old = streamable! + const old = streamable + + if (!old) { + return new Response( + 'The streamable from the prime request is unexpectedly not available', + { status: 500 } + ) + } + streamable = undefined const i = await old.finished return new Response(`${i}`) diff --git a/test/e2e/cancel-request/middleware.ts b/test/e2e/cancel-request/middleware.ts index d440e0eb2e151..70afba1dbddb8 100644 --- a/test/e2e/cancel-request/middleware.ts +++ b/test/e2e/cancel-request/middleware.ts @@ -1,3 +1,4 @@ +import { NextRequest } from 'next/server' import { Streamable } from './streamable' export const config = { @@ -6,24 +7,45 @@ export const config = { let streamable: ReturnType | undefined -export default async function handler(req: Request): Promise { +export default async function handler(req: NextRequest): Promise { + if (req.nextUrl.searchParams.has('compile')) { + // The request just wants to trigger compilation. + return new Response(null, { status: 204 }) + } + // Consume the entire request body. // This is so we don't confuse the request close with the connection close. await req.text() - const write = new URL(req.url!, 'http://localhost/').searchParams.get('write') + const write = req.nextUrl.searchParams.get('write') if (write) { - const s = (streamable = Streamable(+write!)) + const s = (streamable = Streamable(+write)) + + // The request was aborted before the response was returned. + if (req.signal.aborted) { + s.abort() + return new Response(null, { status: 204 }) + } + req.signal.onabort = () => { s.abort() } + return new Response(s.stream) } // The 2nd request should render the stats. We don't use a query param // because edge rendering will create a different bundle for that. - const old = streamable! + const old = streamable + + if (!old) { + return new Response( + 'The streamable from the prime request is unexpectedly not available', + { status: 500 } + ) + } + streamable = undefined const i = await old.finished return new Response(`${i}`) diff --git a/test/e2e/cancel-request/pages/api/edge-api.ts b/test/e2e/cancel-request/pages/api/edge-api.ts index 4deaa66c23ac6..e09ea222fa721 100644 --- a/test/e2e/cancel-request/pages/api/edge-api.ts +++ b/test/e2e/cancel-request/pages/api/edge-api.ts @@ -1,3 +1,4 @@ +import { NextRequest } from 'next/server' import { Streamable } from '../../streamable' export const config = { @@ -6,23 +7,45 @@ export const config = { let streamable: ReturnType | undefined -export default async function handler(req: Request): Promise { +export default async function handler(req: NextRequest): Promise { + if (req.nextUrl.searchParams.has('compile')) { + // The request just wants to trigger compilation. + return new Response(null, { status: 204 }) + } + // Consume the entire request body. // This is so we don't confuse the request close with the connection close. await req.text() - const write = new URL(req.url!, 'http://localhost/').searchParams.get('write') + const write = req.nextUrl.searchParams.get('write') + if (write) { - const s = (streamable = Streamable(+write!)) + const s = (streamable = Streamable(+write)) + + // The request was aborted before the response was returned. + if (req.signal.aborted) { + s.abort() + return new Response(null, { status: 204 }) + } + req.signal.onabort = () => { s.abort() } + return new Response(s.stream) } // The 2nd request should render the stats. We don't use a query param // because edge rendering will create a different bundle for that. - const old = streamable! + const old = streamable + + if (!old) { + return new Response( + 'The streamable from the prime request is unexpectedly not available', + { status: 500 } + ) + } + streamable = undefined const i = await old.finished return new Response(`${i}`) diff --git a/test/e2e/cancel-request/pages/api/node-api.ts b/test/e2e/cancel-request/pages/api/node-api.ts index 1139bf16007be..d7bf1c9c69ed3 100644 --- a/test/e2e/cancel-request/pages/api/node-api.ts +++ b/test/e2e/cancel-request/pages/api/node-api.ts @@ -8,18 +8,26 @@ export const config = { let readable: ReturnType | undefined -export default function handler( +export default async function handler( req: IncomingMessage, res: ServerResponse ): Promise { + const url = new URL(req.url!, 'http://localhost/') + + if (url.searchParams.has('compile')) { + // The request just wants to trigger compilation. + res.statusCode = 204 + res.end() + return + } + // Pages API requests have already consumed the body. // This is so we don't confuse the request close with the connection close. - const write = new URL(req.url!, 'http://localhost/').searchParams.get('write') - // The 2nd request should render the stats. We don't use a query param - // because edge rendering will create a different bundle for that. + const write = url.searchParams.get('write') + if (write) { - const r = (readable = Readable(+write!)) + const r = (readable = Readable(+write)) res.on('close', () => { r.abort() }) @@ -31,9 +39,19 @@ export default function handler( }) } - const old = readable! + // The 2nd request should render the stats. We don't use a query param + // because edge rendering will create a different bundle for that. + const old = readable + + if (!old) { + res.statusCode = 500 + res.end( + 'The streamable from the prime request is unexpectedly not available' + ) + return + } + readable = undefined - return old.finished.then((i) => { - res.end(`${i}`) - }) + const i = await old.finished + res.end(`${i}`) } diff --git a/test/e2e/cancel-request/stream-cancel.test.ts b/test/e2e/cancel-request/stream-cancel.test.ts index d677749f4f6ce..eb3cc8cb16f85 100644 --- a/test/e2e/cancel-request/stream-cancel.test.ts +++ b/test/e2e/cancel-request/stream-cancel.test.ts @@ -7,9 +7,6 @@ describe('streaming responses cancel inner stream after disconnect', () => { files: __dirname, }) - // For some reason, it's flakey. Try a few times. - jest.retryTimes(3) - function prime(url: string, noData?: boolean) { return new Promise((resolve, reject) => { url = new URL(url, next.url).href @@ -55,13 +52,19 @@ describe('streaming responses cancel inner stream after disconnect', () => { ['edge pages api', '/api/edge-api'], ['node pages api', '/api/node-api'], ])('%s', (_name, path) => { + beforeAll(async () => { + // Trigger compilation of the route so that compilation time does not + // factor into the actual test requests. + await next.fetch(path + '?compile') + }) + it('cancels stream making progress', async () => { // If the stream is making regular progress, then we'll eventually hit // the break because `res.destroyed` is true. await prime(path + '?write=25') const res = await next.fetch(path) - const i = +(await res.text()) - expect(i).toBeWithin(1, 5) + const i = await res.text() + expect(i).toBeOneOf(['1', '2', '3', '4', '5']) }, 2500) it('cancels stalled stream', async () => { @@ -69,8 +72,8 @@ describe('streaming responses cancel inner stream after disconnect', () => { // point, so this ensures we handle it with an out-of-band cancellation. await prime(path + '?write=1') const res = await next.fetch(path) - const i = +(await res.text()) - expect(i).toBe(1) + const i = await res.text() + expect(i).toBe('1') }, 2500) it('cancels stream that never sent data', async () => { @@ -78,8 +81,8 @@ describe('streaming responses cancel inner stream after disconnect', () => { // haven't even established the response object yet. await prime(path + '?write=0', true) const res = await next.fetch(path) - const i = +(await res.text()) - expect(i).toBe(0) + const i = await res.text() + expect(i).toBe('0') }, 2500) }) }) diff --git a/test/e2e/cancel-request/streamable.ts b/test/e2e/cancel-request/streamable.ts index 73edfd49036fb..57e15dad2dea8 100644 --- a/test/e2e/cancel-request/streamable.ts +++ b/test/e2e/cancel-request/streamable.ts @@ -5,15 +5,22 @@ export function Streamable(write: number) { const cleanedUp = new Deferred() const aborted = new Deferred() let i = 0 + let startedConsuming = false const streamable = { finished: Promise.all([cleanedUp.promise, aborted.promise]).then(() => i), abort() { aborted.resolve() + + if (!startedConsuming) { + cleanedUp.resolve() + } }, stream: new ReadableStream({ async pull(controller) { + startedConsuming = true + if (i >= write) { return }