Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow Collectors #3280

Merged
merged 20 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions arrow-libs/fx/arrow-collectors/api/arrow-collectors.api
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
public final class arrow/collectors/Characteristics : java/lang/Enum {
public static final field CONCURRENT Larrow/collectors/Characteristics;
public static final field Companion Larrow/collectors/Characteristics$Companion;
public static final field IDENTITY_FINISH Larrow/collectors/Characteristics;
public static final field UNORDERED Larrow/collectors/Characteristics;
public static fun getEntries ()Lkotlin/enums/EnumEntries;
public static fun valueOf (Ljava/lang/String;)Larrow/collectors/Characteristics;
public static fun values ()[Larrow/collectors/Characteristics;
}

public final class arrow/collectors/Characteristics$Companion {
public final fun getCONCURRENT_UNORDERED ()Ljava/util/Set;
public final fun getIDENTITY_CONCURRENT ()Ljava/util/Set;
public final fun getIDENTITY_CONCURRENT_UNORDERED ()Ljava/util/Set;
}

public abstract interface class arrow/collectors/CollectorI {
public abstract fun accumulate (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun contramap (Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public abstract fun finish (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getCharacteristics ()Ljava/util/Set;
public abstract fun map (Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public abstract fun supply (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun zip (Larrow/collectors/CollectorI;)Larrow/collectors/CollectorI;
public abstract fun zip (Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
}

public final class arrow/collectors/CollectorI$DefaultImpls {
public static fun contramap (Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public static fun map (Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public static fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;)Larrow/collectors/CollectorI;
public static fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
}

public final class arrow/collectors/Collectors {
public static final field INSTANCE Larrow/collectors/Collectors;
public final fun bestBy (Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public final fun constant (Ljava/lang/Object;)Larrow/collectors/CollectorI;
public final fun getLength ()Larrow/collectors/CollectorI;
public final fun getSum ()Larrow/collectors/CollectorI;
public final fun intReducer (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public final fun list ()Larrow/collectors/CollectorI;
public final fun map ()Larrow/collectors/CollectorI;
public final fun mapFromEntries ()Larrow/collectors/CollectorI;
public final fun reducer (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;Z)Larrow/collectors/CollectorI;
public static synthetic fun reducer$default (Larrow/collectors/Collectors;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;ZILjava/lang/Object;)Larrow/collectors/CollectorI;
public final fun set ()Larrow/collectors/CollectorI;
}

public final class arrow/collectors/CollectorsKt {
public static final fun concurrentMap (Larrow/collectors/Collectors;)Larrow/collectors/CollectorI;
public static final fun concurrentMapFromEntries (Larrow/collectors/Collectors;)Larrow/collectors/CollectorI;
public static final fun concurrentSet (Larrow/collectors/Collectors;)Larrow/collectors/CollectorI;
}

public final class arrow/collectors/FlowKt {
public static final fun collect (Ljava/lang/Iterable;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun collect$default (Ljava/lang/Iterable;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun collect$default (Lkotlinx/coroutines/flow/Flow;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun collectBlocking (Ljava/lang/Iterable;Larrow/collectors/CollectorI;)Ljava/lang/Object;
public static final fun collectBlocking (Lkotlinx/coroutines/flow/Flow;Larrow/collectors/CollectorI;)Ljava/lang/Object;
}

public final class arrow/collectors/JvmCollectorKt {
public static final fun asCollector (Ljava/util/stream/Collector;)Larrow/collectors/CollectorI;
public static final fun jvm (Larrow/collectors/Collectors;Ljava/util/stream/Collector;)Larrow/collectors/CollectorI;
}

public final class arrow/collectors/ZipKt {
public static final fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/CollectorI;
public static final fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
}

44 changes: 44 additions & 0 deletions arrow-libs/fx/arrow-collectors/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
@file:Suppress("DSL_SCOPE_VIOLATION")

plugins {
id(libs.plugins.kotlin.multiplatform.get().pluginId)
alias(libs.plugins.arrowGradleConfig.kotlin)
alias(libs.plugins.arrowGradleConfig.publish)
alias(libs.plugins.spotless)
}

apply(from = property("ANIMALSNIFFER_MPP"))

kotlin {
sourceSets {
commonMain {
dependencies {
api(projects.arrowFxCoroutines)
api(projects.arrowAtomic)
api(libs.coroutines.core)
implementation(libs.kotlin.stdlibCommon)
}
}

commonTest {
dependencies {
implementation(libs.kotlin.test)
implementation(libs.kotest.frameworkEngine)
implementation(libs.kotest.assertionsCore)
implementation(libs.kotest.property)
}
}
}

jvm {
tasks.jvmJar {
manifest {
attributes["Automatic-Module-Name"] = "arrow.collectors"
}
}
}
}

tasks.withType<Test> {
useJUnitPlatform()
}
4 changes: 4 additions & 0 deletions arrow-libs/fx/arrow-collectors/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Maven publishing configuration
pom.name=Arrow Collectors
# Build configuration
kapt.incremental.apt=false
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package arrow.collectors

// https://hackage.haskell.org/package/foldl-1.4.15/docs/Control-Foldl.html

/**
* Defines how the [collect] function may run the collector.
*/
public enum class Characteristics {
/**
* The flow may be collected using several workers.
*/
CONCURRENT,

/**
* The final step of the collector simply returns the accumulator.
* That means that final call may be replaced by a cast.
*/
IDENTITY_FINISH,

/**
* The order in which elements are accumulated doesn't matter for the end result.
*/
UNORDERED;

public companion object {
public val CONCURRENT_UNORDERED: Set<Characteristics> = setOf(CONCURRENT, UNORDERED)
public val IDENTITY_CONCURRENT: Set<Characteristics> = setOf(IDENTITY_FINISH, CONCURRENT)
public val IDENTITY_CONCURRENT_UNORDERED: Set<Characteristics> = setOf(IDENTITY_FINISH, CONCURRENT, UNORDERED)
}
}


public typealias Collector<T, R> = CollectorI<*, T, R>

/**
* A [Collector] accumulates information from elements
* coming from some data source, usually a [kotlinx.coroutines.flow.Flow] or [Iterable].
*
* The accumulation is done in three phases:
* - Initialization of some (mutable) accumulator ([supply])
* - Updating the accumulator with each value ([accumulate])
* - Finalize the work, and extract the final result ([finish])
*
* This interface is heavily influenced by
* Java's [`Collector`](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html)
* and Haskell's [`foldl` library](https://hackage.haskell.org/package/foldl/docs/Control-Foldl.html).
*/
public interface CollectorI<A, in T, out R> {
public val characteristics: Set<Characteristics>
public suspend fun supply(): A
public suspend fun accumulate(current: A, value: T)
public suspend fun finish(current: A): R

/**
* Performs additional work during the finalization phase,
* by applying a function to the end result.
*
* @param finish Additional function to apply to the end result.
*/
// functor over R
public fun <S> map(
finish: suspend (R) -> S,
): Collector<T, S> {
val me = this
return object : CollectorI<A, T, S> {
override val characteristics: Set<Characteristics> =
me.characteristics

override suspend fun supply(): A =
me.supply()

override suspend fun accumulate(current: A, value: T) =
me.accumulate(current, value)

override suspend fun finish(current: A): S =
finish(me.finish(current))
}
}

/**
* Applies a function over each element in the data source,
* before giving it to the collector.
*
* @param transform Function to apply to each value
*/
// contravariant functor over T
public fun <P> contramap(
transform: suspend (P) -> T,
): Collector<P, R> {
val me = this
return object : CollectorI<A, P, R> {
override val characteristics: Set<Characteristics> =
me.characteristics

override suspend fun supply(): A =
me.supply()

override suspend fun accumulate(current: A, value: P) =
me.accumulate(current, transform(value))

override suspend fun finish(current: A): R =
me.finish(current)
}
}

/**
* Combines two [Collector]s by performing the phases
* of each of them in parallel.
*
* @param other [Collector] to combine with [this]
*/
// applicative
public fun <B, S> zip(
other: CollectorI<B, @UnsafeVariance T, S>,
): Collector<T, Pair<R, S>> =
this.zip(other, ::Pair)

/**
* Combines two [Collector]s by performing the phases
* of each of them in parallel, and then combining
* the end result in a final step.
*
* @param other [Collector] to combine with [this]
* @param finish Function that combines the end results
*/
public fun <B, S, V> zip(
other: CollectorI<B, @UnsafeVariance T, S>,
finish: (R, S) -> V,
): Collector<T, V> {
val me = this
return object : CollectorI<Pair<A, B>, T, V> {
override val characteristics: Set<Characteristics> =
me.characteristics.intersect(other.characteristics)

override suspend fun supply(): Pair<A, B> =
Pair(me.supply(), other.supply())

override suspend fun accumulate(current: Pair<A, B>, value: T) {
me.accumulate(current.first, value)
other.accumulate(current.second, value)
}

override suspend fun finish(current: Pair<A, B>): V =
finish(me.finish(current.first), other.finish(current.second))
}
}
}
Loading
Loading