diff --git a/spec/observables/bindNodeCallback-spec.ts b/spec/observables/bindNodeCallback-spec.ts index 16fa09e64d..a4324a7666 100644 --- a/spec/observables/bindNodeCallback-spec.ts +++ b/spec/observables/bindNodeCallback-spec.ts @@ -1,20 +1,20 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import * as Rx from '../../src/Rx'; +import { bindNodeCallback } from '../../src'; +import { TestScheduler } from '../../src/testing'; -declare const rxTestScheduler: Rx.TestScheduler; -const Observable = Rx.Observable; +declare const rxTestScheduler: TestScheduler; /** @test {bindNodeCallback} */ -describe('Observable.bindNodeCallback', () => { +describe('bindNodeCallback', () => { describe('when not scheduled', () => { it('should emit undefined when callback is called without success arguments', () => { - function callback(cb) { + function callback(cb: Function) { cb(null); } - const boundCallback = Observable.bindNodeCallback(callback); - const results = []; + const boundCallback = bindNodeCallback(callback); + const results: Array = []; boundCallback() .subscribe((x: any) => { @@ -27,14 +27,14 @@ describe('Observable.bindNodeCallback', () => { }); it('should emit one value from a callback', () => { - function callback(datum, cb) { + function callback(datum: number, cb: Function) { cb(null, datum); } - const boundCallback = Observable.bindNodeCallback(callback); - const results = []; + const boundCallback = bindNodeCallback(callback); + const results: Array = []; boundCallback(42) - .subscribe((x: number) => { + .subscribe(x => { results.push(x); }, null, () => { results.push('done'); @@ -44,11 +44,11 @@ describe('Observable.bindNodeCallback', () => { }); it('should set context of callback to context of boundCallback', () => { - function callback(cb) { + function callback(this: { datum: number }, cb: Function) { cb(null, this.datum); } - const boundCallback = Observable.bindNodeCallback(callback); - const results = []; + const boundCallback = bindNodeCallback(callback); + const results: Array = []; boundCallback.call({datum: 42}) .subscribe( @@ -61,14 +61,14 @@ describe('Observable.bindNodeCallback', () => { }); it('should emit one value chosen by a selector', () => { - function callback(datum, cb) { + function callback(datum: number, cb: Function) { cb(null, datum); } - const boundCallback = Observable.bindNodeCallback(callback, (datum: any) => datum); - const results = []; + const boundCallback = bindNodeCallback(callback, (datum: any) => datum); + const results: Array = []; boundCallback(42) - .subscribe((x: number) => { + .subscribe(x => { results.push(x); }, null, () => { results.push('done'); @@ -80,40 +80,40 @@ describe('Observable.bindNodeCallback', () => { it('should raise error from callback', () => { const error = new Error(); - function callback(cb) { + function callback(cb: Function) { cb(error); } - const boundCallback = Observable.bindNodeCallback(callback); - const results = []; + const boundCallback = bindNodeCallback(callback); + const results: Array = []; boundCallback() .subscribe(() => { - throw 'should not next'; + throw new Error('should not next'); }, (err: any) => { results.push(err); }, () => { - throw 'should not complete'; + throw new Error('should not complete'); }); expect(results).to.deep.equal([error]); }); it('should emit an error when the selector throws', () => { - function callback(cb) { + function callback(cb: Function) { cb(null, 42); } const expected = new Error('Yikes!'); - const boundCallback = Observable.bindNodeCallback(callback, (err: any) => { throw expected; }); + const boundCallback = bindNodeCallback(callback, (err: any) => { throw expected; }); boundCallback() .subscribe(() => { - throw 'should not next'; + throw new Error('should not next'); }, (err: any) => { expect(err).to.equal(expected); }, () => { - throw 'should not complete'; + throw new Error('should not complete'); }); }); @@ -121,14 +121,14 @@ describe('Observable.bindNodeCallback', () => { const nextSpy = sinon.spy(); const throwSpy = sinon.spy(); const completeSpy = sinon.spy(); - let timeout; - function callback(datum, cb) { + let timeout: number; + function callback(datum: number, cb: Function) { // Need to cb async in order for the unsub to trigger timeout = setTimeout(() => { cb(null, datum); }); } - const subscription = Observable.bindNodeCallback(callback)(42) + const subscription = bindNodeCallback(callback)(42) .subscribe(nextSpy, throwSpy, completeSpy); subscription.unsubscribe(); @@ -145,12 +145,12 @@ describe('Observable.bindNodeCallback', () => { describe('when scheduled', () => { it('should emit undefined when callback is called without success arguments', () => { - function callback(cb) { + function callback(cb: Function) { cb(null); } - const boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler); - const results = []; + const boundCallback = bindNodeCallback(callback, null, rxTestScheduler); + const results: Array = []; boundCallback() .subscribe((x: any) => { @@ -165,14 +165,14 @@ describe('Observable.bindNodeCallback', () => { }); it('should emit one value from a callback', () => { - function callback(datum, cb) { + function callback(datum: number, cb: Function) { cb(null, datum); } - const boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler); - const results = []; + const boundCallback = bindNodeCallback(callback, null, rxTestScheduler); + const results: Array = []; boundCallback(42) - .subscribe((x: number) => { + .subscribe(x => { results.push(x); }, null, () => { results.push('done'); @@ -184,11 +184,11 @@ describe('Observable.bindNodeCallback', () => { }); it('should set context of callback to context of boundCallback', () => { - function callback(cb) { + function callback(this: { datum: number }, cb: Function) { cb(null, this.datum); } - const boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler); - const results = []; + const boundCallback = bindNodeCallback(callback, null, rxTestScheduler); + const results: Array = []; boundCallback.call({datum: 42}) .subscribe( @@ -204,18 +204,18 @@ describe('Observable.bindNodeCallback', () => { it('should error if callback throws', () => { const expected = new Error('haha no callback for you'); - function callback(datum, cb) { + function callback(datum: number, cb: Function) { throw expected; } - const boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler); + const boundCallback = bindNodeCallback(callback, null, rxTestScheduler); boundCallback(42) - .subscribe((x: number) => { - throw 'should not next'; + .subscribe(x => { + throw new Error('should not next'); }, (err: any) => { expect(err).to.equal(expected); }, () => { - throw 'should not complete'; + throw new Error('should not complete'); }); rxTestScheduler.flush(); @@ -224,20 +224,20 @@ describe('Observable.bindNodeCallback', () => { it('should raise error from callback', () => { const error = new Error(); - function callback(cb) { + function callback(cb: Function) { cb(error); } - const boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler); - const results = []; + const boundCallback = bindNodeCallback(callback, null, rxTestScheduler); + const results: Array = []; boundCallback() .subscribe(() => { - throw 'should not next'; + throw new Error('should not next'); }, (err: any) => { results.push(err); }, () => { - throw 'should not complete'; + throw new Error('should not complete'); }); rxTestScheduler.flush(); @@ -247,35 +247,35 @@ describe('Observable.bindNodeCallback', () => { it('should error if selector throws', () => { const expected = new Error('what? a selector? I don\'t think so'); - function callback(datum, cb) { + function callback(datum: number, cb: Function) { cb(null, datum); } function selector() { throw expected; } - const boundCallback = Observable.bindNodeCallback(callback, selector, rxTestScheduler); + const boundCallback = bindNodeCallback(callback, selector, rxTestScheduler); boundCallback(42) .subscribe((x: any) => { - throw 'should not next'; + throw new Error('should not next'); }, (err: any) => { expect(err).to.equal(expected); }, () => { - throw 'should not complete'; + throw new Error('should not complete'); }); rxTestScheduler.flush(); }); it('should use a selector', () => { - function callback(datum, cb) { + function callback(datum: number, cb: Function) { cb(null, datum); } - function selector(x) { + function selector(x: number) { return x + '!!!'; } - const boundCallback = Observable.bindNodeCallback(callback, selector, rxTestScheduler); - const results = []; + const boundCallback = bindNodeCallback(callback, selector, rxTestScheduler); + const results: Array = []; boundCallback(42) .subscribe((x: string) => { @@ -291,14 +291,14 @@ describe('Observable.bindNodeCallback', () => { }); it('should pass multiple inner arguments as an array', () => { - function callback(datum, cb) { + function callback(datum: number, cb: Function) { cb(null, datum, 1, 2, 3); } - const boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler); - const results = []; + const boundCallback = bindNodeCallback(callback, null, rxTestScheduler); + const results: Array = []; boundCallback(42) - .subscribe((x: number) => { + .subscribe(x => { results.push(x); }, null, () => { results.push('done'); @@ -310,18 +310,18 @@ describe('Observable.bindNodeCallback', () => { }); it('should pass multiple inner arguments to the selector if there is one', () => { - function callback(datum, cb) { + function callback(datum: number, cb: Function) { cb(null, datum, 1, 2, 3); } - function selector(a, b, c, d) { + function selector(a: number, b: number, c: number, d: number) { expect([a, b, c, d]).to.deep.equal([42, 1, 2, 3]); return a + b + c + d; } - const boundCallback = Observable.bindNodeCallback(callback, selector, rxTestScheduler); - const results = []; + const boundCallback = bindNodeCallback(callback, selector, rxTestScheduler); + const results: Array = []; boundCallback(42) - .subscribe((x: number) => { + .subscribe(x => { results.push(x); }, null, () => { results.push('done'); @@ -334,23 +334,23 @@ describe('Observable.bindNodeCallback', () => { it('should cache value for next subscription and not call callbackFunc again', () => { let calls = 0; - function callback(datum, cb) { + function callback(datum: number, cb: Function) { calls++; cb(null, datum); } - const boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler); - const results1 = []; - const results2 = []; + const boundCallback = bindNodeCallback(callback, null, rxTestScheduler); + const results1: Array = []; + const results2: Array = []; const source = boundCallback(42); - source.subscribe((x: number) => { + source.subscribe(x => { results1.push(x); }, null, () => { results1.push('done'); }); - source.subscribe((x: number) => { + source.subscribe(x => { results2.push(x); }, null, () => { results2.push('done'); @@ -362,4 +362,4 @@ describe('Observable.bindNodeCallback', () => { expect(results1).to.deep.equal([42, 'done']); expect(results2).to.deep.equal([42, 'done']); }); -}); \ No newline at end of file +}); diff --git a/src/internal/observable/BoundNodeCallbackObservable.ts b/src/internal/observable/BoundNodeCallbackObservable.ts deleted file mode 100644 index a53f8fe2c6..0000000000 --- a/src/internal/observable/BoundNodeCallbackObservable.ts +++ /dev/null @@ -1,279 +0,0 @@ -import { Observable } from '../Observable'; -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { IScheduler } from '../Scheduler'; -import { Action } from '../scheduler/Action'; -import { tryCatch } from '../util/tryCatch'; -import { errorObject } from '../util/errorObject'; -import { AsyncSubject } from '../../internal/AsyncSubject'; - -/** - * We need this JSDoc comment for affecting ESDoc. - * @extends {Ignored} - * @hide true - */ -export class BoundNodeCallbackObservable extends Observable { - subject: AsyncSubject; - - /* tslint:disable:max-line-length */ - static create(callbackFunc: (callback: (err: any, result: R) => any) => any, selector?: void, scheduler?: IScheduler): () => Observable; - static create(callbackFunc: (v1: T, callback: (err: any, result: R) => any) => any, selector?: void, scheduler?: IScheduler): (v1: T) => Observable; - static create(callbackFunc: (v1: T, v2: T2, callback: (err: any, result: R) => any) => any, selector?: void, scheduler?: IScheduler): (v1: T, v2: T2) => Observable; - static create(callbackFunc: (v1: T, v2: T2, v3: T3, callback: (err: any, result: R) => any) => any, selector?: void, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3) => Observable; - static create(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, callback: (err: any, result: R) => any) => any, selector?: void, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3, v4: T4) => Observable; - static create(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, callback: (err: any, result: R) => any) => any, selector?: void, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3, v4: T4, v5: T5) => Observable; - static create(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6, callback: (err: any, result: R) => any) => any, selector?: void, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6) => Observable; - static create(callbackFunc: Function, selector?: void, scheduler?: IScheduler): (...args: any[]) => Observable; - static create(callbackFunc: Function, selector?: (...args: any[]) => T, scheduler?: IScheduler): (...args: any[]) => Observable; - /* tslint:enable:max-line-length */ - - /** - * Converts a Node.js-style callback API to a function that returns an - * Observable. - * - * It's just like {@link bindCallback}, but the - * callback is expected to be of type `callback(error, result)`. - * - * `bindNodeCallback` is not an operator because its input and output are not - * Observables. The input is a function `func` with some parameters, but the - * last parameter must be a callback function that `func` calls when it is - * done. The callback function is expected to follow Node.js conventions, - * where the first argument to the callback is an error object, signaling - * whether call was successful. If that object is passed to callback, it means - * something went wrong. - * - * The output of `bindNodeCallback` is a function that takes the same - * parameters as `func`, except the last one (the callback). When the output - * function is called with arguments, it will return an Observable. - * If `func` calls its callback with error parameter present, Observable will - * error with that value as well. If error parameter is not passed, Observable will emit - * second parameter. If there are more parameters (third and so on), - * Observable will emit an array with all arguments, except first error argument. - * - * Optionally `bindNodeCallback` accepts selector function, which allows you to - * make resulting Observable emit value computed by selector, instead of regular - * callback arguments. It works similarly to {@link bindCallback} selector, but - * Node.js-style error argument will never be passed to that function. - * - * Note that `func` will not be called at the same time output function is, - * but rather whenever resulting Observable is subscribed. By default call to - * `func` will happen synchronously after subscription, but that can be changed - * with proper {@link Scheduler} provided as optional third parameter. Scheduler - * can also control when values from callback will be emitted by Observable. - * To find out more, check out documentation for {@link bindCallback}, where - * Scheduler works exactly the same. - * - * As in {@link bindCallback}, context (`this` property) of input function will be set to context - * of returned function, when it is called. - * - * After Observable emits value, it will complete immediately. This means - * even if `func` calls callback again, values from second and consecutive - * calls will never appear on the stream. If you need to handle functions - * that call callbacks multiple times, check out {@link fromEvent} or - * {@link fromEventPattern} instead. - * - * Note that `bindNodeCallback` can be used in non-Node.js environments as well. - * "Node.js-style" callbacks are just a convention, so if you write for - * browsers or any other environment and API you use implements that callback style, - * `bindNodeCallback` can be safely used on that API functions as well. - * - * Remember that Error object passed to callback does not have to be an instance - * of JavaScript built-in `Error` object. In fact, it does not even have to an object. - * Error parameter of callback function is interpreted as "present", when value - * of that parameter is truthy. It could be, for example, non-zero number, non-empty - * string or boolean `true`. In all of these cases resulting Observable would error - * with that value. This means usually regular style callbacks will fail very often when - * `bindNodeCallback` is used. If your Observable errors much more often then you - * would expect, check if callback really is called in Node.js-style and, if not, - * switch to {@link bindCallback} instead. - * - * Note that even if error parameter is technically present in callback, but its value - * is falsy, it still won't appear in array emitted by Observable or in selector function. - * - * - * @example Read a file from the filesystem and get the data as an Observable - * import * as fs from 'fs'; - * var readFileAsObservable = Rx.Observable.bindNodeCallback(fs.readFile); - * var result = readFileAsObservable('./roadNames.txt', 'utf8'); - * result.subscribe(x => console.log(x), e => console.error(e)); - * - * - * @example Use on function calling callback with multiple arguments - * someFunction((err, a, b) => { - * console.log(err); // null - * console.log(a); // 5 - * console.log(b); // "some string" - * }); - * var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction); - * boundSomeFunction() - * .subscribe(value => { - * console.log(value); // [5, "some string"] - * }); - * - * - * @example Use with selector function - * someFunction((err, a, b) => { - * console.log(err); // undefined - * console.log(a); // "abc" - * console.log(b); // "DEF" - * }); - * var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction, (a, b) => a + b); - * boundSomeFunction() - * .subscribe(value => { - * console.log(value); // "abcDEF" - * }); - * - * - * @example Use on function calling callback in regular style - * someFunction(a => { - * console.log(a); // 5 - * }); - * var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction); - * boundSomeFunction() - * .subscribe( - * value => {} // never gets called - * err => console.log(err) // 5 - *); - * - * - * @see {@link bindCallback} - * @see {@link from} - * @see {@link fromPromise} - * - * @param {function} func Function with a Node.js-style callback as the last parameter. - * @param {function} [selector] A function which takes the arguments from the - * callback and maps those to a value to emit on the output Observable. - * @param {Scheduler} [scheduler] The scheduler on which to schedule the - * callbacks. - * @return {function(...params: *): Observable} A function which returns the - * Observable that delivers the same values the Node.js callback would - * deliver. - * @static true - * @name bindNodeCallback - * @owner Observable - */ - static create(func: Function, - selector: Function | void = undefined, - scheduler?: IScheduler): (...args: any[]) => Observable { - return function(this: any, ...args: any[]): Observable { - return new BoundNodeCallbackObservable(func, selector, args, this, scheduler); - }; - } - - constructor(private callbackFunc: Function, - private selector: Function, - private args: any[], - private context: any, - public scheduler: IScheduler) { - super(); - } - - protected _subscribe(subscriber: Subscriber): Subscription { - const callbackFunc = this.callbackFunc; - const args = this.args; - const scheduler = this.scheduler; - let subject = this.subject; - - if (!scheduler) { - if (!subject) { - subject = this.subject = new AsyncSubject(); - const handler = function handlerFn(this: any, ...innerArgs: any[]) { - const source = (handlerFn).source; - const { selector, subject } = source; - const err = innerArgs.shift(); - - if (err) { - subject.error(err); - } else if (selector) { - const result = tryCatch(selector).apply(this, innerArgs); - if (result === errorObject) { - subject.error(errorObject.e); - } else { - subject.next(result); - subject.complete(); - } - } else { - subject.next(innerArgs.length <= 1 ? innerArgs[0] : innerArgs); - subject.complete(); - } - }; - // use named function instance to avoid closure. - (handler).source = this; - - const result = tryCatch(callbackFunc).apply(this.context, args.concat(handler)); - if (result === errorObject) { - subject.error(errorObject.e); - } - } - return subject.subscribe(subscriber); - } else { - return scheduler.schedule>(dispatch, 0, { source: this, subscriber, context: this.context }); - } - } -} - -interface DispatchState { - source: BoundNodeCallbackObservable; - subscriber: Subscriber; - context: any; -} - -function dispatch(this: Action>, state: DispatchState) { - const self = ( this); - const { source, subscriber, context } = state; - // XXX: cast to `any` to access to the private field in `source`. - const { callbackFunc, args, scheduler } = source as any; - let subject = source.subject; - - if (!subject) { - subject = source.subject = new AsyncSubject(); - - const handler = function handlerFn(this: any, ...innerArgs: any[]) { - const source = (handlerFn).source; - const { selector, subject } = source; - const err = innerArgs.shift(); - - if (err) { - self.add(scheduler.schedule(dispatchError, 0, { err, subject })); - } else if (selector) { - const result = tryCatch(selector).apply(this, innerArgs); - if (result === errorObject) { - self.add(scheduler.schedule(dispatchError, 0, { err: errorObject.e, subject })); - } else { - self.add(scheduler.schedule(dispatchNext, 0, { value: result, subject })); - } - } else { - const value = innerArgs.length <= 1 ? innerArgs[0] : innerArgs; - self.add(scheduler.schedule(dispatchNext, 0, { value, subject })); - } - }; - // use named function to pass values in without closure - (handler).source = source; - - const result = tryCatch(callbackFunc).apply(context, args.concat(handler)); - if (result === errorObject) { - self.add(scheduler.schedule(dispatchError, 0, { err: errorObject.e, subject })); - } - } - - self.add(subject.subscribe(subscriber)); -} - -interface DispatchNextArg { - subject: AsyncSubject; - value: T; -} -function dispatchNext(arg: DispatchNextArg) { - const { value, subject } = arg; - subject.next(value); - subject.complete(); -} - -interface DispatchErrorArg { - subject: AsyncSubject; - err: any; -} -function dispatchError(arg: DispatchErrorArg) { - const { err, subject } = arg; - subject.error(err); -} diff --git a/src/internal/observable/bindNodeCallback.ts b/src/internal/observable/bindNodeCallback.ts index eea04dfb58..a360560abd 100644 --- a/src/internal/observable/bindNodeCallback.ts +++ b/src/internal/observable/bindNodeCallback.ts @@ -1,3 +1,269 @@ -import { BoundNodeCallbackObservable } from './BoundNodeCallbackObservable'; +import { Observable } from '../Observable'; +import { IScheduler } from '../Scheduler'; +import { AsyncSubject } from '../AsyncSubject'; +import { Subscriber } from '../Subscriber'; +import { Action } from '../scheduler/Action'; -export const bindNodeCallback = BoundNodeCallbackObservable.create; \ No newline at end of file +/* tslint:disable:max-line-length */ +export function bindNodeCallback(callbackFunc: (callback: (err: any, result: R) => any) => any, selector?: void, scheduler?: IScheduler): () => Observable; +export function bindNodeCallback(callbackFunc: (v1: T, callback: (err: any, result: R) => any) => any, selector?: void, scheduler?: IScheduler): (v1: T) => Observable; +export function bindNodeCallback(callbackFunc: (v1: T, v2: T2, callback: (err: any, result: R) => any) => any, selector?: void, scheduler?: IScheduler): (v1: T, v2: T2) => Observable; +export function bindNodeCallback(callbackFunc: (v1: T, v2: T2, v3: T3, callback: (err: any, result: R) => any) => any, selector?: void, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3) => Observable; +export function bindNodeCallback(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, callback: (err: any, result: R) => any) => any, selector?: void, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3, v4: T4) => Observable; +export function bindNodeCallback(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, callback: (err: any, result: R) => any) => any, selector?: void, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3, v4: T4, v5: T5) => Observable; +export function bindNodeCallback(callbackFunc: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6, callback: (err: any, result: R) => any) => any, selector?: void, scheduler?: IScheduler): (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6) => Observable; +export function bindNodeCallback(callbackFunc: Function, selector?: void, scheduler?: IScheduler): (...args: any[]) => Observable; +export function bindNodeCallback(callbackFunc: Function, selector?: (...args: any[]) => T, scheduler?: IScheduler): (...args: any[]) => Observable; +/* tslint:enable:max-line-length */ + +/** + * Converts a Node.js-style callback API to a function that returns an + * Observable. + * + * It's just like {@link bindCallback}, but the + * callback is expected to be of type `callback(error, result)`. + * + * `bindNodeCallback` is not an operator because its input and output are not + * Observables. The input is a function `func` with some parameters, but the + * last parameter must be a callback function that `func` calls when it is + * done. The callback function is expected to follow Node.js conventions, + * where the first argument to the callback is an error object, signaling + * whether call was successful. If that object is passed to callback, it means + * something went wrong. + * + * The output of `bindNodeCallback` is a function that takes the same + * parameters as `func`, except the last one (the callback). When the output + * function is called with arguments, it will return an Observable. + * If `func` calls its callback with error parameter present, Observable will + * error with that value as well. If error parameter is not passed, Observable will emit + * second parameter. If there are more parameters (third and so on), + * Observable will emit an array with all arguments, except first error argument. + * + * Optionally `bindNodeCallback` accepts selector function, which allows you to + * make resulting Observable emit value computed by selector, instead of regular + * callback arguments. It works similarly to {@link bindCallback} selector, but + * Node.js-style error argument will never be passed to that function. + * + * Note that `func` will not be called at the same time output function is, + * but rather whenever resulting Observable is subscribed. By default call to + * `func` will happen synchronously after subscription, but that can be changed + * with proper {@link Scheduler} provided as optional third parameter. Scheduler + * can also control when values from callback will be emitted by Observable. + * To find out more, check out documentation for {@link bindCallback}, where + * Scheduler works exactly the same. + * + * As in {@link bindCallback}, context (`this` property) of input function will be set to context + * of returned function, when it is called. + * + * After Observable emits value, it will complete immediately. This means + * even if `func` calls callback again, values from second and consecutive + * calls will never appear on the stream. If you need to handle functions + * that call callbacks multiple times, check out {@link fromEvent} or + * {@link fromEventPattern} instead. + * + * Note that `bindNodeCallback` can be used in non-Node.js environments as well. + * "Node.js-style" callbacks are just a convention, so if you write for + * browsers or any other environment and API you use implements that callback style, + * `bindNodeCallback` can be safely used on that API functions as well. + * + * Remember that Error object passed to callback does not have to be an instance + * of JavaScript built-in `Error` object. In fact, it does not even have to an object. + * Error parameter of callback function is interpreted as "present", when value + * of that parameter is truthy. It could be, for example, non-zero number, non-empty + * string or boolean `true`. In all of these cases resulting Observable would error + * with that value. This means usually regular style callbacks will fail very often when + * `bindNodeCallback` is used. If your Observable errors much more often then you + * would expect, check if callback really is called in Node.js-style and, if not, + * switch to {@link bindCallback} instead. + * + * Note that even if error parameter is technically present in callback, but its value + * is falsy, it still won't appear in array emitted by Observable or in selector function. + * + * + * @example Read a file from the filesystem and get the data as an Observable + * import * as fs from 'fs'; + * var readFileAsObservable = bindNodeCallback(fs.readFile); + * var result = readFileAsObservable('./roadNames.txt', 'utf8'); + * result.subscribe(x => console.log(x), e => console.error(e)); + * + * + * @example Use on function calling callback with multiple arguments + * someFunction((err, a, b) => { + * console.log(err); // null + * console.log(a); // 5 + * console.log(b); // "some string" + * }); + * var boundSomeFunction = bindNodeCallback(someFunction); + * boundSomeFunction() + * .subscribe(value => { + * console.log(value); // [5, "some string"] + * }); + * + * + * @example Use with selector function + * someFunction((err, a, b) => { + * console.log(err); // undefined + * console.log(a); // "abc" + * console.log(b); // "DEF" + * }); + * var boundSomeFunction = bindNodeCallback(someFunction, (a, b) => a + b); + * boundSomeFunction() + * .subscribe(value => { + * console.log(value); // "abcDEF" + * }); + * + * + * @example Use on function calling callback in regular style + * someFunction(a => { + * console.log(a); // 5 + * }); + * var boundSomeFunction = bindNodeCallback(someFunction); + * boundSomeFunction() + * .subscribe( + * value => {} // never gets called + * err => console.log(err) // 5 + * ); + * + * + * @see {@link bindCallback} + * @see {@link from} + * @see {@link fromPromise} + * + * @param {function} func Function with a Node.js-style callback as the last parameter. + * @param {function} [selector] A function which takes the arguments from the + * callback and maps those to a value to emit on the output Observable. + * @param {Scheduler} [scheduler] The scheduler on which to schedule the + * callbacks. + * @return {function(...params: *): Observable} A function which returns the + * Observable that delivers the same values the Node.js callback would + * deliver. + * @name bindNodeCallback + */ +export function bindNodeCallback(callbackFunc: Function, + selector: Function | void = undefined, + scheduler?: IScheduler): (...args: any[]) => Observable { + return function(this: any, ...args: any[]): Observable { + const params: ParamsState = { + subject: undefined, + args, + callbackFunc, + scheduler, + selector, + context: this, + }; + return new Observable(subscriber => { + const { context } = params; + let { subject } = params; + if (!scheduler) { + if (!subject) { + subject = params.subject = new AsyncSubject(); + const handler = function handlerFn(this: any, ...innerArgs: any[]) { + const err = innerArgs.shift(); + + if (err) { + subject.error(err); + return; + } + + let result: T; + if (selector) { + try { + result = selector(...innerArgs); + } catch (err) { + subject.error(err); + return; + } + } else { + result = innerArgs.length <= 1 ? innerArgs[0] : innerArgs; + } + + subject.next(result); + subject.complete(); + }; + + try { + callbackFunc.apply(context, [...args, handler]); + } catch (err) { + subject.error(err); + } + } + return subject.subscribe(subscriber); + } else { + return scheduler.schedule>(dispatch, 0, { params, subscriber, context }); + } + }); + }; +} + +interface DispatchState { + subscriber: Subscriber; + context: any; + params: ParamsState; +} + +interface ParamsState { + callbackFunc: Function; + args: any[]; + scheduler: IScheduler; + subject: AsyncSubject; + selector: Function | void; + context: any; +} + +function dispatch(this: Action>, state: DispatchState) { + const { params, subscriber, context } = state; + const { callbackFunc, args, scheduler, selector } = params; + let subject = params.subject; + + if (!subject) { + subject = params.subject = new AsyncSubject(); + + const handler = (...innerArgs: any[]) => { + const err = innerArgs.shift(); + if (err) { + this.add(scheduler.schedule>(dispatchError, 0, { err, subject })); + } else if (selector) { + let result: T; + try { + result = selector(...innerArgs); + } catch (err) { + this.add(scheduler.schedule>(dispatchError, 0, { err, subject })); + return; + } + this.add(scheduler.schedule>(dispatchNext, 0, { value: result, subject })); + } else { + const value = innerArgs.length <= 1 ? innerArgs[0] : innerArgs; + this.add(scheduler.schedule>(dispatchNext, 0, { value, subject })); + } + }; + + try { + callbackFunc.apply(context, [...args, handler]); + } catch (err) { + this.add(scheduler.schedule>(dispatchError, 0, { err, subject })); + } + } + + this.add(subject.subscribe(subscriber)); +} + +interface DispatchNextArg { + subject: AsyncSubject; + value: T; +} + +function dispatchNext(arg: DispatchNextArg) { + const { value, subject } = arg; + subject.next(value); + subject.complete(); +} + +interface DispatchErrorArg { + subject: AsyncSubject; + err: any; +} + +function dispatchError(arg: DispatchErrorArg) { + const { err, subject } = arg; + subject.error(err); +}