Skip to content

Commit

Permalink
fix: cache
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Nov 6, 2024
1 parent 20b8026 commit 8757139
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 43 deletions.
84 changes: 42 additions & 42 deletions lib/interceptor/cache.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'

const assert = require('node:assert')
const util = require('../core/util')
const CacheHandler = require('../handler/cache-handler')
const MemoryCacheStore = require('../cache/memory-cache-store')
Expand Down Expand Up @@ -39,41 +40,37 @@ module.exports = (opts = {}) => {
return dispatch(opts, handler)
}

// TODO (perf): For small entries support returning a Buffer instead of a stream.
const stream = store.createReadStream(opts)
if (!stream) {
// Request isn't cached
return dispatch(opts, new CacheHandler(globalOpts, opts, handler))
}

let onErrorCalled = false

/**
* @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreReadable} stream
* @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreValue} value
*/
const respondWithCachedValue = (stream, value) => {
const ac = new AbortController()
const signal = ac.signal
let completed = false

signal.onabort = (_, err) => {
stream.destroy()
if (!onErrorCalled) {
handler.onError(err)
onErrorCalled = true
}
}
assert(!stream.destroyed, 'stream should not be destroyed')
assert(!stream.readableDidRead, 'stream should not be readableDidRead')

stream.on('error', (err) => {
if (!onErrorCalled) {
if (!completed && typeof handler.onError === 'function') {
handler.onError(err)
onErrorCalled = true
}
})

try {
if (typeof handler.onConnect === 'function') {
handler.onConnect(ac.abort)
signal.throwIfAborted()
handler.onConnect((err) => {
stream.destroy(err)
})
if (stream.destroyed) {
return
}
}

if (typeof handler.onHeaders === 'function') {
Expand All @@ -83,36 +80,33 @@ module.exports = (opts = {}) => {

value.rawHeaders.push(AGE_HEADER, Buffer.from(`${age}`))

handler.onHeaders(value.statusCode, value.rawHeaders, stream.resume, value.statusMessage)
signal.throwIfAborted()
handler.onHeaders(value.statusCode, value.rawHeaders, () => stream.resume(), value.statusMessage)
if (stream.destroyed) {
return
}
}

if (opts.method === 'HEAD') {
if (typeof handler.onComplete === 'function') {
completed = true
handler.onComplete(null)
stream.destroy()
}
stream.destroy()
} else {
if (typeof handler.onData === 'function') {
stream.on('data', chunk => {
if (!handler.onData(chunk)) {
stream.pause()
}
})
}

if (typeof handler.onComplete === 'function') {
stream.on('end', () => {
stream.on('data', chunk => {
if (typeof handler.onData === 'function' && !handler.onData(chunk)) {
stream.pause()
}
})

stream.on('end', () => {
if (typeof handler.onComplete === 'function') {
handler.onComplete(value.rawTrailers ?? [])
})
}
}
})
}
} catch (err) {
stream.destroy(err)
if (!onErrorCalled && typeof handler.onError === 'function') {
handler.onError(err)
onErrorCalled = true
}
}
}

Expand All @@ -125,21 +119,25 @@ module.exports = (opts = {}) => {
return dispatch(opts, new CacheHandler(globalOpts, opts, handler))
}

// TODO (fix): It's weird that "value" lives on stream.
const { value } = stream

// Dump body on error
// Dump body if cached...
// TODO (fix): This is a bit suspect.
if (util.isStream(opts.body)) {
opts.body?.on('error', () => {}).resume()
}

// Check if the response is stale
const now = Date.now()
if (now >= value.staleAt) {
// TODO (fix): This whole bit is a bit suspect. In particular given that
// we dumped the body above.

if (now >= value.deleteAt) {
// Safety check in case the store gave us a response that should've been
// deleted already
dispatch(opts, new CacheHandler(globalOpts, opts, handler))
return
// deleted already
return dispatch(opts, new CacheHandler(globalOpts, opts, handler))
}

if (!opts.headers) {
Expand All @@ -149,21 +147,23 @@ module.exports = (opts = {}) => {
opts.headers['if-modified-since'] = new Date(value.cachedAt).toUTCString()

// Need to revalidate the response
dispatch(
return dispatch(
opts,
new CacheRevalidationHandler(
() => respondWithCachedValue(stream, value),
new CacheHandler(globalOpts, opts, handler)
)
)

return
}

respondWithCachedValue(stream, value)
}

Promise.resolve(stream).then(handleStream).catch(handler.onError)
Promise.resolve(stream).then(handleStream, err => {
if (typeof handler.onError === 'function') {
handler.onError(err)
}
})

return true
}
Expand Down
2 changes: 1 addition & 1 deletion types/cache-interceptor.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ declare namespace CacheHandler {
export interface CacheStoreValue {
statusCode: number;
statusMessage: string;
rawHeaders: (Buffer | Buffer[])[];
rawHeaders: Buffer[];
rawTrailers?: string[];
/**
* Headers defined by the Vary header and their respective values for
Expand Down

0 comments on commit 8757139

Please sign in to comment.