Skip to content

Commit

Permalink
Merge pull request #47 from Noblix/main
Browse files Browse the repository at this point in the history
Fix bug related to required version
  • Loading branch information
LarsSkovslund authored Oct 10, 2023
2 parents 6118592 + 2407ffa commit 9b39c38
Show file tree
Hide file tree
Showing 9 changed files with 343 additions and 34 deletions.
3 changes: 2 additions & 1 deletion src/Atc.Cosmos.EventStore.Cqrs/Commands/StateProjector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public async ValueTask<IStreamState> ProjectAsync(
.ReadFromStreamAsync(
state.Id,
command.RequiredVersion?.Value ?? StreamVersion.Any,
cancellationToken: cancellationToken)
filter: null,
cancellationToken)
.ConfigureAwait(false))
{
await handlerMetadata
Expand Down
1 change: 1 addition & 0 deletions src/Atc.Cosmos.EventStore/IsExternalInit.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo("Atc.Cosmos.EventStore.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo("Atc.Cosmos.EventStore.Cqrs")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo("Atc.Cosmos.EventStore.Cqrs.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo("DynamicProxyGenAssembly2")]

namespace System.Runtime.CompilerServices;
Expand Down
5 changes: 0 additions & 5 deletions src/Atc.Cosmos.EventStore/StreamReadFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,4 @@ public class StreamReadFilter
/// Gets or sets the type of events to read from the stream.
/// </summary>
public IReadOnlyCollection<EventName>? IncludeEvents { get; set; }

/// <summary>
/// Gets or sets the required version the stream must be at.
/// </summary>
public StreamVersion? RequiredVersion { get; set; }
}
2 changes: 1 addition & 1 deletion src/Atc.Cosmos.EventStore/Streams/StreamReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public async IAsyncEnumerable<IEvent> ReadAsync(

readValidator.Validate(
metadata,
filter?.RequiredVersion ?? StreamVersion.Any);
fromVersion);

// If we don't have any events in the stream, then skip reading from stream.
if (metadata.Version == 0)
Expand Down
290 changes: 290 additions & 0 deletions test/Atc.Cosmos.EventStore.Cqrs.Tests/Commands/StateProjectorTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
using Atc.Cosmos.EventStore.Cqrs.Commands;
using Atc.Cosmos.EventStore.Cqrs.Tests.Mocks;
using Atc.Cosmos.EventStore.Streams;
using Atc.Test;
using AutoFixture.AutoNSubstitute;
using AutoFixture.Xunit2;
using FluentAssertions;
using NSubstitute;
using Xunit;

namespace Atc.Cosmos.EventStore.Cqrs.Tests.Commands;

