Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support SNS batch publishing #1335

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<Copyright>Copyright (c) Just Eat 2015-$([System.DateTime]::Now.ToString(yyyy))</Copyright>
<Deterministic>true</Deterministic>
<Description>A light-weight message bus on top of AWS SNS and SQS</Description>
<MinVerMinimumMajorMinor>7.1</MinVerMinimumMajorMinor>
<MinVerMinimumMajorMinor>7.2</MinVerMinimumMajorMinor>
<MinVerTagPrefix>v</MinVerTagPrefix>
<MinVerSkip Condition=" '$(Configuration)' == 'Debug' ">true</MinVerSkip>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
Expand Down
4 changes: 2 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project>
<ItemGroup Label="Libraries">
<PackageVersion Include="AWSSDK.Extensions.NETCore.Setup" Version="3.7.300" />
<PackageVersion Include="AWSSDK.SimpleNotificationService" Version="3.7.0" />
<PackageVersion Include="AWSSDK.SQS" Version="3.7.0" />
<PackageVersion Include="AWSSDK.SimpleNotificationService" Version="3.7.300" />
<PackageVersion Include="AWSSDK.SQS" Version="3.7.300" />
Copy link
Member Author

@martincostello martincostello Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the minimum versions that support .NET 8 and AoT, so I think they're worth raising the baseline to as part of getting the batch publishing support.

<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.1" />
<PackageVersion Include="Microsoft.Extensions.Configuration" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="1.1.0" Condition=" '$(TargetFramework)' == 'net461' " />
Expand Down
17 changes: 16 additions & 1 deletion JustSaying.sln
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.8.34330.188
VisualStudioVersion = 17.8.34525.116
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{4B4A4A0C-31C2-482B-A7D8-094C60C4D0B5}"
ProjectSection(SolutionItems) = preProject
Expand Down Expand Up @@ -35,8 +35,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JustSaying.Tools", "src\Jus
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".github", ".github", "{F0BCBE5F-2132-422D-B17B-23B7FCC4A8A8}"
ProjectSection(SolutionItems) = preProject
.github\actionlint-matcher.json = .github\actionlint-matcher.json
.github\CODEOWNERS = .github\CODEOWNERS
.github\CONTRIBUTING.md = .github\CONTRIBUTING.md
.github\dependabot.yml = .github\dependabot.yml
.github\ISSUE_TEMPLATE.md = .github\ISSUE_TEMPLATE.md
.github\PULL_REQUEST_TEMPLATE.md = .github\PULL_REQUEST_TEMPLATE.md
.github\stale.yml = .github\stale.yml
Expand Down Expand Up @@ -81,6 +83,18 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JustSaying.Extensions.Aws",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JustSaying.Extensions.Aws.Tests", "tests\JustSaying.Extensions.Aws.Tests\JustSaying.Extensions.Aws.Tests.csproj", "{1B99B357-5D76-4540-B28E-B6CD3F6F1963}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{C91A9AE0-10A6-41FE-89CB-058E24CF02D3}"
ProjectSection(SolutionItems) = preProject
.github\workflows\approve-and-merge.yml = .github\workflows\approve-and-merge.yml
.github\workflows\build.yml = .github\workflows\build.yml
.github\workflows\code-ql.yml = .github\workflows\code-ql.yml
.github\workflows\dependabot-approve.yml = .github\workflows\dependabot-approve.yml
.github\workflows\dependency-review.yml = .github\workflows\dependency-review.yml
.github\workflows\lint-actions.yml = .github\workflows\lint-actions.yml
.github\workflows\scorecard.yml = .github\workflows\scorecard.yml
.github\workflows\update-dotnet-sdk.yml = .github\workflows\update-dotnet-sdk.yml
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -176,6 +190,7 @@ Global
{38DAC394-0A6E-4BB6-BCFC-8C21D2C64B3A} = {77C93C37-DE5B-448F-9A23-6C9D0C8465CA}
{4EFC48D7-4B45-4EBC-9237-4B84FE8239E0} = {A94633F2-29F2-48C6-840A-C5370B300AE2}
{1B99B357-5D76-4540-B28E-B6CD3F6F1963} = {E22A50F2-9952-4483-8AD1-09BE354FB3E4}
{C91A9AE0-10A6-41FE-89CB-058E24CF02D3} = {F0BCBE5F-2132-422D-B17B-23B7FCC4A8A8}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {18FBDF85-C124-4444-9F03-D0D4F2B3A612}
Expand Down
30 changes: 28 additions & 2 deletions samples/src/JustSaying.Sample.Restaurant.OrderingApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,13 @@
builder.Services.AddHostedService<BusService>();

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen(c =>
builder.Services.AddSwaggerGen(options =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "Restaurant Ordering API", Version = "v1" });
options.SwaggerDoc("v1", new OpenApiInfo
{
Title = "Restaurant Ordering API",
Version = "v1"
});
});

