Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support client specified connection name #142

Merged
merged 4 commits into from
Apr 17, 2016
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/com/rabbitmq/client/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
*/
Map<String, Object> getClientProperties();

/**
* Get connection name client property value
*
* @return string connection name from client properties, or null if there is not such property.
*/
String getConnectionName();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be getClientProvidedConnectionName or getCustomConnectionName. I will do the renaming.


/**
* Retrieve the server properties.
* @return a map of the server properties. This typically includes the product name and version of the server.
Expand Down
120 changes: 116 additions & 4 deletions src/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.*;
import java.util.List;
import java.util.Arrays;
Expand Down Expand Up @@ -644,7 +645,25 @@ protected FrameHandlerFactory createFrameHandlerFactory() throws IOException {
* @throws IOException if it encounters a problem
*/
public Connection newConnection(Address[] addrs) throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, Arrays.asList(addrs));
return newConnection(this.sharedExecutor, Arrays.asList(addrs), null);
}


/**
* Create a new broker connection, picking the first available address from
* the list.
*
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
* is enabled, the connection returned by this method will be {@link Recoverable}. Future
* reconnection attempts will pick a random accessible address from the provided list.
*
* @param addrs an array of known broker addresses (hostname/port pairs) to try in order
* @param connectionName arbitrary sring for connection name client property
* @return an interface to the connection
* @throws IOException if it encounters a problem
*/
public Connection newConnection(Address[] addrs, String connectionName) throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, Arrays.asList(addrs), connectionName);
}

/**
Expand All @@ -660,7 +679,24 @@ public Connection newConnection(Address[] addrs) throws IOException, TimeoutExce
* @throws IOException if it encounters a problem
*/
public Connection newConnection(List<Address> addrs) throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, addrs);
return newConnection(this.sharedExecutor, addrs, null);
}

/**
* Create a new broker connection, picking the first available address from
* the list.
*
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
* is enabled, the connection returned by this method will be {@link Recoverable}. Future
* reconnection attempts will pick a random accessible address from the provided list.
*
* @param addrs a List of known broker addresses (hostname/port pairs) to try in order
* @param connectionName arbitrary sring for connection name client property
* @return an interface to the connection
* @throws IOException if it encounters a problem
*/
public Connection newConnection(List<Address> addrs, String connectionName) throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, addrs, connectionName);
}

/**
Expand All @@ -678,7 +714,45 @@ public Connection newConnection(List<Address> addrs) throws IOException, Timeout
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
*/
public Connection newConnection(ExecutorService executor, Address[] addrs) throws IOException, TimeoutException {
return newConnection(executor, Arrays.asList(addrs));
return newConnection(executor, Arrays.asList(addrs), null);
}


/**
* Create a new broker connection, picking the first available address from
* the list.
*
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
* is enabled, the connection returned by this method will be {@link Recoverable}. Future
* reconnection attempts will pick a random accessible address from the provided list.
*
* @param executor thread execution service for consumers on the connection
* @param addrs an array of known broker addresses (hostname/port pairs) to try in order
* @param connectionName arbitrary sring for connection name client property
* @return an interface to the connection
* @throws java.io.IOException if it encounters a problem
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
*/
public Connection newConnection(ExecutorService executor, Address[] addrs, String connectionName) throws IOException, TimeoutException {
return newConnection(executor, Arrays.asList(addrs), connectionName);
}

/**
* Create a new broker connection, picking the first available address from
* the list.
*
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
* is enabled, the connection returned by this method will be {@link Recoverable}. Future
* reconnection attempts will pick a random accessible address from the provided list.
*
* @param executor thread execution service for consumers on the connection
* @param addrs a List of known broker addrs (hostname/port pairs) to try in order
* @return an interface to the connection
* @throws java.io.IOException if it encounters a problem
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
*/
public Connection newConnection(ExecutorService executor, List<Address> addrs) throws IOException, TimeoutException {
return newConnection(executor, addrs, null);
}

/**
Expand All @@ -691,15 +765,22 @@ public Connection newConnection(ExecutorService executor, Address[] addrs) throw
*
* @param executor thread execution service for consumers on the connection
* @param addrs a List of known broker addrs (hostname/port pairs) to try in order
* @param connectionName arbitrary sring for connection name client property
* @return an interface to the connection
* @throws java.io.IOException if it encounters a problem
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
*/
public Connection newConnection(ExecutorService executor, List<Address> addrs)
public Connection newConnection(ExecutorService executor, List<Address> addrs, String connectionName)
throws IOException, TimeoutException {
// make sure we respect the provided thread factory
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
ConnectionParams params = params(executor);
// set connection name client property
if (connectionName != null) {
Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
properties.put("connection_name", connectionName);
params.setClientProperties(properties);
}

if (isAutomaticRecoveryEnabled()) {
// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
Expand Down Expand Up @@ -760,6 +841,21 @@ public Connection newConnection() throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort())));
}

/**
* Create a new broker connection.
*
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
* is enabled, the connection returned by this method will be {@link Recoverable}. Reconnection
* attempts will always use the address configured on {@link ConnectionFactory}.
*
* @param connectionName arbitrary sring for connection name client property
* @return an interface to the connection
* @throws IOException if it encounters a problem
*/
public Connection newConnection(String connectionName) throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort())), connectionName);
}

