diff --git a/lib/cache/memory-cache-store.js b/lib/cache/memory-cache-store.js index ae6d6614d82..bd8f6e33d97 100644 --- a/lib/cache/memory-cache-store.js +++ b/lib/cache/memory-cache-store.js @@ -294,6 +294,7 @@ class MemoryStoreReadableStream extends Readable { * @type {MemoryStoreValue} */ #value + /** * @type {Buffer[]} */ diff --git a/lib/handler/cache-handler.js b/lib/handler/cache-handler.js index d48675c3760..9bc9a776747 100644 --- a/lib/handler/cache-handler.js +++ b/lib/handler/cache-handler.js @@ -7,6 +7,8 @@ const { parseVaryHeader } = require('../util/cache') +function noop () {} + /** * Writes a response to a CacheStore and then passes it on to the next handler */ @@ -46,6 +48,17 @@ class CacheHandler extends DecoratorHandler { this.#handler = handler } + onConnect (abort) { + if (this.#writeStream) { + this.#writeStream.destroy() + this.#writeStream = undefined + } + + if (typeof this.#handler.onConnect === 'function') { + this.#handler.onConnect(abort) + } + } + /** * @see {DispatchHandlers.onHeaders} * @@ -61,49 +74,46 @@ class CacheHandler extends DecoratorHandler { resume, statusMessage ) { - const downstreamOnHeaders = () => this.#handler.onHeaders( - statusCode, - rawHeaders, - resume, - statusMessage - ) + const downstreamOnHeaders = () => { + if (typeof this.#handler.onHeaders === 'function') { + return this.#handler.onHeaders( + statusCode, + rawHeaders, + resume, + statusMessage + ) + } else { + return true + } + } if ( !util.safeHTTPMethods.includes(this.#requestOptions.method) && statusCode >= 200 && statusCode <= 399 ) { - // https://www.rfc-editor.org/rfc/rfc9111.html#name-invalidating-stored-respons - // Try/catch for if it's synchronous + // https://www.rfc-editor.org/rfc/rfc9111.html#name-invalidating-stored-response try { - const result = this.#store.deleteByOrigin(this.#requestOptions.origin) - if ( - result && - typeof result.catch === 'function' && - typeof this.#handler.onError === 'function' - ) { - // Fail silently - result.catch(_ => {}) - } - } catch (err) { + this.#store.deleteByOrigin(this.#requestOptions.origin.toString())?.catch?.(noop) + } catch { // Fail silently } - return downstreamOnHeaders() } const headers = util.parseHeaders(rawHeaders) const cacheControlHeader = headers['cache-control'] - const contentLengthHeader = headers['content-length'] - - if (!cacheControlHeader || !contentLengthHeader || this.#store.isFull) { - // Don't have the headers we need, can't cache + if (!cacheControlHeader || typeof cacheControlHeader !== 'string') { + // Don't have cache-control, can't cache. return downstreamOnHeaders() } - const contentLength = Number(contentLengthHeader) + const contentLengthHeader = headers['content-length'] + const contentLength = contentLengthHeader ? Number(contentLengthHeader) : null if (!Number.isInteger(contentLength)) { + // Don't know the final size, don't cache. + // TODO (fix): Why not cache? return downstreamOnHeaders() } @@ -137,18 +147,24 @@ class CacheHandler extends DecoratorHandler { }) if (this.#writeStream) { - this.#writeStream.on('drain', resume) - this.#writeStream.on('error', () => { - this.#writeStream = undefined - resume() - }) + const handler = this + this.#writeStream + .on('drain', resume) + .on('error', function () { + // TODO (fix): Make error somehow observable? + }) + .on('close', function () { + if (handler.#writeStream === this) { + handler.#writeStream = undefined + } + + // TODO (fix): Should we resume even if was paused downstream? + resume() + }) } } - if (typeof this.#handler.onHeaders === 'function') { - return downstreamOnHeaders() - } - return false + return downstreamOnHeaders() } /** @@ -178,10 +194,7 @@ class CacheHandler extends DecoratorHandler { */ onComplete (rawTrailers) { if (this.#writeStream) { - if (rawTrailers) { - this.#writeStream.rawTrailers = rawTrailers - } - + this.#writeStream.rawTrailers = rawTrailers ?? [] this.#writeStream.end() } @@ -332,7 +345,7 @@ function stripNecessaryHeaders (rawHeaders, parsedHeaders, cacheControlDirective for (let i = 0; i < headerNames.length; i++) { const header = headerNames[i] - if (headersToRemove.indexOf(header) !== -1) { + if (headersToRemove.includes(header)) { // We have a at least one header we want to remove if (!strippedHeaders) { // This is the first header we want to remove, let's create the object diff --git a/lib/handler/cache-revalidation-handler.js b/lib/handler/cache-revalidation-handler.js index 53c58773022..729cb57d03d 100644 --- a/lib/handler/cache-revalidation-handler.js +++ b/lib/handler/cache-revalidation-handler.js @@ -1,5 +1,6 @@ 'use strict' +const assert = require('node:assert') const DecoratorHandler = require('../handler/decorator-handler') /** @@ -19,29 +20,35 @@ const DecoratorHandler = require('../handler/decorator-handler') class CacheRevalidationHandler extends DecoratorHandler { #successful = false /** - * @type {(() => void)} + * @type {((boolean) => void) | null} */ - #successCallback + #callback /** * @type {(import('../../types/dispatcher.d.ts').default.DispatchHandlers)} */ #handler + #abort /** - * @param {() => void} successCallback Function to call if the cached value is valid + * @param {(boolean) => void} callback Function to call if the cached value is valid * @param {import('../../types/dispatcher.d.ts').default.DispatchHandlers} handler */ - constructor (successCallback, handler) { - if (typeof successCallback !== 'function') { - throw new TypeError('successCallback must be a function') + constructor (callback, handler) { + if (typeof callback !== 'function') { + throw new TypeError('callback must be a function') } super(handler) - this.#successCallback = successCallback + this.#callback = callback this.#handler = handler } + onConnect (abort) { + this.#successful = false + this.#abort = abort + } + /** * @see {DispatchHandlers.onHeaders} * @@ -57,13 +64,21 @@ class CacheRevalidationHandler extends DecoratorHandler { resume, statusMessage ) { + assert(this.#callback != null) + // https://www.rfc-editor.org/rfc/rfc9111.html#name-handling-a-validation-respo - if (statusCode === 304) { - this.#successful = true - this.#successCallback() + this.#successful = statusCode === 304 + this.#callback(this.#successful) + this.#callback = null + + if (this.#successful) { return true } + if (typeof this.#handler.onConnect === 'function') { + this.#handler.onConnect(this.#abort) + } + if (typeof this.#handler.onHeaders === 'function') { return this.#handler.onHeaders( statusCode, @@ -72,7 +87,8 @@ class CacheRevalidationHandler extends DecoratorHandler { statusMessage ) } - return false + + return true } /** @@ -90,7 +106,7 @@ class CacheRevalidationHandler extends DecoratorHandler { return this.#handler.onData(chunk) } - return false + return true } /** @@ -99,7 +115,11 @@ class CacheRevalidationHandler extends DecoratorHandler { * @param {string[] | null} rawTrailers */ onComplete (rawTrailers) { - if (!this.#successful && typeof this.#handler.onComplete === 'function') { + if (this.#successful) { + return + } + + if (typeof this.#handler.onComplete === 'function') { this.#handler.onComplete(rawTrailers) } } @@ -110,8 +130,19 @@ class CacheRevalidationHandler extends DecoratorHandler { * @param {Error} err */ onError (err) { + if (this.#successful) { + return + } + + if (this.#callback) { + this.#callback(false) + this.#callback = null + } + if (typeof this.#handler.onError === 'function') { this.#handler.onError(err) + } else { + throw err } } } diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index 63d56b6880b..945eeeb924a 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -1,5 +1,6 @@ 'use strict' +const assert = require('node:assert') const util = require('../core/util') const CacheHandler = require('../handler/cache-handler') const MemoryCacheStore = require('../cache/memory-cache-store') @@ -8,6 +9,10 @@ const { assertCacheStore, assertCacheMethods } = require('../util/cache.js') const AGE_HEADER = Buffer.from('age') +/** + * @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStoreValue} CacheStoreValue + */ + /** * @param {import('../../types/cache-interceptor.d.ts').default.CacheOptions} [opts] * @returns {import('../../types/dispatcher.d.ts').default.DispatcherComposeInterceptor} @@ -34,46 +39,60 @@ module.exports = (opts = {}) => { return dispatch => { return (opts, handler) => { + // TODO (fix): What if e.g. opts.headers has if-modified-since header? Or other headers + // that make things ambigious? + if (!opts.origin || safeMethodsToNotCache.includes(opts.method)) { // Not a method we want to cache or we don't have the origin, skip return dispatch(opts, handler) } + // TODO (perf): For small entries support returning a Buffer instead of a stream. + // Maybe store should return { staleAt, headers, body, etc... } instead of a stream + stream.value? + // Where body can be a Buffer, string, stream or blob? + const stream = store.createReadStream(opts) if (!stream) { // Request isn't cached return dispatch(opts, new CacheHandler(globalOpts, opts, handler)) } - let onErrorCalled = false - /** - * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreReadable} stream + * @param {import('node:stream').Readable} stream * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreValue} value */ const respondWithCachedValue = (stream, value) => { - const ac = new AbortController() - const signal = ac.signal - - signal.onabort = (_, err) => { - stream.destroy() - if (!onErrorCalled) { - handler.onError(err) - onErrorCalled = true - } - } - - stream.on('error', (err) => { - if (!onErrorCalled) { - handler.onError(err) - onErrorCalled = true - } - }) + assert(!stream.destroyed, 'stream should not be destroyed') + assert(!stream.readableDidRead, 'stream should not be readableDidRead') try { + stream + .on('error', function (err) { + if (!this.readableEnded) { + if (typeof handler.onError === 'function') { + handler.onError(err) + } else { + process.nextTick(() => { + throw err + }) + } + } else { + // Ignore error... + } + }) + .on('close', function () { + if (!this.errored && typeof handler.onComplete === 'function') { + handler.onComplete(value.rawTrailers ?? []) + } + }) + if (typeof handler.onConnect === 'function') { - handler.onConnect(ac.abort) - signal.throwIfAborted() + handler.onConnect((err) => { + stream.destroy(err) + }) + if (stream.destroyed) { + return + } } if (typeof handler.onHeaders === 'function') { @@ -81,89 +100,85 @@ module.exports = (opts = {}) => { // https://www.rfc-editor.org/rfc/rfc9111.html#name-age const age = Math.round((Date.now() - value.cachedAt) / 1000) - value.rawHeaders.push(AGE_HEADER, Buffer.from(`${age}`)) + // TODO (fix): What if rawHeaders already contains age header? + const rawHeaders = [...value.rawHeaders, AGE_HEADER, Buffer.from(`${age}`)] - handler.onHeaders(value.statusCode, value.rawHeaders, stream.resume, value.statusMessage) - signal.throwIfAborted() + if (handler.onHeaders(value.statusCode, rawHeaders, () => stream.resume(), value.statusMessage) === false) { + stream.pause() + } } if (opts.method === 'HEAD') { - if (typeof handler.onComplete === 'function') { - handler.onComplete(null) - stream.destroy() - } + stream.destroy() } else { - if (typeof handler.onData === 'function') { - stream.on('data', chunk => { - if (!handler.onData(chunk)) { - stream.pause() - } - }) - } - - if (typeof handler.onComplete === 'function') { - stream.on('end', () => { - handler.onComplete(value.rawTrailers ?? []) - }) - } + stream.on('data', function (chunk) { + if (typeof handler.onData === 'function' && !handler.onData(chunk)) { + stream.pause() + } + }) } } catch (err) { stream.destroy(err) - if (!onErrorCalled && typeof handler.onError === 'function') { - handler.onError(err) - onErrorCalled = true - } } } /** - * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreReadable | undefined} stream + * @param {import('node:stream').Readable | undefined} stream + * @param {CacheStoreValue | undefined} value */ - const handleStream = (stream) => { - if (!stream) { - // Request isn't cached + const handleStream = (stream, value) => { + if (!stream || !value) { + stream?.on('error', () => {}).destroy() return dispatch(opts, new CacheHandler(globalOpts, opts, handler)) } - const { value } = stream - - // Dump body on error - if (util.isStream(opts.body)) { - opts.body?.on('error', () => {}).resume() - } - // Check if the response is stale const now = Date.now() - if (now >= value.staleAt) { - if (now >= value.deleteAt) { - // Safety check in case the store gave us a response that should've been - // deleted already - dispatch(opts, new CacheHandler(globalOpts, opts, handler)) - return - } - - if (!opts.headers) { - opts.headers = {} + if (now < value.staleAt) { + // Dump request body. + if (util.isStream(opts.body)) { + opts.body.on('error', () => {}).destroy() } - - opts.headers['if-modified-since'] = new Date(value.cachedAt).toUTCString() - + respondWithCachedValue(stream, value) + } else if (util.isStream(opts.body) && util.bodyLength(opts.body) !== 0) { + // If body is is stream we can't revalidate... + // TODO (fix): This could be less strict... + dispatch(opts, new CacheHandler(globalOpts, opts, handler)) + } else { // Need to revalidate the response dispatch( - opts, + { + ...opts, + headers: { + ...opts.headers, + 'if-modified-since': new Date(value.cachedAt).toUTCString() + } + }, new CacheRevalidationHandler( - () => respondWithCachedValue(stream, value), + (success) => { + if (success) { + respondWithCachedValue(stream, value) + } else { + stream.on('error', () => {}).destroy() + } + }, new CacheHandler(globalOpts, opts, handler) ) ) - - return } - - respondWithCachedValue(stream, value) } - Promise.resolve(stream).then(handleStream).catch(handler.onError) + if (util.isStream(stream)) { + handleStream(stream, stream.value) + } else { + Promise.resolve(stream).then(stream => handleStream(stream, stream?.value), err => { + if (typeof handler.onError === 'function') { + handler.onError(err) + } else { + throw err + } + }) + } return true }