-
Notifications
You must be signed in to change notification settings - Fork 7
/
most-w3msg.js
172 lines (148 loc) · 5.98 KB
/
most-w3msg.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/** @license MIT License (c) copyright 2010-2015 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/* globals Promise */
var most = require('most');
var create = require('@most/create');
var fromPromise = most.fromPromise;
var defaultMessageEvent = 'message';
exports.fromWebSocket = fromMessageSource;
exports.toWebSocket = toWebSocket;
// EventSource is read-only. See https://developer.mozilla.org/en-US/docs/Web/API/EventSource
exports.fromEventSource = fromMessageSource;
exports.fromEventSourceOn = fromMessageSourceEvent;
exports.fromMessagePort = fromMessageSource;
exports.toMessagePort = toPort;
exports.fromWorker = fromMessageSource;
exports.toWorker = toPort;
/**
* Create a stream from a "source", which can be a WebSocket, EventSource,
* MessagePort, Worker, or anything that supports addEventListener and "message"
* events. The stream will end when the source closes (emits a "close" event),
* and will fail if the source fails (emits an "error" event)
* @param {WebSocket} source WebSocket (or compatible, eg SockJS), EventSource,
* Worker, etc from which to create a stream
* @param {function():*} dispose function to execute when the source is closed,
* fails, or all consumers lose interest.
* @returns {Stream} stream containing all the "message" events received by the source
*/
function fromMessageSource(source, dispose) {
return fromMessageSourceEvent(defaultMessageEvent, source, dispose);
}
/**
* Create a stream from a "source", which can be a WebSocket, EventSource,
* MessagePort, Worker, or anything that supports addEventListener
* events. The stream will end when the source closes (emits a "close" event),
* and will fail if the source fails (emits an "error" event)
* @private
* @param {string} eventName name of the specific event to listen to
* @param {WebSocket} source WebSocket (or compatible, eg SockJS), EventSource,
* Worker, etc from which to create a stream
* @param {function():*} dispose function to execute when the source is closed,
* fails, or all consumers lose interest.
* @returns {Stream} stream containing all the "message" events received by the source
*/
function fromMessageSourceEvent(eventName, source, dispose) {
return create(function(add, end, error) {
return pipeFromSource(source, eventName, dispose, add, end, error);
});
}
/**
* Send all events in a stream to a WebSocket
* @param {Stream} stream Stream whose events will be sent to the WebSocket
* @param {WebSocket} socket WebSocket (or compatible, eg SockJS) to which to
* send events
* @returns {Promise} promise for the end of the stream. If the WebSocket closes
* before the stream ends, the returned promise will fulfill if the WebSocket
* closes cleanly, or will reject if the WebSocket errors. If the stream ends
* before the WebSocket closes, the returned promise will fulfill if the stream
* ends cleanly, or will reject if the stream errors.
*/
function toWebSocket(stream, socket) {
return pipeToSink(stream, initOpenable, send, socket);
}
function send(socket, msg) {
socket.send(msg);
}
function initOpenable(openable) {
return new Promise(function(resolve, reject) {
openable.addEventListener('open', resolve);
openable.addEventListener('error', reject);
});
}
/**
* Send all events in a stream to anything with a postMessage API
* @param {Stream} stream Stream whose events will be posted to the sink
* @param {{postMessage:function(*)}} sink object with postMessage API
* @returns {Promise} promise for the end of the stream. If the WebSocket closes
* before the stream ends, the returned promise will fulfill if the sink
* closes cleanly, or will reject if the WebSocket errors. If the stream ends
* before the WebSocket closes, the returned promise will fulfill if the stream
* ends cleanly, or will reject if the stream errors.
*/
function toPort(stream, sink) {
return pipeToSink(stream, Promise.resolve, postMessage, sink);
}
function postMessage(sink, msg) {
sink.postMessage(msg);
}
/**
* Pipe all events from a source to a stream
* @private
* @param {{addEventListener:function}} source that supports at least "message" events
* @param {function} dispose optional function to execute when stream ends
* @param {function(x:*)} add function to add an event to the stream
* @param {function()} end function to end the stream
* @param {function(e:Error)} error function to signal the stream has failed
* @returns {function} function to remove event handlers and call dispose if provided
*/
function pipeFromSource(source, eventName, dispose, add, end, error) {
if(typeof dispose !== 'function') {
dispose = noop;
}
if('onopen' in source) {
source.addEventListener('open', onOpen);
} else {
onOpen();
}
function onOpen() {
source.addEventListener('close', end);
source.addEventListener('error', error);
source.addEventListener(eventName, add);
}
return function() {
if('onopen' in source) {
source.removeEventListener('open', add);
}
source.removeEventListener('close', end);
source.removeEventListener('error', error);
source.removeEventListener(eventName, add);
return dispose();
};
}
/**
* Pipe all events in a stream to a sink
* @private
* @param {Stream} stream event stream to pipe to sink
* @param {function:Promise} init function to initialize the sink before
* messages are sent.
* @param {function} send function to send a message to sink
* @param {*} sink
* @returns {Promise} promise that fulfills once the stream ends (ie all events have been sent
* to sink), or rejects when stream or sink fails.
*/
function pipeToSink(stream, init, send, sink) {
return init(sink).then(function() {
return doSendMessage(stream, send, sink);
});
}
function doSendMessage(stream, send, sink) {
var endSignal = fromPromise(new Promise(function(resolve, reject) {
sink.addEventListener('close', resolve);
sink.addEventListener('error', reject);
}));
return stream.takeUntil(endSignal).forEach(function (x) {
send(sink, x);
});
}
function noop() {}