Skip to content

Commit

Permalink
Rework subscribe methods as extensions (#9)
Browse files Browse the repository at this point in the history
* Add `EventHandler.factory`

* Remove and replace factory based functions with extension method

* Make the built-in handlers immutable

* Update example with `subscribeFactory` usage

* Add factory based request handler

* Remove and replace factory based functions with extension method

* Make test data immutable

* Rework pipeline behavior to use extensions for registration

* add more request integration tests

* Add more event integration tests

* Add more choregraphy integration tests

* Update comment
  • Loading branch information
Matthiee authored May 12, 2024
1 parent 8bab4ee commit 7bcefed
Show file tree
Hide file tree
Showing 24 changed files with 730 additions and 354 deletions.
68 changes: 53 additions & 15 deletions example/example.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:async';

import 'package:dart_mediator/mediator.dart';
import 'package:meta/meta.dart';

Future<void> main() async {
final mediator = Mediator.create(
Expand All @@ -15,17 +16,21 @@ Future<void> main() async {
mediator.requests.register(MyCommandHandler());

// Subscribe to the count event.
mediator.events.on<CountEvent>().subscribeFactory(createCountEventHandler);

mediator.events
.on<CountEvent>()
.map((event) => event.count)
.distinct()
.map((event) => event.count)
.subscribeFunction(
(count) => print('[CountEvent handler] received count: $count'),
);
(count) {
// Only distinct count events will get to this point.
// LoggingEventObserver will still see the event.
print('[CountEvent Handler] received distinct count: $count');
},
);

mediator.events.on<CountEvent>().subscribeFunction(
(count) => print('[Other Event Handler] received: $count'),
);
print('--- Query Example ---');

const getUserQuery = GetUserByIdQuery(123);

Expand All @@ -35,35 +40,59 @@ Future<void> main() async {

print('Got $getUserQuery response: $resp');

print('---');
print('\n--- Command Example ---');

const order66Command = MyCommand('Order 66');

print('Sending command $order66Command');
print('Sending $order66Command');

await mediator.requests.send(order66Command);

print('Command $order66Command completed');
print('$order66Command completed');

print('---');
print('\n--- Events Example ---');

const countEvent = CountEvent(123);

// Event will be handled by 2 event handlers.
await mediator.events.dispatch(countEvent);

// Event will only be handled by 1 event handler (distinct).
await mediator.events.dispatch(countEvent);

print('done');
}

@immutable
class CountEvent implements DomainEvent {
final int count;
const CountEvent(this.count);

@override
String toString() => 'CountEvent(count: $count)';

@override
int get hashCode => Object.hash(runtimeType, count);

@override
bool operator ==(Object other) {
return identical(this, other) ||
(other.runtimeType == runtimeType &&
other is CountEvent &&
other.count == count);
}
}

class CountEventHandler implements EventHandler<CountEvent> {
@override
FutureOr<void> handle(CountEvent event) {
final count = event.count;
print('[CountEvent Handler] received count: $count');
}
}

CountEventHandler createCountEventHandler() => CountEventHandler();

class MyCommand implements Command {
final String command;
const MyCommand(this.command);
Expand All @@ -75,9 +104,16 @@ class MyCommand implements Command {
class MyCommandHandler implements CommandHandler<MyCommand> {
@override
Future<void> handle(MyCommand request) async {
print('[MyCommandHandler] Executing "$request"');
await Future.delayed(const Duration(milliseconds: 500));
print('[MyCommandHandler] "$request" completed');
final command = request.command;
print('[MyCommandHandler] Execute $command');
{
await Future.delayed(const Duration(milliseconds: 300));
for (var i = 0; i < 3; i++) {
print('[MyCommandHandler] pew');
await Future.delayed(const Duration(milliseconds: 300));
}
}
print('[MyCommandHandler] $request executed!');
}
}

Expand All @@ -92,7 +128,7 @@ class GetUserByIdQuery implements Query<User> {
class GetUserByIdQueryHandler implements QueryHandler<User, GetUserByIdQuery> {
@override
Future<User> handle(GetUserByIdQuery request) async {
print('[GetUserByIdQueryHandler] handeling $request');
print('[GetUserByIdQueryHandler] handling $request');
final user = await getUserByIdAsync(request.userId);
print('[GetUserByIdQueryHandler] got $user');
return user;
Expand Down Expand Up @@ -136,7 +172,9 @@ class LoggingEventObserver implements EventObserver {
void onHandled<TEvent extends DomainEvent>(
TEvent event,
EventHandler<TEvent> handler,
) {}
) {
print('[LoggingEventObserver] onHandled $event handled by $handler');
}
}

class User {
Expand Down
50 changes: 49 additions & 1 deletion lib/src/event/handler/event_handler.dart
Original file line number Diff line number Diff line change
@@ -1,24 +1,72 @@
import 'dart:async';

import 'package:meta/meta.dart';

/// Factory to create a [EventHandler].
typedef EventHandlerFactory<TEvent> = EventHandler<TEvent> Function();

/// Handler for [TEvent].
abstract interface class EventHandler<TEvent> {
/// Function based event handler
/// Function based [EventHandler].
///
/// Each event the underlying [handler] will be executed.
const factory EventHandler.function(
FutureOr<void> Function(TEvent event) handler,
) = _FunctionEventHandler;

/// Factory based [EventHandler]
///
/// Each event the underlying [factory] will be instantiated and used
/// to handle the [TEvent].
const factory EventHandler.factory(
EventHandlerFactory<TEvent> factory,
) = _FactoryEventHandler;

/// Handles the given [event].
FutureOr<void> handle(TEvent event);
}

@immutable
class _FunctionEventHandler<T> implements EventHandler<T> {
final FutureOr<void> Function(T event) handler;

const _FunctionEventHandler(this.handler);

@override
FutureOr<void> handle(T event) => handler(event);

@override
int get hashCode => Object.hash(runtimeType, handler);

@override
bool operator ==(Object other) {
return identical(this, other) ||
(other.runtimeType == runtimeType &&
other is _FunctionEventHandler<T> &&
other.handler == handler);
}
}

@immutable
class _FactoryEventHandler<T> implements EventHandler<T> {
final EventHandlerFactory<T> factory;

const _FactoryEventHandler(this.factory);

@override
FutureOr<void> handle(T event) {
final handler = factory();
return handler.handle(event);
}

@override
int get hashCode => Object.hash(runtimeType, factory);

@override
bool operator ==(Object other) {
return identical(this, other) ||
(other.runtimeType == runtimeType &&
other is _FactoryEventHandler<T> &&
other.factory == factory);
}
}
43 changes: 3 additions & 40 deletions lib/src/event/handler/event_handler_store.dart
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import 'dart:collection';

import 'package:dart_mediator/src/event/handler/event_handler.dart';

class EventHandlerStore {
final _handlers = <Type, Set<EventHandler>>{};
final _handlerFactories = <Type, Set<EventHandlerFactory>>{};

/// Registers the [handler] to a given [TEvent].
void register<TEvent>(EventHandler<TEvent> handler) {
Expand All @@ -16,18 +17,6 @@ class EventHandlerStore {
handlers.add(handler);
}

/// Registers the [factory] to a given [TEvent].
void registerFactory<TEvent>(EventHandlerFactory<TEvent> factory) {
final factories = _getHandlerFactoriesFor<TEvent>();

assert(
!factories.contains(factory),
'registerFactory<$TEvent> was called with an already registered factory',
);

factories.add(factory);
}

/// Unregisters the given [handler].
void unregister<TEvent>(EventHandler<TEvent> handler) {
final handlers = _getHandlersFor<TEvent>();
Expand All @@ -40,28 +29,11 @@ class EventHandlerStore {
handlers.remove(handler);
}

/// Unregisters the given [factory].
void unregisterFactory<TEvent>(EventHandlerFactory<TEvent> factory) {
final factories = _getHandlerFactoriesFor<TEvent>();

assert(
factories.contains(factory),
'unregisterFactory<$TEvent> was called for a factory that was never registered',
);

factories.remove(factory);
}

/// Returns all registered [EventHandler]'s for [TEvent].
Set<EventHandler<TEvent>> getHandlersFor<TEvent>() {
final handlers = _getHandlersFor<TEvent>();
final factories =
_getHandlerFactoriesFor<TEvent>().map((factory) => factory());

return {
...handlers,
...factories,
};
return UnmodifiableSetView(handlers);
}

Set<EventHandler<TEvent>> _getHandlersFor<TEvent>() {
Expand All @@ -72,13 +44,4 @@ class EventHandlerStore {

return handlers;
}

Set<EventHandlerFactory<TEvent>> _getHandlerFactoriesFor<TEvent>() {
final factories = _handlerFactories.putIfAbsent(
TEvent,
() => <EventHandlerFactory<TEvent>>{},
) as Set<EventHandlerFactory<TEvent>>;

return factories;
}
}
40 changes: 13 additions & 27 deletions lib/src/event/subscription_builder/event_subscription_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,9 @@ abstract class EventSubscriptionBuilder<T> {
/// This finalizes the builder and applies all the steps
/// before subscribing.
EventSubscription subscribe(EventHandler<T> handler);

/// Subscribes to the given [factory].
///
/// This finalizes the builder and applies all the steps
/// before subscribing.
EventSubscription subscribeFactory(EventHandlerFactory<T> factory);
}

extension EventSubscriptionBuilderFunctionExtension<T>
on EventSubscriptionBuilder<T> {
extension EventSubscriptionBuilderExtensions<T> on EventSubscriptionBuilder<T> {
/// Subscribes to the given [handler].
///
/// This finalizes the builder and applies all the steps
Expand All @@ -124,6 +117,18 @@ extension EventSubscriptionBuilderFunctionExtension<T>
) {
return subscribe(EventHandler.function(handler));
}

/// Subscribes to the given [factory].
///
/// This finalizes the builder and applies all the steps
/// before subscribing.
///
/// This factory will be resolved into an actual [EventHandler] at request time.
EventSubscription subscribeFactory(
EventHandlerFactory<T> factory,
) {
return subscribe(EventHandler.factory(factory));
}
}

class _EventSubscriptionBuilder<T> extends EventSubscriptionBuilder<T> {
Expand All @@ -141,17 +146,6 @@ class _EventSubscriptionBuilder<T> extends EventSubscriptionBuilder<T> {

return subscription;
}

@override
EventSubscription subscribeFactory(EventHandlerFactory<T> factory) {
final subscription = EventSubscription(
() => _store.unregisterFactory(factory),
);

_store.registerFactory(factory);

return subscription;
}
}

/// Base for implementing custom [EventSubscriptionBuilder].
Expand All @@ -177,12 +171,4 @@ abstract class BaseEventSubscriptionBuilder<TInput, TOutput>
EventSubscription subscribe(EventHandler<TOutput> handler) {
return parent.subscribe(createHandler(handler));
}

@override
EventSubscription subscribeFactory(EventHandlerFactory<TOutput> factory) {
return parent.subscribeFactory(() {
final handler = factory();
return createHandler(handler);
});
}
}
Loading

0 comments on commit 7bcefed

Please sign in to comment.