This repository has been archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
groupjoin.js
98 lines (85 loc) · 3.71 KB
/
groupjoin.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
 * Correlates the elements of two sequences based on overlapping durations, and groups the results.
 *
 * For every left element a new Subject ("window") is created and handed to resultSelector;
 * the value resultSelector returns is emitted on the output sequence. While a left window is
 * open it receives every right element whose own duration is still running, plus every right
 * element that arrives afterwards. A window completes when its left duration ends.
 *
 * Any error (from either source, from a duration sequence, or thrown by one of the selectors)
 * is forwarded to every open window first and then to the output observer.
 *
 * @param {Observable} right The right observable sequence to join elements for.
 * @param {Function} leftDurationSelector A function to select the duration (expressed as an observable sequence) of each element of the left observable sequence, used to determine overlap.
 * @param {Function} rightDurationSelector A function to select the duration (expressed as an observable sequence) of each element of the right observable sequence, used to determine overlap.
 * @param {Function} resultSelector A function invoked to compute a result element for any element of the left sequence with overlapping elements from the right observable sequence. The first parameter passed to the function is an element of the left sequence. The second parameter passed to the function is an observable sequence with elements from the right sequence that overlap with the left sequence's element.
 * @returns {Observable} An observable sequence that contains result elements computed from source elements that have an overlapping duration.
 */
observableProto.groupJoin = function (right, leftDurationSelector, rightDurationSelector, resultSelector) {
  var left = this;
  return new AnonymousObservable(function (o) {
    var group = new CompositeDisposable();
    var r = new RefCountDisposable(group);
    // leftMap: id -> open window Subject; rightMap: id -> right value whose duration is still running.
    var leftMap = new Map(), rightMap = new Map();
    var leftId = 0, rightId = 0;

    // Forwards an error to every currently open left window, then to the output observer.
    // (Replaces the previous curried `handleError`, which was also shadow-declared as an
    // empty function that never took effect.)
    function failAll(e) {
      leftMap.forEach(function (openWindow) { openWindow.onError(e); });
      o.onError(e);
    }

    group.add(left.subscribe(
      function (value) {
        var s = new Subject();
        var id = leftId++;
        // Register the window before calling user code so a selector failure reaches it too.
        leftMap.set(id, s);

        // tryCatch reports a thrown exception by returning errorObj with the exception in `.e`.
        // NOTE(review): addRef(s, r) presumably ties the window's subscriptions to the
        // ref-counted disposable `r` - confirm against addRef's definition.
        var result = tryCatch(resultSelector)(value, addRef(s, r));
        if (result === errorObj) {
          failAll(result.e);
          return;
        }
        o.onNext(result);

        // Replay the right values that are still "alive" into the freshly opened window.
        rightMap.forEach(function (v) { s.onNext(v); });

        var md = new SingleAssignmentDisposable();
        group.add(md);

        var duration = tryCatch(leftDurationSelector)(value);
        if (duration === errorObj) {
          failAll(duration.e);
          return;
        }

        // Only the first notification of the duration sequence matters, hence take(1).
        md.setDisposable(duration.take(1).subscribe(
          noop,
          failAll,
          function () {
            // Complete the window only if it is still registered.
            leftMap['delete'](id) && s.onCompleted();
            group.remove(md);
          }));
      },
      failAll,
      function () { o.onCompleted(); })
    );

    group.add(right.subscribe(
      function (value) {
        var id = rightId++;
        rightMap.set(id, value);

        var md = new SingleAssignmentDisposable();
        group.add(md);

        var duration = tryCatch(rightDurationSelector)(value);
        if (duration === errorObj) {
          failAll(duration.e);
          return;
        }

        md.setDisposable(duration.take(1).subscribe(
          noop,
          failAll,
          function () {
            // The right value's duration ended: stop replaying it into new windows.
            rightMap['delete'](id);
            group.remove(md);
          }));

        // Deliver the right value to every window that is open right now.
        leftMap.forEach(function (v) { v.onNext(value); });
      },
      failAll)
    );

    return r;
  }, left);
};