Skip to content

Commit

Permalink
Run retry policy
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Sep 9, 2024
1 parent ba7171b commit 7d4c2f9
Show file tree
Hide file tree
Showing 34 changed files with 1,000 additions and 71 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@ on:
description: name of the test artifact output

jobs:

sdk-test-suite:
if: github.repository_owner == 'restatedev'
runs-on: ubuntu-latest
name: "Features integration test (sdk-test-suite version ${{ matrix.sdk-test-suite }})"
strategy:
matrix:
sdk-test-suite: [ "1.8" ]
sdk-test-suite: [ "2.0" ]
permissions:
contents: read
issues: read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.suspendCancellableCoroutine

internal class ContextImpl internal constructor(private val syscalls: Syscalls) : WorkflowContext {
internal class ContextImpl internal constructor(internal val syscalls: Syscalls) : WorkflowContext {
override fun key(): String {
return this.syscalls.objectKey()
}
Expand Down Expand Up @@ -168,14 +168,11 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
}

var actionReturnValue: T? = null
var actionFailure: TerminalException? = null
var actionFailure: Throwable? = null
try {
actionReturnValue = block()
} catch (e: TerminalException) {
actionFailure = e
} catch (t: Throwable) {
syscalls.fail(t)
throw CancellationException("Side effect failure", t)
actionFailure = t
}

val exitCallback =
Expand All @@ -189,12 +186,92 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
}

override fun onCancel(t: Throwable?) {
exitResult.cancel(CancellationException("Suspended", t))
exitResult.cancel(CancellationException(message = null, cause = t))
}
}

if (actionFailure != null) {
syscalls.exitSideEffectBlockWithTerminalException(actionFailure, exitCallback)
syscalls.exitSideEffectBlockWithException(actionFailure, null, exitCallback)
} else {
syscalls.exitSideEffectBlock(
serde.serializeWrappingException(syscalls, actionReturnValue), exitCallback)
}

return serde.deserializeWrappingException(syscalls, exitResult.await())
}

