Skip to content

Commit

Permalink
feat: outbox with autopartitioning
Browse files Browse the repository at this point in the history
  • Loading branch information
mt89vein committed Aug 18, 2024
1 parent 61598ee commit 72c8a09
Show file tree
Hide file tree
Showing 37 changed files with 1,492 additions and 203 deletions.
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@ dotnet_diagnostic.SYSLIB1045.severity = none # Use generate regex

dotnet_diagnostic.CA1069.severity = error # enum value duplicates
dotnet_diagnostic.IDE0290.severity = none # use primary constructor
dotnet_diagnostic.CA1014.severity = none # mark assemblies as CLSCompliant
dotnet_diagnostic.CA1515.severity = none # API application types should be internal
dotnet_diagnostic.CA1819.severity = none # properties should not be array.



Expand Down
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Set default behavior to automatically normalize line endings.
###############################################################################
* text=auto
*.sh eol=lf

###############################################################################
# Set default behavior for command prompt diff.
Expand Down
1 change: 1 addition & 0 deletions Sstv.Outbox.sln
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "SolutionItems", "SolutionIt
BannedSymbols.txt = BannedSymbols.txt
readme.md = readme.md
Directory.Build.props = Directory.Build.props
.editorconfig = .editorconfig
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sstv.Outbox", "src\Sstv.Outbox\Sstv.Outbox.csproj", "{B174BD28-7926-4CC0-BD9B-E290440BD872}"
Expand Down
2 changes: 1 addition & 1 deletion dev/kafka/update_run.sh
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/sh
# This script is required to run kafka cluster without zookeper
# This script is required to run kafka cluster without zookeper

