Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rules): do not create cached connections for automated rules #763

Merged
merged 11 commits into from
Dec 7, 2021
Merged
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ Cryostat can be configured via the following environment variables

#### Configuration for JMX Cache

* `CRYOSTAT_TARGET_CACHE_MAX_CONNECTIONS`: the maximum number of JMX connections to cache. Use `-1` for an unlimited amount
* `CRYOSTAT_TARGET_CACHE_TTL`: the time to live in seconds for cached JMX connections
* `CRYOSTAT_TARGET_CACHE_SIZE`: the maximum number of JMX connections to cache.
Use `-1` for an unlimited cache size (TTL expiration only). Defaults to `-1`.
* `CRYOSTAT_TARGET_CACHE_TTL`: the time to live (in seconds) for cached JMX
connections. Defaults to `10`.

#### Configuration for Logging

Expand Down
4 changes: 1 addition & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,7 @@
<argument>--mount</argument>
<argument>type=tmpfs,target=/opt/cryostat.d/truststore.d</argument>
<argument>--env</argument>
<argument>CRYOSTAT_TARGET_CACHE_MAX_CONNECTIONS=1</argument>
andrewazores marked this conversation as resolved.
Show resolved Hide resolved
<argument>--env</argument>
<argument>CRYOSTAT_TARGET_CACHE_TTL=5</argument>
<argument>CRYOSTAT_TARGET_CACHE_TTL=60</argument>
<argument>--env</argument>
<argument>CRYOSTAT_DISABLE_JMX_AUTH=true</argument>
<argument>--env</argument>
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/cryostat/net/NetworkModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import io.cryostat.net.reports.ReportsModule;
import io.cryostat.net.security.SecurityModule;
import io.cryostat.net.web.WebModule;
import io.cryostat.platform.PlatformClient;

