Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow Collectors #3280

Merged
merged 20 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions arrow-libs/fx/arrow-collectors/api/arrow-collectors.api
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
public final class arrow/collectors/Characteristics : java/lang/Enum {
public static final field CONCURRENT Larrow/collectors/Characteristics;
public static final field Companion Larrow/collectors/Characteristics$Companion;
public static final field IDENTITY_FINISH Larrow/collectors/Characteristics;
public static final field UNORDERED Larrow/collectors/Characteristics;
public static fun getEntries ()Lkotlin/enums/EnumEntries;
public static fun valueOf (Ljava/lang/String;)Larrow/collectors/Characteristics;
public static fun values ()[Larrow/collectors/Characteristics;
}

public final class arrow/collectors/Characteristics$Companion {
public final fun getCONCURRENT_UNORDERED ()Ljava/util/Set;
public final fun getIDENTITY ()Ljava/util/Set;
public final fun getIDENTITY_CONCURRENT ()Ljava/util/Set;
public final fun getIDENTITY_CONCURRENT_UNORDERED ()Ljava/util/Set;
public final fun getIDENTITY_UNORDERED ()Ljava/util/Set;
}

public final class arrow/collectors/CollectKt {
public static final fun collect (Ljava/lang/Iterable;Larrow/collectors/NonSuspendCollectorI;)Ljava/lang/Object;
public static final fun collect (Ljava/util/Iterator;Larrow/collectors/NonSuspendCollectorI;)Ljava/lang/Object;
public static final fun collect (Lkotlin/sequences/Sequence;Larrow/collectors/NonSuspendCollectorI;)Ljava/lang/Object;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun collect$default (Lkotlinx/coroutines/flow/Flow;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun parCollect (Ljava/lang/Iterable;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun parCollect (Lkotlin/sequences/Sequence;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun parCollect$default (Ljava/lang/Iterable;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun parCollect$default (Lkotlin/sequences/Sequence;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public abstract interface class arrow/collectors/CollectorI {
public static final field Companion Larrow/collectors/CollectorI$Companion;
public abstract fun accumulate (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun contramap (Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public abstract fun finish (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getCharacteristics ()Ljava/util/Set;
public abstract fun map (Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public abstract fun supply (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun zip (Larrow/collectors/CollectorI;)Larrow/collectors/CollectorI;
public abstract fun zip (Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/CollectorI;
}

public final class arrow/collectors/CollectorI$Companion {
public final fun nonSuspendOf (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;Ljava/util/Set;)Larrow/collectors/NonSuspendCollectorI;
public static synthetic fun nonSuspendOf$default (Larrow/collectors/CollectorI$Companion;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;Ljava/util/Set;ILjava/lang/Object;)Larrow/collectors/NonSuspendCollectorI;
public final fun of (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/jvm/functions/Function2;Ljava/util/Set;)Larrow/collectors/CollectorI;
public static synthetic fun of$default (Larrow/collectors/CollectorI$Companion;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/jvm/functions/Function2;Ljava/util/Set;ILjava/lang/Object;)Larrow/collectors/CollectorI;
}

public final class arrow/collectors/CollectorI$DefaultImpls {
public static fun contramap (Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public static fun map (Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public static fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;)Larrow/collectors/CollectorI;
public static fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/CollectorI;
}

public final class arrow/collectors/Collectors {
public static final field INSTANCE Larrow/collectors/Collectors;
public final fun bestBy (Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI;
public final fun constant (Ljava/lang/Object;)Larrow/collectors/NonSuspendCollectorI;
public final fun getLength ()Larrow/collectors/NonSuspendCollectorI;
public final fun getSum ()Larrow/collectors/NonSuspendCollectorI;
public final fun intReducer (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI;
public final fun list ()Larrow/collectors/NonSuspendCollectorI;
public final fun map ()Larrow/collectors/NonSuspendCollectorI;
public final fun mapFromEntries ()Larrow/collectors/NonSuspendCollectorI;
public final fun reducer (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;Z)Larrow/collectors/NonSuspendCollectorI;
public static synthetic fun reducer$default (Larrow/collectors/Collectors;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;ZILjava/lang/Object;)Larrow/collectors/NonSuspendCollectorI;
public final fun set ()Larrow/collectors/NonSuspendCollectorI;
}

public final class arrow/collectors/CollectorsKt {
public static final fun concurrentMap (Larrow/collectors/Collectors;)Larrow/collectors/NonSuspendCollectorI;
public static final fun concurrentMapFromEntries (Larrow/collectors/Collectors;)Larrow/collectors/NonSuspendCollectorI;
public static final fun concurrentSet (Larrow/collectors/Collectors;)Larrow/collectors/NonSuspendCollectorI;
}

public final class arrow/collectors/JvmCollectorKt {
public static final fun asCollector (Ljava/util/stream/Collector;)Larrow/collectors/NonSuspendCollectorI;
public static final fun jvm (Larrow/collectors/Collectors;Ljava/util/stream/Collector;)Larrow/collectors/NonSuspendCollectorI;
}

public abstract interface class arrow/collectors/NonSuspendCollectorI : arrow/collectors/CollectorI {
public abstract fun accumulate (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun accumulateNonSuspend (Ljava/lang/Object;Ljava/lang/Object;)V
public abstract fun contramapNonSuspend (Lkotlin/jvm/functions/Function1;)Larrow/collectors/NonSuspendCollectorI;
public abstract fun finish (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun finishNonSuspend (Ljava/lang/Object;)Ljava/lang/Object;
public abstract fun mapNonSuspend (Lkotlin/jvm/functions/Function1;)Larrow/collectors/NonSuspendCollectorI;
public abstract fun supply (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun supplyNonSuspend ()Ljava/lang/Object;
public abstract fun zip (Larrow/collectors/NonSuspendCollectorI;)Larrow/collectors/NonSuspendCollectorI;
public abstract fun zipNonSuspend (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI;
}

public final class arrow/collectors/NonSuspendCollectorI$DefaultImpls {
public static fun accumulate (Larrow/collectors/NonSuspendCollectorI;Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun contramap (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public static fun contramapNonSuspend (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function1;)Larrow/collectors/NonSuspendCollectorI;
public static fun finish (Larrow/collectors/NonSuspendCollectorI;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun map (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public static fun mapNonSuspend (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function1;)Larrow/collectors/NonSuspendCollectorI;
public static fun supply (Larrow/collectors/NonSuspendCollectorI;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/CollectorI;)Larrow/collectors/CollectorI;
public static fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/CollectorI;
public static fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;)Larrow/collectors/NonSuspendCollectorI;
public static fun zipNonSuspend (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI;
}

public final class arrow/collectors/ZipKt {
public static final fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function4;)Larrow/collectors/CollectorI;
public static final fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/CollectorI;
public static final fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/NonSuspendCollectorI;
public static final fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI;
}

65 changes: 65 additions & 0 deletions arrow-libs/fx/arrow-collectors/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
@file:Suppress("DSL_SCOPE_VIOLATION")

import java.time.Duration

plugins {
id(libs.plugins.kotlin.multiplatform.get().pluginId)
alias(libs.plugins.arrowGradleConfig.kotlin)
alias(libs.plugins.arrowGradleConfig.publish)
alias(libs.plugins.spotless)
alias(libs.plugins.kotlinx.kover)
}

apply(from = property("ANIMALSNIFFER_MPP"))

kotlin {
sourceSets {
commonMain {
dependencies {
api(projects.arrowFxCoroutines)
api(projects.arrowAtomic)
api(libs.coroutines.core)
implementation(libs.kotlin.stdlibCommon)
}
}

commonTest {
dependencies {
implementation(libs.kotlin.test)
implementation(libs.kotest.frameworkEngine)
implementation(libs.kotest.assertionsCore)
implementation(libs.kotest.property)
}
}
}

jvm {
tasks.jvmJar {
manifest {
attributes["Automatic-Module-Name"] = "arrow.collectors"
}
}
}

js {
nodejs {
testTask {
useMocha {
timeout = "60s"
}
}
}
browser {
testTask {
useKarma {
useChromeHeadless()
timeout.set(Duration.ofMinutes(1))
}
}
}
Comment on lines +45 to +59
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Off-topic: I think would be good if we can put this in some common configuration at some point. Still not sure what's the best way to do that in Gradle 🙃

}
}

tasks.withType<Test> {
useJUnitPlatform()
}
4 changes: 4 additions & 0 deletions arrow-libs/fx/arrow-collectors/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Maven publishing configuration
pom.name=Arrow Collectors
# Build configuration
kapt.incremental.apt=false
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package arrow.collectors

import arrow.fx.coroutines.parMap
import arrow.fx.coroutines.parMapUnordered
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.DEFAULT_CONCURRENCY
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart

/**
* Performs collection over the elements of [this].
* The amount of concurrency depends on the
* [Characteristics] of [collector], and can
* be tweaked using the [concurrency] parameter.
*
* [this] is consumed only once during collection.
* We recommend using a cold [Flow] to ensure that
* elements are produced only when needed.
*
* @receiver [Flow] of elements to collect
* @param collector Describes how to collect the values
* @param concurrency Defines the concurrency limit
*/
@OptIn(FlowPreview::class)
public suspend fun <T, R> Flow<T>.collect(
collector: Collector<T, R>,
concurrency: Int = DEFAULT_CONCURRENCY,
): R = collectI(collector, concurrency)

/**
* Performs collection over the elements of [this].
* The amount of concurrency depends on the
* [Characteristics] of [collector], and can
* be tweaked using the [concurrency] parameter.
*
* [this] is iterated only once during collection.
*
* Note: if you need to perform changes on the values
* before collection, we strongly recommend to convert
* the [Iterable] into a [Flow] using [asFlow],
* perform those changes, and then using [collect].
*
* @receiver Sequence of values to collect
* @param collector Describes how to collect the values
* @param concurrency Defines the concurrency limit
*/
@OptIn(FlowPreview::class)
public suspend fun <T, R> Iterable<T>.parCollect(
collector: Collector<T, R>,
concurrency: Int = DEFAULT_CONCURRENCY,
): R = asFlow().collect(collector, concurrency)

/**
* Performs collection over the elements of [this].
* The amount of concurrency depends on the
* [Characteristics] of [collector], and can
* be tweaked using the [concurrency] parameter.
*
* [this] is iterated only once during collection.
*
* Note: if you need to perform changes on the values
* before collection, we strongly recommend to convert
* the [Sequence] into a [Flow] using [asFlow],
* perform those changes, and then using [collect].
*
* @receiver Sequence of values to collect
* @param collector Describes how to collect the values
* @param concurrency Defines the concurrency limit
*/
@OptIn(FlowPreview::class)
public suspend fun <T, R> Sequence<T>.parCollect(
collector: Collector<T, R>,
concurrency: Int = DEFAULT_CONCURRENCY,
): R = asFlow().collect(collector, concurrency)

/**
* Performs collection over the elements of [this]
* in a non-concurrent fashion.
*
* [this] is iterated only once during collection.
*
* @receiver Sequence of values to collect
* @param collector Describes how to collect the values
*/
public fun <T, R> Iterable<T>.collect(
collector: NonSuspendCollector<T, R>
): R = iterator().collectI(collector)

/**
* Performs collection over the elements of [this]
* in a non-concurrent fashion.
*
* [this] is iterated only once during collection.
*
* @receiver Sequence of values to collect
* @param collector Describes how to collect the values
*/
public fun <T, R> Sequence<T>.collect(
collector: NonSuspendCollector<T, R>
): R = iterator().collectI(collector)

public fun <T, R> Iterator<T>.collect(
collector: NonSuspendCollector<T, R>
): R = collectI(collector)

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
@Suppress("UNINITIALIZED_VARIABLE", "UNCHECKED_CAST")
internal suspend fun <A, T, R> Flow<T>.collectI(
collector: CollectorI<A, T, R>,
concurrency: Int = DEFAULT_CONCURRENCY,
): R {
var accumulator: A
val started = this.onStart { accumulator = collector.supply() }
val continued = when {
Characteristics.CONCURRENT in collector.characteristics -> when {
Characteristics.UNORDERED in collector.characteristics ->
started.parMapUnordered(concurrency) { collector.accumulate(accumulator, it) }

else ->
started.parMap(concurrency) { collector.accumulate(accumulator, it) }
}

else -> started.map { collector.accumulate(accumulator, it) }
}
continued.collect()

return when {
Characteristics.IDENTITY_FINISH in collector.characteristics -> accumulator as R
else -> collector.finish(accumulator)
}
}

@Suppress("UNCHECKED_CAST")
internal fun <A, T, R> Iterator<T>.collectI(
collector: NonSuspendCollectorI<A, T, R>
): R {
val accumulator = collector.supplyNonSuspend()
forEach { collector.accumulateNonSuspend(accumulator, it) }
return when {
Characteristics.IDENTITY_FINISH in collector.characteristics -> accumulator as R
else -> collector.finishNonSuspend(accumulator)
}
}
Loading