Skip to content

Commit

Permalink
Wrap incremental cache in an IPC server (#53030)
Browse files Browse the repository at this point in the history
This uses an IPC server (if available) for incremental cache methods to help prevent race conditions when reading/writing from cache and also to dedupe requests in cases where multiple cache reads are in flight. This cuts down on data fetching across the different build-time workers.

Co-authored-by: JJ Kasper <22380829+ijjk@users.noreply.github.com>
  • Loading branch information
ztanner and ijjk authored Jul 26, 2023
1 parent 0d4ec10 commit 22ca859
Show file tree
Hide file tree
Showing 16 changed files with 455 additions and 80 deletions.
58 changes: 53 additions & 5 deletions packages/next/src/build/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ import {
defaultOverrides,
experimentalOverrides,
} from '../server/require-hook'
import { initialize } from '../server/lib/incremental-cache-server'
import { nodeFs } from '../server/lib/node-fs-methods'

export type SsgRoute = {
initialRevalidateSeconds: number | false
Expand Down Expand Up @@ -1179,7 +1181,11 @@ export default async function build(
)
: config.experimental.cpus || 4

function createStaticWorker(type: 'app' | 'pages') {
function createStaticWorker(
type: 'app' | 'pages',
ipcPort: number,
ipcValidationKey: string
) {
let infoPrinted = false

return new Worker(staticWorkerPath, {
Expand Down Expand Up @@ -1217,6 +1223,8 @@ export default async function build(
forkOptions: {
env: {
...process.env,
__NEXT_INCREMENTAL_CACHE_IPC_PORT: ipcPort + '',
__NEXT_INCREMENTAL_CACHE_IPC_KEY: ipcValidationKey,
__NEXT_PRIVATE_PREBUNDLED_REACT:
type === 'app'
? config.experimental.serverActions
Expand Down Expand Up @@ -1248,9 +1256,45 @@ export default async function build(
>
}

const pagesStaticWorkers = createStaticWorker('pages')
let CacheHandler: any

if (incrementalCacheHandlerPath) {
CacheHandler = require(path.isAbsolute(incrementalCacheHandlerPath)
? incrementalCacheHandlerPath
: path.join(dir, incrementalCacheHandlerPath))
}

const { ipcPort, ipcValidationKey } = await initialize({
fs: nodeFs,
dev: false,
appDir: isAppDirEnabled,
fetchCache: isAppDirEnabled,
flushToDisk: config.experimental.isrFlushToDisk,
serverDistDir: path.join(distDir, 'server'),
fetchCacheKeyPrefix: config.experimental.fetchCacheKeyPrefix,
maxMemoryCacheSize: config.experimental.isrMemoryCacheSize,
getPrerenderManifest: () => ({
version: -1 as any, // letting us know this doesn't conform to spec
routes: {},
dynamicRoutes: {},
notFoundRoutes: [],
preview: null as any, // `preview` is special case read in next-dev-server
}),
requestHeaders: {},
CurCacheHandler: CacheHandler,
minimalMode: ciEnvironment.hasNextSupport,

allowedRevalidateHeaderKeys:
config.experimental.allowedRevalidateHeaderKeys,
})

const pagesStaticWorkers = createStaticWorker(
'pages',
ipcPort,
ipcValidationKey
)
const appStaticWorkers = isAppDirEnabled
? createStaticWorker('app')
? createStaticWorker('app', ipcPort, ipcValidationKey)
: undefined

const analysisBegin = process.hrtime()
Expand Down Expand Up @@ -3181,8 +3225,12 @@ export default async function build(
const exportApp: typeof import('../export').default =
require('../export').default

const pagesWorker = createStaticWorker('pages')
const appWorker = createStaticWorker('app')
const pagesWorker = createStaticWorker(
'pages',
ipcPort,
ipcValidationKey
)
const appWorker = createStaticWorker('app', ipcPort, ipcValidationKey)

const options: ExportOptions = {
isInvokedFromCli: false,
Expand Down
73 changes: 33 additions & 40 deletions packages/next/src/server/dev/next-dev-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,12 @@ import { NextBuildContext } from '../../build/build-context'
import { IncrementalCache } from '../lib/incremental-cache'
import LRUCache from 'next/dist/compiled/lru-cache'
import { NextUrlWithParsedQuery } from '../request-meta'
import { deserializeErr, errorToJSON } from '../render'
import { invokeRequest } from '../lib/server-ipc/invoke-request'
import { errorToJSON } from '../render'
import { getMiddlewareRouteMatcher } from '../../shared/lib/router/utils/middleware-route-matcher'
import {
deserializeErr,
invokeIpcMethod,
} from '../lib/server-ipc/request-utils'

// Load ReactDevOverlay only when needed
let ReactDevOverlayImpl: FunctionComponent
Expand Down Expand Up @@ -478,46 +481,18 @@ export default class DevServer extends Server {
}
}

private async invokeIpcMethod(method: string, args: any[]): Promise<any> {
const ipcPort = process.env.__NEXT_PRIVATE_ROUTER_IPC_PORT
const ipcKey = process.env.__NEXT_PRIVATE_ROUTER_IPC_KEY
if (ipcPort) {
const res = await invokeRequest(
`http://${this.hostname}:${ipcPort}?key=${ipcKey}&method=${
method as string
}&args=${encodeURIComponent(JSON.stringify(args))}`,
{
method: 'GET',
headers: {},
}
)
const body = await res.text()

if (body.startsWith('{') && body.endsWith('}')) {
const parsedBody = JSON.parse(body)

if (
parsedBody &&
typeof parsedBody === 'object' &&
'err' in parsedBody &&
'stack' in parsedBody.err
) {
throw deserializeErr(parsedBody.err)
}
return parsedBody
}
}
}

protected async logErrorWithOriginalStack(
err?: unknown,
type?: 'unhandledRejection' | 'uncaughtException' | 'warning' | 'app-dir'
) {
if (this.isRenderWorker) {
await this.invokeIpcMethod('logErrorWithOriginalStack', [
errorToJSON(err as Error),
type,
])
await invokeIpcMethod({
hostname: this.hostname,
method: 'logErrorWithOriginalStack',
args: [errorToJSON(err as Error), type],
ipcPort: process.env.__NEXT_PRIVATE_ROUTER_IPC_PORT,
ipcKey: process.env.__NEXT_PRIVATE_ROUTER_IPC_KEY,
})
return
}
throw new Error(
Expand Down Expand Up @@ -767,7 +742,13 @@ export default class DevServer extends Server {
match?: RouteMatch
}) {
if (this.isRenderWorker) {
await this.invokeIpcMethod('ensurePage', [opts])
await invokeIpcMethod({
hostname: this.hostname,
method: 'ensurePage',
args: [opts],
ipcPort: process.env.__NEXT_PRIVATE_ROUTER_IPC_PORT,
ipcKey: process.env.__NEXT_PRIVATE_ROUTER_IPC_KEY,
})
return
}
throw new Error('Invariant ensurePage called outside render worker')
Expand Down Expand Up @@ -826,7 +807,13 @@ export default class DevServer extends Server {

protected async getFallbackErrorComponents(): Promise<LoadComponentsReturnType | null> {
if (this.isRenderWorker) {
await this.invokeIpcMethod('getFallbackErrorComponents', [])
await invokeIpcMethod({
hostname: this.hostname,
method: 'getFallbackErrorComponents',
args: [],
ipcPort: process.env.__NEXT_PRIVATE_ROUTER_IPC_PORT,
ipcKey: process.env.__NEXT_PRIVATE_ROUTER_IPC_KEY,
})
return await loadDefaultErrorComponents(this.distDir)
}
throw new Error(
Expand All @@ -836,7 +823,13 @@ export default class DevServer extends Server {

async getCompilationError(page: string): Promise<any> {
if (this.isRenderWorker) {
const err = await this.invokeIpcMethod('getCompilationError', [page])
const err = await invokeIpcMethod({
hostname: this.hostname,
method: 'getCompilationError',
args: [page],
ipcPort: process.env.__NEXT_PRIVATE_ROUTER_IPC_PORT,
ipcKey: process.env.__NEXT_PRIVATE_ROUTER_IPC_KEY,
})
return deserializeErr(err)
}
throw new Error(
Expand Down
44 changes: 44 additions & 0 deletions packages/next/src/server/lib/incremental-cache-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { createIpcServer } from './server-ipc'
import { IncrementalCache } from './incremental-cache'

let initializeResult:
| undefined
| {
ipcPort: number
ipcValidationKey: string
}

export async function initialize(
...constructorArgs: ConstructorParameters<typeof IncrementalCache>
): Promise<NonNullable<typeof initializeResult>> {
const incrementalCache = new IncrementalCache(...constructorArgs)

const { ipcPort, ipcValidationKey } = await createIpcServer({
async revalidateTag(
...args: Parameters<IncrementalCache['revalidateTag']>
) {
return incrementalCache.revalidateTag(...args)
},

async get(...args: Parameters<IncrementalCache['get']>) {
return incrementalCache.get(...args)
},

async set(...args: Parameters<IncrementalCache['set']>) {
return incrementalCache.set(...args)
},

async lock(...args: Parameters<IncrementalCache['lock']>) {
return incrementalCache.lock(...args)
},

async unlock(...args: Parameters<IncrementalCache['unlock']>) {
return incrementalCache.unlock(...args)
},
} as any)

return {
ipcPort,
ipcValidationKey,
}
}
104 changes: 104 additions & 0 deletions packages/next/src/server/lib/incremental-cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ export class IncrementalCache {
fetchCacheKeyPrefix?: string
revalidatedTags?: string[]
isOnDemandRevalidate?: boolean
private locks = new Map<string, Promise<void>>()
private unlocks = new Map<string, () => Promise<void>>()

constructor({
fs,
Expand Down Expand Up @@ -200,7 +202,76 @@ export class IncrementalCache {
return fetchCache ? pathname : normalizePagePath(pathname)
}

async unlock(cacheKey: string) {
const unlock = this.unlocks.get(cacheKey)
if (unlock) {
unlock()
this.locks.delete(cacheKey)
this.unlocks.delete(cacheKey)
}
}

async lock(cacheKey: string) {
if (
process.env.__NEXT_INCREMENTAL_CACHE_IPC_PORT &&
process.env.__NEXT_INCREMENTAL_CACHE_IPC_KEY &&
process.env.NEXT_RUNTIME !== 'edge'
) {
const invokeIpcMethod = require('../server-ipc/request-utils')
.invokeIpcMethod as typeof import('../server-ipc/request-utils').invokeIpcMethod

await invokeIpcMethod({
method: 'lock',
ipcPort: process.env.__NEXT_INCREMENTAL_CACHE_IPC_PORT,
ipcKey: process.env.__NEXT_INCREMENTAL_CACHE_IPC_KEY,
args: [cacheKey],
})

return async () => {
await invokeIpcMethod({
method: 'unlock',
ipcPort: process.env.__NEXT_INCREMENTAL_CACHE_IPC_PORT,
ipcKey: process.env.__NEXT_INCREMENTAL_CACHE_IPC_KEY,
args: [cacheKey],
})
}
}

let unlockNext: () => Promise<void> = () => Promise.resolve()
const existingLock = this.locks.get(cacheKey)

if (existingLock) {
await existingLock
} else {
const newLock = new Promise<void>((resolve) => {
unlockNext = async () => {
resolve()
}
})

this.locks.set(cacheKey, newLock)
this.unlocks.set(cacheKey, unlockNext)
}

return unlockNext
}

async revalidateTag(tag: string) {
if (
process.env.__NEXT_INCREMENTAL_CACHE_IPC_PORT &&
process.env.__NEXT_INCREMENTAL_CACHE_IPC_KEY &&
process.env.NEXT_RUNTIME !== 'edge'
) {
const invokeIpcMethod = require('../server-ipc/request-utils')
.invokeIpcMethod as typeof import('../server-ipc/request-utils').invokeIpcMethod
return invokeIpcMethod({
method: 'revalidateTag',
ipcPort: process.env.__NEXT_INCREMENTAL_CACHE_IPC_PORT,
ipcKey: process.env.__NEXT_INCREMENTAL_CACHE_IPC_KEY,
args: [...arguments],
})
}

return this.cacheHandler?.revalidateTag?.(tag)
}

Expand Down Expand Up @@ -328,6 +399,22 @@ export class IncrementalCache {
fetchUrl?: string,
fetchIdx?: number
): Promise<IncrementalCacheEntry | null> {
if (
process.env.__NEXT_INCREMENTAL_CACHE_IPC_PORT &&
process.env.__NEXT_INCREMENTAL_CACHE_IPC_KEY &&
process.env.NEXT_RUNTIME !== 'edge'
) {
const invokeIpcMethod = require('../server-ipc/request-utils')
.invokeIpcMethod as typeof import('../server-ipc/request-utils').invokeIpcMethod

return invokeIpcMethod({
method: 'get',
ipcPort: process.env.__NEXT_INCREMENTAL_CACHE_IPC_PORT,
ipcKey: process.env.__NEXT_INCREMENTAL_CACHE_IPC_KEY,
args: [...arguments],
})
}

// we don't leverage the prerender cache in dev mode
// so that getStaticProps is always called for easier debugging
if (
Expand All @@ -339,6 +426,7 @@ export class IncrementalCache {

pathname = this._getPathname(pathname, fetchCache)
let entry: IncrementalCacheEntry | null = null

const cacheData = await this.cacheHandler?.get(
pathname,
fetchCache,
Expand Down Expand Up @@ -432,6 +520,22 @@ export class IncrementalCache {
fetchUrl?: string,
fetchIdx?: number
) {
if (
process.env.__NEXT_INCREMENTAL_CACHE_IPC_PORT &&
process.env.__NEXT_INCREMENTAL_CACHE_IPC_KEY &&
process.env.NEXT_RUNTIME !== 'edge'
) {
const invokeIpcMethod = require('../server-ipc/request-utils')
.invokeIpcMethod as typeof import('../server-ipc/request-utils').invokeIpcMethod

return invokeIpcMethod({
method: 'set',
ipcPort: process.env.__NEXT_INCREMENTAL_CACHE_IPC_PORT,
ipcKey: process.env.__NEXT_INCREMENTAL_CACHE_IPC_KEY,
args: [...arguments],
})
}

if (this.dev && !fetchCache) return
// fetchCache has upper limit of 2MB per-entry currently
if (fetchCache && JSON.stringify(data).length > 2 * 1024 * 1024) {
Expand Down
Loading

1 comment on commit 22ca859

@dougwithseismic
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PS. Breaking cache #56355 (comment)

Please sign in to comment.