From 19f8c3166bef83f2350f9b871c5f49e5cd0f8026 Mon Sep 17 00:00:00 2001 From: William Chong Date: Mon, 1 Jul 2024 16:33:36 +0400 Subject: [PATCH] Dispose on token cancellation request --- .../PersistentSubscription.cs | 120 ++++-- .../StreamSubscription.cs | 21 +- ...ription_drops_due_to_cancellation_token.cs | 80 ++++ .../Obsolete/subscribe_to_all_obsolete.cs | 342 ++++++++++++------ .../Obsolete/subscribe_to_stream_obsolete.cs | 57 ++- .../SubscriptionDroppedResult.cs | 0 6 files changed, 459 insertions(+), 161 deletions(-) create mode 100644 test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/Obsolete/persistent_subscription_drops_due_to_cancellation_token.cs rename test/{EventStore.Client.Streams.Tests/Subscriptions => EventStore.Client.Tests.Common}/SubscriptionDroppedResult.cs (100%) diff --git a/src/EventStore.Client.PersistentSubscriptions/PersistentSubscription.cs b/src/EventStore.Client.PersistentSubscriptions/PersistentSubscription.cs index f3d19b42e..35443e365 100644 --- a/src/EventStore.Client.PersistentSubscriptions/PersistentSubscription.cs +++ b/src/EventStore.Client.PersistentSubscriptions/PersistentSubscription.cs @@ -1,5 +1,3 @@ -using EventStore.Client.PersistentSubscriptions; -using Grpc.Core; using Microsoft.Extensions.Logging; namespace EventStore.Client { @@ -7,7 +5,9 @@ namespace EventStore.Client { /// Represents a persistent subscription connection. /// public class PersistentSubscription : IDisposable { - private readonly EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult _persistentSubscriptionResult; + private readonly EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult + _persistentSubscriptionResult; + private readonly IAsyncEnumerator _enumerator; private readonly Func _eventAppeared; private readonly Action _subscriptionDropped; @@ -25,7 +25,8 @@ internal static async Task Confirm( EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult persistentSubscriptionResult, Func eventAppeared, Action subscriptionDropped, - ILogger log, UserCredentials? userCredentials, CancellationToken cancellationToken = default) { + ILogger log, UserCredentials? userCredentials, CancellationToken cancellationToken = default + ) { var enumerator = persistentSubscriptionResult .Messages .GetAsyncEnumerator(cancellationToken); @@ -34,11 +35,20 @@ internal static async Task Confirm( return (result, enumerator.Current) switch { (true, PersistentSubscriptionMessage.SubscriptionConfirmation (var subscriptionId)) => - new PersistentSubscription(persistentSubscriptionResult, enumerator, subscriptionId, eventAppeared, - subscriptionDropped, log, cancellationToken), + new PersistentSubscription( + persistentSubscriptionResult, + enumerator, + subscriptionId, + eventAppeared, + subscriptionDropped, + log, + cancellationToken + ), (true, PersistentSubscriptionMessage.NotFound) => - throw new PersistentSubscriptionNotFoundException(persistentSubscriptionResult.StreamName, - persistentSubscriptionResult.GroupName), + throw new PersistentSubscriptionNotFoundException( + persistentSubscriptionResult.StreamName, + persistentSubscriptionResult.GroupName + ), _ => throw new InvalidOperationException("Subscription could not be confirmed.") }; } @@ -49,14 +59,15 @@ private PersistentSubscription( IAsyncEnumerator enumerator, string subscriptionId, Func eventAppeared, Action subscriptionDropped, ILogger log, - CancellationToken cancellationToken) { + CancellationToken cancellationToken + ) { _persistentSubscriptionResult = persistentSubscriptionResult; - _enumerator = enumerator; - SubscriptionId = subscriptionId; - _eventAppeared = eventAppeared; - _subscriptionDropped = subscriptionDropped; - _log = log; - _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _enumerator = enumerator; + SubscriptionId = subscriptionId; + _eventAppeared = eventAppeared; + _subscriptionDropped = subscriptionDropped; + _log = log; + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); Task.Run(Subscribe, _cts.Token); } @@ -91,7 +102,6 @@ public Task Ack(params ResolvedEvent[] resolvedEvents) => public Task Ack(IEnumerable resolvedEvents) => Ack(resolvedEvents.Select(resolvedEvent => resolvedEvent.OriginalEvent.EventId)); - /// /// Acknowledge that a message has failed processing (this will tell the server it has not been processed). /// @@ -99,7 +109,8 @@ public Task Ack(IEnumerable resolvedEvents) => /// A reason given. /// The of the s to nak. There should not be more than 2000 to nak at a time. /// The number of eventIds exceeded the limit of 2000. - public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params Uuid[] eventIds) => NackInternal(eventIds, action, reason); + public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params Uuid[] eventIds) => + NackInternal(eventIds, action, reason); /// /// Acknowledge that a message has failed processing (this will tell the server it has not been processed). @@ -108,10 +119,15 @@ public Task Ack(IEnumerable resolvedEvents) => /// A reason given. /// The s to nak. There should not be more than 2000 to nak at a time. /// The number of resolvedEvents exceeded the limit of 2000. - public Task Nack(PersistentSubscriptionNakEventAction action, string reason, - params ResolvedEvent[] resolvedEvents) => - Nack(action, reason, - Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId)); + public Task Nack( + PersistentSubscriptionNakEventAction action, string reason, + params ResolvedEvent[] resolvedEvents + ) => + Nack( + action, + reason, + Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId) + ); /// public void Dispose() => SubscriptionDropped(SubscriptionDroppedReason.Disposed); @@ -121,7 +137,8 @@ private async Task Subscribe() { try { while (await _enumerator.MoveNextAsync(_cts.Token).ConfigureAwait(false)) { - if (_enumerator.Current is not PersistentSubscriptionMessage.Event(var resolvedEvent, var retryCount)) { + if (_enumerator.Current is not + PersistentSubscriptionMessage.Event(var resolvedEvent, var retryCount)) { continue; } @@ -129,39 +146,54 @@ private async Task Subscribe() { if (_subscriptionDroppedInvoked != 0) { return; } - SubscriptionDropped(SubscriptionDroppedReason.ServerError, + + SubscriptionDropped( + SubscriptionDroppedReason.ServerError, new PersistentSubscriptionNotFoundException( - _persistentSubscriptionResult.StreamName, _persistentSubscriptionResult.GroupName)); + _persistentSubscriptionResult.StreamName, + _persistentSubscriptionResult.GroupName + ) + ); + return; } - + _log.LogTrace( "Persistent Subscription {subscriptionId} received event {streamName}@{streamRevision} {position}", - SubscriptionId, resolvedEvent.OriginalEvent.EventStreamId, - resolvedEvent.OriginalEvent.EventNumber, resolvedEvent.OriginalEvent.Position); + SubscriptionId, + resolvedEvent.OriginalEvent.EventStreamId, + resolvedEvent.OriginalEvent.EventNumber, + resolvedEvent.OriginalEvent.Position + ); try { await _eventAppeared( this, resolvedEvent, retryCount, - _cts.Token).ConfigureAwait(false); + _cts.Token + ).ConfigureAwait(false); } catch (Exception ex) when (ex is ObjectDisposedException or OperationCanceledException) { if (_subscriptionDroppedInvoked != 0) { return; } - _log.LogWarning(ex, + _log.LogWarning( + ex, "Persistent Subscription {subscriptionId} was dropped because cancellation was requested by another caller.", - SubscriptionId); + SubscriptionId + ); SubscriptionDropped(SubscriptionDroppedReason.Disposed); return; } catch (Exception ex) { - _log.LogError(ex, + _log.LogError( + ex, "Persistent Subscription {subscriptionId} was dropped because the subscriber made an error.", - SubscriptionId); + SubscriptionId + ); + SubscriptionDropped(SubscriptionDroppedReason.SubscriberError, ex); return; @@ -169,16 +201,30 @@ await _eventAppeared( } } catch (Exception ex) { if (_subscriptionDroppedInvoked == 0) { - _log.LogError(ex, - "Persistent Subscription {subscriptionId} was dropped because an error occurred on the server.", - SubscriptionId); - SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex); + if (_cts.Token.IsCancellationRequested) { + _log.LogInformation( + "Subscription {subscriptionId} was dropped because cancellation was requested.", + SubscriptionId + ); + + SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex); + } else { + _log.LogError( + ex, + "Persistent Subscription {subscriptionId} was dropped because an error occurred on the server.", + SubscriptionId + ); + + SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex); + } } } finally { if (_subscriptionDroppedInvoked == 0) { _log.LogError( "Persistent Subscription {subscriptionId} was unexpectedly terminated.", - SubscriptionId); + SubscriptionId + ); + SubscriptionDropped(SubscriptionDroppedReason.ServerError); } } diff --git a/src/EventStore.Client.Streams/StreamSubscription.cs b/src/EventStore.Client.Streams/StreamSubscription.cs index f70080c17..72f7ddbf6 100644 --- a/src/EventStore.Client.Streams/StreamSubscription.cs +++ b/src/EventStore.Client.Streams/StreamSubscription.cs @@ -135,13 +135,22 @@ await _checkpointReached(this, position, _cts.Token) SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex); } catch (Exception ex) { if (_subscriptionDroppedInvoked == 0) { - _log.LogError( - ex, - "Subscription {subscriptionId} was dropped because an error occurred on the server.", - SubscriptionId - ); + if (_cts.IsCancellationRequested) { + _log.LogInformation( + "Subscription {subscriptionId} was dropped because cancellation was requested by the client.", + SubscriptionId + ); + + SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex); + } else { + _log.LogError( + ex, + "Subscription {subscriptionId} was dropped because an error occurred on the server.", + SubscriptionId + ); - SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex); + SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex); + } } } } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/Obsolete/persistent_subscription_drops_due_to_cancellation_token.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/Obsolete/persistent_subscription_drops_due_to_cancellation_token.cs new file mode 100644 index 000000000..d387c3ada --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/Obsolete/persistent_subscription_drops_due_to_cancellation_token.cs @@ -0,0 +1,80 @@ +// ReSharper disable InconsistentNaming + +using EventStore.Client.Streams.Tests.Subscriptions; + +namespace EventStore.Client.PersistentSubscriptions.Tests.SubscriptionToAll.Obsolete; + +[Obsolete("Will be removed in future release when older subscriptions APIs are removed from the client")] +public class + PersistentSubscriptionDropsDueToCancellationToken(PersistentSubscriptionDropsDueToCancellationToken.Fixture fixture) + : IClassFixture< + PersistentSubscriptionDropsDueToCancellationToken.Fixture> { + static readonly string Group = Guid.NewGuid().ToString(); + static readonly string Stream = Guid.NewGuid().ToString(); + + [SupportsPSToAll.Fact] + public async Task persistent_subscription_to_all_drops_due_to_cancellation_token() { + var subscriptionDropped = new TaskCompletionSource(); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + + await fixture.Client.CreateToAllAsync( + Group, + cancellationToken: cts.Token, + settings: new PersistentSubscriptionSettings() + ); + + using var subscription = await fixture.Client.SubscribeToAllAsync( + Group, + async (s, e, r, ct) => await s.Ack(e), + (sub, reason, ex) => subscriptionDropped.SetResult(new SubscriptionDroppedResult(reason, ex)), + userCredentials: TestCredentials.Root, + cancellationToken: cts.Token + ) + .WithTimeout(); + + // wait until the cancellation token cancels + await Task.Delay(TimeSpan.FromSeconds(3)); + + var result = await subscriptionDropped.Task.WithTimeout(); + result.Reason.ShouldBe(SubscriptionDroppedReason.Disposed); + } + + [SupportsPSToAll.Fact] + public async Task persistent_subscription_to_stream_drops_due_to_cancellation_token() { + var subscriptionDropped = new TaskCompletionSource(); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + + await fixture.Client.CreateToStreamAsync( + Group, + Stream, + cancellationToken: cts.Token, + settings: new PersistentSubscriptionSettings() + ); + + using var subscription = await fixture.Client.SubscribeToStreamAsync( + Group, + Stream, + async (s, e, r, ct) => await s.Ack(e), + (sub, reason, ex) => subscriptionDropped.SetResult(new SubscriptionDroppedResult(reason, ex)), + userCredentials: TestCredentials.Root, + cancellationToken: cts.Token + ) + .WithTimeout(); + + // wait until the cancellation token cancels + await Task.Delay(TimeSpan.FromSeconds(3)); + + var result = await subscriptionDropped.Task.WithTimeout(); + result.Reason.ShouldBe(SubscriptionDroppedReason.Disposed); + } + + public class Fixture : EventStoreClientFixture { + protected override Task Given() { + return Task.CompletedTask; + } + + protected override Task When() => Task.CompletedTask; + } +} diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/Obsolete/subscribe_to_all_obsolete.cs b/test/EventStore.Client.Streams.Tests/Subscriptions/Obsolete/subscribe_to_all_obsolete.cs index af4b5ac43..89b09b43d 100644 --- a/test/EventStore.Client.Streams.Tests/Subscriptions/Obsolete/subscribe_to_all_obsolete.cs +++ b/test/EventStore.Client.Streams.Tests/Subscriptions/Obsolete/subscribe_to_all_obsolete.cs @@ -3,7 +3,8 @@ namespace EventStore.Client.Streams.Tests.Subscriptions.Obsolete; [Trait("Category", "Subscriptions")] [Trait("Category", "Target:All")] [Obsolete("Will be removed in future release when older subscriptions APIs are removed from the client")] -public class subscribe_to_all_obsolete(ITestOutputHelper output, SubscriptionsFixture fixture) : EventStoreTests(output, fixture) { +public class subscribe_to_all_obsolete(ITestOutputHelper output, SubscriptionsFixture fixture) + : EventStoreTests(output, fixture) { [Fact] public async Task receives_all_events_from_start() { var receivedAllEvents = new TaskCompletionSource(); @@ -11,18 +12,26 @@ public async Task receives_all_events_from_start() { var seedEvents = Fixture.CreateTestEvents(10).ToArray(); var pageSize = seedEvents.Length / 2; - + var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); - + foreach (var evt in seedEvents.Take(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); using var subscription = await Fixture.Streams .SubscribeToAllAsync(FromAll.Start, OnReceived, false, OnDropped) .WithTimeout(); foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); await receivedAllEvents.Task.WithTimeout(); @@ -34,40 +43,44 @@ public async Task receives_all_events_from_start() { subscription.Dispose(); var result = await subscriptionDropped.Task.WithTimeout(); result.ShouldBe(SubscriptionDroppedResult.Disposed()); - + return; Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) { availableEvents.RemoveWhere(x => x == re.OriginalEvent.EventId); - + if (availableEvents.Count == 0) { receivedAllEvents.TrySetResult(true); Fixture.Log.Information("Received all {TotalEventsCount} expected events", seedEvents.Length); } - + return Task.CompletedTask; } void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) => subscriptionDropped.SetResult(new(reason, ex)); } - + [Fact] public async Task receives_all_events_from_end() { var receivedAllEvents = new TaskCompletionSource(); var subscriptionDropped = new TaskCompletionSource(); var seedEvents = Fixture.CreateTestEvents(10).ToArray(); - + var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); - + using var subscription = await Fixture.Streams .SubscribeToAllAsync(FromAll.End, OnReceived, false, OnDropped) .WithTimeout(); // add the events we want to receive after we start the subscription foreach (var evt in seedEvents) - await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); await receivedAllEvents.Task.WithTimeout(); @@ -79,24 +92,24 @@ public async Task receives_all_events_from_end() { subscription.Dispose(); var result = await subscriptionDropped.Task.WithTimeout(); result.ShouldBe(SubscriptionDroppedResult.Disposed()); - + return; Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) { availableEvents.RemoveWhere(x => x == re.OriginalEvent.EventId); - + if (availableEvents.Count == 0) { receivedAllEvents.TrySetResult(true); Fixture.Log.Information("Received all {TotalEventsCount} expected events", seedEvents.Length); } - + return Task.CompletedTask; } void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) => subscriptionDropped.SetResult(new(reason, ex)); } - + [Fact] public async Task receives_all_events_from_position() { var receivedAllEvents = new TaskCompletionSource(); @@ -104,22 +117,30 @@ public async Task receives_all_events_from_position() { var seedEvents = Fixture.CreateTestEvents(10).ToArray(); var pageSize = seedEvents.Length / 2; - + // only the second half of the events will be received var availableEvents = new HashSet(seedEvents.Skip(pageSize).Select(x => x.EventId)); - + IWriteResult writeResult = new SuccessResult(); foreach (var evt in seedEvents.Take(pageSize)) - writeResult = await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); + writeResult = await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); var position = FromAll.After(writeResult.LogPosition); - + using var subscription = await Fixture.Streams .SubscribeToAllAsync(position, OnReceived, false, OnDropped) .WithTimeout(); foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"stream-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"stream-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); await receivedAllEvents.Task.WithTimeout(); @@ -131,24 +152,24 @@ public async Task receives_all_events_from_position() { subscription.Dispose(); var result = await subscriptionDropped.Task.WithTimeout(); result.ShouldBe(SubscriptionDroppedResult.Disposed()); - + return; Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) { availableEvents.RemoveWhere(x => x == re.OriginalEvent.EventId); - + if (availableEvents.Count == 0) { receivedAllEvents.TrySetResult(true); Fixture.Log.Information("Received all {TotalEventsCount} expected events", pageSize); } - + return Task.CompletedTask; } void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) => subscriptionDropped.SetResult(new(reason, ex)); } - + [Fact] public async Task receives_all_events_with_resolved_links() { var streamName = Fixture.GetStreamName(); @@ -158,13 +179,13 @@ public async Task receives_all_events_with_resolved_links() { var seedEvents = Fixture.CreateTestEvents(3).ToArray(); var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); - + await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents); - + using var subscription = await Fixture.Streams .SubscribeToAllAsync(FromAll.Start, OnReceived, true, OnDropped) .WithTimeout(); - + await receivedAllEvents.Task.WithTimeout(); // if the subscription dropped before time, raise the reason why @@ -175,81 +196,98 @@ public async Task receives_all_events_with_resolved_links() { subscription.Dispose(); var result = await subscriptionDropped.Task.WithTimeout(); result.ShouldBe(SubscriptionDroppedResult.Disposed()); - + return; Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) { var hasResolvedLink = re.OriginalEvent.EventStreamId.StartsWith($"$et-{EventStoreFixture.TestEventType}"); if (availableEvents.RemoveWhere(x => x == re.Event.EventId && hasResolvedLink) == 0) { - Fixture.Log.Debug("Received unexpected event {EventId} from stream {StreamId}", re.Event.EventId, re.OriginalEvent.EventStreamId); + Fixture.Log.Debug( + "Received unexpected event {EventId} from stream {StreamId}", + re.Event.EventId, + re.OriginalEvent.EventStreamId + ); + return Task.CompletedTask; } - + if (availableEvents.Count == 0) { receivedAllEvents.TrySetResult(true); Fixture.Log.Information("Received all {TotalEventsCount} expected events", seedEvents.Length); } - + return Task.CompletedTask; } void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) => subscriptionDropped.SetResult(new(reason, ex)); } - + [Theory] - [MemberData(nameof(SubscriptionFilter.TestCases), MemberType= typeof(SubscriptionFilter))] + [MemberData(nameof(SubscriptionFilter.TestCases), MemberType = typeof(SubscriptionFilter))] public async Task receives_all_filtered_events_from_start(SubscriptionFilter filter) { var streamPrefix = $"{nameof(receives_all_filtered_events_from_start)}-{filter.Name}-{Guid.NewGuid():N}"; - + Fixture.Log.Information("Using filter {FilterName} with prefix {StreamPrefix}", filter.Name, streamPrefix); - + var receivedAllEvents = new TaskCompletionSource(); var subscriptionDropped = new TaskCompletionSource(); var checkpointReached = new TaskCompletionSource(); - + var seedEvents = Fixture.CreateTestEvents(64) .Select(evt => filter.PrepareEvent(streamPrefix, evt)) .ToArray(); var pageSize = seedEvents.Length / 2; - + var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); // add noise - await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.NoStream, Fixture.CreateTestEvents(3)); - + await Fixture.Streams.AppendToStreamAsync( + Fixture.GetStreamName(), + StreamState.NoStream, + Fixture.CreateTestEvents(3) + ); + var existingEventsCount = await Fixture.Streams.ReadAllAsync(Direction.Forwards, Position.Start).CountAsync(); Fixture.Log.Debug("Existing events count: {ExistingEventsCount}", existingEventsCount); // Debugging: // await foreach (var evt in Fixture.Streams.ReadAllAsync(Direction.Forwards, Position.Start)) // Fixture.Log.Debug("Read event {EventId} from {StreamId}.", evt.OriginalEvent.EventId, evt.OriginalEvent.EventStreamId); - + // add some of the events we want to see before we start the subscription foreach (var evt in seedEvents.Take(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); var filterOptions = new SubscriptionFilterOptions(filter.Create(streamPrefix), 1, CheckpointReached); - + using var subscription = await Fixture.Streams .SubscribeToAllAsync(FromAll.Start, OnReceived, false, OnDropped, filterOptions) .WithTimeout(); // add some of the events we want to see after we start the subscription foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); - + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); + // wait until all events were received and at least one checkpoint was reached? await receivedAllEvents.Task.WithTimeout(); await checkpointReached.Task.WithTimeout(); - + // await Task.WhenAll(receivedAllEvents.Task, checkpointReached.Task).WithTimeout(); // if the subscription dropped before time, raise the reason why if (subscriptionDropped.Task.IsCompleted) subscriptionDropped.Task.IsCompleted.ShouldBe(false, subscriptionDropped.Task.Result.ToString()); - + // stop the subscription subscription.Dispose(); var result = await subscriptionDropped.Task.WithTimeout(); @@ -264,11 +302,16 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) ); receivedAllEvents.TrySetException( - new InvalidOperationException($"Received unexpected event {re.OriginalEvent.EventId} from stream {re.OriginalEvent.EventStreamId}") + new InvalidOperationException( + $"Received unexpected event {re.OriginalEvent.EventId} from stream {re.OriginalEvent.EventStreamId}" + ) + ); + } else { + Fixture.Log.Verbose( + "Received expected event {EventId} from {StreamId}.", + re.OriginalEvent.EventId, + re.OriginalEvent.EventStreamId ); - } - else { - Fixture.Log.Verbose("Received expected event {EventId} from {StreamId}.", re.OriginalEvent.EventId, re.OriginalEvent.EventStreamId); } if (availableEvents.Count == 0) { @@ -290,66 +333,81 @@ void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Excepti Task CheckpointReached(StreamSubscription sub, Position position, CancellationToken ct) { Fixture.Log.Verbose( "Checkpoint reached {Position}. Received {ReceivedEventsCount}/{TotalEventsCount} events", - position, seedEvents.Length - availableEvents.Count, seedEvents.Length + position, + seedEvents.Length - availableEvents.Count, + seedEvents.Length ); + checkpointReached.TrySetResult(true); return Task.CompletedTask; } } - + [Theory] - [MemberData(nameof(SubscriptionFilter.TestCases), MemberType= typeof(SubscriptionFilter))] + [MemberData(nameof(SubscriptionFilter.TestCases), MemberType = typeof(SubscriptionFilter))] public async Task receives_all_filtered_events_from_end(SubscriptionFilter filter) { var streamPrefix = $"{nameof(receives_all_filtered_events_from_end)}-{filter.Name}-{Guid.NewGuid():N}"; - + Fixture.Log.Information("Using filter {FilterName} with prefix {StreamPrefix}", filter.Name, streamPrefix); - + var receivedAllEvents = new TaskCompletionSource(); var subscriptionDropped = new TaskCompletionSource(); var checkpointReached = new TaskCompletionSource(); - + var seedEvents = Fixture.CreateTestEvents(64) .Select(evt => filter.PrepareEvent(streamPrefix, evt)) .ToArray(); - + var pageSize = seedEvents.Length / 2; - + // only the second half of the events will be received var availableEvents = new HashSet(seedEvents.Skip(pageSize).Select(x => x.EventId)); // add noise - await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.NoStream, Fixture.CreateTestEvents(3)); - + await Fixture.Streams.AppendToStreamAsync( + Fixture.GetStreamName(), + StreamState.NoStream, + Fixture.CreateTestEvents(3) + ); + var existingEventsCount = await Fixture.Streams.ReadAllAsync(Direction.Forwards, Position.Start).CountAsync(); Fixture.Log.Debug("Existing events count: {ExistingEventsCount}", existingEventsCount); - + // add some of the events that are a match to the filter but will not be received foreach (var evt in seedEvents.Take(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); - + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); + var filterOptions = new SubscriptionFilterOptions(filter.Create(streamPrefix), 1, CheckpointReached); - + using var subscription = await Fixture.Streams .SubscribeToAllAsync(FromAll.End, OnReceived, false, OnDropped, filterOptions) .WithTimeout(); // add the events we want to receive after we start the subscription foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); - + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); + // wait until all events were received and at least one checkpoint was reached? await receivedAllEvents.Task.WithTimeout(); await checkpointReached.Task.WithTimeout(); - + // if the subscription dropped before time, raise the reason why if (subscriptionDropped.Task.IsCompleted) subscriptionDropped.Task.IsCompleted.ShouldBe(false, subscriptionDropped.Task.Result.ToString()); - + // stop the subscription subscription.Dispose(); var result = await subscriptionDropped.Task.WithTimeout(); result.ShouldBe(SubscriptionDroppedResult.Disposed()); - + return; Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) { @@ -361,11 +419,16 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) ); receivedAllEvents.TrySetException( - new InvalidOperationException($"Received unexpected event {re.OriginalEvent.EventId} from stream {re.OriginalEvent.EventStreamId}") + new InvalidOperationException( + $"Received unexpected event {re.OriginalEvent.EventId} from stream {re.OriginalEvent.EventStreamId}" + ) + ); + } else { + Fixture.Log.Verbose( + "Received expected event {EventId} from {StreamId}", + re.OriginalEvent.EventId, + re.OriginalEvent.EventStreamId ); - } - else { - Fixture.Log.Verbose("Received expected event {EventId} from {StreamId}", re.OriginalEvent.EventId, re.OriginalEvent.EventStreamId); } if (availableEvents.Count == 0) { @@ -387,69 +450,84 @@ void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Excepti Task CheckpointReached(StreamSubscription sub, Position position, CancellationToken ct) { Fixture.Log.Verbose( "Checkpoint reached {Position}. Received {ReceivedEventsCount}/{TotalEventsCount} events", - position, pageSize - availableEvents.Count, pageSize + position, + pageSize - availableEvents.Count, + pageSize ); + checkpointReached.TrySetResult(true); return Task.CompletedTask; } } [Theory] - [MemberData(nameof(SubscriptionFilter.TestCases), MemberType= typeof(SubscriptionFilter))] + [MemberData(nameof(SubscriptionFilter.TestCases), MemberType = typeof(SubscriptionFilter))] public async Task receives_all_filtered_events_from_position(SubscriptionFilter filter) { var streamPrefix = $"{nameof(receives_all_filtered_events_from_position)}-{filter.Name}-{Guid.NewGuid():N}"; - + Fixture.Log.Information("Using filter {FilterName} with prefix {StreamPrefix}", filter.Name, streamPrefix); - + var receivedAllEvents = new TaskCompletionSource(); var subscriptionDropped = new TaskCompletionSource(); var checkpointReached = new TaskCompletionSource(); - + var seedEvents = Fixture.CreateTestEvents(64) .Select(evt => filter.PrepareEvent(streamPrefix, evt)) .ToArray(); - + var pageSize = seedEvents.Length / 2; - + // only the second half of the events will be received var availableEvents = new HashSet(seedEvents.Skip(pageSize).Select(x => x.EventId)); // add noise - await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.NoStream, Fixture.CreateTestEvents(3)); - + await Fixture.Streams.AppendToStreamAsync( + Fixture.GetStreamName(), + StreamState.NoStream, + Fixture.CreateTestEvents(3) + ); + var existingEventsCount = await Fixture.Streams.ReadAllAsync(Direction.Forwards, Position.Start).CountAsync(); Fixture.Log.Debug("Existing events count: {ExistingEventsCount}", existingEventsCount); - + // add some of the events that are a match to the filter but will not be received IWriteResult writeResult = new SuccessResult(); foreach (var evt in seedEvents.Take(pageSize)) - writeResult = await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); + writeResult = await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); var position = FromAll.After(writeResult.LogPosition); - + var filterOptions = new SubscriptionFilterOptions(filter.Create(streamPrefix), 1, CheckpointReached); - + using var subscription = await Fixture.Streams .SubscribeToAllAsync(position, OnReceived, false, OnDropped, filterOptions) .WithTimeout(); // add the events we want to receive after we start the subscription foreach (var evt in seedEvents.Skip(pageSize)) - await Fixture.Streams.AppendToStreamAsync($"{streamPrefix}-{evt.EventId.ToGuid():N}", StreamState.NoStream, new[] { evt }); - + await Fixture.Streams.AppendToStreamAsync( + $"{streamPrefix}-{evt.EventId.ToGuid():N}", + StreamState.NoStream, + new[] { evt } + ); + // wait until all events were received and at least one checkpoint was reached? await receivedAllEvents.Task.WithTimeout(); await checkpointReached.Task.WithTimeout(); - + // if the subscription dropped before time, raise the reason why if (subscriptionDropped.Task.IsCompleted) subscriptionDropped.Task.IsCompleted.ShouldBe(false, subscriptionDropped.Task.Result.ToString()); - + // stop the subscription subscription.Dispose(); var result = await subscriptionDropped.Task.WithTimeout(); result.ShouldBe(SubscriptionDroppedResult.Disposed()); - + return; Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) { @@ -461,11 +539,16 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) ); receivedAllEvents.TrySetException( - new InvalidOperationException($"Received unexpected event {re.OriginalEvent.EventId} from stream {re.OriginalEvent.EventStreamId}") + new InvalidOperationException( + $"Received unexpected event {re.OriginalEvent.EventId} from stream {re.OriginalEvent.EventStreamId}" + ) + ); + } else { + Fixture.Log.Verbose( + "Received expected event {EventId} from {StreamId}", + re.OriginalEvent.EventId, + re.OriginalEvent.EventStreamId ); - } - else { - Fixture.Log.Verbose("Received expected event {EventId} from {StreamId}", re.OriginalEvent.EventId, re.OriginalEvent.EventStreamId); } if (availableEvents.Count == 0) { @@ -487,13 +570,16 @@ void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Excepti Task CheckpointReached(StreamSubscription sub, Position position, CancellationToken ct) { Fixture.Log.Verbose( "Checkpoint reached {Position}. Received {ReceivedEventsCount}/{TotalEventsCount} events", - position, pageSize - availableEvents.Count, pageSize + position, + pageSize - availableEvents.Count, + pageSize ); + checkpointReached.TrySetResult(true); return Task.CompletedTask; } } - + [Fact] public async Task receives_all_filtered_events_with_resolved_links() { var streamName = Fixture.GetStreamName(); @@ -503,17 +589,15 @@ public async Task receives_all_filtered_events_with_resolved_links() { var seedEvents = Fixture.CreateTestEvents(3).ToArray(); var availableEvents = new HashSet(seedEvents.Select(x => x.EventId)); - + await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents); - var options = new SubscriptionFilterOptions( - StreamFilter.Prefix($"$et-{EventStoreFixture.TestEventType}") - ); - + var options = new SubscriptionFilterOptions(StreamFilter.Prefix($"$et-{EventStoreFixture.TestEventType}")); + using var subscription = await Fixture.Streams .SubscribeToAllAsync(FromAll.Start, OnReceived, true, OnDropped, options) .WithTimeout(); - + await receivedAllEvents.Task.WithTimeout(); // if the subscription dropped before time, raise the reason why @@ -524,32 +608,37 @@ public async Task receives_all_filtered_events_with_resolved_links() { subscription.Dispose(); var result = await subscriptionDropped.Task.WithTimeout(); result.ShouldBe(SubscriptionDroppedResult.Disposed()); - + return; Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) { var hasResolvedLink = re.OriginalEvent.EventStreamId.StartsWith($"$et-{EventStoreFixture.TestEventType}"); if (availableEvents.RemoveWhere(x => x == re.Event.EventId && hasResolvedLink) == 0) { - Fixture.Log.Debug("Received unexpected event {EventId} from stream {StreamId}", re.Event.EventId, re.OriginalEvent.EventStreamId); + Fixture.Log.Debug( + "Received unexpected event {EventId} from stream {StreamId}", + re.Event.EventId, + re.OriginalEvent.EventStreamId + ); + return Task.CompletedTask; } - + if (availableEvents.Count == 0) { receivedAllEvents.TrySetResult(true); Fixture.Log.Information("Received all {TotalEventsCount} expected events", seedEvents.Length); } - + return Task.CompletedTask; } void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) => subscriptionDropped.SetResult(new(reason, ex)); } - + [Fact] public async Task drops_when_disposed() { var subscriptionDropped = new TaskCompletionSource(); - + using var subscription = await Fixture.Streams .SubscribeToAllAsync( FromAll.Start, @@ -562,7 +651,7 @@ public async Task drops_when_disposed() { // if the subscription dropped before time, raise the reason why if (subscriptionDropped.Task.IsCompleted) subscriptionDropped.Task.IsCompleted.ShouldBe(false, subscriptionDropped.Task.Result.ToString()); - + // stop the subscription subscription.Dispose(); var result = await subscriptionDropped.Task.WithTimeout(); @@ -574,7 +663,7 @@ public async Task drops_when_subscriber_error() { var expectedResult = SubscriptionDroppedResult.SubscriberError(); var subscriptionDropped = new TaskCompletionSource(); - + using var subscription = await Fixture.Streams .SubscribeToAllAsync( FromAll.Start, @@ -584,9 +673,36 @@ public async Task drops_when_subscriber_error() { ) .WithTimeout(); - await Fixture.Streams.AppendToStreamAsync(Fixture.GetStreamName(), StreamState.NoStream, Fixture.CreateTestEvents()); + await Fixture.Streams.AppendToStreamAsync( + Fixture.GetStreamName(), + StreamState.NoStream, + Fixture.CreateTestEvents() + ); var result = await subscriptionDropped.Task.WithTimeout(); result.ShouldBe(expectedResult); } + + [Fact] + public async Task subscription_drops_due_to_cancellation_token() { + var subscriptionDropped = new TaskCompletionSource(); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + + using var subscription = await Fixture.Streams + .SubscribeToAllAsync( + FromAll.Start, + (sub, re, ct) => Task.CompletedTask, + false, + (sub, reason, ex) => subscriptionDropped.SetResult(new(reason, ex)), + cancellationToken: cts.Token + ) + .WithTimeout(); + + // wait until the cancellation token canels + await Task.Delay(TimeSpan.FromSeconds(3)); + + var result = await subscriptionDropped.Task.WithTimeout(); + result.Reason.ShouldBe(SubscriptionDroppedReason.Disposed); + } } diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/Obsolete/subscribe_to_stream_obsolete.cs b/test/EventStore.Client.Streams.Tests/Subscriptions/Obsolete/subscribe_to_stream_obsolete.cs index 4e05b293b..65d47fd22 100644 --- a/test/EventStore.Client.Streams.Tests/Subscriptions/Obsolete/subscribe_to_stream_obsolete.cs +++ b/test/EventStore.Client.Streams.Tests/Subscriptions/Obsolete/subscribe_to_stream_obsolete.cs @@ -3,7 +3,8 @@ namespace EventStore.Client.Streams.Tests.Subscriptions.Obsolete; [Trait("Category", "Subscriptions")] [Trait("Category", "Target:Stream")] [Obsolete("Will be removed in future release when older subscriptions APIs are removed from the client")] -public class subscribe_to_stream_obsolete(ITestOutputHelper output, SubscriptionsFixture fixture) : EventStoreTests(output, fixture) { +public class subscribe_to_stream_obsolete(ITestOutputHelper output, SubscriptionsFixture fixture) + : EventStoreTests(output, fixture) { [Fact] public async Task receives_all_events_from_start() { var streamName = Fixture.GetStreamName(); @@ -65,7 +66,12 @@ public async Task receives_all_events_from_position() { // only the second half of the events will be received var availableEvents = new HashSet(seedEvents.Skip(pageSize).Select(x => x.EventId)); - var writeResult = await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents.Take(pageSize)); + var writeResult = await Fixture.Streams.AppendToStreamAsync( + streamName, + StreamState.NoStream, + seedEvents.Take(pageSize) + ); + var streamPosition = StreamPosition.FromStreamRevision(writeResult.NextExpectedStreamRevision); var checkpoint = FromStream.After(streamPosition); @@ -73,7 +79,11 @@ public async Task receives_all_events_from_position() { .SubscribeToStreamAsync(streamName, checkpoint, OnReceived, false, OnDropped) .WithTimeout(); - await Fixture.Streams.AppendToStreamAsync(streamName, writeResult.NextExpectedStreamRevision, seedEvents.Skip(pageSize)); + await Fixture.Streams.AppendToStreamAsync( + streamName, + writeResult.NextExpectedStreamRevision, + seedEvents.Skip(pageSize) + ); await receivedAllEvents.Task.WithTimeout(); @@ -266,7 +276,13 @@ public async Task receives_all_events_with_resolved_links() { await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents); using var subscription = await Fixture.Streams - .SubscribeToStreamAsync($"$et-{EventStoreFixture.TestEventType}", FromStream.Start, OnReceived, true, OnDropped) + .SubscribeToStreamAsync( + $"$et-{EventStoreFixture.TestEventType}", + FromStream.Start, + OnReceived, + true, + OnDropped + ) .WithTimeout(); await receivedAllEvents.Task.WithTimeout(); @@ -285,7 +301,12 @@ public async Task receives_all_events_with_resolved_links() { Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) { var hasResolvedLink = re.OriginalEvent.EventStreamId.StartsWith($"$et-{EventStoreFixture.TestEventType}"); if (availableEvents.RemoveWhere(x => x == re.Event.EventId && hasResolvedLink) == 0) { - Fixture.Log.Debug("Received unexpected event {EventId} from stream {StreamId}", re.Event.EventId, re.OriginalEvent.EventStreamId); + Fixture.Log.Debug( + "Received unexpected event {EventId} from stream {StreamId}", + re.Event.EventId, + re.OriginalEvent.EventStreamId + ); + return Task.CompletedTask; } @@ -300,4 +321,30 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) => subscriptionDropped.SetResult(new(reason, ex)); } + + [Fact] + public async Task subscription_drops_due_to_cancellation_token() { + var subscriptionDropped = new TaskCompletionSource(); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + + var streamName = Fixture.GetStreamName(); + + using var subscription = await Fixture.Streams + .SubscribeToStreamAsync( + streamName, + FromStream.Start, + (sub, re, ct) => Task.CompletedTask, + false, + (sub, reason, ex) => subscriptionDropped.SetResult(new(reason, ex)), + cancellationToken: cts.Token + ) + .WithTimeout(); + + // wait until the cancellation token canels + await Task.Delay(TimeSpan.FromSeconds(3)); + + var result = await subscriptionDropped.Task.WithTimeout(); + result.Reason.ShouldBe(SubscriptionDroppedReason.Disposed); + } } diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/SubscriptionDroppedResult.cs b/test/EventStore.Client.Tests.Common/SubscriptionDroppedResult.cs similarity index 100% rename from test/EventStore.Client.Streams.Tests/Subscriptions/SubscriptionDroppedResult.cs rename to test/EventStore.Client.Tests.Common/SubscriptionDroppedResult.cs