From 11da583d4ed4c2923efeab3b0760b0072c659827 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B3gvan=20Olsen?= Date: Sat, 26 Jan 2019 18:39:48 +0000 Subject: [PATCH] #38: Added support for suspendable property resolvers --- .../kgraphql/schema/dsl/PropertyDSL.kt | 18 +++++ .../execution/ParallelRequestExecutor.kt | 75 ++++++++++--------- .../integration/ParallelExecutionTest.kt | 44 +++++++++-- .../language/ArgumentsSpecificationTest.kt | 50 +++++++++++++ 4 files changed, 146 insertions(+), 41 deletions(-) diff --git a/src/main/kotlin/com/github/pgutkowski/kgraphql/schema/dsl/PropertyDSL.kt b/src/main/kotlin/com/github/pgutkowski/kgraphql/schema/dsl/PropertyDSL.kt index b9bbbd3d..7fad31ee 100644 --- a/src/main/kotlin/com/github/pgutkowski/kgraphql/schema/dsl/PropertyDSL.kt +++ b/src/main/kotlin/com/github/pgutkowski/kgraphql/schema/dsl/PropertyDSL.kt @@ -40,6 +40,24 @@ class PropertyDSL(val name : String, block : PropertyDSL.() -> fun resolver(function: (T, E, W, Q, A, S) -> R) = resolver(FunctionWrapper.on(function, true)) + fun suspendResolver(function: suspend (T) -> R) + = resolver(FunctionWrapper.onSuspend(function, true)) + + fun suspendResolver(function: suspend (T, E) -> R) + = resolver(FunctionWrapper.onSuspend(function, true)) + + fun suspendResolver(function: suspend (T, E, W) -> R) + = resolver(FunctionWrapper.onSuspend(function, true)) + + fun suspendResolver(function: suspend (T, E, W, Q) -> R) + = resolver(FunctionWrapper.onSuspend(function, true)) + + fun suspendResolver(function: suspend (T, E, W, Q, A) -> R) + = resolver(FunctionWrapper.onSuspend(function, true)) + + fun suspendResolver(function: suspend (T, E, W, Q, A, S) -> R) + = resolver(FunctionWrapper.onSuspend(function, true)) + fun accessRule(rule: (T, Context) -> Exception?){ val accessRuleAdapter: (T?, Context) -> Exception? = { parent, ctx -> diff --git a/src/main/kotlin/com/github/pgutkowski/kgraphql/schema/execution/ParallelRequestExecutor.kt b/src/main/kotlin/com/github/pgutkowski/kgraphql/schema/execution/ParallelRequestExecutor.kt index 1b557e65..dc41db0a 100644 --- a/src/main/kotlin/com/github/pgutkowski/kgraphql/schema/execution/ParallelRequestExecutor.kt +++ b/src/main/kotlin/com/github/pgutkowski/kgraphql/schema/execution/ParallelRequestExecutor.kt @@ -18,11 +18,8 @@ import com.github.pgutkowski.kgraphql.schema.scalar.serializeScalar import com.github.pgutkowski.kgraphql.schema.structure2.Field import com.github.pgutkowski.kgraphql.schema.structure2.InputValue import com.github.pgutkowski.kgraphql.schema.structure2.Type -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import kotlin.coroutines.CoroutineContext import kotlin.reflect.KProperty1 @@ -51,36 +48,14 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro override suspend fun suspendExecute(plan: ExecutionPlan, variables: VariablesJson, context: Context): String { val root = jsonNodeFactory.objectNode() val data = root.putObject("data") - val channel = Channel>() - val jobs = plan - .map { execution -> - launch(dispatcher) { - try { - val writeOperation = writeOperation( - ctx = ExecutionContext(Variables(schema, variables, execution.variables), context), - node = execution, - operation = execution.field as Field.Function<*, *> - ) - channel.send(execution to writeOperation) - } catch (e: Exception) { - channel.close(e) - } - } - } - .toList() - //intermediate data structure necessary to preserve ordering - val resultMap = mutableMapOf() - repeat(plan.size) { - try { - val (execution, jsonNode) = channel.receive() - resultMap.put(execution, jsonNode) - } catch (e: Exception) { - jobs.forEach { it.cancel() } - throw e - } + val resultMap = plan.toMapAsync() { + writeOperation( + ctx = ExecutionContext(Variables(schema, variables, it.variables), context), + node = it, + operation = it.field as Field.Function<*, *> + ) } - channel.close() for (operation in plan) { data.set(operation.aliasOrKey, resultMap[operation]) @@ -93,6 +68,33 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro suspendExecute(plan, variables, context) } + private suspend fun Collection.toMapAsync(block: suspend (T) -> R): Map = coroutineScope { + val channel = Channel>() + val jobs = map { item -> + launch(dispatcher) { + try { + val res = block(item) + channel.send(item to res) + } catch (e: Exception) { + channel.close(e) + } + } + } + val resultMap = mutableMapOf() + repeat(size) { + try { + val (item, result) = channel.receive() + resultMap[item] = result + } catch (e: Exception) { + jobs.forEach(Job::cancel) + throw e + } + } + + channel.close() + resultMap + } + private suspend fun writeOperation(ctx: ExecutionContext, node: Execution.Node, operation: FunctionWrapper): JsonNode { node.field.checkAccess(null, ctx.requestContext) val operationResult: T? = operation.invoke( @@ -134,9 +136,12 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro //check value, not returnType, because this method can be invoked with element value value is Collection<*> -> { if (returnType.isList()) { - val arrayNode = jsonNodeFactory.arrayNode(value.size) - value.forEach { element -> arrayNode.add(createNode(ctx, element, node, returnType.unwrapList())) } - arrayNode + val valuesMap = value.toMapAsync { + createNode(ctx, it, node, returnType.unwrapList()) + } + value.fold(jsonNodeFactory.arrayNode(value.size)) { array, v -> + array.add(valuesMap[v]) + } } else { throw ExecutionException("Invalid collection value for non collection property") } diff --git a/src/test/kotlin/com/github/pgutkowski/kgraphql/integration/ParallelExecutionTest.kt b/src/test/kotlin/com/github/pgutkowski/kgraphql/integration/ParallelExecutionTest.kt index dd265aa3..4062d309 100644 --- a/src/test/kotlin/com/github/pgutkowski/kgraphql/integration/ParallelExecutionTest.kt +++ b/src/test/kotlin/com/github/pgutkowski/kgraphql/integration/ParallelExecutionTest.kt @@ -1,22 +1,24 @@ package com.github.pgutkowski.kgraphql.integration import com.github.pgutkowski.kgraphql.KGraphQL -import com.github.pgutkowski.kgraphql.assertNoErrors import com.github.pgutkowski.kgraphql.extract import com.github.pgutkowski.kgraphql.deserialize import kotlinx.coroutines.delay import org.hamcrest.CoreMatchers import org.hamcrest.MatcherAssert import org.junit.Test +import kotlin.random.Random class ParallelExecutionTest { + data class AType(val id: Int) + val syncResolversSchema = KGraphQL.schema { repeat(1000) { - query("automated-${it}") { + query("automated-$it") { resolver { -> Thread.sleep(3) - "${it}" + "$it" } } } @@ -24,16 +26,46 @@ class ParallelExecutionTest { val suspendResolverSchema = KGraphQL.schema { repeat(1000) { - query("automated-${it}") { + query("automated-$it") { suspendResolver { -> delay(3) - "${it}" + "$it" + } + } + } + } + + val suspendPropertySchema = KGraphQL.schema { + query("getAll") { + resolver { -> (0..999).map { AType(it) } } + } + type { + property>("children") { + suspendResolver { parent -> + (0..50).map { + delay(Random.nextLong(1, 100)) + AType((parent.id * 10) + it) + } } } } } - val query = "{ " + (0..999).map { "automated-${it}" }.joinToString(", ") + " }" + @Test + fun `Suspendable property resolvers`() { + val query = "{getAll{id,children{id}}}" + val map = deserialize(suspendPropertySchema.execute(query)) + + MatcherAssert.assertThat(map.extract("data/getAll[0]/id"), CoreMatchers.equalTo(0)) + MatcherAssert.assertThat(map.extract("data/getAll[500]/id"), CoreMatchers.equalTo(500)) + MatcherAssert.assertThat(map.extract("data/getAll[766]/id"), CoreMatchers.equalTo(766)) + + MatcherAssert.assertThat(map.extract("data/getAll[5]/children[5]/id"), CoreMatchers.equalTo(55)) + MatcherAssert.assertThat(map.extract("data/getAll[75]/children[9]/id"), CoreMatchers.equalTo(759)) + MatcherAssert.assertThat(map.extract("data/getAll[888]/children[50]/id"), CoreMatchers.equalTo(8930)) + } + + val query = "{ " + (0..999).map { "automated-$it" }.joinToString(", ") + " }" @Test fun `1000 synchronous resolvers sleeping with Thread sleep`(){ diff --git a/src/test/kotlin/com/github/pgutkowski/kgraphql/specification/language/ArgumentsSpecificationTest.kt b/src/test/kotlin/com/github/pgutkowski/kgraphql/specification/language/ArgumentsSpecificationTest.kt index bab1585e..23783f3e 100644 --- a/src/test/kotlin/com/github/pgutkowski/kgraphql/specification/language/ArgumentsSpecificationTest.kt +++ b/src/test/kotlin/com/github/pgutkowski/kgraphql/specification/language/ArgumentsSpecificationTest.kt @@ -30,6 +30,30 @@ class ArgumentsSpecificationTest { }.take(size) } } + property("none") { + suspendResolver { actor -> actor.age } + } + property("one") { + suspendResolver {actor, one: Int -> actor.age + one } + } + property("two") { + suspendResolver { actor, one: Int, two: Int -> actor.age + one + two } + } + property("three") { + suspendResolver { actor, one: Int, two: Int, three: Int -> + actor.age + one + two + three + } + } + property("four") { + suspendResolver { actor, one: Int, two: Int, three: Int, four: Int -> + actor.age + one + two + three + four + } + } + property("five") { + suspendResolver { actor, one: Int, two: Int, three: Int, four: Int, five: Int -> + actor.age + one + two + three + four + five + } + } } } @@ -50,5 +74,31 @@ class ArgumentsSpecificationTest { ) } + @Test + fun `all arguments to suspendResolvers`() { + val request = """ + { + actor { + none + one(one: 1) + two(one: 2, two: 3) + three(one: 4, two: 5, three: 6) + four(one: 7, two: 8, three: 9, four: 10) + five(one: 11, two: 12, three: 13, four: 14, five: 15) + } + } + """.trimIndent() + val response = deserialize(schema.execute(request)) as Map + assertThat(response, equalTo(mapOf( + "data" to mapOf("actor" to mapOf( + "none" to age, + "one" to age + 1, + "two" to age + 2 + 3, + "three" to age + 4 + 5 + 6, + "four" to age + 7 + 8 + 9 + 10, + "five" to age + 11 + 12 + 13 + 14 + 15 + )) + ))) + } }