-
Notifications
You must be signed in to change notification settings - Fork 206
/
timeslice.js
112 lines (88 loc) · 2.71 KB
/
timeslice.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
import Stream from '../Stream'
import Pipe from '../sink/Pipe'
import * as dispose from '../disposable/dispose'
import { join } from '../combinator/flatMap'
/**
 * Create a stream that relays `stream` through an Until source bounded
 * by `signal`.
 *
 * @param {Stream} signal - stream whose `source` acts as the cut-off signal
 * @param {Stream} stream - stream whose `source` supplies the events
 * @returns {Stream} a new Stream wrapping the Until source
 */
export function takeUntil (signal, stream) {
  const boundedSource = new Until(signal.source, stream.source)
  return new Stream(boundedSource)
}
/**
 * Create a stream that relays `stream` through a Since source gated
 * by `signal`.
 *
 * @param {Stream} signal - stream whose `source` acts as the start signal
 * @param {Stream} stream - stream whose `source` supplies the events
 * @returns {Stream} a new Stream wrapping the Since source
 */
export function skipUntil (signal, stream) {
  const gatedSource = new Since(signal.source, stream.source)
  return new Stream(gatedSource)
}
/**
 * Restrict `stream` to a time window described by `timeWindow`.
 *
 * `timeWindow` itself is the start signal (via skipUntil) and
 * `join(timeWindow)` is the end signal (via takeUntil).
 * NOTE(review): presumably `timeWindow` is a stream whose event is itself a
 * stream marking the end of the window - confirm against `join`.
 *
 * @param {Stream} timeWindow - stream used to derive the start and end signals
 * @param {Stream} stream - stream to be windowed
 * @returns {Stream} the windowed stream
 */
export function during (timeWindow, stream) {
  const endSignal = join(timeWindow)
  const afterStart = skipUntil(timeWindow, stream)
  return takeUntil(endSignal, afterStart)
}
/**
 * Source that runs `source` inside a time window with no lower limit and an
 * upper limit driven by `maxSignal`.
 *
 * @param {Object} maxSignal - source whose event sets the upper bound
 * @param {Object} source - source whose events are windowed
 */
function Until (maxSignal, source) {
  this.maxSignal = maxSignal
  this.source = source
}

/**
 * Subscribe to the signal, then run the wrapped source through a
 * TimeWindowSink.
 *
 * @param {Object} sink - downstream sink
 * @param {Object} scheduler - scheduler passed through to both sources
 * @returns {Object} result of `dispose.all` over both bounds and the source's
 *   disposable
 */
Until.prototype.run = function (sink, scheduler) {
  // A fixed -Infinity lower bound places no lower limit on event times.
  const lower = new Bound(-Infinity, sink)
  // Constructing the UpperBound runs the signal; this happens before the
  // source is run.
  const upper = new UpperBound(this.maxSignal, sink, scheduler)
  const windowSink = new TimeWindowSink(lower, upper, sink)
  const sourceDisposable = this.source.run(windowSink, scheduler)

  return dispose.all([lower, upper, sourceDisposable])
}
/**
 * Source that runs `source` inside a time window with a lower limit driven by
 * `minSignal` and no upper limit.
 *
 * @param {Object} minSignal - source whose event sets the lower bound
 * @param {Object} source - source whose events are windowed
 */
function Since (minSignal, source) {
  this.minSignal = minSignal
  this.source = source
}

/**
 * Subscribe to the signal, then run the wrapped source through a
 * TimeWindowSink.
 *
 * @param {Object} sink - downstream sink
 * @param {Object} scheduler - scheduler passed through to both sources
 * @returns {Object} result of `dispose.all` over both bounds and the source's
 *   disposable
 */
Since.prototype.run = function (sink, scheduler) {
  // Constructing the LowerBound runs the signal; this happens before the
  // source is run.
  const lower = new LowerBound(this.minSignal, sink, scheduler)
  // A fixed Infinity upper bound places no upper limit on event times.
  const upper = new Bound(Infinity, sink)
  const windowSink = new TimeWindowSink(lower, upper, sink)
  const sourceDisposable = this.source.run(windowSink, scheduler)

  return dispose.all([lower, upper, sourceDisposable])
}
/**
 * Fixed time bound: holds a constant `value` that never changes.
 * Used in this file as the open side of a window (-Infinity as a lower
 * bound in Until, Infinity as an upper bound in Since).
 *
 * @param {number} value - the bound's time value
 * @param {Object} sink - downstream sink, stored on the instance
 */
