Skip to content

Commit

Permalink
refactor: Make piping consistent
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimvh committed Nov 13, 2020
1 parent 715ba12 commit 95ab0b4
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 25 deletions.
6 changes: 5 additions & 1 deletion src/ldp/http/BasicResponseWriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { getLoggerFor } from '../../logging/LogUtil';
import type { HttpResponse } from '../../server/HttpResponse';
import { INTERNAL_QUADS } from '../../util/ContentTypes';
import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError';
import { pipeSafe } from '../../util/Util';
import type { MetadataWriter } from './metadata/MetadataWriter';
import type { ResponseDescription } from './response/ResponseDescription';
import { ResponseWriter } from './ResponseWriter';
Expand Down Expand Up @@ -33,7 +34,10 @@ export class BasicResponseWriter extends ResponseWriter {
input.response.writeHead(input.result.statusCode);

if (input.result.data) {
input.result.data.pipe(input.response);
const pipe = pipeSafe(input.result.data, input.response);
pipe.on('error', (error): void => {
this.logger.error(`Writing to HttpResponse failed with message ${error.message}`);
});
} else {
// If there is input data the response will end once the input stream ends
input.response.end();
Expand Down
8 changes: 3 additions & 5 deletions src/ldp/http/SparqlUpdateBodyParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { getLoggerFor } from '../../logging/LogUtil';
import { APPLICATION_SPARQL_UPDATE } from '../../util/ContentTypes';
import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError';
import { UnsupportedMediaTypeHttpError } from '../../util/errors/UnsupportedMediaTypeHttpError';
import { pipeStreamsAndErrors, readableToString } from '../../util/Util';
import { pipeSafe, readableToString } from '../../util/Util';
import type { BodyParserArgs } from './BodyParser';
import { BodyParser } from './BodyParser';
import type { SparqlUpdatePatch } from './SparqlUpdatePatch';
Expand All @@ -29,10 +29,8 @@ export class SparqlUpdateBodyParser extends BodyParser {
// Note that readableObjectMode is only defined starting from Node 12
// It is impossible to check if object mode is enabled in Node 10 (without accessing private variables)
const options = { objectMode: request.readableObjectMode };
const toAlgebraStream = new PassThrough(options);
const dataCopy = new PassThrough(options);
pipeStreamsAndErrors(request, toAlgebraStream);
pipeStreamsAndErrors(request, dataCopy);
const toAlgebraStream = pipeSafe(request, new PassThrough(options));
const dataCopy = pipeSafe(request, new PassThrough(options));
let algebra: Algebra.Operation;
try {
const sparql = await readableToString(toAlgebraStream);
Expand Down
6 changes: 3 additions & 3 deletions src/storage/conversion/RdfToQuadConverter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { RepresentationMetadata } from '../../ldp/representation/RepresentationM
import { INTERNAL_QUADS } from '../../util/ContentTypes';
import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError';
import { CONTENT_TYPE } from '../../util/UriConstants';
import { pipeStreamsAndErrors } from '../../util/Util';
import { pipeSafe } from '../../util/Util';
import { checkRequest } from './ConversionUtil';
import type { RepresentationConverterArgs } from './RepresentationConverter';
import { TypedRepresentationConverter } from './TypedRepresentationConverter';
Expand Down Expand Up @@ -39,8 +39,8 @@ export class RdfToQuadConverter extends TypedRepresentationConverter {

// Wrap the stream such that errors are transformed
// (Node 10 requires both writableObjectMode and readableObjectMode)
const data = new PassThrough({ writableObjectMode: true, readableObjectMode: true });
pipeStreamsAndErrors(rawQuads, data, (error): Error => new UnsupportedHttpError(error.message));
const pass = new PassThrough({ writableObjectMode: true, readableObjectMode: true });
const data = pipeSafe(rawQuads, pass, (error): Error => new UnsupportedHttpError(error.message));

return {
binary: false,
Expand Down
6 changes: 3 additions & 3 deletions src/util/MetadataController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { RepresentationMetadata } from '../ldp/representation/RepresentationMeta
import { TEXT_TURTLE } from './ContentTypes';
import { LDP, RDF } from './UriConstants';
import { toNamedNode } from './UriUtil';
import { pipeStreamsAndErrors, pushQuad } from './Util';
import { pipeSafe, pushQuad } from './Util';

export class MetadataController {
/**
Expand Down Expand Up @@ -46,7 +46,7 @@ export class MetadataController {
* @returns The Readable object.
*/
public serializeQuads(quads: Quad[]): Readable {
return pipeStreamsAndErrors(streamifyArray(quads), new StreamWriter({ format: TEXT_TURTLE }));
return pipeSafe(streamifyArray(quads), new StreamWriter({ format: TEXT_TURTLE }));
}

/**
Expand All @@ -56,6 +56,6 @@ export class MetadataController {
* @returns A promise containing the array of quads.
*/
public async parseQuads(readable: Readable): Promise<Quad[]> {
return await arrayifyStream(pipeStreamsAndErrors(readable, new StreamParser({ format: TEXT_TURTLE })));
return await arrayifyStream(pipeSafe(readable, new StreamParser({ format: TEXT_TURTLE })));
}
}
16 changes: 11 additions & 5 deletions src/util/Util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,26 @@ export const matchingMediaType = (mediaA: string, mediaB: string): boolean => {
};

/**
* Pipes one stream into another.
* Makes sure an error of the first stream gets passed to the second.
* Pipes one stream into another and emits errors of the first stream with the second.
* In case of an error in the first stream the second one will be destroyed with the given error.
* @param readable - Initial readable stream.
* @param destination - The destination for writing data.
* @param mapError - Optional function that takes the error and converts it to a new error.
*
* @returns The destination stream.
*/
export const pipeStreamsAndErrors = <T extends Writable>(readable: NodeJS.ReadableStream, destination: T,
export const pipeSafe = <T extends Writable>(readable: NodeJS.ReadableStream, destination: T,
mapError?: (error: Error) => Error): T => {
// Not using `stream.pipeline` since the result there only emits an error event if the last stream has the error
readable.pipe(destination);
readable.on('error', (error): boolean => {
readable.on('error', (error): void => {
logger.warn(`Piped stream errored with ${error.message}`);
return destination.emit('error', mapError ? mapError(error) : error);

// From https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options :
// "One important caveat is that if the Readable stream emits an error during processing, the Writable destination
// is not closed automatically. If an error occurs, it will be necessary to manually close each stream
// in order to prevent memory leaks."
destination.destroy(mapError ? mapError(error) : error);
});
return destination;
};
Expand Down
23 changes: 23 additions & 0 deletions test/unit/ldp/http/BasicResponseWriter.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { EventEmitter } from 'events';
import { PassThrough } from 'stream';
import type { MockResponse } from 'node-mocks-http';
import { createResponse } from 'node-mocks-http';
import streamifyArray from 'streamify-array';
Expand Down Expand Up @@ -66,4 +67,26 @@ describe('A BasicResponseWriter', (): void => {
expect(response._isEndCalled()).toBeTruthy();
expect(response._getStatusCode()).toBe(201);
});

it('can handle the data stream erroring.', async(): Promise<void> => {
const data = new PassThrough();
data.read = (): any => {
data.emit('error', new Error('bad data!'));
return null;
};
result = { statusCode: 201, data };

response = new PassThrough();
response.writeHead = jest.fn();

const end = new Promise((resolve): void => {
response.on('error', (error: Error): void => {
expect(error).toEqual(new Error('bad data!'));
resolve();
});
});

await expect(writer.handle({ response, result })).resolves.toBeUndefined();
await end;
});
});
16 changes: 8 additions & 8 deletions test/unit/util/Util.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
decodeUriPathComponents,
encodeUriPathComponents,
ensureTrailingSlash,
matchingMediaType, pipeStreamsAndErrors, pushQuad,
matchingMediaType, pipeSafe, pushQuad,
readableToString,
toCanonicalUriPath,
} from '../../../src/util/Util';
Expand Down Expand Up @@ -48,19 +48,19 @@ describe('Util function', (): void => {
it('pipes data from one stream to the other.', async(): Promise<void> => {
const input = streamifyArray([ 'data' ]);
const output = new PassThrough();
pipeStreamsAndErrors(input, output);
await expect(readableToString(output)).resolves.toEqual('data');
const piped = pipeSafe(input, output);
await expect(readableToString(piped)).resolves.toEqual('data');
});

it('pipes errors from one stream to the other.', async(): Promise<void> => {
const input = streamifyArray([ 'data' ]);
const input = new PassThrough();
input.read = (): any => {
input.emit('error', new Error('error'));
return null;
};
const output = new PassThrough();
pipeStreamsAndErrors(input, output);
await expect(readableToString(output)).rejects.toThrow(new Error('error'));
const piped = pipeSafe(input, output);
await expect(readableToString(piped)).rejects.toThrow(new Error('error'));
});

it('supports mapping errors to something else.', async(): Promise<void> => {
Expand All @@ -70,8 +70,8 @@ describe('Util function', (): void => {
return null;
};
const output = new PassThrough();
pipeStreamsAndErrors(input, output, (): any => new Error('other error'));
await expect(readableToString(output)).rejects.toThrow(new Error('other error'));
const piped = pipeSafe(input, output, (): any => new Error('other error'));
await expect(readableToString(piped)).rejects.toThrow(new Error('other error'));
});
});

Expand Down

0 comments on commit 95ab0b4

Please sign in to comment.