Skip to content

Commit

Permalink
fix: Prevent HttpRequest from being closed
Browse files Browse the repository at this point in the history
In case a stream the request is being piped into closes,
we don't want to close the request since it shares a socket
with the response.
  • Loading branch information
joachimvh committed Apr 9, 2021
1 parent 218c8f4 commit 9534582
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 10 deletions.
7 changes: 7 additions & 0 deletions src/server/HttpRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,10 @@ import type { Guarded } from '../util/GuardedStream';
* An incoming HTTP request;
*/
export type HttpRequest = Guarded<IncomingMessage>;

/**
* Checks if the given stream is an HttpRequest.
*/
export function isHttpRequest(stream: any): stream is HttpRequest {
return typeof stream.socket === 'object' && typeof stream.url === 'string' && typeof stream.method === 'string';
}
33 changes: 25 additions & 8 deletions src/util/StreamUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Readable, Transform } from 'stream';
import arrayifyStream from 'arrayify-stream';
import pump from 'pump';
import { getLoggerFor } from '../logging/LogUtil';
import { isHttpRequest } from '../server/HttpRequest';
import type { Guarded } from './GuardedStream';
import { guardStream } from './GuardedStream';

Expand Down Expand Up @@ -30,14 +31,30 @@ export async function readableToString(stream: Readable): Promise<string> {
*/
export function pipeSafely<T extends Writable>(readable: NodeJS.ReadableStream, destination: T,
mapError?: (error: Error) => Error): Guarded<T> {
// In case the input readable is guarded, it will no longer log errors since `pump` attaches a new error listener
pump(readable, destination, (error): void => {
if (error) {
logger.warn(`Piped stream errored with ${error.message}`);
// Make sure the final error can be handled in a normal streaming fashion
destination.emit('error', mapError ? mapError(error) : error);
}
});
// We never want to closes the incoming HttpRequest if there is an error
// since that also closes the outgoing HttpResponse.
// Since `pump` sends stream errors both up and down the pipe chain,
// in this case we need to make sure the error only goes down the chain.
if (isHttpRequest(readable)) {
readable.pipe(destination);
readable.on('error', (error): void => {
logger.warn(`HttpRequest errored with ${error.message}`);
// From https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options :
// One important caveat is that if the Readable stream emits an error during processing,
// the Writable destination is not closed automatically. If an error occurs,
// it will be necessary to manually close each stream in order to prevent memory leaks.
destination.destroy(mapError ? mapError(error) : error);
});
} else {
// In case the input readable is guarded, it will no longer log errors since `pump` attaches a new error listener
pump(readable, destination, (error): void => {
if (error) {
logger.warn(`Piped stream errored with ${error.message}`);
// Make sure the final error can be handled in a normal streaming fashion
destination.emit('error', mapError ? mapError(error) : error);
}
});
}
// Guarding the stream now means the internal error listeners of pump will be ignored
// when checking if there is a valid error listener.
return guardStream(destination);
Expand Down
12 changes: 12 additions & 0 deletions test/integration/ServerFetch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ describe('A Solid server', (): void => {
expect(res.status).toBe(205);
});

it('can handle PUT errors.', async(): Promise<void> => {
// There was a specific case where the following request caused the connection to close instead of error
const res = await fetch(baseUrl, {
method: 'PUT',
headers: {
'content-type': 'text/plain',
},
body: '"test"',
});
expect(res.status).toBe(400);
});

it('can POST to create a container.', async(): Promise<void> => {
const res = await fetch(baseUrl, {
method: 'POST',
Expand Down
10 changes: 10 additions & 0 deletions test/unit/server/HttpRequest.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { isHttpRequest } from '../../../src/server/HttpRequest';

describe('HttpRequest', (): void => {
describe('#isHttpRequest', (): void => {
it('can identify HttpRequests.', async(): Promise<void> => {
expect(isHttpRequest({})).toBe(false);
expect(isHttpRequest({ socket: {}, method: 'GET', url: '/url' })).toBe(true);
});
});
});
54 changes: 52 additions & 2 deletions test/unit/util/StreamUtil.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import { PassThrough } from 'stream';
import arrayifyStream from 'arrayify-stream';
import streamifyArray from 'streamify-array';
import { isHttpRequest } from '../../../src/server/HttpRequest';
import { guardedStreamFrom, pipeSafely, transformSafely, readableToString } from '../../../src/util/StreamUtil';

jest.mock('../../../src/server/HttpRequest', (): any => ({
isHttpRequest: jest.fn(),
}));

describe('StreamUtil', (): void => {
describe('#readableToString', (): void => {
it('concatenates all elements of a Readable.', async(): Promise<void> => {
Expand All @@ -12,6 +17,10 @@ describe('StreamUtil', (): void => {
});

describe('#pipeSafely', (): void => {
beforeEach(async(): Promise<void> => {
(isHttpRequest as unknown as jest.Mock).mockClear();
});

it('pipes data from one stream to the other.', async(): Promise<void> => {
const input = streamifyArray([ 'data' ]);
const output = new PassThrough();
Expand Down Expand Up @@ -56,6 +65,47 @@ describe('StreamUtil', (): void => {
await new Promise(setImmediate);
expect(input.destroyed).toBe(true);
});

it('does not destroy the source stream if it is an HttpRequest.', async(): Promise<void> => {
(isHttpRequest as unknown as jest.Mock).mockReturnValueOnce(true);
const input = new PassThrough();
const output = new PassThrough();
const piped = pipeSafely(input, output);

// Catch errors to prevent problems in test output
output.on('error', (): void => {
// Empty
});

piped.destroy(new Error('error!'));
// Allow events to propagate
await new Promise(setImmediate);
expect(input.destroyed).toBe(false);
});

it('still sends errors downstream if the input is an HttpRequest.', async(): Promise<void> => {
(isHttpRequest as unknown as jest.Mock).mockReturnValueOnce(true);
const input = new PassThrough();
input.read = (): any => {
input.emit('error', new Error('error'));
return null;
};
const output = new PassThrough();
const piped = pipeSafely(input, output);
await expect(readableToString(piped)).rejects.toThrow('error');
});

it('can map errors if the input is an HttpRequest.', async(): Promise<void> => {
(isHttpRequest as unknown as jest.Mock).mockReturnValueOnce(true);
const input = streamifyArray([ 'data' ]);
input.read = (): any => {
input.emit('error', new Error('error'));
return null;
};
const output = new PassThrough();
const piped = pipeSafely(input, output, (): any => new Error('other error'));
await expect(readableToString(piped)).rejects.toThrow('other error');
});
});

describe('#transformSafely', (): void => {
Expand Down Expand Up @@ -158,8 +208,8 @@ describe('StreamUtil', (): void => {

describe('#guardedStreamFrom', (): void => {
it('converts data to a guarded stream.', async(): Promise<void> => {
const data = [ 'a', 'b' ];
await expect(readableToString(guardedStreamFrom(data))).resolves.toBe('ab');
await expect(readableToString(guardedStreamFrom([ 'a', 'b' ]))).resolves.toBe('ab');
await expect(readableToString(guardedStreamFrom('ab'))).resolves.toBe('ab');
});
});
});

0 comments on commit 9534582

Please sign in to comment.