var app = builder.Build();
Expand Down Expand Up @@ -108,6 +112,28 @@
app.Logger.LogInformation("Order {orderId} placed", orderId);
});

app.MapPost("api/multi-orders",
async (IReadOnlyCollection<CustomerOrderModel> orders, IMessageBatchPublisher publisher) =>
{
app.Logger.LogInformation("Orders received: {@Orders}", orders);
Fixed Show fixed Hide fixed

// Save order to database generating OrderId
var message = orders.Select(order =>
{
var orderId = Random.Shared.Next(1, 100);
return new OrderPlacedEvent
{
OrderId = orderId,
Description = order.Description
};
})
.ToList();

await publisher.PublishAsync(message);

app.Logger.LogInformation("Order {@OrderIds} placed", message.Select(x => x.OrderId));
});

await app.RunAsync();
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using JustSaying.AwsTools;
using JustSaying.AwsTools.QueueCreation;
using JustSaying.Fluent;
using JustSaying.Messaging;
using JustSaying.Messaging.Channels.Receive;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.MessageSerialization;
Expand Down Expand Up @@ -125,7 +126,10 @@ public static IServiceCollection AddJustSaying(this IServiceCollection services,

services.TryAddSingleton<IAwsClientFactory, DefaultAwsClientFactory>();
services.TryAddSingleton<IAwsClientFactoryProxy>((p) => new AwsClientFactoryProxy(p.GetRequiredService<IAwsClientFactory>));
services.TryAddSingleton<IMessagingConfig, MessagingConfig>();
services.TryAddSingleton<MessagingConfig>();
services.TryAddSingleton<IMessagingConfig>((p) => p.GetRequiredService<MessagingConfig>());
services.TryAddSingleton<IPublishConfiguration>((p) => p.GetRequiredService<MessagingConfig>());
services.TryAddSingleton<IPublishBatchConfiguration>((p) => p.GetRequiredService<MessagingConfig>());
services.TryAddSingleton<IMessageMonitor, NullOpMessageMonitor>();

services.TryAddTransient<LoggingMiddleware>();
Expand Down Expand Up @@ -173,6 +177,20 @@ public static IServiceCollection AddJustSaying(this IServiceCollection services,
return builder.BuildPublisher();
});

services.TryAddSingleton(
(serviceProvider) =>
{
var publisher = serviceProvider.GetRequiredService<IMessagePublisher>();

if (publisher is IMessageBatchPublisher batchPublisher)
{
return batchPublisher;
}

var builder = serviceProvider.GetRequiredService<MessagingBusBuilder>();
return builder.BuildBatchPublisher();
});

services.TryAddSingleton(
(serviceProvider) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

if (string.IsNullOrWhiteSpace(region))
{
throw new ArgumentException("region must not be null or empty" ,nameof(region));
throw new ArgumentException("region must not be null or empty", nameof(region));

Check warning on line 48 in src/JustSaying.Extensions.DependencyInjection.StructureMap/ConfigurationExpressionExtensions.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying.Extensions.DependencyInjection.StructureMap/ConfigurationExpressionExtensions.cs#L48

Added line #L48 was not covered by tests
}

registry.AddJustSaying(
Expand Down Expand Up @@ -131,6 +131,23 @@
return builder.BuildPublisher();
});

registry
.For<IMessageBatchPublisher>()
.Singleton()
.Use(
nameof(IMessageBatchPublisher),
context =>
{
var publisher = context.GetInstance<IMessagePublisher>();
if (publisher is IMessageBatchPublisher batchPublisher)
{
return batchPublisher;
}

var builder = context.GetInstance<MessagingBusBuilder>();
return builder.BuildBatchPublisher();

Check warning on line 148 in src/JustSaying.Extensions.DependencyInjection.StructureMap/ConfigurationExpressionExtensions.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying.Extensions.DependencyInjection.StructureMap/ConfigurationExpressionExtensions.cs#L147-L148

Added lines #L147 - L148 were not covered by tests
});

registry
.For<IMessagingBus>()
.Singleton()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ public JustSayingRegistry()

