Skip to content

Commit

Permalink
Implement concurrent JMX connection and enable timed reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewazores committed Dec 18, 2020
1 parent d4da3aa commit d5cbe46
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ static NetworkResolver provideNetworkResolver() {
@Singleton
static TargetConnectionManager provideTargetConnectionManager(
Logger logger, Lazy<JFRConnectionToolkit> connectionToolkit) {
return new TargetConnectionManager(logger, connectionToolkit);
return new TargetConnectionManager(
logger, connectionToolkit, TargetConnectionManager.DEFAULT_TTL);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,54 +42,69 @@
package com.redhat.rhjmc.containerjfr.net;

import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.management.remote.JMXServiceURL;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;

import com.redhat.rhjmc.containerjfr.core.log.Logger;
import com.redhat.rhjmc.containerjfr.core.net.Credentials;
import com.redhat.rhjmc.containerjfr.core.net.JFRConnection;
import com.redhat.rhjmc.containerjfr.core.net.JFRConnectionToolkit;
import dagger.Lazy;

public class TargetConnectionManager {
public class TargetConnectionManager
implements RemovalListener<ConnectionDescriptor, JFRConnection> {

public static final Pattern HOST_PORT_PAIR_PATTERN =
Pattern.compile("^([^:\\s]+)(?::(\\d{1,5}))?$");

static final Duration DEFAULT_TTL = Duration.ofSeconds(90);

private final Logger logger;
// FIXME verify concurrent connection safety and remove locking
private final ReentrantLock lock = new ReentrantLock();
// maintain a short-lived cache of connections to allow nested ConnectedTasks
// without having to manage connection reuse
private final Map<ConnectionDescriptor, JFRConnection> activeConnections = new HashMap<>();
private final Lazy<JFRConnectionToolkit> jfrConnectionToolkit;

public TargetConnectionManager(Logger logger, Lazy<JFRConnectionToolkit> jfrConnectionToolkit) {
private final LoadingCache<ConnectionDescriptor, JFRConnection> connections;

TargetConnectionManager(
Logger logger, Lazy<JFRConnectionToolkit> jfrConnectionToolkit, Duration ttl) {
this.logger = logger;
this.jfrConnectionToolkit = jfrConnectionToolkit;

this.connections =
Caffeine.newBuilder()
.scheduler(Scheduler.systemScheduler())
.expireAfterAccess(ttl)
.removalListener(this)
.build(this::connect);
}

@Override
public void onRemoval(
ConnectionDescriptor descriptor, JFRConnection connection, RemovalCause cause) {
if (descriptor == null) {
logger.warn("Connection eviction triggered with null descriptor");
return;
}
if (connection == null) {
logger.warn("Connection eviction triggered with null connection");
return;
}
logger.info(String.format("Removing cached connection for %s", descriptor.getTargetId()));
connection.close();
}

public <T> T executeConnectedTask(
ConnectionDescriptor connectionDescriptor, ConnectedTask<T> task) throws Exception {
try {
if (activeConnections.containsKey(connectionDescriptor)) {
return task.execute(activeConnections.get(connectionDescriptor));
} else {
try (JFRConnection connection = connect(connectionDescriptor)) {
activeConnections.put(connectionDescriptor, connection);
return task.execute(connection);
}
}
} finally {
activeConnections.remove(connectionDescriptor);
}
return task.execute(connections.get(connectionDescriptor));
}

/**
Expand Down Expand Up @@ -132,20 +147,8 @@ private JFRConnection attemptConnectAsHostPortPair(ConnectionDescriptor connecti

private JFRConnection connect(JMXServiceURL url, Optional<Credentials> credentials)
throws Exception {
logger.trace(String.format("Locking connection %s", url.toString()));
lock.lockInterruptibly();
return jfrConnectionToolkit
.get()
.connect(
url,
credentials.orElse(null),
List.of(
lock::unlock,
() ->
logger.trace(
String.format(
"Unlocking connection %s",
url.toString()))));
logger.info(String.format("Creating connection for %s", url.toString()));
return jfrConnectionToolkit.get().connect(url, credentials.orElse(null));
}

public interface ConnectedTask<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
*/
package com.redhat.rhjmc.containerjfr.net;

import java.time.Duration;

import javax.management.remote.JMXServiceURL;

import org.hamcrest.MatcherAssert;
Expand All @@ -64,17 +66,17 @@ class TargetConnectionManagerTest {
TargetConnectionManager mgr;
@Mock Logger logger;
@Mock JFRConnectionToolkit jfrConnectionToolkit;
@Mock JFRConnection conn;
Duration TTL = Duration.ofMillis(250);

@BeforeEach
void setup() {
this.mgr = new TargetConnectionManager(logger, () -> jfrConnectionToolkit);
this.mgr = new TargetConnectionManager(logger, () -> jfrConnectionToolkit, TTL);
}

@Test
void shouldDelegateToToolkitForJMXURLConnection() throws Exception {
Mockito.when(jfrConnectionToolkit.connect(Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(conn);
JFRConnection conn = Mockito.mock(JFRConnection.class);
Mockito.when(jfrConnectionToolkit.connect(Mockito.any(), Mockito.any())).thenReturn(conn);
JFRConnection c =
mgr.connect(
new ConnectionDescriptor(
Expand All @@ -98,19 +100,92 @@ public JMXServiceURL answer(InvocationOnMock args) throws Throwable {
String.format("/jndi/rmi://%s:%d/jmxrmi", host, port));
}
});
Mockito.when(jfrConnectionToolkit.connect(Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(conn);
JFRConnection a =
mgr.executeConnectedTask(
new ConnectionDescriptor("foo"),
b -> {
JFRConnection d =
mgr.executeConnectedTask(
new ConnectionDescriptor("foo"), c -> c);
MatcherAssert.assertThat(d, Matchers.sameInstance(b));
MatcherAssert.assertThat(d, Matchers.sameInstance(conn));
return b;
Mockito.when(jfrConnectionToolkit.connect(Mockito.any(), Mockito.any()))
.thenAnswer(
new Answer<JFRConnection>() {
@Override
public JFRConnection answer(InvocationOnMock invocation)
throws Throwable {
return Mockito.mock(JFRConnection.class);
}
});
ConnectionDescriptor descriptor = new ConnectionDescriptor("foo");
mgr.executeConnectedTask(
descriptor,
conn1 -> {
mgr.executeConnectedTask(
descriptor,
conn2 -> {
MatcherAssert.assertThat(conn1, Matchers.sameInstance(conn2));
return null;
});
return null;
});
}

@Test
void shouldReuseConnectionInSequentialAccessWithoutDelay() throws Exception {
Mockito.when(jfrConnectionToolkit.createServiceURL(Mockito.anyString(), Mockito.anyInt()))
.thenAnswer(
new Answer<JMXServiceURL>() {
@Override
public JMXServiceURL answer(InvocationOnMock args) throws Throwable {
String host = args.getArgument(0);
int port = args.getArgument(1);
return new JMXServiceURL(
"rmi",
"",
0,
String.format("/jndi/rmi://%s:%d/jmxrmi", host, port));
}
});
Mockito.when(jfrConnectionToolkit.connect(Mockito.any(), Mockito.any()))
.thenAnswer(
new Answer<JFRConnection>() {
@Override
public JFRConnection answer(InvocationOnMock invocation)
throws Throwable {
return Mockito.mock(JFRConnection.class);
}
});
ConnectionDescriptor desc = new ConnectionDescriptor("foo");
JFRConnection conn1 = mgr.executeConnectedTask(desc, a -> a);
JFRConnection conn2 = mgr.executeConnectedTask(desc, a -> a);
MatcherAssert.assertThat(conn1, Matchers.sameInstance(conn2));
}

@Test
void shouldCreateNewConnectionForAccessDelayedLongerThanTTL() throws Exception {
TargetConnectionManager mgr =
new TargetConnectionManager(
logger, () -> jfrConnectionToolkit, Duration.ofNanos(1));
Mockito.when(jfrConnectionToolkit.createServiceURL(Mockito.anyString(), Mockito.anyInt()))
.thenAnswer(
new Answer<JMXServiceURL>() {
@Override
public JMXServiceURL answer(InvocationOnMock args) throws Throwable {
String host = args.getArgument(0);
int port = args.getArgument(1);
return new JMXServiceURL(
"rmi",
"",
0,
String.format("/jndi/rmi://%s:%d/jmxrmi", host, port));
}
});
Mockito.when(jfrConnectionToolkit.connect(Mockito.any(), Mockito.any()))
.thenAnswer(
new Answer<JFRConnection>() {
@Override
public JFRConnection answer(InvocationOnMock invocation)
throws Throwable {
return Mockito.mock(JFRConnection.class);
}
});
MatcherAssert.assertThat(a, Matchers.sameInstance(conn));
ConnectionDescriptor desc = new ConnectionDescriptor("foo");
JFRConnection conn1 = mgr.executeConnectedTask(desc, a -> a);
Thread.sleep(10);
JFRConnection conn2 = mgr.executeConnectedTask(desc, a -> a);
MatcherAssert.assertThat(conn1, Matchers.not(Matchers.sameInstance(conn2)));
}
}

0 comments on commit d5cbe46

Please sign in to comment.