Skip to content

Commit

Permalink
Merge pull request #142 from rabbitmq/rabbitmq-server-104
Browse files Browse the repository at this point in the history
Support client specified connection name
  • Loading branch information
michaelklishin committed Apr 17, 2016
2 parents 52c7b1c + 1835085 commit 177ec8b
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 4 deletions.
14 changes: 14 additions & 0 deletions src/com/rabbitmq/client/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
* Public API: Interface to an AMQ connection. See the see the <a href="http://www.amqp.org/">spec</a> for details.
Expand Down Expand Up @@ -93,6 +94,19 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
*/
Map<String, Object> getClientProperties();

/**
* Returns client-provided connection name, if any. Note that the value
* returned does not uniquely identify a connection and cannot be used
* as a connection identifier in HTTP API requests.
*
*
*
* @return client-provided connection name, if any
* @see ConnectionFactory#newConnection(Address[], String)
* @see ConnectionFactory#newConnection(ExecutorService, Address[], String)
*/
String getClientProvidedName();

/**
* Retrieve the server properties.
* @return a map of the server properties. This typically includes the product name and version of the server.
Expand Down
136 changes: 132 additions & 4 deletions src/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.*;
import java.util.List;
import java.util.Arrays;
Expand Down Expand Up @@ -644,7 +645,29 @@ protected FrameHandlerFactory createFrameHandlerFactory() throws IOException {
* @throws IOException if it encounters a problem
*/
public Connection newConnection(Address[] addrs) throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, Arrays.asList(addrs));
return newConnection(this.sharedExecutor, Arrays.asList(addrs), null);
}


/**
* Create a new broker connection with a client-provided name, picking the first available address from
* the list.
*
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
* is enabled, the connection returned by this method will be {@link Recoverable}. Future
* reconnection attempts will pick a random accessible address from the provided list.
*
* @param addrs an array of known broker addresses (hostname/port pairs) to try in order
* @param clientProvidedName application-specific connection name, will be displayed
* in the management UI if RabbitMQ server supports it.
* This value doesn't have to be unique and cannot be used
* as a connection identifier e.g. in HTTP API requests.
* This value is supposed to be human-readable.
* @return an interface to the connection
* @throws IOException if it encounters a problem
*/
public Connection newConnection(Address[] addrs, String clientProvidedName) throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, Arrays.asList(addrs), clientProvidedName);
}

/**
Expand All @@ -660,7 +683,28 @@ public Connection newConnection(Address[] addrs) throws IOException, TimeoutExce
* @throws IOException if it encounters a problem
*/
public Connection newConnection(List<Address> addrs) throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, addrs);
return newConnection(this.sharedExecutor, addrs, null);
}

/**
* Create a new broker connection with a client-provided name, picking the first available address from
* the list.
*
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
* is enabled, the connection returned by this method will be {@link Recoverable}. Future
* reconnection attempts will pick a random accessible address from the provided list.
*
* @param addrs a List of known broker addresses (hostname/port pairs) to try in order
* @param clientProvidedName application-specific connection name, will be displayed
* in the management UI if RabbitMQ server supports it.
* This value doesn't have to be unique and cannot be used
* as a connection identifier e.g. in HTTP API requests.
* This value is supposed to be human-readable.
* @return an interface to the connection
* @throws IOException if it encounters a problem
*/
public Connection newConnection(List<Address> addrs, String clientProvidedName) throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, addrs, clientProvidedName);
}

/**
Expand All @@ -678,7 +722,31 @@ public Connection newConnection(List<Address> addrs) throws IOException, Timeout
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
*/
public Connection newConnection(ExecutorService executor, Address[] addrs) throws IOException, TimeoutException {
return newConnection(executor, Arrays.asList(addrs));
return newConnection(executor, Arrays.asList(addrs), null);
}