For<IAwsClientFactory>().Use<DefaultAwsClientFactory>().Singleton();
For<IAwsClientFactoryProxy>().Use((p) => new AwsClientFactoryProxy(p.GetInstance<IAwsClientFactory>)).Singleton();
For<IMessagingConfig>().Use<MessagingConfig>().Singleton();
For<MessagingConfig>().Use<MessagingConfig>().Singleton();
For<IMessagingConfig>().Use(context => context.GetInstance<MessagingConfig>()).Singleton();
For<IPublishBatchConfiguration>().Use<MessagingConfig>(context => context.GetInstance<MessagingConfig>()).Singleton();
For<IMessageMonitor>().Use<NullOpMessageMonitor>().Singleton();
For<IMessageSerializationFactory>().Use<NewtonsoftSerializationFactory>().Singleton();
For<IMessageSubjectProvider>().Use<GenericMessageSubjectProvider>().Singleton();
Expand Down
8 changes: 8 additions & 0 deletions src/JustSaying/AwsTools/JustSayingConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,12 @@ public static class JustSayingConstants
/// Default length of time for which Amazon SQS can reuse a data key to encrypt/decrypt messages before calling AWS KMS again.
/// </summary>
public static TimeSpan DefaultAttributeEncryptionKeyReusePeriod => TimeSpan.FromMinutes(5);

/// <summary>
/// The maximum SNS batch size.
/// </summary>
/// <remarks>
/// The default vaule is 10. See https://docs.aws.amazon.com/sns/latest/dg/sns-batch-api-actions.html.
hwoodiwiss marked this conversation as resolved.
Show resolved Hide resolved
/// </remarks>
public static int MaximumSnsBatchSize => 10;
}
30 changes: 30 additions & 0 deletions src/JustSaying/AwsTools/MessageHandling/MessageBatchResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System.Net;
using Amazon.Runtime;

namespace JustSaying.AwsTools.MessageHandling;

/// <summary>
/// A class representing the response from publishing a batch of messages.
/// </summary>
public class MessageBatchResponse
{
/// <summary>
/// Gets or sets the Ids of the messages that were successfully published.
/// </summary>
public IReadOnlyCollection<string> SuccessfulMessageIds { get; set; }

/// <summary>
/// Gets or sets the Ids of the messages that failed to publish.
/// </summary>
public IReadOnlyCollection<string> FailedMessageIds { get; set; }

/// <summary>
/// Gets or sets the response metadata.
/// </summary>
public ResponseMetadata ResponseMetadata { get; set; }

/// <summary>
/// Gets or sets the HTTP status code returned from the publish attempt, if any.
/// </summary>
public HttpStatusCode? HttpStatusCode { set; get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#if NETFRAMEWORK
using System.Runtime.Serialization;
#endif

namespace JustSaying.AwsTools.MessageHandling;

/// <summary>
/// Represents errors that occur publishing a batch of messages.
/// </summary>
#if NETFRAMEWORK
[Serializable]
#endif
public class PublishBatchException : PublishException
{
/// <summary>
/// Initializes a new instance of the <see cref="PublishBatchException"/> class.
/// </summary>
public PublishBatchException()
: base("Failed to publish batch of messages")

Check warning on line 19 in src/JustSaying/AwsTools/MessageHandling/PublishBatchException.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/AwsTools/MessageHandling/PublishBatchException.cs#L19

Added line #L19 was not covered by tests
{
}

Check warning on line 21 in src/JustSaying/AwsTools/MessageHandling/PublishBatchException.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/AwsTools/MessageHandling/PublishBatchException.cs#L21

Added line #L21 was not covered by tests

/// <summary>
/// Initializes a new instance of the <see cref="PublishBatchException"/> class.
/// </summary>
/// <param name="message">The message that describes the error.</param>
public PublishBatchException(string message)
: base(message)

Check warning on line 28 in src/JustSaying/AwsTools/MessageHandling/PublishBatchException.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/AwsTools/MessageHandling/PublishBatchException.cs#L28

Added line #L28 was not covered by tests
{
}

Check warning on line 30 in src/JustSaying/AwsTools/MessageHandling/PublishBatchException.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/AwsTools/MessageHandling/PublishBatchException.cs#L30

Added line #L30 was not covered by tests

/// <summary>
/// Initializes a new instance of the <see cref="PublishBatchException"/> class.
/// </summary>
/// <param name="message">The message that describes the error.</param>
/// <param name="inner">The exception that is the cause of the current exception, if any.</param>
public PublishBatchException(string message, Exception inner)
: base(message, inner)
{
}

#if NETFRAMEWORK
/// <summary>
/// Initializes a new instance of the <see cref="PublishBatchException"/> class.
/// </summary>
/// <param name="info">
/// The <see cref="SerializationInfo"/> that holds the serialized object data
/// about the exception being thrown.
/// </param>
/// <param name="context">
/// The <see cref="StreamingContext"/> that contains contextual information about the source or destination.
/// </param>
protected PublishBatchException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
#endif
}
Loading