Skip to content

Commit

Permalink
Merge pull request #1416 from rabbitmq/rabbitmq-dotnet-client-1414-6.x
Browse files Browse the repository at this point in the history
Removed ReceiveBufferSize and SendBufferSize to improve message rates
  • Loading branch information
lukebakken authored Nov 15, 2023
2 parents ac2c4de + b612715 commit d39f36b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 21 deletions.
4 changes: 1 addition & 3 deletions projects/RabbitMQ.Client/client/api/ConnectionFactoryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ public static ITcpClient DefaultSocketFactory(AddressFamily addressFamily)
{
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp)
{
NoDelay = true,
ReceiveBufferSize = 65536,
SendBufferSize = 65536
NoDelay = true
};
return new TcpClientAdapter(socket);
}
Expand Down
67 changes: 49 additions & 18 deletions projects/Unit/TestConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@

using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using NUnit.Framework;

using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client.Unit
{
Expand All @@ -44,12 +45,12 @@ public class TestConnectionFactory
[Test]
public void TestProperties()
{
string u = "username";
string u = "username";
string pw = "password";
string v = "vhost";
string h = "192.168.0.1";
int p = 5674;
uint mms = 512 * 1024 * 1024;
string v = "vhost";
string h = "192.168.0.1";
int p = 5674;
uint mms = 512 * 1024 * 1024;

var cf = new ConnectionFactory
{
Expand All @@ -73,6 +74,35 @@ public void TestProperties()
Assert.AreEqual(cf.Endpoint.MaxMessageSize, mms);
}

[Test]
public void TestConnectionFactoryWithCustomSocketFactory()
{
const int defaultSocketBufsz = 8192; // From the docs
const int bufsz = 1024;

var cf = new ConnectionFactory
{
SocketFactory = (AddressFamily af) =>
{
var socket = new Socket(af, SocketType.Stream, ProtocolType.Tcp)
{
SendBufferSize = bufsz,
ReceiveBufferSize = bufsz,
NoDelay = false
};
return new TcpClientAdapter(socket);
}
};

ITcpClient c = cf.SocketFactory(AddressFamily.InterNetwork);
Assert.IsAssignableFrom(typeof(TcpClientAdapter), c);
TcpClientAdapter tcpClientAdapter = (TcpClientAdapter)c;
Socket s = tcpClientAdapter.Client;
Assert.AreNotEqual(defaultSocketBufsz, s.ReceiveBufferSize);
Assert.AreNotEqual(defaultSocketBufsz, s.SendBufferSize);
Assert.False(s.NoDelay);
}

[Test]
public void TestCreateConnectionUsesSpecifiedPort()
{
Expand All @@ -84,7 +114,7 @@ public void TestCreateConnectionUsesSpecifiedPort()
};
Assert.That(() =>
{
using(IConnection conn = cf.CreateConnection()) {}
using (IConnection conn = cf.CreateConnection()) { }
}, Throws.TypeOf<BrokerUnreachableException>());
}

Expand All @@ -99,7 +129,7 @@ public void TestCreateConnectionWithClientProvidedNameUsesSpecifiedPort()
};
Assert.That(() =>
{
using(IConnection conn = cf.CreateConnection("some_name")) {}
using (IConnection conn = cf.CreateConnection("some_name")) { }
}, Throws.TypeOf<BrokerUnreachableException>());
}

Expand Down Expand Up @@ -154,10 +184,10 @@ public void TestCreateConnectionAmqpTcpEndpointListAndClientProvidedName()
AutomaticRecoveryEnabled = true
};
var xs = new System.Collections.Generic.List<AmqpTcpEndpoint> { new AmqpTcpEndpoint("localhost") };
using(IConnection conn = cf.CreateConnection(xs, "some_name"))
using (IConnection conn = cf.CreateConnection(xs, "some_name"))
{
Assert.AreEqual("some_name", conn.ClientProvidedName);
Assert.AreEqual("some_name", conn.ClientProperties["connection_name"]);
Assert.AreEqual("some_name", conn.ClientProvidedName);
Assert.AreEqual("some_name", conn.ClientProperties["connection_name"]);
}
}

Expand All @@ -169,7 +199,8 @@ public void TestCreateConnectionUsesDefaultPort()
AutomaticRecoveryEnabled = true,
HostName = "localhost"
};
using (IConnection conn = cf.CreateConnection()){
using (IConnection conn = cf.CreateConnection())
{
Assert.AreEqual(5672, conn.Endpoint.Port);
}
}
Expand Down Expand Up @@ -217,7 +248,7 @@ public void TestCreateConnectionWithAutoRecoveryUsesAmqpTcpEndpoint()
Port = 1234
};
var ep = new AmqpTcpEndpoint("localhost");
using(IConnection conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep })) {}
using (IConnection conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep })) { }
}

[Test]
Expand All @@ -230,7 +261,7 @@ public void TestCreateConnectionWithAutoRecoveryUsesInvalidAmqpTcpEndpoint()
var ep = new AmqpTcpEndpoint("localhost", 1234);
Assert.That(() =>
{
using(IConnection conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep })){}
using (IConnection conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep })) { }
}, Throws.TypeOf<BrokerUnreachableException>());
}

Expand All @@ -243,7 +274,7 @@ public void TestCreateConnectionUsesAmqpTcpEndpoint()
Port = 1234
};
var ep = new AmqpTcpEndpoint("localhost");
using(IConnection conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep })) {}
using (IConnection conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep })) { }
}

[Test]
Expand All @@ -258,7 +289,7 @@ public void TestCreateConnectionWithForcedAddressFamily()
AddressFamily = System.Net.Sockets.AddressFamily.InterNetwork
};
cf.Endpoint = ep;
using(IConnection conn = cf.CreateConnection()){};
using (IConnection conn = cf.CreateConnection()) { };
}

[Test]
Expand All @@ -268,7 +299,7 @@ public void TestCreateConnectionUsesInvalidAmqpTcpEndpoint()
var ep = new AmqpTcpEndpoint("localhost", 1234);
Assert.That(() =>
{
using(IConnection conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep })) {}
using (IConnection conn = cf.CreateConnection(new System.Collections.Generic.List<AmqpTcpEndpoint> { ep })) { }
}, Throws.TypeOf<BrokerUnreachableException>());
}

Expand All @@ -278,7 +309,7 @@ public void TestCreateConnectioUsesValidEndpointWhenMultipleSupplied()
var cf = new ConnectionFactory();
var invalidEp = new AmqpTcpEndpoint("not_localhost");
var ep = new AmqpTcpEndpoint("localhost");
using(IConnection conn = cf.CreateConnection(new List<AmqpTcpEndpoint> { invalidEp, ep })) {};
using (IConnection conn = cf.CreateConnection(new List<AmqpTcpEndpoint> { invalidEp, ep })) { };
}

[Test]
Expand Down

0 comments on commit d39f36b

Please sign in to comment.