From d6a4da3468f8d5a6d778563e76ad269259538d6b Mon Sep 17 00:00:00 2001 From: BEagle1984 Date: Sun, 4 Apr 2021 18:04:37 +0200 Subject: [PATCH] fix: #128 properly handle IOutboxWriter lifecycle --- Directory.Build.props | 2 +- docs/releases.md | 6 ++++ .../Outbound/Routing/ProduceBehavior.cs | 7 ++-- .../OutboxProduceStrategy.cs | 11 +++--- .../Kafka/OutboxTests.cs | 36 +++++++++++++------ 5 files changed, 45 insertions(+), 17 deletions(-) diff --git a/Directory.Build.props b/Directory.Build.props index e689214f2..3536ecf1b 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,7 +1,7 @@ - 3.0.0$(BaseVersionSuffix) + 3.0.1$(BaseVersionSuffix) 1 $(BaseVersionSuffix) diff --git a/docs/releases.md b/docs/releases.md index f23083c08..b5ce9ad04 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -4,6 +4,12 @@ uid: releases # Releases +## [3.0.1](https://github.com/BEagle1984/silverback/releases/tag/v3.0.1) + +### Fixes + +* Fix lifecycle [[#128](https://github.com/BEagle1984/silverback/issues/128)] + ## [3.0.0](https://github.com/BEagle1984/silverback/releases/tag/v3.0.0) ### What's new diff --git a/src/Silverback.Integration/Messaging/Outbound/Routing/ProduceBehavior.cs b/src/Silverback.Integration/Messaging/Outbound/Routing/ProduceBehavior.cs index 1c99aa706..2a3b689f0 100644 --- a/src/Silverback.Integration/Messaging/Outbound/Routing/ProduceBehavior.cs +++ b/src/Silverback.Integration/Messaging/Outbound/Routing/ProduceBehavior.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; +using Silverback.Messaging.Broker; using Silverback.Messaging.Messages; using Silverback.Messaging.Publishing; using Silverback.Util; @@ -18,6 +19,8 @@ public class ProduceBehavior : IBehavior, ISorted { private readonly IServiceProvider _serviceProvider; + private IProduceStrategyImplementation? _produceStrategyImplementation; + /// /// Initializes a new instance of the class. /// @@ -40,8 +43,8 @@ public ProduceBehavior(IServiceProvider serviceProvider) if (message is IOutboundEnvelope envelope) { - await envelope.Endpoint.Strategy.Build(_serviceProvider).ProduceAsync(envelope) - .ConfigureAwait(false); + _produceStrategyImplementation ??= envelope.Endpoint.Strategy.Build(_serviceProvider); + await _produceStrategyImplementation.ProduceAsync(envelope).ConfigureAwait(false); } return await next(message).ConfigureAwait(false); diff --git a/src/Silverback.Integration/Messaging/Outbound/TransactionalOutbox/OutboxProduceStrategy.cs b/src/Silverback.Integration/Messaging/Outbound/TransactionalOutbox/OutboxProduceStrategy.cs index 896a3e3c6..c7ddc6ac4 100644 --- a/src/Silverback.Integration/Messaging/Outbound/TransactionalOutbox/OutboxProduceStrategy.cs +++ b/src/Silverback.Integration/Messaging/Outbound/TransactionalOutbox/OutboxProduceStrategy.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Silverback.Diagnostics; +using Silverback.Messaging.Broker; using Silverback.Messaging.Messages; using Silverback.Util; @@ -17,11 +18,9 @@ namespace Silverback.Messaging.Outbound.TransactionalOutbox /// public class OutboxProduceStrategy : IProduceStrategy { - private OutboxProduceStrategyImplementation? _implementation; - /// public IProduceStrategyImplementation Build(IServiceProvider serviceProvider) => - _implementation ??= new OutboxProduceStrategyImplementation( + new OutboxProduceStrategyImplementation( serviceProvider.GetRequiredService(), serviceProvider.GetRequiredService>()); @@ -31,6 +30,8 @@ private class OutboxProduceStrategyImplementation : IProduceStrategyImplementati private readonly IOutboundLogger _logger; + private IProducer? _producer; + public OutboxProduceStrategyImplementation( TransactionalOutboxBroker outboundQueueBroker, IOutboundLogger logger) @@ -45,7 +46,9 @@ public Task ProduceAsync(IOutboundEnvelope envelope) _logger.LogWrittenToOutbox(envelope); - return _outboundQueueBroker.GetProducer(envelope.Endpoint).ProduceAsync(envelope); + _producer ??= _outboundQueueBroker.GetProducer(envelope.Endpoint); + + return _producer.ProduceAsync(envelope); } } } diff --git a/tests/Silverback.Integration.Tests.E2E/Kafka/OutboxTests.cs b/tests/Silverback.Integration.Tests.E2E/Kafka/OutboxTests.cs index dbb73b18f..9afd8dcf6 100644 --- a/tests/Silverback.Integration.Tests.E2E/Kafka/OutboxTests.cs +++ b/tests/Silverback.Integration.Tests.E2E/Kafka/OutboxTests.cs @@ -42,7 +42,11 @@ public async Task OutboxProduceStrategy_DefaultSettings_ProducedAndConsumed() .AddOutboxWorker(TimeSpan.FromMilliseconds(100))) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://e2e"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://e2e"; + }) .AddOutbound( endpoint => endpoint .ProduceTo(DefaultTopicName) @@ -58,15 +62,19 @@ public async Task OutboxProduceStrategy_DefaultSettings_ProducedAndConsumed() .AddIntegrationSpyAndSubscriber()) .Run(); - var publisher = Host.ScopedServiceProvider.GetRequiredService(); - var dbContext = Host.ScopedServiceProvider.GetRequiredService(); - - for (int i = 1; i <= 15; i++) + for (int i = 0; i < 3; i++) { - await publisher.PublishAsync(new TestEventOne { Content = $"{i}" }); - } + using var scope = Host.ServiceProvider.CreateScope(); + var publisher = scope.ServiceProvider.GetRequiredService(); + var dbContext = scope.ServiceProvider.GetRequiredService(); - await dbContext.SaveChangesAsync(); + for (int j = (i * 5) + 1; j <= (i + 1) * 5; j++) + { + await publisher.PublishAsync(new TestEventOne { Content = $"{j}" }); + } + + await dbContext.SaveChangesAsync(); + } await Helper.WaitUntilAllMessagesAreConsumedAsync(); @@ -96,7 +104,11 @@ public async Task OutboxProduceStrategy_TransactionAborted_MessageNotProduced() .AddOutboxWorker(TimeSpan.FromMilliseconds(100))) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://e2e"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://e2e"; + }) .AddOutbound( endpoint => endpoint .ProduceTo(DefaultTopicName) @@ -148,7 +160,11 @@ public async Task OutboxProduceStrategy_WithEndpointNameFunction_ProducedToPrope .AddOutboxWorker(TimeSpan.FromMilliseconds(100))) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://e2e"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://e2e"; + }) .AddOutbound( endpoint => endpoint .ProduceTo(