Skip to content

Commit

Permalink
fix(rules): rule triggering on late-connecting targets (#1646)
Browse files Browse the repository at this point in the history
* tolerate MODIFIED JVM discovery events

* handle target MODIFIED discovery event by re-triggering actions

* rule triggering only replaces STOPPED recordings

* refactoring, parallel execution, handle MODIFIED events that may occur externally

* retry backoff and timeout on nonconnectable targets

* forget about non-connectable targets if they get LOST in the meantime

* only apply timeout/backoff logic to timer-based reconnection attempts, not credentials or discovery driven events

* process rules on dedicated executor pool, not Vertx pool

* perform rule triggering when credentials are added on executor pool

* evaluate rule cleanup against all discovered targets, and be tolerant of already-stopped condition

* attempt rule activation against all targets and handle potential for activation already done on another duplicate target definition

* eliminate bugged method

* use ScheduledExecutorService for background rules processing

* use multithreaded pool

* re-trigger rules on credential removal

* track rule activations using JVM IDs, not whole ServiceRefs

* rule uniqueness should not depend on currently enabled state

* perform discovery background tasks using dedicated scheduler thread and worker pool

* try to add more timeout handling to JVM ID retrieval

* add tolerance to flaky wall-time-checking unit test
  • Loading branch information
andrewazores authored Sep 18, 2023
1 parent 4c6751b commit 5f578f9
Show file tree
Hide file tree
Showing 22 changed files with 676 additions and 399 deletions.
4 changes: 4 additions & 0 deletions src/main/java/io/cryostat/discovery/DiscoveryModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.cryostat.configuration.CredentialsManager;
import io.cryostat.configuration.Variables;
import io.cryostat.core.log.Logger;
import io.cryostat.core.sys.Clock;
import io.cryostat.core.sys.Environment;
import io.cryostat.messaging.notifications.NotificationFactory;
import io.cryostat.platform.PlatformModule;
Expand Down Expand Up @@ -77,9 +78,11 @@ static DiscoveryStorage provideDiscoveryStorage(
Lazy<MatchExpressionEvaluator> matchExpressionEvaluator,
Gson gson,
WebClient http,
Clock clock,
Logger logger) {
return new DiscoveryStorage(
deployer,
Executors.newSingleThreadScheduledExecutor(),
Executors.newCachedThreadPool(),
pingPeriod,
builtin,
Expand All @@ -89,6 +92,7 @@ static DiscoveryStorage provideDiscoveryStorage(
matchExpressionEvaluator,
gson,
http,
clock,
logger);
}

Expand Down
283 changes: 181 additions & 102 deletions src/main/java/io/cryostat/discovery/DiscoveryStorage.java

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/main/java/io/cryostat/net/NetworkModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ static TargetConnectionManager provideTargetConnectionManager(
DiscoveryStorage storage,
@Named(Variables.TARGET_CACHE_TTL) Duration maxTargetTtl,
@Named(Variables.TARGET_MAX_CONCURRENT_CONNECTIONS) int maxTargetConnections,
@Named(Variables.JMX_CONNECTION_TIMEOUT) long connectionTimeoutSeconds,
Logger logger) {
return new TargetConnectionManager(
connectionToolkit,
Expand All @@ -135,6 +136,7 @@ static TargetConnectionManager provideTargetConnectionManager(
Scheduler.systemScheduler(),
maxTargetTtl,
maxTargetConnections,
connectionTimeoutSeconds,
logger);
}

Expand Down
24 changes: 15 additions & 9 deletions src/main/java/io/cryostat/net/TargetConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -60,6 +61,7 @@ public class TargetConnectionManager {
private final Lazy<JFRConnectionToolkit> jfrConnectionToolkit;
private final Lazy<AgentConnection.Factory> agentConnectionFactory;
private final Executor executor;
private final long connectionTimeoutSeconds;
private final Logger logger;

private final AsyncLoadingCache<ConnectionDescriptor, JFRConnection> connections;
Expand All @@ -74,10 +76,12 @@ public class TargetConnectionManager {
Scheduler scheduler,
Duration ttl,
int maxTargetConnections,
long connectionTimeoutSeconds,
Logger logger) {
this.jfrConnectionToolkit = jfrConnectionToolkit;
this.agentConnectionFactory = agentConnectionFactory;
this.executor = executor;
this.connectionTimeoutSeconds = connectionTimeoutSeconds;
this.logger = logger;

this.targetLocks = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -134,7 +138,8 @@ public <T> CompletableFuture<T> executeConnectedTaskAsync(
throw new CompletionException(e);
}
},
executor);
executor)
.orTimeout(connectionTimeoutSeconds, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -282,14 +287,15 @@ private class ConnectionLoader
public CompletableFuture<JFRConnection> asyncLoad(
ConnectionDescriptor key, Executor executor) throws Exception {
return CompletableFuture.supplyAsync(
() -> {
try {
return connect(key);
} catch (Exception e) {
throw new CompletionException(e);
}
},
executor);
() -> {
try {
return connect(key);
} catch (Exception e) {
throw new CompletionException(e);
}
},
executor)
.orTimeout(connectionTimeoutSeconds, TimeUnit.SECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public IntermediateResponse<Void> handle(RequestParameters params) throws ApiExc
}

private void cleanup(RequestParameters params, Rule rule) {
storage.listUniqueReachableServices().stream()
storage.listDiscoverableServices().stream()
.forEach(
(ServiceRef ref) -> {
vertx.executeBlocking(
Expand All @@ -155,7 +155,7 @@ private void cleanup(RequestParameters params, Rule rule) {
new ConnectionDescriptor(
targetId, credentials);
recordings.stopRecording(
cd, rule.getRecordingName());
cd, rule.getRecordingName(), true);
}
promise.complete();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public IntermediateResponse<Void> handle(RequestParameters params) throws ApiExc
}

private void cleanup(RequestParameters params, Rule rule) {
storage.listUniqueReachableServices().stream()
storage.listDiscoverableServices().stream()
.forEach(
(ServiceRef ref) -> {
vertx.executeBlocking(
Expand All @@ -169,7 +169,7 @@ private void cleanup(RequestParameters params, Rule rule) {
new ConnectionDescriptor(
targetId, credentials);
recordings.stopRecording(
cd, rule.getRecordingName());
cd, rule.getRecordingName(), true);
}
promise.complete();
} catch (Exception e) {
Expand Down
13 changes: 2 additions & 11 deletions src/main/java/io/cryostat/platform/PlatformClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
*/
package io.cryostat.platform;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

import io.cryostat.platform.discovery.EnvironmentNode;
Expand All @@ -35,17 +33,10 @@ default void load(Promise<EnvironmentNode> promise) {

List<ServiceRef> listDiscoverableServices();

default List<ServiceRef> listUniqueReachableServices() {
Set<String> uniqueIds = new HashSet<>();
return listDiscoverableServices().stream()
.filter((ref) -> ref.getJvmId() != null && uniqueIds.add(ref.getJvmId()))
.toList();
}

default boolean contains(ServiceRef ref) {
var existingRef =
listUniqueReachableServices().stream()
.filter(sr -> !sr.equals(ref) && sr.getJvmId().equals(ref.getJvmId()))
listDiscoverableServices().stream()
.filter(sr -> sr.equals(ref) || sr.getJvmId().equals(ref.getJvmId()))
.findAny();
return existingRef.isPresent();
}
Expand Down
28 changes: 13 additions & 15 deletions src/main/java/io/cryostat/recordings/JvmIdHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,28 +114,26 @@ public ServiceRef resolveId(ServiceRef sr) throws JvmIdGetException {
URI serviceUri = sr.getServiceUri();
String uriStr = serviceUri.toString();
try {
CompletableFuture<String> future =
this.targetConnectionManager.executeConnectedTaskAsync(
new ConnectionDescriptor(uriStr, credentialsManager.getCredentials(sr)),
JFRConnection::getJvmId);
future.thenAccept(
id -> {
String prevId = this.ids.synchronous().get(uriStr);
if (Objects.equals(prevId, id)) {
return;
}
this.ids.put(uriStr, CompletableFuture.completedFuture(id));
logger.info("JVM ID: {} -> {}", uriStr, id);
});
String id = future.get(connectionTimeoutSeconds, TimeUnit.SECONDS);
String id =
computeJvmId(uriStr, Optional.ofNullable(credentialsManager.getCredentials(sr)))
.whenComplete(
(i, t) -> {
String prevId = this.ids.synchronous().get(uriStr);
if (Objects.equals(prevId, i)) {
return;
}
this.ids.put(uriStr, CompletableFuture.completedFuture(i));
logger.info("JVM ID: {} -> {}", uriStr, i);
})
.get();

ServiceRef updated = new ServiceRef(id, serviceUri, sr.getAlias().orElse(uriStr));
updated.setLabels(sr.getLabels());
updated.setPlatformAnnotations(sr.getPlatformAnnotations());
updated.setCryostatAnnotations(sr.getCryostatAnnotations());
reverse.put(id, sr);
return updated;
} catch (InterruptedException | ExecutionException | TimeoutException | ScriptException e) {
} catch (InterruptedException | ExecutionException | ScriptException e) {
logger.warn("Could not resolve jvmId for target {}", uriStr);
throw new JvmIdGetException(e, uriStr);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,11 @@ public void accept(TargetDiscoveryEvent tde) {
// ID and inform us of that occurrence, and use that invalidation
// message to clear our stored metadata
break;
case MODIFIED:
handleFoundTarget(tde.getServiceRef());
break;
default:
throw new UnsupportedOperationException(tde.getEventKind().toString());
break;
}
});
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/cryostat/rules/PeriodicArchiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class PeriodicArchiver implements Runnable {
private final CredentialsManager credentialsManager;
private final Rule rule;
private final RecordingArchiveHelper recordingArchiveHelper;
private final Function<Pair<ServiceRef, Rule>, Void> failureNotifier;
private final Function<Pair<String, Rule>, Void> failureNotifier;
private final Logger logger;

private final Queue<String> previousRecordings;
Expand All @@ -53,7 +53,7 @@ class PeriodicArchiver implements Runnable {
CredentialsManager credentialsManager,
Rule rule,
RecordingArchiveHelper recordingArchiveHelper,
Function<Pair<ServiceRef, Rule>, Void> failureNotifier,
Function<Pair<String, Rule>, Void> failureNotifier,
Logger logger) {
this.serviceRef = serviceRef;
this.credentialsManager = credentialsManager;
Expand Down Expand Up @@ -105,7 +105,7 @@ public void run() {
if (AbstractAuthenticatedRequestHandler.isJmxAuthFailure(e)
|| AbstractAuthenticatedRequestHandler.isJmxSslFailure(e)
|| AbstractAuthenticatedRequestHandler.isServiceTypeFailure(e)) {
failureNotifier.apply(Pair.of(serviceRef, rule));
failureNotifier.apply(Pair.of(serviceRef.getJvmId(), rule));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ PeriodicArchiver create(
CredentialsManager credentialsManager,
Rule rule,
RecordingArchiveHelper recordingArchiveHelper,
Function<Pair<ServiceRef, Rule>, Void> failureNotifier) {
Function<Pair<String, Rule>, Void> failureNotifier) {
return new PeriodicArchiver(
serviceRef,
credentialsManager,
Expand Down
38 changes: 35 additions & 3 deletions src/main/java/io/cryostat/rules/Rule.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,45 @@ private static String validateEventSpecifier(String eventSpecifier)
}

@Override
public boolean equals(Object o) {
return EqualsBuilder.reflectionEquals(this, o);
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other == this) {
return true;
}
if (!(other instanceof Rule)) {
return false;
}
Rule r = (Rule) other;
// ignore the enabled state
return new EqualsBuilder()
.append(name, r.name)
.append(description, r.description)
.append(matchExpression, r.matchExpression)
.append(eventSpecifier, r.eventSpecifier)
.append(archivalPeriodSeconds, r.archivalPeriodSeconds)
.append(initialDelaySeconds, r.initialDelaySeconds)
.append(preservedArchives, r.preservedArchives)
.append(maxAgeSeconds, r.maxAgeSeconds)
.append(maxSizeBytes, r.maxSizeBytes)
.build();
}

@Override
public int hashCode() {
return HashCodeBuilder.reflectionHashCode(this);
// ignore the enabled state
return new HashCodeBuilder()
.append(name)
.append(description)
.append(matchExpression)
.append(eventSpecifier)
.append(archivalPeriodSeconds)
.append(initialDelaySeconds)
.append(preservedArchives)
.append(maxAgeSeconds)
.append(maxSizeBytes)
.toHashCode();
}

public static class Builder {
Expand Down
Loading

0 comments on commit 5f578f9

Please sign in to comment.