Skip to content

Commit

Permalink
move process utilities from jupyterlab to here
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanov committed Jun 8, 2018
1 parent 7478b6c commit eeb0dbe
Show file tree
Hide file tree
Showing 3 changed files with 330 additions and 1 deletion.
2 changes: 1 addition & 1 deletion jupyterlab_launcher/_version.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version_info = (0, 10, 5)
version_info = (0, 11, 0)
__version__ = ".".join(map(str, version_info))
285 changes: 285 additions & 0 deletions jupyterlab_launcher/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
# coding: utf-8
"""JupyterLab Launcher process handler"""

# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
from __future__ import print_function

import atexit
import logging
import os
import re
import signal
import sys
import threading
import time
import weakref

from tornado import gen

from ipython_genutils.py3compat import which as _which

try:
import subprocess32 as subprocess
except ImportError:
import subprocess

try:
import pty
except ImportError:
pty = False

if sys.platform == 'win32':
list2cmdline = subprocess.list2cmdline
else:
def list2cmdline(cmd_list):
import pipes
return ' '.join(map(pipes.quote, cmd_list))

logging.basicConfig(format='%(message)s', level=logging.INFO)


def which(command, env=None):
"""Get the full path to a command.
Parameters
----------
command: str
The command name or path.
env: dict, optional
The environment variables, defaults to `os.environ`.
"""
env = env or os.environ
path = env.get('PATH') or os.defpath
command_with_path = _which(command, path=path)

# Allow nodejs as an alias to node.
if command == 'node' and not command_with_path:
command = 'nodejs'
command_with_path = _which('nodejs', path=path)

if not command_with_path:
if command in ['nodejs', 'node', 'npm']:
msg = 'Please install nodejs 5+ and npm before continuing installation. nodejs may be installed using conda or directly from the nodejs website.'
raise ValueError(msg)
raise ValueError('The command was not found or was not ' +
'executable: %s.' % command)
return command_with_path


class Process(object):
"""A wrapper for a child process.
"""
_procs = weakref.WeakSet()
_pool = None

def __init__(self, cmd, logger=None, cwd=None, kill_event=None,
env=None, quiet=False):
"""Start a subprocess that can be run asynchronously.
Parameters
----------
cmd: list
The command to run.
logger: :class:`~logger.Logger`, optional
The logger instance.
cwd: string, optional
The cwd of the process.
env: dict, optional
The environment for the process.
kill_event: :class:`~threading.Event`, optional
An event used to kill the process operation.
"""
if not isinstance(cmd, (list, tuple)):
raise ValueError('Command must be given as a list')

if kill_event and kill_event.is_set():
raise ValueError('Process aborted')

self.logger = logger = logger or logging.getLogger('jupyterlab')
self._last_line = ''
if not quiet:
self.logger.info('> ' + list2cmdline(cmd))
self.cmd = cmd

self.proc = self._create_process(cwd=cwd, env=env)
self._kill_event = kill_event or threading.Event()

Process._procs.add(self)

def terminate(self):
"""Terminate the process and return the exit code.
"""
proc = self.proc

# Kill the process.
if proc.poll() is None:
os.kill(proc.pid, signal.SIGTERM)

# Wait for the process to close.
try:
proc.wait()
finally:
Process._procs.remove(self)

return proc.returncode

def wait(self):
"""Wait for the process to finish.
Returns
-------
The process exit code.
"""
proc = self.proc
kill_event = self._kill_event
while proc.poll() is None:
if kill_event.is_set():
self.terminate()
raise ValueError('Process was aborted')
time.sleep(1.)
return self.terminate()

@gen.coroutine
def wait_async(self):
"""Asynchronously wait for the process to finish.
"""
proc = self.proc
kill_event = self._kill_event
while proc.poll() is None:
if kill_event.is_set():
self.terminate()
raise ValueError('Process was aborted')
yield gen.sleep(1.)

raise gen.Return(self.terminate())

