diff --git a/arrow-libs/fx/arrow-collectors/api/arrow-collectors.api b/arrow-libs/fx/arrow-collectors/api/arrow-collectors.api new file mode 100644 index 00000000000..3ede61b58be --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/api/arrow-collectors.api @@ -0,0 +1,116 @@ +public final class arrow/collectors/Characteristics : java/lang/Enum { + public static final field CONCURRENT Larrow/collectors/Characteristics; + public static final field Companion Larrow/collectors/Characteristics$Companion; + public static final field IDENTITY_FINISH Larrow/collectors/Characteristics; + public static final field UNORDERED Larrow/collectors/Characteristics; + public static fun getEntries ()Lkotlin/enums/EnumEntries; + public static fun valueOf (Ljava/lang/String;)Larrow/collectors/Characteristics; + public static fun values ()[Larrow/collectors/Characteristics; +} + +public final class arrow/collectors/Characteristics$Companion { + public final fun getCONCURRENT_UNORDERED ()Ljava/util/Set; + public final fun getIDENTITY ()Ljava/util/Set; + public final fun getIDENTITY_CONCURRENT ()Ljava/util/Set; + public final fun getIDENTITY_CONCURRENT_UNORDERED ()Ljava/util/Set; + public final fun getIDENTITY_UNORDERED ()Ljava/util/Set; +} + +public final class arrow/collectors/CollectKt { + public static final fun collect (Ljava/lang/Iterable;Larrow/collectors/NonSuspendCollectorI;)Ljava/lang/Object; + public static final fun collect (Ljava/util/Iterator;Larrow/collectors/NonSuspendCollectorI;)Ljava/lang/Object; + public static final fun collect (Lkotlin/sequences/Sequence;Larrow/collectors/NonSuspendCollectorI;)Ljava/lang/Object; + public static final fun collect (Lkotlinx/coroutines/flow/Flow;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun collect$default (Lkotlinx/coroutines/flow/Flow;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun parCollect (Ljava/lang/Iterable;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun parCollect (Lkotlin/sequences/Sequence;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun parCollect$default (Ljava/lang/Iterable;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static synthetic fun parCollect$default (Lkotlin/sequences/Sequence;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; +} + +public abstract interface class arrow/collectors/CollectorI { + public static final field Companion Larrow/collectors/CollectorI$Companion; + public abstract fun accumulate (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun contramap (Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI; + public abstract fun finish (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun getCharacteristics ()Ljava/util/Set; + public abstract fun map (Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI; + public abstract fun supply (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun zip (Larrow/collectors/CollectorI;)Larrow/collectors/CollectorI; + public abstract fun zip (Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/CollectorI; +} + +public final class arrow/collectors/CollectorI$Companion { + public final fun nonSuspendOf (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;Ljava/util/Set;)Larrow/collectors/NonSuspendCollectorI; + public static synthetic fun nonSuspendOf$default (Larrow/collectors/CollectorI$Companion;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;Ljava/util/Set;ILjava/lang/Object;)Larrow/collectors/NonSuspendCollectorI; + public final fun of (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/jvm/functions/Function2;Ljava/util/Set;)Larrow/collectors/CollectorI; + public static synthetic fun of$default (Larrow/collectors/CollectorI$Companion;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/jvm/functions/Function2;Ljava/util/Set;ILjava/lang/Object;)Larrow/collectors/CollectorI; +} + +public final class arrow/collectors/CollectorI$DefaultImpls { + public static fun contramap (Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI; + public static fun map (Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI; + public static fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;)Larrow/collectors/CollectorI; + public static fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/CollectorI; +} + +public final class arrow/collectors/Collectors { + public static final field INSTANCE Larrow/collectors/Collectors; + public final fun bestBy (Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI; + public final fun constant (Ljava/lang/Object;)Larrow/collectors/NonSuspendCollectorI; + public final fun getLength ()Larrow/collectors/NonSuspendCollectorI; + public final fun getSum ()Larrow/collectors/NonSuspendCollectorI; + public final fun intReducer (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI; + public final fun list ()Larrow/collectors/NonSuspendCollectorI; + public final fun map ()Larrow/collectors/NonSuspendCollectorI; + public final fun mapFromEntries ()Larrow/collectors/NonSuspendCollectorI; + public final fun reducer (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;Z)Larrow/collectors/NonSuspendCollectorI; + public static synthetic fun reducer$default (Larrow/collectors/Collectors;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;ZILjava/lang/Object;)Larrow/collectors/NonSuspendCollectorI; + public final fun set ()Larrow/collectors/NonSuspendCollectorI; +} + +public final class arrow/collectors/CollectorsKt { + public static final fun concurrentMap (Larrow/collectors/Collectors;)Larrow/collectors/NonSuspendCollectorI; + public static final fun concurrentMapFromEntries (Larrow/collectors/Collectors;)Larrow/collectors/NonSuspendCollectorI; + public static final fun concurrentSet (Larrow/collectors/Collectors;)Larrow/collectors/NonSuspendCollectorI; +} + +public final class arrow/collectors/JvmCollectorKt { + public static final fun asCollector (Ljava/util/stream/Collector;)Larrow/collectors/NonSuspendCollectorI; + public static final fun jvm (Larrow/collectors/Collectors;Ljava/util/stream/Collector;)Larrow/collectors/NonSuspendCollectorI; +} + +public abstract interface class arrow/collectors/NonSuspendCollectorI : arrow/collectors/CollectorI { + public abstract fun accumulate (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun accumulateNonSuspend (Ljava/lang/Object;Ljava/lang/Object;)V + public abstract fun contramapNonSuspend (Lkotlin/jvm/functions/Function1;)Larrow/collectors/NonSuspendCollectorI; + public abstract fun finish (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun finishNonSuspend (Ljava/lang/Object;)Ljava/lang/Object; + public abstract fun mapNonSuspend (Lkotlin/jvm/functions/Function1;)Larrow/collectors/NonSuspendCollectorI; + public abstract fun supply (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun supplyNonSuspend ()Ljava/lang/Object; + public abstract fun zip (Larrow/collectors/NonSuspendCollectorI;)Larrow/collectors/NonSuspendCollectorI; + public abstract fun zipNonSuspend (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI; +} + +public final class arrow/collectors/NonSuspendCollectorI$DefaultImpls { + public static fun accumulate (Larrow/collectors/NonSuspendCollectorI;Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun contramap (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI; + public static fun contramapNonSuspend (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function1;)Larrow/collectors/NonSuspendCollectorI; + public static fun finish (Larrow/collectors/NonSuspendCollectorI;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun map (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI; + public static fun mapNonSuspend (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function1;)Larrow/collectors/NonSuspendCollectorI; + public static fun supply (Larrow/collectors/NonSuspendCollectorI;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/CollectorI;)Larrow/collectors/CollectorI; + public static fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/CollectorI; + public static fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;)Larrow/collectors/NonSuspendCollectorI; + public static fun zipNonSuspend (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI; +} + +public final class arrow/collectors/ZipKt { + public static final fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function4;)Larrow/collectors/CollectorI; + public static final fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/CollectorI; + public static final fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/NonSuspendCollectorI; + public static final fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI; +} + diff --git a/arrow-libs/fx/arrow-collectors/build.gradle.kts b/arrow-libs/fx/arrow-collectors/build.gradle.kts new file mode 100644 index 00000000000..101fcda02ed --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/build.gradle.kts @@ -0,0 +1,65 @@ +@file:Suppress("DSL_SCOPE_VIOLATION") + +import java.time.Duration + +plugins { + id(libs.plugins.kotlin.multiplatform.get().pluginId) + alias(libs.plugins.arrowGradleConfig.kotlin) + alias(libs.plugins.arrowGradleConfig.publish) + alias(libs.plugins.spotless) + alias(libs.plugins.kotlinx.kover) +} + +apply(from = property("ANIMALSNIFFER_MPP")) + +kotlin { + sourceSets { + commonMain { + dependencies { + api(projects.arrowFxCoroutines) + api(projects.arrowAtomic) + api(libs.coroutines.core) + implementation(libs.kotlin.stdlibCommon) + } + } + + commonTest { + dependencies { + implementation(libs.kotlin.test) + implementation(libs.kotest.frameworkEngine) + implementation(libs.kotest.assertionsCore) + implementation(libs.kotest.property) + } + } + } + + jvm { + tasks.jvmJar { + manifest { + attributes["Automatic-Module-Name"] = "arrow.collectors" + } + } + } + + js { + nodejs { + testTask { + useMocha { + timeout = "60s" + } + } + } + browser { + testTask { + useKarma { + useChromeHeadless() + timeout.set(Duration.ofMinutes(1)) + } + } + } + } +} + +tasks.withType { + useJUnitPlatform() +} diff --git a/arrow-libs/fx/arrow-collectors/gradle.properties b/arrow-libs/fx/arrow-collectors/gradle.properties new file mode 100644 index 00000000000..204dd6f3804 --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/gradle.properties @@ -0,0 +1,4 @@ +# Maven publishing configuration +pom.name=Arrow Collectors +# Build configuration +kapt.incremental.apt=false diff --git a/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collect.kt b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collect.kt new file mode 100644 index 00000000000..2c4ea47f2e6 --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collect.kt @@ -0,0 +1,148 @@ +package arrow.collectors + +import arrow.fx.coroutines.parMap +import arrow.fx.coroutines.parMapUnordered +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.DEFAULT_CONCURRENCY +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onStart + +/** + * Performs collection over the elements of [this]. + * The amount of concurrency depends on the + * [Characteristics] of [collector], and can + * be tweaked using the [concurrency] parameter. + * + * [this] is consumed only once during collection. + * We recommend using a cold [Flow] to ensure that + * elements are produced only when needed. + * + * @receiver [Flow] of elements to collect + * @param collector Describes how to collect the values + * @param concurrency Defines the concurrency limit + */ +@OptIn(FlowPreview::class) +public suspend fun Flow.collect( + collector: Collector, + concurrency: Int = DEFAULT_CONCURRENCY, +): R = collectI(collector, concurrency) + +/** + * Performs collection over the elements of [this]. + * The amount of concurrency depends on the + * [Characteristics] of [collector], and can + * be tweaked using the [concurrency] parameter. + * + * [this] is iterated only once during collection. + * + * Note: if you need to perform changes on the values + * before collection, we strongly recommend to convert + * the [Iterable] into a [Flow] using [asFlow], + * perform those changes, and then using [collect]. + * + * @receiver Sequence of values to collect + * @param collector Describes how to collect the values + * @param concurrency Defines the concurrency limit + */ +@OptIn(FlowPreview::class) +public suspend fun Iterable.parCollect( + collector: Collector, + concurrency: Int = DEFAULT_CONCURRENCY, +): R = asFlow().collect(collector, concurrency) + +/** + * Performs collection over the elements of [this]. + * The amount of concurrency depends on the + * [Characteristics] of [collector], and can + * be tweaked using the [concurrency] parameter. + * + * [this] is iterated only once during collection. + * + * Note: if you need to perform changes on the values + * before collection, we strongly recommend to convert + * the [Sequence] into a [Flow] using [asFlow], + * perform those changes, and then using [collect]. + * + * @receiver Sequence of values to collect + * @param collector Describes how to collect the values + * @param concurrency Defines the concurrency limit + */ +@OptIn(FlowPreview::class) +public suspend fun Sequence.parCollect( + collector: Collector, + concurrency: Int = DEFAULT_CONCURRENCY, +): R = asFlow().collect(collector, concurrency) + +/** + * Performs collection over the elements of [this] + * in a non-concurrent fashion. + * + * [this] is iterated only once during collection. + * + * @receiver Sequence of values to collect + * @param collector Describes how to collect the values + */ +public fun Iterable.collect( + collector: NonSuspendCollector +): R = iterator().collectI(collector) + +/** + * Performs collection over the elements of [this] + * in a non-concurrent fashion. + * + * [this] is iterated only once during collection. + * + * @receiver Sequence of values to collect + * @param collector Describes how to collect the values + */ +public fun Sequence.collect( + collector: NonSuspendCollector +): R = iterator().collectI(collector) + +public fun Iterator.collect( + collector: NonSuspendCollector +): R = collectI(collector) + +@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class) +@Suppress("UNINITIALIZED_VARIABLE", "UNCHECKED_CAST") +internal suspend fun Flow.collectI( + collector: CollectorI, + concurrency: Int = DEFAULT_CONCURRENCY, +): R { + var accumulator: A + val started = this.onStart { accumulator = collector.supply() } + val continued = when { + Characteristics.CONCURRENT in collector.characteristics -> when { + Characteristics.UNORDERED in collector.characteristics -> + started.parMapUnordered(concurrency) { collector.accumulate(accumulator, it) } + + else -> + started.parMap(concurrency) { collector.accumulate(accumulator, it) } + } + + else -> started.map { collector.accumulate(accumulator, it) } + } + continued.collect() + + return when { + Characteristics.IDENTITY_FINISH in collector.characteristics -> accumulator as R + else -> collector.finish(accumulator) + } +} + +@Suppress("UNCHECKED_CAST") +internal fun Iterator.collectI( + collector: NonSuspendCollectorI +): R { + val accumulator = collector.supplyNonSuspend() + forEach { collector.accumulateNonSuspend(accumulator, it) } + return when { + Characteristics.IDENTITY_FINISH in collector.characteristics -> accumulator as R + else -> collector.finishNonSuspend(accumulator) + } +} diff --git a/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collector.kt b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collector.kt new file mode 100644 index 00000000000..9d5038c2623 --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collector.kt @@ -0,0 +1,237 @@ +package arrow.collectors + +/** + * Defines how the [collect] function may run the collector. + */ +public enum class Characteristics { + /** + * The flow may be collected using several workers. + */ + CONCURRENT, + + /** + * The final step of the collector simply returns the accumulator. + * That means that final call may be replaced by a cast. + */ + IDENTITY_FINISH, + + /** + * The order in which elements are accumulated doesn't matter for the end result. + */ + UNORDERED; + + public companion object { + public val IDENTITY: Set = setOf(IDENTITY_FINISH) + public val IDENTITY_UNORDERED: Set = setOf(IDENTITY_FINISH, UNORDERED) + public val CONCURRENT_UNORDERED: Set = setOf(CONCURRENT, UNORDERED) + public val IDENTITY_CONCURRENT: Set = setOf(IDENTITY_FINISH, CONCURRENT) + public val IDENTITY_CONCURRENT_UNORDERED: Set = setOf(IDENTITY_FINISH, CONCURRENT, UNORDERED) + } +} + +public typealias Collector = CollectorI<*, Value, Result> +public typealias NonSuspendCollector = NonSuspendCollectorI<*, Value, Result> + +/** + * A [Collector] accumulates information from elements + * coming from some data source, usually a [kotlinx.coroutines.flow.Flow] or [Iterable]. + * + * The accumulation is done in three phases: + * - Initialization of some (mutable) accumulator ([supply]) + * - Updating the accumulator with each value ([accumulate]) + * - Finalize the work, and extract the final result ([finish]) + * + * This interface is heavily influenced by + * Java's [`Collector`](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html) + * and Haskell's [`foldl` library](https://hackage.haskell.org/package/foldl/docs/Control-Foldl.html). + */ +public interface CollectorI { + public val characteristics: Set + public suspend fun supply(): InternalAccumulator + public suspend fun accumulate(current: InternalAccumulator, value: Value) + public suspend fun finish(current: InternalAccumulator): Result + + public companion object { + /** + * Constructs a new [Collector] from its components + */ + public fun of( + supply: suspend () -> InternalAccumulator, + accumulate: suspend (current: InternalAccumulator, value: Value) -> Unit, + finish: suspend (current: InternalAccumulator) -> Result, + characteristics: Set = setOf(), + ): Collector = object : CollectorI { + override val characteristics: Set = characteristics + override suspend fun supply(): InternalAccumulator = supply() + override suspend fun accumulate(current: InternalAccumulator, value: Value) { + accumulate(current, value) + } + + override suspend fun finish(current: InternalAccumulator): Result = finish(current) + } + + /** + * Constructs a new [Collector] from its components + */ + public fun nonSuspendOf( + supply: () -> InternalAccumulator, + accumulate: (current: InternalAccumulator, value: Value) -> Unit, + finish: (current: InternalAccumulator) -> Result, + characteristics: Set = setOf(), + ): NonSuspendCollector = object : NonSuspendCollectorI { + override val characteristics: Set = characteristics + override fun supplyNonSuspend(): InternalAccumulator = supply() + override fun accumulateNonSuspend(current: InternalAccumulator, value: Value) { + accumulate(current, value) + } + + override fun finishNonSuspend(current: InternalAccumulator): Result = finish(current) + } + } + + /** + * Performs additional work during the finalization phase, + * by applying a function to the end result. + * + * @param transform Additional function to apply to the end result. + */ + // functor over R + public fun map( + transform: suspend (Result) -> S, + ): Collector = of( + supply = this::supply, + accumulate = this::accumulate, + finish = { current -> transform(this.finish(current)) }, + characteristics = this.characteristics - Characteristics.IDENTITY_FINISH, + ) + + /** + * Applies a function over each element in the data source, + * before giving it to the collector. + * + * @param transform Function to apply to each value + */ + // contravariant functor over T + public fun

contramap( + transform: suspend (P) -> Value, + ): Collector = of( + supply = this::supply, + accumulate = { current, value -> this.accumulate(current, transform(value)) }, + finish = this::finish, + characteristics = this.characteristics, + ) + + /** + * Combines two [Collector]s by performing the phases + * of each of them in parallel. + * + * @param other [Collector] to combine with [this] + */ + // applicative + public fun zip( + other: CollectorI, + ): Collector> = + this.zip(other, ::Pair) + + /** + * Combines two [Collector]s by performing the phases + * of each of them in parallel, and then combining + * the end result in a final step. + * + * @param other [Collector] to combine with [this] + * @param combine Function that combines the end results + */ + public fun zip( + other: CollectorI, + combine: suspend (Result, S) -> V, + ): Collector = of( + supply = { Pair(this.supply(), other.supply()) }, + accumulate = { (currentThis, currentOther), value -> + this.accumulate(currentThis, value) + other.accumulate(currentOther, value) + }, + finish = { (currentThis, currentOther) -> + combine(this.finish(currentThis), other.finish(currentOther)) + }, + characteristics = this.characteristics intersect other.characteristics, + ) +} + +public interface NonSuspendCollectorI + : CollectorI { + public fun supplyNonSuspend(): InternalAccumulator + override suspend fun supply(): InternalAccumulator = supplyNonSuspend() + public fun accumulateNonSuspend(current: InternalAccumulator, value: Value) + override suspend fun accumulate(current: InternalAccumulator, value: Value) { + accumulateNonSuspend(current, value) + } + public fun finishNonSuspend(current: InternalAccumulator): Result + override suspend fun finish(current: InternalAccumulator): Result = finishNonSuspend(current) + + /** + * Performs additional work during the finalization phase, + * by applying a function to the end result. + * + * @param transform Additional function to apply to the end result. + */ + // functor over R + public fun mapNonSuspend( + transform: (Result) -> S, + ): NonSuspendCollector = CollectorI.nonSuspendOf( + supply = this::supplyNonSuspend, + accumulate = this::accumulateNonSuspend, + finish = { current -> transform(this.finishNonSuspend(current)) }, + characteristics = this.characteristics - Characteristics.IDENTITY_FINISH, + ) + + /** + * Applies a function over each element in the data source, + * before giving it to the collector. + * + * @param transform Function to apply to each value + */ + // contravariant functor over T + public fun

contramapNonSuspend( + transform: (P) -> Value, + ): NonSuspendCollector = CollectorI.nonSuspendOf( + supply = this::supplyNonSuspend, + accumulate = { current, value -> this.accumulateNonSuspend(current, transform(value)) }, + finish = this::finishNonSuspend, + characteristics = this.characteristics, + ) + + /** + * Combines two [Collector]s by performing the phases + * of each of them in parallel. + * + * @param other [Collector] to combine with [this] + */ + // applicative + public fun zip( + other: NonSuspendCollectorI, + ): NonSuspendCollector> = + this.zipNonSuspend(other, ::Pair) + + /** + * Combines two [Collector]s by performing the phases + * of each of them in parallel, and then combining + * the end result in a final step. + * + * @param other [Collector] to combine with [this] + * @param combine Function that combines the end results + */ + public fun zipNonSuspend( + other: NonSuspendCollectorI, + combine: (Result, S) -> V, + ): NonSuspendCollector = CollectorI.nonSuspendOf( + supply = { Pair(this.supplyNonSuspend(), other.supplyNonSuspend()) }, + accumulate = { (currentThis, currentOther), value -> + this.accumulateNonSuspend(currentThis, value) + other.accumulateNonSuspend(currentOther, value) + }, + finish = { (currentThis, currentOther) -> + combine(this.finishNonSuspend(currentThis), other.finishNonSuspend(currentOther)) + }, + characteristics = this.characteristics intersect other.characteristics, + ) +} diff --git a/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collectors.kt b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collectors.kt new file mode 100644 index 00000000000..94180b2027f --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Collectors.kt @@ -0,0 +1,137 @@ +package arrow.collectors + +import arrow.atomic.Atomic +import arrow.atomic.AtomicInt +import arrow.atomic.update + +/** + * Library of [Collector]s. + */ +public object Collectors { + + /** + * Returns always the same value, regardless of the collected flow. + * + * @param value Value returns as result of the collection. + */ + public fun constant(value: R): NonSuspendCollector = Collector.nonSuspendOf( + supply = { value }, + accumulate = { _, _ -> }, + finish = { it }, + characteristics = Characteristics.IDENTITY_CONCURRENT_UNORDERED + ) + + /** + * Counts the number of elements in the flow. + */ + public val length: NonSuspendCollector = Collector.nonSuspendOf( + supply = { AtomicInt(0) }, + accumulate = { current, _ -> current.incrementAndGet() }, + finish = AtomicInt::get, + characteristics = Characteristics.CONCURRENT_UNORDERED + ) + + /** + * Sum of all the values in the flow. + */ + public val sum: NonSuspendCollector = + intReducer({ 0 }, Int::plus) + + public fun intReducer( + initial: () -> Int, combine: (Int, Int) -> Int, + ): NonSuspendCollector = Collector.nonSuspendOf( + supply = { AtomicInt(initial()) }, + accumulate = { current, value -> current.update { combine(it, value) } }, + finish = AtomicInt::get, + ) + + public fun reducer( + initial: () -> M, combine: (M, M) -> M, unordered: Boolean = false, + ): NonSuspendCollector = Collector.nonSuspendOf( + supply = { Atomic(initial()) }, + accumulate = { current, value -> current.update { combine(it, value) } }, + finish = Atomic::get, + characteristics = if (unordered) Characteristics.CONCURRENT_UNORDERED else emptySet() + ) + + private data object BestByNotInitialized + + /** + * Returns the "best" value from the value. + * + * @param selectNew Decides whether the new value is "better" than the previous best. + */ + @Suppress("UNCHECKED_CAST") + public fun bestBy( + selectNew: (old: M, new: M) -> Boolean, + ): NonSuspendCollector = Collector.nonSuspendOf, M, M?>( + supply = { Atomic(BestByNotInitialized) }, + accumulate = { current, value -> + current.update { old -> + if (old == BestByNotInitialized) { + value + } else { + old as M + if (selectNew(old, value)) value else old + } + } + }, + finish = { current -> + when (val result = current.get()) { + BestByNotInitialized -> null + else -> result as M + } + }, + characteristics = Characteristics.CONCURRENT_UNORDERED + ) + + /** + * Collects all the values in a list, in the order in which they are emitted. + */ + @Suppress("UNCHECKED_CAST") + public fun list(): NonSuspendCollector> = + _list as NonSuspendCollector> + + /** + * Collects all the values in a set. + */ + @Suppress("UNCHECKED_CAST") + public fun set(): NonSuspendCollector> = + _set as NonSuspendCollector> + + /** + * Collects all the values in a map. + */ + public fun mapFromEntries(): NonSuspendCollector, Map> = + map().contramapNonSuspend { (k, v) -> k to v } + + /** + * Collects all the values in a map. + */ + @Suppress("UNCHECKED_CAST") + public fun map(): NonSuspendCollector, Map> = + _map as NonSuspendCollector, Map> + + /* These Collectors can be cached and casted accordingly */ + + private val _list: NonSuspendCollector> = Collector.nonSuspendOf( + supply = { mutableListOf() }, + accumulate = MutableList::add, + finish = { it }, + characteristics = Characteristics.IDENTITY + ) + + private val _set: NonSuspendCollector> = Collector.nonSuspendOf( + supply = ::mutableSetOf, + accumulate = MutableSet::add, + finish = { it }, + characteristics = Characteristics.IDENTITY_UNORDERED + ) + + private val _map: NonSuspendCollector, Map> = Collector.nonSuspendOf( + supply = { mutableMapOf() }, + accumulate = { current, (k, v) -> current[k] = v }, + finish = { it }, + characteristics = Characteristics.IDENTITY_UNORDERED + ) +} diff --git a/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Zip.kt b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Zip.kt new file mode 100644 index 00000000000..df55de7f834 --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/commonMain/kotlin/arrow/collectors/Zip.kt @@ -0,0 +1,61 @@ +package arrow.collectors + +/** + * Combines two [Collector]s by performing the phases + * of each of them in parallel. + * + * @param x First [Collector] + * @param y Second [Collector] + * @param combine Function that combines the end results + */ +public fun zip( + x: Collector, + y: Collector, + combine: suspend (R, S) -> V, +): Collector = x.zip(y, combine) + +/** + * Combines two [NonSuspendCollector]s by performing the phases + * of each of them in parallel. + * + * @param x First [NonSuspendCollector] + * @param y Second [NonSuspendCollector] + * @param combine Function that combines the end results + */ +public fun zip( + x: NonSuspendCollector, + y: NonSuspendCollector, + combine: (R, S) -> V, +): NonSuspendCollector = x.zipNonSuspend(y, combine) + +/** + * Combines three [Collector]s by performing the phases + * of each of them in parallel. + * + * @param x First [Collector] + * @param y Second [Collector] + * @param z Third [Collector] + * @param combine Function that combines the end results + */ +public fun zip( + x: Collector, + y: Collector, + z: Collector, + combine: suspend (R, S, T) -> V, +): Collector = x.zip(y).zip(z) { (a, b), c -> combine(a, b, c) } + +/** + * Combines three [NonSuspendCollector]s by performing the phases + * of each of them in parallel. + * + * @param x First [NonSuspendCollector] + * @param y Second [NonSuspendCollector] + * @param z Third [NonSuspendCollector] + * @param combine Function that combines the end results + */ +public fun zip( + x: NonSuspendCollector, + y: NonSuspendCollector, + z: NonSuspendCollector, + combine: (R, S, T) -> V, +): NonSuspendCollector = x.zip(y).zipNonSuspend(z) { (a, b), c -> combine(a, b, c) } diff --git a/arrow-libs/fx/arrow-collectors/src/commonTest/kotlin/arrow/collectors/CollectorsTest.kt b/arrow-libs/fx/arrow-collectors/src/commonTest/kotlin/arrow/collectors/CollectorsTest.kt new file mode 100644 index 00000000000..ab66dfa7671 --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/commonTest/kotlin/arrow/collectors/CollectorsTest.kt @@ -0,0 +1,48 @@ +package arrow.collectors + +import io.kotest.matchers.shouldBe +import kotlin.test.Test + +class CollectorsTest { + @Test + fun lengthWorks() = runTestOverLists { + it.collect(Collectors.length) shouldBe it.size + it.parCollect(Collectors.length) shouldBe it.size + } + + @Test + fun sumWorks() = runTestOverLists { + it.collect(Collectors.sum) shouldBe it.sum() + it.parCollect(Collectors.sum) shouldBe it.sum() + } + + @Test + fun zipSumWorks1() = runTestOverLists { + it.collect(zip(Collectors.sum, Collectors.sum, Int::plus)) shouldBe it.sum() * 2 + it.parCollect(zip(Collectors.sum, Collectors.sum, Int::plus)) shouldBe it.sum() * 2 + } + + @Test + fun zipSumWorks2() = runTestOverLists { + it.collect(zip(Collectors.sum, Collectors.sum, Int::minus)) shouldBe 0 + it.parCollect(zip(Collectors.sum, Collectors.sum, Int::minus)) shouldBe 0 + } + + @Test + fun bestWorks() = runTestOverLists { + it.collect(Collectors.bestBy { old, new -> new > old }) shouldBe it.maxOrNull() + it.parCollect(Collectors.bestBy { old, new -> new > old }) shouldBe it.maxOrNull() + } + + @Test + fun listWorks() = runTestOverLists { + it.collect(Collectors.list()) shouldBe it + it.parCollect(Collectors.list()) shouldBe it + } + + @Test + fun setWorks() = runTestOverLists { + it.collect(Collectors.set()) shouldBe it.toSet() + it.parCollect(Collectors.set()) shouldBe it.toSet() + } +} diff --git a/arrow-libs/fx/arrow-collectors/src/commonTest/kotlin/arrow/collectors/Util.kt b/arrow-libs/fx/arrow-collectors/src/commonTest/kotlin/arrow/collectors/Util.kt new file mode 100644 index 00000000000..e53498afdbd --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/commonTest/kotlin/arrow/collectors/Util.kt @@ -0,0 +1,23 @@ +package arrow.collectors + +import io.kotest.property.Arb +import io.kotest.property.PropertyContext +import io.kotest.property.arbitrary.int +import io.kotest.property.arbitrary.list +import io.kotest.property.checkAll +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.test.TestResult +import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withContext +import kotlin.time.Duration.Companion.seconds + +fun runTestWithDelay(testBody: suspend TestScope.() -> Unit): TestResult = runTest(timeout = 30.seconds) { + withContext(Dispatchers.Default) { + testBody() + } +} + +fun runTestOverLists(block: suspend PropertyContext.(List) -> Unit): TestResult = runTestWithDelay { + checkAll(Arb.list(Arb.int(-1000 .. 1000), range = 0 .. 15), block) +} diff --git a/arrow-libs/fx/arrow-collectors/src/jvmMain/kotlin/arrow/collectors/Collectors.kt b/arrow-libs/fx/arrow-collectors/src/jvmMain/kotlin/arrow/collectors/Collectors.kt new file mode 100644 index 00000000000..4ef81b5fdb3 --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/jvmMain/kotlin/arrow/collectors/Collectors.kt @@ -0,0 +1,42 @@ +package arrow.collectors + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentMap + +/** + * Collects all the values in a map. + * + * This [Collector] supports concurrency, + * so it's potential faster than [Collectors.mapFromEntries]. + */ +@Suppress("UnusedReceiverParameter") +public fun Collectors.concurrentMapFromEntries(): NonSuspendCollector, ConcurrentMap> = + Collectors.concurrentMap().contramapNonSuspend { (k, v) -> k to v } + +/** + * Collects all the values in a map. + * + * This [Collector] supports concurrency, + * so it's potential faster than [Collectors.map]. + */ +@Suppress("UnusedReceiverParameter") +public fun Collectors.concurrentMap(): NonSuspendCollector, ConcurrentMap> = Collector.nonSuspendOf( + supply = { ConcurrentHashMap() }, + accumulate = { current, (k, v) -> current[k] = v }, + finish = { it }, + characteristics = Characteristics.IDENTITY_CONCURRENT_UNORDERED +) + +/** + * Collects all the values in a set. + * + * This [Collector] supports concurrency, + * so it's potential faster than [Collectors.set]. + */ +@Suppress("UnusedReceiverParameter") +public fun Collectors.concurrentSet(): NonSuspendCollector> = Collector.nonSuspendOf( + supply = { ConcurrentHashMap().keySet(Unit) }, + accumulate = ConcurrentHashMap.KeySetView::add, + finish = { it }, + characteristics = Characteristics.IDENTITY_CONCURRENT_UNORDERED +) diff --git a/arrow-libs/fx/arrow-collectors/src/jvmMain/kotlin/arrow/collectors/JvmCollector.kt b/arrow-libs/fx/arrow-collectors/src/jvmMain/kotlin/arrow/collectors/JvmCollector.kt new file mode 100644 index 00000000000..5476274673d --- /dev/null +++ b/arrow-libs/fx/arrow-collectors/src/jvmMain/kotlin/arrow/collectors/JvmCollector.kt @@ -0,0 +1,34 @@ +package arrow.collectors + +/** + * Wraps a [java.util.stream.Collector] to use with [collect]. + */ +public fun java.util.stream.Collector.asCollector(): NonSuspendCollector = + Collectors.jvm(this) + +/** + * Wraps a [java.util.stream.Collector] to use with [collect]. + */ +@Suppress("UnusedReceiverParameter") +public fun Collectors.jvm( + collector: java.util.stream.Collector, +): NonSuspendCollector = Collectors.jvmI(collector) + +private typealias JavaCharacteristics = java.util.stream.Collector.Characteristics + +@Suppress("UnusedReceiverParameter") +private fun Collectors.jvmI( + collector: java.util.stream.Collector, +): NonSuspendCollector = Collector.nonSuspendOf( + supply = { collector.supplier().get() }, + accumulate = { current, value -> collector.accumulator().accept(current, value) }, + finish = { collector.finisher().apply(it) }, + characteristics = + collector.characteristics().let { original -> + setOfNotNull( + Characteristics.CONCURRENT.takeIf { JavaCharacteristics.CONCURRENT in original }, + Characteristics.IDENTITY_FINISH.takeIf { JavaCharacteristics.IDENTITY_FINISH in original }, + Characteristics.UNORDERED.takeIf { JavaCharacteristics.UNORDERED in original }, + ) + } +) diff --git a/build.gradle.kts b/build.gradle.kts index c83b401be48..c927f753a5d 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -75,6 +75,7 @@ dependencies { kover(projects.arrowOpticsKspPlugin) kover(projects.arrowOpticsReflect) kover(projects.arrowResilience) + kover(projects.arrowCollectors) } allprojects { diff --git a/settings.gradle.kts b/settings.gradle.kts index 3957a87e1a0..3c9494210f1 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -63,6 +63,9 @@ project(":arrow-fx-coroutines").projectDir = file("arrow-libs/fx/arrow-fx-corout include("arrow-fx-stm") project(":arrow-fx-stm").projectDir = file("arrow-libs/fx/arrow-fx-stm") +include("arrow-collectors") +project(":arrow-collectors").projectDir = file("arrow-libs/fx/arrow-collectors") + include("arrow-resilience") project(":arrow-resilience").projectDir = file("arrow-libs/resilience/arrow-resilience")