Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Prototype of Context Receivers /JVM #76

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions application-arrow/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ kotlin {
compilations.all {
kotlinOptions.jvmTarget = "1.8"
kotlinOptions.verbose = true
kotlinOptions.freeCompilerArgs = kotlinOptions.freeCompilerArgs + "-Xcontext-receivers"
}

withJava()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,137 +16,138 @@

package com.fraktalio.fmodel.application

import arrow.core.continuations.Effect
import arrow.core.continuations.effect
import arrow.core.Either
import arrow.core.raise.either
import com.fraktalio.fmodel.application.Error.CommandHandlingFailed
import com.fraktalio.fmodel.application.Error.CommandPublishingFailed
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*

/**
* Extension function - Handles the command message of type [C]
*
* @param command Command message of type [C]
* @return [Flow] of [Effect] (either [Error] or Events of type [E])
* @return [Flow] of [Either] (either [Error] or Events of type [E])
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@FlowPreview
fun <C, S, E> EventSourcingAggregate<C, S, E>.handleWithEffect(command: C): Flow<Effect<Error, E>> =
fun <C, S, E> EventSourcingAggregate<C, S, E>.handleWithEffect(command: C): Flow<Either<Error, E>> =
command
.fetchEvents()
.computeNewEvents(command)
.save()
.map { effect<Error, E> { it } }
.catch { emit(effect { shift(CommandHandlingFailed(command)) }) }
.map { either <Error, E> { it } }
.catch { emit(either { raise(CommandHandlingFailed(command)) }) }

/**
* Extension function - Handles the command message of type [C]
*
* @param command Command message of type [C]
* @return [Flow] of [Effect] (either [Error] or Events of type [E])
* @return [Flow] of [Either] (either [Error] or Events of type [E])
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@FlowPreview
fun <C, S, E> EventSourcingOrchestratingAggregate<C, S, E>.handleWithEffect(command: C): Flow<Effect<Error, E>> =
fun <C, S, E> EventSourcingOrchestratingAggregate<C, S, E>.handleWithEffect(command: C): Flow<Either<Error, E>> =
command
.fetchEvents()
.computeNewEventsByOrchestrating(command) { it.fetchEvents() }
.save()
.map { effect<Error, E> { it } }
.catch { emit(effect { shift(CommandHandlingFailed(command)) }) }
.map { either<Error, E> { it } }
.catch { emit(either { raise(CommandHandlingFailed(command)) }) }

/**
* Extension function - Handles the command message of type [C]
*
* @param command Command message of type [C]
* @return [Flow] of [Effect] (either [Error] or Events of type [Pair]<[E], [V]>)
* @return [Flow] of [Either] (either [Error] or Events of type [Pair]<[E], [V]>)
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@FlowPreview
fun <C, S, E, V> EventSourcingLockingAggregate<C, S, E, V>.handleOptimisticallyWithEffect(command: C): Flow<Effect<Error, Pair<E, V>>> =
fun <C, S, E, V> EventSourcingLockingAggregate<C, S, E, V>.handleOptimisticallyWithEffect(command: C): Flow<Either<Error, Pair<E, V>>> =
flow {
val events = command.fetchEvents()
emitAll(
events.map { it.first }
.computeNewEvents(command)
.save(events.lastOrNull())
.map { effect<Error, Pair<E, V>> { it } }
.catch { emit(effect { shift(CommandHandlingFailed(command)) }) }
.map { either<Error, Pair<E, V>> { it } }
.catch { emit(either { raise(CommandHandlingFailed(command)) }) }
)
}

/**
* Extension function - Handles the command message of type [C]
*
* @param command Command message of type [C]
* @return [Flow] of [Effect] (either [Error] or Events of type [Pair]<[E], [V]>)
* @return [Flow] of [Either] (either [Error] or Events of type [Pair]<[E], [V]>)
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
@FlowPreview
fun <C, S, E, V> EventSourcingLockingOrchestratingAggregate<C, S, E, V>.handleOptimisticallyWithEffect(command: C): Flow<Effect<Error, Pair<E, V>>> =
fun <C, S, E, V> EventSourcingLockingOrchestratingAggregate<C, S, E, V>.handleOptimisticallyWithEffect(command: C): Flow<Either<Error, Pair<E, V>>> =
command
.fetchEvents().map { it.first }
.computeNewEventsByOrchestrating(command) { it.fetchEvents().map { pair -> pair.first } }
.save(latestVersionProvider)
.map { effect<Error, Pair<E, V>> { it } }
.catch { emit(effect { shift(CommandHandlingFailed(command)) }) }
.map { either<Error, Pair<E, V>> { it } }
.catch { emit(either { raise(CommandHandlingFailed(command)) }) }


@FlowPreview
fun <C, S, E> EventSourcingAggregate<C, S, E>.handleWithEffect(commands: Flow<C>): Flow<Effect<Error, E>> =
fun <C, S, E> EventSourcingAggregate<C, S, E>.handleWithEffect(commands: Flow<C>): Flow<Either<Error, E>> =
commands
.flatMapConcat { handleWithEffect(it) }
.catch { emit(effect { shift(CommandHandlingFailed(it)) }) }
.catch { emit(either { raise(CommandPublishingFailed(it)) }) }

@FlowPreview
fun <C, S, E> EventSourcingOrchestratingAggregate<C, S, E>.handleWithEffect(commands: Flow<C>): Flow<Effect<Error, E>> =
fun <C, S, E> EventSourcingOrchestratingAggregate<C, S, E>.handleWithEffect(commands: Flow<C>): Flow<Either<Error, E>> =
commands
.flatMapConcat { handleWithEffect(it) }
.catch { emit(effect { shift(CommandHandlingFailed(it)) }) }
.catch { emit(either { raise(CommandPublishingFailed(it)) }) }

@FlowPreview
fun <C, S, E, V> EventSourcingLockingAggregate<C, S, E, V>.handleOptimisticallyWithEffect(commands: Flow<C>): Flow<Effect<Error, Pair<E, V>>> =
fun <C, S, E, V> EventSourcingLockingAggregate<C, S, E, V>.handleOptimisticallyWithEffect(commands: Flow<C>): Flow<Either<Error, Pair<E, V>>> =
commands
.flatMapConcat { handleOptimisticallyWithEffect(it) }
.catch { emit(effect { shift(CommandHandlingFailed(it)) }) }
.catch { emit(either { raise(CommandPublishingFailed(it)) }) }

@FlowPreview
fun <C, S, E, V> EventSourcingLockingOrchestratingAggregate<C, S, E, V>.handleOptimisticallyWithEffect(commands: Flow<C>): Flow<Effect<Error, Pair<E, V>>> =
fun <C, S, E, V> EventSourcingLockingOrchestratingAggregate<C, S, E, V>.handleOptimisticallyWithEffect(commands: Flow<C>): Flow<Either<Error, Pair<E, V>>> =
commands
.flatMapConcat { handleOptimisticallyWithEffect(it) }
.catch { emit(effect { shift(CommandHandlingFailed(it)) }) }
.catch { emit(either { raise(CommandPublishingFailed(it)) }) }

@FlowPreview
fun <C, E> C.publishWithEffect(aggregate: EventSourcingAggregate<C, *, E>): Flow<Effect<Error, E>> =
fun <C, E> C.publishWithEffect(aggregate: EventSourcingAggregate<C, *, E>): Flow<Either<Error, E>> =
aggregate.handleWithEffect(this)

@FlowPreview
fun <C, E> C.publishWithEffect(aggregate: EventSourcingOrchestratingAggregate<C, *, E>): Flow<Effect<Error, E>> =
fun <C, E> C.publishWithEffect(aggregate: EventSourcingOrchestratingAggregate<C, *, E>): Flow<Either<Error, E>> =
aggregate.handleWithEffect(this)

@FlowPreview
fun <C, E, V> C.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingAggregate<C, *, E, V>): Flow<Effect<Error, Pair<E, V>>> =
fun <C, E, V> C.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingAggregate<C, *, E, V>): Flow<Either<Error, Pair<E, V>>> =
aggregate.handleOptimisticallyWithEffect(this)

@FlowPreview
fun <C, E, V> C.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingOrchestratingAggregate<C, *, E, V>): Flow<Effect<Error, Pair<E, V>>> =
fun <C, E, V> C.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingOrchestratingAggregate<C, *, E, V>): Flow<Either<Error, Pair<E, V>>> =
aggregate.handleOptimisticallyWithEffect(this)

@FlowPreview
fun <C, E> Flow<C>.publishWithEffect(aggregate: EventSourcingAggregate<C, *, E>): Flow<Effect<Error, E>> =
fun <C, E> Flow<C>.publishWithEffect(aggregate: EventSourcingAggregate<C, *, E>): Flow<Either<Error, E>> =
aggregate.handleWithEffect(this)

@FlowPreview
fun <C, E> Flow<C>.publishWithEffect(aggregate: EventSourcingOrchestratingAggregate<C, *, E>): Flow<Effect<Error, E>> =
fun <C, E> Flow<C>.publishWithEffect(aggregate: EventSourcingOrchestratingAggregate<C, *, E>): Flow<Either<Error, E>> =
aggregate.handleWithEffect(this)

@FlowPreview
fun <C, E, V> Flow<C>.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingAggregate<C, *, E, V>): Flow<Effect<Error, Pair<E, V>>> =
fun <C, E, V> Flow<C>.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingAggregate<C, *, E, V>): Flow<Either<Error, Pair<E, V>>> =
aggregate.handleOptimisticallyWithEffect(this)

@FlowPreview
fun <C, E, V> Flow<C>.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingOrchestratingAggregate<C, *, E, V>): Flow<Effect<Error, Pair<E, V>>> =
fun <C, E, V> Flow<C>.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingOrchestratingAggregate<C, *, E, V>): Flow<Either<Error, Pair<E, V>>> =
aggregate.handleOptimisticallyWithEffect(this)
Loading