Skip to content

Commit

Permalink
fix: #128 properly handle IOutboxWriter lifecycle
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed Apr 4, 2021
1 parent 0b7bd53 commit d6a4da3
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup Label="Package information">
<BaseVersionSuffix></BaseVersionSuffix>
<BaseVersion>3.0.0$(BaseVersionSuffix)</BaseVersion>
<BaseVersion>3.0.1$(BaseVersionSuffix)</BaseVersion>
<DatabasePackagesRevision>1</DatabasePackagesRevision>
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
</PropertyGroup>
Expand Down
6 changes: 6 additions & 0 deletions docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ uid: releases

# Releases

## [3.0.1](https://github.com/BEagle1984/silverback/releases/tag/v3.0.1)

### Fixes

* Fix <xref:Silverback.Messaging.Outbound.TransactionalOutbox.Repositories.IOutboxWriter> lifecycle [[#128](https://github.com/BEagle1984/silverback/issues/128)]

## [3.0.0](https://github.com/BEagle1984/silverback/releases/tag/v3.0.0)

### What's new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Silverback.Messaging.Broker;
using Silverback.Messaging.Messages;
using Silverback.Messaging.Publishing;
using Silverback.Util;
Expand All @@ -18,6 +19,8 @@ public class ProduceBehavior : IBehavior, ISorted
{
private readonly IServiceProvider _serviceProvider;

private IProduceStrategyImplementation? _produceStrategyImplementation;

/// <summary>
/// Initializes a new instance of the <see cref="ProduceBehavior" /> class.
/// </summary>
Expand All @@ -40,8 +43,8 @@ public ProduceBehavior(IServiceProvider serviceProvider)

if (message is IOutboundEnvelope envelope)
{
await envelope.Endpoint.Strategy.Build(_serviceProvider).ProduceAsync(envelope)
.ConfigureAwait(false);
_produceStrategyImplementation ??= envelope.Endpoint.Strategy.Build(_serviceProvider);
await _produceStrategyImplementation.ProduceAsync(envelope).ConfigureAwait(false);
}

return await next(message).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Silverback.Diagnostics;
using Silverback.Messaging.Broker;
using Silverback.Messaging.Messages;
using Silverback.Util;

Expand All @@ -17,11 +18,9 @@ namespace Silverback.Messaging.Outbound.TransactionalOutbox
/// </summary>
public class OutboxProduceStrategy : IProduceStrategy
{
private OutboxProduceStrategyImplementation? _implementation;

/// <inheritdoc cref="IProduceStrategy.Build" />
public IProduceStrategyImplementation Build(IServiceProvider serviceProvider) =>
_implementation ??= new OutboxProduceStrategyImplementation(
new OutboxProduceStrategyImplementation(
serviceProvider.GetRequiredService<TransactionalOutboxBroker>(),
serviceProvider.GetRequiredService<IOutboundLogger<OutboxProduceStrategy>>());

Expand All @@ -31,6 +30,8 @@ private class OutboxProduceStrategyImplementation : IProduceStrategyImplementati

private readonly IOutboundLogger<OutboxProduceStrategy> _logger;

private IProducer? _producer;

public OutboxProduceStrategyImplementation(
TransactionalOutboxBroker outboundQueueBroker,
IOutboundLogger<OutboxProduceStrategy> logger)
Expand All @@ -45,7 +46,9 @@ public Task ProduceAsync(IOutboundEnvelope envelope)

_logger.LogWrittenToOutbox(envelope);

return _outboundQueueBroker.GetProducer(envelope.Endpoint).ProduceAsync(envelope);
_producer ??= _outboundQueueBroker.GetProducer(envelope.Endpoint);

return _producer.ProduceAsync(envelope);
}
}
}
Expand Down
36 changes: 26 additions & 10 deletions tests/Silverback.Integration.Tests.E2E/Kafka/OutboxTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ public async Task OutboxProduceStrategy_DefaultSettings_ProducedAndConsumed()
.AddOutboxWorker(TimeSpan.FromMilliseconds(100)))
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://e2e"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://e2e";
})
.AddOutbound<IIntegrationEvent>(
endpoint => endpoint
.ProduceTo(DefaultTopicName)
Expand All @@ -58,15 +62,19 @@ public async Task OutboxProduceStrategy_DefaultSettings_ProducedAndConsumed()
.AddIntegrationSpyAndSubscriber())
.Run();

var publisher = Host.ScopedServiceProvider.GetRequiredService<IEventPublisher>();
var dbContext = Host.ScopedServiceProvider.GetRequiredService<TestDbContext>();

for (int i = 1; i <= 15; i++)
for (int i = 0; i < 3; i++)
{
await publisher.PublishAsync(new TestEventOne { Content = $"{i}" });
}
using var scope = Host.ServiceProvider.CreateScope();
var publisher = scope.ServiceProvider.GetRequiredService<IEventPublisher>();
var dbContext = scope.ServiceProvider.GetRequiredService<TestDbContext>();

await dbContext.SaveChangesAsync();
for (int j = (i * 5) + 1; j <= (i + 1) * 5; j++)
{
await publisher.PublishAsync(new TestEventOne { Content = $"{j}" });
}

await dbContext.SaveChangesAsync();
}

await Helper.WaitUntilAllMessagesAreConsumedAsync();

Expand Down Expand Up @@ -96,7 +104,11 @@ public async Task OutboxProduceStrategy_TransactionAborted_MessageNotProduced()
.AddOutboxWorker(TimeSpan.FromMilliseconds(100)))
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://e2e"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://e2e";
})
.AddOutbound<IIntegrationEvent>(
endpoint => endpoint
.ProduceTo(DefaultTopicName)
Expand Down Expand Up @@ -148,7 +160,11 @@ public async Task OutboxProduceStrategy_WithEndpointNameFunction_ProducedToPrope
.AddOutboxWorker(TimeSpan.FromMilliseconds(100)))
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://e2e"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://e2e";
})
.AddOutbound<IIntegrationEvent>(
endpoint => endpoint
.ProduceTo(
Expand Down

0 comments on commit d6a4da3

Please sign in to comment.