From e52934a09d806a44e3f489700bfd0e7245a74ef1 Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Tue, 15 Aug 2023 15:35:58 -0700 Subject: [PATCH 01/10] replace readableStreamTee --- .../server/app-render/use-flight-response.tsx | 2 +- .../stream-utils/node-web-streams-helper.ts | 23 ------------------- 2 files changed, 1 insertion(+), 24 deletions(-) diff --git a/packages/next/src/server/app-render/use-flight-response.tsx b/packages/next/src/server/app-render/use-flight-response.tsx index f441aa87e25cf..21f1613c7efa2 100644 --- a/packages/next/src/server/app-render/use-flight-response.tsx +++ b/packages/next/src/server/app-render/use-flight-response.tsx @@ -25,7 +25,7 @@ export function useFlightResponse( createFromReadableStream, } = require(`react-server-dom-webpack/client.edge`) - const [renderStream, forwardStream] = readableStreamTee(req) + const [renderStream, forwardStream] = req.tee() const res = createFromReadableStream(renderStream, { moduleMap: isEdgeRuntime ? clientReferenceManifest.edgeSSRModuleMapping diff --git a/packages/next/src/server/stream-utils/node-web-streams-helper.ts b/packages/next/src/server/stream-utils/node-web-streams-helper.ts index be68dacdfac46..b8751df936539 100644 --- a/packages/next/src/server/stream-utils/node-web-streams-helper.ts +++ b/packages/next/src/server/stream-utils/node-web-streams-helper.ts @@ -52,29 +52,6 @@ export function cloneTransformStream(source: TransformStream) { return clone } -export function readableStreamTee( - readable: ReadableStream -): [ReadableStream, ReadableStream] { - const transformStream = new TransformStream() - const transformStream2 = new TransformStream() - const writer = transformStream.writable.getWriter() - const writer2 = transformStream2.writable.getWriter() - - const reader = readable.getReader() - async function read() { - const { done, value } = await reader.read() - if (done) { - await Promise.all([writer.close(), writer2.close()]) - return - } - await Promise.all([writer.write(value), writer2.write(value)]) - await read() - } - read() - - return [transformStream.readable, transformStream2.readable] -} - export function chainStreams( streams: ReadableStream[] ): ReadableStream { From e51dc2d29514173ad22fc4a2cb6326939f7916f7 Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Tue, 15 Aug 2023 15:39:59 -0700 Subject: [PATCH 02/10] s/setTimeout/setImmediate --- .../stream-utils/node-web-streams-helper.ts | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/packages/next/src/server/stream-utils/node-web-streams-helper.ts b/packages/next/src/server/stream-utils/node-web-streams-helper.ts index b8751df936539..83497d256b607 100644 --- a/packages/next/src/server/stream-utils/node-web-streams-helper.ts +++ b/packages/next/src/server/stream-utils/node-web-streams-helper.ts @@ -6,9 +6,6 @@ import { getTracer } from '../lib/trace/tracer' import { AppRenderSpan } from '../lib/trace/constants' import { decodeText, encodeText } from './encode-decode' -const queueTask = - process.env.NEXT_RUNTIME === 'edge' ? globalThis.setTimeout : setImmediate - export type ReactReadableStream = ReadableStream & { allReady?: Promise | undefined } @@ -107,12 +104,12 @@ export function createBufferedTransformStream(): TransformStream< const flushBuffer = (controller: TransformStreamDefaultController) => { if (!pendingFlush) { pendingFlush = new Promise((resolve) => { - setTimeout(async () => { + setImmediate(async () => { controller.enqueue(bufferedBytes) bufferedBytes = new Uint8Array() pendingFlush = null resolve() - }, 0) + }) }) } return pendingFlush @@ -198,7 +195,7 @@ function createHeadInsertionTransformStream( if (!inserted) { controller.enqueue(chunk) } else { - queueTask(() => { + setImmediate(() => { freezing = false }) } @@ -230,7 +227,7 @@ function createDeferredSuffixStream( // NOTE: streaming flush // Enqueue suffix part before the major chunks are enqueued so that // suffix won't be flushed too early to interrupt the data stream - setTimeout(() => { + setImmediate(() => { controller.enqueue(encodeText(suffix)) res() }) @@ -265,13 +262,10 @@ export function createInlineDataStream( // the safe timing to pipe the data stream, this extra tick is // necessary. dataStreamFinished = new Promise((res) => - // We use `setTimeout` here to ensure that it's inserted after flushing + // We use `setImmediate` here to ensure that it's inserted after flushing // the shell. Note that this implementation might get stale if impl // details of Fizz change in the future. - // Also we are not using `setImmediate` here because it's not available - // broadly in all runtimes, for example some edge workers might not - // have it. - setTimeout(async () => { + setImmediate(async () => { try { while (true) { const { done, value } = await dataStreamReader.read() @@ -284,7 +278,7 @@ export function createInlineDataStream( controller.error(err) } res() - }, 0) + }) ) } }, From 4ee17b2fc50c8ee2fa2fbfe6c66c803bf735d926 Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Tue, 15 Aug 2023 15:48:10 -0700 Subject: [PATCH 03/10] rm import --- packages/next/src/server/app-render/use-flight-response.tsx | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/next/src/server/app-render/use-flight-response.tsx b/packages/next/src/server/app-render/use-flight-response.tsx index 21f1613c7efa2..09166efa2b65f 100644 --- a/packages/next/src/server/app-render/use-flight-response.tsx +++ b/packages/next/src/server/app-render/use-flight-response.tsx @@ -1,6 +1,5 @@ import type { ClientReferenceManifest } from '../../build/webpack/plugins/flight-manifest-plugin' import type { FlightResponseRef } from './flight-response-ref' -import { readableStreamTee } from '../stream-utils/node-web-streams-helper' import { encodeText, decodeText } from '../stream-utils/encode-decode' import { htmlEscapeJsonString } from '../htmlescape' From 696a93c58483ea2a43986210f1c98075707298cf Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Tue, 15 Aug 2023 15:58:49 -0700 Subject: [PATCH 04/10] Revert "s/setTimeout/setImmediate" This reverts commit e51dc2d29514173ad22fc4a2cb6326939f7916f7. --- .../stream-utils/node-web-streams-helper.ts | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/packages/next/src/server/stream-utils/node-web-streams-helper.ts b/packages/next/src/server/stream-utils/node-web-streams-helper.ts index 83497d256b607..b8751df936539 100644 --- a/packages/next/src/server/stream-utils/node-web-streams-helper.ts +++ b/packages/next/src/server/stream-utils/node-web-streams-helper.ts @@ -6,6 +6,9 @@ import { getTracer } from '../lib/trace/tracer' import { AppRenderSpan } from '../lib/trace/constants' import { decodeText, encodeText } from './encode-decode' +const queueTask = + process.env.NEXT_RUNTIME === 'edge' ? globalThis.setTimeout : setImmediate + export type ReactReadableStream = ReadableStream & { allReady?: Promise | undefined } @@ -104,12 +107,12 @@ export function createBufferedTransformStream(): TransformStream< const flushBuffer = (controller: TransformStreamDefaultController) => { if (!pendingFlush) { pendingFlush = new Promise((resolve) => { - setImmediate(async () => { + setTimeout(async () => { controller.enqueue(bufferedBytes) bufferedBytes = new Uint8Array() pendingFlush = null resolve() - }) + }, 0) }) } return pendingFlush @@ -195,7 +198,7 @@ function createHeadInsertionTransformStream( if (!inserted) { controller.enqueue(chunk) } else { - setImmediate(() => { + queueTask(() => { freezing = false }) } @@ -227,7 +230,7 @@ function createDeferredSuffixStream( // NOTE: streaming flush // Enqueue suffix part before the major chunks are enqueued so that // suffix won't be flushed too early to interrupt the data stream - setImmediate(() => { + setTimeout(() => { controller.enqueue(encodeText(suffix)) res() }) @@ -262,10 +265,13 @@ export function createInlineDataStream( // the safe timing to pipe the data stream, this extra tick is // necessary. dataStreamFinished = new Promise((res) => - // We use `setImmediate` here to ensure that it's inserted after flushing + // We use `setTimeout` here to ensure that it's inserted after flushing // the shell. Note that this implementation might get stale if impl // details of Fizz change in the future. - setImmediate(async () => { + // Also we are not using `setImmediate` here because it's not available + // broadly in all runtimes, for example some edge workers might not + // have it. + setTimeout(async () => { try { while (true) { const { done, value } = await dataStreamReader.read() @@ -278,7 +284,7 @@ export function createInlineDataStream( controller.error(err) } res() - }) + }, 0) ) } }, From 993c64b5a5ee5c45d475caca0bf5f5bc67133d04 Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Tue, 15 Aug 2023 16:02:22 -0700 Subject: [PATCH 05/10] Replace setImmedaite with Promise.resolve().then(fn --- .../stream-utils/node-web-streams-helper.ts | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/packages/next/src/server/stream-utils/node-web-streams-helper.ts b/packages/next/src/server/stream-utils/node-web-streams-helper.ts index b8751df936539..969bd304e91d8 100644 --- a/packages/next/src/server/stream-utils/node-web-streams-helper.ts +++ b/packages/next/src/server/stream-utils/node-web-streams-helper.ts @@ -6,8 +6,10 @@ import { getTracer } from '../lib/trace/tracer' import { AppRenderSpan } from '../lib/trace/constants' import { decodeText, encodeText } from './encode-decode' -const queueTask = - process.env.NEXT_RUNTIME === 'edge' ? globalThis.setTimeout : setImmediate +const queueTask: (fn: () => void) => void = + process.env.NEXT_RUNTIME === 'edge' + ? (fn: () => void) => Promise.resolve().then(fn) + : setImmediate export type ReactReadableStream = ReadableStream & { allReady?: Promise | undefined @@ -107,12 +109,12 @@ export function createBufferedTransformStream(): TransformStream< const flushBuffer = (controller: TransformStreamDefaultController) => { if (!pendingFlush) { pendingFlush = new Promise((resolve) => { - setTimeout(async () => { + queueTask(async () => { controller.enqueue(bufferedBytes) bufferedBytes = new Uint8Array() pendingFlush = null resolve() - }, 0) + }) }) } return pendingFlush @@ -230,7 +232,7 @@ function createDeferredSuffixStream( // NOTE: streaming flush // Enqueue suffix part before the major chunks are enqueued so that // suffix won't be flushed too early to interrupt the data stream - setTimeout(() => { + queueTask(() => { controller.enqueue(encodeText(suffix)) res() }) @@ -265,13 +267,10 @@ export function createInlineDataStream( // the safe timing to pipe the data stream, this extra tick is // necessary. dataStreamFinished = new Promise((res) => - // We use `setTimeout` here to ensure that it's inserted after flushing + // We use `queueTask` here to ensure that it's inserted after flushing // the shell. Note that this implementation might get stale if impl // details of Fizz change in the future. - // Also we are not using `setImmediate` here because it's not available - // broadly in all runtimes, for example some edge workers might not - // have it. - setTimeout(async () => { + queueTask(async () => { try { while (true) { const { done, value } = await dataStreamReader.read() @@ -284,7 +283,7 @@ export function createInlineDataStream( controller.error(err) } res() - }, 0) + }) ) } }, From 6b184717d7f803283da78900c8ed231fff48b0af Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Tue, 15 Aug 2023 16:52:00 -0700 Subject: [PATCH 06/10] Revert "replace readableStreamTee" This reverts commit e52934a09d806a44e3f489700bfd0e7245a74ef1. --- .../server/app-render/use-flight-response.tsx | 2 +- .../stream-utils/node-web-streams-helper.ts | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/packages/next/src/server/app-render/use-flight-response.tsx b/packages/next/src/server/app-render/use-flight-response.tsx index 09166efa2b65f..d61eeab80df97 100644 --- a/packages/next/src/server/app-render/use-flight-response.tsx +++ b/packages/next/src/server/app-render/use-flight-response.tsx @@ -24,7 +24,7 @@ export function useFlightResponse( createFromReadableStream, } = require(`react-server-dom-webpack/client.edge`) - const [renderStream, forwardStream] = req.tee() + const [renderStream, forwardStream] = readableStreamTee(req) const res = createFromReadableStream(renderStream, { moduleMap: isEdgeRuntime ? clientReferenceManifest.edgeSSRModuleMapping diff --git a/packages/next/src/server/stream-utils/node-web-streams-helper.ts b/packages/next/src/server/stream-utils/node-web-streams-helper.ts index 969bd304e91d8..a4e4d33c49f30 100644 --- a/packages/next/src/server/stream-utils/node-web-streams-helper.ts +++ b/packages/next/src/server/stream-utils/node-web-streams-helper.ts @@ -54,6 +54,29 @@ export function cloneTransformStream(source: TransformStream) { return clone } +export function readableStreamTee( + readable: ReadableStream +): [ReadableStream, ReadableStream] { + const transformStream = new TransformStream() + const transformStream2 = new TransformStream() + const writer = transformStream.writable.getWriter() + const writer2 = transformStream2.writable.getWriter() + + const reader = readable.getReader() + async function read() { + const { done, value } = await reader.read() + if (done) { + await Promise.all([writer.close(), writer2.close()]) + return + } + await Promise.all([writer.write(value), writer2.write(value)]) + await read() + } + read() + + return [transformStream.readable, transformStream2.readable] +} + export function chainStreams( streams: ReadableStream[] ): ReadableStream { From 21f8058f288dcc03d919729e271c4af0a88aa506 Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Tue, 15 Aug 2023 16:52:41 -0700 Subject: [PATCH 07/10] Revert "rm import" This reverts commit 4ee17b2fc50c8ee2fa2fbfe6c66c803bf735d926. --- packages/next/src/server/app-render/use-flight-response.tsx | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/next/src/server/app-render/use-flight-response.tsx b/packages/next/src/server/app-render/use-flight-response.tsx index d61eeab80df97..f441aa87e25cf 100644 --- a/packages/next/src/server/app-render/use-flight-response.tsx +++ b/packages/next/src/server/app-render/use-flight-response.tsx @@ -1,5 +1,6 @@ import type { ClientReferenceManifest } from '../../build/webpack/plugins/flight-manifest-plugin' import type { FlightResponseRef } from './flight-response-ref' +import { readableStreamTee } from '../stream-utils/node-web-streams-helper' import { encodeText, decodeText } from '../stream-utils/encode-decode' import { htmlEscapeJsonString } from '../htmlescape' From aab31cc46ad18c13196e8687c0d91e3781e68fc2 Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Tue, 15 Aug 2023 17:13:06 -0700 Subject: [PATCH 08/10] Revert "Replace setImmedaite with Promise.resolve().then(fn" This reverts commit 993c64b5a5ee5c45d475caca0bf5f5bc67133d04. --- .../stream-utils/node-web-streams-helper.ts | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/packages/next/src/server/stream-utils/node-web-streams-helper.ts b/packages/next/src/server/stream-utils/node-web-streams-helper.ts index a4e4d33c49f30..be68dacdfac46 100644 --- a/packages/next/src/server/stream-utils/node-web-streams-helper.ts +++ b/packages/next/src/server/stream-utils/node-web-streams-helper.ts @@ -6,10 +6,8 @@ import { getTracer } from '../lib/trace/tracer' import { AppRenderSpan } from '../lib/trace/constants' import { decodeText, encodeText } from './encode-decode' -const queueTask: (fn: () => void) => void = - process.env.NEXT_RUNTIME === 'edge' - ? (fn: () => void) => Promise.resolve().then(fn) - : setImmediate +const queueTask = + process.env.NEXT_RUNTIME === 'edge' ? globalThis.setTimeout : setImmediate export type ReactReadableStream = ReadableStream & { allReady?: Promise | undefined @@ -132,12 +130,12 @@ export function createBufferedTransformStream(): TransformStream< const flushBuffer = (controller: TransformStreamDefaultController) => { if (!pendingFlush) { pendingFlush = new Promise((resolve) => { - queueTask(async () => { + setTimeout(async () => { controller.enqueue(bufferedBytes) bufferedBytes = new Uint8Array() pendingFlush = null resolve() - }) + }, 0) }) } return pendingFlush @@ -255,7 +253,7 @@ function createDeferredSuffixStream( // NOTE: streaming flush // Enqueue suffix part before the major chunks are enqueued so that // suffix won't be flushed too early to interrupt the data stream - queueTask(() => { + setTimeout(() => { controller.enqueue(encodeText(suffix)) res() }) @@ -290,10 +288,13 @@ export function createInlineDataStream( // the safe timing to pipe the data stream, this extra tick is // necessary. dataStreamFinished = new Promise((res) => - // We use `queueTask` here to ensure that it's inserted after flushing + // We use `setTimeout` here to ensure that it's inserted after flushing // the shell. Note that this implementation might get stale if impl // details of Fizz change in the future. - queueTask(async () => { + // Also we are not using `setImmediate` here because it's not available + // broadly in all runtimes, for example some edge workers might not + // have it. + setTimeout(async () => { try { while (true) { const { done, value } = await dataStreamReader.read() @@ -306,7 +307,7 @@ export function createInlineDataStream( controller.error(err) } res() - }) + }, 0) ) } }, From 9a6a360397f8b25a325067e5ea55181e4e4d1553 Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Tue, 15 Aug 2023 17:13:15 -0700 Subject: [PATCH 09/10] Revert "Revert "replace readableStreamTee"" This reverts commit 6b184717d7f803283da78900c8ed231fff48b0af. --- .../server/app-render/use-flight-response.tsx | 2 +- .../stream-utils/node-web-streams-helper.ts | 23 ------------------- 2 files changed, 1 insertion(+), 24 deletions(-) diff --git a/packages/next/src/server/app-render/use-flight-response.tsx b/packages/next/src/server/app-render/use-flight-response.tsx index f441aa87e25cf..21f1613c7efa2 100644 --- a/packages/next/src/server/app-render/use-flight-response.tsx +++ b/packages/next/src/server/app-render/use-flight-response.tsx @@ -25,7 +25,7 @@ export function useFlightResponse( createFromReadableStream, } = require(`react-server-dom-webpack/client.edge`) - const [renderStream, forwardStream] = readableStreamTee(req) + const [renderStream, forwardStream] = req.tee() const res = createFromReadableStream(renderStream, { moduleMap: isEdgeRuntime ? clientReferenceManifest.edgeSSRModuleMapping diff --git a/packages/next/src/server/stream-utils/node-web-streams-helper.ts b/packages/next/src/server/stream-utils/node-web-streams-helper.ts index be68dacdfac46..b8751df936539 100644 --- a/packages/next/src/server/stream-utils/node-web-streams-helper.ts +++ b/packages/next/src/server/stream-utils/node-web-streams-helper.ts @@ -52,29 +52,6 @@ export function cloneTransformStream(source: TransformStream) { return clone } -export function readableStreamTee( - readable: ReadableStream -): [ReadableStream, ReadableStream] { - const transformStream = new TransformStream() - const transformStream2 = new TransformStream() - const writer = transformStream.writable.getWriter() - const writer2 = transformStream2.writable.getWriter() - - const reader = readable.getReader() - async function read() { - const { done, value } = await reader.read() - if (done) { - await Promise.all([writer.close(), writer2.close()]) - return - } - await Promise.all([writer.write(value), writer2.write(value)]) - await read() - } - read() - - return [transformStream.readable, transformStream2.readable] -} - export function chainStreams( streams: ReadableStream[] ): ReadableStream { From bbec170dc27146e9311b0d14401eefb360dc6b9d Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Tue, 15 Aug 2023 17:13:21 -0700 Subject: [PATCH 10/10] Revert "Revert "rm import"" This reverts commit 21f8058f288dcc03d919729e271c4af0a88aa506. --- packages/next/src/server/app-render/use-flight-response.tsx | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/next/src/server/app-render/use-flight-response.tsx b/packages/next/src/server/app-render/use-flight-response.tsx index 21f1613c7efa2..09166efa2b65f 100644 --- a/packages/next/src/server/app-render/use-flight-response.tsx +++ b/packages/next/src/server/app-render/use-flight-response.tsx @@ -1,6 +1,5 @@ import type { ClientReferenceManifest } from '../../build/webpack/plugins/flight-manifest-plugin' import type { FlightResponseRef } from './flight-response-ref' -import { readableStreamTee } from '../stream-utils/node-web-streams-helper' import { encodeText, decodeText } from '../stream-utils/encode-decode' import { htmlEscapeJsonString } from '../htmlescape'