function Bound (value, sink) {
this.value = value
this.sink = sink
}
// Error handling is borrowed from Pipe.
// NOTE(review): presumably forwards the error to `this.sink` - confirm
// against ../sink/Pipe.
Bound.prototype.error = Pipe.prototype.error
// A fixed bound ignores events and end, and has nothing to release.
Bound.prototype.event = noop
Bound.prototype.end = noop
Bound.prototype.dispose = noop
/**
 * Sink that forwards an event only when its time lies in the half-open
 * window [min.value, max.value).
 *
 * @param {Object} min - bound object whose `value` is the inclusive lower limit
 * @param {Object} max - bound object whose `value` is the exclusive upper limit
 * @param {Object} sink - downstream sink
 */
function TimeWindowSink (min, max, sink) {
  this.min = min
  this.max = max
  this.sink = sink
}

/**
 * Forward (t, x) downstream if t is inside the current window.
 * The bounds are read on every event, so changes to `min.value` or
 * `max.value` take effect immediately.
 *
 * @param {number} t - event time
 * @param {*} x - event value
 */
TimeWindowSink.prototype.event = function (t, x) {
  const insideWindow = t >= this.min.value && t < this.max.value
  if (!insideWindow) {
    return
  }
  this.sink.event(t, x)
}

// Error and end handling are borrowed from Pipe.
// NOTE(review): presumably both forward to `this.sink` - confirm against
// ../sink/Pipe.
TimeWindowSink.prototype.error = Pipe.prototype.error
TimeWindowSink.prototype.end = Pipe.prototype.end
/**
 * Lower time bound driven by a signal source.
 * `value` starts at Infinity and is lowered to the time of a signal event
 * that occurs earlier than the current value.
 * The constructor runs `signal` immediately with this object as its sink.
 *
 * @param {Object} signal - source whose events set the bound
 * @param {Object} sink - downstream sink, stored on the instance
 * @param {Object} scheduler - scheduler passed to `signal.run`
 */
function LowerBound (signal, sink, scheduler) {
  this.value = Infinity
  this.sink = sink
  this.disposable = signal.run(this, scheduler)
}

/**
 * Record the signal time if it is earlier than the current bound.
 * The event value is ignored.
 *
 * @param {number} t - signal event time
 */
LowerBound.prototype.event = function (t /*, x */) {
  const isEarlier = t < this.value
  if (!isEarlier) {
    return
  }
  this.value = t
}

// The signal ending does not affect the bound.
LowerBound.prototype.end = noop

// Error handling is borrowed from Pipe.
// NOTE(review): presumably forwards the error to `this.sink` - confirm
// against ../sink/Pipe.
LowerBound.prototype.error = Pipe.prototype.error

/**
 * Dispose the signal subscription created in the constructor.
 *
 * @returns {*} whatever the signal's disposable returns from `dispose()`
 */
LowerBound.prototype.dispose = function () {
  const signalDisposable = this.disposable
  return signalDisposable.dispose()
}
/**
 * Upper time bound driven by a signal source.
 * `value` starts at Infinity; a signal event earlier than the current value
 * lowers it to that time and ends the downstream sink.
 * The constructor runs `signal` immediately with this object as its sink.
 *
 * @param {Object} signal - source whose events set the bound
 * @param {Object} sink - downstream sink to end when the signal fires
 * @param {Object} scheduler - scheduler passed to `signal.run`
 */
function UpperBound (signal, sink, scheduler) {
  this.value = Infinity
  this.sink = sink
  this.disposable = signal.run(this, scheduler)
}

/**
 * If the signal time is earlier than the current bound, record it and end
 * the downstream sink with the signal's time and value.
 * Once `value` has been lowered to t, signal events at time t or later are
 * ignored.
 *
 * @param {number} t - signal event time
 * @param {*} x - signal event value, passed on to `sink.end`
 */
UpperBound.prototype.event = function (t, x) {
  const isEarlier = t < this.value
  if (!isEarlier) {
    return
  }
  this.value = t
  this.sink.end(t, x)
}

// The signal ending without an event does not end the downstream sink.
UpperBound.prototype.end = noop

// Error handling is borrowed from Pipe.
// NOTE(review): presumably forwards the error to `this.sink` - confirm
// against ../sink/Pipe.
UpperBound.prototype.error = Pipe.prototype.error

/**
 * Dispose the signal subscription created in the constructor.
 *
 * @returns {*} whatever the signal's disposable returns from `dispose()`
 */
UpperBound.prototype.dispose = function () {
  const signalDisposable = this.disposable
  return signalDisposable.dispose()
}
/**
 * Shared do-nothing handler, used for sink and disposable methods that
 * intentionally take no action. Ignores its arguments.
 *
 * @returns {undefined}
 */
function noop () {
  return undefined
}