Skip to content

Commit

Permalink
Fix parallel_map for Python3.8 on Mac OSX (#3491)
Browse files Browse the repository at this point in the history
* Set start method to fork for Mac in order to use python3.8

* Added comments that fork start method is unsafe

* Shorten the long line to pass the test

* Update qiskit/tools/parallel.py

Change start method only in python 3.8

Co-Authored-By: Matthew Treinish <mtreinish@kortar.org>

* Fixed import error and too long line to pass the test

* Fixed indentation to pass test

* Used ProcessPoolExecutor instead of Pool to run on spawn start method.

* Set correct circuit name in spawned process

* Merge instance calls and remove unnecessary module import

* Added workaround to avoid setting duplicated circuite name when using ProcessPoolExecutor in python 3.5 and 3.6

* Remove stray comments

Using the fork context for the multiprocessing pool was left in
commented out from a previous iteration of the PR branch. This commit
removes these leftover lines to prepare this for merging.
  • Loading branch information
imaihal authored and mergify[bot] committed Dec 2, 2019
1 parent c4809d4 commit af390a8
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 16 deletions.
14 changes: 12 additions & 2 deletions qiskit/circuit/quantumcircuit.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,18 @@ def __init__(self, *regs, name=None):
name = self.cls_prefix() + str(self.cls_instances())
# pylint: disable=not-callable
# (known pylint bug: https://github.com/PyCQA/pylint/issues/1699)
if sys.platform != "win32" and isinstance(mp.current_process(), mp.context.ForkProcess):
name += '-{}'.format(mp.current_process().pid)
if sys.platform != "win32":
if isinstance(mp.current_process(),
(mp.context.ForkProcess, mp.context.SpawnProcess)):
name += '-{}'.format(mp.current_process().pid)
elif sys.version_info[0] == 3 \
and (sys.version_info[1] == 5 or sys.version_info[1] == 6) \
and mp.current_process().name != 'MainProcess':
# It seems condition of if-statement doen't work in python 3.5 and 3.6
# because processes created by "ProcessPoolExecutor" are not
# mp.context.ForkProcess or mp.context.SpawnProcess. As a workaround,
# "name" of the process is checked instead.
name += '-{}'.format(mp.current_process().pid)
self._increment_instances()

if not isinstance(name, str):
Expand Down
27 changes: 13 additions & 14 deletions qiskit/tools/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@

import os
import platform
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
from qiskit.exceptions import QiskitError
from qiskit.util import local_hardware_info
from qiskit.tools.events.pubsub import Publisher
Expand All @@ -64,6 +64,11 @@
CPU_COUNT = local_hardware_info()['cpus']


def _task_wrapper(param):
(task, value, task_args, task_kwargs) = param
return task(value, *task_args, **task_kwargs)


def parallel_map( # pylint: disable=dangerous-default-value
task, values, task_args=tuple(), task_kwargs={}, num_processes=CPU_COUNT):
"""
Expand Down Expand Up @@ -111,22 +116,16 @@ def _callback(_):
and os.getenv('QISKIT_IN_PARALLEL') == 'FALSE':
os.environ['QISKIT_IN_PARALLEL'] = 'TRUE'
try:
pool = Pool(processes=num_processes)

async_res = [pool.apply_async(task, (value,) + task_args, task_kwargs,
_callback) for value in values]

while not all([item.ready() for item in async_res]):
for item in async_res:
item.wait(timeout=0.1)
results = []
with ProcessPoolExecutor(max_workers=num_processes) as executor:
param = map(lambda value: (task, value, task_args, task_kwargs), values)
future = executor.map(_task_wrapper, param)

pool.terminate()
pool.join()
results = list(future)
Publisher().publish("terra.parallel.done", len(results))

except (KeyboardInterrupt, Exception) as error:
if isinstance(error, KeyboardInterrupt):
pool.terminate()
pool.join()
Publisher().publish("terra.parallel.finish")
os.environ['QISKIT_IN_PARALLEL'] = 'False'
raise QiskitError('Keyboard interrupt in parallel_map.')
Expand All @@ -136,7 +135,7 @@ def _callback(_):

Publisher().publish("terra.parallel.finish")
os.environ['QISKIT_IN_PARALLEL'] = 'FALSE'
return [ar.get() for ar in async_res]
return results

# Cannot do parallel on Windows , if another parallel_map is running in parallel,
# or len(values) == 1.
Expand Down

0 comments on commit af390a8

Please sign in to comment.