Skip to content

Commit

Permalink
fix(rules): do not create cached connections for automated rules (#763)
Browse files Browse the repository at this point in the history
* fix(rules): signal connections opened by rules may be immediately closed

Related to #756

* fix(pom): remove itest connection cache size config

Remove the connection cache size/TTL configuration from the integration test setup. That configuration used an incorrect, outdated environment variable name for the cache max size; removing it better reflects a standard Cryostat deployment, which uses the default cache settings (unlimited size, 10s TTL).

Fixes #762

* fix(rules): do not create cached connections for automated rules

Related to #756

Automated rules processing will reuse existing cached connections if available. If none are available then a new connection will be created and NOT cached, in order to avoid automated rules causing evictions of cached connections for other interactive clients

* remove unnecessary toString()

* re-add null guards

* allow TargetConnectionManager to handle closing connection

* fix(activereports): fix classcastexception

* fix(itest): correct /api/v1 to /api/beta in JWT asset download tests

* test(pom): set itest cache TTL to 60s

* feat(tcm): listen for target LOST events and invalidate from cache

* update README
  • Loading branch information
andrewazores authored Dec 7, 2021
1 parent 0595551 commit 7c010b2
Show file tree
Hide file tree
Showing 13 changed files with 179 additions and 76 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ Cryostat can be configured via the following environment variables

#### Configuration for JMX Cache

* `CRYOSTAT_TARGET_CACHE_MAX_CONNECTIONS`: the maximum number of JMX connections to cache. Use `-1` for an unlimited amount
* `CRYOSTAT_TARGET_CACHE_TTL`: the time to live in seconds for cached JMX connections
* `CRYOSTAT_TARGET_CACHE_SIZE`: the maximum number of JMX connections to cache.
Use `-1` for an unlimited cache size (TTL expiration only). Defaults to `-1`.
* `CRYOSTAT_TARGET_CACHE_TTL`: the time to live (in seconds) for cached JMX
connections. Defaults to `10`.

#### Configuration for Logging

Expand Down
4 changes: 1 addition & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,7 @@
<argument>--mount</argument>
<argument>type=tmpfs,target=/opt/cryostat.d/truststore.d</argument>
<argument>--env</argument>
<argument>CRYOSTAT_TARGET_CACHE_MAX_CONNECTIONS=1</argument>
<argument>--env</argument>
<argument>CRYOSTAT_TARGET_CACHE_TTL=5</argument>
<argument>CRYOSTAT_TARGET_CACHE_TTL=60</argument>
<argument>--env</argument>
<argument>CRYOSTAT_DISABLE_JMX_AUTH=true</argument>
<argument>--env</argument>
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/cryostat/net/NetworkModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import io.cryostat.net.reports.ReportsModule;
import io.cryostat.net.security.SecurityModule;
import io.cryostat.net.web.WebModule;
import io.cryostat.platform.PlatformClient;

import com.github.benmanes.caffeine.cache.Scheduler;
import dagger.Binds;
Expand Down Expand Up @@ -115,11 +116,13 @@ static Duration provideMaxTargetTTL(Environment env) {
@Singleton
static TargetConnectionManager provideTargetConnectionManager(
Lazy<JFRConnectionToolkit> connectionToolkit,
PlatformClient platformClient,
@Named(TARGET_CACHE_TTL) Duration maxTargetTtl,
@Named(TARGET_CACHE_SIZE) int maxTargetConnections,
Logger logger) {
return new TargetConnectionManager(
connectionToolkit,
platformClient,
ForkJoinPool.commonPool(),
Scheduler.systemScheduler(),
maxTargetTtl,
Expand Down
137 changes: 96 additions & 41 deletions src/main/java/io/cryostat/net/TargetConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.regex.Matcher;
Expand All @@ -51,11 +52,12 @@
import io.cryostat.core.net.Credentials;
import io.cryostat.core.net.JFRConnection;
import io.cryostat.core.net.JFRConnectionToolkit;
import io.cryostat.core.net.discovery.JvmDiscoveryClient.EventKind;
import io.cryostat.platform.PlatformClient;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import dagger.Lazy;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand All @@ -76,6 +78,7 @@ public class TargetConnectionManager {

TargetConnectionManager(
Lazy<JFRConnectionToolkit> jfrConnectionToolkit,
PlatformClient platform,
Executor executor,
Scheduler scheduler,
Duration ttl,
Expand All @@ -89,51 +92,66 @@ public class TargetConnectionManager {
.executor(executor)
.scheduler(scheduler)
.expireAfterAccess(ttl)
.removalListener(
new RemovalListener<ConnectionDescriptor, JFRConnection>() {
@Override
public void onRemoval(
ConnectionDescriptor descriptor,
JFRConnection connection,
RemovalCause cause) {
if (descriptor == null) {
logger.warn(
"Connection eviction triggered with null descriptor");
return;
}
if (connection == null) {
logger.warn(
"Connection eviction triggered with null connection");
return;
}
JMXConnectionClosed evt =
new JMXConnectionClosed(descriptor.getTargetId());
logger.info(
"Removing cached connection for {}",
descriptor.getTargetId());
evt.begin();
try {
connection.close();
} catch (RuntimeException e) {
evt.setExceptionThrown(true);
throw e;
} finally {
evt.end();
if (evt.shouldCommit()) {
evt.commit();
}
}
}
});
.removalListener(this::closeConnection);
if (maxTargetConnections >= 0) {
cacheBuilder = cacheBuilder.maximumSize(maxTargetConnections);
}
this.connections = cacheBuilder.build(this::connect);

// force removal of connections from cache when we're notified about targets being lost.
// This should already be taken care of by the connection close listener, but this provides
// some additional insurance in case a target disappears and the underlying JMX network
// connection doesn't immediately report itself as closed
platform.addTargetDiscoveryListener(
tde -> {
if (EventKind.LOST.equals(tde.getEventKind())) {
for (ConnectionDescriptor cd : connections.asMap().keySet()) {
if (Objects.equals(
cd.getTargetId(),
tde.getServiceRef().getServiceUri().toString())) {
connections.invalidate(cd);
}
}
}
});
}

public <T> T executeConnectedTask(
ConnectionDescriptor connectionDescriptor, ConnectedTask<T> task) throws Exception {
return task.execute(connections.get(connectionDescriptor));
return executeConnectedTask(connectionDescriptor, task, true);
}

/**
 * Runs a {@link ConnectedTask} against a connection to the described target, with the caller
 * choosing whether a newly opened connection may be placed in the connection cache.
 *
 * <ul>
 *   <li>{@code useCache == true}: the connection is obtained through the cache, which reuses
 *       an existing entry or opens and stores a new one. The entry then follows the cache's
 *       configured maximum size and TTL policy.
 *   <li>{@code useCache == false}: an already-cached connection is reused if one is present
 *       and is left in the cache untouched afterwards. If none is present, a connection is
 *       opened outside of the cache, used for this task only, and closed once the task has
 *       finished, whether it completed normally or threw.
 * </ul>
 *
 * <p>"Interactive" callers should pass {@code useCache == true}, or simply call {@link
 * #executeConnectedTask(ConnectionDescriptor, ConnectedTask)}. Automated callers such as
 * Automated Rules should pass {@code useCache == false}.
 *
 * @param connectionDescriptor identifies the target to connect to
 * @param task the work to perform with the connection
 * @param useCache whether a newly created connection may be stored in the cache
 * @return whatever the task returns
 * @throws Exception if the connection cannot be obtained or the task fails
 */
public <T> T executeConnectedTask(
        ConnectionDescriptor connectionDescriptor, ConnectedTask<T> task, boolean useCache)
        throws Exception {
    if (useCache) {
        return task.execute(connections.get(connectionDescriptor));
    }

    JFRConnection borrowed = connections.getIfPresent(connectionDescriptor);
    if (borrowed != null) {
        // Owned by the cache: leave it open so the cache's eviction policy decides its fate.
        return task.execute(borrowed);
    }

    // Nothing cached: open a throwaway connection that this call alone is responsible for.
    JFRConnection temporary = connect(connectionDescriptor);
    try {
        return task.execute(temporary);
    } finally {
        temporary.close();
    }
}

/**
Expand All @@ -153,6 +171,37 @@ public boolean markConnectionInUse(ConnectionDescriptor connectionDescriptor) {
return connections.getIfPresent(connectionDescriptor) != null;
}

/**
 * Closes a connection that has been removed from the cache. This method is registered as the
 * cache's removal listener.
 *
 * <p>A null descriptor or connection is logged as an error and otherwise ignored. Any
 * exception raised while recording the close event or closing the connection is logged rather
 * than propagated to the caller.
 *
 * @param descriptor the cache key of the removed entry
 * @param connection the removed connection to close
 * @param cause why the entry was removed from the cache
 */
private void closeConnection(
        ConnectionDescriptor descriptor, JFRConnection connection, RemovalCause cause) {
    if (descriptor == null) {
        logger.error("Connection eviction triggered with null descriptor");
        return;
    }
    if (connection == null) {
        logger.error("Connection eviction triggered with null connection");
        return;
    }
    try {
        closeAndRecordEvent(descriptor, connection, cause);
    } catch (Exception e) {
        logger.error(e);
    }
}

/**
 * Closes the connection while recording a {@link JMXConnectionClosed} event around the close
 * call. A {@link RuntimeException} from the close is flagged on the event and rethrown; the
 * event is always ended, and committed when it reports that it should be.
 */
private void closeAndRecordEvent(
        ConnectionDescriptor descriptor, JFRConnection connection, RemovalCause cause)
        throws Exception {
    JMXConnectionClosed closedEvent =
            new JMXConnectionClosed(descriptor.getTargetId(), cause.name());
    logger.info("Removing cached connection for {}: {}", descriptor.getTargetId(), cause);
    closedEvent.begin();
    try {
        connection.close();
    } catch (RuntimeException e) {
        closedEvent.setExceptionThrown(true);
        throw e;
    } finally {
        closedEvent.end();
        if (closedEvent.shouldCommit()) {
            closedEvent.commit();
        }
    }
}

private JFRConnection connect(ConnectionDescriptor connectionDescriptor) throws Exception {
try {
return attemptConnectAsJMXServiceURL(connectionDescriptor);
Expand Down Expand Up @@ -191,15 +240,19 @@ private JFRConnection connect(
ConnectionDescriptor cacheKey, JMXServiceURL url, Optional<Credentials> credentials)
throws Exception {
JMXConnectionOpened evt = new JMXConnectionOpened(url.toString());
logger.info("Creating connection for {}", url.toString());
logger.info("Creating connection for {}", url);
evt.begin();
try {
return jfrConnectionToolkit
.get()
.connect(
url,
credentials.orElse(null),
Collections.singletonList(() -> this.connections.invalidate(cacheKey)));
Collections.singletonList(
() -> {
logger.info("Connection for {} closed", url);
this.connections.invalidate(cacheKey);
}));
} catch (Exception e) {
evt.setExceptionThrown(true);
throw e;
Expand Down Expand Up @@ -244,10 +297,12 @@ void setExceptionThrown(boolean exceptionThrown) {
public static class JMXConnectionClosed extends Event {
String serviceUri;
boolean exceptionThrown;
String reason;

JMXConnectionClosed(String serviceUri) {
JMXConnectionClosed(String serviceUri, String reason) {
this.serviceUri = serviceUri;
this.exceptionThrown = false;
this.reason = reason;
}

void setExceptionThrown(boolean exceptionThrown) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,30 @@ public void handleWithValidJwt(RoutingContext ctx, JWT jwt) throws Exception {

Exception rootCause = (Exception) ExceptionUtils.getRootCause(ee);

if (rootCause instanceof RecordingNotFoundException
|| targetRecordingNotFound(rootCause)) {
if (targetRecordingNotFound(rootCause)) {
throw new HttpStatusException(404, ee);
}
throw ee;
}
}

private boolean targetRecordingNotFound(Exception rootCause) {
return rootCause instanceof SubprocessReportGenerator.ReportGenerationException
&& (((SubprocessReportGenerator.ReportGenerationException) rootCause)
.getStatus()
== SubprocessReportGenerator.ExitStatus.TARGET_CONNECTION_FAILURE)
|| (((SubprocessReportGenerator.ReportGenerationException) rootCause).getStatus()
== SubprocessReportGenerator.ExitStatus.NO_SUCH_RECORDING);
if (rootCause instanceof RecordingNotFoundException) {
return true;
}
boolean isReportGenerationException =
rootCause instanceof SubprocessReportGenerator.ReportGenerationException;
if (!isReportGenerationException) {
return false;
}
SubprocessReportGenerator.ReportGenerationException generationException =
(SubprocessReportGenerator.ReportGenerationException) rootCause;
boolean isTargetConnectionFailure =
generationException.getStatus()
== SubprocessReportGenerator.ExitStatus.TARGET_CONNECTION_FAILURE;
boolean isNoSuchRecording =
generationException.getStatus()
== SubprocessReportGenerator.ExitStatus.NO_SUCH_RECORDING;
return isTargetConnectionFailure || isNoSuchRecording;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,30 @@ public void handleAuthenticated(RoutingContext ctx) throws Exception {

Exception rootCause = (Exception) ExceptionUtils.getRootCause(ee);

if (rootCause instanceof RecordingNotFoundException
|| targetRecordingNotFound(rootCause)) {
if (targetRecordingNotFound(rootCause)) {
throw new HttpStatusException(404, ee);
}
throw ee;
}
}

private boolean targetRecordingNotFound(Exception rootCause) {
return rootCause instanceof SubprocessReportGenerator.ReportGenerationException
&& (((SubprocessReportGenerator.ReportGenerationException) rootCause)
.getStatus()
== SubprocessReportGenerator.ExitStatus.TARGET_CONNECTION_FAILURE)
|| (((SubprocessReportGenerator.ReportGenerationException) rootCause).getStatus()
== SubprocessReportGenerator.ExitStatus.NO_SUCH_RECORDING);
if (rootCause instanceof RecordingNotFoundException) {
return true;
}
boolean isReportGenerationException =
rootCause instanceof SubprocessReportGenerator.ReportGenerationException;
if (!isReportGenerationException) {
return false;
}
SubprocessReportGenerator.ReportGenerationException generationException =
(SubprocessReportGenerator.ReportGenerationException) rootCause;
boolean isTargetConnectionFailure =
generationException.getStatus()
== SubprocessReportGenerator.ExitStatus.TARGET_CONNECTION_FAILURE;
boolean isNoSuchRecording =
generationException.getStatus()
== SubprocessReportGenerator.ExitStatus.NO_SUCH_RECORDING;
return isTargetConnectionFailure || isNoSuchRecording;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ public Future<String> saveRecording(
throw new RecordingNotFoundException(
"active recordings", recordingName);
}
});
},
false);
future.complete(saveName);
notificationFactory
.createBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ public IRecordingDescriptor startRecording(
.build()
.send();
return desc;
});
},
false);
}

public IRecordingDescriptor startRecording(
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/cryostat/rules/RuleProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ private void archiveRuleRecording(ConnectionDescriptor connectionDescriptor, Rul
}

return null;
});
},
false);
}

private void startRuleRecording(ConnectionDescriptor connectionDescriptor, Rule rule)
Expand Down Expand Up @@ -272,6 +273,7 @@ private void startRuleRecording(ConnectionDescriptor connectionDescriptor, Rule
template.getLeft(),
template.getRight());
return null;
});
},
false);
}
}
Loading

0 comments on commit 7c010b2

Please sign in to comment.