Skip to content

Commit

Permalink
fix: prevent deadlock in OutboxWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed May 18, 2023
1 parent a59f3ec commit 37f06ae
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup Label="Package information">
<BaseVersionSuffix></BaseVersionSuffix>
<BaseVersion>4.3.0$(BaseVersionSuffix)</BaseVersion>
<BaseVersion>4.3.1$(BaseVersionSuffix)</BaseVersion>
<DatabasePackagesRevision>1</DatabasePackagesRevision>
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
</PropertyGroup>
Expand Down
6 changes: 6 additions & 0 deletions docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ uid: releases

# Releases

## [4.3.1](https://github.com/BEagle1984/silverback/releases/tag/v4.3.1)

### Fixes

* Fix deadlock in `OutboxWorker` when `enforceMessageOrder=true` (default)

## [4.3.0](https://github.com/BEagle1984/silverback/releases/tag/v4.3.0)

### What's new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,28 @@ private async Task ProcessQueueAsync(
{
_logger.LogProcessingOutboxStoredMessage(i + 1, outboxMessages.Count);

await ProcessMessageAsync(
outboxMessages[i],
failedMessages,
outboxReader,
serviceProvider)
.ConfigureAwait(false);
try
{
await ProcessMessageAsync(
outboxMessages[i],
failedMessages,
outboxReader,
serviceProvider)
.ConfigureAwait(false);
}
catch (Exception)
{
// Subtract the produce operations that will never be initiated
Interlocked.Add(ref _pendingProduceOperations, -(outboxMessages.Count - i - 1));
throw;
}

if (stoppingToken.IsCancellationRequested)
{
// Subtract the produce operations that will never be initiated
Interlocked.Add(ref _pendingProduceOperations, -(outboxMessages.Count - i - 1));
break;
}
}
}
finally
Expand Down Expand Up @@ -247,8 +260,7 @@ private IProducerEndpoint GetTargetEndpoint(

var targetEndpoint = outboundRoutes
.SelectMany(route => route.GetOutboundRouter(serviceProvider).Endpoints)
.FirstOrDefault(
endpoint => endpoint.Name == endpointName || endpoint.DisplayName == endpointName);
.FirstOrDefault(endpoint => endpoint.Name == endpointName || endpoint.DisplayName == endpointName);

if (targetEndpoint == null)
{
Expand Down
34 changes: 34 additions & 0 deletions src/Silverback.Storage.Memory/Silverback.Storage.Memory.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>$(DefaultTargetFrameworks)</TargetFrameworks>
<RootNamespace>$(RootNamespace)</RootNamespace>
<Version>$(BaseVersion)</Version>
<Authors>$(Authors)</Authors>
<Company>$(Company)</Company>
<PackageLicenseExpression>$(License)</PackageLicenseExpression>
<Copyright>$(Copyright)</Copyright>
<PackageProjectUrl>$(ProjectUrl)</PackageProjectUrl>
<RepositoryUrl>$(RepositoryUrl)</RepositoryUrl>
<RepositoryType>$(RepositoryType)</RepositoryType>
<GeneratePackageOnBuild>${GeneratePackageOnBuild}</GeneratePackageOnBuild>
<Description>$(Description) This package contains an implementation of Silverback.Storage that stores the data in memory.</Description>
<PackageIconUrl>$(IconUrl)</PackageIconUrl>
<PackageTags>$(Tags)</PackageTags>
<LangVersion>$(LangVersion)</LangVersion>
<EnforceCodeStyleInBuild>true</EnforceCodeStyleInBuild>
<PackageId>Silverback.Storage.Memory</PackageId>
<Product>Silverback.Storage.Memory</Product>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Silverback.Core\Silverback.Core.csproj" />
<ProjectReference Include="..\Silverback.Integration\Silverback.Integration.csproj" />
<ProjectReference Include="..\Silverback.Integration.Kafka\Silverback.Integration.Kafka.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Update="Microsoft.SourceLink.GitHub" Version="1.1.1" />
</ItemGroup>

</Project>
36 changes: 36 additions & 0 deletions src/Silverback.Storage.Sqlite/Silverback.Storage.Sqlite.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>$(DefaultTargetFrameworks)</TargetFrameworks>
<RootNamespace>$(RootNamespace)</RootNamespace>
<Version>$(BaseVersion)</Version>
<Authors>$(Authors)</Authors>
<Company>$(Company)</Company>
<PackageLicenseExpression>$(License)</PackageLicenseExpression>
<Copyright>$(Copyright)</Copyright>
<PackageProjectUrl>$(ProjectUrl)</PackageProjectUrl>
<RepositoryUrl>$(RepositoryUrl)</RepositoryUrl>
<RepositoryType>$(RepositoryType)</RepositoryType>
<GeneratePackageOnBuild>${GeneratePackageOnBuild}</GeneratePackageOnBuild>
<Description>$(Description) This package contains an implementation of Silverback.Storage that stores the data in Sqlite.</Description>
<PackageIconUrl>$(IconUrl)</PackageIconUrl>
<PackageTags>$(Tags)</PackageTags>
<LangVersion>$(LangVersion)</LangVersion>
<EnforceCodeStyleInBuild>true</EnforceCodeStyleInBuild>
<PackageId>Silverback.Storage.Sqlite</PackageId>
<Product>Silverback.Storage.Sqlite</Product>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Silverback.Core\Silverback.Core.csproj" />
<ProjectReference Include="..\Silverback.Integration\Silverback.Integration.csproj" />
<ProjectReference Include="..\Silverback.Storage.Memory\Silverback.Storage.Memory.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Data.Sqlite" Version="7.0.1" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
<PackageReference Update="Microsoft.SourceLink.GitHub" Version="1.1.1" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -167,5 +167,56 @@ await _outboxWriter.WriteAsync(

_broker.ProducedMessages.Should().HaveCount(3);
}

[Fact]
public async Task ProcessQueue_ProduceError_Retried()
{
await _outboxWriter.WriteAsync(
new TestEventOne { Content = "Test" },
null,
null,
"topic1",
"topic1");
await _outboxWriter.WriteAsync(
new TestEventTwo { Content = "Test" },
null,
null,
"topic2",
"topic2");
await _outboxWriter.WriteAsync(
new TestEventOne { Content = "Test" },
null,
null,
"topic1",
"topic1");
await _outboxWriter.WriteAsync(
new TestEventOne { Content = "Test" },
null,
null,
"topic1",
"topic1");
await _outboxWriter.CommitAsync();

_broker.FailProduceNumber = new[] { 2, 3 }; // Note: counter is per producer / topic

await _worker.ProcessQueueAsync(CancellationToken.None);

_broker.ProducedMessages.Should().HaveCount(2);
_broker.ProducedMessages[0].Endpoint.Name.Should().Be("topic1");
_broker.ProducedMessages[1].Endpoint.Name.Should().Be("topic2");

await _worker.ProcessQueueAsync(CancellationToken.None);

_broker.ProducedMessages.Should().HaveCount(2);
_broker.ProducedMessages[0].Endpoint.Name.Should().Be("topic1");
_broker.ProducedMessages[1].Endpoint.Name.Should().Be("topic2");

await _worker.ProcessQueueAsync(CancellationToken.None);

_broker.ProducedMessages.Should().HaveCount(3);
_broker.ProducedMessages[0].Endpoint.Name.Should().Be("topic1");
_broker.ProducedMessages[1].Endpoint.Name.Should().Be("topic2");
_broker.ProducedMessages[2].Endpoint.Name.Should().Be("topic1");
}
}
}
2 changes: 2 additions & 0 deletions tests/Silverback.Integration.Tests/TestTypes/TestBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public TestBroker(IServiceProvider serviceProvider)

