
Commit

fix(MockHttpSocket): handle response stream errors (#548)
kettanaito authored Apr 17, 2024
1 parent ab8656e commit 085d1ec
Showing 6 changed files with 212 additions and 111 deletions.
src/interceptors/ClientRequest/MockHttpSocket.ts (31 additions, 21 deletions)
@@ -14,7 +14,10 @@ import { isPropertyAccessible } from '../../utils/isPropertyAccessible'
 import { baseUrlFromConnectionOptions } from '../Socket/utils/baseUrlFromConnectionOptions'
 import { parseRawHeaders } from '../Socket/utils/parseRawHeaders'
 import { getRawFetchHeaders } from '../../utils/getRawFetchHeaders'
-import { RESPONSE_STATUS_CODES_WITHOUT_BODY } from '../../utils/responseUtils'
+import {
+  createServerErrorResponse,
+  RESPONSE_STATUS_CODES_WITHOUT_BODY,
+} from '../../utils/responseUtils'
 import { createRequestId } from '../../createRequestId'

 type HttpConnectionOptions = any
@@ -248,34 +251,41 @@ export class MockHttpSocket extends MockSocket {
     }

     if (response.body) {
-      const reader = response.body.getReader()
-
-      while (true) {
-        const { done, value } = await reader.read()
-
-        if (done) {
-          break
-        }
-
-        // Flush the headers upon the first chunk in the stream.
-        // This ensures the consumer will start receiving the response
-        // as it streams in (subsequent chunks are pushed).
-        if (httpHeaders.length > 0) {
-          flushHeaders(value)
-          continue
+      try {
+        const reader = response.body.getReader()
+
+        while (true) {
+          const { done, value } = await reader.read()
+
+          if (done) {
+            break
+          }
+
+          // Flush the headers upon the first chunk in the stream.
+          // This ensures the consumer will start receiving the response
+          // as it streams in (subsequent chunks are pushed).
+          if (httpHeaders.length > 0) {
+            flushHeaders(value)
+            continue
+          }
+
+          // Subsequent body chunks are pushed to the stream.
+          this.push(value)
         }
+      } catch (error) {
+        // Coerce response stream errors to 500 responses.
+        // Don't flush the original response headers because
+        // unhandled errors translate to 500 error responses forcefully.
+        this.respondWith(createServerErrorResponse(error))

-        // Subsequent body chunks are pushed to the stream.
-        this.push(value)
+        return
       }
     }

     // If the headers were not flushed up to this point,
     // this means the response either had no body or had
     // an empty body stream. Flush the headers.
-    if (httpHeaders.length > 0) {
-      flushHeaders()
-    }
+    flushHeaders()

     // Close the socket if the connection wasn't marked as keep-alive.
     if (!this.shouldKeepAlive) {
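Note: createServerErrorResponse itself is not part of this diff. Based on the 500 status and the 'Unhandled Exception' status text asserted in the new test added below, a minimal sketch of what such a helper could return might look like the following; the JSON error body and header are assumptions, not confirmed by this commit.

// Hypothetical sketch of a createServerErrorResponse helper.
// The 500 / 'Unhandled Exception' pair matches the assertions in the new
// http-response-readable-stream.test.ts; the response body shape is assumed.
function createServerErrorResponse(reason: unknown): Response {
  const body =
    reason instanceof Error
      ? { name: reason.name, message: reason.message, stack: reason.stack }
      : { message: String(reason) }

  return new Response(JSON.stringify(body), {
    status: 500,
    statusText: 'Unhandled Exception',
    headers: { 'Content-Type': 'application/json' },
  })
}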
src/interceptors/ClientRequest/index.ts (22 additions, 1 deletion)
@@ -12,6 +12,8 @@ import { MockAgent, MockHttpsAgent } from './agents'
 import { emitAsync } from '../../utils/emitAsync'
 import { toInteractiveRequest } from '../../utils/toInteractiveRequest'
 import { normalizeClientRequestArgs } from './utils/normalizeClientRequestArgs'
+import { isNodeLikeError } from '../../utils/isNodeLikeError'
+import { createServerErrorResponse } from '../../utils/responseUtils'

 export class ClientRequestInterceptor extends Interceptor<HttpRequestEventMap> {
   static symbol = Symbol('client-request-interceptor')
@@ -144,13 +146,32 @@ export class ClientRequestInterceptor extends Interceptor<HttpRequestEventMap> {
       })

       if (listenerResult.error) {
-        socket.errorWith(listenerResult.error)
+        // Treat thrown Responses as mocked responses.
+        if (listenerResult.error instanceof Response) {
+          socket.respondWith(listenerResult.error)
+          return
+        }
+
+        // Allow mocking Node-like errors.
+        if (isNodeLikeError(listenerResult.error)) {
+          socket.errorWith(listenerResult.error)
+          return
+        }
+
+        // Unhandled exceptions in the request listeners are
+        // synonymous with unhandled exceptions on the server.
+        // Those are represented as 500 error responses.
+        socket.respondWith(createServerErrorResponse(listenerResult.error))
         return
       }

       const mockedResponse = listenerResult.data

       if (mockedResponse) {
+        /**
+         * @note The `.respondWith()` method will handle "Response.error()".
+         * Maybe we should make all interceptors do that?
+         */
         socket.respondWith(mockedResponse)
         return
       }
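Note: the isNodeLikeError helper referenced above is added from src/utils/isNodeLikeError, which is not included in this diff. A hypothetical sketch of such a guard, assuming it checks for the fields Node.js system errors typically carry, could be:

// Hypothetical sketch of an isNodeLikeError guard; the real helper may differ.
// The field names (code, errno, syscall) are the usual Node.js system-error
// properties, e.g. code: 'ECONNREFUSED'.
function isNodeLikeError(error: unknown): error is NodeJS.ErrnoException {
  if (error == null || !(error instanceof Error)) {
    return false
  }
  return 'code' in error || 'errno' in error || 'syscall' in error
}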
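Taken together, the three branches above mean a 'request' listener can now fail in three distinct ways. A rough usage sketch of how each branch surfaces to a consumer (the interceptor wiring, URLs, and the package import path are illustrative here, not part of this commit; the repository's own tests import the interceptor from source instead):

import { ClientRequestInterceptor } from '@mswjs/interceptors/ClientRequest'

const interceptor = new ClientRequestInterceptor()
interceptor.apply()

interceptor.on('request', ({ request }) => {
  const url = new URL(request.url)

  if (url.pathname === '/thrown-response') {
    // A thrown Response is treated as the mocked response for this request.
    throw new Response('Not found', { status: 404 })
  }

  if (url.pathname === '/connection-error') {
    // A Node-like error is forwarded to the socket as a request error.
    throw Object.assign(new Error('connect ECONNREFUSED'), {
      code: 'ECONNREFUSED',
      syscall: 'connect',
    })
  }

  // Any other exception becomes a 500 "Unhandled Exception" response.
  throw new Error('Something went wrong in the listener')
})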
test/modules/http/compliance/http-req-write.test.ts (0 additions, 1 deletion)
@@ -186,7 +186,6 @@ it('calls all write callbacks before the mocked response', async () => {
     method: 'POST',
   })
   request.write('one', () => {
-    console.log('write callback!')
     request.end()
   })

test/modules/http/response/http-response-readable-stream.test.ts (145 additions, 0 deletions, new file)
@@ -0,0 +1,145 @@
/**
 * @vitest-environment node
 */
import { vi, it, expect, beforeAll, afterEach, afterAll } from 'vitest'
import { performance } from 'node:perf_hooks'
import http from 'node:http'
import https from 'node:https'
import { DeferredPromise } from '@open-draft/deferred-promise'
import { ClientRequestInterceptor } from '../../../../src/interceptors/ClientRequest'
import { sleep, waitForClientRequest } from '../../../helpers'

type ResponseChunks = Array<{ buffer: Buffer; timestamp: number }>

const encoder = new TextEncoder()

const interceptor = new ClientRequestInterceptor()

beforeAll(async () => {
  interceptor.apply()
})

afterEach(() => {
  interceptor.removeAllListeners()
})

afterAll(async () => {
  interceptor.dispose()
})

it('supports ReadableStream as a mocked response', async () => {
  const encoder = new TextEncoder()
  interceptor.once('request', ({ request }) => {
    const stream = new ReadableStream({
      start(controller) {
        controller.enqueue(encoder.encode('hello'))
        controller.enqueue(encoder.encode(' '))
        controller.enqueue(encoder.encode('world'))
        controller.close()
      },
    })
    request.respondWith(new Response(stream))
  })

  const request = http.get('http://example.com/resource')
  const { text } = await waitForClientRequest(request)
  expect(await text()).toBe('hello world')
})

it('supports delays when enqueuing chunks', async () => {
  interceptor.once('request', ({ request }) => {
    const stream = new ReadableStream({
      async start(controller) {
        controller.enqueue(encoder.encode('first'))
        await sleep(200)

        controller.enqueue(encoder.encode('second'))
        await sleep(200)

        controller.enqueue(encoder.encode('third'))
        await sleep(200)

        controller.close()
      },
    })

    request.respondWith(
      new Response(stream, {
        headers: {
          'Content-Type': 'text/event-stream',
        },
      })
    )
  })

  const responseChunksPromise = new DeferredPromise<ResponseChunks>()

  const request = https.get('https://api.example.com/stream', (response) => {
    const chunks: ResponseChunks = []

    response
      .on('data', (data) => {
        chunks.push({
          buffer: Buffer.from(data),
          timestamp: performance.now(),
        })
      })
      .on('end', () => {
        responseChunksPromise.resolve(chunks)
      })
      .on('error', responseChunksPromise.reject)
  })

  request.on('error', responseChunksPromise.reject)

  const responseChunks = await responseChunksPromise
  const textChunks = responseChunks.map((chunk) => {
    return chunk.buffer.toString('utf8')
  })
  expect(textChunks).toEqual(['first', 'second', 'third'])

  // Ensure that the chunks were sent over time,
  // respecting the delay set in the mocked stream.
  const chunkTimings = responseChunks.map((chunk) => chunk.timestamp)
  expect(chunkTimings[1] - chunkTimings[0]).toBeGreaterThanOrEqual(150)
  expect(chunkTimings[2] - chunkTimings[1]).toBeGreaterThanOrEqual(150)
})

it('forwards ReadableStream errors to the request', async () => {
  const requestErrorListener = vi.fn()
  const responseErrorListener = vi.fn()

  interceptor.once('request', ({ request }) => {
    const stream = new ReadableStream({
      start(controller) {
        controller.enqueue(new TextEncoder().encode('original'))
        queueMicrotask(() => {
          controller.error(new Error('stream error'))
        })
      },
    })
    request.respondWith(new Response(stream))
  })

  const request = http.get('http://localhost/resource')
  request.on('error', requestErrorListener)
  request.on('response', (response) => {
    response.on('error', responseErrorListener)
  })

  const response = await vi.waitFor(() => {
    return new Promise<http.IncomingMessage>((resolve) => {
      request.on('response', resolve)
    })
  })

  // Response stream errors are translated to unhandled exceptions,
  // and then the server decides how to handle them. This is often
  // done as returning a 500 response.
  expect(response.statusCode).toBe(500)
  expect(response.statusMessage).toBe('Unhandled Exception')

  // Response stream errors are not request errors.
  expect(requestErrorListener).not.toHaveBeenCalled()
  expect(request.destroyed).toBe(false)
})
test/modules/http/response/readable-stream.test.ts (0 additions, 78 deletions)

This file was deleted.


0 comments on commit 085d1ec