import com.github.benmanes.caffeine.cache.Scheduler;
import dagger.Binds;
Expand Down Expand Up @@ -115,11 +116,13 @@ static Duration provideMaxTargetTTL(Environment env) {
@Singleton
static TargetConnectionManager provideTargetConnectionManager(
Lazy<JFRConnectionToolkit> connectionToolkit,
PlatformClient platformClient,
@Named(TARGET_CACHE_TTL) Duration maxTargetTtl,
@Named(TARGET_CACHE_SIZE) int maxTargetConnections,
Logger logger) {
return new TargetConnectionManager(
connectionToolkit,
platformClient,
ForkJoinPool.commonPool(),
Scheduler.systemScheduler(),
maxTargetTtl,
Expand Down
137 changes: 96 additions & 41 deletions src/main/java/io/cryostat/net/TargetConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.regex.Matcher;
Expand All @@ -51,11 +52,12 @@
import io.cryostat.core.net.Credentials;
import io.cryostat.core.net.JFRConnection;
import io.cryostat.core.net.JFRConnectionToolkit;
import io.cryostat.core.net.discovery.JvmDiscoveryClient.EventKind;
import io.cryostat.platform.PlatformClient;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import dagger.Lazy;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand All @@ -76,6 +78,7 @@ public class TargetConnectionManager {

TargetConnectionManager(
Lazy<JFRConnectionToolkit> jfrConnectionToolkit,
PlatformClient platform,
Executor executor,
Scheduler scheduler,
Duration ttl,
Expand All @@ -89,51 +92,66 @@ public class TargetConnectionManager {
.executor(executor)
.scheduler(scheduler)
.expireAfterAccess(ttl)
.removalListener(
new RemovalListener<ConnectionDescriptor, JFRConnection>() {
@Override
public void onRemoval(
ConnectionDescriptor descriptor,
JFRConnection connection,
RemovalCause cause) {
if (descriptor == null) {
logger.warn(
"Connection eviction triggered with null descriptor");
return;
}
if (connection == null) {
logger.warn(
"Connection eviction triggered with null connection");
return;
}
hareetd marked this conversation as resolved.
Show resolved Hide resolved
JMXConnectionClosed evt =
new JMXConnectionClosed(descriptor.getTargetId());
logger.info(
"Removing cached connection for {}",
descriptor.getTargetId());
evt.begin();
try {
connection.close();
} catch (RuntimeException e) {
evt.setExceptionThrown(true);
throw e;
} finally {
evt.end();
if (evt.shouldCommit()) {
evt.commit();
}
}
}
});
.removalListener(this::closeConnection);
if (maxTargetConnections >= 0) {
cacheBuilder = cacheBuilder.maximumSize(maxTargetConnections);
}
this.connections = cacheBuilder.build(this::connect);

// Force removal of connections from the cache when we are notified that a target has been
// lost. This should already be handled by the connection close listener, but it provides
// additional insurance in case a target disappears and the underlying JMX network
// connection does not immediately report itself as closed.
platform.addTargetDiscoveryListener(
tde -> {
if (EventKind.LOST.equals(tde.getEventKind())) {
for (ConnectionDescriptor cd : connections.asMap().keySet()) {
if (Objects.equals(
cd.getTargetId(),
tde.getServiceRef().getServiceUri().toString())) {
connections.invalidate(cd);
}
}
}
});
}

public <T> T executeConnectedTask(
ConnectionDescriptor connectionDescriptor, ConnectedTask<T> task) throws Exception {
return task.execute(connections.get(connectionDescriptor));
return executeConnectedTask(connectionDescriptor, task, true);
}

/**
 * Run a {@link ConnectedTask} against the target identified by the given descriptor, with
 * control over whether a newly opened connection is placed into the connection cache.
 *
 * <ul>
 *   <li>{@code useCache == true}: the connection is obtained through the cache, which loads
 *       (and stores) a new connection when none is present. The connection is left open
 *       afterwards and remains subject to the cache's size and TTL eviction.
 *   <li>{@code useCache == false}: an already-cached connection is reused if one is present
 *       and is left untouched afterwards. Otherwise a connection is opened outside of the
 *       cache, used for this one task, and closed as soon as the task finishes, whether it
 *       completes normally or throws.
 * </ul>
 *
 * <p>"Interactive" callers should pass {@code true} (or simply call {@link
 * #executeConnectedTask(ConnectionDescriptor cd, ConnectedTask task)}). Automated callers
 * such as Automated Rules should pass {@code false}.
 *
 * @param connectionDescriptor identifies the target to connect to
 * @param task the work to perform with the connection
 * @param useCache whether a newly created connection may be stored in the cache
 * @return whatever the task returns
 * @throws Exception if the connection cannot be established or the task itself fails
 */
public <T> T executeConnectedTask(
        ConnectionDescriptor connectionDescriptor, ConnectedTask<T> task, boolean useCache)
        throws Exception {
    if (useCache) {
        // Cache-backed path: the cache loads the connection on a miss and owns its lifecycle.
        return task.execute(connections.get(connectionDescriptor));
    }

    JFRConnection existing = connections.getIfPresent(connectionDescriptor);
    if (existing != null) {
        // Reuse the cached connection, but do not close it: the cache still owns it.
        return task.execute(existing);
    }

    // Nothing cached: open a one-shot connection that lives only for this task.
    JFRConnection temporary = connect(connectionDescriptor);
    try {
        return task.execute(temporary);
    } finally {
        temporary.close();
    }
}

/**
Expand All @@ -153,6 +171,37 @@ public boolean markConnectionInUse(ConnectionDescriptor connectionDescriptor) {
return connections.getIfPresent(connectionDescriptor) != null;
}

/**
 * Removal callback for the connection cache: closes the evicted connection and records a
 * {@link JMXConnectionClosed} event describing the removal.
 *
 * <p>A null descriptor or null connection is logged as an error and otherwise ignored. Any
 * exception raised while closing is logged here and never propagated to the caller.
 *
 * @param descriptor the cache key of the removed entry
 * @param connection the connection that was removed from the cache
 * @param cause the reason the cache removed the entry
 */
private void closeConnection(
        ConnectionDescriptor descriptor, JFRConnection connection, RemovalCause cause) {
    if (descriptor == null) {
        logger.error("Connection eviction triggered with null descriptor");
        return;
    }
    if (connection == null) {
        logger.error("Connection eviction triggered with null connection");
        return;
    }

    try {
        JMXConnectionClosed closeEvent =
                new JMXConnectionClosed(descriptor.getTargetId(), cause.name());
        logger.info("Removing cached connection for {}: {}", descriptor.getTargetId(), cause);
        closeEvent.begin();
        try {
            connection.close();
        } catch (RuntimeException failure) {
            // Flag the failure on the event, then let the outer handler log it.
            closeEvent.setExceptionThrown(true);
            throw failure;
        } finally {
            // Always finish the event, committing it only when it should be recorded.
            closeEvent.end();
            if (closeEvent.shouldCommit()) {
                closeEvent.commit();
            }
        }
    } catch (Exception failure) {
        // Log and swallow so that nothing escapes from this callback.
        logger.error(failure);
    }
}

private JFRConnection connect(ConnectionDescriptor connectionDescriptor) throws Exception {
try {
return attemptConnectAsJMXServiceURL(connectionDescriptor);
Expand Down Expand Up @@ -191,15 +240,19 @@ private JFRConnection connect(
ConnectionDescriptor cacheKey, JMXServiceURL url, Optional<Credentials> credentials)
throws Exception {
JMXConnectionOpened evt = new JMXConnectionOpened(url.toString());
logger.info("Creating connection for {}", url.toString());
logger.info("Creating connection for {}", url);
evt.begin();
try {
return jfrConnectionToolkit
.get()
.connect(
url,
credentials.orElse(null),
Collections.singletonList(() -> this.connections.invalidate(cacheKey)));
Collections.singletonList(
() -> {
logger.info("Connection for {} closed", url);
hareetd marked this conversation as resolved.
Show resolved Hide resolved
this.connections.invalidate(cacheKey);
}));
} catch (Exception e) {
evt.setExceptionThrown(true);
throw e;
Expand Down Expand Up @@ -244,10 +297,12 @@ void setExceptionThrown(boolean exceptionThrown) {
public static class JMXConnectionClosed extends Event {
String serviceUri;
boolean exceptionThrown;
String reason;

JMXConnectionClosed(String serviceUri) {
JMXConnectionClosed(String serviceUri, String reason) {
this.serviceUri = serviceUri;
this.exceptionThrown = false;
this.reason = reason;
}

void setExceptionThrown(boolean exceptionThrown) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,30 @@ public void handleWithValidJwt(RoutingContext ctx, JWT jwt) throws Exception {

Exception rootCause = (Exception) ExceptionUtils.getRootCause(ee);

if (rootCause instanceof RecordingNotFoundException
|| targetRecordingNotFound(rootCause)) {
if (targetRecordingNotFound(rootCause)) {
throw new HttpStatusException(404, ee);
}
throw ee;
}
}

private boolean targetRecordingNotFound(Exception rootCause) {
return rootCause instanceof SubprocessReportGenerator.ReportGenerationException
&& (((SubprocessReportGenerator.ReportGenerationException) rootCause)
.getStatus()
== SubprocessReportGenerator.ExitStatus.TARGET_CONNECTION_FAILURE)
|| (((SubprocessReportGenerator.ReportGenerationException) rootCause).getStatus()
== SubprocessReportGenerator.ExitStatus.NO_SUCH_RECORDING);
if (rootCause instanceof RecordingNotFoundException) {
return true;
}
boolean isReportGenerationException =
rootCause instanceof SubprocessReportGenerator.ReportGenerationException;
if (!isReportGenerationException) {
return false;
}
SubprocessReportGenerator.ReportGenerationException generationException =
(SubprocessReportGenerator.ReportGenerationException) rootCause;
boolean isTargetConnectionFailure =
generationException.getStatus()
== SubprocessReportGenerator.ExitStatus.TARGET_CONNECTION_FAILURE;
boolean isNoSuchRecording =
generationException.getStatus()
== SubprocessReportGenerator.ExitStatus.NO_SUCH_RECORDING;
return isTargetConnectionFailure || isNoSuchRecording;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,30 @@ public void handleAuthenticated(RoutingContext ctx) throws Exception {

Exception rootCause = (Exception) ExceptionUtils.getRootCause(ee);

if (rootCause instanceof RecordingNotFoundException
|| targetRecordingNotFound(rootCause)) {
if (targetRecordingNotFound(rootCause)) {
throw new HttpStatusException(404, ee);
}
throw ee;
}
}

private boolean targetRecordingNotFound(Exception rootCause) {
return rootCause instanceof SubprocessReportGenerator.ReportGenerationException
&& (((SubprocessReportGenerator.ReportGenerationException) rootCause)
.getStatus()
== SubprocessReportGenerator.ExitStatus.TARGET_CONNECTION_FAILURE)
|| (((SubprocessReportGenerator.ReportGenerationException) rootCause).getStatus()
== SubprocessReportGenerator.ExitStatus.NO_SUCH_RECORDING);
if (rootCause instanceof RecordingNotFoundException) {
return true;
}
boolean isReportGenerationException =
rootCause instanceof SubprocessReportGenerator.ReportGenerationException;
if (!isReportGenerationException) {
return false;
}
SubprocessReportGenerator.ReportGenerationException generationException =
(SubprocessReportGenerator.ReportGenerationException) rootCause;
boolean isTargetConnectionFailure =
generationException.getStatus()
== SubprocessReportGenerator.ExitStatus.TARGET_CONNECTION_FAILURE;
boolean isNoSuchRecording =
generationException.getStatus()
== SubprocessReportGenerator.ExitStatus.NO_SUCH_RECORDING;
return isTargetConnectionFailure || isNoSuchRecording;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ public Future<String> saveRecording(
throw new RecordingNotFoundException(
"active recordings", recordingName);
}
});
},
false);
future.complete(saveName);
notificationFactory
.createBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ public IRecordingDescriptor startRecording(
.build()
.send();
return desc;
});
},
false);
}

public IRecordingDescriptor startRecording(
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/cryostat/rules/RuleProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ private void archiveRuleRecording(ConnectionDescriptor connectionDescriptor, Rul
}

return null;
});
},
false);
}

private void startRuleRecording(ConnectionDescriptor connectionDescriptor, Rule rule)
Expand Down Expand Up @@ -272,6 +273,7 @@ private void startRuleRecording(ConnectionDescriptor connectionDescriptor, Rule
template.getLeft(),
template.getRight());
return null;
});
},
false);
}
}
Loading