-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(operator): add buffer operators: buffer, bufferWhen, bufferTime,…
… bufferCount, and bufferToggle closes #207
- Loading branch information
Showing
13 changed files
with
575 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
/* globals describe, it, expect */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.buffer', function () { | ||
it('should emit buffers that close and reopen', function (done) { | ||
var expected = [ | ||
[0, 1, 2], | ||
[3, 4, 5], | ||
[6, 7, 8] | ||
]; | ||
Observable.interval(100) | ||
.buffer(Observable.interval(320)) | ||
.take(3) | ||
.subscribe(function (w) { | ||
expect(w).toEqual(expected.shift()) | ||
}, null, done); | ||
}, 2000); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
/* globals describe, it, expect */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.bufferCount', function () { | ||
it('should emit buffers at intervals', function (done) { | ||
var expected = [ | ||
[0, 1], | ||
[1, 2], | ||
[2, 3], | ||
[3] | ||
]; | ||
Observable.range(0, 4) | ||
.bufferCount(2, 1) | ||
.take(3) | ||
.subscribe(function (w) { | ||
expect(w).toEqual(expected.shift()) | ||
}, null, done); | ||
}, 2000); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* globals describe, it, expect */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.bufferTime', function () { | ||
it('should emit buffers at intervals', function (done) { | ||
var expected = [ | ||
[0, 1, 2], | ||
[3, 4, 5], | ||
[6, 7, 8] | ||
]; | ||
Observable.interval(100) | ||
.bufferTime(320) | ||
.take(3) | ||
.subscribe(function (w) { | ||
expect(w).toEqual(expected.shift()) | ||
}, null, done); | ||
}, 2000); | ||
|
||
|
||
it('should emit buffers that have been created at intervals and close after the specified delay', function (done) { | ||
var expected = [ | ||
[0, 1, 2, 3, 4], | ||
[2, 3, 4, 5, 6], | ||
[4, 5, 6, 7, 8] | ||
]; | ||
Observable.interval(100) | ||
.bufferTime(520, 220) | ||
.take(3) | ||
.subscribe(function (w) { | ||
expect(w).toEqual(expected.shift()) | ||
}, null, done); | ||
}, 2000); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
/* globals describe, it, expect */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.bufferToggle', function () { | ||
it('should emit buffers that are opened by an observable from the first argument and closed by an observable returned by the function in the second argument', function (done) { | ||
Observable.interval(100).take(10) | ||
.bufferToggle(Observable.timer(320).mapTo('test'), function (n) { | ||
expect(n).toBe('test'); | ||
return Observable.timer(320); | ||
}) | ||
.subscribe(function (w) { | ||
expect(w).toEqual([3, 4, 5]) | ||
}, null, done); | ||
}, 2000); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
/* globals describe, it, expect */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.prototype.bufferWhen', function () { | ||
it('should emit buffers that close and reopen', function (done) { | ||
var expected = [ | ||
[0, 1, 2], | ||
[3, 4, 5], | ||
[6, 7, 8] | ||
]; | ||
Observable.interval(100) | ||
.bufferWhen(function () { return Observable.timer(320); }) | ||
.take(3) | ||
.subscribe(function (w) { | ||
expect(w).toEqual(expected.shift()) | ||
}, null, done); | ||
}, 2000); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
import Operator from '../Operator'; | ||
import Observer from '../Observer'; | ||
import Subscriber from '../Subscriber'; | ||
import Observable from '../Observable'; | ||
|
||
import tryCatch from '../util/tryCatch'; | ||
import {errorObject} from '../util/errorObject'; | ||
import bindCallback from '../util/bindCallback'; | ||
|
||
export default function buffer<T>(closingNotifier: Observable<any>): Observable<T[]> { | ||
return this.lift(new BufferOperator(closingNotifier)); | ||
} | ||
|
||
export class BufferOperator<T, R> implements Operator<T, R> { | ||
|
||
constructor(private closingNotifier: Observable<any>) { | ||
} | ||
|
||
call(observer: Observer<T>): Observer<T> { | ||
return new BufferSubscriber(observer, this.closingNotifier); | ||
} | ||
} | ||
|
||
export class BufferSubscriber<T> extends Subscriber<T> { | ||
buffer: T[] = []; | ||
|
||
constructor(destination: Observer<T>, closingNotifier: Observable<any>) { | ||
super(destination); | ||
this.add(closingNotifier.subscribe(new BufferClosingNotifierSubscriber(this))); | ||
} | ||
|
||
_next(value: T) { | ||
this.buffer.push(value); | ||
} | ||
|
||
_error(err: any) { | ||
this.destination.error(err); | ||
} | ||
|
||
_complete() { | ||
this.flushBuffer(); | ||
this.destination.complete(); | ||
} | ||
|
||
flushBuffer() { | ||
const buffer = this.buffer; | ||
this.buffer = []; | ||
this.destination.next(buffer); | ||
} | ||
} | ||
|
||
export class BufferClosingNotifierSubscriber<T> extends Subscriber<T> { | ||
constructor(private parent: BufferSubscriber<any>) { | ||
super(null); | ||
} | ||
|
||
_next(value: T) { | ||
this.parent.flushBuffer(); | ||
} | ||
|
||
_error(err: any) { | ||
this.parent.error(err); | ||
} | ||
|
||
_complete() { | ||
// noop | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
import Operator from '../Operator'; | ||
import Observer from '../Observer'; | ||
import Subscriber from '../Subscriber'; | ||
import Observable from '../Observable'; | ||
|
||
import tryCatch from '../util/tryCatch'; | ||
import {errorObject} from '../util/errorObject'; | ||
import bindCallback from '../util/bindCallback'; | ||
|
||
export default function bufferCount<T>(bufferSize: number, startBufferEvery: number = null): Observable<T[]> { | ||
return this.lift(new BufferCountOperator(bufferSize, startBufferEvery)); | ||
} | ||
|
||
export class BufferCountOperator<T, R> implements Operator<T, R> { | ||
|
||
constructor(private bufferSize: number, private startBufferEvery: number) { | ||
} | ||
|
||
call(observer: Observer<T>): Observer<T> { | ||
return new BufferCountSubscriber(observer, this.bufferSize, this.startBufferEvery); | ||
} | ||
} | ||
|
||
export class BufferCountSubscriber<T> extends Subscriber<T> { | ||
buffers: Array<T[]> = [[]]; | ||
count: number = 0; | ||
|
||
constructor(destination: Observer<T>, private bufferSize: number, private startBufferEvery: number) { | ||
super(destination); | ||
} | ||
|
||
_next(value: T) { | ||
const count = (this.count += 1); | ||
const destination = this.destination; | ||
const bufferSize = this.bufferSize; | ||
const startBufferEvery = this.startBufferEvery; | ||
const buffers = this.buffers; | ||
const len = buffers.length; | ||
let remove = -1; | ||
|
||
if (count % startBufferEvery === 0) { | ||
buffers.push([]); | ||
} | ||
|
||
for (let i = 0; i < len; i++) { | ||
let buffer = buffers[i]; | ||
buffer.push(value); | ||
if (buffer.length === bufferSize) { | ||
remove = i; | ||
this.destination.next(buffer); | ||
} | ||
} | ||
|
||
if (remove !== -1) { | ||
buffers.splice(remove, 1); | ||
} | ||
} | ||
|
||
_error(err: any) { | ||
this.destination.error(err); | ||
} | ||
|
||
_complete() { | ||
const destination = this.destination; | ||
const buffers = this.buffers; | ||
while (buffers.length > 0) { | ||
destination.next(buffers.shift()); | ||
} | ||
destination.complete(); | ||
} | ||
} |
Oops, something went wrong.