@UsePreviewContext
override suspend fun <T : Any?> runBlock(
serde: Serde<T>,
name: String,
retryPolicy: RetryPolicy?,
block: suspend () -> T
): T {
val exitResult =
suspendCancellableCoroutine { cont: CancellableContinuation<CompletableDeferred<ByteBuffer>>
->
syscalls.enterSideEffectBlock(
name,
object : EnterSideEffectSyscallCallback {
override fun onSuccess(t: ByteBuffer?) {
val deferred: CompletableDeferred<ByteBuffer> = CompletableDeferred()
deferred.complete(t!!)
cont.resume(deferred)
}

override fun onFailure(t: TerminalException) {
val deferred: CompletableDeferred<ByteBuffer> = CompletableDeferred()
deferred.completeExceptionally(t)
cont.resume(deferred)
}

override fun onCancel(t: Throwable?) {
cont.cancel(t)
}

override fun onNotExecuted() {
cont.resume(CompletableDeferred())
}
})
}

if (exitResult.isCompleted) {
return serde.deserializeWrappingException(syscalls, exitResult.await())!!
}

var actionReturnValue: T? = null
var actionFailure: Throwable? = null
try {
actionReturnValue = block()
} catch (t: Throwable) {
actionFailure = t
}

val exitCallback =
object : ExitSideEffectSyscallCallback {
override fun onSuccess(t: ByteBuffer?) {
exitResult.complete(t!!)
}

override fun onFailure(t: TerminalException) {
exitResult.completeExceptionally(t)
}

override fun onCancel(t: Throwable?) {
exitResult.cancel(CancellationException(message = null, cause = t))
}
}

if (actionFailure != null) {
val javaRetryPolicy =
retryPolicy?.let {
dev.restate.sdk.common.RetryPolicy.exponential(
it.initialDelay.toJavaDuration(), it.exponentiationFactor)
.setMaxAttempts(it.maxAttempts)
.setMaxDelay(it.maxDelay?.toJavaDuration())
.setMaxDuration(it.maxDuration?.toJavaDuration())
}
syscalls.exitSideEffectBlockWithException(actionFailure, javaRetryPolicy, exitCallback)
} else {
syscalls.exitSideEffectBlock(
serde.serializeWrappingException(syscalls, actionReturnValue), exitCallback)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

/** Retry policy configuration. */
@UsePreviewContext
data class RetryPolicy(
/** Initial retry delay for the first retry attempt. */
val initialDelay: Duration,
/** Exponentiation factor to use when computing the next retry delay. */
val exponentiationFactor: Float,
/** Maximum delay between retries. */
val maxDelay: Duration? = null,
/**
* Maximum number of attempts before giving up retrying.
*
* The policy gives up retrying when either at least the given number of attempts is reached, or
* the [maxDuration] (if set) is reached first. If both [maxAttempts] and [maxDuration] are
* `null`, the policy will retry indefinitely.
*
* **Note:** The number of actual retries may be higher than the provided value. This is due to
* the nature of the `run` operation, which executes the closure on the service and sends the
* result afterward to Restate.
*/
val maxAttempts: Int? = null,
/**
* Maximum duration of the retry loop.
*
* The policy gives up retrying when either the retry loop lasted at least for this given max
* duration, or the [maxAttempts] (if set) is reached first. If both [maxAttempts] and
* [maxDuration] are `null`, the policy will retry indefinitely.
*
* **Note:** The real retry loop duration may be higher than the given duration. TThis is due to
* the nature of the `run` operation, which executes the closure on the service and sends the
* result afterward to Restate.
*/
val maxDuration: Duration? = null
) {

@UsePreviewContext
data class Builder
internal constructor(
var initialDelay: Duration = 100.milliseconds,
var exponentiationFactor: Float = 2.0f,
var maxDelay: Duration? = null,
var maxAttempts: Int? = null,
var maxDuration: Duration? = null
) {
fun build() =
RetryPolicy(
initialDelay = initialDelay,
exponentiationFactor = exponentiationFactor,
maxDelay = maxDelay,
maxDuration = maxDuration,
maxAttempts = maxAttempts)
}
}

@UsePreviewContext
fun retryPolicy(init: RetryPolicy.Builder.() -> Unit): RetryPolicy {
val builder = RetryPolicy.Builder()
builder.init()
return builder.build()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

/**
* Opt-in annotation to use the preview of new context features.
*
* In order to use these methods, you **MUST enable the preview context**, through the endpoint
* builders using `enablePreviewContext()`.
*/
@RequiresOptIn
@Retention(AnnotationRetention.BINARY)
@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION)
annotation class UsePreviewContext
30 changes: 29 additions & 1 deletion sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import dev.restate.sdk.common.*
import dev.restate.sdk.common.DurablePromiseKey
import dev.restate.sdk.common.Output
import dev.restate.sdk.common.Request
import dev.restate.sdk.common.Serde
import dev.restate.sdk.common.StateKey
import dev.restate.sdk.common.Target
import dev.restate.sdk.common.syscalls.Syscalls
import java.util.*
Expand Down Expand Up @@ -145,6 +149,21 @@ sealed interface Context {
*/
suspend fun <T : Any?> runBlock(serde: Serde<T>, name: String = "", block: suspend () -> T): T

/**
* Like [runBlock], but using a custom retry policy.
*
* When a retry policy is not specified, the `runBlock` will be retried using the
* [Restate invoker retry policy](https://docs.restate.dev/operate/configuration/server), which by
* default retries indefinitely.
*/
@UsePreviewContext
suspend fun <T : Any?> runBlock(
serde: Serde<T>,
name: String = "",
retryPolicy: RetryPolicy? = null,
block: suspend () -> T
): T

/**
* Create an [Awakeable], addressable through [Awakeable.id].
*
Expand Down Expand Up @@ -229,6 +248,15 @@ suspend inline fun <reified T : Any> Context.runBlock(
return this.runBlock(KtSerdes.json(), name, block)
}

@UsePreviewContext
suspend inline fun <reified T : Any> Context.runBlock(
name: String = "",
retryPolicy: RetryPolicy? = null,
noinline block: suspend () -> T
): T {
return this.runBlock(KtSerdes.json(), name, retryPolicy, block)
}

/**
* Create an [Awakeable] using [KtSerdes.json] deserializer, addressable through [Awakeable.id].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import dev.restate.sdk.core.TestDefinitions.TestInvocationBuilder
import dev.restate.sdk.kotlin.KotlinCoroutinesTests.Companion.testDefinitionForService
import java.util.*
import kotlin.coroutines.coroutineContext
import kotlin.time.toKotlinDuration
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers

Expand Down Expand Up @@ -75,4 +76,24 @@ class SideEffectTest : SideEffectTestSuite() {
testDefinitionForService<Unit, String>("FailingSideEffect") { ctx, _: Unit ->
ctx.runBlock(name) { throw IllegalStateException(reason) }
}

@OptIn(UsePreviewContext::class)
override fun failingSideEffectWithRetryPolicy(
reason: String,
retryPolicy: dev.restate.sdk.common.RetryPolicy?
) =
testDefinitionForService<Unit, String>("FailingSideEffectWithRetryPolicy") { ctx, _: Unit ->
ctx.runBlock(
retryPolicy =
retryPolicy?.let {
RetryPolicy(
initialDelay = it.initialDelay.toKotlinDuration(),
exponentiationFactor = it.exponentiationFactor,
maxDelay = it.maxDelay?.toKotlinDuration(),
maxDuration = it.maxDuration?.toKotlinDuration(),
maxAttempts = it.maxAttempts)
}) {
throw IllegalStateException(reason)
}
}
}
11 changes: 4 additions & 7 deletions sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

class ContextImpl implements ObjectContext, WorkflowContext {

private final Syscalls syscalls;
final Syscalls syscalls;

ContextImpl(Syscalls syscalls) {
this.syscalls = syscalls;
Expand Down Expand Up @@ -163,18 +163,15 @@ public void onCancel(@Nullable Throwable t) {
};

T res = null;
TerminalException failure = null;
Throwable failure = null;
try {
res = action.get();
} catch (TerminalException e) {
failure = e;
} catch (Throwable e) {
syscalls.fail(e);
AbortedExecutionException.sneakyThrow();
failure = e;
}

if (failure != null) {
syscalls.exitSideEffectBlockWithTerminalException(failure, exitCallback);
syscalls.exitSideEffectBlockWithException(failure, null, exitCallback);
} else {
syscalls.exitSideEffectBlock(
Util.serializeWrappingException(syscalls, serde, res), exitCallback);
Expand Down
Loading

0 comments on commit 7d4c2f9

Please sign in to comment.