public class StateProjectorTests
{
[Theory, AutoNSubstituteData]
internal async Task Should_Check_If_Handler_Consumes_Events(
[Frozen] ICommandHandlerMetadata<MockCommand> handlerMetadata,
StateProjector<MockCommand> sut,
MockCommand command,
ICommandHandler<MockCommand> handler,
CancellationToken cancellationToken)
{
await sut.ProjectAsync(command, handler, cancellationToken);

handlerMetadata.Received().IsNotConsumingEvents();
}

[Theory, AutoNSubstituteData]
internal async Task Should_Read_Metadata_For_Handler_With_No_Consumes(
[Frozen] ICommandHandlerMetadata<MockCommand> handlerMetadata,
[Frozen] IEventStoreClient eventStore,
StateProjector<MockCommand> sut,
MockCommand command,
ICommandHandler<MockCommand> handler,
CancellationToken cancellationToken)
{
handlerMetadata
.IsNotConsumingEvents()
.ReturnsForAnyArgs(true);

await sut.ProjectAsync(command, handler, cancellationToken);

await eventStore.Received()
.GetStreamInfoAsync(
command.GetEventStreamId().Value,
cancellationToken);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Validate_Metadata_For_Handler_With_No_Consumes(
[Frozen] ICommandHandlerMetadata<MockCommand> handlerMetadata,
[Frozen] IEventStoreClient eventStore,
[Frozen, Substitute] IStreamReadValidator validator,
StateProjector<MockCommand> sut,
StreamMetadata metadata,
MockCommand command,
ICommandHandler<MockCommand> handler,
CancellationToken cancellationToken)
{
handlerMetadata
.IsNotConsumingEvents()
.ReturnsForAnyArgs(true);
eventStore
.GetStreamInfoAsync(default, default)
.ReturnsForAnyArgs(metadata);

await sut.ProjectAsync(command, handler, cancellationToken);

validator
.Received(1)
.Validate(
metadata,
command.RequiredVersion!.Value.Value);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Default_To_Any_Version_When_Validating_Metadata(
[Frozen] ICommandHandlerMetadata<MockCommand> handlerMetadata,
[Frozen] IEventStoreClient eventStore,
[Frozen, Substitute] IStreamReadValidator validator,
StateProjector<MockCommand> sut,
StreamMetadata metadata,
MockCommand command,
ICommandHandler<MockCommand> handler,
CancellationToken cancellationToken)
{
command = command with { RequiredVersion = null };
handlerMetadata
.IsNotConsumingEvents()
.ReturnsForAnyArgs(true);
eventStore
.GetStreamInfoAsync(default, default)
.ReturnsForAnyArgs(metadata);

await sut.ProjectAsync(command, handler, cancellationToken);

validator
.Received(1)
.Validate(
metadata,
StreamVersion.Any);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Return_State_With_Version_From_Metadata_For_Handler_With_No_Consumes(
[Frozen] ICommandHandlerMetadata<MockCommand> handlerMetadata,
[Frozen] IEventStoreClient eventStore,
StateProjector<MockCommand> sut,
StreamMetadata metadata,
MockCommand command,
ICommandHandler<MockCommand> handler,
CancellationToken cancellationToken)
{
handlerMetadata
.IsNotConsumingEvents()
.ReturnsForAnyArgs(true);
eventStore
.GetStreamInfoAsync(default, default)
.ReturnsForAnyArgs(metadata);

var result = await sut.ProjectAsync(command, handler, cancellationToken);

result.Should()
.BeEquivalentTo(
new Cqrs.Commands.StreamState()
{
Id = command.GetEventStreamId().Value,
Version = metadata.Version,
});
}

[Theory, AutoNSubstituteData]
internal async Task Should_Not_Read_Stream_From_Event_Store_For_Handler_With_No_Consumes(
[Frozen] ICommandHandlerMetadata<MockCommand> handlerMetadata,
[Frozen] IEventStoreClient eventStore,
StateProjector<MockCommand> sut,
StreamMetadata metadata,
MockCommand command,
ICommandHandler<MockCommand> handler,
CancellationToken cancellationToken)
{
handlerMetadata
.IsNotConsumingEvents()
.ReturnsForAnyArgs(true);
eventStore
.GetStreamInfoAsync(default, default)
.ReturnsForAnyArgs(metadata);

await sut.ProjectAsync(command, handler, cancellationToken);

_ = eventStore
.DidNotReceiveWithAnyArgs()
.ReadFromStreamAsync(
default,
default,
default,
default);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Read_Stream_From_Event_Store_When_Handler_Consumes(
[Frozen] ICommandHandlerMetadata<MockCommand> handlerMetadata,
[Frozen] IEventStoreClient eventStore,
StateProjector<MockCommand> sut,
MockCommand command,
ICommandHandler<MockCommand> handler,
CancellationToken cancellationToken)
{
handlerMetadata
.IsNotConsumingEvents()
.ReturnsForAnyArgs(false);

await sut.ProjectAsync(command, handler, cancellationToken);

_ = eventStore.Received()
.ReadFromStreamAsync(
command.GetEventStreamId().Value,
command.RequiredVersion!.Value.Value,
filter: null,
cancellationToken);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Default_To_Any_Version_When_Reading_Event_Store(
[Frozen] ICommandHandlerMetadata<MockCommand> handlerMetadata,
[Frozen] IEventStoreClient eventStore,
StateProjector<MockCommand> sut,
MockCommand command,
ICommandHandler<MockCommand> handler,
CancellationToken cancellationToken)
{
command = command with { RequiredVersion = null };
handlerMetadata
.IsNotConsumingEvents()
.ReturnsForAnyArgs(false);

await sut.ProjectAsync(command, handler, cancellationToken);

_ = eventStore.Received()
.ReadFromStreamAsync(
command.GetEventStreamId().Value,
StreamVersion.Any,
filter: null,
cancellationToken);
}

[Theory, AutoNSubstituteData]
internal async Task Should_Consume_Events_Found_In_Stream(
[Frozen] ICommandHandlerMetadata<MockCommand> handlerMetadata,
[Frozen] IEventStoreClient eventStore,
StateProjector<MockCommand> sut,
ICollection<MockEvent> events,
MockCommand command,
ICommandHandler<MockCommand> handler,
CancellationToken cancellationToken)
{
command = command with { RequiredVersion = null };
handlerMetadata
.IsNotConsumingEvents()
.ReturnsForAnyArgs(false);
eventStore
.ReadFromStreamAsync(
default,
default,
default,
default)
.ReturnsForAnyArgs(ToAsyncEnumerable(events));

await sut.ProjectAsync(command, handler, cancellationToken);

foreach (var evt in events)
{
await handlerMetadata
.Received()
.ConsumeAsync(
evt,
handler,
cancellationToken);
}
}

[Theory, AutoNSubstituteData]
internal async Task Should_Return_State_With_Version_From_Last_Event_When_Handler_Consumes(
[Frozen] ICommandHandlerMetadata<MockCommand> handlerMetadata,
[Frozen] IEventStoreClient eventStore,
StateProjector<MockCommand> sut,
ICollection<MockEvent> events,
MockEvent lastEvent,
MockEventMetadata lastEventMetadata,
MockCommand command,
ICommandHandler<MockCommand> handler,
CancellationToken cancellationToken)
{
handlerMetadata
.IsNotConsumingEvents()
.ReturnsForAnyArgs(false);
lastEvent.Metadata = lastEventMetadata;
events.Add(lastEvent);
eventStore
.ReadFromStreamAsync(
default,
default,
default,
default)
.ReturnsForAnyArgs(ToAsyncEnumerable(events));

var result = await sut.ProjectAsync(
command,
handler,
cancellationToken);

result.Should()
.BeEquivalentTo(
new Cqrs.Commands.StreamState()
{
Id = command.GetEventStreamId().Value,
Version = lastEventMetadata.Version,
});
}

#pragma warning disable CS1998 // Mark method as async
private static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(IEnumerable<T> enumerable)
#pragma warning restore CS1998 // Mark method as async
{
foreach (var item in enumerable)
{
yield return item;
}
}
}
19 changes: 19 additions & 0 deletions test/Atc.Cosmos.EventStore.Cqrs.Tests/Mocks/MockCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace Atc.Cosmos.EventStore.Cqrs.Tests.Mocks;

public record MockCommand : ICommand
{
private readonly EventStreamId eventStreamId = new(Guid.NewGuid().ToString());

public string CommandId { get; set; }

public string? CorrelationId { get; set; }

public EventStreamVersion? RequiredVersion { get; set; }

public OnConflict Behavior { get; set; }

public int BehaviorCount { get; set; }

public EventStreamId GetEventStreamId()
=> eventStreamId;
}
2 changes: 1 addition & 1 deletion test/Atc.Cosmos.EventStore.Tests/EventStoreClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ internal async Task Should_Return_Info_On_GetStreamInfo(
}

[Theory, AutoNSubstituteData]
internal async ValueTask Should_WriteToStream(
internal async Task Should_WriteToStream(
[Frozen, Substitute] IStreamWriter writer,
EventStoreClient sut,
StreamId streamId,
Expand Down
Loading

0 comments on commit 9b39c38

Please sign in to comment.