/**
* Create a new broker connection with a client-provided name, picking the first available address from
* the list.
*
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
* is enabled, the connection returned by this method will be {@link Recoverable}. Future
* reconnection attempts will pick a random accessible address from the provided list.
*
* @param executor thread execution service for consumers on the connection
* @param addrs an array of known broker addresses (hostname/port pairs) to try in order
* @param clientProvidedName application-specific connection name, will be displayed
* in the management UI if RabbitMQ server supports it.
* This value doesn't have to be unique and cannot be used
* as a connection identifier e.g. in HTTP API requests.
* This value is supposed to be human-readable.
* @return an interface to the connection
* @throws java.io.IOException if it encounters a problem
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
*/
public Connection newConnection(ExecutorService executor, Address[] addrs, String clientProvidedName) throws IOException, TimeoutException {
return newConnection(executor, Arrays.asList(addrs), clientProvidedName);
}

/**
Expand All @@ -695,11 +763,40 @@ public Connection newConnection(ExecutorService executor, Address[] addrs) throw
* @throws java.io.IOException if it encounters a problem
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
*/
public Connection newConnection(ExecutorService executor, List<Address> addrs)
public Connection newConnection(ExecutorService executor, List<Address> addrs) throws IOException, TimeoutException {
return newConnection(executor, addrs, null);
}

/**
* Create a new broker connection with a client-provided name, picking the first available address from
* the list.
*
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
* is enabled, the connection returned by this method will be {@link Recoverable}. Future
* reconnection attempts will pick a random accessible address from the provided list.
*
* @param executor thread execution service for consumers on the connection
* @param addrs a List of known broker addrs (hostname/port pairs) to try in order
* @param clientProvidedName application-specific connection name, will be displayed
* in the management UI if RabbitMQ server supports it.
* This value doesn't have to be unique and cannot be used
* as a connection identifier e.g. in HTTP API requests.
* This value is supposed to be human-readable.
* @return an interface to the connection
* @throws java.io.IOException if it encounters a problem
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
*/
public Connection newConnection(ExecutorService executor, List<Address> addrs, String clientProvidedName)
throws IOException, TimeoutException {
// make sure we respect the provided thread factory
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
ConnectionParams params = params(executor);
// set client-provided via a client property
if (clientProvidedName != null) {
Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
properties.put("connection_name", clientProvidedName);
params.setClientProperties(properties);
}

if (isAutomaticRecoveryEnabled()) {
// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
Expand Down Expand Up @@ -760,6 +857,21 @@ public Connection newConnection() throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort())));
}

/**
* Create a new broker connection.
*
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
* is enabled, the connection returned by this method will be {@link Recoverable}. Reconnection
* attempts will always use the address configured on {@link ConnectionFactory}.
*
* @param connectionName arbitrary sring for connection name client property
* @return an interface to the connection
* @throws IOException if it encounters a problem
*/
public Connection newConnection(String connectionName) throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort())), connectionName);
}

/**
* Create a new broker connection.
*
Expand All @@ -775,6 +887,22 @@ public Connection newConnection(ExecutorService executor) throws IOException, Ti
return newConnection(executor, Collections.singletonList(new Address(getHost(), getPort())));
}

/**
* Create a new broker connection.
*
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
* is enabled, the connection returned by this method will be {@link Recoverable}. Reconnection
* attempts will always use the address configured on {@link ConnectionFactory}.
*
* @param executor thread execution service for consumers on the connection
* @param connectionName arbitrary sring for connection name client property
* @return an interface to the connection
* @throws IOException if it encounters a problem
*/
public Connection newConnection(ExecutorService executor, String connectionName) throws IOException, TimeoutException {
return newConnection(executor, Collections.singletonList(new Address(getHost(), getPort())), connectionName);
}

