Skip to content

Commit

Permalink
Merge pull request #231 from modelix/MODELIX-526
Browse files Browse the repository at this point in the history
fix(modelql): CCE from SimpleStepOutput to MultiplexedOutput
  • Loading branch information
slisson authored Sep 11, 2023
2 parents 74edf9d + cef7a7f commit 73b67b8
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import java.util.Collections
import java.util.SortedSet
import java.util.TreeSet
import kotlin.random.Random
import kotlin.test.Ignore
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.milliseconds
Expand Down Expand Up @@ -115,6 +116,7 @@ class ReplicatedRepositoryTest {
}
}

@Ignore
@Test
fun `concurrent write`() = runTest {
val url = "http://localhost/v2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ import org.modelix.modelql.core.IMonoUnboundQuery
import org.modelix.modelql.core.IQueryExecutor
import org.modelix.modelql.core.IUnboundQuery
import org.modelix.modelql.core.IZip2Output
import org.modelix.modelql.core.SimpleStepOutput
import org.modelix.modelql.core.StepFlow
import org.modelix.modelql.core.asMono
import org.modelix.modelql.core.asStepOutput
import org.modelix.modelql.core.filterNotNull
import org.modelix.modelql.core.first
import org.modelix.modelql.core.flatMap
Expand Down Expand Up @@ -79,7 +79,7 @@ abstract class ModelQLNode(val client: ModelQLClient) : INode, ISupportsModelQL,
is IMonoUnboundQuery<*, *> -> {
val castedQuery = query as IMonoUnboundQuery<INode, Out>
val queryOnNode = IUnboundQuery.buildMono { replaceQueryRoot(it).map(castedQuery) }
emit(SimpleStepOutput(client.runQuery(queryOnNode), null))
emit(client.runQuery(queryOnNode).asStepOutput(null))
}
is IFluxUnboundQuery<*, *> -> {
val castedQuery = query as IFluxUnboundQuery<INode, Out>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class FirstElementStep<E>() : MonoTransformingStep<E, E>() {

override fun requiresSingularQueryInput(): Boolean = true

override fun transform(evaluationContext: QueryEvaluationContext, input: IStepOutput<E>): IStepOutput<E> {
return input
}

override fun transform(evaluationContext: QueryEvaluationContext, input: E): E {
return input
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ abstract class MonoTransformingStep<In, Out> : TransformingStep<In, Out>(), IMon
fun connectAndDowncast(producer: IFluxStep<In>): IFluxStep<Out> = also { producer.connect(it) }

override fun createFlow(input: StepFlow<In>, context: IFlowInstantiationContext): StepFlow<Out> {
return input.map { transform(context.evaluationContext, it.value).asStepOutput(this) }
return input.map { transform(context.evaluationContext, it) }
}

override fun createSequence(evaluationContext: QueryEvaluationContext, queryInput: Sequence<Any?>): Sequence<Out> {
Expand All @@ -200,6 +200,11 @@ abstract class MonoTransformingStep<In, Out> : TransformingStep<In, Out>(), IMon
override fun evaluate(evaluationContext: QueryEvaluationContext, queryInput: Any?): Optional<Out> {
return getProducer().evaluate(evaluationContext, queryInput).map { transform(evaluationContext, it) }
}

protected open fun transform(evaluationContext: QueryEvaluationContext, input: IStepOutput<In>): IStepOutput<Out> {
return transform(evaluationContext, input.value).asStepOutput(this)
}

abstract fun transform(evaluationContext: QueryEvaluationContext, input: In): Out
}

Expand Down Expand Up @@ -228,6 +233,10 @@ abstract class AggregationStep<In, Out> : MonoTransformingStep<In, Out>() {
}
}

override fun transform(evaluationContext: QueryEvaluationContext, input: IStepOutput<In>): IStepOutput<Out> {
return aggregate(sequenceOf(input))
}

override fun transform(evaluationContext: QueryEvaluationContext, input: In): Out {
return aggregate(sequenceOf(input.asStepOutput(null))).value
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ interface IStepOutput<out E> {
val value: E
}

fun <T> IStepOutput<*>.upcast(): IStepOutput<T> = this as IStepOutput<T>

typealias StepFlow<E> = Flow<IStepOutput<E>>
val <T> Flow<IStepOutput<T>>.value: Flow<T> get() = map { it.value }
fun <T> Flow<T>.asStepFlow(owner: IProducingStep<T>?): StepFlow<T> = map { SimpleStepOutput(it, owner) }
fun <T> Flow<T>.asStepFlow(owner: IProducingStep<T>?): StepFlow<T> = map { it.asStepOutput(owner) }

class SimpleStepOutput<out E>(override val value: E, val owner: IProducingStep<E>?) : IStepOutput<E>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class MapIfNotNullStep<In : Any, Out>(val query: MonoUnboundQuery<In, Out>) : Mo
}
}

override fun transform(evaluationContext: QueryEvaluationContext, input: IStepOutput<In?>): IStepOutput<Out?> {
throw UnsupportedOperationException("use MapIfNotNullStep.createFlow")
}

override fun transform(evaluationContext: QueryEvaluationContext, input: In?): Out? {
return input?.let { query.outputStep.evaluate(evaluationContext, it).getOrElse(null) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import kotlinx.coroutines.flow.map

abstract class TransformingStepWithParameter<In : CommonIn, ParameterT : CommonIn, CommonIn, Out> : MonoTransformingStep<CommonIn, Out>() {
private var hasStaticParameter: Boolean = false
private var staticParameterValue: ParameterT? = null
private var staticParameterValue: IStepOutput<ParameterT>? = null

private var targetProducer: IProducingStep<ParameterT>? = null

Expand All @@ -32,25 +32,25 @@ abstract class TransformingStepWithParameter<In : CommonIn, ParameterT : CommonI
require(!getParameterProducer().canBeMultiple()) { "only mono parameters are supported: ${getParameterProducer()}" }
hasStaticParameter = getParameterProducer().canEvaluateStatically()
if (hasStaticParameter) {
staticParameterValue = getParameterProducer().evaluateStatically()
staticParameterValue = getParameterProducer().evaluateStatically().asStepOutput(null)
}
}

override fun createFlow(input: StepFlow<CommonIn>, context: IFlowInstantiationContext): StepFlow<Out> {
if (hasStaticParameter) {
return input.map { transformElement(it as IStepOutput<In>, (staticParameterValue as ParameterT).asStepOutput(null)) }
return input.map { transformElement(it.upcast<In>(), staticParameterValue as IStepOutput<ParameterT>) }
} else {
val parameterFlow = context.getOrCreateFlow<ParameterT>(getParameterProducer())
return flow {
val parameterValue = parameterFlow.firstOrNull()
emitAll(input.map { transformElement(it as IStepOutput<In>, parameterValue) })
emitAll(input.map { transformElement(it.upcast<In>(), parameterValue) })
}
}
}

override fun createSequence(evaluationContext: QueryEvaluationContext, queryInput: Sequence<Any?>): Sequence<Out> {
val parameterValue: IStepOutput<ParameterT>? = if (hasStaticParameter) {
(staticParameterValue as ParameterT).asStepOutput(null)
staticParameterValue
} else {
getParameterProducer().evaluate(
evaluationContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ class ZipElementAccessStep<Out>(val index: Int) : MonoTransformingStep<IZipOutpu
return zipSerializer.elementSerializers[index]
}

override fun transform(
evaluationContext: QueryEvaluationContext,
input: IStepOutput<IZipOutput<Any?>>,
): IStepOutput<Out> {
return (input as ZipStepOutput<*, *>).values[index] as IStepOutput<Out>
}

override fun transform(evaluationContext: QueryEvaluationContext, input: IZipOutput<Any?>): Out {
return input.values[index] as Out
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,18 @@ class ModelQLTest {
assertEquals(testDatabase.products, result)
}

@Test
fun zipElementAccess() = runTestWithTimeout {
val result = remoteProductDatabaseQuery { db ->
db.products.flatMap { enum ->
enum.images.allowEmpty().zip(enum.title.firstOrNull()).map { it ->
it.first.zip(it.second).mapLocal { "" }
}
}.toList()
}
assertEquals(testDatabase.products.flatMap { it.images }.map { "" }, result)
}

// @Test
// fun testIndexLookup() {
// val result = remoteProductDatabaseQuery { db ->
Expand Down

0 comments on commit 73b67b8

Please sign in to comment.