Skip to content

Commit

Permalink
Setting AMQP session incoming window size to unlimited for receivers (A…
Browse files Browse the repository at this point in the history
…zure#16163)

* Setting incoming window size on amqp session of a receive link to unlimited. This is to allow
the case where clients only receive, but don't complete or abandon messages.

* Ading a test case.

* Fixing the newly added testcase
  • Loading branch information
yvgopal authored Oct 23, 2020
1 parent e4a85e5 commit ff9473d
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Framing;
using Microsoft.Azure.ServiceBus.Primitives;
using Newtonsoft.Json.Schema;

internal abstract class AmqpLinkCreator
{
Expand Down Expand Up @@ -59,6 +60,14 @@ public async Task<Tuple<AmqpObject, DateTime>> CreateAndOpenAmqpLinkAsync()
{
// Create Session
var amqpSessionSettings = new AmqpSessionSettings { Properties = new Fields() };
if (this.amqpLinkSettings.IsReceiver())
{
// This is the maximum number of unsettled transfers across all receive links on this session.
// This will allow the session to accept unlimited number of transfers, even if the recevier(s)
// are not settling any of the deliveries.
amqpSessionSettings.IncomingWindow = uint.MaxValue;
}

session = amqpConnection.CreateSession(amqpSessionSettings);
await session.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests

static class TestUtility
{
const int MaxSendBatchSize = 4500;
private static readonly Lazy<string> NamespaceConnectionStringInstance =
new Lazy<string>( () => new ServiceBusConnectionStringBuilder(ReadEnvironmentConnectionString()).ToString(), LazyThreadSafetyMode.PublicationOnly);

Expand Down Expand Up @@ -49,15 +50,19 @@ internal static async Task SendMessagesAsync(IMessageSender messageSender, int m
await Task.FromResult(false);
}

var messagesToSend = new List<Message>();
for (var i = 0; i < messageCount; i++)
for (var j = 0; j < messageCount; j += MaxSendBatchSize)
{
var message = new Message(Encoding.UTF8.GetBytes("test" + i));
message.Label = "test" + i;
messagesToSend.Add(message);
var messagesToSend = new List<Message>();
for (var i = j; i < Math.Min(messageCount, j + MaxSendBatchSize); i++)
{
var message = new Message(Encoding.UTF8.GetBytes("test" + i));
message.Label = "test" + i;
messagesToSend.Add(message);
}

await messageSender.SendAsync(messagesToSend);
}

await messageSender.SendAsync(messagesToSend);
Log($"Sent {messageCount} messages");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal async Task ReceiveDeleteTestCase(IMessageSender messageSender, IMessage
{
await TestUtility.SendMessagesAsync(messageSender, messageCount);
var receivedMessages = await TestUtility.ReceiveMessagesAsync(messageReceiver, messageCount, TimeSpan.FromSeconds(10));
Assert.Equal(receivedMessages.Count, messageCount);
Assert.Equal(messageCount, receivedMessages.Count);
}

internal async Task PeekLockWithAbandonTestCase(IMessageSender messageSender, IMessageReceiver messageReceiver, int messageCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,30 @@ await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueNa
});
}

[Fact]
[LiveTest]
[DisplayTestMethodName]
public async Task ReceiveLotOfMessagesWithoutSettling()
{
// 5000 is the default limit on amqp session incoming window
int messageCount = 5010;
await ServiceBusScope.UsingQueueAsync(false, false, async queueName =>
{
var sender = new MessageSender(TestUtility.NamespaceConnectionString, queueName);
var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, queueName, receiveMode: ReceiveMode.PeekLock);
try
{
await this.ReceiveDeleteTestCase(sender, receiver, messageCount);
}
finally
{
await sender.CloseAsync();
await receiver.CloseAsync();
}
});
}

[Theory]
[MemberData(nameof(TestPermutations))]
[LiveTest]
Expand Down

0 comments on commit ff9473d

Please sign in to comment.