public bool SimulateConnectIssues { get; set; }

public IReadOnlyCollection<int>? FailProduceNumber { get; set; }

protected override Task ConnectAsync(
IReadOnlyCollection<IProducer> producers,
IReadOnlyCollection<IConsumer> consumers)
Expand Down
19 changes: 15 additions & 4 deletions tests/Silverback.Integration.Tests/TestTypes/TestProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using NSubstitute;
using Silverback.Diagnostics;
Expand All @@ -17,6 +18,8 @@ namespace Silverback.Tests.Integration.TestTypes
{
public class TestProducer : Producer<TestBroker, TestProducerEndpoint>
{
private int _produceCount;

public TestProducer(
TestBroker broker,
TestProducerEndpoint endpoint,
Expand Down Expand Up @@ -51,7 +54,7 @@ public TestProducer(
IReadOnlyCollection<MessageHeader>? headers,
string actualEndpointName)
{
ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint));
PerformMockProduce(messageBytes, headers);
return null;
}

Expand Down Expand Up @@ -80,7 +83,7 @@ protected override void ProduceCore(
Action<IBrokerMessageIdentifier?> onSuccess,
Action<Exception> onError)
{
ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint));
PerformMockProduce(messageBytes, headers);
onSuccess.Invoke(null);
}

Expand All @@ -101,7 +104,7 @@ await messageStream.ReadAllAsync().ConfigureAwait(false),
IReadOnlyCollection<MessageHeader>? headers,
string actualEndpointName)
{
ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint));
PerformMockProduce(messageBytes, headers);
return Task.FromResult<IBrokerMessageIdentifier?>(null);
}

Expand All @@ -128,9 +131,17 @@ protected override Task ProduceCoreAsync(
Action<IBrokerMessageIdentifier?> onSuccess,
Action<Exception> onError)
{
ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint));
PerformMockProduce(messageBytes, headers);
onSuccess.Invoke(null);
return Task.CompletedTask;
}

private void PerformMockProduce(byte[]? messageBytes, IReadOnlyCollection<MessageHeader>? headers)
{
if (Broker.FailProduceNumber != null && Broker.FailProduceNumber.Contains(++_produceCount))
throw new InvalidOperationException("Produce failed (mock).");

ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint));
}
}
}

0 comments on commit 37f06ae

Please sign in to comment.