diff --git a/jupyterlab_launcher/process.py b/jupyterlab_launcher/process.py new file mode 100644 index 00000000..b966c7b4 --- /dev/null +++ b/jupyterlab_launcher/process.py @@ -0,0 +1,285 @@ +# coding: utf-8 +"""JupyterLab Launcher process handler""" + +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. +from __future__ import print_function + +import atexit +import logging +import os +import re +import signal +import sys +import threading +import time +import weakref + +from tornado import gen + +from ipython_genutils.py3compat import which as _which + +try: + import subprocess32 as subprocess +except ImportError: + import subprocess + +try: + import pty +except ImportError: + pty = False + +if sys.platform == 'win32': + list2cmdline = subprocess.list2cmdline +else: + def list2cmdline(cmd_list): + import pipes + return ' '.join(map(pipes.quote, cmd_list)) + +logging.basicConfig(format='%(message)s', level=logging.INFO) + + +def which(command, env=None): + """Get the full path to a command. + + Parameters + ---------- + command: str + The command name or path. + env: dict, optional + The environment variables, defaults to `os.environ`. + """ + env = env or os.environ + path = env.get('PATH') or os.defpath + command_with_path = _which(command, path=path) + + # Allow nodejs as an alias to node. + if command == 'node' and not command_with_path: + command = 'nodejs' + command_with_path = _which('nodejs', path=path) + + if not command_with_path: + if command in ['nodejs', 'node', 'npm']: + msg = 'Please install nodejs 5+ and npm before continuing installation. nodejs may be installed using conda or directly from the nodejs website.' + raise ValueError(msg) + raise ValueError('The command was not found or was not ' + + 'executable: %s.' % command) + return command_with_path + + +class Process(object): + """A wrapper for a child process. + """ + _procs = weakref.WeakSet() + _pool = None + + def __init__(self, cmd, logger=None, cwd=None, kill_event=None, + env=None, quiet=False): + """Start a subprocess that can be run asynchronously. + + Parameters + ---------- + cmd: list + The command to run. + logger: :class:`~logger.Logger`, optional + The logger instance. + cwd: string, optional + The cwd of the process. + env: dict, optional + The environment for the process. + kill_event: :class:`~threading.Event`, optional + An event used to kill the process operation. + """ + if not isinstance(cmd, (list, tuple)): + raise ValueError('Command must be given as a list') + + if kill_event and kill_event.is_set(): + raise ValueError('Process aborted') + + self.logger = logger = logger or logging.getLogger('jupyterlab') + self._last_line = '' + if not quiet: + self.logger.info('> ' + list2cmdline(cmd)) + self.cmd = cmd + + self.proc = self._create_process(cwd=cwd, env=env) + self._kill_event = kill_event or threading.Event() + + Process._procs.add(self) + + def terminate(self): + """Terminate the process and return the exit code. + """ + proc = self.proc + + # Kill the process. + if proc.poll() is None: + os.kill(proc.pid, signal.SIGTERM) + + # Wait for the process to close. + try: + proc.wait() + finally: + Process._procs.remove(self) + + return proc.returncode + + def wait(self): + """Wait for the process to finish. + + Returns + ------- + The process exit code. + """ + proc = self.proc + kill_event = self._kill_event + while proc.poll() is None: + if kill_event.is_set(): + self.terminate() + raise ValueError('Process was aborted') + time.sleep(1.) + return self.terminate() + + @gen.coroutine + def wait_async(self): + """Asynchronously wait for the process to finish. + """ + proc = self.proc + kill_event = self._kill_event + while proc.poll() is None: + if kill_event.is_set(): + self.terminate() + raise ValueError('Process was aborted') + yield gen.sleep(1.) + + raise gen.Return(self.terminate()) + + def _create_process(self, **kwargs): + """Create the process. + """ + cmd = self.cmd + kwargs.setdefault('stderr', subprocess.STDOUT) + + cmd[0] = which(cmd[0], kwargs.get('env')) + + if os.name == 'nt': + kwargs['shell'] = True + + proc = subprocess.Popen(cmd, **kwargs) + return proc + + @classmethod + def _cleanup(cls): + """Clean up the started subprocesses at exit. + """ + for proc in list(cls._procs): + proc.terminate() + + +class WatchHelper(Process): + """A process helper for a watch process. + """ + + def __init__(self, cmd, startup_regex, logger=None, cwd=None, + kill_event=None, env=None): + """Initialize the process helper. + + Parameters + ---------- + cmd: list + The command to run. + startup_regex: string + The regex to wait for at startup. + logger: :class:`~logger.Logger`, optional + The logger instance. + cwd: string, optional + The cwd of the process. + env: dict, optional + The environment for the process. + kill_event: callable, optional + A function to call to check if we should abort. + """ + super(WatchHelper, self).__init__(cmd, logger=logger, + cwd=cwd, kill_event=kill_event, env=env) + + if not pty: + self._stdout = self.proc.stdout + + while 1: + line = self._stdout.readline().decode('utf-8') + if not line: + raise RuntimeError('Process ended improperly') + print(line.rstrip()) + if re.match(startup_regex, line): + break + + self._read_thread = threading.Thread(target=self._read_incoming) + self._read_thread.setDaemon(True) + self._read_thread.start() + + def terminate(self): + """Terminate the process. + """ + proc = self.proc + + if proc.poll() is None: + if os.name != 'nt': + # Kill the process group if we started a new session. + os.killpg(os.getpgid(proc.pid), signal.SIGTERM) + else: + os.kill(proc.pid, signal.SIGTERM) + + # Close stdout. + try: + self._stdout.close() + except Exception as e: + pass + + # Wait for the process to close. + try: + proc.wait() + finally: + Process._procs.remove(self) + + return proc.returncode + + def _read_incoming(self): + """Run in a thread to read stdout and print""" + fileno = self._stdout.fileno() + while 1: + try: + buf = os.read(fileno, 1024) + except OSError as e: + self.logger.debug('Read incoming error %s', e) + return + + if not buf: + return + + print(buf.decode('utf-8'), end='') + + def _create_process(self, **kwargs): + """Create the watcher helper process. + """ + kwargs['bufsize'] = 0 + + if pty: + master, slave = pty.openpty() + kwargs['stderr'] = kwargs['stdout'] = slave + kwargs['start_new_session'] = True + self._stdout = os.fdopen(master, 'rb') + else: + kwargs['stdout'] = subprocess.PIPE + + if os.name == 'nt': + startupinfo = subprocess.STARTUPINFO() + startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW + kwargs['startupinfo'] = startupinfo + kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP + kwargs['shell'] = True + + return super(WatchHelper, self)._create_process(**kwargs) + + +# Register the cleanup handler. +atexit.register(Process._cleanup) diff --git a/jupyterlab_launcher/process_app.py b/jupyterlab_launcher/process_app.py new file mode 100644 index 00000000..a23137cd --- /dev/null +++ b/jupyterlab_launcher/process_app.py @@ -0,0 +1,44 @@ +# coding: utf-8 +"""A lab app that runs a sub process for a demo or a test.""" + +from __future__ import print_function, absolute_import + +import sys + +from notebook.notebookapp import NotebookApp +from tornado.ioloop import IOLoop +from traitlets import Bool + +from .process import Process + + +class ProcessApp(NotebookApp): + """A notebook app that runs a separate process and exits on completion.""" + + open_browser = Bool(False) + + def get_command(self): + """Get the command and kwargs to run with `Process`. + This is intended to be overridden. + """ + return ['python', '--version'], dict() + + def start(self): + """Start the application. + """ + IOLoop.current().add_callback(self._run_command) + NotebookApp.start(self) + + def _run_command(self): + command, kwargs = self.get_command() + kwargs.setdefault('logger', self.log) + future = Process(command, **kwargs).wait_async() + IOLoop.current().add_future(future, self._process_finished) + + def _process_finished(self, future): + try: + IOLoop.current().stop() + sys.exit(future.result()) + except Exception as e: + self.log.error(str(e)) + sys.exit(1)