Skip to content

Commit

Permalink
feat: add destroy() and destroyed, deprecate abort()
Browse files Browse the repository at this point in the history
BREAKING CHANGES:
`abort()` is deprecated with a warning, in favor of `destroy()`, to more
closely align with node's http requests
  • Loading branch information
fent committed Nov 13, 2020
1 parent c2d7614 commit 95fc69e
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 44 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ Defaults are held in `miniget.defaultOptions` and can be adjusted globally.

Miniget returns a readable stream, errors will then be emitted on the stream. Returned stream has additional methods added, and can emit the following events.

### Stream#abort()
### Stream#destroy([error])

Aborts the request.
Destroys the request.

### Stream#destroyed

Set to `true` after `Stream#destroy()` has been called.

### Stream#text()

Expand Down
23 changes: 10 additions & 13 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"@types/sinon": "^9.0.8",
"longjohn": "^0.2.12",
"mocha": "^7.0.1",
"nock": "^12.0.0",
"nock": "^13.0.4",
"nyc": "^15.0.0",
"sinon": "^9.2.0",
"stream-equal": "^1.1.1",
Expand Down
59 changes: 46 additions & 13 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const httpLibs: {
const redirectStatusCodes = new Set([301, 302, 303, 307, 308]);
const retryStatusCodes = new Set([429, 503]);

// `request`, `response`, `abort` left out, miniget will emit these.
// `request`, `response`, `abort`, `close` left out, miniget will emit these.
const requestEvents = ['connect', 'continue', 'information', 'socket', 'timeout', 'upgrade'];
const responseEvents = ['aborted', 'close'];

Expand All @@ -33,7 +33,10 @@ namespace Miniget {
export type MinigetError = Error;

export interface Stream extends PassThrough {
abort: () => void;
abort: (err?: Error) => void;
aborted: boolean;
destroy: (err?: Error) => void;
destroyed: boolean;
text: () => Promise<string>;
on(event: 'reconnect', listener: (attempt: number, err?: Miniget.MinigetError) => void): this;
on(event: 'retry', listener: (attempt: number, err?: Miniget.MinigetError) => void): this;
Expand All @@ -58,9 +61,9 @@ Miniget.defaultOptions = {
function Miniget(url: string, options: Miniget.Options = {}): Miniget.Stream {
const opts: Miniget.Options = Object.assign({}, Miniget.defaultOptions, options);
const stream = new PassThrough({ highWaterMark: opts.highWaterMark }) as Miniget.Stream;
stream.destroyed = stream.aborted = false;
let activeRequest: ClientRequest | null;
let activeDecodedStream: Transform | null;
let aborted = false;
let redirects = 0;
let retries = 0;
let retryTimeout: NodeJS.Timer;
Expand Down Expand Up @@ -112,7 +115,7 @@ function Miniget(url: string, options: Miniget.Options = {}): Miniget.Stream {
retryAfter?: number;
}
const retryRequest = (retryOptions: RetryOptions): boolean => {
if (aborted) { return false; }
if (stream.destroyed) { return false; }
if (downloadHasStarted()) {
return reconnectIfEndedEarly(retryOptions.err);
} else if (
Expand All @@ -128,19 +131,28 @@ function Miniget(url: string, options: Miniget.Options = {}): Miniget.Stream {
};

const onRequestError = (err: Miniget.MinigetError, statusCode?: number): void => {
activeRequest.removeListener('close', onRequestError);
if (!retryRequest({ err, statusCode })) {
stream.emit('error', err);
}
};

const noop = () => {};
const onRequestClose = () => {
activeRequest.removeListener('error', onRequestError);
activeRequest.on('error', noop);
if (!retryRequest({})) {
stream.emit('close');
}
};

const forwardEvents = (ee: EventEmitter, events: string[]) => {
for (let event of events) {
ee.on(event, stream.emit.bind(stream, event));
}
};

const doDownload = (): void => {
if (aborted) { return; }
const doDownload = () => {
let parsed: RequestOptions, httpLib;
try {
parsed = urlParse(url);
Expand Down Expand Up @@ -179,6 +191,7 @@ function Miniget(url: string, options: Miniget.Options = {}): Miniget.Stream {
}

activeRequest = httpLib.get(parsed, (res: IncomingMessage) => {
if (stream.destroyed) { return; }
if (redirectStatusCodes.has(res.statusCode)) {
if (redirects++ >= opts.maxRedirects) {
stream.emit('error', new Miniget.MinigetError('Too many redirects'));
Expand Down Expand Up @@ -207,20 +220,20 @@ function Miniget(url: string, options: Miniget.Options = {}): Miniget.Stream {
}

let decodedStream = res as unknown as Transform;
const cleanup = (): void => {
const cleanup = () => {
res.removeListener('data', ondata);
decodedStream.removeListener('end', onend);
decodedStream.removeListener('error', onerror);
res.removeListener('error', onerror);
};
const ondata = (chunk: Buffer): void => { downloaded += chunk.length; };
const onend = (): void => {
const ondata = (chunk: Buffer) => { downloaded += chunk.length; };
const onend = () => {
cleanup();
if (!reconnectIfEndedEarly()) {
stream.end();
}
};
const onerror = (err: Miniget.MinigetError): void => {
const onerror = (err: Miniget.MinigetError) => {
cleanup();
onRequestError(err);
};
Expand All @@ -247,19 +260,39 @@ function Miniget(url: string, options: Miniget.Options = {}): Miniget.Stream {
res.on('error', onerror);
forwardEvents(res, responseEvents);
});
activeRequest.removeListener('error', noop);
activeRequest.on('error', onRequestError);
activeRequest.on('close', onRequestClose);
forwardEvents(activeRequest, requestEvents);
if (stream.destroyed) {
streamDestroy(destroyErr);
}
stream.emit('request', activeRequest);
};

stream.abort = (): void => {
aborted = true;
stream.abort = (err?: Error) => {
console.warn('`MinigetStream#abort()` has been deprecated in favor of `MinigetStream#destroy()`');
stream.aborted = true;
stream.emit('abort');
activeRequest?.abort();
stream.destroy(err);
};

let destroyErr: Error;
const streamDestroy = (err?: Error) => {
activeRequest.destroy(err);
activeDecodedStream?.unpipe(stream);
clearTimeout(retryTimeout);
};

stream._destroy = (err?: Error) => {
stream.destroyed = true;
if (activeRequest) {
streamDestroy(err);
} else {
destroyErr = err;
}
};

stream.text = async () => new Promise((resolve, reject) => {
let body = '';
stream.setEncoding('utf8');
Expand Down
47 changes: 32 additions & 15 deletions test/request-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ import 'longjohn';
nock.disableNetConnect();

describe('Make a request', () => {
afterEach(() => { nock.cleanAll(); });
afterEach(() => nock.cleanAll());
let clock: sinon.SinonFakeTimers;
beforeEach(() => clock = sinon.useFakeTimers());
afterEach(() => clock.uninstall());

const stub = sinon.stub(console, 'warn');
after(() => stub.restore());

describe('with `.text()`', () => {
it('Gives entire contents of page', async () => {
const scope = nock('http://webby.com')
Expand Down Expand Up @@ -106,8 +109,8 @@ describe('Make a request', () => {
clock.tick(retryCount * 100);
});
stream.on('error', (err) => {
assert.equal(err.message, 'Status code: 500');
scope.done();
assert.equal(err.message, 'Status code: 500');
done();
});
});
Expand Down Expand Up @@ -373,7 +376,8 @@ describe('Make a request', () => {
});

const destroy = (req: ClientRequest, res: IncomingMessage): void => {
req.abort();
req.destroy();
// res.destroy();
res.unpipe();
};

Expand Down Expand Up @@ -540,8 +544,7 @@ describe('Make a request', () => {
destroy(req, res);
}
});
stream.on('error', (err) => {
assert.equal(err.message, 'socket hang up');
stream.on('close', () => {
scope.done();
assert.equal(reconnects, 2);
assert.ok(destroyed);
Expand Down Expand Up @@ -603,7 +606,7 @@ describe('Make a request', () => {
});
});

describe('that gets aborted', () => {
describe('that gets destroyed', () => {
describe('immediately', () => {
it('Does not end stream', (done) => {
nock('http://anime.me')
Expand All @@ -615,34 +618,48 @@ describe('Make a request', () => {
});
stream.on('abort', done);
stream.on('error', done);
// Use `abort()` until nock fixes emitting events with `destroy()`.
stream.abort();
});
});

describe('after getting response but before end', () => {
it('Response does not give any more data', (done) => {
it('Response does not give any data', (done) => {
const scope = nock('http://www.google1.com')
.get('/one')
.delayBody(100)
.reply(200, '<html></html>');
const stream = miniget('http://www.google1.com/one');
stream.on('end', () => {
throw Error('`end` event should not be called');
throw Error('`end` event should not emit');
});
let abortCalled = false;
stream.on('abort', () => { abortCalled = true; });

stream.on('data', () => {
throw Error('Should not read any data');
});
stream.on('error', (err) => {
const errorSpy = sinon.spy();
stream.on('error', errorSpy);
stream.on('close', () => {
scope.done();
assert.ok(abortCalled);
assert.equal(err.message, 'socket hang up');
assert.ok(!errorSpy.called);
done();
});
stream.on('response', () => {
stream.abort();
stream.destroy();
});
});
});

describe('using `abort()`', () => {
it('Emits `abort` and does not end stream', (done) => {
nock('http://anime.me')
.get('/')
.reply(200, 'ooooaaaaaaaeeeee');
const stream = miniget('http://anime.me');
stream.on('end', () => {
throw Error('`end` event should not be called');
});
stream.on('abort', done);
stream.abort();
});
});
});
Expand Down

0 comments on commit 95fc69e

Please sign in to comment.