Skip to content

Commit

Permalink
fix: fix write after destroy error
Browse files Browse the repository at this point in the history
  • Loading branch information
fent committed Nov 13, 2020
1 parent 95fc69e commit 84f9c7d
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 50 deletions.
85 changes: 43 additions & 42 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const retryStatusCodes = new Set([429, 503]);

// `request`, `response`, `abort`, `close` left out, miniget will emit these.
const requestEvents = ['connect', 'continue', 'information', 'socket', 'timeout', 'upgrade'];
const responseEvents = ['aborted', 'close'];
const responseEvents = ['aborted'];

namespace Miniget {
export interface Options extends RequestOptions {
Expand Down Expand Up @@ -63,6 +63,7 @@ function Miniget(url: string, options: Miniget.Options = {}): Miniget.Stream {
const stream = new PassThrough({ highWaterMark: opts.highWaterMark }) as Miniget.Stream;
stream.destroyed = stream.aborted = false;
let activeRequest: ClientRequest | null;
let activeResponse: IncomingMessage | null;
let activeDecodedStream: Transform | null;
let redirects = 0;
let retries = 0;
Expand Down Expand Up @@ -130,22 +131,6 @@ function Miniget(url: string, options: Miniget.Options = {}): Miniget.Stream {
return false;
};

const onRequestError = (err: Miniget.MinigetError, statusCode?: number): void => {
activeRequest.removeListener('close', onRequestError);
if (!retryRequest({ err, statusCode })) {
stream.emit('error', err);
}
};

const noop = () => {};
const onRequestClose = () => {
activeRequest.removeListener('error', onRequestError);
activeRequest.on('error', noop);
if (!retryRequest({})) {
stream.emit('close');
}
};

const forwardEvents = (ee: EventEmitter, events: string[]) => {
for (let event of events) {
ee.on(event, stream.emit.bind(stream, event));
Expand Down Expand Up @@ -190,7 +175,41 @@ function Miniget(url: string, options: Miniget.Options = {}): Miniget.Stream {
}
}

const onError = (err: Miniget.MinigetError, statusCode?: number): void => {
cleanup();
if (!retryRequest({ err, statusCode })) {
stream.emit('error', err);
} else {
activeRequest.removeListener('close', onRequestClose);
}
};

const onRequestClose = () => {
cleanup();
if (!retryRequest({})) {
stream.emit('close');
}
};

const cleanup = () => {
activeRequest.removeListener('error', onError);
activeResponse?.removeListener('data', onData);
activeDecodedStream?.removeListener('end', onEnd);
activeDecodedStream?.removeListener('error', onError);
activeResponse?.removeListener('error', onError);
};

const onData = (chunk: Buffer) => { downloaded += chunk.length; };
const onEnd = () => {
cleanup();
if (!reconnectIfEndedEarly()) {
stream.end();
}
};

activeRequest = httpLib.get(parsed, (res: IncomingMessage) => {
// Needed for node v10, v12.
// istanbul ignore next
if (stream.destroyed) { return; }
if (redirectStatusCodes.has(res.statusCode)) {
if (redirects++ >= opts.maxRedirects) {
Expand All @@ -212,38 +231,20 @@ function Miniget(url: string, options: Miniget.Options = {}): Miniget.Stream {
} else if (res.statusCode < 200 || 400 <= res.statusCode) {
let err = new Miniget.MinigetError('Status code: ' + res.statusCode);
if (res.statusCode >= 500) {
onRequestError(err, res.statusCode);
onError(err, res.statusCode);
} else {
stream.emit('error', err);
}
return;
}

let decodedStream = res as unknown as Transform;
const cleanup = () => {
res.removeListener('data', ondata);
decodedStream.removeListener('end', onend);
decodedStream.removeListener('error', onerror);
res.removeListener('error', onerror);
};
const ondata = (chunk: Buffer) => { downloaded += chunk.length; };
const onend = () => {
cleanup();
if (!reconnectIfEndedEarly()) {
stream.end();
}
};
const onerror = (err: Miniget.MinigetError) => {
cleanup();
onRequestError(err);
};

if (opts.acceptEncoding && res.headers['content-encoding']) {
for (let enc of res.headers['content-encoding'].split(', ').reverse()) {
let fn = opts.acceptEncoding[enc];
if (fn != null) {
decodedStream = decodedStream.pipe(fn());
decodedStream.on('error', onerror);
decodedStream.on('error', onError);
}
}
}
Expand All @@ -252,16 +253,16 @@ function Miniget(url: string, options: Miniget.Options = {}): Miniget.Stream {
acceptRanges = res.headers['accept-ranges'] === 'bytes' &&
contentLength > 0 && opts.maxReconnects > 0;
}
res.on('data', ondata);
decodedStream.on('end', onend);
res.on('data', onData);
decodedStream.on('end', onEnd);
decodedStream.pipe(stream, { end: !acceptRanges });
activeResponse = res;
activeDecodedStream = decodedStream;
stream.emit('response', res);
res.on('error', onerror);
res.on('error', onError);
forwardEvents(res, responseEvents);
});
activeRequest.removeListener('error', noop);
activeRequest.on('error', onRequestError);
activeRequest.on('error', onError);
activeRequest.on('close', onRequestClose);
forwardEvents(activeRequest, requestEvents);
if (stream.destroyed) {
Expand Down
44 changes: 36 additions & 8 deletions test/request-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,10 @@ describe('Make a request', () => {
.get('/path')
.reply(500, 'oh no 3');
const stream = miniget('https://mysite.com/path');
stream.on('request', () => { clock.tick(1); });
stream.on('retry', (retryCount) => {
clock.tick(retryCount * 100);
process.nextTick(() => {
clock.tick(retryCount * 100);
});
});
stream.on('error', (err) => {
scope.done();
Expand Down Expand Up @@ -608,21 +609,45 @@ describe('Make a request', () => {

describe('that gets destroyed', () => {
describe('immediately', () => {
it('Does not end stream', (done) => {
it('Does not end stream', () => {
nock('http://anime.me')
.get('/')
.reply(200, 'ooooaaaaaaaeeeee');
const stream = miniget('http://anime.me');
stream.on('end', () => {
throw Error('`end` event should not be called');
});
stream.on('abort', done);
stream.on('error', done);
// Use `abort()` until nock fixes emitting events with `destroy()`.
stream.abort();
stream.on('error', () => {
// Ignore error on node v10, 12.
});
stream.destroy();
});
});
describe('after getting `request`', () => {
it('Does not start download, no `response` event', done => {
nock('https://friend.com')
.get('/yes')
.reply(200, '<html>my reply :)</html>');
const stream = miniget('https://friend.com/yes');
stream.on('end', () => {
throw Error('`end` event should not emit');
});
stream.on('response', () => {
throw Error('`response` event should not emit');
});
stream.on('data', () => {
throw Error('Should not read any data');
});
stream.on('error', () => {
// Ignore error on node v10, 12.
});
stream.on('request', () => {
stream.destroy();
done();
});
});
});
describe('after getting response but before end', () => {
describe('after getting `response` but before end', () => {
it('Response does not give any data', (done) => {
const scope = nock('http://www.google1.com')
.get('/one')
Expand Down Expand Up @@ -658,6 +683,9 @@ describe('Make a request', () => {
stream.on('end', () => {
throw Error('`end` event should not be called');
});
stream.on('error', () => {
// Ignore error on node v10, 12.
});
stream.on('abort', done);
stream.abort();
});
Expand Down

0 comments on commit 84f9c7d

Please sign in to comment.