Skip to content

Commit

Permalink
Isolate examples documentation (#5332)
Browse files Browse the repository at this point in the history
This PR is being split into two. The first one can be found
[here](#5402).

This PR updates documentation at language/concurrency with more
information, and adds a page language/isolates with examples.

Fixes #3720

**Staged:**

-
[Concurrency](https://dart-dev--pr5332-isolate-documentatio-gmqcme0e.web.app/language/concurrency)
_(Updated)_
-
[Isolates](https://dart-dev--pr5332-isolate-documentatio-gmqcme0e.web.app/language/isolates)
_(New)_

---------

Co-authored-by: Parker Lougheed <parlough@gmail.com>
Co-authored-by: Marya <111139605+MaryaBelanger@users.noreply.github.com>
  • Loading branch information
3 people authored Jan 4, 2024
1 parent d81d2ba commit 87996d7
Show file tree
Hide file tree
Showing 25 changed files with 1,881 additions and 11 deletions.
56 changes: 56 additions & 0 deletions examples/concurrency/lib/basic_ports_example/complete.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
final worker = Worker();
await worker.spawn();
await worker.parseJson('{"key":"value"}');
}

// #docregion handleResponses parseJson
class Worker {
late SendPort _sendPort;
final Completer<void> _isolateReady = Completer.sync();
// #enddocregion handleResponses parseJson

// #docregion spawn
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}
// #enddocregion spawn

// #docregion handleResponses
void _handleResponsesFromIsolate(dynamic message) {
if (message is SendPort) {
_sendPort = message;
_isolateReady.complete();
} else if (message is Map<String, dynamic>) {
print(message);
}
}
// #enddocregion handleResponses

// #docregion startRemoteIsolate
static void _startRemoteIsolate(SendPort port) {
final receivePort = ReceivePort();
port.send(receivePort.sendPort);

receivePort.listen((dynamic message) async {
if (message is String) {
final transformed = jsonDecode(message);
port.send(transformed);
}
});
}
// #enddocregion startRemoteIsolate

// #docregion parseJson
Future<void> parseJson(String message) async {
await _isolateReady.future;
_sendPort.send(message);
}
// #enddocregion parseJson
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// ignore_for_file: unused_field

import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

// #docregion
class Worker {
late SendPort _sendPort;

// spawn method

void _handleResponsesFromIsolate(dynamic message) {
if (message is SendPort) {
_sendPort = message;
} else if (message is Map<String, dynamic>) {
print(message);
}
}

// rest of class..
// #enddocregion

Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

static void _startRemoteIsolate(SendPort port) {
final receivePort = ReceivePort();
port.send(receivePort.sendPort);

receivePort.listen((dynamic message) async {
final decoded = jsonDecode(message as String);
port.send(decoded);
});
}

Future<void> parseJson(String message) async {
// TODO: Define a public method that can
// be used to send messages to the worker isolate.
}
}
44 changes: 44 additions & 0 deletions examples/concurrency/lib/basic_ports_example/parse_json.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// ignore_for_file: unused_field

import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

// #docregion
class Worker {
late SendPort _sendPort;
final Completer<void> _isolateReady = Completer.sync(); // New

void _handleResponsesFromIsolate(dynamic message) {
if (message is SendPort) {
_sendPort = message;
_isolateReady.complete(); // New
} else if (message is Map<String, dynamic>) {
print(message);
}
}

// New
Future<void> parseJson(String message) async {
await _isolateReady.future;
_sendPort.send(message);
}
// rest of class..
// #enddocregion

Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

static void _startRemoteIsolate(SendPort port) {
final receivePort = ReceivePort();
port.send(receivePort.sendPort);

receivePort.listen((dynamic message) async {
final decoded = jsonDecode(message as String);
port.send(decoded);
});
}
}
27 changes: 27 additions & 0 deletions examples/concurrency/lib/basic_ports_example/spawn.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// ignore_for_file: unused_field

import 'dart:async';
import 'dart:isolate';

class Worker {
// #docregion
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}
// #enddocregion

void _handleResponsesFromIsolate(dynamic message) {
// TODO: Define code that should be executed on the worker isolate.
}

static void _startRemoteIsolate(SendPort port) {
// TODO: Handle messages sent back from the worker isolate.
}

Future<void> parseJson(String message) async {
// TODO: Define a public method that can
// be used to send messages to the worker isolate.
}
}
23 changes: 23 additions & 0 deletions examples/concurrency/lib/basic_ports_example/start.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// ignore_for_file: unused_field, unused_element
import 'dart:isolate';

// #docregion
class Worker {
Future<void> spawn() async {
// TODO: Add functionality to spawn a worker isolate.
}

void _handleResponsesFromIsolate(dynamic message) {
// TODO: Define code that should be executed on the worker isolate.
}

static void _startRemoteIsolate(SendPort port) {
// TODO: Handle messages sent back from the worker isolate.
}

Future<void> parseJson(String message) async {
// TODO: Define a public method that can
// be used to send messages to the worker isolate.
}
}
// #enddocregion
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// ignore_for_file: unused_field

import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

class Worker {
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

void _handleResponsesFromIsolate(dynamic message) {
// TODO: Define code that should be executed on the worker isolate.
}

// #docregion
static void _startRemoteIsolate(SendPort port) {
final receivePort = ReceivePort();
port.send(receivePort.sendPort);

receivePort.listen((dynamic message) async {
final decoded = jsonDecode(message as String);
port.send(decoded);
});
}
// #enddocregion

Future<void> parseJson(String message) async {
// TODO: Define a public method that can
// be used to send messages to the worker isolate.
}
}
109 changes: 109 additions & 0 deletions examples/concurrency/lib/robust_ports_example/complete.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
final worker = await Worker.spawn();
print(await worker.parseJson('{"key":"value"}'));
print(await worker.parseJson('"banana"'));
print(await worker.parseJson('[true, false, null, 1, "string"]'));
print(
await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]));
worker.close();
}

