diff --git a/eng/Packages.Data.props b/eng/Packages.Data.props
index 7739deaf8a604..ede7e438e539a 100644
--- a/eng/Packages.Data.props
+++ b/eng/Packages.Data.props
@@ -140,6 +140,8 @@
+
+
@@ -147,7 +149,8 @@
-
+
+
diff --git a/sdk/core/Azure.Core/src/Shared/MessagingClientDiagnostics.cs b/sdk/core/Azure.Core/src/Shared/MessagingClientDiagnostics.cs
index 6d578f12e9394..d23ae798f9a2e 100644
--- a/sdk/core/Azure.Core/src/Shared/MessagingClientDiagnostics.cs
+++ b/sdk/core/Azure.Core/src/Shared/MessagingClientDiagnostics.cs
@@ -94,7 +94,7 @@ public DiagnosticScope CreateScope(
/// The trace parent of the message.
/// The trace state of the message.
/// true if the message properties contained the diagnostic id; otherwise, false.
- public static bool TryExtractTraceContext(IReadOnlyDictionary properties, out string? traceparent, out string? tracestate)
+ public static bool TryExtractTraceContext(IReadOnlyDictionary properties, out string? traceparent, out string? tracestate)
{
traceparent = null;
tracestate = null;
@@ -102,7 +102,7 @@ public static bool TryExtractTraceContext(IReadOnlyDictionary pr
if (ActivityExtensions.SupportsActivitySource && properties.TryGetValue(TraceParent, out var traceParent) && traceParent is string traceParentString)
{
traceparent = traceParentString;
- if (properties.TryGetValue(TraceState, out object state) && state is string stateString)
+ if (properties.TryGetValue(TraceState, out object? state) && state is string stateString)
{
tracestate = stateString;
}
@@ -126,7 +126,7 @@ public static bool TryExtractTraceContext(IReadOnlyDictionary pr
/// The trace parent of the message.
/// The trace state of the message.
/// true if the message properties contained the diagnostic id; otherwise, false.
- public static bool TryExtractTraceContext(IDictionary properties, out string? traceparent, out string? tracestate)
+ public static bool TryExtractTraceContext(IDictionary properties, out string? traceparent, out string? tracestate)
{
traceparent = null;
tracestate = null;
@@ -134,7 +134,7 @@ public static bool TryExtractTraceContext(IDictionary properties
if (ActivityExtensions.SupportsActivitySource && properties.TryGetValue(TraceParent, out var traceParent) && traceParent is string traceParentString)
{
traceparent = traceParentString;
- if (properties.TryGetValue(TraceState, out object state) && state is string stateString)
+ if (properties.TryGetValue(TraceState, out object? state) && state is string stateString)
{
tracestate = stateString;
}
@@ -158,7 +158,7 @@ public static bool TryExtractTraceContext(IDictionary properties
/// The activity name to use for the diagnostic scope.
/// The traceparent that was either added, or that already existed in the message properties.
/// The tracestate that was either added, or that already existed in the message properties.
- public void InstrumentMessage(IDictionary properties, string activityName, out string? traceparent, out string? tracestate)
+ public void InstrumentMessage(IDictionary properties, string activityName, out string? traceparent, out string? tracestate)
{
traceparent = null;
tracestate = null;
diff --git a/sdk/core/Azure.Core/src/Shared/MessagingDiagnosticOperation.cs b/sdk/core/Azure.Core/src/Shared/MessagingDiagnosticOperation.cs
index 69b5c9cb2bfca..fb4236581bda7 100644
--- a/sdk/core/Azure.Core/src/Shared/MessagingDiagnosticOperation.cs
+++ b/sdk/core/Azure.Core/src/Shared/MessagingDiagnosticOperation.cs
@@ -36,7 +36,7 @@ public bool Equals(MessagingDiagnosticOperation other)
return _operation == other._operation;
}
- public override bool Equals(object obj)
+ public override bool Equals(object? obj)
{
return obj is MessagingDiagnosticOperation other && Equals(other);
}
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs
index 39e4ce40db9a2..a36eb6143ce3e 100644
--- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs
@@ -5,6 +5,10 @@
using System.Linq;
using System.Net;
using Microsoft.Azure.WebJobs;
+#if NET6_0_OR_GREATER
+using Microsoft.Azure.WebJobs.Extensions.Rpc;
+using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Grpc;
+#endif
using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config;
using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Listeners;
using Microsoft.Azure.WebJobs.Host.Scale;
@@ -27,7 +31,6 @@ public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder)
}
builder.AddServiceBus(p => { });
-
return builder;
}
@@ -99,11 +102,18 @@ public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder, Action
}
configure(options);
- });
+ })
+#if NET6_0_OR_GREATER
+ .MapWorkerGrpcService()
+#endif
+ ;
builder.Services.AddAzureClientsCore();
builder.Services.TryAddSingleton();
builder.Services.AddSingleton();
+ #if NET6_0_OR_GREATER
+ builder.Services.AddSingleton();
+ #endif
return builder;
}
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/Proto/SettlementExtensions.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/Proto/SettlementExtensions.cs
new file mode 100644
index 0000000000000..b3118786d7739
--- /dev/null
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/Proto/SettlementExtensions.cs
@@ -0,0 +1,28 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+#if NET6_0_OR_GREATER
+using Microsoft.Azure.ServiceBus.Grpc;
+
+namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Grpc
+{
+ internal static class SettlementExtensions
+ {
+ internal static object GetPropertyValue(this SettlementProperties properties)
+ {
+ return properties.ValuesCase switch
+ {
+ SettlementProperties.ValuesOneofCase.LongValue => properties.LongValue,
+ SettlementProperties.ValuesOneofCase.UlongValue => properties.UlongValue,
+ SettlementProperties.ValuesOneofCase.DoubleValue => properties.DoubleValue,
+ SettlementProperties.ValuesOneofCase.FloatValue => properties.FloatValue,
+ SettlementProperties.ValuesOneofCase.IntValue => properties.IntValue,
+ SettlementProperties.ValuesOneofCase.UintValue => properties.UintValue,
+ SettlementProperties.ValuesOneofCase.BoolValue => properties.BoolValue,
+ SettlementProperties.ValuesOneofCase.StringValue => properties.StringValue,
+ _ => null
+ };
+ }
+ }
+}
+#endif
\ No newline at end of file
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/Proto/settlement.proto b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/Proto/settlement.proto
new file mode 100644
index 0000000000000..9c6c45d0be96d
--- /dev/null
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/Proto/settlement.proto
@@ -0,0 +1,66 @@
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/empty.proto";
+
+// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic
+option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc";
+
+// The settlement service definition.
+service Settlement {
+ // Completes a message
+ rpc Complete (CompleteRequest) returns (google.protobuf.Empty) {}
+
+ // Abandons a message
+ rpc Abandon (AbandonRequest) returns (google.protobuf.Empty) {}
+
+ // Deadletters a message
+ rpc Deadletter (DeadletterRequest) returns (google.protobuf.Empty) {}
+
+ // Defers a message
+ rpc Defer (DeferRequest) returns (google.protobuf.Empty) {}
+}
+
+// The complete message request containing the locktoken.
+message CompleteRequest {
+ string locktoken = 1;
+}
+
+// The abandon message request containing the locktoken and properties to modify.
+message AbandonRequest {
+ string locktoken = 1;
+ map propertiesToModify = 2;
+}
+
+// The deadletter message request containing the locktoken and properties to modify along with the reason/description.
+message DeadletterRequest {
+ string locktoken = 1;
+ map propertiesToModify = 2;
+ string deadletterReason = 3;
+ string deadletterErrorDescription = 4;
+}
+
+// The defer message request containing the locktoken and properties to modify.
+message DeferRequest {
+ string locktoken = 1;
+ map propertiesToModify = 2;
+}
+
+// The settlement property can be of any type listed below, which
+// corresponds to the types specified in
+// https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusmessage.applicationproperties?view=azure-dotnet#remarks
+// Note: this list doesn't match 1:1 with the supported Service Bus types, so compatible types are used in some cases - e.g.
+// short uses int, TimeSpan uses string, etc. The full list of transforms can be found in the isolated worker extension source code.
+message SettlementProperties {
+ oneof values {
+ string stringValue = 1;
+ int32 intValue = 2;
+ uint32 uintValue = 3;
+ int64 longValue = 4;
+ uint64 ulongValue = 5;
+ bool boolValue = 6;
+ float floatValue = 7;
+ double doubleValue = 8;
+ google.protobuf.Timestamp timestampValue = 9;
+ }
+}
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/SettlementService.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/SettlementService.cs
new file mode 100644
index 0000000000000..832272828267b
--- /dev/null
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Grpc/SettlementService.cs
@@ -0,0 +1,89 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+#if NET6_0_OR_GREATER
+using System;
+using System.Linq;
+using System.Threading.Tasks;
+using Google.Protobuf.WellKnownTypes;
+using Grpc.Core;
+using Microsoft.Azure.ServiceBus.Grpc;
+using Microsoft.Azure.WebJobs.ServiceBus;
+
+namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Grpc
+{
+ internal class SettlementService : Settlement.SettlementBase
+ {
+ private readonly MessagingProvider _provider;
+
+ public SettlementService(MessagingProvider provider)
+ {
+ _provider = provider;
+ }
+
+ public SettlementService()
+ {
+ _provider = null;
+ }
+
+ public override async Task Complete(CompleteRequest request, ServerCallContext context)
+ {
+ if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
+ {
+ await tuple.Actions.CompleteMessageAsync(
+ tuple.Message,
+ context.CancellationToken).ConfigureAwait(false);
+ return new Empty();
+ }
+ throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
+ }
+
+ public override async Task Abandon(AbandonRequest request, ServerCallContext context)
+ {
+ if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
+ {
+ await tuple.Actions.AbandonMessageAsync(
+ tuple.Message,
+ request.PropertiesToModify.ToDictionary(
+ pair => pair.Key,
+ pair => pair.Value.GetPropertyValue()),
+ context.CancellationToken).ConfigureAwait(false);
+ return new Empty();
+ }
+ throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
+ }
+
+ public override async Task Defer(DeferRequest request, ServerCallContext context)
+ {
+ if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
+ {
+ await tuple.Actions.DeferMessageAsync(
+ tuple.Message,
+ request.PropertiesToModify.ToDictionary(
+ pair => pair.Key,
+ pair => pair.Value.GetPropertyValue()),
+ context.CancellationToken).ConfigureAwait(false);
+ return new Empty();
+ }
+ throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
+ }
+
+ public override async Task Deadletter(DeadletterRequest request, ServerCallContext context)
+ {
+ if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
+ {
+ await tuple.Actions.DeadLetterMessageAsync(
+ tuple.Message,
+ request.PropertiesToModify.ToDictionary(
+ pair => pair.Key,
+ pair => pair.Value.GetPropertyValue()),
+ request.DeadletterReason,
+ request.DeadletterErrorDescription,
+ context.CancellationToken).ConfigureAwait(false);
+ return new Empty();
+ }
+ throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
+ }
+ }
+}
+#endif
\ No newline at end of file
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs
index 5530ba2985cfb..51515f33a3c87 100644
--- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs
@@ -63,11 +63,11 @@ internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider, ITa
// Serialize execution of StopAsync to avoid calling Unregister* concurrently
private readonly SemaphoreSlim _stopAsyncSemaphore = new SemaphoreSlim(1, 1);
private readonly string _functionId;
- private CancellationTokenRegistration _batchReceiveRegistration;
private Task _batchLoop;
private Lazy _details;
private Lazy _clientDiagnostics;
private readonly IDrainModeManager _drainModeManager;
+ private readonly MessagingProvider _messagingProvider;
public ServiceBusListener(
string functionId,
@@ -94,6 +94,7 @@ public ServiceBusListener(
_logger = loggerFactory.CreateLogger();
_functionId = functionId;
_drainModeManager = drainModeManager;
+ _messagingProvider = messagingProvider;
_client = new Lazy(
() => clientFactory.CreateClientFromSetting(connection));
@@ -334,7 +335,6 @@ public void Dispose()
_stopAsyncSemaphore.Dispose();
_stoppingCancellationTokenSource.Dispose();
- _batchReceiveRegistration.Dispose();
_concurrencyUpdateManager?.Dispose();
// No need to dispose the _functionExecutionCancellationTokenSource since we don't create it as a linked token and
@@ -355,6 +355,8 @@ internal async Task ProcessMessageAsync(ProcessMessageEventArgs args)
using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(args.CancellationToken, _stoppingCancellationTokenSource.Token))
{
var actions = new ServiceBusMessageActions(args);
+ _messagingProvider.ActionsCache.TryAdd(args.Message.LockToken, (args.Message, actions));
+
if (!await _messageProcessor.Value.BeginProcessingMessageAsync(actions, args.Message, linkedCts.Token).ConfigureAwait(false))
{
return;
@@ -374,6 +376,7 @@ await _messageProcessor.Value.CompleteProcessingMessageAsync(actions, args.Messa
finally
{
receiveActions.EndExecutionScope();
+ _messagingProvider.ActionsCache.TryRemove(args.Message.LockToken, out _);
}
}
}
@@ -388,6 +391,8 @@ internal async Task ProcessSessionMessageAsync(ProcessSessionMessageEventArgs ar
CancellationTokenSource.CreateLinkedTokenSource(args.CancellationToken, _stoppingCancellationTokenSource.Token))
{
var actions = new ServiceBusSessionMessageActions(args);
+ _messagingProvider.ActionsCache.TryAdd(args.Message.LockToken, (args.Message, actions));
+
if (!await _sessionMessageProcessor.Value.BeginProcessingMessageAsync(actions, args.Message, linkedCts.Token)
.ConfigureAwait(false))
{
@@ -413,6 +418,7 @@ await _sessionMessageProcessor.Value.CompleteProcessingMessageAsync(actions, arg
finally
{
receiveActions.EndExecutionScope();
+ _messagingProvider.ActionsCache.TryRemove(args.Message.LockToken, out _);
}
}
}
@@ -494,6 +500,12 @@ private async Task RunBatchReceiveLoopAsync(CancellationTokenSource cancellation
var messageActions = _isSessionsEnabled
? new ServiceBusSessionMessageActions((ServiceBusSessionReceiver)receiver)
: new ServiceBusMessageActions(receiver);
+
+ foreach (var message in messages)
+ {
+ _messagingProvider.ActionsCache.TryAdd(message.LockToken, (message, messageActions));
+ }
+
var receiveActions = new ServiceBusReceiveActions(receiver);
ServiceBusReceivedMessage[] messagesArray = _supportMinBatchSize ? Array.Empty() : messages.ToArray();
@@ -656,55 +668,67 @@ private async Task TriggerAndCompleteMessagesInternal(ServiceBusReceivedMessage[
ActivityKind.Consumer,
MessagingDiagnosticOperation.Process);
- scope.SetMessageData(messagesArray);
-
- scope.Start();
- FunctionResult result = await _triggerExecutor.TryExecuteAsync(input.GetTriggerFunctionData(), _functionExecutionCancellationTokenSource.Token).ConfigureAwait(false);
- if (result.Exception != null)
+ try
{
- scope.Failed(result.Exception);
- }
- receiveActions.EndExecutionScope();
+ scope.SetMessageData(messagesArray);
- var processedMessages = messagesArray.Concat(receiveActions.Messages.Keys);
- // Complete batch of messages only if the execution was successful
- if (_autoCompleteMessages && result.Succeeded)
- {
- List completeTasks = new List();
- foreach (ServiceBusReceivedMessage message in processedMessages)
+ scope.Start();
+ FunctionResult result = await _triggerExecutor
+ .TryExecuteAsync(input.GetTriggerFunctionData(), _functionExecutionCancellationTokenSource.Token).ConfigureAwait(false);
+ if (result.Exception != null)
{
- // Skip messages that were settled in the user's function
- if (input.MessageActions.SettledMessages.ContainsKey(message))
- {
- continue;
- }
-
- // Pass CancellationToken.None to allow autocompletion to finish even when shutting down
- completeTasks.Add(receiver.CompleteMessageAsync(message, CancellationToken.None));
+ scope.Failed(result.Exception);
}
- await Task.WhenAll(completeTasks).ConfigureAwait(false);
- }
- else if (!result.Succeeded)
- {
- // For failed executions, we abandon the messages regardless of the autoCompleteMessages configuration.
- // This matches the behavior that happens for single dispatch functions as the processor does the same thing
- // in the Service Bus SDK.
+ receiveActions.EndExecutionScope();
- List abandonTasks = new();
- foreach (ServiceBusReceivedMessage message in processedMessages)
+ var processedMessages = messagesArray.Concat(receiveActions.Messages.Keys);
+ // Complete batch of messages only if the execution was successful
+ if (_autoCompleteMessages && result.Succeeded)
{
- // skip messages that were settled in the user's function
- if (input.MessageActions.SettledMessages.ContainsKey(message))
+ List completeTasks = new List(messagesArray.Length + receiveActions.Messages.Keys.Count);
+ foreach (ServiceBusReceivedMessage message in processedMessages)
{
- continue;
+ // Skip messages that were settled in the user's function
+ if (input.MessageActions.SettledMessages.ContainsKey(message))
+ {
+ continue;
+ }
+
+ // Pass CancellationToken.None to allow autocompletion to finish even when shutting down
+ completeTasks.Add(receiver.CompleteMessageAsync(message, CancellationToken.None));
}
- // Pass CancellationToken.None to allow abandon to finish even when shutting down
- abandonTasks.Add(receiver.AbandonMessageAsync(message, cancellationToken: CancellationToken.None));
+ await Task.WhenAll(completeTasks).ConfigureAwait(false);
}
+ else if (!result.Succeeded)
+ {
+ // For failed executions, we abandon the messages regardless of the autoCompleteMessages configuration.
+ // This matches the behavior that happens for single dispatch functions as the processor does the same thing
+ // in the Service Bus SDK.
+
+ List abandonTasks = new();
+ foreach (ServiceBusReceivedMessage message in processedMessages)
+ {
+ // skip messages that were settled in the user's function
+ if (input.MessageActions.SettledMessages.ContainsKey(message))
+ {
+ continue;
+ }
- await Task.WhenAll(abandonTasks).ConfigureAwait(false);
+ // Pass CancellationToken.None to allow abandon to finish even when shutting down
+ abandonTasks.Add(receiver.AbandonMessageAsync(message, cancellationToken: CancellationToken.None));
+ }
+
+ await Task.WhenAll(abandonTasks).ConfigureAwait(false);
+ }
+ }
+ finally
+ {
+ foreach (var message in input.Messages)
+ {
+ _messagingProvider.ActionsCache.TryRemove(message.LockToken, out _);
+ }
}
}
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj
index b64f6c379bef1..9d207b563cbfc 100644
--- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj
@@ -1,24 +1,35 @@
- netstandard2.0
+ netstandard2.0;net6.0
Microsoft Azure WebJobs SDK ServiceBus Extension
5.13.0-beta.1
- 5.12.0
- $(NoWarn);AZC0001;CS1591;SA1636
+
+ 5.12.0
+ $(NoWarn);AZC0001;CS1591;SA1636;AZC0007;AZC0015
true
true
internal
-
+
+
+
+
+
+
+
+
+
+
+
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs
index 267a40dd3af8c..413be97668859 100644
--- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs
@@ -22,6 +22,7 @@ public class MessagingProvider
private readonly ConcurrentDictionary _messageSenderCache = new();
private readonly ConcurrentDictionary _messageReceiverCache = new();
private readonly ConcurrentDictionary _clientCache = new();
+ internal ConcurrentDictionary ActionsCache { get; } = new();
///
/// Initializes a new instance of .
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/ServiceBusGrpcEndToEndTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/ServiceBusGrpcEndToEndTests.cs
new file mode 100644
index 0000000000000..725e8948ebc44
--- /dev/null
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/ServiceBusGrpcEndToEndTests.cs
@@ -0,0 +1,399 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+#if NET6_0_OR_GREATER
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Azure.Messaging.ServiceBus;
+using Azure.Messaging.ServiceBus.Tests;
+using Grpc.Core;
+using Microsoft.Azure.ServiceBus.Grpc;
+using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Grpc;
+using Microsoft.Azure.WebJobs.ServiceBus;
+using Microsoft.Extensions.DependencyInjection;
+using NUnit.Framework;
+
+namespace Microsoft.Azure.WebJobs.Host.EndToEndTests
+{
+ public class ServiceBusGrpcEndToEndTests : WebJobsServiceBusTestBase
+ {
+ public ServiceBusGrpcEndToEndTests() : base(isSession: false)
+ {
+ }
+
+ [Test]
+ public async Task BindToMessageAndComplete()
+ {
+ var host = BuildHost();
+ var settlementImpl = host.Services.GetRequiredService();
+ var provider = host.Services.GetRequiredService();
+ ServiceBusBindToMessageAndComplete.SettlementService = settlementImpl;
+
+ using (host)
+ {
+ var message = new ServiceBusMessage("foobar");
+ await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
+ var sender = client.CreateSender(FirstQueueScope.QueueName);
+ await sender.SendMessageAsync(message);
+
+ bool result = _waitHandle1.WaitOne(SBTimeoutMills);
+ Assert.True(result);
+ await host.StopAsync();
+ }
+ Assert.IsEmpty(provider.ActionsCache);
+ }
+
+ [Test]
+ public async Task BindToBatchAndComplete()
+ {
+ var host = BuildHost();
+ var settlementImpl = host.Services.GetRequiredService();
+ var provider = host.Services.GetRequiredService();
+ ServiceBusBindToBatchAndComplete.SettlementService = settlementImpl;
+
+ using (host)
+ {
+ var message = new ServiceBusMessage("foobar");
+ await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
+ var sender = client.CreateSender(FirstQueueScope.QueueName);
+ await sender.SendMessageAsync(message);
+
+ bool result = _waitHandle1.WaitOne(SBTimeoutMills);
+ Assert.True(result);
+ await host.StopAsync();
+ }
+ Assert.IsEmpty(provider.ActionsCache);
+ }
+
+ [Test]
+ public async Task BindToMessageAndDeadletter()
+ {
+ var host = BuildHost();
+ var settlementImpl = host.Services.GetRequiredService();
+ var provider = host.Services.GetRequiredService();
+ ServiceBusBindToMessageAndDeadletter.SettlementService = settlementImpl;
+
+ using (host)
+ {
+ var message = new ServiceBusMessage("foobar");
+ await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
+ var sender = client.CreateSender(FirstQueueScope.QueueName);
+ await sender.SendMessageAsync(message);
+
+ bool result = _waitHandle1.WaitOne(SBTimeoutMills);
+ Assert.True(result);
+ await host.StopAsync();
+ }
+ Assert.IsEmpty(provider.ActionsCache);
+ }
+
+ [Test]
+ public async Task BindToBatchAndDeadletter()
+ {
+ var host = BuildHost();
+ var settlementImpl = host.Services.GetRequiredService();
+ var provider = host.Services.GetRequiredService();
+ ServiceBusBindToBatchAndDeadletter.SettlementService = settlementImpl;
+
+ using (host)
+ {
+ var message = new ServiceBusMessage("foobar");
+ await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
+ var sender = client.CreateSender(FirstQueueScope.QueueName);
+ await sender.SendMessageAsync(message);
+
+ bool result = _waitHandle1.WaitOne(SBTimeoutMills);
+ Assert.True(result);
+ await host.StopAsync();
+ }
+ Assert.IsEmpty(provider.ActionsCache);
+ }
+
+ [Test]
+ public async Task BindToMessageAndDefer()
+ {
+ var host = BuildHost();
+ var settlementImpl = host.Services.GetRequiredService();
+ var provider = host.Services.GetRequiredService();
+ ServiceBusBindToMessageAndDefer.SettlementService = settlementImpl;
+
+ using (host)
+ {
+ var message = new ServiceBusMessage("foobar");
+ await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
+ var sender = client.CreateSender(FirstQueueScope.QueueName);
+ await sender.SendMessageAsync(message);
+
+ bool result = _waitHandle1.WaitOne(SBTimeoutMills);
+ Assert.True(result);
+ await host.StopAsync();
+ }
+ Assert.IsEmpty(provider.ActionsCache);
+ }
+
+ [Test]
+ public async Task BindToBatchAndDefer()
+ {
+ var host = BuildHost();
+ var settlementImpl = host.Services.GetRequiredService();
+ var provider = host.Services.GetRequiredService();
+ ServiceBusBindToBatchAndDefer.SettlementService = settlementImpl;
+
+ using (host)
+ {
+ var message = new ServiceBusMessage("foobar");
+ await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
+ var sender = client.CreateSender(FirstQueueScope.QueueName);
+ await sender.SendMessageAsync(message);
+
+ bool result = _waitHandle1.WaitOne(SBTimeoutMills);
+ Assert.True(result);
+ await host.StopAsync();
+ }
+ Assert.IsEmpty(provider.ActionsCache);
+ }
+
+ [Test]
+ public async Task BindToMessageAndAbandon()
+ {
+ var host = BuildHost();
+ var settlementImpl = host.Services.GetRequiredService();
+ var provider = host.Services.GetRequiredService();
+ ServiceBusBindToMessageAndAbandon.SettlementService = settlementImpl;
+ await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
+
+ using (host)
+ {
+ var message = new ServiceBusMessage("foobar");
+ var sender = client.CreateSender(FirstQueueScope.QueueName);
+ await sender.SendMessageAsync(message);
+
+ bool result = _waitHandle1.WaitOne(SBTimeoutMills);
+ Assert.True(result);
+ }
+
+ var abandonedMessage = (await client.CreateReceiver(FirstQueueScope.QueueName).ReceiveMessagesAsync(1)).Single();
+ Assert.AreEqual("foobar", abandonedMessage.Body.ToString());
+ Assert.AreEqual("value", abandonedMessage.ApplicationProperties["key"]);
+ Assert.IsEmpty(provider.ActionsCache);
+ }
+
+ [Test]
+ public async Task BindToBatchAndAbandon()
+ {
+ var host = BuildHost();
+ var settlementImpl = host.Services.GetRequiredService();
+ var provider = host.Services.GetRequiredService();
+ ServiceBusBindToBatchAndAbandon.SettlementService = settlementImpl;
+ await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
+
+ using (host)
+ {
+ var message = new ServiceBusMessage("foobar");
+ var sender = client.CreateSender(FirstQueueScope.QueueName);
+ await sender.SendMessageAsync(message);
+
+ bool result = _waitHandle1.WaitOne(SBTimeoutMills);
+ Assert.True(result);
+ }
+
+ var abandonedMessage = (await client.CreateReceiver(FirstQueueScope.QueueName).ReceiveMessagesAsync(1)).Single();
+ Assert.AreEqual("foobar", abandonedMessage.Body.ToString());
+ Assert.AreEqual("value", abandonedMessage.ApplicationProperties["key"]);
+ Assert.IsEmpty(provider.ActionsCache);
+ }
+
+ public class ServiceBusBindToMessageAndComplete
+ {
+ internal static SettlementService SettlementService { get; set; }
+ public static async Task BindToMessage(
+ [ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage message)
+ {
+ Assert.AreEqual("foobar", message.Body.ToString());
+ await SettlementService.Complete(new CompleteRequest() { Locktoken = message.LockToken }, new MockServerCallContext());
+ _waitHandle1.Set();
+ }
+ }
+
+ public class ServiceBusBindToBatchAndComplete
+ {
+ internal static SettlementService SettlementService { get; set; }
+ public static async Task BindToMessage(
+ [ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage[] messages)
+ {
+ var message = messages.Single();
+ Assert.AreEqual("foobar", message.Body.ToString());
+ await SettlementService.Complete(new CompleteRequest() { Locktoken = message.LockToken }, new MockServerCallContext());
+ _waitHandle1.Set();
+ }
+ }
+
+ public class ServiceBusBindToMessageAndDeadletter
+ {
+ internal static SettlementService SettlementService { get; set; }
+ public static async Task BindToMessage(
+ [ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage message, ServiceBusClient client)
+ {
+ Assert.AreEqual("foobar", message.Body.ToString());
+ await SettlementService.Deadletter(
+ new DeadletterRequest()
+ {
+ Locktoken = message.LockToken,
+ DeadletterErrorDescription = "description",
+ DeadletterReason = "reason",
+ PropertiesToModify = {{ "key", new SettlementProperties { IntValue = 42} }}
+ },
+ new MockServerCallContext());
+
+ var receiver = client.CreateReceiver(FirstQueueScope.QueueName, new ServiceBusReceiverOptions {SubQueue = SubQueue.DeadLetter});
+ var deadletterMessage = await receiver.ReceiveMessageAsync();
+ Assert.AreEqual("foobar", deadletterMessage.Body.ToString());
+ Assert.AreEqual("description", deadletterMessage.DeadLetterErrorDescription);
+ Assert.AreEqual("reason", deadletterMessage.DeadLetterReason);
+ Assert.AreEqual(42, deadletterMessage.ApplicationProperties["key"]);
+ _waitHandle1.Set();
+ }
+ }
+
+ public class ServiceBusBindToBatchAndDeadletter
+ {
+ internal static SettlementService SettlementService { get; set; }
+
+ public static async Task BindToMessage(
+ [ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage[] messages, ServiceBusClient client)
+ {
+ var message = messages.Single();
+
+ Assert.AreEqual("foobar", message.Body.ToString());
+ await SettlementService.Deadletter(
+ new DeadletterRequest()
+ {
+ Locktoken = message.LockToken,
+ DeadletterErrorDescription = "description",
+ DeadletterReason = "reason",
+ PropertiesToModify = { { "key", new SettlementProperties { IntValue = 42 } } }
+ },
+ new MockServerCallContext());
+
+ var receiver = client.CreateReceiver(FirstQueueScope.QueueName,
+ new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });
+ var deadletterMessage = await receiver.ReceiveMessageAsync();
+ Assert.AreEqual("foobar", deadletterMessage.Body.ToString());
+ Assert.AreEqual("description", deadletterMessage.DeadLetterErrorDescription);
+ Assert.AreEqual("reason", deadletterMessage.DeadLetterReason);
+ Assert.AreEqual(42, deadletterMessage.ApplicationProperties["key"]);
+ _waitHandle1.Set();
+ }
+ }
+
+ public class ServiceBusBindToMessageAndDefer
+ {
+ internal static SettlementService SettlementService { get; set; }
+ public static async Task BindToMessage(
+ [ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage message, ServiceBusReceiveActions receiveActions)
+ {
+ Assert.AreEqual("foobar", message.Body.ToString());
+ await SettlementService.Defer(
+ new DeferRequest
+ {
+ Locktoken = message.LockToken,
+ PropertiesToModify = {{ "key", new SettlementProperties { BoolValue = true} }}
+ },
+ new MockServerCallContext());
+ var deferredMessage = (await receiveActions.ReceiveDeferredMessagesAsync(
+ new[] { message.SequenceNumber })).Single();
+ Assert.AreEqual("foobar", deferredMessage.Body.ToString());
+ Assert.IsTrue((bool)deferredMessage.ApplicationProperties["key"]);
+ _waitHandle1.Set();
+ }
+ }
+
+ public class ServiceBusBindToBatchAndDefer
+ {
+ internal static SettlementService SettlementService { get; set; }
+ public static async Task BindToMessage(
+ [ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage[] messages, ServiceBusReceiveActions receiveActions)
+ {
+ var message = messages.Single();
+
+ Assert.AreEqual("foobar", message.Body.ToString());
+ await SettlementService.Defer(
+ new DeferRequest
+ {
+ Locktoken = message.LockToken,
+ PropertiesToModify = {{ "key", new SettlementProperties { BoolValue = true} }}
+ },
+ new MockServerCallContext());
+ var deferredMessage = (await receiveActions.ReceiveDeferredMessagesAsync(
+ new[] { message.SequenceNumber })).Single();
+ Assert.AreEqual("foobar", deferredMessage.Body.ToString());
+ Assert.IsTrue((bool)deferredMessage.ApplicationProperties["key"]);
+ _waitHandle1.Set();
+ }
+ }
+
+ public class ServiceBusBindToMessageAndAbandon
+ {
+ internal static SettlementService SettlementService { get; set; }
+ public static async Task BindToMessage(
+ [ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage message, ServiceBusReceiveActions receiveActions)
+ {
+ Assert.AreEqual("foobar", message.Body.ToString());
+ await SettlementService.Abandon(
+ new AbandonRequest
+ {
+ Locktoken = message.LockToken,
+ PropertiesToModify = {{ "key", new SettlementProperties { StringValue = "value"} }}
+ },
+ new MockServerCallContext());
+ _waitHandle1.Set();
+ }
+ }
+
+ public class ServiceBusBindToBatchAndAbandon
+ {
+ internal static SettlementService SettlementService { get; set; }
+ public static async Task BindToMessage(
+ [ServiceBusTrigger(FirstQueueNameKey)] ServiceBusReceivedMessage[] messages, ServiceBusReceiveActions receiveActions)
+ {
+ var message = messages.Single();
+
+ Assert.AreEqual("foobar", message.Body.ToString());
+ await SettlementService.Abandon(
+ new AbandonRequest
+ {
+ Locktoken = message.LockToken,
+ PropertiesToModify = {{ "key", new SettlementProperties { StringValue = "value"} }}
+ },
+ new MockServerCallContext());
+ _waitHandle1.Set();
+ }
+ }
+
+ internal class MockServerCallContext : ServerCallContext
+ {
+ protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
+ {
+ throw new NotImplementedException();
+ }
+
+ protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options)
+ {
+ throw new NotImplementedException();
+ }
+
+ protected override string MethodCore { get; }
+ protected override string HostCore { get; }
+ protected override string PeerCore { get; }
+ protected override DateTime DeadlineCore { get; }
+ protected override Metadata RequestHeadersCore { get; }
+ protected override CancellationToken CancellationTokenCore { get; } = CancellationToken.None;
+ protected override Metadata ResponseTrailersCore { get; }
+ protected override Status StatusCore { get; set; }
+ protected override WriteOptions WriteOptionsCore { get; set; }
+ protected override AuthContext AuthContextCore { get; }
+ }
+ }
+}
+#endif
\ No newline at end of file
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/ServiceBusGrpcSessionEndToEndTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/ServiceBusGrpcSessionEndToEndTests.cs
new file mode 100644
index 0000000000000..7a39c9f1bb3c4
--- /dev/null
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/ServiceBusGrpcSessionEndToEndTests.cs
@@ -0,0 +1,252 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+#if NET6_0_OR_GREATER
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Azure.Messaging.ServiceBus;
+using Azure.Messaging.ServiceBus.Tests;
+using Grpc.Core;
+using Microsoft.Azure.ServiceBus.Grpc;
+using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Grpc;
+using Microsoft.Azure.WebJobs.ServiceBus;
+using Microsoft.Extensions.DependencyInjection;
+using NUnit.Framework;
+
+namespace Microsoft.Azure.WebJobs.Host.EndToEndTests
+{
+ public class ServiceBusGrpcSessionEndToEndTests : WebJobsServiceBusTestBase
+ {
+ public ServiceBusGrpcSessionEndToEndTests() : base(isSession: true)
+ {
+ }
+
+ [Test]
+ public async Task BindToSessionMessageAndComplete()
+ {
+ var host = BuildHost();
+ var settlementImpl = host.Services.GetRequiredService();
+ var provider = host.Services.GetRequiredService();
+ ServiceBusBindToSessionMessageAndComplete.SettlementService = settlementImpl;
+
+ using (host)
+ {
+ var message = new ServiceBusMessage("foobar") {SessionId = "sessionId"};
+ await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
+ var sender = client.CreateSender(FirstQueueScope.QueueName);
+ await sender.SendMessageAsync(message);
+
+ bool result = _waitHandle1.WaitOne(SBTimeoutMills);
+ Assert.True(result);
+ }
+ Assert.IsEmpty(provider.ActionsCache);
+ }
+
+ [Test]
+ public async Task BindToSessionBatchAndComplete()
+ {
+ var host = BuildHost();
+ var settlementImpl = host.Services.GetRequiredService();
+ var provider = host.Services.GetRequiredService();
+ ServiceBusBindToSessionBatchAndComplete.SettlementService = settlementImpl;
+
+ using (host)
+ {
+ var message = new ServiceBusMessage("foobar") {SessionId = "sessionId"};
+ await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
+ var sender = client.CreateSender(FirstQueueScope.QueueName);
+ await sender.SendMessageAsync(message);
+
+ bool result = _waitHandle1.WaitOne(SBTimeoutMills);
+ Assert.True(result);
+ }
+ Assert.IsEmpty(provider.ActionsCache);
+ }
+
+ [Test]
+ public async Task BindToSessionMessageAndDeadletter()
+ {
+ var host = BuildHost();
+ var settlementImpl = host.Services.GetRequiredService();
+ var provider = host.Services.GetRequiredService();
+ ServiceBusBindToSessionMessageAndDeadletter.SettlementService = settlementImpl;
+
+ using (host)
+ {
+ var message = new ServiceBusMessage("foobar") {SessionId = "sessionId"};
+ await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
+ var sender = client.CreateSender(FirstQueueScope.QueueName);
+ await sender.SendMessageAsync(message);
+
+ bool result = _waitHandle1.WaitOne(SBTimeoutMills);
+ Assert.True(result);
+ }
+ Assert.IsEmpty(provider.ActionsCache);
+ }
+
+ [Test]
+ public async Task BindToSessionMessageAndDefer()
+ {
+ var host = BuildHost();
+ var settlementImpl = host.Services.GetRequiredService();
+ var provider = host.Services.GetRequiredService();
+ ServiceBusBindToSessionMessageAndDefer.SettlementService = settlementImpl;
+
+ using (host)
+ {
+ var message = new ServiceBusMessage("foobar") {SessionId = "sessionId"};
+ await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
+ var sender = client.CreateSender(FirstQueueScope.QueueName);
+ await sender.SendMessageAsync(message);
+
+ bool result = _waitHandle1.WaitOne(SBTimeoutMills);
+ Assert.True(result);
+ }
+ Assert.IsEmpty(provider.ActionsCache);
+ }
+
+ [Test]
+ public async Task BindToSessionMessageAndAbandon()
+ {
+ var host = BuildHost();
+ var settlementImpl = host.Services.GetRequiredService();
+ var provider = host.Services.GetRequiredService();
+ ServiceBusBindToSessionMessageAndAbandon.SettlementService = settlementImpl;
+ await using ServiceBusClient client = new ServiceBusClient(ServiceBusTestEnvironment.Instance.ServiceBusConnectionString);
+
+ using (host)
+ {
+ var message = new ServiceBusMessage("foobar") {SessionId = "sessionId"};
+ var sender = client.CreateSender(FirstQueueScope.QueueName);
+ await sender.SendMessageAsync(message);
+
+ bool result = _waitHandle1.WaitOne(SBTimeoutMills);
+ Assert.True(result);
+ }
+
+ var receiver = await client.AcceptNextSessionAsync(FirstQueueScope.QueueName);
+ var abandonedMessage = (await receiver.ReceiveMessagesAsync(1)).Single();
+ Assert.AreEqual("foobar", abandonedMessage.Body.ToString());
+ Assert.AreEqual("value", abandonedMessage.ApplicationProperties["key"]);
+ Assert.IsEmpty(provider.ActionsCache);
+ }
+
+ public class ServiceBusBindToSessionMessageAndComplete
+ {
+ internal static SettlementService SettlementService { get; set; }
+ public static async Task BindToMessage(
+ [ServiceBusTrigger(FirstQueueNameKey, IsSessionsEnabled = true)] ServiceBusReceivedMessage message)
+ {
+ Assert.AreEqual("foobar", message.Body.ToString());
+ await SettlementService.Complete(new CompleteRequest() { Locktoken = message.LockToken }, new MockServerCallContext());
+ _waitHandle1.Set();
+ }
+ }
+
+ public class ServiceBusBindToSessionBatchAndComplete
+ {
+ internal static SettlementService SettlementService { get; set; }
+ public static async Task BindToMessage(
+ [ServiceBusTrigger(FirstQueueNameKey, IsSessionsEnabled = true)] ServiceBusReceivedMessage[] messages)
+ {
+ var message = messages.Single();
+ Assert.AreEqual("foobar", message.Body.ToString());
+ await SettlementService.Complete(new CompleteRequest() { Locktoken = message.LockToken }, new MockServerCallContext());
+ _waitHandle1.Set();
+ }
+ }
+
+ public class ServiceBusBindToSessionMessageAndDeadletter
+ {
+ internal static SettlementService SettlementService { get; set; }
+ public static async Task BindToMessage(
+ [ServiceBusTrigger(FirstQueueNameKey, IsSessionsEnabled = true)] ServiceBusReceivedMessage message, ServiceBusClient client)
+ {
+ Assert.AreEqual("foobar", message.Body.ToString());
+ await SettlementService.Deadletter(
+ new DeadletterRequest()
+ {
+ Locktoken = message.LockToken,
+ DeadletterErrorDescription = "description",
+ DeadletterReason = "reason",
+ PropertiesToModify = {{ "key", new SettlementProperties { IntValue = 42} }}
+ },
+ new MockServerCallContext());
+
+ var receiver = client.CreateReceiver(FirstQueueScope.QueueName, new ServiceBusReceiverOptions {SubQueue = SubQueue.DeadLetter});
+ var deadletterMessage = await receiver.ReceiveMessageAsync();
+ Assert.AreEqual("foobar", deadletterMessage.Body.ToString());
+ Assert.AreEqual("description", deadletterMessage.DeadLetterErrorDescription);
+ Assert.AreEqual("reason", deadletterMessage.DeadLetterReason);
+ Assert.AreEqual(42, deadletterMessage.ApplicationProperties["key"]);
+ _waitHandle1.Set();
+ }
+ }
+
+ public class ServiceBusBindToSessionMessageAndDefer
+ {
+ internal static SettlementService SettlementService { get; set; }
+ public static async Task BindToMessage(
+ [ServiceBusTrigger(FirstQueueNameKey, IsSessionsEnabled = true)] ServiceBusReceivedMessage message, ServiceBusReceiveActions receiveActions)
+ {
+ Assert.AreEqual("foobar", message.Body.ToString());
+ await SettlementService.Defer(
+ new DeferRequest
+ {
+ Locktoken = message.LockToken,
+ PropertiesToModify = {{ "key", new SettlementProperties { BoolValue = true} }}
+ },
+ new MockServerCallContext());
+ var deferredMessage = (await receiveActions.ReceiveDeferredMessagesAsync(
+ new[] { message.SequenceNumber })).Single();
+ Assert.AreEqual("foobar", deferredMessage.Body.ToString());
+ Assert.IsTrue((bool)deferredMessage.ApplicationProperties["key"]);
+ _waitHandle1.Set();
+ }
+ }
+
+ public class ServiceBusBindToSessionMessageAndAbandon
+ {
+ internal static SettlementService SettlementService { get; set; }
+ public static async Task BindToMessage(
+ [ServiceBusTrigger(FirstQueueNameKey, IsSessionsEnabled = true)] ServiceBusReceivedMessage message, ServiceBusReceiveActions receiveActions)
+ {
+ Assert.AreEqual("foobar", message.Body.ToString());
+ await SettlementService.Abandon(
+ new AbandonRequest
+ {
+ Locktoken = message.LockToken,
+ PropertiesToModify = {{ "key", new SettlementProperties { StringValue = "value"} }}
+ },
+ new MockServerCallContext());
+ _waitHandle1.Set();
+ }
+ }
+
+ internal class MockServerCallContext : ServerCallContext
+ {
+ protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
+ {
+ throw new NotImplementedException();
+ }
+
+ protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options)
+ {
+ throw new NotImplementedException();
+ }
+
+ protected override string MethodCore { get; }
+ protected override string HostCore { get; }
+ protected override string PeerCore { get; }
+ protected override DateTime DeadlineCore { get; }
+ protected override Metadata RequestHeadersCore { get; }
+ protected override CancellationToken CancellationTokenCore { get; } = CancellationToken.None;
+ protected override Metadata ResponseTrailersCore { get; }
+ protected override Status StatusCore { get; set; }
+ protected override WriteOptions WriteOptionsCore { get; set; }
+ protected override AuthContext AuthContextCore { get; }
+ }
+ }
+}
+#endif
\ No newline at end of file
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/SettlementPropertiesTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/SettlementPropertiesTests.cs
new file mode 100644
index 0000000000000..57a3415569817
--- /dev/null
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Grpc/SettlementPropertiesTests.cs
@@ -0,0 +1,93 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+#if NET6_0_OR_GREATER
+using Microsoft.Azure.ServiceBus.Grpc;
+using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Grpc;
+using NUnit.Framework;
+
+namespace Microsoft.Azure.WebJobs.ServiceBus.UnitTests.Grpc
+{
+ public class SettlementPropertiesTests
+ {
+ [Test]
+ public void CanGetStringValue()
+ {
+ var properties = new SettlementProperties
+ {
+ StringValue = "foo"
+ };
+ Assert.AreEqual("foo", properties.GetPropertyValue());
+ }
+
+ [Test]
+ public void CanGetIntValue()
+ {
+ var properties = new SettlementProperties
+ {
+ IntValue = 42
+ };
+ Assert.AreEqual(42, properties.GetPropertyValue());
+ }
+
+ [Test]
+ public void CanGetUintValue()
+ {
+ var properties = new SettlementProperties
+ {
+ UintValue = 42
+ };
+ Assert.AreEqual(42, properties.GetPropertyValue());
+ }
+
+ [Test]
+ public void CanGetLongValue()
+ {
+ var properties = new SettlementProperties
+ {
+ LongValue = 42
+ };
+ Assert.AreEqual(42, properties.GetPropertyValue());
+ }
+
+ [Test]
+ public void CanGetUlongValue()
+ {
+ var properties = new SettlementProperties
+ {
+ UlongValue = 42
+ };
+ Assert.AreEqual(42, properties.GetPropertyValue());
+ }
+
+ [Test]
+ public void CanGetFloatValue()
+ {
+ var properties = new SettlementProperties
+ {
+ FloatValue = 42.0f
+ };
+ Assert.AreEqual(42.0, properties.GetPropertyValue());
+ }
+
+ [Test]
+ public void CanGetDoubleValue()
+ {
+ var properties = new SettlementProperties
+ {
+ DoubleValue = 42.0
+ };
+ Assert.AreEqual(42.0, properties.GetPropertyValue());
+ }
+
+ [Test]
+ public void CanGetBoolValue()
+ {
+ var properties = new SettlementProperties
+ {
+ BoolValue = true
+ };
+ Assert.IsTrue((bool)properties.GetPropertyValue());
+ }
+ }
+}
+#endif
\ No newline at end of file
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs
index 1e2301fc8d46b..2049f7b2897ad 100644
--- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs
@@ -974,9 +974,10 @@ private async Task ServiceBusEndToEndInternal(IHost host)
IEnumerable logMessages = host.GetTestLoggerProvider().GetAllLogMessages();
- // Filter out Azure SDK and custom processor logs for easier validation.
+ // Filter out Azure SDK, hosting lifetime, and custom processor logs for easier validation.
logMessages = logMessages.Where(
m => !m.Category.StartsWith("Azure.", StringComparison.InvariantCulture) &&
+ !m.Category.StartsWith("Microsoft.Hosting.Lifetime") &&
m.Category != CustomMessagingProvider.CustomMessagingCategory);
string[] consoleOutputLines = logMessages
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs
index 64e337eec0a72..3b04778b1965e 100644
--- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs
@@ -334,7 +334,7 @@ public async Task StopAsync(CancellationToken cancellationToken)
await Task.Delay(TimeSpan.FromSeconds(2));
QueueRuntimeProperties properties = await client.GetQueueRuntimePropertiesAsync(FirstQueueScope.QueueName, CancellationToken.None);
- Assert.AreEqual(ExpectedRemainingMessages, properties.TotalMessageCount);
+ Assert.AreEqual(ExpectedRemainingMessages, properties.ActiveMessageCount);
}
private static bool IsError(LogMessage logMessage)