Skip to content

Commit

Permalink
Stable release and find Listeners with contravariant subtyping (#58)
Browse files Browse the repository at this point in the history
* Stable release and find Listeners with contravariant subtyping

Before Listener<SubClass> would also receive messages of type SupClass,
now it's inverted so the SupClass listeners would receive SubClass
messages and not other way around.

- bump minimum sdk to 3.0.0

- bump packages and remove unneded uuid package

- refactor MessageSinkRegister to handle contravariant message sinks
subtyping, add clear() for testing purposes

- change Listener _id type to _Contra<Message>

- refine Sender send() and getSend() to properly pass message type

- add tests

* Add CODEOWNERS file and remove 2.17 sdk from comms workflows

* Apply changes from CR
  • Loading branch information
lewandowski-jan authored Jul 20, 2023
1 parent b171159 commit e5002d8
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 47 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @iasiu
2 changes: 1 addition & 1 deletion .github/workflows/comms-prepare.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
strategy:
fail-fast: false
matrix:
sdk: [2.17.0, 3.0.0]
sdk: [3.0.0]

defaults:
run:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/comms-publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- name: Install Dart
uses: dart-lang/setup-dart@v1
with:
sdk: 2.17.0
sdk: 3.0.0

- name: Install mobile-tools
uses: actions/checkout@v3
Expand Down
7 changes: 7 additions & 0 deletions packages/comms/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 1.0.0

- Introduce covariant listening by filtering `Listener`s contravariantly (#58)
- Before `Listener<SubClass>` would also receive `<SupClass>` messages which
would throw in `onMessage()` forcing you to only ever listen to `<SupClass>`,
now you can safely use subtyping of messages.

## 0.0.11

- Fix new dart compatibility issue
Expand Down
1 change: 0 additions & 1 deletion packages/comms/lib/comms.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import 'dart:async';

import 'package:logging/logging.dart';
import 'package:meta/meta.dart' show nonVirtual, protected, visibleForTesting;
import 'package:uuid/uuid.dart';

part 'src/listener.dart';
part 'src/message_sink_register.dart';
Expand Down
2 changes: 1 addition & 1 deletion packages/comms/lib/src/listener.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mixin Listener<Message> {
StreamSubscription<Message>? _messageSubscription;

/// Unique identifier of the [Listener]'s messageSink in [MessageSinkRegister].
String? _id;
_Contra<Message>? _id;

/// Starts message receiving.
///
Expand Down
67 changes: 42 additions & 25 deletions packages/comms/lib/src/message_sink_register.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ part of '../comms.dart';

typedef LoggerCallback = void Function(String message);

typedef _Contra<T> = void Function(T);
_Contra<T> _makeContra<T>() => (_) {};

/// Allows communication between [Listener]s and [Sender]s of the same type,
/// without the need of them knowing about each other.
class MessageSinkRegister {
Expand Down Expand Up @@ -32,57 +35,61 @@ class MessageSinkRegister {
_logger.info(message);
}

/// Used to create unique id for each message sink added with [_add].
final _uuid = const Uuid();

/// All message sinks and their id's added with [_add].
final _messageSinks = <String, StreamSink<dynamic>>{};
final Map<_Contra<Never>, StreamSink<dynamic>> _messageSinks = {};
final List<_Contra<Never>> _messageSinkKeys = [];

/// All last messages sent with each type
final _messageBuffers = <Type, _BufferedMessage<dynamic>>{};
final Map<Type, _BufferedMessage<dynamic>> _messageBuffers = {};

/// Adds a [messageSink] to [MessageSinkRegister]'s [_messageSinks] with
/// unique id from [_uuid]
String _add<Message>(
/// Adds a [messageSink] to [MessageSinkRegister]'s [_messageSinks]
_Contra<Message> _add<Message>(
StreamSink<Message> messageSink, {
required OnMessage<Message> onInitialMessage,
}) {
final id = _uuid.v1();
_messageSinks[id] = messageSink;
_log('Added sink ${messageSink.runtimeType}');
final key = _makeContra<Message>();
_messageSinkKeys.add(key);
_messageSinks[key] = messageSink;

final bufferedMessage = _messageBuffers[Message];
_log('Added sink of type $Message');

final message = bufferedMessage?.message as Message?;
final bufferedMessage = _messageBuffers.values
.whereType<_BufferedMessage<Message>>()
.firstOrNull;

if (message != null) {
onInitialMessage(message);
if (bufferedMessage != null) {
onInitialMessage(bufferedMessage.message);

if (bufferedMessage?.oneOff ?? false) {
if (bufferedMessage.oneOff) {
_messageBuffers.remove(Message);
}
}

return id;
return key;
}

/// Removes messageSink with [id] from [MessageSinkRegister]'s [_messageSinks]
void _remove(String id) {
final sink = _messageSinks.remove(id);
/// Removes messageSink with [key] from [MessageSinkRegister]'s [_messageSinks]
void _remove(_Contra<Never> key) {
final sink = _messageSinks.remove(key)!;
_messageSinkKeys.remove(key);
sink.close();
_log('Removed sink ${sink.runtimeType}');
sink?.close();
}

/// Returns all sinks in [MessageSinkRegister]'s [_messageSinks] of type
/// [Message]
@visibleForTesting
List<StreamSink<Message>> getSinksOfType<Message>() {
final sinks = _messageSinks.values.whereType<StreamSink<Message>>();
if (sinks.isEmpty) {
List<StreamSink<dynamic>> getSinksOfType<Message>() {
final messageSinks = _messageSinkKeys
.whereType<_Contra<Message>>()
.map((key) => _messageSinks[key]!)
.toList();

if (messageSinks.isEmpty) {
_log('Found no sinks of type $Message');
}

return sinks.toList();
return messageSinks.toList();
}

/// Adds [message] to all sinks in [MessageSinkRegister]'s [_messageSinks]
Expand All @@ -102,6 +109,16 @@ class MessageSinkRegister {
oneOff: oneOff,
);
}

@visibleForTesting
void clear() {
_messageBuffers.clear();
_messageSinkKeys.clear();
for (final sink in _messageSinks.values) {
sink.close();
}
_messageSinks.clear();
}
}

class _BufferedMessage<Message> {
Expand Down
29 changes: 19 additions & 10 deletions packages/comms/lib/src/sender.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
part of '../comms.dart';

/// Signature for functions sending message to [Listener]s listening for type
/// [Message].
typedef Send<Message> = void Function(Message message, {bool oneOff});

/// A mixin used on classes that want to send messages of type [Message], by
/// providing [send] function.
///
Expand All @@ -19,13 +15,26 @@ mixin Sender<Message> {
/// `onInitialMessage()`.
@protected
@nonVirtual
void send(Message message, {bool oneOff = false}) {
MessageSinkRegister().sendToSinksOfType<Message>(message, oneOff: oneOff);
void send<T extends Message>(T message, {bool oneOff = false}) {
MessageSinkRegister().sendToSinksOfType<T>(message, oneOff: oneOff);
}
}

/// Returns function to send messages to all [Listener]s of type [Message],
/// without the need of instatiating class with [Sender] mixin.
Send<Message> getSend<Message>() {
return MessageSinkRegister().sendToSinksOfType<Message>;
class SenderFunctor<Message> {
void call<T extends Message>(T message, {bool oneOff = false}) {
MessageSinkRegister().sendToSinksOfType<T>(message, oneOff: oneOff);
}
}

/// Returns an object that can be called like a function (a functor) which sends
/// messages to all [Listener]s of type [Message], without the need of
/// instatiating a class with [Sender] mixin.
///
/// Example usage:
/// ```dart
/// final send = getSend<SomeType>();
/// send(SomeType());
/// ```
SenderFunctor<Message> getSend<Message>() {
return SenderFunctor<Message>();
}
13 changes: 6 additions & 7 deletions packages/comms/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
name: comms
description: Simple communication pattern abstraction on streams, created for communication between logic classes.
version: 0.0.11
version: 1.0.0
homepage: https://github.com/leancodepl/comms

environment:
sdk: ">=2.17.0 <3.0.0"
sdk: ">=3.0.0 <4.0.0"

dependencies:
logging: ^1.0.2
meta: ^1.7.0
uuid: ^3.0.6
logging: ^1.2.0
meta: ^1.9.1

dev_dependencies:
leancode_lint: ^3.0.0
test: ^1.16.4
leancode_lint: ^4.0.0+1
test: ^1.17.5
81 changes: 81 additions & 0 deletions packages/comms/test/comms_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ int numberOfProductCountMessageSink() =>
MessageSinkRegister().getSinksOfType<ProductCountChangedMessage>().length;

void main() {
setUp(() => MessageSinkRegister().clear());

test(
'ProductCount message sink is added to register after it calls listen in constructor',
() {
Expand Down Expand Up @@ -70,4 +72,83 @@ void main() {
productCount.dispose();
},
);

test(
'ProductCountIncrementedListener listens only for ProductCountIncremented',
() async {
final basket = IncrementingBasket();
final productCount = ProductCountIncrementedListener();

basket
..add('Jeans')
..add('T-shirt');

await Future<void>.delayed(Duration.zero);

expect(productCount.value, 2);

getSend<ProductCountDecremented>()(ProductCountDecremented());
await Future<void>.delayed(Duration.zero);

expect(productCount.value, 2);

getSend<ProductCountChangedMessage>()(ProductCountIncremented());
await Future<void>.delayed(Duration.zero);

expect(productCount.value, 3);

getSend<ProductCountChangedMessage>()(ProductCountChangedMessage());
await Future<void>.delayed(Duration.zero);

expect(productCount.value, 3);

productCount.dispose();
},
);

test(
'ProductCountIncrementedListener receives buffered message of exact same type',
() async {
final basket = IncrementingBasket()..add('Product');
final productCount = ProductCountIncrementedListener();

basket
..add('Jeans')
..add('T-shirt');

await Future<void>.delayed(Duration.zero);

expect(productCount.value, 3);

productCount.dispose();
},
);

test(
'ProductCount receives buffered message of sub type',
() async {
getSend<ProductCountIncremented>()(ProductCountIncremented());
final productCount = ProductCount();

await Future<void>.delayed(Duration.zero);

expect(productCount.value, 1);

productCount.dispose();
},
);

test(
'ProductCountIncrementedListener does not receive buffered message of super type',
() async {
getSend<ProductCountChangedMessage>()(ProductCountChangedMessage());
final productCount = ProductCountIncrementedListener();

await Future<void>.delayed(Duration.zero);

expect(productCount.value, 0);

productCount.dispose();
},
);
}
24 changes: 23 additions & 1 deletion packages/comms/test/listeners/product_count.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,29 @@ class ProductCount with Listener<ProductCountChangedMessage> {
}
}

abstract class ProductCountChangedMessage {}
class ProductCountIncrementedListener with Listener<ProductCountIncremented> {
ProductCountIncrementedListener() {
listen();
}

int value = 0;

@override
void onMessage(ProductCountIncremented message) {
value += 1;
}

@override
void onInitialMessage(ProductCountIncremented message) {
onMessage(message);
}

void dispose() {
cancel();
}
}

class ProductCountChangedMessage {}

class ProductCountIncremented extends ProductCountChangedMessage {}

Expand Down
9 changes: 9 additions & 0 deletions packages/comms/test/senders/basket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,12 @@ class Basket with Sender<ProductCountChangedMessage> {
send(ProductCountDecremented());
}
}

class IncrementingBasket with Sender<ProductCountChangedMessage> {
List<String> products = [];

void add(String product) {
products.add(product);
send(ProductCountIncremented());
}
}

0 comments on commit e5002d8

Please sign in to comment.