# Docker workaround: Remove check for KAFKA_ZOOKEEPER_CONNECT parameter
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,22 @@ public async Task<IEnumerable<TOutboxItem>> LockAndReturnItemsBatchAsync(Cancell

_transaction = await _dbContext.Database.BeginTransactionAsync(ct);

string sql;
if (_options.GetPriorityFeature().Enabled)
{
sql = $"""
SELECT * FROM {m.QualifiedTableName}
WHERE {m.RetryAfter} is null or {m.RetryAfter} <= '{DateTimeOffset.UtcNow:O}'::timestamptz
ORDER BY {m.Priority} DESC, {m.Id} ASC, {m.RetryAfter} ASC
LIMIT {_options.OutboxItemsLimit}
FOR UPDATE SKIP LOCKED;
""";
}
else
{
sql = $"""
SELECT * FROM {m.QualifiedTableName}
WHERE {m.RetryAfter} is null or {m.RetryAfter} <= '{DateTimeOffset.UtcNow:O}'::timestamptz
ORDER BY {m.Id} ASC, {m.RetryAfter} ASC
LIMIT {_options.OutboxItemsLimit}
FOR UPDATE SKIP LOCKED;
var filter = _options.PartitionSettings.Enabled
? $" and {m.Status} <> {(int)OutboxItemStatus.Completed}"
: string.Empty;

var order = _options.GetPriorityFeature()
.Enabled
? $"ORDER BY {m.Priority} DESC, {m.Id} ASC, {m.RetryAfter} ASC"
: $"ORDER BY {m.Id} ASC, {m.RetryAfter} ASC";

var sql = $"""
SELECT * FROM {m.QualifiedTableName}
WHERE ({m.RetryAfter} is null or {m.RetryAfter} <= '{DateTimeOffset.UtcNow:O}'::timestamptz){filter}
{order}
LIMIT {_options.OutboxItemsLimit}
FOR UPDATE SKIP LOCKED;
""";
}

return await _dbContext
.Set<TOutboxItem>()
Expand Down Expand Up @@ -97,8 +92,11 @@ public async Task SaveAsync(
}

// hint: retried collection tracked by EF, so they automatically updated on save changes
if (!_options.PartitionSettings.Enabled)
{
_dbContext.Set<TOutboxItem>().RemoveRange(completed);
}

_dbContext.Set<TOutboxItem>().RemoveRange(completed);
await _dbContext.SaveChangesAsync(ct);
await _transaction.CommitAsync(ct);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public sealed class OutboxMaintenanceItemRepository<TDbContext, TOutboxItem> : I
/// <param name="monitor">Outbox settings.</param>
public OutboxMaintenanceItemRepository(
TDbContext dbContext,
IOptionsMonitor<OutboxOptions> monitor)
IOptionsMonitor<OutboxOptions> monitor
)
{
_dbContext = dbContext;
ArgumentNullException.ThrowIfNull(monitor);
Expand Down
172 changes: 172 additions & 0 deletions src/Sstv.Outbox.EntityFrameworkCore.Npgsql/Partitioner.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Sstv.Outbox.Features.Partitions;

namespace Sstv.Outbox.EntityFrameworkCore.Npgsql;

/// <summary>
/// Table partitioner.
/// </summary>
public sealed partial class Partitioner<TDbContext, TOutboxItem> : IPartitioner<TOutboxItem>
where TDbContext : DbContext
where TOutboxItem : class, IOutboxItem
{
/// <summary>
/// Options.
/// </summary>
private readonly OutboxOptions _options;

/// <summary>
/// DbContext.
/// </summary>
private readonly TDbContext _dbContext;

/// <summary>
/// Logger.
/// </summary>
private readonly ILogger<Partitioner<TDbContext, TOutboxItem>> _logger;

/// <summary>
/// Creates new instance of <see cref="Partitioner{TDbContext,TOutboxItem}"/>.
/// </summary>
/// <param name="dbContext">DbContext.</param>
/// <param name="options">Options.</param>
/// <param name="logger">Logger.</param>
public Partitioner(
TDbContext dbContext,
IOptionsMonitor<OutboxOptions> options,
ILogger<Partitioner<TDbContext, TOutboxItem>> logger
)
{
ArgumentNullException.ThrowIfNull(dbContext);
ArgumentNullException.ThrowIfNull(options);

_options = options.Get(typeof(TOutboxItem).Name);
_dbContext = dbContext;
_logger = logger;
}

/// <summary>
/// Precreates partitions for entity.
/// </summary>
/// <param name="ct">Token for cancel operation.</param>
[SuppressMessage("Security", "EF1002:Risk of vulnerability to SQL injection.",
Justification = "There is no user input. FromSqlInterpolated incorrectly sets table name")]
public async Task CreatePartitionsAsync(CancellationToken ct = default)
{
try
{
var m = _options.GetDbMapping();

await _dbContext.Database.BeginTransactionAsync(ct);

foreach (var partition in _options.PartitionSettings.GetPartitions(m.TableName, DateTimeOffset.UtcNow.AddDays(-5)))
{
var from = _options.PartitionSettings.UuidV7Generator.ForDate(partition.DateFrom.UtcDateTime);
var to = _options.PartitionSettings.UuidV7Generator.ForDate(partition.DateTo.UtcDateTime);

var sql = $"""
CREATE TABLE IF NOT EXISTS {m.SchemaName}.{partition.PartitionTableName} PARTITION OF {m.QualifiedTableName} FOR values
FROM (overlay('{from}'::text placing '0000-0000-000000000000' from 15)::uuid)
TO (overlay('{to}'::text placing '0000-0000-000000000000' from 15)::uuid)
WITH (fillfactor = 90);
""";
await _dbContext.Database.ExecuteSqlRawAsync(sql, ct);
}

await _dbContext.Database.CommitTransactionAsync(ct);
}
catch (PostgresException e) when (e.Message.Contains("is not partitioned"))
{
PartitioningNotConfigured(e, typeof(TOutboxItem).Name);

throw;
}
catch (Exception e)
{
CreatePartitionFailed(
e,
outboxItem: typeof(TOutboxItem).Name,
dbContext: typeof(TDbContext).Name
);

throw;
}
}

/// <summary>
/// Removes old partitions for <typeparamref name="TOutboxItem"/>.
/// </summary>
/// <param name="ct">Token for cancel operation.</param>
public async Task DeleteOldPartitionsAsync(CancellationToken ct = default)
{
try
{
var m = _options.GetDbMapping();

var partitionsForDelete = _options
.PartitionSettings
.GetPartitions(
m.TableName,
startFrom: DateTimeOffset.UtcNow.AddDays(-_options.PartitionSettings.PrecreatePartitionCount))
.Reverse()
.Skip(_options.PartitionSettings.PartitionRetentionCount);

foreach (var partition in partitionsForDelete)
{
try
{
var sql = $"ALTER TABLE {m.QualifiedTableName} DETACH PARTITION {m.SchemaName}.{partition.PartitionTableName} CONCURRENTLY;";
await _dbContext.Database.ExecuteSqlRawAsync(sql, ct);

sql = $"DROP TABLE {m.SchemaName}.{partition.PartitionTableName};";
await _dbContext.Database.ExecuteSqlRawAsync(sql, ct);
}
catch (PostgresException e) when (e.Message.Contains("does not exist"))
{
continue;
}
}
}
catch (PostgresException e) when (e.Message.Contains("is not partitioned"))
{
PartitioningNotConfigured(e, typeof(TOutboxItem).Name);

throw;
}
catch (Exception e)
{
DropPartitionFailed(
e,
outboxItem: typeof(TOutboxItem).Name,
dbContext: typeof(TDbContext).Name
);

throw;
}
}

[LoggerMessage(
eventId: 0,
level: LogLevel.Error,
message: "Partitioning for {OutboxItem} not configured!"
)]
private partial void PartitioningNotConfigured(Exception e, string outboxItem);

[LoggerMessage(
eventId: 1,
level: LogLevel.Error,
message: "Error occured while precreating partitions for {OutboxItem} in {DbContext}"
)]
private partial void CreatePartitionFailed(Exception e, string outboxItem, string dbContext);

[LoggerMessage(
eventId: 2,
level: LogLevel.Error,
message: "Error occured while dropping partitions for {OutboxItem} in {DbContext}"
)]
private partial void DropPartitionFailed(Exception e, string outboxItem, string dbContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using Sstv.Outbox.Features.Partitions;

namespace Sstv.Outbox.EntityFrameworkCore.Npgsql;

Expand Down Expand Up @@ -45,6 +46,7 @@ public static OutboxItemHandlerBuilder<TOutboxItem> AddOutboxItem<TDbContext, TO
using var dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>();
o.SetDbMapping(DbMapping.GetFor<TOutboxItem>(dbContext));
o.SetPriorityFeature<TOutboxItem>();
o.SetStatusFeature<TOutboxItem>();
o.WorkerType ??= EfCoreWorkerTypes.Competing;
configure?.Invoke(o);
Expand All @@ -55,9 +57,13 @@ public static OutboxItemHandlerBuilder<TOutboxItem> AddOutboxItem<TDbContext, TO
options.WorkerType is not (EfCoreWorkerTypes.Competing or EfCoreWorkerTypes.BatchCompeting),
failureMessage:
$"You should implement {typeof(IHasStatus)} in your {typeof(TOutboxItem)} when worker type in competing mode"
).ValidateOnStart();

services.TryAddScoped<IOutboxMaintenanceRepository<TOutboxItem>, OutboxMaintenanceItemRepository<TDbContext, TOutboxItem>>();
)
.Validate(validation:
options => (options.PartitionSettings.Enabled && typeof(TOutboxItem).IsAssignableTo(typeof(IHasStatus))) || !options.PartitionSettings.Enabled,
failureMessage:
$"You should implement {typeof(IHasStatus)} in your {typeof(TOutboxItem)} when partitions enabled"
)
.ValidateOnStart();

services.TryAddKeyedTransient<IOutboxRepository<TOutboxItem>, StrictOrderingOutboxRepository<TDbContext, TOutboxItem>>(EfCoreWorkerTypes.StrictOrdering);
services.TryAddKeyedTransient<IOutboxWorker, StrictOrderingOutboxWorker>(EfCoreWorkerTypes.StrictOrdering);
Expand All @@ -74,6 +80,11 @@ options.WorkerType is not (EfCoreWorkerTypes.Competing or EfCoreWorkerTypes.Batc
services.TryAddSingleton(TimeProvider.System);
services.AddHostedService<OutboxBackgroundService<TOutboxItem>>();

services.AddHostedService<OutboxPartitionerBackgroundService<TOutboxItem>>();
services.TryAddScoped<IPartitioner<TOutboxItem>, Partitioner<TDbContext, TOutboxItem>>();

services.TryAddScoped<IOutboxMaintenanceRepository<TOutboxItem>, OutboxMaintenanceItemRepository<TDbContext, TOutboxItem>>();

return new OutboxItemHandlerBuilder<TOutboxItem>(services, outboxName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,22 @@ public async Task<IEnumerable<TOutboxItem>> LockAndReturnItemsBatchAsync(Cancell

_transaction = await _dbContext.Database.BeginTransactionAsync(ct);

string sql;
if (_options.GetPriorityFeature().Enabled)
{
sql = $"""
SELECT * FROM {m.QualifiedTableName}
ORDER BY {m.Priority} DESC, {m.Id} ASC
LIMIT {_options.OutboxItemsLimit}
FOR UPDATE NOWAIT;
""";
}
else
{
sql = $"""
var filter = _options.PartitionSettings.Enabled
? $"WHERE {m.Status} <> {(int)OutboxItemStatus.Completed}"
: string.Empty;

var order = _options.GetPriorityFeature()
.Enabled
? $"ORDER BY {m.Priority} DESC, {m.Id} ASC"
: $"ORDER BY {m.Id} ASC";

var sql = $"""
SELECT * FROM {m.QualifiedTableName}
ORDER BY {m.Id} ASC
{filter}
{order}
LIMIT {_options.OutboxItemsLimit}
FOR UPDATE NOWAIT;
""";
}

try
{
Expand Down Expand Up @@ -110,7 +107,11 @@ CancellationToken ct
throw new NotSupportedException("Retry not supported for strict ordering worker");
}

_dbContext.Set<TOutboxItem>().RemoveRange(completed);
if (!_options.PartitionSettings.Enabled)
{
_dbContext.Set<TOutboxItem>().RemoveRange(completed);
}

await _dbContext.SaveChangesAsync(ct);
await _transaction.CommitAsync(ct);
}
Expand Down
Loading

0 comments on commit 72c8a09

Please sign in to comment.