Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close cleanup #4

Merged
merged 13 commits into from
Mar 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ language: dart
sudo: required
dist: trusty
addons:
chrome: stable
apt:
sources:
- google-chrome
Expand All @@ -29,7 +30,7 @@ before_install:
- sh -e /etc/init.d/xvfb start

before_script:
- wget http://chromedriver.storage.googleapis.com/2.35/chromedriver_linux64.zip
- wget http://chromedriver.storage.googleapis.com/2.46/chromedriver_linux64.zip
- unzip chromedriver_linux64.zip
- export PATH=$PATH:$PWD
- ./tool/travis-setup.sh
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 2.0.0

- No longer expose `close` and `onClose` on an `SseConnection`. This is simply
handled by the underlying `stream` / `sink`.
- Fix a bug where resources of the `SseConnection` were not properly closed.

## 1.0.0

- Internal cleanup.
Expand Down
54 changes: 31 additions & 23 deletions lib/server/sse_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,28 @@ String _sseHeaders(String origin) => 'HTTP/1.1 200 OK\r\n'

/// A bi-directional SSE connection between server and browser.
class SseConnection extends StreamChannelMixin<String> {
/// Incoming messages from the Browser client.
final _incomingController = StreamController<String>();

/// Outgoing messages to the Browser client.
final _outgoingController = StreamController<String>();
final _closeCompleter = Completer<Null>();

final Sink _sink;
final String _clientId;

SseConnection(this._sink, this._clientId) {
final _closedCompleter = Completer<void>();

SseConnection(this._sink) {
_outgoingController.stream.listen((data) {
if (!_closeCompleter.isCompleted) {
if (!_closedCompleter.isCompleted) {
// JSON encode the message to escape new lines.
_sink.add('data: ${json.encode(data)}\n');
_sink.add('\n');
}
});
_outgoingController.onCancel = _close;
_incomingController.onCancel = _close;
}

Future get onClose => _closeCompleter.future;

/// The message added to the sink has to be JSON encodable.
@override
StreamSink<String> get sink => _outgoingController.sink;
Expand All @@ -50,8 +54,13 @@ class SseConnection extends StreamChannelMixin<String> {
@override
Stream<String> get stream => _incomingController.stream;

void close() {
if (!_closeCompleter.isCompleted) _closeCompleter.complete();
void _close() {
if (!_closedCompleter.isCompleted) {
_closedCompleter.complete();
_sink.close();
if (!_outgoingController.isClosed) _outgoingController.close();
if (!_incomingController.isClosed) _incomingController.close();
}
}
}

Expand All @@ -63,15 +72,15 @@ class SseConnection extends StreamChannelMixin<String> {
class SseHandler {
final _logger = Logger('SseHandler');
final Uri _uri;

final Set<SseConnection> _connections = Set<SseConnection>();

final _connections = <String, SseConnection>{};
final _connectionController = StreamController<SseConnection>();

StreamQueue<SseConnection> _connectionsStream;

SseHandler(this._uri);

StreamQueue<SseConnection> get connections =>
StreamQueue(_connectionController.stream);
_connectionsStream ??= StreamQueue(_connectionController.stream);

shelf.Handler get handler => _handle;

Expand All @@ -82,19 +91,22 @@ class SseHandler {
var sink = utf8.encoder.startChunkedConversion(channel.sink);
sink.add(_sseHeaders(req.headers['origin']));
var clientId = req.url.queryParameters['sseClientId'];
var connection = SseConnection(sink, clientId);
_connections.add(connection);
unawaited(connection.onClose.then((_) {
_connections.remove(connection);
var connection = SseConnection(sink);
_connections[clientId] = connection;
unawaited(connection._closedCompleter.future.then((_) {
_connections.remove(clientId);
}));
// Remove connection when it is remotely closed or the stream is
// cancelled.
channel.stream.listen((_) {
// SSE is unidirectional. Responses are handled through POST requests.
}, onDone: () {
connection.close();
connection._close();
});

_connectionController.add(connection);
});
return null;
return shelf.Response.notFound('');
}

String _getOriginalPath(shelf.Request req) => req.requestedUri.path;
Expand Down Expand Up @@ -122,11 +134,7 @@ class SseHandler {
var clientId = req.url.queryParameters['sseClientId'];
var message = await req.readAsString();
var jsonObject = json.decode(message) as String;
for (var connection in _connections) {
if (connection._clientId == clientId) {
connection._incomingController.add(jsonObject);
}
}
_connections[clientId]?._incomingController?.add(jsonObject);
} catch (e, st) {
_logger.fine('Failed to handle incoming message. $e $st');
}
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: sse
version: 1.0.0
version: 2.0.0
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/sse
description: >-
Expand Down
46 changes: 39 additions & 7 deletions test/sse_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@ void main() {
HttpServer server;
WebDriver webdriver;
SseHandler handler;
Process chromeDriver;

setUpAll(() async {
try {
chromeDriver = await Process.start(
'chromedriver', ['--port=4444', '--url-base=wd/hub']);
} catch (e) {
throw StateError(
'Could not start ChromeDriver. Is it installed?\nError: $e');
}
});

tearDownAll(() {
chromeDriver.kill();
});

setUp(() async {
handler = SseHandler(Uri.parse('/test'));
Expand All @@ -28,7 +43,11 @@ void main() {
listDirectories: true, defaultDocument: 'index.html'));

server = await io.serve(cascade.handler, 'localhost', 0);
webdriver = await createDriver();
webdriver = await createDriver(desired: {
'chromeOptions': {
'args': ['--headless']
}
});
});

tearDown(() async {
Expand All @@ -55,12 +74,12 @@ void main() {
var connections = handler.connections;
await webdriver.get('http://localhost:${server.port}');
var connectionA = await connections.next;
connectionA.sink.add('foo');
expect(await connectionA.stream.first, 'foo');

await webdriver.get('http://localhost:${server.port}');
var connectionB = await connections.next;

connectionA.sink.add('foo');
connectionB.sink.add('bar');
await connectionA.onClose;
expect(await connectionB.stream.first, 'bar');
});

Expand All @@ -69,8 +88,8 @@ void main() {
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
expect(handler.numberOfClients, 1);
connection.close();
await connection.onClose;
await connection.sink.close();
await pumpEventQueue();
expect(handler.numberOfClients, 0);
});

Expand All @@ -83,7 +102,20 @@ void main() {
var closeButton = await webdriver.findElement(const By.tagName('button'));
await closeButton.click();

await connection.onClose;
// Should complete since the connection is closed.
await connection.stream.toList();
expect(handler.numberOfClients, 0);
});

test('Cancelling the listener closes the connection', () async {
expect(handler.numberOfClients, 0);
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
expect(handler.numberOfClients, 1);

var sub = connection.stream.listen((_) {});
await sub.cancel();
await pumpEventQueue();
expect(handler.numberOfClients, 0);
});

Expand Down
4 changes: 0 additions & 4 deletions tool/travis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ if [[ $ANALYSIS_STATUS -ne 0 ]]; then
STATUS=$ANALYSIS_STATUS
fi

# Start chromedriver.
chromedriver --port=4444 --url-base=wd/hub &
PIDC=$!

# Run tests.
pub run test -r expanded -p vm -j 1
TEST_STATUS=$?
Expand Down