Skip to content

Commit

Permalink
\pgutkowski#38: Added support for suspendable property resolvers
Browse files Browse the repository at this point in the history
  • Loading branch information
jeggy committed Feb 2, 2019
1 parent e3a02a2 commit 3b697a1
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,24 @@ class PropertyDSL<T : Any, R>(val name : String, block : PropertyDSL<T, R>.() ->
fun <E, W, Q, A, S>resolver(function: (T, E, W, Q, A, S) -> R)
= resolver(FunctionWrapper.on(function, true))

fun suspendResolver(function: suspend (T) -> R)
= resolver(FunctionWrapper.onSuspend(function, true))

fun <E>suspendResolver(function: suspend (T, E) -> R)
= resolver(FunctionWrapper.onSuspend(function, true))

fun <E, W>suspendResolver(function: suspend (T, E, W) -> R)
= resolver(FunctionWrapper.onSuspend(function, true))

fun <E, W, Q>suspendResolver(function: suspend (T, E, W, Q) -> R)
= resolver(FunctionWrapper.onSuspend(function, true))

fun <E, W, Q, A>suspendResolver(function: suspend (T, E, W, Q, A) -> R)
= resolver(FunctionWrapper.onSuspend(function, true))

fun <E, W, Q, A, S>suspendResolver(function: suspend (T, E, W, Q, A, S) -> R)
= resolver(FunctionWrapper.onSuspend(function, true))

fun accessRule(rule: (T, Context) -> Exception?){

val accessRuleAdapter: (T?, Context) -> Exception? = { parent, ctx ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,36 +51,14 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro
override suspend fun suspendExecute(plan: ExecutionPlan, variables: VariablesJson, context: Context): String {
val root = jsonNodeFactory.objectNode()
val data = root.putObject("data")
val channel = Channel<Pair<Execution, JsonNode>>()
val jobs = plan
.map { execution ->
launch(dispatcher) {
try {
val writeOperation = writeOperation(
ctx = ExecutionContext(Variables(schema, variables, execution.variables), context),
node = execution,
operation = execution.field as Field.Function<*, *>
)
channel.send(execution to writeOperation)
} catch (e: Exception) {
channel.close(e)
}
}
}
.toList()

//intermediate data structure necessary to preserve ordering
val resultMap = mutableMapOf<Execution, JsonNode>()
repeat(plan.size) {
try {
val (execution, jsonNode) = channel.receive()
resultMap.put(execution, jsonNode)
} catch (e: Exception) {
jobs.forEach { it.cancel() }
throw e
}
val resultMap = listToMapAsync(plan) {
writeOperation(
ctx = ExecutionContext(Variables(schema, variables, it.variables), context),
node = it,
operation = it.field as Field.Function<*, *>
)
}
channel.close()

for (operation in plan) {
data.set(operation.aliasOrKey, resultMap[operation])
Expand All @@ -93,6 +71,34 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro
suspendExecute(plan, variables, context)
}

private suspend fun <T, R> listToMapAsync(data: Collection<T>, block: suspend (T) -> R): Map<T, R> {
val channel = Channel<Pair<T, R>>()
val jobs = data.map { item ->
launch(dispatcher) {
try {
val res = block(item)
channel.send(item to res)
} catch (e: Exception) {
channel.close(e)
}
}
}

val resultMap = mutableMapOf<T, R>()
repeat(data.size) {
try {
val (item, result) = channel.receive()
resultMap[item] = result
} catch (e: Exception) {
jobs.forEach(Job::cancel)
throw e
}
}
channel.close()

return resultMap
}

private suspend fun <T> writeOperation(ctx: ExecutionContext, node: Execution.Node, operation: FunctionWrapper<T>): JsonNode {
node.field.checkAccess(null, ctx.requestContext)
val operationResult: T? = operation.invoke(
Expand Down Expand Up @@ -134,9 +140,12 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro
//check value, not returnType, because this method can be invoked with element value
value is Collection<*> -> {
if (returnType.isList()) {
val arrayNode = jsonNodeFactory.arrayNode(value.size)
value.forEach { element -> arrayNode.add(createNode(ctx, element, node, returnType.unwrapList())) }
arrayNode
val valuesMap = listToMapAsync(value) {
createNode(ctx, it, node, returnType.unwrapList())
}
value.fold(jsonNodeFactory.arrayNode(value.size)) { array, v ->
array.add(valuesMap[v])
}
} else {
throw ExecutionException("Invalid collection value for non collection property")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,71 @@
package com.github.pgutkowski.kgraphql.integration

import com.github.pgutkowski.kgraphql.KGraphQL
import com.github.pgutkowski.kgraphql.assertNoErrors
import com.github.pgutkowski.kgraphql.extract
import com.github.pgutkowski.kgraphql.deserialize
import kotlinx.coroutines.delay
import org.hamcrest.CoreMatchers
import org.hamcrest.MatcherAssert
import org.junit.Test
import kotlin.random.Random

class ParallelExecutionTest {

data class AType(val id: Int)

val syncResolversSchema = KGraphQL.schema {
repeat(1000) {
query("automated-${it}") {
query("automated-$it") {
resolver { ->
Thread.sleep(3)
"${it}"
"$it"
}
}
}
}

val suspendResolverSchema = KGraphQL.schema {
repeat(1000) {
query("automated-${it}") {
query("automated-$it") {
suspendResolver { ->
delay(3)
"${it}"
"$it"
}
}
}
}

val suspendPropertySchema = KGraphQL.schema {
query("getAll") {
resolver { -> (0..999).map { AType(it) } }
}
type<AType> {
property<List<AType>>("children") {
suspendResolver { parent ->
(0..50).map {
delay(Random.nextLong(1, 100))
AType((parent.id * 10) + it)
}
}
}
}
}

val query = "{ " + (0..999).map { "automated-${it}" }.joinToString(", ") + " }"
@Test
fun `Suspendable property resolvers`() {
val query = "{getAll{id,children{id}}}"
val map = deserialize(suspendPropertySchema.execute(query))

MatcherAssert.assertThat(map.extract<Int>("data/getAll[0]/id"), CoreMatchers.equalTo(0))
MatcherAssert.assertThat(map.extract<Int>("data/getAll[500]/id"), CoreMatchers.equalTo(500))
MatcherAssert.assertThat(map.extract<Int>("data/getAll[766]/id"), CoreMatchers.equalTo(766))

MatcherAssert.assertThat(map.extract<Int>("data/getAll[5]/children[5]/id"), CoreMatchers.equalTo(55))
MatcherAssert.assertThat(map.extract<Int>("data/getAll[75]/children[9]/id"), CoreMatchers.equalTo(759))
MatcherAssert.assertThat(map.extract<Int>("data/getAll[888]/children[50]/id"), CoreMatchers.equalTo(8930))
}

val query = "{ " + (0..999).map { "automated-$it" }.joinToString(", ") + " }"

@Test
fun `1000 synchronous resolvers sleeping with Thread sleep`(){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,30 @@ class ArgumentsSpecificationTest {
}.take(size)
}
}
property<Int>("none") {
suspendResolver { actor -> actor.age }
}
property<Int>("one") {
suspendResolver {actor, one: Int -> actor.age + one }
}
property<Int>("two") {
suspendResolver { actor, one: Int, two: Int -> actor.age + one + two }
}
property<Int>("three") {
suspendResolver { actor, one: Int, two: Int, three: Int ->
actor.age + one + two + three
}
}
property<Int>("four") {
suspendResolver { actor, one: Int, two: Int, three: Int, four: Int ->
actor.age + one + two + three + four
}
}
property<Int>("five") {
suspendResolver { actor, one: Int, two: Int, three: Int, four: Int, five: Int ->
actor.age + one + two + three + four + five
}
}
}
}

Expand All @@ -50,5 +74,31 @@ class ArgumentsSpecificationTest {
)
}

@Test
fun `all arguments to suspendResolvers`() {
val request = """
{
actor {
none
one(one: 1)
two(one: 2, two: 3)
three(one: 4, two: 5, three: 6)
four(one: 7, two: 8, three: 9, four: 10)
five(one: 11, two: 12, three: 13, four: 14, five: 15)
}
}
""".trimIndent()
val response = deserialize(schema.execute(request)) as Map<String, Any>
assertThat(response, equalTo(mapOf<String, Any>(
"data" to mapOf("actor" to mapOf(
"none" to age,
"one" to age + 1,
"two" to age + 2 + 3,
"three" to age + 4 + 5 + 6,
"four" to age + 7 + 8 + 9 + 10,
"five" to age + 11 + 12 + 13 + 14 + 15
))
)))
}

}

0 comments on commit 3b697a1

Please sign in to comment.