Skip to content

Commit

Permalink
improvements
Browse files Browse the repository at this point in the history
- reaping of zombies using reflection access to UNIXProcess.waitForProcessExit()
- hold on to first process launced (to use for native method invocation
- don't start reaping until a process is successfully launched
- check that pid field is int in UNIXProcess
- remove thread interruption in VepRunner - now StreamTransferrer threads will exit naturally when zombies are reaped
  • Loading branch information
sheridancbio committed Jun 24, 2020
1 parent 7b91f78 commit e5f57b6
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.sql.Timestamp;
import java.util.*;

/** Returns a list of elements representing all processes on system
*/
public class ProcessSurveyor {

private static ArrayList<String> commandElements;
Expand All @@ -27,7 +29,7 @@ public class ProcessSurveyor {
}
}

public static ProcessSurveyItem parsePsOutputLine(String line) {
private static ProcessSurveyItem parsePsOutputLine(String line) {
// each output line from the ps command will have 4 fields, such as these example lines:
// PID PPID S COMMAND
// 1 1 S java
Expand Down Expand Up @@ -68,6 +70,9 @@ public static ProcessSurveyItem parsePsOutputLine(String line) {
return new ProcessSurveyItem(processId, parentProcessId, stateCode, fields[firstFieldIndex + 3]);
}

/** use command "ps axo pid,ppid,state,comm" in a forked process on the system and parse output
* @return a list of ProcessSurveyItem representing each process in the output
*/
public static List<ProcessSurveyItem> getProcessSurvey() {
ArrayList<ProcessSurveyItem> processSurvey = new ArrayList<ProcessSurveyItem>();
try {
Expand Down
195 changes: 122 additions & 73 deletions src/main/java/org/genomenexus/vep_wrapper/SystemProcessManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.lang.reflect.*;
import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.*;
Expand All @@ -14,64 +12,67 @@
/**
* Launches and destroys vep command line tool processes.
* Because vep is a perl process which forks child processes, the Java
* Process.destroy() eliminates the launched process but not the child
* processes, which enter a Sleep state and are inherited by the parent
* process of the launched vep process. On a full unix platform, these
* orphaned children should normally be cleaned up by the init process.
* However, in our Docker container deployment, the init process is not
* present and the parent process is the running JVM, which does not
* automatically clean up orphaned processes.
* Process.destroy() eliminates the launched process but not any child
* processes, which are inherited by the parent process of the launched vep
* process after it is destroyed. On a full unix platform, these orphaned
* children should normally be cleaned up by the init process. However, in
* our Docker container deployment, the init process is not present and the
* parent process is the running JVM, which does not automatically clean up
* orphaned processes.
*
* By using this class to launch VEP processes and to (when needed)
* forcibly destroy launced VEP processes, this class is able to track
* the processes which have been launched and are expected to be
* running. These processes will be children of the JVM process, and
* by keeping track they can be distinguished from orphaned child
* processes of destroyed VEP processes which are now children of
* the JVM process.
* forcibly destroy launced VEP processes, this class is able to track the
* processes which have been launched and are expected to be running. These
* processes will be children of the JVM process, and by keeping track they
* can be distinguished from orphaned child processes of destroyed VEP
* processes which are now children of the JVM process.
*
* A Daemon thread is started at the time of the first launch of VEP.
* This thread periodically examines the processes running on the JVM
* and purges processes in the Zombie state which are children of the
* JVM process, and also will forcibly kill processes for which all
* of the following are true:
* A Daemon thread is started at the time of the first launch of VEP. This
* thread periodically examines the processes running on the JVM and purges
* processes in the Zombie state which are children of the JVM process, and
* also will forcibly kill processes for which all of the following are
* true:
* - not tracked as being launched via SystemProcessManager
* - children of the JVM process (process 1)
* - child of the JVM process (process 1)
* - in the Sleep state
* - launced using the primary command "perl"
* These rules should purge orphaned forked children of any failed
* or destroyed VEP processes. When using this code the application
* should be sure to launch any process using the command "perl"
* through this class only.
* These rules should purge orphaned forked children of any failed or
* destroyed VEP processes. When using this code the application should be
* sure to launch any process using the command "perl" through this class
* only.
*
* Synchronization prevents concurrent execution of code which alters
* the list of tracked processes, or the Daemon thread's reliance on
* that list during a cleanup operation. Additionally, there may be a
* short delay between calling Process.start() and being able to
* determine the process identifier (pid). The Daemon thread will
* wait a short time for the process identifier to be available. If
* unable to determine the process identifier, it will skip execution
* for one iteration time period.
* Synchronization prevents concurrent execution of code which alters the
* list of tracked processes, or the Daemon thread's code which relies on
* that list during a cleanup operation. Additionally, there may be a short
* delay between calling Process.start() and being able to determine the
* process identifier (pid). The Daemon thread will wait a short time for
* the process identifier to be available. If unable to determine the
* process identifier, it will skip execution for one iteration time
* period.
*
* The cleanup thread is shut down if it is determined that the system
* on which the code is deployed does not support unix style process
* The cleanup thread is limited if it is determined that the system on
* which the code is deployed does not support unix style process
* management through commands 'ps', 'kill', and the java class
* java.lang.Process
* java.lang.Process. If these capabilites are not present, the daemon
* thread will still remove launched VEP processes from the tracking list
* when they have finished processing and are no longer alive.
*/
public class SystemProcessManager implements Runnable {

private static Set<Process> launchedProcesses = Collections.synchronizedSet(new HashSet<>());
private static String UNIX_PROCESS_CLASS_NAME = "java.lang.UNIXProcess";
public static final Long PID_PROBE_WAIT_PERIOD = 2000L; // milliseconds
public static final Long REAPER_DAEMON_PERIOD = 250L; // milliseconds
public static final int JVM_PROCESS_ID = 1; // maybe this should be tested or determined at runtime
public static final Long REAPER_DAEMON_WAIT_PERIOD = 2000L; // milliseconds
public static final Long PID_PROBE_WAIT_PERIOD = 250L; // milliseconds
public static final int JVM_PROCESS_ID = 1; // TODO: maybe this should be tested or determined at runtime
public static final int PID_UNAVAILABLE = -1;
private static Boolean processIdCanBeReadFromToStringMethod = null; // if available, this avoids reflection
private static Boolean processIdCanBeReadFromUnixProcessField = null; // this method depends on an internal field
private static Boolean psCommandAvailableOnSystem = null;
private static Boolean killCommandAvailableOnSystem = null;
private static Field processPidField = null;
private static Method waitForProcessExitMethod = null;
private static Pattern PID_REPORT_PATTERN = Pattern.compile("pid\\s*=\\s*(\\d+)"); // used to search toString output
private static Process rememberedUnixProcess = null; // saved for calls to method waitForProcessExitMethod
private static File devNull = null;
private static Thread reaperDaemonThread = null;
private static final Object reaperDaemonLock = new Object();
Expand All @@ -85,19 +86,31 @@ public class SystemProcessManager implements Runnable {
}
}

/**
* launch a new instance of the VEP command line tool.
* by using this method, SystemProcessManager will be tracking the processes
* launched by the web service and will be able to avoid improper purging of
* these processes, which might otherwise appear to be orphaned child
* processes forked by other running VEP command line tool processes.
* @param pb A ProcessBuilder instance pre-loaded with the arguments for
* launching VEP command line tool.
* @return A started process, or null if an exception prevents the launch
*/
public static Process launchVepProcess(ProcessBuilder pb) {
// pb arguments are expected to use the vep command .. we could consider checking
Process p = null;
synchronized(reaperDaemonLock) {
try {
p = pb.start();
if (!launchedProcesses.add(p)) {
System.err.println("Warning : attempted to add process to launchedProcesses while it was already present");
}
launchedProcesses.add(p);
} catch (IOException e) {
return null;
}
}
startReaperDaemonThreadIfNeeded();
if (p != null) {
rememberedUnixProcess = p;
startReaperDaemonThreadIfNeeded();
}
return p;
}

Expand All @@ -122,41 +135,50 @@ public void run() {
if (killCommandAvailableOnSystem == null) {
determineIfKillCommandIsAvailable();
}
determineWaitForProcessExitMethod();
determineAvailableMethodToReadPid();
// ReaperDaemonThread
while (true) {
attemptSleep(REAPER_DAEMON_PERIOD); // sleep is done outside of synchronized region
// purge launchedProcesses of completed processes
reaping_loop : while (true) {
attemptSleep(REAPER_DAEMON_WAIT_PERIOD); // sleep is done outside of synchronized region
synchronized(reaperDaemonLock) {
// purge launchedProcesses of completed processes
for (Process p : launchedProcesses) {
if (!p.isAlive()) {
launchedProcesses.remove(p);
}
}
// // reap zombies if possible
// if (psCommandAvailableOnSystem) {
// Set<int> zombieIds = getZombieIds();
// for (int id : zombieIds) {
// // use waitFor(int) method in UNIXProcess
// }
// }

// reap zombies if possible
List<ProcessSurveyItem> processSurvey = null;
if (psCommandAvailableOnSystem && waitForProcessExitMethod != null) {
processSurvey = ProcessSurveyor.getProcessSurvey();
for (ProcessSurveyItem psi : processSurvey) {
if (processIsZombie(psi) && waitForProcessExitMethod != null) {
try {
// reap
waitForProcessExitMethod.invoke(rememberedUnixProcess, psi.getProcessId());
} catch (IllegalAccessException e) {
waitForProcessExitMethod = null; // cannot use
} catch (InvocationTargetException e) {
waitForProcessExitMethod = null; // cannot use
}
}
}
}
// check whether system has 'ps' and 'kill' and might support pid determination
if (!systemMightAllowProcessControl()) {
continue;
continue reaping_loop;
}
// check whether we can get pid for all launched processes
// there may be a short time between Process.start() and pid being available
// if any processes is not ready to report pid, restart loop
for (Process p : launchedProcesses) {
if (waitForPidForProcess(p) == PID_UNAVAILABLE) {
attemptSleep(REAPER_DAEMON_PERIOD);
continue;
continue reaping_loop;
}
}
// reap all perl processes which are children of process "1" (the jvm)
List<ProcessSurveyItem> processSurvey = ProcessSurveyor.getProcessSurvey();
if (processSurvey == null) {
continue; // "ps" failed this time, even though we know it can work on this system
continue reaping_loop; // "ps" failed this time, even though we know it can work on this system
}
for (ProcessSurveyItem psi : processSurvey) {
if (processShouldBeReaped(psi)) {
Expand All @@ -171,6 +193,16 @@ public static File getDevNull() {
return devNull;
}

private static boolean processIsZombie(ProcessSurveyItem psi) {
if (psi.getParentProcessId() != JVM_PROCESS_ID) {
return false; // only reap zombies which are children of the jvm
}
if (psi.getStateCode() != 'Z' && psi.getStateCode() != 'z') {
return false; // only reap processes which are in zombie state
}
return true;
}

private static boolean processShouldBeReaped(ProcessSurveyItem psi) {
if (psi.getParentProcessId() != JVM_PROCESS_ID ) {
return false; // only reap processes which are children of the jvm
Expand Down Expand Up @@ -272,19 +304,45 @@ private static int waitForPidForProcess(Process p) {
return pid;
}

private static void determineAvailableMethodToReadPid(Process process) {
Class<?> processClass = process.getClass();
private static void determineWaitForProcessExitMethod() {
waitForProcessExitMethod = null;
Class<?> processClass = rememberedUnixProcess.getClass();
Method[] classMethods = processClass.getDeclaredMethods();
for (Method m : classMethods) {
if (!m.getName().equals("waitForProcessExit")) {
continue;
}
Class[] methodArgTypes = m.getParameterTypes();
if (methodArgTypes.length != 1) {
continue;
}
if (!methodArgTypes[0].getName().equals("int")) {
continue;
}
waitForProcessExitMethod = m;
waitForProcessExitMethod.setAccessible(true);
break;
}
}

private static void determineAvailableMethodToReadPid() {
processPidField = null;
Class<?> processClass = rememberedUnixProcess.getClass();
if (UNIX_PROCESS_CLASS_NAME.equals(processClass.getName())) {
if (processIdCanBeReadFromToStringMethod == null) {
processIdCanBeReadFromToStringMethod = false;
processIdCanBeReadFromToStringMethod = PID_REPORT_PATTERN.matcher(process.toString()).find();
processIdCanBeReadFromToStringMethod = PID_REPORT_PATTERN.matcher(rememberedUnixProcess.toString()).find();
}
if (processIdCanBeReadFromUnixProcessField == null) {
processIdCanBeReadFromUnixProcessField = false;
try {
processPidField = processClass.getDeclaredField("pid");
processPidField.setAccessible(true);
processIdCanBeReadFromUnixProcessField = true;
if (processPidField.getType().getName().equals("int")) {
processPidField.setAccessible(true);
processIdCanBeReadFromUnixProcessField = true;
} else {
processPidField = null;
}
} catch (NoSuchFieldException e) {
// leave processIdCanBeReadFromUnixProcessField as false if there is no pid field
}
Expand All @@ -293,9 +351,6 @@ private static void determineAvailableMethodToReadPid(Process process) {
}

public static int getProcessId(Process process) {
if (pidDeterminationIsNotYetKnown()) {
determineAvailableMethodToReadPid(process);
}
if (Boolean.TRUE.equals(processIdCanBeReadFromToStringMethod)) {
Matcher pidReportMatcher = PID_REPORT_PATTERN.matcher(process.toString());
if (pidReportMatcher.find()) {
Expand All @@ -322,17 +377,11 @@ public static int getProcessId(Process process) {
return PID_UNAVAILABLE;
}

// for pidDeterminationIsNotYetKnown and pidDeterminationIsImpossible:
// if both static variables are set to Boolean.FALSE, we know that no
// method is available. if either variable is null, it is not fully
// known what methods are available. if either is Boolean.TRUE
// then we know there is an available method

private static boolean pidDeterminationIsNotYetKnown() {
return processIdCanBeReadFromToStringMethod == null ||
processIdCanBeReadFromUnixProcessField == null;
}

private static boolean pidDeterminationIsImpossible() {
return Boolean.FALSE.equals(processIdCanBeReadFromToStringMethod) &&
Boolean.FALSE.equals(processIdCanBeReadFromUnixProcessField);
Expand Down
Loading

0 comments on commit e5f57b6

Please sign in to comment.