Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(notifications): Add "Recording Stopped" notification on implied stop #810

Merged
merged 16 commits into from
Feb 8, 2022
34 changes: 22 additions & 12 deletions src/main/java/io/cryostat/net/TargetConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -75,6 +77,7 @@ public class TargetConnectionManager {
private final Logger logger;

private final LoadingCache<ConnectionDescriptor, JFRConnection> connections;
private final Map<String, Object> targetLocks;

TargetConnectionManager(
Lazy<JFRConnectionToolkit> jfrConnectionToolkit,
Expand All @@ -87,6 +90,8 @@ public class TargetConnectionManager {
this.jfrConnectionToolkit = jfrConnectionToolkit;
this.logger = logger;

this.targetLocks = new ConcurrentHashMap<>();

Caffeine<ConnectionDescriptor, JFRConnection> cacheBuilder =
Caffeine.newBuilder()
.executor(executor)
Expand Down Expand Up @@ -136,19 +141,23 @@ public <T> T executeConnectedTask(
public <T> T executeConnectedTask(
ConnectionDescriptor connectionDescriptor, ConnectedTask<T> task, boolean useCache)
throws Exception {
if (useCache) {
return task.execute(connections.get(connectionDescriptor));
} else {
JFRConnection connection = connections.getIfPresent(connectionDescriptor);
boolean cached = connection != null;
if (!cached) {
connection = connect(connectionDescriptor);
}
try {
return task.execute(connection);
} finally {
synchronized (
targetLocks.computeIfAbsent(
connectionDescriptor.getTargetId(), k -> new Object())) {
if (useCache) {
return task.execute(connections.get(connectionDescriptor));
} else {
JFRConnection connection = connections.getIfPresent(connectionDescriptor);
boolean cached = connection != null;
if (!cached) {
connection.close();
connection = connect(connectionDescriptor);
}
try {
return task.execute(connection);
} finally {
if (!cached) {
connection.close();
}
}
}
}
Expand Down Expand Up @@ -188,6 +197,7 @@ private void closeConnection(
evt.begin();
try {
connection.close();
targetLocks.remove(descriptor.getTargetId());
} catch (RuntimeException e) {
evt.setExceptionThrown(true);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@
*/
package io.cryostat.net.web.http.api.v1;

import io.cryostat.messaging.notifications.NotificationFactory;
import io.cryostat.net.TargetConnectionManager;
import io.cryostat.net.web.http.RequestHandler;
import io.cryostat.recordings.RecordingArchiveHelper;
import io.cryostat.recordings.RecordingTargetHelper;

import dagger.Binds;
import dagger.Module;
Expand Down Expand Up @@ -99,9 +98,8 @@ static TargetRecordingPatchSave provideTargetRecordingPatchSave(

@Provides
static TargetRecordingPatchStop provideTargetRecordingPatchStop(
TargetConnectionManager targetConnectionManager,
NotificationFactory notificationFactory) {
return new TargetRecordingPatchStop(targetConnectionManager, notificationFactory);
RecordingTargetHelper recordingTargetHelper) {
return new TargetRecordingPatchStop(recordingTargetHelper);
}

@Binds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,70 +37,33 @@
*/
package io.cryostat.net.web.http.api.v1;

import java.util.Map;
import java.util.Optional;

import javax.inject.Inject;

import org.openjdk.jmc.rjmx.services.jfr.IRecordingDescriptor;

import io.cryostat.messaging.notifications.NotificationFactory;
import io.cryostat.net.ConnectionDescriptor;
import io.cryostat.net.TargetConnectionManager;
import io.cryostat.net.web.http.HttpMimeType;
import io.cryostat.recordings.RecordingNotFoundException;
import io.cryostat.recordings.RecordingTargetHelper;

import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.impl.HttpStatusException;

class TargetRecordingPatchStop {

private final TargetConnectionManager targetConnectionManager;
private final NotificationFactory notificationFactory;

private static final String NOTIFICATION_CATEGORY = "RecordingStopped";
private final RecordingTargetHelper recordingTargetHelper;

@Inject
TargetRecordingPatchStop(
TargetConnectionManager targetConnectionManager,
NotificationFactory notificationFactory) {
this.targetConnectionManager = targetConnectionManager;
this.notificationFactory = notificationFactory;
TargetRecordingPatchStop(RecordingTargetHelper recordingTargetHelper) {
this.recordingTargetHelper = recordingTargetHelper;
}

void handle(RoutingContext ctx, ConnectionDescriptor connectionDescriptor) throws Exception {
String recordingName = ctx.pathParam("recordingName");

targetConnectionManager.executeConnectedTask(
connectionDescriptor,
connection -> {
Optional<IRecordingDescriptor> descriptor =
connection.getService().getAvailableRecordings().stream()
.filter(recording -> recording.getName().equals(recordingName))
.findFirst();
if (descriptor.isPresent()) {
connection.getService().stop(descriptor.get());
return null;
} else {
throw new HttpStatusException(
404,
String.format(
"Recording with name \"%s\" not found", recordingName));
}
});
try {
recordingTargetHelper.stopRecording(connectionDescriptor, recordingName).get();
} catch (RecordingNotFoundException e) {
throw new HttpStatusException(404, e);
}
ctx.response().setStatusCode(200);
ctx.response().end();

notificationFactory
.createBuilder()
.metaCategory(NOTIFICATION_CATEGORY)
.metaType(HttpMimeType.JSON)
.message(
Map.of(
"recording",
recordingName,
"target",
connectionDescriptor.getTargetId()))
.build()
.send();
}
}
112 changes: 110 additions & 2 deletions src/main/java/io/cryostat/recordings/RecordingTargetHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,22 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.inject.Named;

import org.openjdk.jmc.common.unit.IConstrainedMap;
import org.openjdk.jmc.flightrecorder.configuration.events.EventOptionID;
import org.openjdk.jmc.flightrecorder.configuration.recording.RecordingOptionsBuilder;
import org.openjdk.jmc.rjmx.services.jfr.IEventTypeInfo;
import org.openjdk.jmc.rjmx.services.jfr.IRecordingDescriptor;
import org.openjdk.jmc.rjmx.services.jfr.IRecordingDescriptor.RecordingState;

import io.cryostat.core.log.Logger;
import io.cryostat.core.net.JFRConnection;
Expand All @@ -70,7 +77,9 @@

public class RecordingTargetHelper {

private static final String NOTIFICATION_CATEGORY = "RecordingCreated";
private static final String CREATE_NOTIFICATION_CATEGORY = "RecordingCreated";
private static final String STOP_NOTIFICATION_CATEGORY = "RecordingStopped";
private static final long TIMESTAMP_DRIFT_SAFEGUARD = 3_000L;

private static final Pattern TEMPLATE_PATTERN =
Pattern.compile("^template=([\\w]+)(?:,type=([\\w]+))?$");
Expand All @@ -81,7 +90,9 @@ public class RecordingTargetHelper {
private final NotificationFactory notificationFactory;
private final RecordingOptionsBuilderFactory recordingOptionsBuilderFactory;
private final ReportService reportService;
private final ScheduledExecutorService scheduler;
private final Logger logger;
private final Map<Pair<String, String>, Future<?>> scheduledStopNotifications;

RecordingTargetHelper(
TargetConnectionManager targetConnectionManager,
Expand All @@ -90,14 +101,17 @@ public class RecordingTargetHelper {
NotificationFactory notificationFactory,
RecordingOptionsBuilderFactory recordingOptionsBuilderFactory,
ReportService reportService,
@Named(RecordingsModule.NOTIFICATION_SCHEDULER) ScheduledExecutorService scheduler,
Logger logger) {
this.targetConnectionManager = targetConnectionManager;
this.webServer = webServer;
this.eventOptionsBuilderFactory = eventOptionsBuilderFactory;
this.notificationFactory = notificationFactory;
this.recordingOptionsBuilderFactory = recordingOptionsBuilderFactory;
this.reportService = reportService;
this.scheduler = scheduler;
this.logger = logger;
this.scheduledStopNotifications = new ConcurrentHashMap<>();
}

public IRecordingDescriptor startRecording(
Expand Down Expand Up @@ -131,7 +145,7 @@ public IRecordingDescriptor startRecording(
enableEvents(connection, templateName, templateType));
notificationFactory
.createBuilder()
.metaCategory(NOTIFICATION_CATEGORY)
.metaCategory(CREATE_NOTIFICATION_CATEGORY)
.metaType(HttpMimeType.JSON)
.message(
Map.of(
Expand All @@ -141,6 +155,17 @@ public IRecordingDescriptor startRecording(
connectionDescriptor.getTargetId()))
.build()
.send();

Object fixedDuration =
recordingOptions.get(RecordingOptionsBuilder.KEY_DURATION);
if (fixedDuration != null) {
Long delay =
Long.valueOf(fixedDuration.toString().replaceAll("[^0-9]", ""));

scheduleRecordingStopNotification(
recordingName, delay, connectionDescriptor);
}

return desc;
},
false);
Expand Down Expand Up @@ -206,6 +231,7 @@ public Future<Void> deleteRecording(
if (descriptor.isPresent()) {
connection.getService().close(descriptor.get());
reportService.delete(connectionDescriptor, recordingName);
this.cancelScheduledNotificationIfExists(targetId, recordingName);
} else {
throw new RecordingNotFoundException(targetId, recordingName);
}
Expand All @@ -218,6 +244,36 @@ public Future<Void> deleteRecording(
return future;
}

public Future<Void> stopRecording(
ConnectionDescriptor connectionDescriptor, String recordingName) throws Exception {
CompletableFuture<Void> future = new CompletableFuture<>();
String targetId = connectionDescriptor.getTargetId();
try {
targetConnectionManager.executeConnectedTask(
connectionDescriptor,
connection -> {
Optional<IRecordingDescriptor> descriptor =
connection.getService().getAvailableRecordings().stream()
.filter(
recording ->
recording.getName().equals(recordingName))
.findFirst();
if (descriptor.isPresent()) {
connection.getService().stop(descriptor.get());
this.cancelScheduledNotificationIfExists(targetId, recordingName);
this.notifyRecordingStopped(targetId, recordingName);
return null;
} else {
throw new RecordingNotFoundException(targetId, recordingName);
}
});
future.complete(null);
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
}

public Future<HyperlinkedSerializableRecordingDescriptor> createSnapshot(
ConnectionDescriptor connectionDescriptor) {
CompletableFuture<HyperlinkedSerializableRecordingDescriptor> future =
Expand Down Expand Up @@ -314,6 +370,23 @@ public Optional<IRecordingDescriptor> getDescriptorByName(
.findFirst();
}

private void notifyRecordingStopped(String targetId, String recordingName) {
notificationFactory
.createBuilder()
.metaCategory(STOP_NOTIFICATION_CATEGORY)
.metaType(HttpMimeType.JSON)
.message(Map.of("recording", recordingName, "target", targetId))
.build()
.send();
}

private void cancelScheduledNotificationIfExists(String targetId, String stoppedRecordingName) {
var f = scheduledStopNotifications.remove(Pair.of(targetId, stoppedRecordingName));
if (f != null) {
f.cancel(true);
}
}

private IConstrainedMap<EventOptionID> enableEvents(
JFRConnection connection, String templateName, TemplateType templateType)
throws Exception {
Expand Down Expand Up @@ -365,6 +438,41 @@ private IConstrainedMap<EventOptionID> enableAllEvents(JFRConnection connection)

return builder.build();
}

private void scheduleRecordingStopNotification(
String recordingName, long delay, ConnectionDescriptor connectionDescriptor) {
String targetId = connectionDescriptor.getTargetId();
ScheduledFuture<Optional<IRecordingDescriptor>> scheduledFuture =
this.scheduler.schedule(
() -> {
return targetConnectionManager.executeConnectedTask(
connectionDescriptor,
connection -> {
Optional<IRecordingDescriptor> desc =
getDescriptorByName(connection, recordingName);

long recordingStopped =
desc.stream()
.map(IRecordingDescriptor::getState)
.filter(
s ->
s.equals(
RecordingState
.STOPPED))
.count();
if (recordingStopped > 0) {
this.notifyRecordingStopped(targetId, recordingName);
}

return desc;
});
},
delay + TIMESTAMP_DRIFT_SAFEGUARD,
TimeUnit.MILLISECONDS);

scheduledStopNotifications.put(Pair.of(targetId, recordingName), scheduledFuture);
}

/**
* This method will consume the first byte of the {@link InputStream} it is verifying, so
* verification should only be done if the @param snapshot stream in question will not be used
Expand Down
Loading