From af390a89833c67d4ffd08b2034d82a37540ab8ae Mon Sep 17 00:00:00 2001 From: Haruki Imai Date: Tue, 3 Dec 2019 02:18:03 +0900 Subject: [PATCH] Fix parallel_map for Python3.8 on Mac OSX (#3491) * Set start method to fork for Mac in order to use python3.8 * Added comments that fork start method is unsafe * Shorten the long line to pass the test * Update qiskit/tools/parallel.py Change start method only in python 3.8 Co-Authored-By: Matthew Treinish * Fixed import error and too long line to pass the test * Fixed indentation to pass test * Used ProcessPoolExecutor instead of Pool to run on spawn start method. * Set correct circuit name in spawned process * Merge instance calls and remove unnecessary module import * Added workaround to avoid setting duplicated circuite name when using ProcessPoolExecutor in python 3.5 and 3.6 * Remove stray comments Using the fork context for the multiprocessing pool was left in commented out from a previous iteration of the PR branch. This commit removes these leftover lines to prepare this for merging. --- qiskit/circuit/quantumcircuit.py | 14 ++++++++++++-- qiskit/tools/parallel.py | 27 +++++++++++++-------------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/qiskit/circuit/quantumcircuit.py b/qiskit/circuit/quantumcircuit.py index b18b72384747..c6cf37ca09de 100644 --- a/qiskit/circuit/quantumcircuit.py +++ b/qiskit/circuit/quantumcircuit.py @@ -139,8 +139,18 @@ def __init__(self, *regs, name=None): name = self.cls_prefix() + str(self.cls_instances()) # pylint: disable=not-callable # (known pylint bug: https://github.com/PyCQA/pylint/issues/1699) - if sys.platform != "win32" and isinstance(mp.current_process(), mp.context.ForkProcess): - name += '-{}'.format(mp.current_process().pid) + if sys.platform != "win32": + if isinstance(mp.current_process(), + (mp.context.ForkProcess, mp.context.SpawnProcess)): + name += '-{}'.format(mp.current_process().pid) + elif sys.version_info[0] == 3 \ + and (sys.version_info[1] == 5 or sys.version_info[1] == 6) \ + and mp.current_process().name != 'MainProcess': + # It seems condition of if-statement doen't work in python 3.5 and 3.6 + # because processes created by "ProcessPoolExecutor" are not + # mp.context.ForkProcess or mp.context.SpawnProcess. As a workaround, + # "name" of the process is checked instead. + name += '-{}'.format(mp.current_process().pid) self._increment_instances() if not isinstance(name, str): diff --git a/qiskit/tools/parallel.py b/qiskit/tools/parallel.py index c8b4da7589eb..d380b5e5c797 100644 --- a/qiskit/tools/parallel.py +++ b/qiskit/tools/parallel.py @@ -52,7 +52,7 @@ import os import platform -from multiprocessing import Pool +from concurrent.futures import ProcessPoolExecutor from qiskit.exceptions import QiskitError from qiskit.util import local_hardware_info from qiskit.tools.events.pubsub import Publisher @@ -64,6 +64,11 @@ CPU_COUNT = local_hardware_info()['cpus'] +def _task_wrapper(param): + (task, value, task_args, task_kwargs) = param + return task(value, *task_args, **task_kwargs) + + def parallel_map( # pylint: disable=dangerous-default-value task, values, task_args=tuple(), task_kwargs={}, num_processes=CPU_COUNT): """ @@ -111,22 +116,16 @@ def _callback(_): and os.getenv('QISKIT_IN_PARALLEL') == 'FALSE': os.environ['QISKIT_IN_PARALLEL'] = 'TRUE' try: - pool = Pool(processes=num_processes) - - async_res = [pool.apply_async(task, (value,) + task_args, task_kwargs, - _callback) for value in values] - - while not all([item.ready() for item in async_res]): - for item in async_res: - item.wait(timeout=0.1) + results = [] + with ProcessPoolExecutor(max_workers=num_processes) as executor: + param = map(lambda value: (task, value, task_args, task_kwargs), values) + future = executor.map(_task_wrapper, param) - pool.terminate() - pool.join() + results = list(future) + Publisher().publish("terra.parallel.done", len(results)) except (KeyboardInterrupt, Exception) as error: if isinstance(error, KeyboardInterrupt): - pool.terminate() - pool.join() Publisher().publish("terra.parallel.finish") os.environ['QISKIT_IN_PARALLEL'] = 'False' raise QiskitError('Keyboard interrupt in parallel_map.') @@ -136,7 +135,7 @@ def _callback(_): Publisher().publish("terra.parallel.finish") os.environ['QISKIT_IN_PARALLEL'] = 'FALSE' - return [ar.get() for ar in async_res] + return results # Cannot do parallel on Windows , if another parallel_map is running in parallel, # or len(values) == 1.