diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index e4db3473..677302bb 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -42,14 +42,13 @@ on: description: name of the test artifact output jobs: - sdk-test-suite: if: github.repository_owner == 'restatedev' runs-on: ubuntu-latest name: "Features integration test (sdk-test-suite version ${{ matrix.sdk-test-suite }})" strategy: matrix: - sdk-test-suite: [ "1.8" ] + sdk-test-suite: [ "2.0" ] permissions: contents: read issues: read diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt index dc79fa99..4dcc4159 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt @@ -23,7 +23,7 @@ import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.suspendCancellableCoroutine -internal class ContextImpl internal constructor(private val syscalls: Syscalls) : WorkflowContext { +internal class ContextImpl internal constructor(internal val syscalls: Syscalls) : WorkflowContext { override fun key(): String { return this.syscalls.objectKey() } @@ -168,14 +168,11 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls) } var actionReturnValue: T? = null - var actionFailure: TerminalException? = null + var actionFailure: Throwable? = null try { actionReturnValue = block() - } catch (e: TerminalException) { - actionFailure = e } catch (t: Throwable) { - syscalls.fail(t) - throw CancellationException("Side effect failure", t) + actionFailure = t } val exitCallback = @@ -189,12 +186,92 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls) } override fun onCancel(t: Throwable?) { - exitResult.cancel(CancellationException("Suspended", t)) + exitResult.cancel(CancellationException(message = null, cause = t)) } } if (actionFailure != null) { - syscalls.exitSideEffectBlockWithTerminalException(actionFailure, exitCallback) + syscalls.exitSideEffectBlockWithException(actionFailure, null, exitCallback) + } else { + syscalls.exitSideEffectBlock( + serde.serializeWrappingException(syscalls, actionReturnValue), exitCallback) + } + + return serde.deserializeWrappingException(syscalls, exitResult.await()) + } + + @UsePreviewContext + override suspend fun runBlock( + serde: Serde, + name: String, + retryPolicy: RetryPolicy?, + block: suspend () -> T + ): T { + val exitResult = + suspendCancellableCoroutine { cont: CancellableContinuation> + -> + syscalls.enterSideEffectBlock( + name, + object : EnterSideEffectSyscallCallback { + override fun onSuccess(t: ByteBuffer?) { + val deferred: CompletableDeferred = CompletableDeferred() + deferred.complete(t!!) + cont.resume(deferred) + } + + override fun onFailure(t: TerminalException) { + val deferred: CompletableDeferred = CompletableDeferred() + deferred.completeExceptionally(t) + cont.resume(deferred) + } + + override fun onCancel(t: Throwable?) { + cont.cancel(t) + } + + override fun onNotExecuted() { + cont.resume(CompletableDeferred()) + } + }) + } + + if (exitResult.isCompleted) { + return serde.deserializeWrappingException(syscalls, exitResult.await())!! + } + + var actionReturnValue: T? = null + var actionFailure: Throwable? = null + try { + actionReturnValue = block() + } catch (t: Throwable) { + actionFailure = t + } + + val exitCallback = + object : ExitSideEffectSyscallCallback { + override fun onSuccess(t: ByteBuffer?) { + exitResult.complete(t!!) + } + + override fun onFailure(t: TerminalException) { + exitResult.completeExceptionally(t) + } + + override fun onCancel(t: Throwable?) { + exitResult.cancel(CancellationException(message = null, cause = t)) + } + } + + if (actionFailure != null) { + val javaRetryPolicy = + retryPolicy?.let { + dev.restate.sdk.common.RetryPolicy.exponential( + it.initialDelay.toJavaDuration(), it.exponentiationFactor) + .setMaxAttempts(it.maxAttempts) + .setMaxDelay(it.maxDelay?.toJavaDuration()) + .setMaxDuration(it.maxDuration?.toJavaDuration()) + } + syscalls.exitSideEffectBlockWithException(actionFailure, javaRetryPolicy, exitCallback) } else { syscalls.exitSideEffectBlock( serde.serializeWrappingException(syscalls, actionReturnValue), exitCallback) diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RetryPolicy.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RetryPolicy.kt new file mode 100644 index 00000000..4d0595c8 --- /dev/null +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RetryPolicy.kt @@ -0,0 +1,73 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk.kotlin + +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds + +/** Retry policy configuration. */ +@UsePreviewContext +data class RetryPolicy( + /** Initial retry delay for the first retry attempt. */ + val initialDelay: Duration, + /** Exponentiation factor to use when computing the next retry delay. */ + val exponentiationFactor: Float, + /** Maximum delay between retries. */ + val maxDelay: Duration? = null, + /** + * Maximum number of attempts before giving up retrying. + * + * The policy gives up retrying when either at least the given number of attempts is reached, or + * the [maxDuration] (if set) is reached first. If both [maxAttempts] and [maxDuration] are + * `null`, the policy will retry indefinitely. + * + * **Note:** The number of actual retries may be higher than the provided value. This is due to + * the nature of the `run` operation, which executes the closure on the service and sends the + * result afterward to Restate. + */ + val maxAttempts: Int? = null, + /** + * Maximum duration of the retry loop. + * + * The policy gives up retrying when either the retry loop lasted at least for this given max + * duration, or the [maxAttempts] (if set) is reached first. If both [maxAttempts] and + * [maxDuration] are `null`, the policy will retry indefinitely. + * + * **Note:** The real retry loop duration may be higher than the given duration. TThis is due to + * the nature of the `run` operation, which executes the closure on the service and sends the + * result afterward to Restate. + */ + val maxDuration: Duration? = null +) { + + @UsePreviewContext + data class Builder + internal constructor( + var initialDelay: Duration = 100.milliseconds, + var exponentiationFactor: Float = 2.0f, + var maxDelay: Duration? = null, + var maxAttempts: Int? = null, + var maxDuration: Duration? = null + ) { + fun build() = + RetryPolicy( + initialDelay = initialDelay, + exponentiationFactor = exponentiationFactor, + maxDelay = maxDelay, + maxDuration = maxDuration, + maxAttempts = maxAttempts) + } +} + +@UsePreviewContext +fun retryPolicy(init: RetryPolicy.Builder.() -> Unit): RetryPolicy { + val builder = RetryPolicy.Builder() + builder.init() + return builder.build() +} diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/UsePreviewContext.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/UsePreviewContext.kt new file mode 100644 index 00000000..36d12791 --- /dev/null +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/UsePreviewContext.kt @@ -0,0 +1,20 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk.kotlin + +/** + * Opt-in annotation to use the preview of new context features. + * + * In order to use these methods, you **MUST enable the preview context**, through the endpoint + * builders using `enablePreviewContext()`. + */ +@RequiresOptIn +@Retention(AnnotationRetention.BINARY) +@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION) +annotation class UsePreviewContext diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt index 7651c863..ccbfafa9 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt @@ -8,7 +8,11 @@ // https://github.com/restatedev/sdk-java/blob/main/LICENSE package dev.restate.sdk.kotlin -import dev.restate.sdk.common.* +import dev.restate.sdk.common.DurablePromiseKey +import dev.restate.sdk.common.Output +import dev.restate.sdk.common.Request +import dev.restate.sdk.common.Serde +import dev.restate.sdk.common.StateKey import dev.restate.sdk.common.Target import dev.restate.sdk.common.syscalls.Syscalls import java.util.* @@ -145,6 +149,21 @@ sealed interface Context { */ suspend fun runBlock(serde: Serde, name: String = "", block: suspend () -> T): T + /** + * Like [runBlock], but using a custom retry policy. + * + * When a retry policy is not specified, the `runBlock` will be retried using the + * [Restate invoker retry policy](https://docs.restate.dev/operate/configuration/server), which by + * default retries indefinitely. + */ + @UsePreviewContext + suspend fun runBlock( + serde: Serde, + name: String = "", + retryPolicy: RetryPolicy? = null, + block: suspend () -> T + ): T + /** * Create an [Awakeable], addressable through [Awakeable.id]. * @@ -229,6 +248,15 @@ suspend inline fun Context.runBlock( return this.runBlock(KtSerdes.json(), name, block) } +@UsePreviewContext +suspend inline fun Context.runBlock( + name: String = "", + retryPolicy: RetryPolicy? = null, + noinline block: suspend () -> T +): T { + return this.runBlock(KtSerdes.json(), name, retryPolicy, block) +} + /** * Create an [Awakeable] using [KtSerdes.json] deserializer, addressable through [Awakeable.id]. * diff --git a/sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/SideEffectTest.kt b/sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/SideEffectTest.kt index bb5d67c6..d7923acd 100644 --- a/sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/SideEffectTest.kt +++ b/sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/SideEffectTest.kt @@ -20,6 +20,7 @@ import dev.restate.sdk.core.TestDefinitions.TestInvocationBuilder import dev.restate.sdk.kotlin.KotlinCoroutinesTests.Companion.testDefinitionForService import java.util.* import kotlin.coroutines.coroutineContext +import kotlin.time.toKotlinDuration import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.Dispatchers @@ -75,4 +76,24 @@ class SideEffectTest : SideEffectTestSuite() { testDefinitionForService("FailingSideEffect") { ctx, _: Unit -> ctx.runBlock(name) { throw IllegalStateException(reason) } } + + @OptIn(UsePreviewContext::class) + override fun failingSideEffectWithRetryPolicy( + reason: String, + retryPolicy: dev.restate.sdk.common.RetryPolicy? + ) = + testDefinitionForService("FailingSideEffectWithRetryPolicy") { ctx, _: Unit -> + ctx.runBlock( + retryPolicy = + retryPolicy?.let { + RetryPolicy( + initialDelay = it.initialDelay.toKotlinDuration(), + exponentiationFactor = it.exponentiationFactor, + maxDelay = it.maxDelay?.toKotlinDuration(), + maxDuration = it.maxDuration?.toKotlinDuration(), + maxAttempts = it.maxAttempts) + }) { + throw IllegalStateException(reason) + } + } } diff --git a/sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java b/sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java index 50fc06a1..db6a9f5e 100644 --- a/sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java +++ b/sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java @@ -25,7 +25,7 @@ class ContextImpl implements ObjectContext, WorkflowContext { - private final Syscalls syscalls; + final Syscalls syscalls; ContextImpl(Syscalls syscalls) { this.syscalls = syscalls; @@ -163,18 +163,15 @@ public void onCancel(@Nullable Throwable t) { }; T res = null; - TerminalException failure = null; + Throwable failure = null; try { res = action.get(); - } catch (TerminalException e) { - failure = e; } catch (Throwable e) { - syscalls.fail(e); - AbortedExecutionException.sneakyThrow(); + failure = e; } if (failure != null) { - syscalls.exitSideEffectBlockWithTerminalException(failure, exitCallback); + syscalls.exitSideEffectBlockWithException(failure, null, exitCallback); } else { syscalls.exitSideEffectBlock( Util.serializeWrappingException(syscalls, serde, res), exitCallback); diff --git a/sdk-api/src/main/java/dev/restate/sdk/PreviewContext.java b/sdk-api/src/main/java/dev/restate/sdk/PreviewContext.java new file mode 100644 index 00000000..017210bf --- /dev/null +++ b/sdk-api/src/main/java/dev/restate/sdk/PreviewContext.java @@ -0,0 +1,157 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk; + +import dev.restate.sdk.common.RetryPolicy; +import dev.restate.sdk.common.Serde; +import dev.restate.sdk.common.TerminalException; +import dev.restate.sdk.common.function.ThrowingRunnable; +import dev.restate.sdk.common.function.ThrowingSupplier; +import dev.restate.sdk.common.syscalls.EnterSideEffectSyscallCallback; +import dev.restate.sdk.common.syscalls.ExitSideEffectSyscallCallback; +import dev.restate.sdk.common.syscalls.Syscalls; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import org.jspecify.annotations.Nullable; + +/** + * Preview of new context features. Please note that the methods in this class may break between + * minor releases, use with caution! + * + *

In order to use these methods, you MUST enable the preview context, through the + * endpoint builders using {@code enablePreviewContext()}. + */ +public class PreviewContext { + + /** + * Like {@link Context#run(String, Serde, ThrowingSupplier)}, but using a custom retry policy. + * + *

When a retry policy is not specified, the {@code run} will be retried using the Restate invoker retry policy, + * which by default retries indefinitely. + */ + public static T run( + Context ctx, String name, Serde serde, RetryPolicy retryPolicy, ThrowingSupplier action) + throws TerminalException { + Syscalls syscalls = ((ContextImpl) ctx).syscalls; + CompletableFuture> enterFut = new CompletableFuture<>(); + syscalls.enterSideEffectBlock( + name, + new EnterSideEffectSyscallCallback() { + @Override + public void onNotExecuted() { + enterFut.complete(new CompletableFuture<>()); + } + + @Override + public void onSuccess(ByteBuffer result) { + enterFut.complete(CompletableFuture.completedFuture(result)); + } + + @Override + public void onFailure(TerminalException t) { + enterFut.complete(CompletableFuture.failedFuture(t)); + } + + @Override + public void onCancel(Throwable t) { + enterFut.cancel(true); + } + }); + + // If a failure was stored, it's simply thrown here + CompletableFuture exitFut = Util.awaitCompletableFuture(enterFut); + if (exitFut.isDone()) { + // We already have a result, we don't need to execute the action + return Util.deserializeWrappingException( + syscalls, serde, Util.awaitCompletableFuture(exitFut)); + } + + ExitSideEffectSyscallCallback exitCallback = + new ExitSideEffectSyscallCallback() { + @Override + public void onSuccess(ByteBuffer result) { + exitFut.complete(result); + } + + @Override + public void onFailure(TerminalException t) { + exitFut.completeExceptionally(t); + } + + @Override + public void onCancel(@Nullable Throwable t) { + exitFut.cancel(true); + } + }; + + T res = null; + Throwable failure = null; + try { + res = action.get(); + } catch (Throwable e) { + failure = e; + } + + if (failure != null) { + syscalls.exitSideEffectBlockWithException(failure, retryPolicy, exitCallback); + } else { + syscalls.exitSideEffectBlock( + Util.serializeWrappingException(syscalls, serde, res), exitCallback); + } + + return Util.deserializeWrappingException(syscalls, serde, Util.awaitCompletableFuture(exitFut)); + } + + /** + * Like {@link Context#run(String, ThrowingRunnable)}, but using a custom retry policy. + * + *

When a retry policy is not specified, the {@code run} will be retried using the Restate invoker retry policy, + * which by default retries indefinitely. + */ + public static void run( + Context ctx, String name, RetryPolicy retryPolicy, ThrowingRunnable runnable) + throws TerminalException { + run( + ctx, + name, + Serde.VOID, + retryPolicy, + () -> { + runnable.run(); + return null; + }); + } + + /** + * Like {@link Context#run(Serde, ThrowingSupplier)}, but using a custom retry policy. + * + *

When a retry policy is not specified, the {@code run} will be retried using the Restate invoker retry policy, + * which by default retries indefinitely. + */ + public static T run( + Context ctx, Serde serde, RetryPolicy retryPolicy, ThrowingSupplier action) + throws TerminalException { + return run(ctx, null, serde, retryPolicy, action); + } + + /** + * Like {@link Context#run(ThrowingRunnable)}, but using a custom retry policy. + * + *

When a retry policy is not specified, the {@code run} will be retried using the Restate invoker retry policy, + * which by default retries indefinitely. + */ + public static void run(Context ctx, RetryPolicy retryPolicy, ThrowingRunnable runnable) + throws TerminalException { + run(ctx, null, retryPolicy, runnable); + } +} diff --git a/sdk-api/src/test/java/dev/restate/sdk/SideEffectTest.java b/sdk-api/src/test/java/dev/restate/sdk/SideEffectTest.java index 76718b8e..60556d9a 100644 --- a/sdk-api/src/test/java/dev/restate/sdk/SideEffectTest.java +++ b/sdk-api/src/test/java/dev/restate/sdk/SideEffectTest.java @@ -11,6 +11,7 @@ import static dev.restate.sdk.JavaBlockingTests.testDefinitionForService; import static dev.restate.sdk.core.ProtoUtils.GREETER_SERVICE_TARGET; +import dev.restate.sdk.common.RetryPolicy; import dev.restate.sdk.common.Serde; import dev.restate.sdk.core.SideEffectTestSuite; import dev.restate.sdk.core.TestDefinitions.TestInvocationBuilder; @@ -107,4 +108,22 @@ protected TestInvocationBuilder failingSideEffect(String name, String reason) { return null; }); } + + @Override + protected TestInvocationBuilder failingSideEffectWithRetryPolicy( + String reason, RetryPolicy retryPolicy) { + return testDefinitionForService( + "FailingSideEffectWithRetryPolicy", + Serde.VOID, + JsonSerdes.STRING, + (ctx, unused) -> { + PreviewContext.run( + ctx, + retryPolicy, + () -> { + throw new IllegalStateException(reason); + }); + return null; + }); + } } diff --git a/sdk-common/src/main/java/dev/restate/sdk/common/RetryPolicy.java b/sdk-common/src/main/java/dev/restate/sdk/common/RetryPolicy.java new file mode 100644 index 00000000..11bd0da6 --- /dev/null +++ b/sdk-common/src/main/java/dev/restate/sdk/common/RetryPolicy.java @@ -0,0 +1,166 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk.common; + +import java.time.Duration; +import java.util.Objects; +import org.jspecify.annotations.Nullable; + +/** Retry policy configuration. */ +public final class RetryPolicy { + + private Duration initialDelay; + private float exponentiationFactor; + private @Nullable Duration maxDelay; + private @Nullable Integer maxAttempts; + private @Nullable Duration maxDuration; + + RetryPolicy(Duration initialDelay, float exponentiationFactor) { + this.initialDelay = initialDelay; + this.exponentiationFactor = exponentiationFactor; + } + + /** Initial retry delay for the first retry attempt. */ + public RetryPolicy setInitialDelay(Duration initialDelay) { + this.initialDelay = initialDelay; + return this; + } + + /** Exponentiation factor to use when computing the next retry delay. */ + public RetryPolicy setExponentiationFactor(float exponentiationFactor) { + this.exponentiationFactor = exponentiationFactor; + return this; + } + + /** Maximum delay between retries. */ + public RetryPolicy setMaxDelay(@Nullable Duration maxDelay) { + this.maxDelay = maxDelay; + return this; + } + + /** + * Maximum number of attempts before giving up retrying. + * + *

The policy gives up retrying when either at least the given number of attempts is reached, + * or the {@code maxDuration} specified with {@link #setMaxDuration(Duration)} (if set) is reached + * first. If both {@link #getMaxAttempts()} and {@link #getMaxDuration()} are {@code null}, the + * policy will retry indefinitely. + * + *

Note: The number of actual retries may be higher than the provided value. This is due + * to the nature of the {@code run} operation, which executes the closure on the service and sends + * the result afterward to Restate. + */ + public RetryPolicy setMaxAttempts(@Nullable Integer maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + /** + * Maximum duration of the retry loop. + * + *

The policy gives up retrying when either the retry loop lasted at least for this given max + * duration, or the {@code maxAttempts} specified with {@link #setMaxAttempts(Integer)} (if set) + * is reached first. If both {@link #getMaxAttempts()} and {@link #getMaxDuration()} are {@code + * null}, the policy will retry indefinitely. + * + *

Note: The real retry loop duration may be higher than the given duration. TThis is + * due to the nature of the {@code run} operation, which executes the closure on the service and + * sends the result afterward to Restate. + */ + public RetryPolicy setMaxDuration(@Nullable Duration maxDuration) { + this.maxDuration = maxDuration; + return this; + } + + /** + * @see #setInitialDelay(Duration) + */ + public Duration getInitialDelay() { + return initialDelay; + } + + /** + * @see #setExponentiationFactor(float) + */ + public float getExponentiationFactor() { + return exponentiationFactor; + } + + /** + * @see #setMaxDelay(Duration) + */ + public @Nullable Duration getMaxDelay() { + return maxDelay; + } + + /** + * @see #setMaxAttempts(Integer) + */ + public @Nullable Integer getMaxAttempts() { + return maxAttempts; + } + + /** + * @see #setMaxDuration(Duration) + */ + public @Nullable Duration getMaxDuration() { + return maxDuration; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof RetryPolicy)) return false; + RetryPolicy that = (RetryPolicy) o; + return Float.compare(exponentiationFactor, that.exponentiationFactor) == 0 + && Objects.equals(initialDelay, that.initialDelay) + && Objects.equals(maxDelay, that.maxDelay) + && Objects.equals(maxAttempts, that.maxAttempts) + && Objects.equals(maxDuration, that.maxDuration); + } + + @Override + public int hashCode() { + return Objects.hash(initialDelay, exponentiationFactor, maxDelay, maxAttempts, maxDuration); + } + + @Override + public String toString() { + return "RunRetryPolicy{" + + "initialDelay=" + + initialDelay + + ", factor=" + + exponentiationFactor + + ", maxDelay=" + + maxDelay + + ", maxAttempts=" + + maxAttempts + + ", maxDuration=" + + maxDuration + + '}'; + } + + /** + * @return a default exponential bounded retry policy + */ + public static RetryPolicy defaultPolicy() { + return exponential(Duration.ofMillis(100), 2.0f) + .setMaxDelay(Duration.ofSeconds(2)) + .setMaxDuration(Duration.ofSeconds(60)); + } + + /** + * @return an unbounded retry policy, with the given initial delay and factor. + * @see #setInitialDelay(Duration) + * @see #setExponentiationFactor(float) + */ + public static RetryPolicy exponential(Duration initialDelay, float factor) { + return new RetryPolicy(initialDelay, factor); + } +} diff --git a/sdk-common/src/main/java/dev/restate/sdk/common/syscalls/Syscalls.java b/sdk-common/src/main/java/dev/restate/sdk/common/syscalls/Syscalls.java index f7128785..4373cb31 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/common/syscalls/Syscalls.java +++ b/sdk-common/src/main/java/dev/restate/sdk/common/syscalls/Syscalls.java @@ -9,6 +9,7 @@ package dev.restate.sdk.common.syscalls; import dev.restate.sdk.common.Request; +import dev.restate.sdk.common.RetryPolicy; import dev.restate.sdk.common.Target; import dev.restate.sdk.common.TerminalException; import java.nio.ByteBuffer; @@ -72,9 +73,17 @@ void send( void exitSideEffectBlock(ByteBuffer toWrite, ExitSideEffectSyscallCallback callback); + /** + * @deprecated use {@link #exitSideEffectBlockWithException(Throwable, RetryPolicy, + * ExitSideEffectSyscallCallback)} instead. + */ + @Deprecated(since = "1.1.0", forRemoval = true) void exitSideEffectBlockWithTerminalException( TerminalException toWrite, ExitSideEffectSyscallCallback callback); + void exitSideEffectBlockWithException( + Throwable toWrite, @Nullable RetryPolicy retryPolicy, ExitSideEffectSyscallCallback callback); + void awakeable(SyscallCallback>> callback); void resolveAwakeable(String id, ByteBuffer payload, SyscallCallback requestCallback); diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/EndpointManifest.java b/sdk-core/src/main/java/dev/restate/sdk/core/EndpointManifest.java index 7f62286f..6b1a80fb 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/EndpointManifest.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/EndpointManifest.java @@ -8,8 +8,7 @@ // https://github.com/restatedev/sdk-java/blob/main/LICENSE package dev.restate.sdk.core; -import static dev.restate.sdk.core.ServiceProtocol.MAX_SERVICE_PROTOCOL_VERSION; -import static dev.restate.sdk.core.ServiceProtocol.MIN_SERVICE_PROTOCOL_VERSION; +import static dev.restate.sdk.core.ServiceProtocol.*; import dev.restate.sdk.common.HandlerType; import dev.restate.sdk.common.ServiceType; @@ -28,11 +27,14 @@ final class EndpointManifest { private final EndpointManifestSchema manifest; public EndpointManifest( - EndpointManifestSchema.ProtocolMode protocolMode, Stream> components) { + EndpointManifestSchema.ProtocolMode protocolMode, + Stream> components, + boolean experimentalContextEnabled) { this.manifest = new EndpointManifestSchema() .withMinProtocolVersion(MIN_SERVICE_PROTOCOL_VERSION.getNumber()) - .withMaxProtocolVersion(MAX_SERVICE_PROTOCOL_VERSION.getNumber()) + .withMaxProtocolVersion( + maxServiceProtocolVersion(experimentalContextEnabled).getNumber()) .withProtocolMode(protocolMode) .withServices( components diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingSyscalls.java b/sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingSyscalls.java index fb0e7c22..cffc78c4 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingSyscalls.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingSyscalls.java @@ -9,6 +9,7 @@ package dev.restate.sdk.core; import dev.restate.sdk.common.Request; +import dev.restate.sdk.common.RetryPolicy; import dev.restate.sdk.common.Target; import dev.restate.sdk.common.TerminalException; import dev.restate.sdk.common.syscalls.Deferred; @@ -104,6 +105,15 @@ public void exitSideEffectBlockWithTerminalException( () -> syscalls.exitSideEffectBlockWithTerminalException(toWrite, callback)); } + @Override + public void exitSideEffectBlockWithException( + Throwable toWrite, + @Nullable RetryPolicy retryPolicy, + ExitSideEffectSyscallCallback callback) { + syscallsExecutor.execute( + () -> syscalls.exitSideEffectBlockWithException(toWrite, retryPolicy, callback)); + } + @Override public void awakeable(SyscallCallback>> callback) { syscallsExecutor.execute(() -> syscalls.awakeable(callback)); diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/InvocationStateMachine.java b/sdk-core/src/main/java/dev/restate/sdk/core/InvocationStateMachine.java index 70c86b30..d88679e7 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/InvocationStateMachine.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/InvocationStateMachine.java @@ -8,24 +8,26 @@ // https://github.com/restatedev/sdk-java/blob/main/LICENSE package dev.restate.sdk.core; +import static dev.restate.sdk.core.Util.durationMin; + import com.google.protobuf.ByteString; import com.google.protobuf.MessageLite; import dev.restate.generated.sdk.java.Java; import dev.restate.generated.service.protocol.Protocol; -import dev.restate.sdk.common.AbortedExecutionException; -import dev.restate.sdk.common.InvocationId; -import dev.restate.sdk.common.Request; -import dev.restate.sdk.common.TerminalException; +import dev.restate.sdk.common.*; import dev.restate.sdk.common.syscalls.*; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; +import java.time.Duration; +import java.time.Instant; import java.util.*; import java.util.concurrent.Flow; import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jspecify.annotations.Nullable; class InvocationStateMachine implements Flow.Processor { @@ -35,10 +37,12 @@ class InvocationStateMachine implements Flow.Processor 0 + ? this.startMessageDurationSinceLastStoredEntry + : Duration.ZERO; + } + + private int getRetryCountSinceLastStoredEntry() { + // We need to check if this is the first entry we try to commit after replay, and only in this + // case we need to return the info we got from the start message + return this.currentJournalEntryIndex == this.entriesToReplay + ? this.startMessageRetryCountSinceLastStoredEntry + : 0; + } + + // This function rethrows the exception if a retry needs to happen. + private TerminalException rethrowOrConvertToTerminal( + @Nullable RetryPolicy retryPolicy, Throwable t) throws Throwable { + if (retryPolicy != null + && this.negotiatedProtocolVersion.getNumber() + < Protocol.ServiceProtocolVersion.V2.getNumber()) { + throw ProtocolException.unsupportedFeature( + this.negotiatedProtocolVersion, "run retry policy"); + } + + if (retryPolicy == null) { + LOG.trace("The run completed with an exception and no retry policy was provided"); + // Default behavior is always retry + throw t; + } + + Duration retryLoopDuration = + this.getDurationSinceLastStoredEntry() + .plus(Duration.between(Instant.ofEpochMilli(this.sideEffectStart), Instant.now())); + int retryCount = this.getRetryCountSinceLastStoredEntry() + 1; + + if ((retryPolicy.getMaxAttempts() != null && retryPolicy.getMaxAttempts() <= retryCount) + || (retryPolicy.getMaxDuration() != null + && retryPolicy.getMaxDuration().compareTo(retryLoopDuration) <= 0)) { + LOG.trace("The run completed with a retryable exception, but all attempts were exhausted"); + // We need to convert it to TerminalException + return new TerminalException(t.toString()); + } + + // Compute next retry delay and throw it! + Duration nextComputedDelay = + retryPolicy + .getInitialDelay() + .multipliedBy((long) Math.pow(retryPolicy.getExponentiationFactor(), retryCount)); + Duration nextRetryDelay = + retryPolicy.getMaxDelay() != null + ? durationMin(retryPolicy.getMaxDelay(), nextComputedDelay) + : nextComputedDelay; + + this.failWithNextRetryDelay(t, nextRetryDelay); + throw t; + } + void completeSideEffectCallbackWithEntry( Protocol.RunEntryMessage sideEffectEntry, ExitSideEffectSyscallCallback callback) { if (sideEffectEntry.hasFailure()) { diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/ProtocolException.java b/sdk-core/src/main/java/dev/restate/sdk/core/ProtocolException.java index 33c265fa..72ab63f5 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/ProtocolException.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/ProtocolException.java @@ -87,4 +87,15 @@ static ProtocolException invalidSideEffectCall() { static ProtocolException unauthorized(Throwable e) { return new ProtocolException("Unauthorized", UNAUTHORIZED_CODE, e); } + + static ProtocolException unsupportedFeature( + Protocol.ServiceProtocolVersion version, String name) { + return new ProtocolException( + "The feature \"" + + name + + "\" is not supported by the negotiated protocol version \"" + + version.getNumber() + + "\". This might be caused by rolling back a Restate setup to a previous version, while using the experimental context.", + UNSUPPORTED_MEDIA_TYPE_CODE); + } } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/RestateEndpoint.java b/sdk-core/src/main/java/dev/restate/sdk/core/RestateEndpoint.java index bbb983b8..f161069f 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/RestateEndpoint.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/RestateEndpoint.java @@ -37,17 +37,23 @@ public class RestateEndpoint { private final Tracer tracer; private final RequestIdentityVerifier requestIdentityVerifier; private final EndpointManifest deploymentManifest; + private final boolean experimentalContextEnabled; private RestateEndpoint( EndpointManifestSchema.ProtocolMode protocolMode, Map> services, Tracer tracer, - RequestIdentityVerifier requestIdentityVerifier) { + RequestIdentityVerifier requestIdentityVerifier, + boolean experimentalContextEnabled) { this.services = services; this.tracer = tracer; this.requestIdentityVerifier = requestIdentityVerifier; this.deploymentManifest = - new EndpointManifest(protocolMode, services.values().stream().map(c -> c.service)); + new EndpointManifest( + protocolMode, + services.values().stream().map(c -> c.service), + experimentalContextEnabled); + this.experimentalContextEnabled = experimentalContextEnabled; LOG.info("Registered services: {}", this.services.keySet()); } @@ -64,7 +70,7 @@ public ResolvedEndpointHandler resolve( final Protocol.ServiceProtocolVersion serviceProtocolVersion = ServiceProtocol.parseServiceProtocolVersion(contentType); - if (!ServiceProtocol.isSupported(serviceProtocolVersion)) { + if (!ServiceProtocol.isSupported(serviceProtocolVersion, this.experimentalContextEnabled)) { throw new ProtocolException( String.format( "Service endpoint does not support the service protocol version '%s'.", contentType), @@ -107,7 +113,11 @@ public ResolvedEndpointHandler resolve( // Instantiate state machine, syscall and grpc bridge InvocationStateMachine stateMachine = new InvocationStateMachine( - componentName, fullyQualifiedServiceMethod, span, loggingContextSetter); + componentName, + fullyQualifiedServiceMethod, + span, + loggingContextSetter, + serviceProtocolVersion); return new ResolvedEndpointHandlerImpl( serviceProtocolVersion, stateMachine, handler, svc.options, syscallExecutor); @@ -146,6 +156,7 @@ public static class Builder { private final EndpointManifestSchema.ProtocolMode protocolMode; private RequestIdentityVerifier requestIdentityVerifier; private Tracer tracer = OpenTelemetry.noop().getTracer("NOOP"); + private boolean experimentalContextEnabled = false; public Builder(EndpointManifestSchema.ProtocolMode protocolMode) { this.protocolMode = protocolMode; @@ -166,13 +177,19 @@ public Builder withRequestIdentityVerifier(RequestIdentityVerifier requestIdenti return this; } + public Builder enablePreviewContext() { + this.experimentalContextEnabled = true; + return this; + } + public RestateEndpoint build() { return new RestateEndpoint( this.protocolMode, this.services.stream() .collect(Collectors.toMap(c -> c.service.getServiceName(), Function.identity())), tracer, - requestIdentityVerifier); + requestIdentityVerifier, + experimentalContextEnabled); } } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/ServiceProtocol.java b/sdk-core/src/main/java/dev/restate/sdk/core/ServiceProtocol.java index 0905a126..66db14db 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/ServiceProtocol.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/ServiceProtocol.java @@ -19,7 +19,7 @@ class ServiceProtocol { static final Protocol.ServiceProtocolVersion MIN_SERVICE_PROTOCOL_VERSION = Protocol.ServiceProtocolVersion.V1; - static final Protocol.ServiceProtocolVersion MAX_SERVICE_PROTOCOL_VERSION = + private static final Protocol.ServiceProtocolVersion MAX_SERVICE_PROTOCOL_VERSION = Protocol.ServiceProtocolVersion.V1; static final Discovery.ServiceDiscoveryProtocolVersion MIN_SERVICE_DISCOVERY_PROTOCOL_VERSION = @@ -33,6 +33,9 @@ static Protocol.ServiceProtocolVersion parseServiceProtocolVersion(String versio if (version.equals("application/vnd.restate.invocation.v1")) { return Protocol.ServiceProtocolVersion.V1; } + if (version.equals("application/vnd.restate.invocation.v2")) { + return Protocol.ServiceProtocolVersion.V2; + } return Protocol.ServiceProtocolVersion.SERVICE_PROTOCOL_VERSION_UNSPECIFIED; } @@ -40,13 +43,25 @@ static String serviceProtocolVersionToHeaderValue(Protocol.ServiceProtocolVersio if (Objects.requireNonNull(version) == Protocol.ServiceProtocolVersion.V1) { return "application/vnd.restate.invocation.v1"; } + if (Objects.requireNonNull(version) == Protocol.ServiceProtocolVersion.V2) { + return "application/vnd.restate.invocation.v2"; + } throw new IllegalArgumentException( String.format("Service protocol version '%s' has no header value", version.getNumber())); } - static boolean isSupported(Protocol.ServiceProtocolVersion serviceProtocolVersion) { + static Protocol.ServiceProtocolVersion maxServiceProtocolVersion( + boolean experimentalContextEnabled) { + return experimentalContextEnabled + ? Protocol.ServiceProtocolVersion.V2 + : Protocol.ServiceProtocolVersion.V1; + } + + static boolean isSupported( + Protocol.ServiceProtocolVersion serviceProtocolVersion, boolean experimentalContextEnabled) { return MIN_SERVICE_PROTOCOL_VERSION.getNumber() <= serviceProtocolVersion.getNumber() - && serviceProtocolVersion.getNumber() <= MAX_SERVICE_PROTOCOL_VERSION.getNumber(); + && serviceProtocolVersion.getNumber() + <= maxServiceProtocolVersion(experimentalContextEnabled).getNumber(); } static boolean isSupported( diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/SyscallsImpl.java b/sdk-core/src/main/java/dev/restate/sdk/core/SyscallsImpl.java index b89bee40..1d702c3c 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/SyscallsImpl.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/SyscallsImpl.java @@ -13,8 +13,10 @@ import com.google.protobuf.ByteString; import dev.restate.generated.service.protocol.Protocol; import dev.restate.sdk.common.Request; +import dev.restate.sdk.common.RetryPolicy; import dev.restate.sdk.common.Target; import dev.restate.sdk.common.TerminalException; +import dev.restate.sdk.common.function.ThrowingRunnable; import dev.restate.sdk.common.syscalls.*; import dev.restate.sdk.core.DeferredResults.SingleDeferredInternal; import dev.restate.sdk.core.Entries.*; @@ -261,6 +263,19 @@ public void exitSideEffectBlockWithTerminalException( callback); } + @Override + public void exitSideEffectBlockWithException( + Throwable runException, + @Nullable RetryPolicy retryPolicy, + ExitSideEffectSyscallCallback callback) { + wrapAndPropagateExceptions( + () -> { + LOG.trace("exitSideEffectBlock with exception"); + this.stateMachine.exitSideEffectBlockWithThrowable(runException, retryPolicy, callback); + }, + callback); + } + @Override public void awakeable(SyscallCallback>> callback) { wrapAndPropagateExceptions( @@ -428,7 +443,7 @@ public void fail(Throwable cause) { // -- Wrapper for failure propagation - private void wrapAndPropagateExceptions(Runnable r, SyscallCallback handler) { + private void wrapAndPropagateExceptions(ThrowingRunnable r, SyscallCallback handler) { try { r.run(); } catch (Throwable e) { diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/Util.java b/sdk-core/src/main/java/dev/restate/sdk/core/Util.java index c556f2b7..25ddecb1 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/Util.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/Util.java @@ -18,6 +18,7 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Objects; import java.util.Optional; import java.util.function.Predicate; @@ -167,4 +168,8 @@ static boolean isEntry(MessageLite msg) { static ByteString nioBufferToProtobufBuffer(ByteBuffer nioBuffer) { return UnsafeByteOperations.unsafeWrap(nioBuffer); } + + static Duration durationMin(Duration a, Duration b) { + return (a.compareTo(b) <= 0) ? a : b; + } } diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/AssertUtils.java b/sdk-core/src/test/java/dev/restate/sdk/core/AssertUtils.java index 0dc1aeea..e144b8bc 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/AssertUtils.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/AssertUtils.java @@ -80,7 +80,8 @@ public static EndpointManifestSchemaAssert assertThatDiscovery(Object... service } return RestateEndpoint.discoverServiceDefinitionFactory(svc).create(svc); - })) + }), + false) .manifest(), EndpointManifestSchemaAssert.class); } diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/ComponentDiscoveryHandlerTest.java b/sdk-core/src/test/java/dev/restate/sdk/core/ComponentDiscoveryHandlerTest.java index 92b6434d..5b4ba80c 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/ComponentDiscoveryHandlerTest.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/ComponentDiscoveryHandlerTest.java @@ -37,7 +37,8 @@ void handleWithMultipleServices() { HandlerDefinition.of( HandlerSpecification.of( "greet", HandlerType.EXCLUSIVE, Serde.VOID, Serde.VOID), - null))))); + null)))), + false); EndpointManifestSchema manifest = deploymentManifest.manifest(); diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/MockMultiThreaded.java b/sdk-core/src/test/java/dev/restate/sdk/core/MockMultiThreaded.java index 475c5dc0..9698b963 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/MockMultiThreaded.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/MockMultiThreaded.java @@ -11,7 +11,6 @@ import static org.assertj.core.api.Assertions.assertThat; import com.google.protobuf.MessageLite; -import dev.restate.generated.service.protocol.Protocol; import dev.restate.sdk.common.syscalls.ServiceDefinition; import dev.restate.sdk.core.manifest.EndpointManifestSchema; import io.smallrye.mutiny.Multi; @@ -47,12 +46,16 @@ public void executeTest(TestDefinitions.TestDefinition definition) { .bind( (ServiceDefinition) serviceDefinition, definition.getServiceOptions()); + if (definition.isEnablePreviewContext()) { + builder.enablePreviewContext(); + } RestateEndpoint server = builder.build(); // Start invocation ResolvedEndpointHandler handler = server.resolve( - ServiceProtocol.serviceProtocolVersionToHeaderValue(Protocol.ServiceProtocolVersion.V1), + ServiceProtocol.serviceProtocolVersionToHeaderValue( + ServiceProtocol.maxServiceProtocolVersion(definition.isEnablePreviewContext())), serviceDefinition.getServiceName(), definition.getMethod(), k -> null, diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/MockSingleThread.java b/sdk-core/src/test/java/dev/restate/sdk/core/MockSingleThread.java index f1c568b4..55d2f324 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/MockSingleThread.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/MockSingleThread.java @@ -11,7 +11,6 @@ import static org.assertj.core.api.Assertions.assertThat; import com.google.protobuf.MessageLite; -import dev.restate.generated.service.protocol.Protocol; import dev.restate.sdk.common.syscalls.ServiceDefinition; import dev.restate.sdk.core.TestDefinitions.TestDefinition; import dev.restate.sdk.core.TestDefinitions.TestExecutor; @@ -44,12 +43,16 @@ public void executeTest(TestDefinition definition) { .bind( (ServiceDefinition) serviceDefinition, definition.getServiceOptions()); + if (definition.isEnablePreviewContext()) { + builder.enablePreviewContext(); + } RestateEndpoint server = builder.build(); // Start invocation ResolvedEndpointHandler handler = server.resolve( - ServiceProtocol.serviceProtocolVersionToHeaderValue(Protocol.ServiceProtocolVersion.V1), + ServiceProtocol.serviceProtocolVersionToHeaderValue( + ServiceProtocol.maxServiceProtocolVersion(definition.isEnablePreviewContext())), serviceDefinition.getServiceName(), definition.getMethod(), k -> null, diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/ProtoUtils.java b/sdk-core/src/test/java/dev/restate/sdk/core/ProtoUtils.java index 03e5c1a7..8390d310 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/ProtoUtils.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/ProtoUtils.java @@ -31,6 +31,11 @@ public static String serviceProtocolContentTypeHeader() { return ServiceProtocol.serviceProtocolVersionToHeaderValue(Protocol.ServiceProtocolVersion.V1); } + public static String serviceProtocolContentTypeHeader(boolean enableContextPreview) { + return ServiceProtocol.serviceProtocolVersionToHeaderValue( + ServiceProtocol.maxServiceProtocolVersion(enableContextPreview)); + } + public static String serviceProtocolDiscoveryContentTypeHeader() { return ServiceProtocol.serviceDiscoveryProtocolVersionToHeaderValue( Discovery.ServiceDiscoveryProtocolVersion.V1); diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/SideEffectTestSuite.java b/sdk-core/src/test/java/dev/restate/sdk/core/SideEffectTestSuite.java index f8cabc4e..10048b30 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/SideEffectTestSuite.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/SideEffectTestSuite.java @@ -17,7 +17,9 @@ import com.google.protobuf.ByteString; import dev.restate.generated.service.protocol.Protocol; +import dev.restate.sdk.common.RetryPolicy; import dev.restate.sdk.common.TerminalException; +import java.time.Duration; import java.util.stream.Stream; public abstract class SideEffectTestSuite implements TestDefinitions.TestSuite { @@ -34,6 +36,9 @@ public abstract class SideEffectTestSuite implements TestDefinitions.TestSuite { protected abstract TestInvocationBuilder failingSideEffect(String name, String reason); + protected abstract TestInvocationBuilder failingSideEffectWithRetryPolicy( + String reason, RetryPolicy retryPolicy); + @Override public Stream definitions() { return Stream.of( @@ -106,7 +111,43 @@ public Stream definitions() { "my-side-effect", Protocol.ErrorMessage::getRelatedEntryName) .extracting(Protocol.ErrorMessage::getMessage, STRING) .contains("some failure")))), - + this.failingSideEffectWithRetryPolicy( + "some failure", + RetryPolicy.exponential(Duration.ofMillis(100), 1.0f).setMaxAttempts(2)) + .withInput(startMessage(1).setRetryCountSinceLastStoredEntry(0), inputMessage()) + .enablePreviewContext() + .onlyUnbuffered() + .assertingOutput( + containsOnly( + errorMessage( + errorMessage -> + assertThat(errorMessage) + .returns( + TerminalException.INTERNAL_SERVER_ERROR_CODE, + Protocol.ErrorMessage::getCode) + .returns(1, Protocol.ErrorMessage::getRelatedEntryIndex) + .returns( + (int) MessageType.RunEntryMessage.encode(), + Protocol.ErrorMessage::getRelatedEntryType) + .returns(100L, Protocol.ErrorMessage::getNextRetryDelay) + .extracting(Protocol.ErrorMessage::getMessage, STRING) + .contains("java.lang.IllegalStateException: some failure")))) + .named("Should fail as retryable error with the attached next retry delay"), + this.failingSideEffectWithRetryPolicy( + "some failure", + RetryPolicy.exponential(Duration.ofMillis(100), 1.0f).setMaxAttempts(2)) + .withInput( + startMessage(1).setRetryCountSinceLastStoredEntry(1), inputMessage(), ackMessage(1)) + .enablePreviewContext() + .onlyUnbuffered() + .expectingOutput( + Protocol.RunEntryMessage.newBuilder() + .setFailure( + Util.toProtocolFailure( + 500, "java.lang.IllegalStateException: some failure")), + outputMessage(500, "java.lang.IllegalStateException: some failure"), + END_MESSAGE) + .named("Should convert retryable error to terminal"), // --- Other tests this.checkContextSwitching() .withInput(startMessage(1), inputMessage(), ackMessage(1)) diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/TestDefinitions.java b/sdk-core/src/test/java/dev/restate/sdk/core/TestDefinitions.java index f6899717..46a2e437 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/TestDefinitions.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/TestDefinitions.java @@ -35,6 +35,8 @@ public interface TestDefinition { boolean isOnlyUnbuffered(); + boolean isEnablePreviewContext(); + List getInput(); Consumer> getOutputAssert(); @@ -132,6 +134,7 @@ public WithInputBuilder withInput(MessageLiteOrBuilder... messages) { public static class WithInputBuilder extends TestInvocationBuilder { private final List input; private boolean onlyUnbuffered = false; + private boolean enablePreviewContext = false; WithInputBuilder(@Nullable String invalidReason) { super(invalidReason); @@ -167,6 +170,11 @@ public WithInputBuilder onlyUnbuffered() { return this; } + public WithInputBuilder enablePreviewContext() { + this.enablePreviewContext = true; + return this; + } + public ExpectingOutputMessages expectingOutput(MessageLiteOrBuilder... messages) { List builtMessages = Arrays.stream(messages).map(ProtoUtils::build).collect(Collectors.toList()); @@ -175,7 +183,14 @@ public ExpectingOutputMessages expectingOutput(MessageLiteOrBuilder... messages) public ExpectingOutputMessages assertingOutput(Consumer> messages) { return new ExpectingOutputMessages( - service, options, invalidReason, handler, input, onlyUnbuffered, messages); + service, + options, + invalidReason, + handler, + input, + onlyUnbuffered, + enablePreviewContext, + messages); } } @@ -186,6 +201,7 @@ public abstract static class BaseTestDefinition implements TestDefinition { protected final String method; protected final List input; protected final boolean onlyUnbuffered; + protected final boolean enablePreviewContext; protected final String named; private BaseTestDefinition( @@ -195,6 +211,7 @@ private BaseTestDefinition( String method, List input, boolean onlyUnbuffered, + boolean enablePreviewContext, String named) { this.service = service; this.options = options; @@ -202,6 +219,7 @@ private BaseTestDefinition( this.method = method; this.input = input; this.onlyUnbuffered = onlyUnbuffered; + this.enablePreviewContext = enablePreviewContext; this.named = named; } @@ -230,6 +248,11 @@ public boolean isOnlyUnbuffered() { return onlyUnbuffered; } + @Override + public boolean isEnablePreviewContext() { + return enablePreviewContext; + } + @Override public String getTestCaseName() { return this.named; @@ -252,6 +275,7 @@ private ExpectingOutputMessages( String method, List input, boolean onlyUnbuffered, + boolean enablePreviewContext, Consumer> messagesAssert) { super( service, @@ -260,6 +284,7 @@ private ExpectingOutputMessages( method, input, onlyUnbuffered, + enablePreviewContext, service != null ? service.getServiceName() + "#" + method : "Unknown"); this.messagesAssert = messagesAssert; } @@ -271,9 +296,18 @@ private ExpectingOutputMessages( String method, List input, boolean onlyUnbuffered, + boolean enablePreviewContext, Consumer> messagesAssert, String named) { - super(service, options, invalidReason, method, input, onlyUnbuffered, named); + super( + service, + options, + invalidReason, + method, + input, + onlyUnbuffered, + enablePreviewContext, + named); this.messagesAssert = messagesAssert; } @@ -285,6 +319,7 @@ public ExpectingOutputMessages named(String name) { method, input, onlyUnbuffered, + enablePreviewContext, messagesAssert, this.named + ": " + name); } diff --git a/sdk-http-vertx/src/main/java/dev/restate/sdk/http/vertx/RestateHttpEndpointBuilder.java b/sdk-http-vertx/src/main/java/dev/restate/sdk/http/vertx/RestateHttpEndpointBuilder.java index fc3ec967..88942fc6 100644 --- a/sdk-http-vertx/src/main/java/dev/restate/sdk/http/vertx/RestateHttpEndpointBuilder.java +++ b/sdk-http-vertx/src/main/java/dev/restate/sdk/http/vertx/RestateHttpEndpointBuilder.java @@ -121,6 +121,11 @@ public RestateHttpEndpointBuilder withRequestIdentityVerifier( return this; } + public RestateHttpEndpointBuilder enablePreviewContext() { + this.endpointBuilder.enablePreviewContext(); + return this; + } + /** * Build and listen on the specified port. * diff --git a/sdk-http-vertx/src/test/kotlin/dev/restate/sdk/http/vertx/HttpVertxTestExecutor.kt b/sdk-http-vertx/src/test/kotlin/dev/restate/sdk/http/vertx/HttpVertxTestExecutor.kt index 135171a4..08203fa9 100644 --- a/sdk-http-vertx/src/test/kotlin/dev/restate/sdk/http/vertx/HttpVertxTestExecutor.kt +++ b/sdk-http-vertx/src/test/kotlin/dev/restate/sdk/http/vertx/HttpVertxTestExecutor.kt @@ -36,12 +36,17 @@ class HttpVertxTestExecutor(private val vertx: Vertx) : TestExecutor { override fun executeTest(definition: TestDefinition) { runBlocking(vertx.dispatcher()) { // Build server - val server = + val serverBuilder = RestateHttpEndpointBuilder.builder(vertx) .withOptions(HttpServerOptions().setPort(0)) .bind( definition.serviceDefinition as ServiceDefinition, definition.serviceOptions) - .build() + if (definition.isEnablePreviewContext()) { + serverBuilder.enablePreviewContext() + } + + // Start server + val server = serverBuilder.build() server.listen().coAwait() val client = vertx.createHttpClient(RestateHttpEndpointTest.HTTP_CLIENT_OPTIONS) @@ -58,8 +63,12 @@ class HttpVertxTestExecutor(private val vertx: Vertx) : TestExecutor { // Prepare request header and send them request .setChunked(true) - .putHeader(HttpHeaders.CONTENT_TYPE, ProtoUtils.serviceProtocolContentTypeHeader()) - .putHeader(HttpHeaders.ACCEPT, ProtoUtils.serviceProtocolContentTypeHeader()) + .putHeader( + HttpHeaders.CONTENT_TYPE, + ProtoUtils.serviceProtocolContentTypeHeader(definition.isEnablePreviewContext)) + .putHeader( + HttpHeaders.ACCEPT, + ProtoUtils.serviceProtocolContentTypeHeader(definition.isEnablePreviewContext)) request.sendHead().coAwait() launch { diff --git a/sdk-lambda/src/main/java/dev/restate/sdk/lambda/RestateLambdaEndpointBuilder.java b/sdk-lambda/src/main/java/dev/restate/sdk/lambda/RestateLambdaEndpointBuilder.java index 0342c861..f1e61a79 100644 --- a/sdk-lambda/src/main/java/dev/restate/sdk/lambda/RestateLambdaEndpointBuilder.java +++ b/sdk-lambda/src/main/java/dev/restate/sdk/lambda/RestateLambdaEndpointBuilder.java @@ -71,6 +71,11 @@ public RestateLambdaEndpointBuilder withRequestIdentityVerifier( return this; } + public RestateLambdaEndpointBuilder enablePreviewContext() { + this.restateEndpoint.enablePreviewContext(); + return this; + } + /** Build the {@link RestateLambdaEndpoint} serving the Restate service endpoint. */ public RestateLambdaEndpoint build() { return new RestateLambdaEndpoint(this.restateEndpoint.build(), this.openTelemetry); diff --git a/settings.gradle.kts b/settings.gradle.kts index dcbf6201..b1986cf7 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -89,7 +89,7 @@ dependencyResolutionManagement { library("kotlinx-serialization-json", "org.jetbrains.kotlinx", "kotlinx-serialization-json") .version("1.6.3") - version("ksp", "2.0.0-1.0.21") + version("ksp", "2.0.0-1.0.24") library("symbol-processing-api", "com.google.devtools.ksp", "symbol-processing-api") .versionRef("ksp") plugin("ksp", "com.google.devtools.ksp").versionRef("ksp") diff --git a/test-services/README.md b/test-services/README.md index 4040d71d..024d2d84 100644 --- a/test-services/README.md +++ b/test-services/README.md @@ -1,11 +1,11 @@ -# Java services +# Java SDK test services ## Running the services -The Java services can be run via: +The services can be run via: ```shell SERVICES= gradle run ``` -For the list of supported services see [here](src/main/java/my/restate/e2e/services/Main.java). \ No newline at end of file +For the list of supported services see [here](src/main/kotlin/dev/restate/sdk/testservices/Main.kt). \ No newline at end of file diff --git a/test-services/src/main/kotlin/dev/restate/sdk/testservices/FailingImpl.kt b/test-services/src/main/kotlin/dev/restate/sdk/testservices/FailingImpl.kt index b58f487e..3d6a4b7f 100644 --- a/test-services/src/main/kotlin/dev/restate/sdk/testservices/FailingImpl.kt +++ b/test-services/src/main/kotlin/dev/restate/sdk/testservices/FailingImpl.kt @@ -10,10 +10,13 @@ package dev.restate.sdk.testservices import dev.restate.sdk.common.TerminalException import dev.restate.sdk.kotlin.ObjectContext +import dev.restate.sdk.kotlin.UsePreviewContext +import dev.restate.sdk.kotlin.retryPolicy import dev.restate.sdk.kotlin.runBlock import dev.restate.sdk.testservices.contracts.Failing import dev.restate.sdk.testservices.contracts.FailingClient import java.util.concurrent.atomic.AtomicInteger +import kotlin.time.Duration.Companion.milliseconds import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.Logger @@ -24,6 +27,7 @@ class FailingImpl : Failing { private val eventualSuccessCalls = AtomicInteger(0) private val eventualSuccessSideEffectCalls = AtomicInteger(0) + private val eventualFailureSideEffectCalls = AtomicInteger(0) override suspend fun terminallyFailingCall(context: ObjectContext, errorMessage: String) { LOG.info("Invoked fail") @@ -55,24 +59,54 @@ class FailingImpl : Failing { } } - override suspend fun failingSideEffectWithEventualSuccess(context: ObjectContext): Int { - val successAttempt: Int = - context.runBlock { - val currentAttempt = eventualSuccessSideEffectCalls.incrementAndGet() - if (currentAttempt >= 4) { - eventualSuccessSideEffectCalls.set(0) - return@runBlock currentAttempt - } else { - throw IllegalArgumentException("Failed at attempt: $currentAttempt") - } - } - - return successAttempt - } - override suspend fun terminallyFailingSideEffect(context: ObjectContext, errorMessage: String) { context.runBlock { throw TerminalException(errorMessage) } throw IllegalStateException("Should not be reached.") } + + @OptIn(UsePreviewContext::class) + override suspend fun sideEffectSucceedsAfterGivenAttempts( + context: ObjectContext, + minimumAttempts: Int + ): Int = + context.runBlock( + name = "failing_side_effect", + retryPolicy = + retryPolicy { + initialDelay = 10.milliseconds + exponentiationFactor = 1.0f + }) { + val currentAttempt = eventualSuccessSideEffectCalls.incrementAndGet() + if (currentAttempt >= 4) { + eventualSuccessSideEffectCalls.set(0) + return@runBlock currentAttempt + } else { + throw IllegalArgumentException("Failed at attempt: $currentAttempt") + } + } + + @OptIn(UsePreviewContext::class) + override suspend fun sideEffectFailsAfterGivenAttempts( + context: ObjectContext, + retryPolicyMaxRetryCount: Int + ): Int { + try { + context.runBlock( + name = "failing_side_effect", + retryPolicy = + retryPolicy { + initialDelay = 10.milliseconds + exponentiationFactor = 1.0f + maxAttempts = retryPolicyMaxRetryCount + }) { + val currentAttempt = eventualFailureSideEffectCalls.incrementAndGet() + throw IllegalArgumentException("Failed at attempt: $currentAttempt") + } + } catch (_: TerminalException) { + return eventualFailureSideEffectCalls.get() + } + // If I reach this point, the side effect succeeded... + throw TerminalException("Expecting the side effect to fail!") + } } diff --git a/test-services/src/main/kotlin/dev/restate/sdk/testservices/Main.kt b/test-services/src/main/kotlin/dev/restate/sdk/testservices/Main.kt index 15ce2ac8..5bc16431 100644 --- a/test-services/src/main/kotlin/dev/restate/sdk/testservices/Main.kt +++ b/test-services/src/main/kotlin/dev/restate/sdk/testservices/Main.kt @@ -29,6 +29,8 @@ val KNOWN_SERVICES_FACTORIES: Map Any> = TestUtilsServiceDefinitions.SERVICE_NAME to { TestUtilsServiceImpl() }, ) +val NEEDS_EXPERIMENTAL_CONTEXT: Set = setOf(FailingDefinitions.SERVICE_NAME) + fun main(args: Array) { var env = System.getenv("SERVICES") if (env == null) { @@ -52,5 +54,9 @@ fun main(args: Array) { RestateRequestIdentityVerifier.fromKey(requestSigningKey)) } + if (env == "*" || NEEDS_EXPERIMENTAL_CONTEXT.any { env.contains(it) }) { + restateHttpEndpointBuilder.enablePreviewContext() + } + restateHttpEndpointBuilder.buildAndListen() } diff --git a/test-services/src/main/kotlin/dev/restate/sdk/testservices/contracts/Failing.kt b/test-services/src/main/kotlin/dev/restate/sdk/testservices/contracts/Failing.kt index 53ec5e44..dca2e47a 100644 --- a/test-services/src/main/kotlin/dev/restate/sdk/testservices/contracts/Failing.kt +++ b/test-services/src/main/kotlin/dev/restate/sdk/testservices/contracts/Failing.kt @@ -12,7 +12,7 @@ import dev.restate.sdk.annotation.Handler import dev.restate.sdk.annotation.VirtualObject import dev.restate.sdk.kotlin.ObjectContext -@VirtualObject +@VirtualObject(name = "Failing") interface Failing { @Handler suspend fun terminallyFailingCall(context: ObjectContext, errorMessage: String) @@ -21,7 +21,30 @@ interface Failing { @Handler suspend fun failingCallWithEventualSuccess(context: ObjectContext): Int - @Handler suspend fun failingSideEffectWithEventualSuccess(context: ObjectContext): Int - @Handler suspend fun terminallyFailingSideEffect(context: ObjectContext, errorMessage: String) + + /** + * `minimumAttempts` should be used to check when to succeed. The retry policy should be + * configured to be infinite. + * + * @return the number of executed attempts. In order to implement this count, an atomic counter in + * the service should be used. + */ + @Handler + suspend fun sideEffectSucceedsAfterGivenAttempts( + context: ObjectContext, + minimumAttempts: Int + ): Int + + /** + * `retryPolicyMaxRetryCount` should be used to configure the retry policy. + * + * @return the number of executed attempts. In order to implement this count, an atomic counter in + * the service should be used. + */ + @Handler + suspend fun sideEffectFailsAfterGivenAttempts( + context: ObjectContext, + retryPolicyMaxRetryCount: Int + ): Int }