def _create_process(self, **kwargs):
"""Create the process.
"""
cmd = self.cmd
kwargs.setdefault('stderr', subprocess.STDOUT)

cmd[0] = which(cmd[0], kwargs.get('env'))

if os.name == 'nt':
kwargs['shell'] = True

proc = subprocess.Popen(cmd, **kwargs)
return proc

@classmethod
def _cleanup(cls):
"""Clean up the started subprocesses at exit.
"""
for proc in list(cls._procs):
proc.terminate()


class WatchHelper(Process):
"""A process helper for a watch process.
"""

def __init__(self, cmd, startup_regex, logger=None, cwd=None,
kill_event=None, env=None):
"""Initialize the process helper.
Parameters
----------
cmd: list
The command to run.
startup_regex: string
The regex to wait for at startup.
logger: :class:`~logger.Logger`, optional
The logger instance.
cwd: string, optional
The cwd of the process.
env: dict, optional
The environment for the process.
kill_event: callable, optional
A function to call to check if we should abort.
"""
super(WatchHelper, self).__init__(cmd, logger=logger,
cwd=cwd, kill_event=kill_event, env=env)

if not pty:
self._stdout = self.proc.stdout

while 1:
line = self._stdout.readline().decode('utf-8')
if not line:
raise RuntimeError('Process ended improperly')
print(line.rstrip())
if re.match(startup_regex, line):
break

self._read_thread = threading.Thread(target=self._read_incoming)
self._read_thread.setDaemon(True)
self._read_thread.start()

def terminate(self):
"""Terminate the process.
"""
proc = self.proc

if proc.poll() is None:
if os.name != 'nt':
# Kill the process group if we started a new session.
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
else:
os.kill(proc.pid, signal.SIGTERM)

# Close stdout.
try:
self._stdout.close()
except Exception as e:
pass

# Wait for the process to close.
try:
proc.wait()
finally:
Process._procs.remove(self)

return proc.returncode

def _read_incoming(self):
"""Run in a thread to read stdout and print"""
fileno = self._stdout.fileno()
while 1:
try:
buf = os.read(fileno, 1024)
except OSError as e:
self.logger.debug('Read incoming error %s', e)
return

if not buf:
return

print(buf.decode('utf-8'), end='')

def _create_process(self, **kwargs):
"""Create the watcher helper process.
"""
kwargs['bufsize'] = 0

if pty:
master, slave = pty.openpty()
kwargs['stderr'] = kwargs['stdout'] = slave
kwargs['start_new_session'] = True
self._stdout = os.fdopen(master, 'rb')
else:
kwargs['stdout'] = subprocess.PIPE

if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
kwargs['startupinfo'] = startupinfo
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
kwargs['shell'] = True

return super(WatchHelper, self)._create_process(**kwargs)


# Register the cleanup handler.
atexit.register(Process._cleanup)
44 changes: 44 additions & 0 deletions jupyterlab_launcher/process_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# coding: utf-8
"""A lab app that runs a sub process for a demo or a test."""

from __future__ import print_function, absolute_import

import sys

from notebook.notebookapp import NotebookApp
from tornado.ioloop import IOLoop
from traitlets import Bool

from .process import Process


class ProcessApp(NotebookApp):
"""A notebook app that runs a separate process and exits on completion."""

open_browser = Bool(False)

def get_command(self):
"""Get the command and kwargs to run with `Process`.
This is intended to be overridden.
"""
return ['python', '--version'], dict()

def start(self):
"""Start the application.
"""
IOLoop.current().add_callback(self._run_command)
NotebookApp.start(self)

def _run_command(self):
command, kwargs = self.get_command()
kwargs.setdefault('logger', self.log)
future = Process(command, **kwargs).wait_async()
IOLoop.current().add_future(future, self._process_finished)

def _process_finished(self, future):
try:
IOLoop.current().stop()
sys.exit(future.result())
except Exception as e:
self.log.error(str(e))
sys.exit(1)

0 comments on commit eeb0dbe

Please sign in to comment.