Skip to content

Commit

Permalink
fixup(discovery): targets should be correctly persisted
Browse files Browse the repository at this point in the history
  • Loading branch information
tthvo committed May 2, 2024
1 parent 28426ac commit 1f58984
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 48 deletions.
97 changes: 49 additions & 48 deletions src/main/java/io/cryostat/discovery/ContainerDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.http.HttpMethod;
import io.vertx.mutiny.core.Vertx;
Expand All @@ -67,7 +66,6 @@
import jakarta.persistence.NoResultException;
import jakarta.transaction.Transactional;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

Expand Down Expand Up @@ -103,7 +101,7 @@ protected boolean enabled() {
return enabled;
}

@ConsumeEvent(blocking = true, ordered = true)
@ConsumeEvent(blocking = true)
@Transactional
void handleContainerEvent(ContainerDiscoveryEvent evt) {
updateDiscoveryTree(evt);
Expand Down Expand Up @@ -145,7 +143,7 @@ protected boolean enabled() {
return enabled;
}

@ConsumeEvent(blocking = true, ordered = true)
@ConsumeEvent(blocking = true)
@Transactional
void handleContainerEvent(ContainerDiscoveryEvent evt) {
updateDiscoveryTree(evt);
Expand Down Expand Up @@ -182,7 +180,6 @@ public abstract class ContainerDiscovery {
// Priority is set higher than default 0 such that onStart is called first before onAfterStart
// This ensures realm node is persisted before querying for containers
@Transactional
@Blocking
void onStart(@Observes @Priority(1) StartupEvent evt) {
if (!enabled()) {
return;
Expand Down Expand Up @@ -210,7 +207,6 @@ void onStart(@Observes @Priority(1) StartupEvent evt) {
logger.infov("Starting {0} client", getRealm());
}

@Blocking
void onAfterStart(@Observes StartupEvent evt) {
if (!(enabled() && available())) {
return;
Expand All @@ -220,7 +216,6 @@ void onAfterStart(@Observes StartupEvent evt) {
this.timerId = vertx.setPeriodic(pollPeriod.toMillis(), unused -> queryContainers());
}

@Blocking
void onStop(@Observes ShutdownEvent evt) {
if (!(enabled() && available())) {
return;
Expand All @@ -234,7 +229,6 @@ boolean available() {
return fs.exists(socketPath) && fs.isReadable(socketPath);
}

// Construct a target representation (non-persistent) from ContainerSpec
private Target toTarget(ContainerSpec desc) {
URI connectUrl;
String hostname;
Expand Down Expand Up @@ -276,6 +270,20 @@ private Target toTarget(ContainerSpec desc) {
return null;
}

// Check for any targets with the same connectUrl in other realms
try {
Target persistedTarget = Target.getTargetByConnectUrl(connectUrl);
String realmOfTarget = persistedTarget.annotations.cryostat().get("REALM");
if (!getRealm().equals(realmOfTarget)) {
logger.warnv(
"Expected persisted target with serviceURL {0} to be under realm"
+ " {1} but found under {2} ",
persistedTarget.connectUrl, getRealm(), realmOfTarget);
return null;
}
} catch (NoResultException e) {
}

Target target = new Target();
target.activeRecordings = new ArrayList<>();
target.connectUrl = connectUrl;
Expand Down Expand Up @@ -365,28 +373,22 @@ private CompletableFuture<ContainerDetails> doContainerInspectRequest(ContainerS
}

private void handleObservedContainers(List<ContainerSpec> current) {
List<DiscoveryNode> targetNodes =
DiscoveryNode.findAllByNodeType(BaseNodeType.JVM).stream()
.filter(
(n) ->
Objects.nonNull(n.target)
&& getRealm()
.equals(
n.target
.annotations
.cryostat()
.get("REALM")))
.collect(Collectors.toList());

Map<Target, ContainerSpec> containerRefMap = new HashMap<>();
current.stream()
.map((desc) -> Pair.of(toTarget(desc), desc))
.filter((pair) -> Objects.nonNull(pair.getLeft()))
.forEach((pair) -> containerRefMap.put(pair.getLeft(), pair.getRight()));
Map<URI, ContainerSpec> containerRefMap = new HashMap<>();

Set<Target> persistedTargets =
targetNodes.stream().map((n) -> n.target).collect(Collectors.toSet());
Set<Target> observedTargets = containerRefMap.keySet();
Target.findByRealm(getRealm()).stream().collect(Collectors.toSet());
Set<Target> observedTargets =
current.stream()
.map(
(desc) -> {
Target t = toTarget(desc);
if (Objects.nonNull(t)) {
containerRefMap.put(t.connectUrl, desc);
}
return t;
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());

Target.compare(persistedTargets)
.to(observedTargets)
Expand All @@ -395,7 +397,9 @@ && getRealm()
(t) ->
notify(
ContainerDiscoveryEvent.from(
containerRefMap.get(t), t, EventKind.FOUND)));
containerRefMap.get(t.connectUrl),
t,
EventKind.FOUND)));

Target.compare(persistedTargets)
.to(observedTargets)
Expand All @@ -404,35 +408,25 @@ && getRealm()
(t) ->
notify(
ContainerDiscoveryEvent.from(
containerRefMap.get(t), t, EventKind.LOST)));
containerRefMap.get(t.connectUrl),
t,
EventKind.LOST)));
}

public void updateDiscoveryTree(ContainerDiscoveryEvent evt) {
EventKind evtKind = evt.eventKind;
ContainerSpec desc = evt.desc;

Target target = evt.target;
// Check for any targets with the same connectUrl in other realms
// during EventKind.FOUND event.
if (!target.isPersistent()) {
try {
Target persistedTarget = Target.getTargetByConnectUrl(target.connectUrl);
String realmOfTarget = persistedTarget.annotations.cryostat().get("REALM");
if (!getRealm().equals(realmOfTarget)) {
logger.warnv(
"Expected persisted target with serviceURL {0} to be under realm {1}"
+ " but found under {2} ",
persistedTarget.connectUrl, getRealm(), realmOfTarget);
throw new IllegalStateException();
}
} catch (NoResultException e) {
}
}

DiscoveryNode realm = DiscoveryNode.getRealm(getRealm()).orElseThrow();

if (evtKind == EventKind.FOUND) {

if (target.isPersistent()) {
logger.infov(
"Target with serviceURL {0} already exist in discovery tree. Skip adding",
target.connectUrl);
return;
}
DiscoveryNode node = DiscoveryNode.target(target, BaseNodeType.JVM);
target.discoveryNode = node;

Expand Down Expand Up @@ -470,6 +464,13 @@ public void updateDiscoveryTree(ContainerDiscoveryEvent evt) {
node.persist();
realm.persist();
} else {
if (!target.isPersistent()) {
logger.infov(
"Target with serviceURL {0} does not exist in discovery tree. Skip"
+ " deleting",
target.connectUrl);
return;
}
DiscoveryNode node = target.discoveryNode;

while (true) {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/io/cryostat/targets/Target.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import io.cryostat.ConfigProperties;
import io.cryostat.core.JvmIdentifier;
Expand Down Expand Up @@ -139,6 +140,14 @@ public static boolean deleteByConnectUrl(URI connectUrl) {
return delete("connectUrl", connectUrl) > 0;
}

public static List<Target> findByRealm(String realm) {
List<Target> targets = findAll().list();

return targets.stream()
.filter((t) -> realm.equals(t.annotations.cryostat().get("REALM")))
.collect(Collectors.toList());
}

public ActiveRecording getRecordingById(long remoteId) {
return activeRecordings.stream()
.filter(rec -> rec.remoteId == remoteId)
Expand Down

0 comments on commit 1f58984

Please sign in to comment.