/**
* Create a new broker connection.
*
Expand All @@ -775,6 +871,22 @@ public Connection newConnection(ExecutorService executor) throws IOException, Ti
return newConnection(executor, Collections.singletonList(new Address(getHost(), getPort())));
}

/**
* Create a new broker connection.
*
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
* is enabled, the connection returned by this method will be {@link Recoverable}. Reconnection
* attempts will always use the address configured on {@link ConnectionFactory}.
*
* @param executor thread execution service for consumers on the connection
* @param connectionName arbitrary sring for connection name client property
* @return an interface to the connection
* @throws IOException if it encounters a problem
*/
public Connection newConnection(ExecutorService executor, String connectionName) throws IOException, TimeoutException {
return newConnection(executor, Collections.singletonList(new Address(getHost(), getPort())), connectionName);
}

@Override public ConnectionFactory clone(){
try {
return (ConnectionFactory)super.clone();
Expand Down
9 changes: 9 additions & 0 deletions src/com/rabbitmq/client/impl/AMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,15 @@ public Map<String, Object> getClientProperties() {
return new HashMap<String, Object>(_clientProperties);
}

public String getConnectionName() {
Object connectionName = _clientProperties.get("connection_name");
if (connectionName == null){
return null;
} else {
return connectionName.toString();
}
}

/**
* Protected API - retrieve the current ExceptionHandler
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,13 @@ public Map<String, Object> getClientProperties() {
return delegate.getClientProperties();
}

/**
* @see com.rabbitmq.client.Connection#getConnectionName()
*/
public String getConnectionName() {
return delegate.getConnectionName();
}

/**
* @see com.rabbitmq.client.Connection#getFrameMax()
*/
Expand Down
33 changes: 33 additions & 0 deletions test/src/com/rabbitmq/client/test/AMQConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutorService;
import com.rabbitmq.client.Address;
import java.util.Arrays;

import com.rabbitmq.client.impl.ConnectionParams;
import com.rabbitmq.client.TopologyRecoveryException;
Expand Down Expand Up @@ -174,6 +177,36 @@ public void testConnectionHangInNegotiation() {
assertEquals("Wrong type of exception returned.", SocketTimeoutException.class, exceptionList.get(0).getClass());
}

public void testConnectionName() throws IOException, TimeoutException {
String connectionName = "custom name";
Connection connection = factory.newConnection(connectionName);
assertEquals(connectionName, connection.getConnectionName());
connection.close();

List<Address> addresses_list = Arrays.asList(new Address("127.0.0.1"), new Address("127.0.0.1", 5672));
connection = factory.newConnection(addresses_list, connectionName);
assertEquals(connectionName, connection.getConnectionName());
connection.close();

Address[] addresses_arr = {new Address("127.0.0.1"), new Address("127.0.0.1", 5672)};
connection = factory.newConnection(addresses_arr, connectionName);
assertEquals(connectionName, connection.getConnectionName());
connection.close();

ExecutorService executor = Executors.newSingleThreadExecutor();
connection = factory.newConnection(executor, connectionName);
assertEquals(connectionName, connection.getConnectionName());
connection.close();

connection = factory.newConnection(executor, addresses_list, connectionName);
assertEquals(connectionName, connection.getConnectionName());
connection.close();

connection = factory.newConnection(executor, addresses_arr, connectionName);
assertEquals(connectionName, connection.getConnectionName());
connection.close();
}

/** Mock frame handler to facilitate testing. */
private static class MockFrameHandler implements FrameHandler {
/** How many times has sendHeader() been called? */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ public void testConnectionRecovery() throws IOException, InterruptedException {
assertTrue(connection.isOpen());
}

public void testNamedConnectionRecovery()
throws IOException, InterruptedException, TimeoutException {
String connectionName = "custom name";
AutorecoveringConnection c = newRecoveringConnection(connectionName);
try {
assertTrue(c.isOpen());
assertEquals(connectionName, c.getConnectionName());
closeAndWaitForRecovery(c);
assertTrue(c.isOpen());
assertEquals(connectionName, c.getConnectionName());
} finally {
c.abort();
}
}

public void testConnectionRecoveryWithServerRestart() throws IOException, InterruptedException {
assertTrue(connection.isOpen());
restartPrimaryAndWaitForRecovery();
Expand Down Expand Up @@ -739,6 +754,17 @@ private AutorecoveringConnection newRecoveringConnection(List<Address> addresses
return newRecoveringConnection(false, addresses);
}

private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery, String connectionName)
throws IOException, TimeoutException {
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
return (AutorecoveringConnection) cf.newConnection(connectionName);
}

private AutorecoveringConnection newRecoveringConnection(String connectionName)
throws IOException, TimeoutException {
return newRecoveringConnection(false, connectionName);
}

private ConnectionFactory buildConnectionFactoryWithRecoveryEnabled(boolean disableTopologyRecovery) {
ConnectionFactory cf = new ConnectionFactory();
cf.setNetworkRecoveryInterval(RECOVERY_INTERVAL);
Expand Down