From e6aab81205bbd5ab43a75161f9329481a8e400a1 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 22 Oct 2024 00:36:00 +0700 Subject: [PATCH 1/3] Add repro unit test --- .../Akka.Tests/Pattern/CircuitBreakerSpec.cs | 156 +++++++++++------- src/core/Akka/Pattern/CircuitBreaker.cs | 2 +- 2 files changed, 96 insertions(+), 62 deletions(-) diff --git a/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs index 7a3d51eba57..43beec0a511 100644 --- a/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs +++ b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs @@ -17,6 +17,7 @@ using Akka.Util.Internal; using FluentAssertions; using Xunit; +using static FluentAssertions.FluentActions; namespace Akka.Tests.Pattern { @@ -199,38 +200,45 @@ public class AnAsynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase public async Task Must_allow_calls_through() { var breaker = LongCallTimeoutCb(); - var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout); - Assert.Equal("hi", result); + var result = await breaker.Instance.WithCircuitBreaker(SayHiAsync).WaitAsync(AwaitTimeout); + result.Should().Be("hi"); } [Fact(DisplayName = "An asynchronous circuit breaker that is closed must increment failure count on exception")] public async Task Must_increment_failure_count_on_exception() { var breaker = LongCallTimeoutCb(); - await InterceptException(() => - breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync(AwaitTimeout)); - Assert.True(CheckLatch(breaker.OpenLatch)); - breaker.Instance.CurrentFailureCount.ShouldBe(1); + await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); + + CheckLatch(breaker.OpenLatch).Should().BeTrue(); + breaker.Instance.CurrentFailureCount.Should().Be(1); } [Fact(DisplayName = "An asynchronous circuit breaker that is closed must increment failure count on async failure")] - public void Must_increment_failure_count_on_async_failure() + public async Task Must_increment_failure_count_on_async_failure() { var breaker = LongCallTimeoutCb(); - _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); - Assert.True(CheckLatch(breaker.OpenLatch)); - breaker.Instance.CurrentFailureCount.ShouldBe(1); + await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); + + CheckLatch(breaker.OpenLatch).Should().BeTrue(); + breaker.Instance.CurrentFailureCount.Should().Be(1); } [Fact(DisplayName = "An asynchronous circuit breaker that is closed must reset failure count after success")] public async Task Must_reset_failure_count_after_success() { var breaker = MultiFailureCb(); - _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)); - Enumerable.Range(1, 4).ForEach(_ => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException))); - await AwaitAssertAsync(() => breaker.Instance.CurrentFailureCount.ShouldBe(4), AwaitTimeout); - _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)); - await AwaitAssertAsync(() => breaker.Instance.CurrentFailureCount.ShouldBe(0), AwaitTimeout); + _ = await breaker.Instance.WithCircuitBreaker(SayHiAsync).WaitAsync(AwaitTimeout); + + await Task.WhenAll(Enumerable.Range(1, 4) + .Select(_ + => InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)))); + + breaker.Instance.CurrentFailureCount.Should().Be(4); + + var result = await breaker.Instance.WithCircuitBreaker(SayHiAsync).WaitAsync(AwaitTimeout); + result.Should().Be("hi"); + breaker.Instance.CurrentFailureCount.ShouldBe(0); } [Fact(DisplayName = "An asynchronous circuit breaker that is closed must increment failure count on callTimeout")] @@ -238,26 +246,40 @@ public async Task Must_increment_failure_count_on_callTimeout() { var breaker = ShortCallTimeoutCb(); - var future = breaker.Instance.WithCircuitBreaker(async () => - { - await Task.Delay(150); - ThrowException(); - }); + var innerFuture = SlowThrowing(); + var future = breaker.Instance.WithCircuitBreaker(() => innerFuture); - Assert.True(CheckLatch(breaker.OpenLatch)); - breaker.Instance.CurrentFailureCount.ShouldBe(1); + CheckLatch(breaker.OpenLatch).Should().BeTrue(); + breaker.Instance.CurrentFailureCount.Should().Be(1); // Since the timeout should have happened before the inner code finishes // we expect a timeout, not TestException - await InterceptException(() => future.WaitAsync(AwaitTimeout)); + await InterceptException(() => future); + + // Issue https://github.com/akkadotnet/akka.net/issues/7358 + // The actual exception is thrown out-of-band with no handler because inner Task is detached + // after a timeout and NOT protected + + // In the bug, the task is still running when it should've been stopped. + innerFuture.IsCompleted.Should().BeTrue(); + innerFuture.IsFaulted.Should().BeTrue(); + innerFuture.Exception.Should().BeOfType(); + + return; + + async Task SlowThrowing() + { + await Task.Delay(150); + await ThrowExceptionAsync(); + } } [Fact(DisplayName = "An asynchronous circuit breaker that is closed must invoke onOpen if call fails and breaker transits to open state")] - public void Must_invoke_onOpen_if_call_fails_and_breaker_transits_to_open_state() + public async Task Must_invoke_onOpen_if_call_fails_and_breaker_transits_to_open_state() { var breaker = ShortCallTimeoutCb(); - _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); - Assert.True(CheckLatch(breaker.OpenLatch)); + await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); + CheckLatch(breaker.OpenLatch).Should().BeTrue(); } } @@ -267,35 +289,36 @@ public class AnAsynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase public async Task Must_pass_through_next_call_and_close_on_success() { var breaker = ShortResetTimeoutCb(); - _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); - Assert.True(CheckLatch(breaker.HalfOpenLatch)); - var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout); - Assert.Equal("hi", result); - Assert.True(CheckLatch(breaker.ClosedLatch)); + await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); + CheckLatch(breaker.HalfOpenLatch).Should().BeTrue(); + + var result = await breaker.Instance.WithCircuitBreaker(SayHiAsync).WaitAsync(AwaitTimeout); + result.Should().Be("hi"); + CheckLatch(breaker.ClosedLatch).Should().BeTrue(); } [Fact(DisplayName = "An asynchronous circuit breaker that is half open must re-open on exception in call")] public async Task Must_reopen_on_exception_in_call() { var breaker = ShortResetTimeoutCb(); - _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); - Assert.True(CheckLatch(breaker.HalfOpenLatch)); + await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); + CheckLatch(breaker.HalfOpenLatch).Should().BeTrue(); + breaker.OpenLatch.Reset(); - await InterceptException(() => - breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync(AwaitTimeout)); - Assert.True(CheckLatch(breaker.OpenLatch)); + await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); + CheckLatch(breaker.OpenLatch).Should().BeTrue(); } [Fact(DisplayName = "An asynchronous circuit breaker that is half open must re-open on async failure")] - public void Must_reopen_on_async_failure() + public async Task Must_reopen_on_async_failure() { var breaker = ShortResetTimeoutCb(); - _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); - Assert.True(CheckLatch(breaker.HalfOpenLatch)); + await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); + CheckLatch(breaker.HalfOpenLatch).Should().BeTrue(); breaker.OpenLatch.Reset(); - _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); - Assert.True(CheckLatch(breaker.OpenLatch)); + await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); + CheckLatch(breaker.OpenLatch).Should().BeTrue(); } } @@ -305,43 +328,40 @@ public class AnAsynchronousCircuitBreakerThatIsOpen : CircuitBreakerSpecBase public async Task Must_throw_exceptions_when_called_before_reset_timeout() { var breaker = LongResetTimeoutCb(); - _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); - - Assert.True(CheckLatch(breaker.OpenLatch)); + await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); + CheckLatch(breaker.OpenLatch).Should().BeTrue(); - await InterceptException( - () => breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout)); + await InterceptException(() => breaker.Instance.WithCircuitBreaker(SayHiAsync)); } [Fact(DisplayName = "An asynchronous circuit breaker that is open must transition to half-open on reset timeout")] - public void Must_transition_to_half_open_on_reset_timeout() + public async Task Must_transition_to_half_open_on_reset_timeout() { var breaker = ShortResetTimeoutCb(); - _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); - Assert.True(CheckLatch(breaker.HalfOpenLatch)); + await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); + CheckLatch(breaker.HalfOpenLatch).Should().BeTrue(); } [Fact(DisplayName = "An asynchronous circuit breaker that is open must increase the reset timeout after it transits to open again")] public async Task Must_increase_reset_timeout_after_it_transits_to_open_again() { var breaker = NonOneFactorCb(); + await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); - Assert.True(CheckLatch(breaker.OpenLatch)); + CheckLatch(breaker.OpenLatch).Should().BeTrue(); - var e1 = await InterceptException( - () => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException))); + var e1 = await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); var shortRemainingDuration = e1.RemainingDuration; await Task.Delay(Dilated(TimeSpan.FromMilliseconds(1000))); - Assert.True(CheckLatch(breaker.HalfOpenLatch)); + CheckLatch(breaker.HalfOpenLatch).Should().BeTrue(); // transit to open again breaker.OpenLatch.Reset(); - _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); - Assert.True(CheckLatch(breaker.OpenLatch)); + await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); + CheckLatch(breaker.OpenLatch).Should().BeTrue(); - var e2 = await InterceptException(() => - breaker.Instance.WithCircuitBreaker(() => Task.FromResult(SayHi()))); + var e2 = await InterceptException(() => breaker.Instance.WithCircuitBreaker(SayHiAsync)); var longRemainingDuration = e2.RemainingDuration; shortRemainingDuration.ShouldBeLessThan(longRemainingDuration); @@ -357,15 +377,29 @@ public class CircuitBreakerSpecBase : AkkaSpec [DebuggerStepThrough] public static void ThrowException() => throw new TestException("Test Exception"); + [DebuggerStepThrough] + public static async Task ThrowExceptionAsync() + { + await Task.Yield(); + throw new TestException("Test Exception"); + } + public static string SayHi() => "hi"; - protected T InterceptException(Action actionThatThrows) where T : Exception => - Intercept(actionThatThrows); + public static async Task SayHiAsync() + { + await Task.Yield(); + return "hi"; + } + + protected static T InterceptException(Action actionThatThrows) where T : Exception => + actionThatThrows.Should().Throw().And; - protected static async Task InterceptException(Func actionThatThrows) + protected async Task InterceptException(Func actionThatThrows) where T : Exception { - return (await actionThatThrows.Should().ThrowExactlyAsync()).And; + return (await Awaiting(() => actionThatThrows().WaitAsync(AwaitTimeout)) + .Should().ThrowExactlyAsync()).And; } public TestBreaker ShortCallTimeoutCb() => diff --git a/src/core/Akka/Pattern/CircuitBreaker.cs b/src/core/Akka/Pattern/CircuitBreaker.cs index 7c72d0c0e08..9f93d83d3d6 100644 --- a/src/core/Akka/Pattern/CircuitBreaker.cs +++ b/src/core/Akka/Pattern/CircuitBreaker.cs @@ -252,7 +252,7 @@ public void WithSyncCircuitBreaker(Action body) => /// Call needing protected /// The result of the call public T WithSyncCircuitBreaker(Func body) => - WithCircuitBreaker(body, b => Task.Run(b)).Result; + WithCircuitBreaker(body, b => Task.Run(b)).GetAwaiter().GetResult(); /// /// Mark a successful call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the From aeef26a0a69f5c1b087341214774272244fa0574 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 22 Oct 2024 03:40:36 +0700 Subject: [PATCH 2/3] Minimum viable fix --- .../AsyncWriteProxyEx.cs | 15 ++- .../SnapshotStoreProxy.cs | 17 +-- .../Journal/BatchingSqlJournal.cs | 83 ++++++++----- .../Journal/SqlJournal.cs | 35 +++--- .../Snapshot/SqlSnapshotStore.cs | 20 +-- ...oreAPISpec.ApproveCore.DotNet.verified.txt | 8 ++ .../CoreAPISpec.ApproveCore.Net.verified.txt | 8 ++ ...pec.ApprovePersistence.DotNet.verified.txt | 52 ++++---- ...PISpec.ApprovePersistence.Net.verified.txt | 52 ++++---- ...vePersistenceSqlCommon.DotNet.verified.txt | 28 ++--- ...provePersistenceSqlCommon.Net.verified.txt | 28 ++--- .../Utilities/CircuitBreakerDocSpec.cs | 2 +- .../Journal/TestJournal.cs | 8 +- .../SnapshotStore/TestSnapshotStore.cs | 18 +-- .../Journal/ChaosJournal.cs | 7 +- .../Journal/SteppingMemoryJournal.cs | 13 +- .../PersistentActorDeleteFailureSpec.cs | 3 +- .../PersistentActorFailureSpec.cs | 5 +- .../SnapshotFailureRobustnessSpec.cs | 9 +- .../Akka.Persistence/Journal/AsyncRecovery.cs | 4 +- .../Journal/AsyncWriteJournal.cs | 19 +-- .../Journal/AsyncWriteProxy.cs | 16 ++- .../Akka.Persistence/Journal/MemoryJournal.cs | 12 +- .../Snapshot/LocalSnapshotStore.cs | 11 +- .../Snapshot/MemorySnapshotStore.cs | 9 +- .../Snapshot/NoSnapshotStore.cs | 13 +- .../Snapshot/SnapshotStore.cs | 23 ++-- .../Akka.Tests/Pattern/CircuitBreakerSpec.cs | 29 ++--- .../Pattern/CircuitBreakerStressSpec.cs | 5 +- src/core/Akka/Pattern/CircuitBreaker.cs | 29 ++++- src/core/Akka/Pattern/CircuitBreakerState.cs | 115 +++++++++++++++--- src/core/Akka/Util/Internal/AtomicState.cs | 86 +++++++++++-- .../Journal/SqliteJournal.cs | 19 +-- .../Snapshot/SqliteSnapshotStore.cs | 21 ++-- 34 files changed, 541 insertions(+), 281 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/AsyncWriteProxyEx.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/AsyncWriteProxyEx.cs index 040f9701137..b5949b2a542 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/AsyncWriteProxyEx.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/AsyncWriteProxyEx.cs @@ -162,18 +162,19 @@ protected internal override bool AroundReceive(Receive receive, object message) /// TBD /// /// TBD + /// The to stop async operation /// /// This exception is thrown when the store has not been initialized. /// /// TBD - protected override Task> WriteMessagesAsync(IEnumerable messages) + protected override Task> WriteMessagesAsync(IEnumerable messages, CancellationToken cancellationToken = default) { var trueMsgs = messages.ToArray(); if (_store == null) return StoreNotInitialized>(); - return _store.Ask(sender => new WriteMessages(trueMsgs, sender, 1), Timeout, CancellationToken.None) + return _store.Ask(sender => new WriteMessages(trueMsgs, sender, 1), Timeout, cancellationToken) .ContinueWith(r => { if (r.IsCanceled) @@ -195,18 +196,19 @@ protected override Task> WriteMessagesAsync(IEnumerabl /// /// TBD /// TBD + /// The to stop async operation /// /// This exception is thrown when the store has not been initialized. /// /// TBD - protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) + protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken = default) { if (_store == null) return StoreNotInitialized(); var result = new TaskCompletionSource(); - _store.Ask(sender => new DeleteMessagesTo(persistenceId, toSequenceNr, sender), Timeout, CancellationToken.None).ContinueWith(r => + _store.Ask(sender => new DeleteMessagesTo(persistenceId, toSequenceNr, sender), Timeout, cancellationToken).ContinueWith(r => { if (r.IsFaulted) result.TrySetException(r.Exception); @@ -250,18 +252,19 @@ public override Task ReplayMessagesAsync(IActorContext context, string persisten /// /// TBD /// TBD + /// The to stop async operation /// /// This exception is thrown when the store has not been initialized. /// /// TBD - public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) + public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken = default) { if (_store == null) return StoreNotInitialized(); var result = new TaskCompletionSource(); - _store.Ask(sender => new ReplayMessages(0, 0, 0, persistenceId, sender), Timeout, CancellationToken.None) + _store.Ask(sender => new ReplayMessages(0, 0, 0, persistenceId, sender), Timeout, cancellationToken) .ContinueWith(t => { if (t.IsFaulted) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/SnapshotStoreProxy.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/SnapshotStoreProxy.cs index 240cf614a77..b4fe857e91d 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/SnapshotStoreProxy.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/SnapshotStoreProxy.cs @@ -7,6 +7,7 @@ using System; using System.Runtime.ExceptionServices; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Event; @@ -89,14 +90,14 @@ protected internal override bool AroundReceive(Receive receive, object message) return true; } - protected async override Task DeleteAsync(SnapshotMetadata metadata) + protected override async Task DeleteAsync(SnapshotMetadata metadata, CancellationToken cancellationToken = default) { if (_store == null) throw new TimeoutException("Store not intialized."); var s = Sender; try { - var response = await _store.Ask(new DeleteSnapshot(metadata), Timeout); + var response = await _store.Ask(new DeleteSnapshot(metadata), Timeout, cancellationToken); if (response is DeleteSnapshotFailure f) { ExceptionDispatchInfo.Capture(f.Cause).Throw(); @@ -108,14 +109,14 @@ protected async override Task DeleteAsync(SnapshotMetadata metadata) } } - protected async override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default) { if (_store == null) throw new TimeoutException("Store not intialized."); var s = Sender; try { - var response = await _store.Ask(new DeleteSnapshots(persistenceId, criteria), Timeout); + var response = await _store.Ask(new DeleteSnapshots(persistenceId, criteria), Timeout, cancellationToken); if (response is DeleteSnapshotsFailure f) { ExceptionDispatchInfo.Capture(f.Cause).Throw(); @@ -127,14 +128,14 @@ protected async override Task DeleteAsync(string persistenceId, SnapshotSelectio } } - protected override async Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override async Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default) { if (_store == null) throw new TimeoutException("Store not intialized."); var s = Sender; try { - var response = await _store.Ask(new LoadSnapshot(persistenceId, criteria, criteria.MaxSequenceNr), Timeout); + var response = await _store.Ask(new LoadSnapshot(persistenceId, criteria, criteria.MaxSequenceNr), Timeout, cancellationToken); switch (response) { case LoadSnapshotResult ls: @@ -154,14 +155,14 @@ protected override async Task LoadAsync(string persistenceId, throw new TimeoutException(); } - protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot) + protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot, CancellationToken cancellationToken = default) { if (_store == null) throw new TimeoutException("Store not intialized."); var s = Sender; try { - var response = await _store.Ask(new SaveSnapshot(metadata, snapshot), Timeout); + var response = await _store.Ask(new SaveSnapshot(metadata, snapshot), Timeout, cancellationToken); if (response is SaveSnapshotFailure f) { ExceptionDispatchInfo.Capture(f.Cause).Throw(); diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs index 293ff2b92a2..f5d1f1a5635 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs @@ -16,6 +16,7 @@ using System.Runtime.ExceptionServices; using System.Runtime.Serialization; using System.Text; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; @@ -941,18 +942,20 @@ private void TryProcess() var (chunk, isWrite) = DequeueChunk(_remainingOperations); var context = Context; - _circuitBreaker.WithCircuitBreaker(() => ExecuteChunk(chunk, context, isWrite)) + _circuitBreaker.WithCircuitBreaker(ct => ExecuteChunk(chunk, context, isWrite, ct)) .PipeTo(Self, failure: ex => new ChunkExecutionFailure(ex, chunk.Requests, chunk.ChunkId)); } } - private async Task ExecuteChunk(RequestChunk chunk, IActorContext context, bool isWriteOperation) + private async Task ExecuteChunk(RequestChunk chunk, IActorContext context, bool isWriteOperation, CancellationToken cancellationToken) { + cancellationToken.ThrowIfCancellationRequested(); + var writeResults = new Queue(); var stopwatch = new Stopwatch(); using (var connection = CreateConnection(Setup.ConnectionString)) { - await connection.OpenAsync(); + await connection.OpenAsync(cancellationToken); // In the grand scheme of thing, using a transaction in an all read batch operation // should not hurt performance by much, because it is done only once at the start. @@ -971,22 +974,22 @@ private async Task ExecuteChunk(RequestChunk chunk, IActorContext switch (req) { case WriteMessages msg: - writeResults.Enqueue(await HandleWriteMessages(msg, command)); + writeResults.Enqueue(await HandleWriteMessages(msg, command, cancellationToken)); break; case DeleteMessagesTo msg: - await HandleDeleteMessagesTo(msg, command); + await HandleDeleteMessagesTo(msg, command, cancellationToken); break; case ReplayMessages msg: - await HandleReplayMessages(msg, command, context); + await HandleReplayMessages(msg, command, context, cancellationToken); break; case ReplayTaggedMessages msg: - await HandleReplayTaggedMessages(msg, command); + await HandleReplayTaggedMessages(msg, command, cancellationToken); break; case ReplayAllEvents msg: - await HandleReplayAllMessages(msg, command); + await HandleReplayAllMessages(msg, command, cancellationToken); break; case SelectCurrentPersistenceIds msg: - await HandleSelectCurrentPersistenceIds(msg, command); + await HandleSelectCurrentPersistenceIds(msg, command, cancellationToken); break; default: Unhandled(req); @@ -1043,8 +1046,10 @@ private async Task ExecuteChunk(RequestChunk chunk, IActorContext return new BatchComplete(chunk.ChunkId, chunk.Requests.Length, stopwatch.Elapsed); } - protected virtual async Task HandleDeleteMessagesTo(DeleteMessagesTo req, TCommand command) + protected virtual async Task HandleDeleteMessagesTo(DeleteMessagesTo req, TCommand command, CancellationToken cancellationToken = default) { + cancellationToken.ThrowIfCancellationRequested(); + var toSequenceNr = req.ToSequenceNr; var persistenceId = req.PersistenceId; @@ -1055,7 +1060,7 @@ protected virtual async Task HandleDeleteMessagesTo(DeleteMessagesTo req, TComma AddParameter(command, "@PersistenceId", DbType.String, persistenceId); AddParameter(command, "@ToSequenceNr", DbType.Int64, toSequenceNr); - await command.ExecuteNonQueryAsync(); + await command.ExecuteNonQueryAsync(cancellationToken); if (highestSequenceNr <= toSequenceNr) { @@ -1065,44 +1070,50 @@ protected virtual async Task HandleDeleteMessagesTo(DeleteMessagesTo req, TComma AddParameter(command, "@PersistenceId", DbType.String, persistenceId); AddParameter(command, "@SequenceNr", DbType.Int64, highestSequenceNr); - await command.ExecuteNonQueryAsync(); + await command.ExecuteNonQueryAsync(cancellationToken); } } - protected virtual async Task ReadHighestSequenceNr(string persistenceId, TCommand command) + protected virtual async Task ReadHighestSequenceNr(string persistenceId, TCommand command, CancellationToken cancellationToken = default) { + cancellationToken.ThrowIfCancellationRequested(); + command.CommandText = HighestSequenceNrSql; command.Parameters.Clear(); AddParameter(command, "@PersistenceId", DbType.String, persistenceId); - var result = await command.ExecuteScalarAsync(); + var result = await command.ExecuteScalarAsync(cancellationToken); var highestSequenceNr = result is long ? Convert.ToInt64(result) : 0L; return highestSequenceNr; } - protected virtual async Task ReadHighestSequenceNr(TCommand command) + protected virtual async Task ReadHighestSequenceNr(TCommand command, CancellationToken cancellationToken = default) { + cancellationToken.ThrowIfCancellationRequested(); + command.CommandText = HighestOrderingSql; command.Parameters.Clear(); - var result = await command.ExecuteScalarAsync(); + var result = await command.ExecuteScalarAsync(cancellationToken); var highestSequenceNr = result is long ? Convert.ToInt64(result) : 0L; return highestSequenceNr; } - protected virtual async Task HandleSelectCurrentPersistenceIds(SelectCurrentPersistenceIds message, TCommand command) + protected virtual async Task HandleSelectCurrentPersistenceIds(SelectCurrentPersistenceIds message, TCommand command, CancellationToken cancellationToken = default) { - long highestOrderingNumber = await ReadHighestSequenceNr(command); + cancellationToken.ThrowIfCancellationRequested(); + + long highestOrderingNumber = await ReadHighestSequenceNr(command, cancellationToken); var result = new List(256); command.CommandText = AllPersistenceIdsSql; command.Parameters.Clear(); AddParameter(command, "@Ordering", DbType.Int64, message.Offset); - using (var reader = await command.ExecuteReaderAsync()) + using (var reader = await command.ExecuteReaderAsync(cancellationToken)) { - while (await reader.ReadAsync()) + while (await reader.ReadAsync(cancellationToken)) { result.Add(reader.GetString(0)); } @@ -1111,8 +1122,10 @@ protected virtual async Task HandleSelectCurrentPersistenceIds(SelectCurrentPers message.ReplyTo.Tell(new CurrentPersistenceIds(result, highestOrderingNumber)); } - protected virtual async Task HandleReplayTaggedMessages(ReplayTaggedMessages req, TCommand command) + protected virtual async Task HandleReplayTaggedMessages(ReplayTaggedMessages req, TCommand command, CancellationToken cancellationToken = default) { + cancellationToken.ThrowIfCancellationRequested(); + var replyTo = req.ReplyTo; try @@ -1130,9 +1143,9 @@ protected virtual async Task HandleReplayTaggedMessages(ReplayTaggedMessages req AddParameter(command, "@Ordering", DbType.Int64, fromOffset); AddParameter(command, "@Take", DbType.Int64, take); - using (var reader = await command.ExecuteReaderAsync()) + using (var reader = await command.ExecuteReaderAsync(cancellationToken)) { - while (await reader.ReadAsync()) + while (await reader.ReadAsync(cancellationToken)) { var persistent = ReadEvent(reader); var ordering = reader.GetInt64(OrderingIndex); @@ -1153,8 +1166,10 @@ protected virtual async Task HandleReplayTaggedMessages(ReplayTaggedMessages req } } - protected virtual async Task HandleReplayAllMessages(ReplayAllEvents req, TCommand command) + protected virtual async Task HandleReplayAllMessages(ReplayAllEvents req, TCommand command, CancellationToken cancellationToken = default) { + cancellationToken.ThrowIfCancellationRequested(); + var replyTo = req.ReplyTo; try @@ -1168,7 +1183,7 @@ protected virtual async Task HandleReplayAllMessages(ReplayAllEvents req, TComma command.CommandText = HighestOrderingSql; command.Parameters.Clear(); - var maxOrdering = (await command.ExecuteScalarAsync()) as long? ?? 0L; + var maxOrdering = (await command.ExecuteScalarAsync(cancellationToken)) as long? ?? 0L; command.CommandText = AllEventsSql; command.Parameters.Clear(); @@ -1176,9 +1191,9 @@ protected virtual async Task HandleReplayAllMessages(ReplayAllEvents req, TComma AddParameter(command, "@Ordering", DbType.Int64, fromOffset); AddParameter(command, "@Take", DbType.Int64, take); - using (var reader = await command.ExecuteReaderAsync()) + using (var reader = await command.ExecuteReaderAsync(cancellationToken)) { - while (await reader.ReadAsync()) + while (await reader.ReadAsync(cancellationToken)) { var persistent = ReadEvent(reader); var ordering = reader.GetInt64(OrderingIndex); @@ -1198,8 +1213,10 @@ protected virtual async Task HandleReplayAllMessages(ReplayAllEvents req, TComma } } - protected virtual async Task HandleReplayMessages(ReplayMessages req, TCommand command, IActorContext context) + protected virtual async Task HandleReplayMessages(ReplayMessages req, TCommand command, IActorContext context, CancellationToken cancellationToken = default) { + cancellationToken.ThrowIfCancellationRequested(); + var replaySettings = Setup.ReplayFilterSettings; var replyTo = replaySettings.IsEnabled ? context.ActorOf(ReplayFilter.Props( @@ -1213,7 +1230,7 @@ protected virtual async Task HandleReplayMessages(ReplayMessages req, TCommand c try { - var highestSequenceNr = await ReadHighestSequenceNr(persistenceId, command); + var highestSequenceNr = await ReadHighestSequenceNr(persistenceId, command, cancellationToken); var toSequenceNr = Math.Min(req.ToSequenceNr, highestSequenceNr); command.CommandText = ByPersistenceIdSql; @@ -1223,10 +1240,10 @@ protected virtual async Task HandleReplayMessages(ReplayMessages req, TCommand c AddParameter(command, "@FromSequenceNr", DbType.Int64, req.FromSequenceNr); AddParameter(command, "@ToSequenceNr", DbType.Int64, toSequenceNr); - using (var reader = await command.ExecuteReaderAsync()) + using (var reader = await command.ExecuteReaderAsync(cancellationToken)) { var i = 0L; - while ((i++) < req.Max && await reader.ReadAsync()) + while ((i++) < req.Max && await reader.ReadAsync(cancellationToken)) { var persistent = ReadEvent(reader); @@ -1250,7 +1267,7 @@ protected virtual async Task HandleReplayMessages(ReplayMessages req, TCommand c } } - private async Task HandleWriteMessages(WriteMessages req, TCommand command) + private async Task HandleWriteMessages(WriteMessages req, TCommand command, CancellationToken cancellationToken) { var tags = new HashSet(); var persistenceIds = new HashSet(); @@ -1284,7 +1301,7 @@ private async Task HandleWriteMessages(WriteMessages req, T WriteEvent(command, persistent.WithTimestamp(TimestampProvider.GenerateTimestamp(persistent)), tagBuilder.ToString()); - await command.ExecuteNonQueryAsync(); + await command.ExecuteNonQueryAsync(cancellationToken); persistenceIds.Add(persistent.PersistenceId); } diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs index 967dda87731..8ec9c8619f8 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs @@ -99,15 +99,16 @@ protected override bool ReceivePluginInternal(object message) /// 'akka.persistence.journal.sql-server' scope with 'schema-name' and 'table-name' keys. /// /// TBD + /// The to stop async operation /// TBD /// TBD - protected override async Task> WriteMessagesAsync(IEnumerable messages) + protected override async Task> WriteMessagesAsync(IEnumerable messages, CancellationToken cancellationToken = default) { var writeTasks = messages.Select(async message => { using (var connection = CreateDbConnection()) { - await connection.OpenAsync(); + await connection.OpenAsync(cancellationToken); var eventToTags = new Dictionary>(); var persistentMessages = ((IImmutableList)message.Payload).ToArray(); @@ -126,15 +127,15 @@ protected override async Task> WriteMessagesAsync(IEnu } var batch = new WriteJournalBatch(eventToTags); - using(var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) - await QueryExecutor.InsertBatchAsync(connection, cancellationToken.Token, batch); + using(var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _pendingRequestsCancellation.Token)) + await QueryExecutor.InsertBatchAsync(connection, cts.Token, batch); } }).ToArray(); var result = await Task> .Factory .ContinueWhenAll(writeTasks, - tasks => tasks.Select(t => t.IsFaulted ? TryUnwrapException(t.Exception) : null).ToImmutableList()); + tasks => tasks.Select(t => t.IsFaulted ? TryUnwrapException(t.Exception) : null).ToImmutableList(), cancellationToken); return result; } @@ -148,9 +149,9 @@ protected virtual async Task ReplayTaggedMessagesAsync(ReplayTaggedMessage { using (var connection = CreateDbConnection()) { - await connection.OpenAsync(); using(var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) { + await connection.OpenAsync(cancellationToken.Token); return await QueryExecutor .SelectByTagAsync(connection, cancellationToken.Token, replay.Tag, replay.FromOffset, replay.ToOffset, replay.Max, replayedTagged => { foreach(var adapted in AdaptFromJournal(replayedTagged.Persistent)) @@ -166,9 +167,9 @@ protected virtual async Task ReplayAllEventsAsync(ReplayAllEvents replay) { using (var connection = CreateDbConnection()) { - await connection.OpenAsync(); using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) { + await connection.OpenAsync(cancellationToken.Token); return await QueryExecutor .SelectAllEventsAsync( connection, @@ -190,9 +191,9 @@ protected virtual async Task ReplayAllEventsAsync(ReplayAllEvents replay) { using (var connection = CreateDbConnection()) { - await connection.OpenAsync(); using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) { + await connection.OpenAsync(cancellationToken.Token); var lastOrdering = await QueryExecutor.SelectHighestSequenceNrAsync(connection, cancellationToken.Token); var ids = await QueryExecutor.SelectAllPersistenceIdsAsync(connection, cancellationToken.Token, offset); return (ids, lastOrdering); @@ -215,9 +216,9 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per { using (var connection = CreateDbConnection()) { - await connection.OpenAsync(); using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) { + await connection.OpenAsync(cancellationToken.Token); await QueryExecutor.SelectByPersistenceIdAsync(connection, cancellationToken.Token, persistenceId, fromSequenceNr, toSequenceNr, max, recoveryCallback); } } @@ -322,15 +323,16 @@ private bool IsTagId(string persistenceId) /// /// TBD /// TBD + /// The to stop async operation /// TBD - protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) + protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken = default) { using (var connection = CreateDbConnection()) { - await connection.OpenAsync(); - using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) + await connection.OpenAsync(cancellationToken); + using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _pendingRequestsCancellation.Token)) { - await QueryExecutor.DeleteBatchAsync(connection, cancellationToken.Token, persistenceId, toSequenceNr); + await QueryExecutor.DeleteBatchAsync(connection, cts.Token, persistenceId, toSequenceNr); } } } @@ -340,15 +342,16 @@ protected override async Task DeleteMessagesToAsync(string persistenceId, long t /// /// TBD /// TBD + /// The to stop async operation /// TBD - public override async Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) + public override async Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken = default) { using (var connection = CreateDbConnection()) { await connection.OpenAsync(); - using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) + using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _pendingRequestsCancellation.Token)) { - return await QueryExecutor.SelectHighestSequenceNrAsync(connection, cancellationToken.Token, persistenceId); + return await QueryExecutor.SelectHighestSequenceNrAsync(connection, cts.Token, persistenceId); } } } diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/SqlSnapshotStore.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/SqlSnapshotStore.cs index 8b3b86dcf95..f708a146a9a 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/SqlSnapshotStore.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/SqlSnapshotStore.cs @@ -167,11 +167,12 @@ protected virtual string GetConnectionString() /// /// TBD /// TBD + /// The to stop async operation /// TBD - protected override async Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override async Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default) { using (var connection = CreateDbConnection()) - using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) + using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _pendingRequestsCancellation.Token)) { await connection.OpenAsync(nestedCancellationTokenSource.Token); return await QueryExecutor.SelectSnapshotAsync(connection, nestedCancellationTokenSource.Token, persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp); @@ -183,11 +184,12 @@ protected override async Task LoadAsync(string persistenceId, /// /// TBD /// TBD + /// The to stop async operation /// TBD - protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot) + protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot, CancellationToken cancellationToken = default) { using (var connection = CreateDbConnection()) - using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) + using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _pendingRequestsCancellation.Token)) { await connection.OpenAsync(nestedCancellationTokenSource.Token); await QueryExecutor.InsertAsync(connection, nestedCancellationTokenSource.Token, snapshot, metadata); @@ -198,11 +200,12 @@ protected override async Task SaveAsync(SnapshotMetadata metadata, object snapsh /// TBD /// /// TBD + /// The to stop async operation /// TBD - protected override async Task DeleteAsync(SnapshotMetadata metadata) + protected override async Task DeleteAsync(SnapshotMetadata metadata, CancellationToken cancellationToken = default) { using (var connection = CreateDbConnection()) - using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) + using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _pendingRequestsCancellation.Token)) { await connection.OpenAsync(nestedCancellationTokenSource.Token); DateTime? timestamp = metadata.Timestamp != DateTime.MinValue ? metadata.Timestamp : default(DateTime?); @@ -215,11 +218,12 @@ protected override async Task DeleteAsync(SnapshotMetadata metadata) /// /// TBD /// TBD + /// The to stop async operation /// TBD - protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default) { using (var connection = CreateDbConnection()) - using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) + using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _pendingRequestsCancellation.Token)) { await connection.OpenAsync(nestedCancellationTokenSource.Token); await QueryExecutor.DeleteBatchAsync(connection, nestedCancellationTokenSource.Token, persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp); diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 60fa2406c64..1835fbc7373 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -4495,10 +4495,18 @@ namespace Akka.Pattern public Akka.Pattern.CircuitBreaker OnHalfOpen(System.Action callback) { } public Akka.Pattern.CircuitBreaker OnOpen(System.Action callback) { } public void Succeed() { } + [System.ObsoleteAttribute("Use WithCircuitBreaker that takes a cancellation token in the body instead", true)] public System.Threading.Tasks.Task WithCircuitBreaker(System.Func> body) { } + public System.Threading.Tasks.Task WithCircuitBreaker(System.Func> body) { } + [System.ObsoleteAttribute("Use WithCircuitBreaker that takes a cancellation token in the body instead", true)] public System.Threading.Tasks.Task WithCircuitBreaker(TState state, System.Func> body) { } + public System.Threading.Tasks.Task WithCircuitBreaker(TState state, System.Func> body) { } + [System.ObsoleteAttribute("Use WithCircuitBreaker that takes a cancellation token in the body instead", true)] public System.Threading.Tasks.Task WithCircuitBreaker(System.Func body) { } + public System.Threading.Tasks.Task WithCircuitBreaker(System.Func body) { } + [System.ObsoleteAttribute("Use WithCircuitBreaker that takes a cancellation token in the body instead", true)] public System.Threading.Tasks.Task WithCircuitBreaker(TState state, System.Func body) { } + public System.Threading.Tasks.Task WithCircuitBreaker(TState state, System.Func body) { } public Akka.Pattern.CircuitBreaker WithExponentialBackoff(System.TimeSpan maxResetTimeout) { } public Akka.Pattern.CircuitBreaker WithRandomFactor(double randomFactor) { } public void WithSyncCircuitBreaker(System.Action body) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index eccf3f67d9b..8739c7aeab9 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -4485,10 +4485,18 @@ namespace Akka.Pattern public Akka.Pattern.CircuitBreaker OnHalfOpen(System.Action callback) { } public Akka.Pattern.CircuitBreaker OnOpen(System.Action callback) { } public void Succeed() { } + [System.ObsoleteAttribute("Use WithCircuitBreaker that takes a cancellation token in the body instead", true)] public System.Threading.Tasks.Task WithCircuitBreaker(System.Func> body) { } + public System.Threading.Tasks.Task WithCircuitBreaker(System.Func> body) { } + [System.ObsoleteAttribute("Use WithCircuitBreaker that takes a cancellation token in the body instead", true)] public System.Threading.Tasks.Task WithCircuitBreaker(TState state, System.Func> body) { } + public System.Threading.Tasks.Task WithCircuitBreaker(TState state, System.Func> body) { } + [System.ObsoleteAttribute("Use WithCircuitBreaker that takes a cancellation token in the body instead", true)] public System.Threading.Tasks.Task WithCircuitBreaker(System.Func body) { } + public System.Threading.Tasks.Task WithCircuitBreaker(System.Func body) { } + [System.ObsoleteAttribute("Use WithCircuitBreaker that takes a cancellation token in the body instead", true)] public System.Threading.Tasks.Task WithCircuitBreaker(TState state, System.Func body) { } + public System.Threading.Tasks.Task WithCircuitBreaker(TState state, System.Func body) { } public Akka.Pattern.CircuitBreaker WithExponentialBackoff(System.TimeSpan maxResetTimeout) { } public Akka.Pattern.CircuitBreaker WithRandomFactor(double randomFactor) { } public void WithSyncCircuitBreaker(System.Action body) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt index a19ffd58af1..cf8b895b952 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt @@ -857,14 +857,14 @@ namespace Akka.Persistence.Journal { protected readonly bool CanPublish; protected AsyncWriteJournal() { } - protected abstract System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr); - public abstract System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr); + protected abstract System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken = null); + public abstract System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken = null); protected virtual bool Receive(object message) { } protected virtual bool ReceivePluginInternal(object message) { } protected bool ReceiveWriteJournal(object message) { } public abstract System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action recoveryCallback); protected static System.Exception TryUnwrapException(System.Exception e) { } - protected abstract System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages); + protected abstract System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null); } public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { @@ -873,10 +873,10 @@ namespace Akka.Persistence.Journal public abstract System.TimeSpan Timeout { get; } public override void AroundPreStart() { } protected override bool AroundReceive(Akka.Actor.Receive receive, object message) { } - protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) { } - public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { } + protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken = null) { } + public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken = null) { } public override System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action recoveryCallback) { } - protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages) { } + protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } public class InitTimeout { public static Akka.Persistence.Journal.AsyncWriteProxy.InitTimeout Instance { get; } @@ -955,7 +955,7 @@ namespace Akka.Persistence.Journal } public interface IAsyncRecovery { - System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr); + System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken = null); System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action recoveryCallback); } public interface IEmptyEventSequence : Akka.Persistence.Journal.IEventSequence { } @@ -994,14 +994,14 @@ namespace Akka.Persistence.Journal protected virtual System.Collections.Concurrent.ConcurrentDictionary> Messages { get; } public System.Collections.Generic.IDictionary> Add(Akka.Persistence.IPersistentRepresentation persistent) { } public System.Collections.Generic.IDictionary> Delete(string pid, long seqNr) { } - protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) { } + protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken = null) { } public long HighestSequenceNr(string pid) { } public System.Collections.Generic.IEnumerable Read(string pid, long fromSeqNr, long toSeqNr, long max) { } - public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { } + public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken = null) { } protected override bool ReceivePluginInternal(object message) { } public override System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action recoveryCallback) { } public System.Collections.Generic.IDictionary> Update(string pid, long seqNr, System.Func updater) { } - protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages) { } + protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } public sealed class CurrentPersistenceIds : Akka.Event.IDeadLetterSuppression { public readonly System.Collections.Generic.IEnumerable AllPersistenceIds; @@ -1170,14 +1170,14 @@ namespace Akka.Persistence.Snapshot public class LocalSnapshotStore : Akka.Persistence.Snapshot.SnapshotStore { public LocalSnapshotStore() { } - protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata) { } - protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } + protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } protected System.IO.FileInfo GetSnapshotFileForWrite(Akka.Persistence.SnapshotMetadata metadata, string extension = "") { } - protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } + protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } protected override void PreStart() { } protected override bool ReceivePluginInternal(object message) { } protected virtual void Save(Akka.Persistence.SnapshotMetadata metadata, object snapshot) { } - protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot) { } + protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot, System.Threading.CancellationToken cancellationToken = null) { } protected void Serialize(System.IO.Stream stream, Akka.Persistence.Serialization.Snapshot snapshot) { } protected System.IO.FileInfo WithOutputStream(Akka.Persistence.SnapshotMetadata metadata, System.Action p) { } } @@ -1185,18 +1185,18 @@ namespace Akka.Persistence.Snapshot { public MemorySnapshotStore() { } protected virtual System.Collections.Generic.List Snapshots { get; } - protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata) { } - protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } - protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } - protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot) { } + protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot, System.Threading.CancellationToken cancellationToken = null) { } } public sealed class NoSnapshotStore : Akka.Persistence.Snapshot.SnapshotStore { public NoSnapshotStore() { } - protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata) { } - protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } - protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } - protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot) { } + protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot, System.Threading.CancellationToken cancellationToken = null) { } public class NoSnapshotStoreException : System.Exception { public NoSnapshotStoreException() { } @@ -1217,11 +1217,11 @@ namespace Akka.Persistence.Snapshot public abstract class SnapshotStore : Akka.Actor.ActorBase { protected SnapshotStore() { } - protected abstract System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata); - protected abstract System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria); - protected abstract System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria); + protected abstract System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken = null); + protected abstract System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null); + protected abstract System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null); protected virtual bool Receive(object message) { } protected virtual bool ReceivePluginInternal(object message) { } - protected abstract System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot); + protected abstract System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot, System.Threading.CancellationToken cancellationToken = null); } } \ No newline at end of file diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt index f48cd96cfa4..5f6362c2ed0 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt @@ -857,14 +857,14 @@ namespace Akka.Persistence.Journal { protected readonly bool CanPublish; protected AsyncWriteJournal() { } - protected abstract System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr); - public abstract System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr); + protected abstract System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken = null); + public abstract System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken = null); protected virtual bool Receive(object message) { } protected virtual bool ReceivePluginInternal(object message) { } protected bool ReceiveWriteJournal(object message) { } public abstract System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action recoveryCallback); protected static System.Exception TryUnwrapException(System.Exception e) { } - protected abstract System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages); + protected abstract System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null); } public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue { @@ -873,10 +873,10 @@ namespace Akka.Persistence.Journal public abstract System.TimeSpan Timeout { get; } public override void AroundPreStart() { } protected override bool AroundReceive(Akka.Actor.Receive receive, object message) { } - protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) { } - public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { } + protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken = null) { } + public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken = null) { } public override System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action recoveryCallback) { } - protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages) { } + protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } public class InitTimeout { public static Akka.Persistence.Journal.AsyncWriteProxy.InitTimeout Instance { get; } @@ -955,7 +955,7 @@ namespace Akka.Persistence.Journal } public interface IAsyncRecovery { - System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr); + System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken = null); System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action recoveryCallback); } public interface IEmptyEventSequence : Akka.Persistence.Journal.IEventSequence { } @@ -994,14 +994,14 @@ namespace Akka.Persistence.Journal protected virtual System.Collections.Concurrent.ConcurrentDictionary> Messages { get; } public System.Collections.Generic.IDictionary> Add(Akka.Persistence.IPersistentRepresentation persistent) { } public System.Collections.Generic.IDictionary> Delete(string pid, long seqNr) { } - protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) { } + protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken = null) { } public long HighestSequenceNr(string pid) { } public System.Collections.Generic.IEnumerable Read(string pid, long fromSeqNr, long toSeqNr, long max) { } - public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { } + public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken = null) { } protected override bool ReceivePluginInternal(object message) { } public override System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action recoveryCallback) { } public System.Collections.Generic.IDictionary> Update(string pid, long seqNr, System.Func updater) { } - protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages) { } + protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } public sealed class CurrentPersistenceIds : Akka.Event.IDeadLetterSuppression { public readonly System.Collections.Generic.IEnumerable AllPersistenceIds; @@ -1168,14 +1168,14 @@ namespace Akka.Persistence.Snapshot public class LocalSnapshotStore : Akka.Persistence.Snapshot.SnapshotStore { public LocalSnapshotStore() { } - protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata) { } - protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } + protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } protected System.IO.FileInfo GetSnapshotFileForWrite(Akka.Persistence.SnapshotMetadata metadata, string extension = "") { } - protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } + protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } protected override void PreStart() { } protected override bool ReceivePluginInternal(object message) { } protected virtual void Save(Akka.Persistence.SnapshotMetadata metadata, object snapshot) { } - protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot) { } + protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot, System.Threading.CancellationToken cancellationToken = null) { } protected void Serialize(System.IO.Stream stream, Akka.Persistence.Serialization.Snapshot snapshot) { } protected System.IO.FileInfo WithOutputStream(Akka.Persistence.SnapshotMetadata metadata, System.Action p) { } } @@ -1183,18 +1183,18 @@ namespace Akka.Persistence.Snapshot { public MemorySnapshotStore() { } protected virtual System.Collections.Generic.List Snapshots { get; } - protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata) { } - protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } - protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } - protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot) { } + protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot, System.Threading.CancellationToken cancellationToken = null) { } } public sealed class NoSnapshotStore : Akka.Persistence.Snapshot.SnapshotStore { public NoSnapshotStore() { } - protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata) { } - protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } - protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } - protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot) { } + protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot, System.Threading.CancellationToken cancellationToken = null) { } public class NoSnapshotStoreException : System.Exception { public NoSnapshotStoreException() { } @@ -1215,11 +1215,11 @@ namespace Akka.Persistence.Snapshot public abstract class SnapshotStore : Akka.Actor.ActorBase { protected SnapshotStore() { } - protected abstract System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata); - protected abstract System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria); - protected abstract System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria); + protected abstract System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken = null); + protected abstract System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null); + protected abstract System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null); protected virtual bool Receive(object message) { } protected virtual bool ReceivePluginInternal(object message) { } - protected abstract System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot); + protected abstract System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot, System.Threading.CancellationToken cancellationToken = null); } } \ No newline at end of file diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.DotNet.verified.txt index 93780b8f9d2..da0810094a0 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.DotNet.verified.txt @@ -122,17 +122,17 @@ namespace Akka.Persistence.Sql.Common.Journal protected void AddParameter(TCommand command, string paramName, System.Data.DbType dbType, object value) { } protected void BatchRequest(Akka.Persistence.IJournalRequest message) { } protected abstract TConnection CreateConnection(string connectionString); - protected virtual System.Threading.Tasks.Task HandleDeleteMessagesTo(Akka.Persistence.DeleteMessagesTo req, TCommand command) { } - protected virtual System.Threading.Tasks.Task HandleReplayAllMessages(Akka.Persistence.Sql.Common.Journal.ReplayAllEvents req, TCommand command) { } - protected virtual System.Threading.Tasks.Task HandleReplayMessages(Akka.Persistence.ReplayMessages req, TCommand command, Akka.Actor.IActorContext context) { } - protected virtual System.Threading.Tasks.Task HandleReplayTaggedMessages(Akka.Persistence.Sql.Common.Journal.ReplayTaggedMessages req, TCommand command) { } - protected virtual System.Threading.Tasks.Task HandleSelectCurrentPersistenceIds(Akka.Persistence.Sql.Common.Journal.SelectCurrentPersistenceIds message, TCommand command) { } + protected virtual System.Threading.Tasks.Task HandleDeleteMessagesTo(Akka.Persistence.DeleteMessagesTo req, TCommand command, System.Threading.CancellationToken cancellationToken = null) { } + protected virtual System.Threading.Tasks.Task HandleReplayAllMessages(Akka.Persistence.Sql.Common.Journal.ReplayAllEvents req, TCommand command, System.Threading.CancellationToken cancellationToken = null) { } + protected virtual System.Threading.Tasks.Task HandleReplayMessages(Akka.Persistence.ReplayMessages req, TCommand command, Akka.Actor.IActorContext context, System.Threading.CancellationToken cancellationToken = null) { } + protected virtual System.Threading.Tasks.Task HandleReplayTaggedMessages(Akka.Persistence.Sql.Common.Journal.ReplayTaggedMessages req, TCommand command, System.Threading.CancellationToken cancellationToken = null) { } + protected virtual System.Threading.Tasks.Task HandleSelectCurrentPersistenceIds(Akka.Persistence.Sql.Common.Journal.SelectCurrentPersistenceIds message, TCommand command, System.Threading.CancellationToken cancellationToken = null) { } protected virtual void OnBufferOverflow(Akka.Persistence.IJournalMessage request) { } protected virtual void PreAddParameterToCommand(TCommand command, System.Data.Common.DbParameter param) { } protected override void PreStart() { } protected virtual Akka.Persistence.IPersistentRepresentation ReadEvent(System.Data.Common.DbDataReader reader) { } - protected virtual System.Threading.Tasks.Task ReadHighestSequenceNr(string persistenceId, TCommand command) { } - protected virtual System.Threading.Tasks.Task ReadHighestSequenceNr(TCommand command) { } + protected virtual System.Threading.Tasks.Task ReadHighestSequenceNr(string persistenceId, TCommand command, System.Threading.CancellationToken cancellationToken = null) { } + protected virtual System.Threading.Tasks.Task ReadHighestSequenceNr(TCommand command, System.Threading.CancellationToken cancellationToken = null) { } protected virtual bool Receive(object message) { } protected virtual void WriteEvent(TCommand command, Akka.Persistence.IPersistentRepresentation persistent, string tags = "") { } } @@ -326,12 +326,12 @@ namespace Akka.Persistence.Sql.Common.Journal public Akka.Actor.IStash Stash { get; set; } protected abstract System.Data.Common.DbConnection CreateDbConnection(string connectionString); public System.Data.Common.DbConnection CreateDbConnection() { } - protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) { } + protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken = null) { } protected virtual string GetConnectionString() { } protected Akka.Persistence.Sql.Common.Journal.ITimestampProvider GetTimestampProvider(string typeName) { } protected override void PostStop() { } protected override void PreStart() { } - public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { } + public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken = null) { } protected override bool ReceivePluginInternal(object message) { } protected virtual System.Threading.Tasks.Task ReplayAllEventsAsync(Akka.Persistence.Sql.Common.Journal.ReplayAllEvents replay) { } public override System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action recoveryCallback) { } @@ -341,7 +341,7 @@ namespace Akka.Persistence.Sql.Common.Journal "LastOrdering"})] protected virtual System.Threading.Tasks.Task, long>> SelectAllPersistenceIdsAsync(long offset) { } protected bool WaitingForInitialization(object message) { } - protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages) { } + protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } } [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class SubscribeNewEvents : Akka.Persistence.Sql.Common.Journal.ISubscriptionCommand @@ -490,12 +490,12 @@ namespace Akka.Persistence.Sql.Common.Snapshot public Akka.Actor.IStash Stash { get; set; } protected abstract System.Data.Common.DbConnection CreateDbConnection(string connectionString); public System.Data.Common.DbConnection CreateDbConnection() { } - protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata) { } - protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } + protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } protected virtual string GetConnectionString() { } - protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } + protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } protected override void PostStop() { } protected override void PreStart() { } - protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot) { } + protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot, System.Threading.CancellationToken cancellationToken = null) { } } } \ No newline at end of file diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Net.verified.txt index 6e8a604e37c..44bc0b1a180 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceSqlCommon.Net.verified.txt @@ -122,17 +122,17 @@ namespace Akka.Persistence.Sql.Common.Journal protected void AddParameter(TCommand command, string paramName, System.Data.DbType dbType, object value) { } protected void BatchRequest(Akka.Persistence.IJournalRequest message) { } protected abstract TConnection CreateConnection(string connectionString); - protected virtual System.Threading.Tasks.Task HandleDeleteMessagesTo(Akka.Persistence.DeleteMessagesTo req, TCommand command) { } - protected virtual System.Threading.Tasks.Task HandleReplayAllMessages(Akka.Persistence.Sql.Common.Journal.ReplayAllEvents req, TCommand command) { } - protected virtual System.Threading.Tasks.Task HandleReplayMessages(Akka.Persistence.ReplayMessages req, TCommand command, Akka.Actor.IActorContext context) { } - protected virtual System.Threading.Tasks.Task HandleReplayTaggedMessages(Akka.Persistence.Sql.Common.Journal.ReplayTaggedMessages req, TCommand command) { } - protected virtual System.Threading.Tasks.Task HandleSelectCurrentPersistenceIds(Akka.Persistence.Sql.Common.Journal.SelectCurrentPersistenceIds message, TCommand command) { } + protected virtual System.Threading.Tasks.Task HandleDeleteMessagesTo(Akka.Persistence.DeleteMessagesTo req, TCommand command, System.Threading.CancellationToken cancellationToken = null) { } + protected virtual System.Threading.Tasks.Task HandleReplayAllMessages(Akka.Persistence.Sql.Common.Journal.ReplayAllEvents req, TCommand command, System.Threading.CancellationToken cancellationToken = null) { } + protected virtual System.Threading.Tasks.Task HandleReplayMessages(Akka.Persistence.ReplayMessages req, TCommand command, Akka.Actor.IActorContext context, System.Threading.CancellationToken cancellationToken = null) { } + protected virtual System.Threading.Tasks.Task HandleReplayTaggedMessages(Akka.Persistence.Sql.Common.Journal.ReplayTaggedMessages req, TCommand command, System.Threading.CancellationToken cancellationToken = null) { } + protected virtual System.Threading.Tasks.Task HandleSelectCurrentPersistenceIds(Akka.Persistence.Sql.Common.Journal.SelectCurrentPersistenceIds message, TCommand command, System.Threading.CancellationToken cancellationToken = null) { } protected virtual void OnBufferOverflow(Akka.Persistence.IJournalMessage request) { } protected virtual void PreAddParameterToCommand(TCommand command, System.Data.Common.DbParameter param) { } protected override void PreStart() { } protected virtual Akka.Persistence.IPersistentRepresentation ReadEvent(System.Data.Common.DbDataReader reader) { } - protected virtual System.Threading.Tasks.Task ReadHighestSequenceNr(string persistenceId, TCommand command) { } - protected virtual System.Threading.Tasks.Task ReadHighestSequenceNr(TCommand command) { } + protected virtual System.Threading.Tasks.Task ReadHighestSequenceNr(string persistenceId, TCommand command, System.Threading.CancellationToken cancellationToken = null) { } + protected virtual System.Threading.Tasks.Task ReadHighestSequenceNr(TCommand command, System.Threading.CancellationToken cancellationToken = null) { } protected virtual bool Receive(object message) { } protected virtual void WriteEvent(TCommand command, Akka.Persistence.IPersistentRepresentation persistent, string tags = "") { } } @@ -326,12 +326,12 @@ namespace Akka.Persistence.Sql.Common.Journal public Akka.Actor.IStash Stash { get; set; } protected abstract System.Data.Common.DbConnection CreateDbConnection(string connectionString); public System.Data.Common.DbConnection CreateDbConnection() { } - protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) { } + protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken = null) { } protected virtual string GetConnectionString() { } protected Akka.Persistence.Sql.Common.Journal.ITimestampProvider GetTimestampProvider(string typeName) { } protected override void PostStop() { } protected override void PreStart() { } - public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { } + public override System.Threading.Tasks.Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken = null) { } protected override bool ReceivePluginInternal(object message) { } protected virtual System.Threading.Tasks.Task ReplayAllEventsAsync(Akka.Persistence.Sql.Common.Journal.ReplayAllEvents replay) { } public override System.Threading.Tasks.Task ReplayMessagesAsync(Akka.Actor.IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, System.Action recoveryCallback) { } @@ -341,7 +341,7 @@ namespace Akka.Persistence.Sql.Common.Journal "LastOrdering"})] protected virtual System.Threading.Tasks.Task, long>> SelectAllPersistenceIdsAsync(long offset) { } protected bool WaitingForInitialization(object message) { } - protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages) { } + protected override System.Threading.Tasks.Task> WriteMessagesAsync(System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken = null) { } } [System.ObsoleteAttribute("Query is not implemented.", true)] public sealed class SubscribeNewEvents : Akka.Persistence.Sql.Common.Journal.ISubscriptionCommand @@ -490,12 +490,12 @@ namespace Akka.Persistence.Sql.Common.Snapshot public Akka.Actor.IStash Stash { get; set; } protected abstract System.Data.Common.DbConnection CreateDbConnection(string connectionString); public System.Data.Common.DbConnection CreateDbConnection() { } - protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata) { } - protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } + protected override System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken = null) { } + protected override System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } protected virtual string GetConnectionString() { } - protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria) { } + protected override System.Threading.Tasks.Task LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken = null) { } protected override void PostStop() { } protected override void PreStart() { } - protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot) { } + protected override System.Threading.Tasks.Task SaveAsync(Akka.Persistence.SnapshotMetadata metadata, object snapshot, System.Threading.CancellationToken cancellationToken = null) { } } } \ No newline at end of file diff --git a/src/core/Akka.Docs.Tests/Utilities/CircuitBreakerDocSpec.cs b/src/core/Akka.Docs.Tests/Utilities/CircuitBreakerDocSpec.cs index 70338bb703b..ea0f1bda2fb 100644 --- a/src/core/Akka.Docs.Tests/Utilities/CircuitBreakerDocSpec.cs +++ b/src/core/Akka.Docs.Tests/Utilities/CircuitBreakerDocSpec.cs @@ -52,7 +52,7 @@ public DangerousActorCallProtection() Receive(str => str.Equals("is my middle name"), _ => { var sender = this.Sender; - breaker.WithCircuitBreaker(() => Task.FromResult(dangerousCall)).PipeTo(sender); + breaker.WithCircuitBreaker(cancellationToken => Task.FromResult(dangerousCall)).PipeTo(sender); }); Receive(str => str.Equals("block for me"), _ => diff --git a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs index c5e77efca6d..19d7dc727c8 100644 --- a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs +++ b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs @@ -5,6 +5,8 @@ // //----------------------------------------------------------------------- +using System.Threading; + namespace Akka.Persistence.TestKit { using Akka.Actor; @@ -48,7 +50,7 @@ protected override bool ReceivePluginInternal(object message) } } - protected override async Task> WriteMessagesAsync(IEnumerable messages) + protected override async Task> WriteMessagesAsync(IEnumerable messages, CancellationToken cancellationToken = default) { await _connectionInterceptor.InterceptAsync(); var exceptions = new List(); @@ -103,10 +105,10 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per } } - public override async Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) + public override async Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken = default) { await _connectionInterceptor.InterceptAsync(); - return await base.ReadHighestSequenceNrAsync(persistenceId, fromSequenceNr); + return await base.ReadHighestSequenceNrAsync(persistenceId, fromSequenceNr, cancellationToken); } /// diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs index de13fc6de64..66a57b4a94f 100644 --- a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs +++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs @@ -5,6 +5,8 @@ // //----------------------------------------------------------------------- +using System.Threading; + namespace Akka.Persistence.TestKit { using System.Threading.Tasks; @@ -50,32 +52,32 @@ protected override bool ReceivePluginInternal(object message) } } - protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot) + protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot, CancellationToken cancellationToken = default) { await _connectionInterceptor.InterceptAsync(); await _saveInterceptor.InterceptAsync(metadata.PersistenceId, ToSelectionCriteria(metadata)); - await base.SaveAsync(metadata, snapshot); + await base.SaveAsync(metadata, snapshot, cancellationToken); } - protected override async Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override async Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default) { await _connectionInterceptor.InterceptAsync(); await _loadInterceptor.InterceptAsync(persistenceId, criteria); - return await base.LoadAsync(persistenceId, criteria); + return await base.LoadAsync(persistenceId, criteria, cancellationToken); } - protected override async Task DeleteAsync(SnapshotMetadata metadata) + protected override async Task DeleteAsync(SnapshotMetadata metadata, CancellationToken cancellationToken = default) { await _connectionInterceptor.InterceptAsync(); await _deleteInterceptor.InterceptAsync(metadata.PersistenceId, ToSelectionCriteria(metadata)); - await base.DeleteAsync(metadata); + await base.DeleteAsync(metadata, cancellationToken); } - protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default) { await _connectionInterceptor.InterceptAsync(); await _deleteInterceptor.InterceptAsync(persistenceId, criteria); - await base.DeleteAsync(persistenceId, criteria); + await base.DeleteAsync(persistenceId, criteria, cancellationToken); } static SnapshotSelectionCriteria ToSelectionCriteria(SnapshotMetadata metadata) diff --git a/src/core/Akka.Persistence.Tests/Journal/ChaosJournal.cs b/src/core/Akka.Persistence.Tests/Journal/ChaosJournal.cs index 31275a34678..f1fbdb99315 100644 --- a/src/core/Akka.Persistence.Tests/Journal/ChaosJournal.cs +++ b/src/core/Akka.Persistence.Tests/Journal/ChaosJournal.cs @@ -10,6 +10,7 @@ using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Persistence.Journal; @@ -83,7 +84,7 @@ public override Task ReplayMessagesAsync(IActorContext context, string persisten return promise.Task; } - public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) + public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken = default) { var promise = new TaskCompletionSource(); if (ChaosSupportExtensions.ShouldFail(_readHighestFailureRate)) @@ -93,7 +94,7 @@ public override Task ReadHighestSequenceNrAsync(string persistenceId, long return promise.Task; } - protected override Task> WriteMessagesAsync(IEnumerable messages) + protected override Task> WriteMessagesAsync(IEnumerable messages, CancellationToken cancellationToken = default) { var promise = new TaskCompletionSource>(); @@ -120,7 +121,7 @@ protected override Task> WriteMessagesAsync(IEnumerabl return promise.Task; } - protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) + protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken = default) { TaskCompletionSource promise = new TaskCompletionSource(); try diff --git a/src/core/Akka.Persistence.Tests/Journal/SteppingMemoryJournal.cs b/src/core/Akka.Persistence.Tests/Journal/SteppingMemoryJournal.cs index 154afae6d2d..440bb60577c 100644 --- a/src/core/Akka.Persistence.Tests/Journal/SteppingMemoryJournal.cs +++ b/src/core/Akka.Persistence.Tests/Journal/SteppingMemoryJournal.cs @@ -10,6 +10,7 @@ using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; @@ -110,14 +111,14 @@ protected override void PostStop() _current.TryRemove(_instanceId, out foo); } - protected override Task> WriteMessagesAsync(IEnumerable messages) + protected override Task> WriteMessagesAsync(IEnumerable messages, CancellationToken cancellationToken = default) { var tasks = messages.Select(message => { return WrapAndDoOrEnqueue( () => - base.WriteMessagesAsync(new[] {message}) + base.WriteMessagesAsync(new[] {message}, cancellationToken) .ContinueWith(t => t.Result != null ? t.Result.FirstOrDefault() : null, _continuationOptions | TaskContinuationOptions.OnlyOnRanToCompletion)); }); @@ -127,19 +128,19 @@ protected override Task> WriteMessagesAsync(IEnumerabl _continuationOptions | TaskContinuationOptions.OnlyOnRanToCompletion); } - protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) + protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken = default) { return WrapAndDoOrEnqueue( () => - base.DeleteMessagesToAsync(persistenceId, toSequenceNr) + base.DeleteMessagesToAsync(persistenceId, toSequenceNr, cancellationToken) .ContinueWith(_ => new object(), _continuationOptions | TaskContinuationOptions.OnlyOnRanToCompletion)); } - public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) + public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken = default) { - return WrapAndDoOrEnqueue(() => base.ReadHighestSequenceNrAsync(persistenceId, fromSequenceNr)); + return WrapAndDoOrEnqueue(() => base.ReadHighestSequenceNrAsync(persistenceId, fromSequenceNr, cancellationToken)); } public override Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, diff --git a/src/core/Akka.Persistence.Tests/PersistentActorDeleteFailureSpec.cs b/src/core/Akka.Persistence.Tests/PersistentActorDeleteFailureSpec.cs index 3e3f1667477..29dc00d66ea 100644 --- a/src/core/Akka.Persistence.Tests/PersistentActorDeleteFailureSpec.cs +++ b/src/core/Akka.Persistence.Tests/PersistentActorDeleteFailureSpec.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Event; @@ -41,7 +42,7 @@ public SimulatedException(string message) : base(message) public class DeleteFailingMemoryJournal : MemoryJournal { - protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) + protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken = default) { var promise = new TaskCompletionSource(); promise.SetException(new SimulatedException("Boom! Unable to delete events!")); diff --git a/src/core/Akka.Persistence.Tests/PersistentActorFailureSpec.cs b/src/core/Akka.Persistence.Tests/PersistentActorFailureSpec.cs index cc0fdc8023b..1ca1a4d049c 100644 --- a/src/core/Akka.Persistence.Tests/PersistentActorFailureSpec.cs +++ b/src/core/Akka.Persistence.Tests/PersistentActorFailureSpec.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Persistence.Journal; @@ -49,7 +50,7 @@ public SimulatedSerializationException(string message) : base(message) internal class FailingMemoryJournal : MemoryJournal { - protected override Task> WriteMessagesAsync(IEnumerable messages) + protected override Task> WriteMessagesAsync(IEnumerable messages, CancellationToken cancellationToken = default) { var msgs = messages.ToList(); if (IsWrong(msgs)) @@ -59,7 +60,7 @@ protected override Task> WriteMessagesAsync(IEnumerabl { return Task.FromResult(checkSerializable); } - return base.WriteMessagesAsync(msgs); + return base.WriteMessagesAsync(msgs, cancellationToken); } public override Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, diff --git a/src/core/Akka.Persistence.Tests/SnapshotFailureRobustnessSpec.cs b/src/core/Akka.Persistence.Tests/SnapshotFailureRobustnessSpec.cs index 1c431edb5e3..07cbc3f8054 100644 --- a/src/core/Akka.Persistence.Tests/SnapshotFailureRobustnessSpec.cs +++ b/src/core/Akka.Persistence.Tests/SnapshotFailureRobustnessSpec.cs @@ -9,6 +9,7 @@ using System.IO; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Event; @@ -168,18 +169,18 @@ protected override void Save(SnapshotMetadata metadata, object payload) internal class DeleteFailingLocalSnapshotStore : LocalSnapshotStore { - protected override Task DeleteAsync(SnapshotMetadata metadata) + protected override Task DeleteAsync(SnapshotMetadata metadata, CancellationToken cancellationToken = default) { - base.DeleteAsync(metadata); // we actually delete it properly, but act as if it failed + base.DeleteAsync(metadata, cancellationToken); // we actually delete it properly, but act as if it failed var promise = new TaskCompletionSource(); promise.SetException(new InvalidOperationException("Failed to delete snapshot for some reason.")); return promise.Task; } - protected override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default) { #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - base.DeleteAsync(persistenceId, criteria); // we actually delete it properly, but act as if it failed + base.DeleteAsync(persistenceId, criteria, cancellationToken); // we actually delete it properly, but act as if it failed #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed var promise = new TaskCompletionSource(); promise.SetException(new InvalidOperationException("Failed to delete snapshot for some reason.")); diff --git a/src/core/Akka.Persistence/Journal/AsyncRecovery.cs b/src/core/Akka.Persistence/Journal/AsyncRecovery.cs index 4b94f97f3b6..51ce8c01e1d 100644 --- a/src/core/Akka.Persistence/Journal/AsyncRecovery.cs +++ b/src/core/Akka.Persistence/Journal/AsyncRecovery.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; @@ -62,7 +63,8 @@ public interface IAsyncRecovery /// Hint where to start searching for the highest sequence number. /// When a persistent actor is recovering this will the sequence /// number of the used snapshot, or `0L` if no snapshot is used. + /// The to stop async operation /// TBD - Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr); + Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken = default); } } diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs index 6f334729059..feb892c4f89 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Pattern; @@ -88,7 +89,7 @@ protected AsyncWriteJournal() public abstract Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action recoveryCallback); /// - public abstract Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr); + public abstract Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken = default); /// /// Plugin API: asynchronously writes a batch of persistent messages to the @@ -161,8 +162,9 @@ protected AsyncWriteJournal() /// This call is protected with a circuit-breaker. /// /// TBD + /// The to stop async operation /// TBD - protected abstract Task> WriteMessagesAsync(IEnumerable messages); + protected abstract Task> WriteMessagesAsync(IEnumerable messages, CancellationToken cancellationToken = default); /// /// Asynchronously deletes all persistent messages up to inclusive @@ -170,8 +172,9 @@ protected AsyncWriteJournal() /// /// TBD /// TBD + /// The to stop async operation /// TBD - protected abstract Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr); + protected abstract Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken = default); /// /// Plugin API: Allows plugin implementers to use f.PipeTo(Self) @@ -222,8 +225,8 @@ async Task ProcessDelete() { try { - await _breaker.WithCircuitBreaker((message, awj: this), state => - state.awj.DeleteMessagesToAsync(state.message.PersistenceId, state.message.ToSequenceNr)); + await _breaker.WithCircuitBreaker((message, awj: this), (state, ct) => + state.awj.DeleteMessagesToAsync(state.message.PersistenceId, state.message.ToSequenceNr, ct)); message.PersistentActor.Tell(new DeleteMessagesSuccess(message.ToSequenceNr), self); @@ -271,8 +274,8 @@ void CompleteHighSeqNo(long highSeqNo) try { - var highSequenceNr = await _breaker.WithCircuitBreaker((message, readHighestSequenceNrFrom, awj: this), state => - state.awj.ReadHighestSequenceNrAsync(state.message.PersistenceId, state.readHighestSequenceNrFrom)); + var highSequenceNr = await _breaker.WithCircuitBreaker((message, readHighestSequenceNrFrom, awj: this), (state, ct) => + state.awj.ReadHighestSequenceNrAsync(state.message.PersistenceId, state.readHighestSequenceNrFrom, ct)); var toSequenceNr = Math.Min(message.ToSequenceNr, highSequenceNr); if (toSequenceNr <= 0L || message.FromSequenceNr > toSequenceNr) { @@ -357,7 +360,7 @@ private async Task ExecuteBatch(WriteMessages message, int atomicWriteCount, IAc try { var writeResult = - await _breaker.WithCircuitBreaker((prepared, awj: this), state => state.awj.WriteMessagesAsync(state.prepared)).ConfigureAwait(false); + await _breaker.WithCircuitBreaker((prepared, awj: this), (state, ct) => state.awj.WriteMessagesAsync(state.prepared, ct)).ConfigureAwait(false); ProcessResults(writeResult, atomicWriteCount, message, _resequencer, resequencerCounter, self); } diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteProxy.cs b/src/core/Akka.Persistence/Journal/AsyncWriteProxy.cs index 255d38d3de6..a7aac50c44b 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteProxy.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteProxy.cs @@ -12,6 +12,7 @@ using System.Threading.Tasks; using Akka.Actor; using System.Runtime.Serialization; +using System.Threading; namespace Akka.Persistence.Journal { @@ -324,16 +325,17 @@ protected internal override bool AroundReceive(Receive receive, object message) /// TBD /// /// TBD + /// The to stop async operation /// /// This exception is thrown when the store has not been initialized. /// /// TBD - protected override Task> WriteMessagesAsync(IEnumerable messages) + protected override Task> WriteMessagesAsync(IEnumerable messages, CancellationToken cancellationToken = default) { if (_store == null) return StoreNotInitialized>(); - return _store.Ask>(new AsyncWriteTarget.WriteMessages(messages), Timeout); + return _store.Ask>(new AsyncWriteTarget.WriteMessages(messages), Timeout, cancellationToken); } /// @@ -341,16 +343,17 @@ protected override Task> WriteMessagesAsync(IEnumerabl /// /// TBD /// TBD + /// The to stop async operation /// /// This exception is thrown when the store has not been initialized. /// /// TBD - protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) + protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken = default) { if (_store == null) return StoreNotInitialized(); - return _store.Ask(new AsyncWriteTarget.DeleteMessagesTo(persistenceId, toSequenceNr), Timeout); + return _store.Ask(new AsyncWriteTarget.DeleteMessagesTo(persistenceId, toSequenceNr), Timeout, cancellationToken); } /// @@ -384,16 +387,17 @@ public override Task ReplayMessagesAsync(IActorContext context, string persisten /// /// TBD /// TBD + /// The to stop async operation /// /// This exception is thrown when the store has not been initialized. /// /// TBD - public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) + public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken = default) { if (_store == null) return StoreNotInitialized(); - return _store.Ask(new AsyncWriteTarget.ReplayMessages(persistenceId, 0, 0, 0), Timeout) + return _store.Ask(new AsyncWriteTarget.ReplayMessages(persistenceId, 0, 0, 0), Timeout, cancellationToken) .ContinueWith(t => t.Result.HighestSequenceNr, TaskContinuationOptions.OnlyOnRanToCompletion); } diff --git a/src/core/Akka.Persistence/Journal/MemoryJournal.cs b/src/core/Akka.Persistence/Journal/MemoryJournal.cs index f4271f5f58c..181329543ad 100644 --- a/src/core/Akka.Persistence/Journal/MemoryJournal.cs +++ b/src/core/Akka.Persistence/Journal/MemoryJournal.cs @@ -10,6 +10,7 @@ using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Event; @@ -81,8 +82,9 @@ public class MemoryJournal : AsyncWriteJournal /// TBD /// /// TBD + /// The to stop async operation /// TBD - protected override Task> WriteMessagesAsync(IEnumerable messages) + protected override Task> WriteMessagesAsync(IEnumerable messages, CancellationToken cancellationToken = default) { foreach (var w in messages) { @@ -91,7 +93,7 @@ protected override Task> WriteMessagesAsync(IEnumerabl var persistentRepresentation = p.WithTimestamp(DateTime.UtcNow.Ticks); Add(persistentRepresentation); _allMessages.AddLast(persistentRepresentation); - if (!(p.Payload is Tagged tagged)) continue; + if (p.Payload is not Tagged tagged) continue; foreach (var tag in tagged.Tags) { @@ -115,8 +117,9 @@ protected override Task> WriteMessagesAsync(IEnumerabl /// /// TBD /// TBD + /// The to stop async operation /// TBD - public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) + public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken = default) { return Task.FromResult(Math.Max(HighestSequenceNr(persistenceId), _meta.TryGetValue(persistenceId, out long metaSeqNr) ? metaSeqNr : 0L)); } @@ -145,8 +148,9 @@ public override Task ReplayMessagesAsync(IActorContext context, string persisten /// /// TBD /// TBD + /// The to stop async operation /// TBD - protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) + protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken = default) { var highestSeqNr = HighestSequenceNr(persistenceId); var toSeqNr = Math.Min(toSequenceNr, highestSeqNr); diff --git a/src/core/Akka.Persistence/Snapshot/LocalSnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/LocalSnapshotStore.cs index 86f0cdc4761..4f115060c8c 100644 --- a/src/core/Akka.Persistence/Snapshot/LocalSnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/LocalSnapshotStore.cs @@ -11,6 +11,7 @@ using System.IO; using System.Linq; using System.Text.RegularExpressions; +using System.Threading; using System.Threading.Tasks; using Akka.Dispatch; using Akka.Event; @@ -67,7 +68,7 @@ public LocalSnapshotStore() private readonly ILoggingAdapter _log; /// - protected override Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default) { // // Heuristics: @@ -81,7 +82,7 @@ protected override Task LoadAsync(string persistenceId, Snapsh } /// - protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot) + protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot, CancellationToken cancellationToken = default) { _saving.Add(metadata); return RunWithStreamDispatcher(() => @@ -92,7 +93,7 @@ protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot) } /// - protected override Task DeleteAsync(SnapshotMetadata metadata) + protected override Task DeleteAsync(SnapshotMetadata metadata, CancellationToken cancellationToken = default) { _saving.Remove(metadata); return RunWithStreamDispatcher(() => @@ -109,11 +110,11 @@ protected override Task DeleteAsync(SnapshotMetadata metadata) } /// - protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default) { foreach (var metadata in GetSnapshotMetadata(persistenceId, criteria)) { - await DeleteAsync(metadata); + await DeleteAsync(metadata, cancellationToken); } } diff --git a/src/core/Akka.Persistence/Snapshot/MemorySnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/MemorySnapshotStore.cs index 93b75db5462..46f8aed8b0d 100644 --- a/src/core/Akka.Persistence/Snapshot/MemorySnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/MemorySnapshotStore.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Akka.Util.Internal; @@ -25,7 +26,7 @@ public class MemorySnapshotStore : SnapshotStore /// protected virtual List Snapshots { get; } = new(); - protected override Task DeleteAsync(SnapshotMetadata metadata) + protected override Task DeleteAsync(SnapshotMetadata metadata, CancellationToken cancellationToken = default) { bool Pred(SnapshotEntry x) => x.PersistenceId == metadata.PersistenceId && (metadata.SequenceNr <= 0 || metadata.SequenceNr == long.MaxValue || x.SequenceNr == metadata.SequenceNr) && (metadata.Timestamp == DateTime.MinValue || metadata.Timestamp == DateTime.MaxValue || x.Timestamp == metadata.Timestamp.Ticks); @@ -37,7 +38,7 @@ bool Pred(SnapshotEntry x) => x.PersistenceId == metadata.PersistenceId && (meta return TaskEx.Completed; } - protected override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default) { var filter = CreateRangeFilter(persistenceId, criteria); @@ -45,7 +46,7 @@ protected override Task DeleteAsync(string persistenceId, SnapshotSelectionCrite return TaskEx.Completed; } - protected override Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default) { var filter = CreateRangeFilter(persistenceId, criteria); @@ -54,7 +55,7 @@ protected override Task LoadAsync(string persistenceId, Snapsh return Task.FromResult(snapshot); } - protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot) + protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot, CancellationToken cancellationToken = default) { var snapshotEntry = ToSnapshotEntry(metadata, snapshot); diff --git a/src/core/Akka.Persistence/Snapshot/NoSnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/NoSnapshotStore.cs index 491ba0c81ad..602e2fd4d20 100644 --- a/src/core/Akka.Persistence/Snapshot/NoSnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/NoSnapshotStore.cs @@ -7,6 +7,7 @@ using System; using System.Runtime.Serialization; +using System.Threading; using System.Threading.Tasks; namespace Akka.Persistence.Snapshot @@ -64,8 +65,9 @@ protected NoSnapshotStoreException(SerializationInfo info, StreamingContext cont /// /// TBD /// TBD + /// The to stop async operation /// TBD - protected override Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default) { return Task.FromResult((SelectedSnapshot)null); } @@ -75,11 +77,12 @@ protected override Task LoadAsync(string persistenceId, Snapsh /// /// TBD /// TBD + /// The to stop async operation /// /// This exception is thrown when no snapshot store is configured. /// /// TBD - protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot) + protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot, CancellationToken cancellationToken = default) { return Flop(); } @@ -88,11 +91,12 @@ protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot) /// TBD /// /// TBD + /// The to stop async operation /// /// This exception is thrown when no snapshot store is configured. /// /// TBD - protected override Task DeleteAsync(SnapshotMetadata metadata) + protected override Task DeleteAsync(SnapshotMetadata metadata, CancellationToken cancellationToken = default) { return Flop(); } @@ -102,11 +106,12 @@ protected override Task DeleteAsync(SnapshotMetadata metadata) /// /// TBD /// TBD + /// The to stop async operation /// /// This exception is thrown when no snapshot store is configured. /// /// TBD - protected override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) + protected override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default) { return Flop(); } diff --git a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs index fe5d86540ac..8b6645a94d2 100644 --- a/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs +++ b/src/core/Akka.Persistence/Snapshot/SnapshotStore.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Pattern; @@ -63,7 +64,7 @@ private bool ReceiveSnapshotStore(object message) } else { - _breaker.WithCircuitBreaker(() => LoadAsync(loadSnapshot.PersistenceId, loadSnapshot.Criteria.Limit(loadSnapshot.ToSequenceNr))) + _breaker.WithCircuitBreaker(ct => LoadAsync(loadSnapshot.PersistenceId, loadSnapshot.Criteria.Limit(loadSnapshot.ToSequenceNr), ct)) .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) ? new LoadSnapshotResult(t.Result, loadSnapshot.ToSequenceNr) as ISnapshotResponse : new LoadSnapshotFailed(t.IsFaulted @@ -77,7 +78,7 @@ private bool ReceiveSnapshotStore(object message) { var metadata = new SnapshotMetadata(saveSnapshot.Metadata.PersistenceId, saveSnapshot.Metadata.SequenceNr, saveSnapshot.Metadata.Timestamp == DateTime.MinValue ? DateTime.UtcNow : saveSnapshot.Metadata.Timestamp); - _breaker.WithCircuitBreaker(() => SaveAsync(metadata, saveSnapshot.Snapshot)) + _breaker.WithCircuitBreaker(ct => SaveAsync(metadata, saveSnapshot.Snapshot, ct)) .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) ? new SaveSnapshotSuccess(metadata) as ISnapshotResponse : new SaveSnapshotFailure(saveSnapshot.Metadata, @@ -103,7 +104,7 @@ private bool ReceiveSnapshotStore(object message) try { ReceivePluginInternal(message); - _breaker.WithCircuitBreaker(() => DeleteAsync(saveSnapshotFailure.Metadata)); + _breaker.WithCircuitBreaker(ct => DeleteAsync(saveSnapshotFailure.Metadata, ct)); } finally { @@ -113,7 +114,7 @@ private bool ReceiveSnapshotStore(object message) else if (message is DeleteSnapshot deleteSnapshot) { var eventStream = Context.System.EventStream; - _breaker.WithCircuitBreaker(() => DeleteAsync(deleteSnapshot.Metadata)) + _breaker.WithCircuitBreaker(ct => DeleteAsync(deleteSnapshot.Metadata, ct)) .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) ? new DeleteSnapshotSuccess(deleteSnapshot.Metadata) as ISnapshotResponse : new DeleteSnapshotFailure(deleteSnapshot.Metadata, @@ -153,7 +154,7 @@ private bool ReceiveSnapshotStore(object message) else if (message is DeleteSnapshots deleteSnapshots) { var eventStream = Context.System.EventStream; - _breaker.WithCircuitBreaker(() => DeleteAsync(deleteSnapshots.PersistenceId, deleteSnapshots.Criteria)) + _breaker.WithCircuitBreaker(ct => DeleteAsync(deleteSnapshots.PersistenceId, deleteSnapshots.Criteria, ct)) .ContinueWith(t => (!t.IsFaulted && !t.IsCanceled) ? new DeleteSnapshotsSuccess(deleteSnapshots.Criteria) as ISnapshotResponse : new DeleteSnapshotsFailure(deleteSnapshots.Criteria, @@ -212,8 +213,9 @@ private Exception TryUnwrapException(Exception e) /// /// Id of the persistent actor. /// Selection criteria for loading. + /// The to stop async operation /// TBD - protected abstract Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria); + protected abstract Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default); /// /// Plugin API: Asynchronously saves a snapshot. @@ -222,8 +224,9 @@ private Exception TryUnwrapException(Exception e) /// /// Snapshot metadata. /// Snapshot. + /// The to stop async operation /// TBD - protected abstract Task SaveAsync(SnapshotMetadata metadata, object snapshot); + protected abstract Task SaveAsync(SnapshotMetadata metadata, object snapshot, CancellationToken cancellationToken = default); /// /// Plugin API: Deletes the snapshot identified by . @@ -231,8 +234,9 @@ private Exception TryUnwrapException(Exception e) /// This call is protected with a circuit-breaker /// /// Snapshot metadata. + /// The to stop async operation /// TBD - protected abstract Task DeleteAsync(SnapshotMetadata metadata); + protected abstract Task DeleteAsync(SnapshotMetadata metadata, CancellationToken cancellationToken = default); /// /// Plugin API: Deletes all snapshots matching provided . @@ -241,8 +245,9 @@ private Exception TryUnwrapException(Exception e) /// /// Id of the persistent actor. /// Selection criteria for deleting. + /// The to stop async operation /// TBD - protected abstract Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria); + protected abstract Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken = default); /// /// Plugin API: Allows plugin implementers to use f.PipeTo(Self) diff --git a/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs index 43beec0a511..be986ff18e1 100644 --- a/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs +++ b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs @@ -246,8 +246,12 @@ public async Task Must_increment_failure_count_on_callTimeout() { var breaker = ShortCallTimeoutCb(); - var innerFuture = SlowThrowing(); - var future = breaker.Instance.WithCircuitBreaker(() => innerFuture); + Task innerFuture = null; + var future = breaker.Instance.WithCircuitBreaker(ct => + { + innerFuture = SlowThrowing(ct); + return innerFuture; + }); CheckLatch(breaker.OpenLatch).Should().BeTrue(); breaker.Instance.CurrentFailureCount.Should().Be(1); @@ -257,20 +261,17 @@ public async Task Must_increment_failure_count_on_callTimeout() await InterceptException(() => future); // Issue https://github.com/akkadotnet/akka.net/issues/7358 - // The actual exception is thrown out-of-band with no handler because inner Task is detached - // after a timeout and NOT protected - - // In the bug, the task is still running when it should've been stopped. + // In the bug, TestException was thrown out-of-band with no handler because inner Task is detached + // after a timeout and NOT protected when it SHOULD be cancelled/stopped. innerFuture.IsCompleted.Should().BeTrue(); - innerFuture.IsFaulted.Should().BeTrue(); - innerFuture.Exception.Should().BeOfType(); + innerFuture.IsCanceled.Should().BeTrue(); return; - async Task SlowThrowing() + async Task SlowThrowing(CancellationToken ct) { - await Task.Delay(150); - await ThrowExceptionAsync(); + await Task.Delay(500, ct); + await ThrowExceptionAsync(ct); } } @@ -347,7 +348,7 @@ public async Task Must_increase_reset_timeout_after_it_transits_to_open_again() { var breaker = NonOneFactorCb(); await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); - _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + _ = breaker.Instance.WithCircuitBreaker(ct => Task.Run(ThrowException, ct)); CheckLatch(breaker.OpenLatch).Should().BeTrue(); var e1 = await InterceptException(() => breaker.Instance.WithCircuitBreaker(ThrowExceptionAsync)); @@ -378,7 +379,7 @@ public class CircuitBreakerSpecBase : AkkaSpec public static void ThrowException() => throw new TestException("Test Exception"); [DebuggerStepThrough] - public static async Task ThrowExceptionAsync() + public static async Task ThrowExceptionAsync(CancellationToken ct) { await Task.Yield(); throw new TestException("Test Exception"); @@ -386,7 +387,7 @@ public static async Task ThrowExceptionAsync() public static string SayHi() => "hi"; - public static async Task SayHiAsync() + public static async Task SayHiAsync(CancellationToken ct) { await Task.Yield(); return "hi"; diff --git a/src/core/Akka.Tests/Pattern/CircuitBreakerStressSpec.cs b/src/core/Akka.Tests/Pattern/CircuitBreakerStressSpec.cs index ca4c6f010c3..c791ac04bc7 100644 --- a/src/core/Akka.Tests/Pattern/CircuitBreakerStressSpec.cs +++ b/src/core/Akka.Tests/Pattern/CircuitBreakerStressSpec.cs @@ -7,6 +7,7 @@ using System; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Pattern; @@ -94,9 +95,9 @@ protected override void OnReceive(object message) } } - private static async Task Job() + private static async Task Job(CancellationToken ct) { - await Task.Delay(TimeSpan.FromMilliseconds(ThreadLocalRandom.Current.Next(300))); + await Task.Delay(TimeSpan.FromMilliseconds(ThreadLocalRandom.Current.Next(300)), ct); return JobDone.Instance; } } diff --git a/src/core/Akka/Pattern/CircuitBreaker.cs b/src/core/Akka/Pattern/CircuitBreaker.cs index 9f93d83d3d6..3d8b11d0196 100644 --- a/src/core/Akka/Pattern/CircuitBreaker.cs +++ b/src/core/Akka/Pattern/CircuitBreaker.cs @@ -224,27 +224,52 @@ public CircuitBreaker(IScheduler scheduler, int maxFailures, TimeSpan callTimeou /// TBD /// Call needing protected /// containing the call result + [Obsolete("Use WithCircuitBreaker that takes a cancellation token in the body instead", true)] public Task WithCircuitBreaker(Func> body) => CurrentState.Invoke(body); + /// + /// Wraps invocation of asynchronous calls that need to be protected + /// + /// TBD + /// Call needing protected + /// containing the call result + public Task WithCircuitBreaker(Func> body) => CurrentState.Invoke(body); + + [Obsolete("Use WithCircuitBreaker that takes a cancellation token in the body instead", true)] public Task WithCircuitBreaker(TState state, Func> body) => CurrentState.InvokeState(state, body); + public Task WithCircuitBreaker(TState state, Func> body) => + CurrentState.InvokeState(state, body); + /// /// Wraps invocation of asynchronous calls that need to be protected /// /// Call needing protected /// + [Obsolete("Use WithCircuitBreaker that takes a cancellation token in the body instead", true)] public Task WithCircuitBreaker(Func body) => CurrentState.Invoke(body); + /// + /// Wraps invocation of asynchronous calls that need to be protected + /// + /// Call needing protected + /// + public Task WithCircuitBreaker(Func body) => CurrentState.Invoke(body); + + [Obsolete("Use WithCircuitBreaker that takes a cancellation token in the body instead", true)] public Task WithCircuitBreaker(TState state, Func body) => CurrentState.InvokeState(state, body); + public Task WithCircuitBreaker(TState state, Func body) => + CurrentState.InvokeState(state, body); + /// /// Wraps invocations of asynchronous calls that need to be protected. /// /// Call needing protected public void WithSyncCircuitBreaker(Action body) => - WithCircuitBreaker(body, b => Task.Run(b)).GetAwaiter().GetResult(); + WithCircuitBreaker(body, (b, ct) => Task.Run(b, ct)).GetAwaiter().GetResult(); /// /// Wraps invocations of asynchronous calls that need to be protected. @@ -252,7 +277,7 @@ public void WithSyncCircuitBreaker(Action body) => /// Call needing protected /// The result of the call public T WithSyncCircuitBreaker(Func body) => - WithCircuitBreaker(body, b => Task.Run(b)).GetAwaiter().GetResult(); + WithCircuitBreaker(body, (b, ct) => Task.Run(b, ct)).GetAwaiter().GetResult(); /// /// Mark a successful call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the diff --git a/src/core/Akka/Pattern/CircuitBreakerState.cs b/src/core/Akka/Pattern/CircuitBreakerState.cs index 62846f0ccba..11e84dcdf78 100644 --- a/src/core/Akka/Pattern/CircuitBreakerState.cs +++ b/src/core/Akka/Pattern/CircuitBreakerState.cs @@ -8,6 +8,7 @@ using System; using System.Diagnostics; using System.Globalization; +using System.Threading; using System.Threading.Tasks; using Akka.Util; using Akka.Util.Internal; @@ -48,28 +49,50 @@ private TimeSpan RemainingDuration() /// N/A /// Implementation of the call that needs protected /// containing result of protected call + [Obsolete("Use Invoke that takes a cancellation token in the body instead", true)] public override Task Invoke(Func> body) => Task.FromException(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration())); - public override Task - InvokeState(TState state, Func body) => - Task.FromException( - new OpenCircuitException(_breaker.LastCaughtException, - RemainingDuration())); + /// + /// Fail-fast on any invocation + /// + /// N/A + /// Implementation of the call that needs protected + /// containing result of protected call + public override Task Invoke(Func> body) + => Task.FromException(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration())); + + [Obsolete("Use InvokeState that takes a cancellation token in the body instead", true)] + public override Task InvokeState(TState state, Func body) + => Task.FromException(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration())); - public override Task InvokeState(TState state, - Func> body) => Task.FromException( - new OpenCircuitException(_breaker.LastCaughtException, - RemainingDuration())); + public override Task InvokeState(TState state, Func body) + => Task.FromException(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration())); + + [Obsolete("Use InvokeState that takes a cancellation token in the body instead", true)] + public override Task InvokeState(TState state, Func> body) + => Task.FromException(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration())); + + public override Task InvokeState(TState state, Func> body) + => Task.FromException(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration())); /// /// Fail-fast on any invocation /// /// Implementation of the call that needs protected /// containing result of protected call + [Obsolete("Use Invoke that takes a cancellation token in the body instead", true)] public override Task Invoke(Func body) => Task.FromException(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration())); + /// + /// Fail-fast on any invocation + /// + /// Implementation of the call that needs protected + /// containing result of protected call + public override Task Invoke(Func body) + => Task.FromException(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration())); + /// /// No-op for open, calls are never executed so cannot succeed or fail /// @@ -147,13 +170,34 @@ private void CheckState() /// TBD /// Implementation of the call that needs protected /// containing result of protected call + [Obsolete("Use Invoke that takes a cancellation token in the body instead", true)] public override Task Invoke(Func> body) + { + CheckState(); + return CallThrough(_ => body()); + } + + /// + /// Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens. + /// If the call succeeds, the breaker closes. + /// + /// TBD + /// Implementation of the call that needs protected + /// containing result of protected call + public override Task Invoke(Func> body) { CheckState(); return CallThrough(body); } + [Obsolete("Use InvokeState that takes a cancellation token in the body instead", true)] public override Task InvokeState(TState state, Func> body) + { + CheckState(); + return CallThrough(state, (s, _) => body(s)); + } + + public override Task InvokeState(TState state, Func> body) { CheckState(); return CallThrough(state,body); @@ -165,14 +209,33 @@ public override Task InvokeState(TState state, Func /// /// Implementation of the call that needs protected /// containing result of protected call + [Obsolete("Use Invoke that takes a cancellation token in the body instead", true)] public override async Task Invoke(Func body) + { + CheckState(); + await CallThrough(_ => body()); + } + + /// + /// Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens. + /// If the call succeeds, the breaker closes. + /// + /// Implementation of the call that needs protected + /// containing result of protected call + public override async Task Invoke(Func body) { CheckState(); await CallThrough(body); } - public override async Task InvokeState(TState state, - Func body) + [Obsolete("Use InvokeState that takes a cancellation token in the body instead", true)] + public override async Task InvokeState(TState state, Func body) + { + CheckState(); + await CallThrough(state, (s, _) => body(s)); + } + + public override async Task InvokeState(TState state, Func body) { CheckState(); await CallThrough(state,body); @@ -237,31 +300,53 @@ public Closed(CircuitBreaker breaker) /// TBD /// Implementation of the call that needs protected /// containing result of protected call + [Obsolete("Use Invoke that takes a cancellation token in the body instead", true)] public override Task Invoke(Func> body) { - return CallThrough(body); + return CallThrough(_ => body()); } + /// + /// Implementation of invoke, which simply attempts the call + /// + /// TBD + /// Implementation of the call that needs protected + /// containing result of protected call + public override Task Invoke(Func> body) + => CallThrough(body); + + [Obsolete("Use InvokeState that takes a cancellation token in the body instead", true)] public override Task InvokeState(TState state, Func> body) { - return CallThrough(state, body); + return CallThrough(state, (s, _) => body(s)); } + public override Task InvokeState(TState state, Func> body) + => CallThrough(state, body); + /// /// Implementation of invoke, which simply attempts the call /// /// Implementation of the call that needs protected /// containing result of protected call + [Obsolete("Use Invoke that takes a cancellation token in the body instead", true)] public override Task Invoke(Func body) { - return CallThrough(body); + return CallThrough(_ => body()); } + public override Task Invoke(Func body) + => CallThrough(body); + + [Obsolete("Use InvokeState that takes a cancellation token in the body instead", true)] public override Task InvokeState(TState state, Func body) { - return CallThrough(state, body); + return CallThrough(state, (s, _) => body(s)); } + public override Task InvokeState(TState state, Func body) + => CallThrough(state, body); + /// /// On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and /// the breaker is tripped if we have reached maxFailures. diff --git a/src/core/Akka/Util/Internal/AtomicState.cs b/src/core/Akka/Util/Internal/AtomicState.cs index 7395cf4aa71..b393e3fc8ef 100644 --- a/src/core/Akka/Util/Internal/AtomicState.cs +++ b/src/core/Akka/Util/Internal/AtomicState.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Concurrent; using System.Runtime.ExceptionServices; +using System.Threading; using System.Threading.Tasks; namespace Akka.Util.Internal @@ -76,13 +77,22 @@ await Task /// /// Implementation of the call /// containing the result of the call - public async Task CallThrough(Func> task) + public async Task CallThrough(Func> task) { var result = default(T); try { - result = await task().WaitAsync(_callTimeout).ConfigureAwait(false); - CallSucceeds(); + using var cts = new CancellationTokenSource(); + try + { + result = await task(cts.Token).WaitAsync(_callTimeout).ConfigureAwait(false); + CallSucceeds(); + } + catch + { + cts.Cancel(); + throw; + } } catch (Exception ex) { @@ -94,13 +104,24 @@ public async Task CallThrough(Func> task) return result; } - public async Task CallThrough(TState state, Func> task) + public async Task CallThrough(TState state, Func> task) { var result = default(T); try { - result = await task(state).WaitAsync(_callTimeout).ConfigureAwait(false); - CallSucceeds(); + using var cts = new CancellationTokenSource(_callTimeout); + try + { + result = await task(state, cts.Token).WaitAsync(_callTimeout, cts.Token).ConfigureAwait(false); + CallSucceeds(); + } + catch (OperationCanceledException cancelled) + { + if(cts.IsCancellationRequested) + throw new TimeoutException(null, cancelled); + + throw; + } } catch (Exception ex) { @@ -118,12 +139,21 @@ public async Task CallThrough(TState state, Func> /// /// Implementation of the call /// containing the result of the call - public async Task CallThrough(Func task) + public async Task CallThrough(Func task) { try { - await task().WaitAsync(_callTimeout).ConfigureAwait(false); - CallSucceeds(); + using var cts = new CancellationTokenSource(); + try + { + await task(cts.Token).WaitAsync(_callTimeout).ConfigureAwait(false); + CallSucceeds(); + } + catch + { + cts.Cancel(); + throw; + } } catch (Exception ex) { @@ -133,12 +163,21 @@ public async Task CallThrough(Func task) } } - public async Task CallThrough(TState state, Func task) + public async Task CallThrough(TState state, Func task) { try { - await task(state).WaitAsync(_callTimeout).ConfigureAwait(false); - CallSucceeds(); + using var cts = new CancellationTokenSource(); + try + { + await task(state, cts.Token).WaitAsync(_callTimeout).ConfigureAwait(false); + CallSucceeds(); + } + catch + { + cts.Cancel(); + throw; + } } catch (Exception ex) { @@ -154,19 +193,42 @@ public async Task CallThrough(TState state, Func task) /// TBD /// Implementation of the call that needs protected /// containing result of protected call + [Obsolete("Use Invoke that takes a cancellation token in the body instead", true)] public abstract Task Invoke(Func> body); + /// + /// Abstract entry point for all states + /// + /// TBD + /// Implementation of the call that needs protected + /// containing result of protected call + public abstract Task Invoke(Func> body); + + [Obsolete("Use InvokeState that takes a cancellation token in the body instead", true)] public abstract Task InvokeState(TState state, Func> body); + public abstract Task InvokeState(TState state, Func> body); + /// /// Abstract entry point for all states /// /// Implementation of the call that needs protected /// containing result of protected call + [Obsolete("Use Invoke that takes a cancellation token in the body instead", true)] public abstract Task Invoke(Func body); + /// + /// Abstract entry point for all states + /// + /// Implementation of the call that needs protected + /// containing result of protected call + public abstract Task Invoke(Func body); + + [Obsolete("Use InvokeState that takes a cancellation token in the body instead", true)] public abstract Task InvokeState(TState state, Func body); + public abstract Task InvokeState(TState state, Func body); + /// /// Invoked when call fails /// diff --git a/src/examples/Akka.Persistence.Custom/Journal/SqliteJournal.cs b/src/examples/Akka.Persistence.Custom/Journal/SqliteJournal.cs index b29e46a526c..ced1e5eeae8 100644 --- a/src/examples/Akka.Persistence.Custom/Journal/SqliteJournal.cs +++ b/src/examples/Akka.Persistence.Custom/Journal/SqliteJournal.cs @@ -273,12 +273,13 @@ public sealed override async Task ReplayMessagesAsync( // public sealed override async Task ReadHighestSequenceNrAsync( string persistenceId, - long fromSequenceNr) + long fromSequenceNr, + CancellationToken cancellationToken = default) { // Create a new DbConnection instance using (var connection = new SqliteConnection(_connectionString)) using (var cts = CancellationTokenSource - .CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) + .CreateLinkedTokenSource(cancellationToken, _pendingRequestsCancellation.Token)) { await connection.OpenAsync(cts.Token); // Create new DbCommand instance @@ -299,7 +300,8 @@ public sealed override async Task ReadHighestSequenceNrAsync( // protected sealed override async Task> WriteMessagesAsync( - IEnumerable messages) + IEnumerable messages, + CancellationToken cancellationToken = default) { // For each of the atomic write request, create an async Task var writeTasks = messages.Select(async message => @@ -307,7 +309,7 @@ protected sealed override async Task> WriteMessagesAsy // Create a new DbConnection instance using (var connection = new SqliteConnection(_connectionString)) using (var cts = CancellationTokenSource - .CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) + .CreateLinkedTokenSource(cancellationToken, _pendingRequestsCancellation.Token)) { await connection.OpenAsync(cts.Token); @@ -384,7 +386,7 @@ protected sealed override async Task> WriteMessagesAsy .ContinueWhenAll(writeTasks, tasks => tasks.Select(t => t.IsFaulted ? TryUnwrapException(t.Exception) - : null).ToImmutableList()); + : null).ToImmutableList(), cancellationToken); return result; } @@ -393,15 +395,16 @@ protected sealed override async Task> WriteMessagesAsy // protected sealed override async Task DeleteMessagesToAsync( string persistenceId, - long toSequenceNr) + long toSequenceNr, + CancellationToken cancellationToken = default) { // Create a new DbConnection instance using (var connection = new SqliteConnection(_connectionString)) { - await connection.OpenAsync(); + await connection.OpenAsync(cancellationToken); using (var cts = CancellationTokenSource - .CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) + .CreateLinkedTokenSource(cancellationToken, _pendingRequestsCancellation.Token)) { // We will be using two DbCommands to complete this process using (var deleteCommand = GetCommand(connection, DeleteBatchSql, _timeout)) diff --git a/src/examples/Akka.Persistence.Custom/Snapshot/SqliteSnapshotStore.cs b/src/examples/Akka.Persistence.Custom/Snapshot/SqliteSnapshotStore.cs index 70061e4fa50..4f0c4ee5943 100644 --- a/src/examples/Akka.Persistence.Custom/Snapshot/SqliteSnapshotStore.cs +++ b/src/examples/Akka.Persistence.Custom/Snapshot/SqliteSnapshotStore.cs @@ -184,12 +184,13 @@ private bool WaitingForInitialization(object message) // protected sealed override async Task LoadAsync( string persistenceId, - SnapshotSelectionCriteria criteria) + SnapshotSelectionCriteria criteria, + CancellationToken cancellationToken = default) { // Create a new DbConnection instance using (var connection = new SqliteConnection(_connectionString)) using (var cts = CancellationTokenSource - .CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) + .CreateLinkedTokenSource(cancellationToken, _pendingRequestsCancellation.Token)) { await connection.OpenAsync(cts.Token); @@ -234,12 +235,13 @@ protected sealed override async Task LoadAsync( // protected sealed override async Task SaveAsync( SnapshotMetadata metadata, - object snapshot) + object snapshot, + CancellationToken cancellationToken = default) { // Create a new DbConnection instance using (var connection = new SqliteConnection(_connectionString)) using (var cts = CancellationTokenSource - .CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) + .CreateLinkedTokenSource(cancellationToken, _pendingRequestsCancellation.Token)) { await connection.OpenAsync(cts.Token); @@ -302,12 +304,14 @@ protected sealed override async Task SaveAsync( // // - protected sealed override async Task DeleteAsync(SnapshotMetadata metadata) + protected sealed override async Task DeleteAsync( + SnapshotMetadata metadata, + CancellationToken cancellationToken = default) { // Create a new DbConnection instance using (var connection = new SqliteConnection(_connectionString)) using (var cts = CancellationTokenSource - .CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) + .CreateLinkedTokenSource(cancellationToken, _pendingRequestsCancellation.Token)) { await connection.OpenAsync(cts.Token); var timestamp = metadata.Timestamp != DateTime.MinValue @@ -346,12 +350,13 @@ protected sealed override async Task DeleteAsync(SnapshotMetadata metadata) // protected sealed override async Task DeleteAsync( string persistenceId, - SnapshotSelectionCriteria criteria) + SnapshotSelectionCriteria criteria, + CancellationToken cancellationToken = default) { // Create a new DbConnection instance using (var connection = new SqliteConnection(_connectionString)) using (var cts = CancellationTokenSource - .CreateLinkedTokenSource(_pendingRequestsCancellation.Token)) + .CreateLinkedTokenSource(cancellationToken, _pendingRequestsCancellation.Token)) { await connection.OpenAsync(cts.Token); From d28557348f3d17695ef707a3511d6b02ee1824e6 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 22 Oct 2024 04:02:37 +0700 Subject: [PATCH 3/3] Simplify code --- src/core/Akka/Util/Internal/AtomicState.cs | 62 ++++++---------------- 1 file changed, 16 insertions(+), 46 deletions(-) diff --git a/src/core/Akka/Util/Internal/AtomicState.cs b/src/core/Akka/Util/Internal/AtomicState.cs index b393e3fc8ef..e98db5f1186 100644 --- a/src/core/Akka/Util/Internal/AtomicState.cs +++ b/src/core/Akka/Util/Internal/AtomicState.cs @@ -80,22 +80,15 @@ await Task public async Task CallThrough(Func> task) { var result = default(T); + using var cts = new CancellationTokenSource(); try { - using var cts = new CancellationTokenSource(); - try - { - result = await task(cts.Token).WaitAsync(_callTimeout).ConfigureAwait(false); - CallSucceeds(); - } - catch - { - cts.Cancel(); - throw; - } + result = await task(cts.Token).WaitAsync(_callTimeout).ConfigureAwait(false); + CallSucceeds(); } catch (Exception ex) { + cts.Cancel(); var capturedException = ExceptionDispatchInfo.Capture(ex); CallFails(capturedException.SourceException); capturedException.Throw(); @@ -107,24 +100,15 @@ public async Task CallThrough(Func> task) public async Task CallThrough(TState state, Func> task) { var result = default(T); + using var cts = new CancellationTokenSource(_callTimeout); try { - using var cts = new CancellationTokenSource(_callTimeout); - try - { - result = await task(state, cts.Token).WaitAsync(_callTimeout, cts.Token).ConfigureAwait(false); - CallSucceeds(); - } - catch (OperationCanceledException cancelled) - { - if(cts.IsCancellationRequested) - throw new TimeoutException(null, cancelled); - - throw; - } + result = await task(state, cts.Token).WaitAsync(_callTimeout, cts.Token).ConfigureAwait(false); + CallSucceeds(); } catch (Exception ex) { + cts.Cancel(); var capturedException = ExceptionDispatchInfo.Capture(ex); CallFails(capturedException.SourceException); capturedException.Throw(); @@ -141,22 +125,15 @@ public async Task CallThrough(TState state, Func containing the result of the call public async Task CallThrough(Func task) { + using var cts = new CancellationTokenSource(); try { - using var cts = new CancellationTokenSource(); - try - { - await task(cts.Token).WaitAsync(_callTimeout).ConfigureAwait(false); - CallSucceeds(); - } - catch - { - cts.Cancel(); - throw; - } + await task(cts.Token).WaitAsync(_callTimeout).ConfigureAwait(false); + CallSucceeds(); } catch (Exception ex) { + cts.Cancel(); var capturedException = ExceptionDispatchInfo.Capture(ex); CallFails(capturedException.SourceException); capturedException.Throw(); @@ -165,22 +142,15 @@ public async Task CallThrough(Func task) public async Task CallThrough(TState state, Func task) { + using var cts = new CancellationTokenSource(); try { - using var cts = new CancellationTokenSource(); - try - { - await task(state, cts.Token).WaitAsync(_callTimeout).ConfigureAwait(false); - CallSucceeds(); - } - catch - { - cts.Cancel(); - throw; - } + await task(state, cts.Token).WaitAsync(_callTimeout).ConfigureAwait(false); + CallSucceeds(); } catch (Exception ex) { + cts.Cancel(); var capturedException = ExceptionDispatchInfo.Capture(ex); CallFails(capturedException.SourceException); capturedException.Throw();