Skip to content

Commit

Permalink
feat(tcm): listen for target LOST events and invalidate from cache
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewazores committed Dec 6, 2021
1 parent c74d608 commit 24aa160
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/main/java/io/cryostat/net/NetworkModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import io.cryostat.net.reports.ReportsModule;
import io.cryostat.net.security.SecurityModule;
import io.cryostat.net.web.WebModule;
import io.cryostat.platform.PlatformClient;

import com.github.benmanes.caffeine.cache.Scheduler;
import dagger.Binds;
Expand Down Expand Up @@ -115,11 +116,13 @@ static Duration provideMaxTargetTTL(Environment env) {
@Singleton
static TargetConnectionManager provideTargetConnectionManager(
Lazy<JFRConnectionToolkit> connectionToolkit,
PlatformClient platformClient,
@Named(TARGET_CACHE_TTL) Duration maxTargetTtl,
@Named(TARGET_CACHE_SIZE) int maxTargetConnections,
Logger logger) {
return new TargetConnectionManager(
connectionToolkit,
platformClient,
ForkJoinPool.commonPool(),
Scheduler.systemScheduler(),
maxTargetTtl,
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/io/cryostat/net/TargetConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.regex.Matcher;
Expand All @@ -51,6 +52,8 @@
import io.cryostat.core.net.Credentials;
import io.cryostat.core.net.JFRConnection;
import io.cryostat.core.net.JFRConnectionToolkit;
import io.cryostat.core.net.discovery.JvmDiscoveryClient.EventKind;
import io.cryostat.platform.PlatformClient;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
Expand All @@ -75,6 +78,7 @@ public class TargetConnectionManager {

TargetConnectionManager(
Lazy<JFRConnectionToolkit> jfrConnectionToolkit,
PlatformClient platform,
Executor executor,
Scheduler scheduler,
Duration ttl,
Expand All @@ -93,6 +97,23 @@ public class TargetConnectionManager {
cacheBuilder = cacheBuilder.maximumSize(maxTargetConnections);
}
this.connections = cacheBuilder.build(this::connect);

// force removal of connections from cache when we're notified about targets being lost.
// This should already be taken care of by the connection close listener, but this provides
// some additional insurance in case a target disappears and the underlying JMX network
// connection doesn't immediately report itself as closed
platform.addTargetDiscoveryListener(
tde -> {
if (EventKind.LOST.equals(tde.getEventKind())) {
for (ConnectionDescriptor cd : connections.asMap().keySet()) {
if (Objects.equals(
cd.getTargetId(),
tde.getServiceRef().getServiceUri().toString())) {
connections.invalidate(cd);
}
}
}
});
}

public <T> T executeConnectedTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.cryostat.core.log.Logger;
import io.cryostat.core.net.JFRConnection;
import io.cryostat.core.net.JFRConnectionToolkit;
import io.cryostat.platform.PlatformClient;

import com.github.benmanes.caffeine.cache.Scheduler;
import org.hamcrest.MatcherAssert;
Expand All @@ -67,13 +68,15 @@ class TargetConnectionManagerTest {
TargetConnectionManager mgr;
@Mock Logger logger;
@Mock JFRConnectionToolkit jfrConnectionToolkit;
@Mock PlatformClient platformClient;
Duration TTL = Duration.ofMillis(250);

@BeforeEach
void setup() {
this.mgr =
new TargetConnectionManager(
() -> jfrConnectionToolkit,
platformClient,
ForkJoinPool.commonPool(),
Scheduler.systemScheduler(),
TTL,
Expand Down Expand Up @@ -191,6 +194,7 @@ void shouldCreateNewConnectionForAccessDelayedLongerThanTTL() throws Exception {
TargetConnectionManager mgr =
new TargetConnectionManager(
() -> jfrConnectionToolkit,
platformClient,
ForkJoinPool.commonPool(),
Scheduler.systemScheduler(),
Duration.ofNanos(1),
Expand Down Expand Up @@ -231,6 +235,7 @@ void shouldCreateNewConnectionWhenMaxSizeZeroed() throws Exception {
TargetConnectionManager mgr =
new TargetConnectionManager(
() -> jfrConnectionToolkit,
platformClient,
new DirectExecutor(),
Scheduler.disabledScheduler(),
Duration.ofSeconds(1),
Expand Down Expand Up @@ -270,6 +275,7 @@ void shouldCreateNewConnectionPerTarget() throws Exception {
TargetConnectionManager mgr =
new TargetConnectionManager(
() -> jfrConnectionToolkit,
platformClient,
new DirectExecutor(),
Scheduler.disabledScheduler(),
Duration.ofNanos(1),
Expand Down

0 comments on commit 24aa160

Please sign in to comment.