Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlettering and exception propagation in Grpc service #39412

Merged
merged 3 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
# Release History

## 5.14.0-beta.1 (Unreleased)

### Features Added

### Breaking Changes
## 5.13.3 (2023-10-20)

### Bugs Fixed

### Other Changes
- Fixed issue where deadlettering a message without specifying properties to modify could throw
an exception from out of proc extension.
- Include underlying exception details in RpcException when a failure occurs.

## 5.13.2 (2023-10-18)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.

#if NET6_0_OR_GREATER
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Core.Amqp.Shared;
Expand Down Expand Up @@ -31,54 +32,98 @@ public SettlementService()

public override async Task<Empty> Complete(CompleteRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
try
{
await tuple.Actions.CompleteMessageAsync(
tuple.Message,
context.CancellationToken).ConfigureAwait(false);
return new Empty();
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.CompleteMessageAsync(
tuple.Message,
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.Unknown, ex.ToString()));
}

throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

public override async Task<Empty> Abandon(AbandonRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
try
{
await tuple.Actions.AbandonMessageAsync(
tuple.Message,
DeserializeAmqpMap(request.PropertiesToModify),
context.CancellationToken).ConfigureAwait(false);
return new Empty();
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.AbandonMessageAsync(
tuple.Message,
DeserializeAmqpMap(request.PropertiesToModify),
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.Unknown, ex.ToString()));
}

throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

public override async Task<Empty> Defer(DeferRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
try
{
await tuple.Actions.DeferMessageAsync(
tuple.Message,
DeserializeAmqpMap(request.PropertiesToModify),
context.CancellationToken).ConfigureAwait(false);
return new Empty();
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.DeferMessageAsync(
tuple.Message,
DeserializeAmqpMap(request.PropertiesToModify),
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.Unknown, ex.ToString()));
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
}

throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

public override async Task<Empty> Deadletter(DeadletterRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
try
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
if (request.PropertiesToModify == null || request.PropertiesToModify == ByteString.Empty)
{
await tuple.Actions.DeadLetterMessageAsync(
tuple.Message,
request.DeadletterReason,
request.DeadletterErrorDescription,
context.CancellationToken).ConfigureAwait(false);
}
else
{
await tuple.Actions.DeadLetterMessageAsync(
tuple.Message,
DeserializeAmqpMap(request.PropertiesToModify),
request.DeadletterReason,
request.DeadletterErrorDescription,
context.CancellationToken).ConfigureAwait(false);
}

return new Empty();
}
}
catch (Exception ex)
{
await tuple.Actions.DeadLetterMessageAsync(
tuple.Message,
DeserializeAmqpMap(request.PropertiesToModify),
request.DeadletterReason,
request.DeadletterErrorDescription,
context.CancellationToken).ConfigureAwait(false);
return new Empty();
throw new RpcException(new Status(StatusCode.Unknown, ex.ToString()));
}

throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net6.0</TargetFrameworks>
<Description>Microsoft Azure WebJobs SDK ServiceBus Extension</Description>
<Version>5.14.0-beta.1</Version>
<Version>5.13.3</Version>
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
<!--The ApiCompatVersion is managed automatically and should not generally be modified manually.-->
<!--Since we are adding a new target for net6.0, we need to temporarily condition on netstandard-->
<ApiCompatVersion>5.13.2</ApiCompatVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,32 @@ public async Task BindToMessageAndDeadletter()
}

[Test]
public async Task BindToBatchAndDeadletter()
public async Task BindToMessageAndDeadletterWithNoPropertiesToModify()
{
var host = BuildHost<ServiceBusBindToBatchAndDeadletter>();
var host = BuildHost<ServiceBusBindToMessageAndDeadletterWithNoPropertiesToModify>();
var settlementImpl = host.Services.GetRequiredService<SettlementService>();
var provider = host.Services.GetRequiredService<MessagingProvider>();
ServiceBusBindToMessageAndDeadletterWithNoPropertiesToModify.SettlementService = settlementImpl;

using (host)
{
var message = new ServiceBusMessage("foobar");
await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
var sender = client.CreateSender(FirstQueueScope.QueueName);
await sender.SendMessageAsync(message);

bool result = _waitHandle1.WaitOne(SBTimeoutMills);
Assert.True(result);
await host.StopAsync();
}
Assert.IsEmpty(provider.ActionsCache);
}

[Test]
public async Task BindToBatchAndDeadletterExceptionValidation()
{
// this test expects errors so set skipValidation=true
var host = BuildHost<ServiceBusBindToBatchAndDeadletter>(skipValidation: true);
var settlementImpl = host.Services.GetRequiredService<SettlementService>();
var provider = host.Services.GetRequiredService<MessagingProvider>();
ServiceBusBindToBatchAndDeadletter.SettlementService = settlementImpl;
Expand Down Expand Up @@ -261,6 +284,31 @@ await SettlementService.Deadletter(
}
}

public class ServiceBusBindToMessageAndDeadletterWithNoPropertiesToModify
{
internal static SettlementService SettlementService { get; set; }
public static async Task BindToMessage(
[ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage message, ServiceBusClient client)
{
Assert.AreEqual("foobar", message.Body.ToString());
await SettlementService.Deadletter(
new DeadletterRequest()
{
Locktoken = message.LockToken,
DeadletterErrorDescription = "description",
DeadletterReason = "reason"
},
new MockServerCallContext());

var receiver = client.CreateReceiver(FirstQueueScope.QueueName, new ServiceBusReceiverOptions {SubQueue = SubQueue.DeadLetter});
var deadletterMessage = await receiver.ReceiveMessageAsync();
Assert.AreEqual("foobar", deadletterMessage.Body.ToString());
Assert.AreEqual("description", deadletterMessage.DeadLetterErrorDescription);
Assert.AreEqual("reason", deadletterMessage.DeadLetterReason);
_waitHandle1.Set();
}
}

public class ServiceBusBindToBatchAndDeadletter
{
internal static SettlementService SettlementService { get; set; }
Expand Down Expand Up @@ -292,6 +340,37 @@ await SettlementService.Deadletter(
Assert.AreEqual("description", deadletterMessage.DeadLetterErrorDescription);
Assert.AreEqual("reason", deadletterMessage.DeadLetterReason);
Assert.AreEqual(42, deadletterMessage.ApplicationProperties["key"]);

var exception = Assert.ThrowsAsync<RpcException>(
async () =>
await SettlementService.Complete(
new CompleteRequest { Locktoken = message.LockToken },
new MockServerCallContext()));
StringAssert.Contains(
"Azure.Messaging.ServiceBus.ServiceBusException: The lock supplied is invalid.",
exception.ToString());

exception = Assert.ThrowsAsync<RpcException>(
async () =>
await SettlementService.Defer(
new DeferRequest { Locktoken = message.LockToken },
new MockServerCallContext()));
StringAssert.Contains(
"Azure.Messaging.ServiceBus.ServiceBusException: The lock supplied is invalid.",
exception.ToString());

exception = Assert.ThrowsAsync<RpcException>(
async () =>
await SettlementService.Deadletter(
new DeadletterRequest() { Locktoken = message.LockToken },
new MockServerCallContext()));
StringAssert.Contains(
"Azure.Messaging.ServiceBus.ServiceBusException: The lock supplied is invalid.",
exception.ToString());

// The service doesn't throw when an already settled message gets abandoned over the mgmt link, so we won't
// test for that here.

_waitHandle1.Set();
}
}
Expand Down