@Override public ConnectionFactory clone(){
try {
return (ConnectionFactory)super.clone();
Expand Down
4 changes: 4 additions & 0 deletions src/com/rabbitmq/client/impl/AMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,10 @@ public Map<String, Object> getClientProperties() {
return new HashMap<String, Object>(_clientProperties);
}

public String getClientProvidedName() {
return (String) _clientProperties.get("connection_name");
}

/**
* Protected API - retrieve the current ExceptionHandler
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MissedHeartbeatException;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
Expand All @@ -27,6 +28,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;

/**
Expand Down Expand Up @@ -156,6 +158,15 @@ public Map<String, Object> getClientProperties() {
return delegate.getClientProperties();
}

/**
* @see com.rabbitmq.client.Connection#getClientProvidedName()
* @see ConnectionFactory#newConnection(Address[], String)
* @see ConnectionFactory#newConnection(ExecutorService, Address[], String)
*/
public String getClientProvidedName() {
return delegate.getClientProvidedName();
}

/**
* @see com.rabbitmq.client.Connection#getFrameMax()
*/
Expand Down
33 changes: 33 additions & 0 deletions test/src/com/rabbitmq/client/test/AMQConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutorService;
import com.rabbitmq.client.Address;
import java.util.Arrays;

import com.rabbitmq.client.impl.ConnectionParams;
import com.rabbitmq.client.TopologyRecoveryException;
Expand Down Expand Up @@ -174,6 +177,36 @@ public void testConnectionHangInNegotiation() {
assertEquals("Wrong type of exception returned.", SocketTimeoutException.class, exceptionList.get(0).getClass());
}

public void testClientProvidedConnectionName() throws IOException, TimeoutException {
String providedName = "event consumers connection";
Connection connection = factory.newConnection(providedName);
assertEquals(providedName, connection.getClientProvidedName());
connection.close();

List<Address> addrs1 = Arrays.asList(new Address("127.0.0.1"), new Address("127.0.0.1", 5672));
connection = factory.newConnection(addrs1, providedName);
assertEquals(providedName, connection.getClientProvidedName());
connection.close();

Address[] addrs2 = {new Address("127.0.0.1"), new Address("127.0.0.1", 5672)};
connection = factory.newConnection(addrs2, providedName);
assertEquals(providedName, connection.getClientProvidedName());
connection.close();

ExecutorService xs = Executors.newSingleThreadExecutor();
connection = factory.newConnection(xs, providedName);
assertEquals(providedName, connection.getClientProvidedName());
connection.close();

connection = factory.newConnection(xs, addrs1, providedName);
assertEquals(providedName, connection.getClientProvidedName());
connection.close();

connection = factory.newConnection(xs, addrs2, providedName);
assertEquals(providedName, connection.getClientProvidedName());
connection.close();
}

/** Mock frame handler to facilitate testing. */
private static class MockFrameHandler implements FrameHandler {
/** How many times has sendHeader() been called? */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ public void testConnectionRecovery() throws IOException, InterruptedException {
assertTrue(connection.isOpen());
}

public void testNamedConnectionRecovery()
throws IOException, InterruptedException, TimeoutException {
String connectionName = "custom name";
AutorecoveringConnection c = newRecoveringConnection(connectionName);
try {
assertTrue(c.isOpen());
assertEquals(connectionName, c.getClientProvidedName());
closeAndWaitForRecovery(c);
assertTrue(c.isOpen());
assertEquals(connectionName, c.getClientProvidedName());
} finally {
c.abort();
}
}

public void testConnectionRecoveryWithServerRestart() throws IOException, InterruptedException {
assertTrue(connection.isOpen());
restartPrimaryAndWaitForRecovery();
Expand Down Expand Up @@ -739,6 +754,17 @@ private AutorecoveringConnection newRecoveringConnection(List<Address> addresses
return newRecoveringConnection(false, addresses);
}

private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery, String connectionName)
throws IOException, TimeoutException {
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
return (AutorecoveringConnection) cf.newConnection(connectionName);
}

private AutorecoveringConnection newRecoveringConnection(String connectionName)
throws IOException, TimeoutException {
return newRecoveringConnection(false, connectionName);
}

private ConnectionFactory buildConnectionFactoryWithRecoveryEnabled(boolean disableTopologyRecovery) {
ConnectionFactory cf = new ConnectionFactory();
cf.setNetworkRecoveryInterval(RECOVERY_INTERVAL);
Expand Down

0 comments on commit 177ec8b

Please sign in to comment.