// #docregion constructor
class Worker {
final SendPort _commands;
final ReceivePort _responses;
// #enddocregion constructor
final Map<int, Completer<Object?>> _activeRequests = {};
int _idCounter = 0;
bool _closed = false;

Future<Object?> parseJson(String message) async {
if (_closed) throw StateError('Closed');
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, message));
return await completer.future;
}

static Future<Worker> spawn() async {
// Create a receive port and add its initial message handler
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};

// Spawn the isolate.
try {
await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
} on Object {
initPort.close();
rethrow;
}

final (ReceivePort receivePort, SendPort sendPort) =
await connection.future;

return Worker._(receivePort, sendPort);
}

Worker._(this._responses, this._commands) {
_responses.listen(_handleResponsesFromIsolate);
}

void _handleResponsesFromIsolate(dynamic message) {
final (int id, Object? response) = message as (int, Object?);
final completer = _activeRequests.remove(id)!;

if (response is RemoteError) {
completer.completeError(response);
} else {
completer.complete(response);
}

if (_closed && _activeRequests.isEmpty) _responses.close();
}

static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((message) {
if (message == 'shutdown') {
receivePort.close();
return;
}
final (int id, String jsonText) = message as (int, String);
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData));
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}

static void _startRemoteIsolate(SendPort sendPort) {
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
_handleCommandsToIsolate(receivePort, sendPort);
}

void close() {
if (!_closed) {
_closed = true;
_commands.send('shutdown');
if (_activeRequests.isEmpty) _responses.close();
print('--- port closed --- ');
}
}
}
48 changes: 48 additions & 0 deletions examples/concurrency/lib/robust_ports_example/spawn_1.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// ignore_for_file: unused_field, body_might_complete_normally_nullable, unused_element

import 'dart:async';
import 'dart:isolate';

// #docregion
class Worker {
final SendPort _commands;
final ReceivePort _responses;

static Future<Worker> spawn() async {
// Create a receive port and add its initial message handler.
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
// #enddocregion
throw UnimplementedError();
// #docregion
}
// #enddocregion

Future<Object?> parseJson(String message) async {
// TODO: Ensure the port is still open.
_commands.send(message);
}

Worker._(this._commands, this._responses) {
// TODO: Initialize main isolate receive port listener.
}

void _handleResponsesFromIsolate(dynamic message) {
// TODO: Handle messages sent back from the worker isolate.
}

static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
// TODO: Handle messages sent back from the worker isolate.
}

static void _startRemoteIsolate(SendPort sp) {
// TODO: Initialize worker isolate's ports.
}
}
Loading

0 comments on commit 87996d7

Please sign in to comment.