Subprocess widget #13

Merged
merged 12 commits on Jan 27, 2016
312 changes: 203 additions & 109 deletions Qcodes example.ipynb

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions qcodes/__init__.py
@@ -1,17 +1,23 @@
# set up the qcodes namespace
# flake8: noqa (we don't need the "<...> imported but unused" error)

from qcodes.station import Station
from qcodes.loops import get_bg, halt_bg, Loop, Task, Wait
# just for convenience in debugging, so we don't have to
# separately import multiprocessing
from multiprocessing import active_children

from qcodes.utils.multiprocessing import set_mp_method
from qcodes.utils.helpers import in_notebook, reload_code

# hack for code that should only be imported into the main (notebook) thread
# see: http://stackoverflow.com/questions/15411967
# code that should only be imported into the main (notebook) thread
# in particular, importing matplotlib in the side processes takes a long
# time and spins up other processes in order to try and get a front end
import sys as _sys
if 'ipy' in repr(_sys.stdout):
if in_notebook():
from qcodes.plots.matplotlib import MatPlot
from qcodes.plots.pyqtgraph import QtPlot
from qcodes.widgets.widgets import show_subprocess_widget

from qcodes.station import Station
from qcodes.loops import get_bg, halt_bg, Loop, Task, Wait

from qcodes.data.manager import get_data_manager
from qcodes.data.data_set import DataMode, DataSet
@@ -27,9 +33,3 @@
from qcodes.instrument.function import Function
from qcodes.instrument.parameter import Parameter, InstrumentParameter
from qcodes.instrument.sweep_values import SweepFixedValues, AdaptiveSweep

from qcodes.utils.helpers import set_mp_method, reload_code

# just for convenience in debugging, so we don't have to
# separately import multiprocessing
from multiprocessing import active_children
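
The reworked `__init__.py` re-exports `set_mp_method` (now living in `qcodes.utils.multiprocessing`) and `active_children` at the top of the namespace. A minimal usage sketch of what that enables in a notebook session; this is illustrative only and assumes 'spawn' is a valid start method on the current platform:

```python
import qcodes

# choose the multiprocessing start method once, up front; per the helper's
# docstring the call is idempotent, so repeating the same method is not an error
qcodes.set_mp_method('spawn')

# convenience re-export of multiprocessing.active_children, useful for
# checking which qcodes subprocesses (DataServer, measurement loops) are alive
print(qcodes.active_children())
```
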
8 changes: 2 additions & 6 deletions qcodes/data/manager.py
@@ -4,7 +4,7 @@
from traceback import format_exc
from sys import stderr

from qcodes.utils.helpers import PrintableProcess
from qcodes.utils.multiprocessing import QcodesProcess


def get_data_manager():
@@ -60,7 +60,7 @@ def __init__(self, query_timeout=2):
self._start_server()

def _start_server(self):
self._server = DataServerProcess(target=self._run_server, daemon=True)
self._server = QcodesProcess(target=self._run_server, name='DataServer')
self._server.start()

def _run_server(self):
@@ -118,10 +118,6 @@ def restart(self, force=False):
self._start_server()


class DataServerProcess(PrintableProcess):
name = 'DataServer'


class DataServer(object):
'''
Running in its own process, receives, holds, and returns current `Loop` and
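
This change drops the per-server `DataServerProcess` subclass in favour of a single generic `QcodesProcess` that takes its display name as a constructor argument. A rough usage sketch of the pattern; `run_server` is a hypothetical stand-in for the real `self._run_server` target, and the actual `qcodes/utils/multiprocessing.py` is not shown in this part of the diff:

```python
from qcodes.utils.multiprocessing import QcodesProcess

def run_server():
    ...  # hypothetical worker; in DataManager the target is self._run_server

# daemonic by default, judging by the daemon=True argument being dropped above
server = QcodesProcess(target=run_server, name='DataServer')
server.start()
print(server)  # per the tests below, the repr reads like '<DataServer, started daemon>'
```
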
32 changes: 18 additions & 14 deletions qcodes/loops.py
@@ -45,32 +45,40 @@
from qcodes.station import Station
from qcodes.data.data_set import DataSet, DataMode
from qcodes.data.data_array import DataArray
from qcodes.utils.helpers import wait_secs, PrintableProcess
from qcodes.utils.helpers import wait_secs
from qcodes.utils.multiprocessing import QcodesProcess
from qcodes.utils.sync_async import mock_sync


def get_bg():
MP_NAME = 'MeasurementProcess'


def get_bg(return_first=False):
'''
find the active background measurement process, if any
returns None otherwise

return_first: if there are multiple loops running, return the first anyway.
If False, finding multiple loops raises a RuntimeError.
default False
'''
processes = mp.active_children()
loops = [p for p in processes if isinstance(p, MeasurementProcess)]

if len(loops) == 1:
return loops[0]
loops = [p for p in processes if getattr(p, 'name', '') == MP_NAME]

if len(loops):
if len(loops) > 1 and not return_first:
raise RuntimeError('Oops, multiple loops are running???')

if loops:
return loops[0]

return None


def halt_bg(self, timeout=5):
def halt_bg(timeout=5):
'''
Stop the active background measurement process, if any
'''
loop = get_bg()
loop = get_bg(return_first=True)
if not loop:
print('No loop running')
return
@@ -422,7 +430,7 @@ def run(self, location=None, formatter=None, io=None, data_manager=None,
# TODO: in notebooks, errors in a background sweep will just appear
# the next time a command is run. Do something better?
# (like log them somewhere, show in monitoring window)?
p = MeasurementProcess(target=loop_fn, daemon=True)
p = QcodesProcess(target=loop_fn, name=MP_NAME)
p.is_sweep = True
p.signal_queue = self.signal_queue
p.start()
@@ -527,10 +535,6 @@ def _wait(self, delay):
time.sleep(wait_secs(finish_datetime))


class MeasurementProcess(PrintableProcess):
name = 'MeasurementLoop'


class Task(object):
'''
A predefined task to be executed within a measurement Loop
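
With the loops changes, background measurements are identified by process name (`MP_NAME`) rather than by a `MeasurementProcess` class, and `halt_bg` loses its stray `self` argument. A small usage sketch (hypothetical notebook session, names taken from the diff above):

```python
from qcodes import get_bg, halt_bg

loop = get_bg()         # None if nothing is running; raises RuntimeError if
                        # several loops are active and return_first is False
if loop is not None:
    print(loop)         # something like '<MeasurementProcess, started daemon>'
    halt_bg(timeout=5)  # stop the active background measurement, if any

# grab the first of several running loops without raising
first = get_bg(return_first=True)
```
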
109 changes: 109 additions & 0 deletions qcodes/tests/test_multiprocessing.py
@@ -0,0 +1,109 @@
from unittest import TestCase
import time
import re
import sys
import multiprocessing as mp

from qcodes.utils.multiprocessing import (set_mp_method, QcodesProcess,
get_stream_queue)


# note sometimes separate processes do not seem to register in
# coverage tests, though it seems that when we actively return stdout and
# stderr to the normal routes, coverage does get tested?
# if that starts to fail, we can insert "pragma no cover" comments around
# such code - but need to be extra careful then that we really do cover it!

def sqtest(name, period, cnt):
p = QcodesProcess(target=sqtest_f, args=(name.upper(), period, cnt),
name=name)
p.start()
return p


def sqtest_f(name, period, cnt):
for i in range(cnt):
print('message from {}...'.format(name), end='', flush=True)
if i % 5 == 1:
print('error from {}...'.format(name), end='',
file=sys.stderr, flush=True)
print('', end='') # this one should do nothing
time.sleep(period)
print('') # this one should make a blank line at the very end

# now test that disconnect works, and reverts to regular stdout and stderr
get_stream_queue().disconnect()
print('stdout ', end='', flush=True)
print('stderr ', file=sys.stderr, end='', flush=True)


class TestMpMethod(TestCase):
pass
# TODO - this is going to be a bit fragile and platform-dependent, I think.
# we will need to initialize the start method before *any* tests run,
# which looks like it will require using a plugin.


class TestQcodesProcess(TestCase):
def test_qcodes_process(self):
# set up two processes that take 0.5 and 0.4 sec and produce
# staggered results
# p1 produces more output and sometimes makes two messages in a row
# before p2 produces any.
sq = get_stream_queue()
queue_format = re.compile(
'^\[\d\d:\d\d:\d\d\.\d\d\d p\d( ERR)?\] [^\[\]]*$')

sqtest('p1', 0.05, 10)
time.sleep(0.025)
sqtest('p2', 0.1, 4)

reprs = [repr(p) for p in mp.active_children()]
for name in ('p1', 'p2'):
self.assertIn('<{}, started daemon>'.format(name), reprs)
self.assertEqual(len(reprs), 2, reprs)

time.sleep(0.25)
queue_data1 = sq.get().split('\n')

time.sleep(0.25)
self.assertEqual(mp.active_children(), [])
queue_data2 = sq.get().split('\n')

for line in queue_data1 + queue_data2[1:-1]:
self.assertIsNotNone(queue_format.match(line), line)
# we've tested the header, now strip it
data1 = [line[14:] for line in queue_data1]
data2 = [line[14:] for line in queue_data2[1:-1]]
p1msg = 'p1] message from P1...'
p2msg = p1msg.replace('1', '2')
p1err = 'p1 ERR] error from P1...'
p2err = p1err.replace('1', '2')
expected_data1 = [
p1msg,
p2msg,
p1msg,
p1err,
p1msg,
p2msg,
p2err,
p1msg + p1msg[4:],
p2msg,
p1msg
]
# first line of data2 is special, as it has no header
# because it's continuing a line from the same stream
expected_data2_first = p1msg[4:]
expected_data2 = [
p1err,
p2msg,
p1msg + p1msg[4:],
'p2] ', # p2 is quitting - should send a blank line
p1msg
]
self.assertEqual(data1, expected_data1)
self.assertEqual(queue_data2[0], expected_data2_first)
self.assertEqual(data2, expected_data2)
# last line of data2 is also special, it's a trailing blank
# when p1 quits
self.assertEqual(queue_data2[-1], '')
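
For readers following the assertions: each line the stream queue emits carries a 14-character timestamp header, then the process name, with an `ERR` tag for stderr. An illustrative snippet reconstructed from the regex and expected data in the test above (the sample timestamps are made up):

```python
import re

queue_format = re.compile(
    r'^\[\d\d:\d\d:\d\d\.\d\d\d p\d( ERR)?\] [^\[\]]*$')

sample_lines = [
    '[12:34:56.789 p1] message from P1...',    # stdout from the process named p1
    '[12:34:56.894 p1 ERR] error from P1...',  # stderr lines get an ERR tag
]
for line in sample_lines:
    assert queue_format.match(line)
    print(line[14:])  # strip the '[HH:MM:SS.mmm ' header, as the test does
```
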
51 changes: 8 additions & 43 deletions qcodes/utils/helpers.py
@@ -5,12 +5,19 @@
from inspect import getargspec, ismethod
import logging
import math
import multiprocessing as mp
import sys
import os
from traceback import format_exc


def in_notebook():
'''
is this code in a process directly connected to a jupyter notebook?
see: http://stackoverflow.com/questions/15411967
'''
return 'ipy' in repr(sys.stdout)


def is_sequence(obj):
'''
is an object a sequence? We do not consider strings to be sequences,
@@ -99,48 +106,6 @@ def make_unique(s, existing):
return s_out


def set_mp_method(method, force=False):
'''
an idempotent wrapper for multiprocessing.set_start_method
args are the same:

method: one of:
'fork' (default on unix/mac)
'spawn' (default, and only option, on windows)
'forkserver'
force: allow changing context? default False
in the original function, even calling the function again
with the *same* method raises an error, but here we only
raise the error if you *don't* force *and* the context changes
'''
try:
# force windows multiprocessing behavior on mac
mp.set_start_method(method)
except RuntimeError as err:
if err.args != ('context has already been set', ):
raise

mp_method = mp.get_start_method()
if mp_method != method:
raise RuntimeError(
'unexpected multiprocessing method '
'\'{}\' when trying to set \'{}\''.format(mp_method, method))


class PrintableProcess(mp.Process):
'''
controls repr printing of the process
subclasses should provide a `name` attribute to go in repr()
if subclass.name = 'DataServer',
repr results in eg '<DataServer-1, started daemon>'
otherwise would be '<DataServerProcess(DataServerProcess...)>'
'''
def __repr__(self):
cname = self.__class__.__name__
out = super().__repr__().replace(cname + '(' + cname, self.name)
return out.replace(')>', '>')


def safe_getattr(obj, key, attr_dict):
'''
__getattr__ delegation to avoid infinite recursion
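
The new `in_notebook()` helper wraps the same `'ipy' in repr(sys.stdout)` check that previously sat in `__init__.py`. A hedged sketch of what that check keys on; the exact repr strings vary by IPython/ipykernel version:

```python
import sys

# under a notebook kernel, sys.stdout is typically an ipykernel OutStream whose
# repr contains 'ipy'; a plain interpreter gives something like
# "<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>"
if 'ipy' in repr(sys.stdout):
    print('probably connected to a jupyter/ipython kernel; notebook-only imports are safe')
else:
    print('plain interpreter; skip the notebook-only imports')
```
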