diff --git a/qiskit/passmanager/__init__.py b/qiskit/passmanager/__init__.py
index 298925f41dd5..579a8569b750 100644
--- a/qiskit/passmanager/__init__.py
+++ b/qiskit/passmanager/__init__.py
@@ -20,46 +20,178 @@
Overview
========
-Qiskit pass manager is somewhat inspired by the `LLVM compiler `_,
-but it is designed to take Qiskit object as an input instead of plain source code.
+The Qiskit pass manager is somewhat inspired by the `LLVM compiler `_,
+but it is designed to take a Python object as an input instead of plain source code.
-The pass manager converts the input object into an intermediate representation (IR),
+The pass manager converts the input Python object into an intermediate representation (IR),
and it can be optimized and get lowered with a variety of transformations over multiple passes.
-This representation must be preserved throughout the transformation.
-The passes may consume the hardware constraints that Qiskit backend may provide.
-Finally, the IR is converted back to some Qiskit object.
-Note that the input type and output type don't need to match.
-
-Execution of passes is managed by the :class:`.FlowController`,
-which is initialized with a set of transform and analysis passes and provides an iterator of them.
-This iterator can be conditioned on the :class:`.PropertySet`, which is a namespace
-storing the intermediate data necessary for the transformation.
-A pass has read and write access to the property set namespace,
-and the stored data is shared among scheduled passes.
-
-The :class:`BasePassManager` provides a user interface to build and execute transform passes.
-It internally spawns a :class:`BasePassRunner` instance to apply transforms to
-the input object. In this sense, the pass manager itself is unaware of the
-underlying IR, but it is indirectly tied to a particular IR through the pass runner class.
-
-The responsibilities of the pass runner are the following:
-
-* Defining the input type / pass manager IR / output type.
-* Converting an input object to a particular pass manager IR.
-* Preparing own property set namespace.
-* Running scheduled flow controllers to apply a series of transformations to the IR.
-* Converting the IR back to an output object.
-
-A single pass runner always takes a single input object and returns a single output object.
-Parallelism for multiple input objects is supported by the :class:`BasePassManager` by
-broadcasting the pass runner via the :mod:`qiskit.tools.parallel_map` function.
-
-The base class :class:`BasePassRunner` doesn't define any associated type by itself,
-and a developer needs to implement a subclass for a particular object type to optimize.
-This `veil of ignorance` allows us to choose the most efficient data representation
-for a particular optimization task, while we can reuse the pass flow control machinery
+The pass manager framework may employ multiple IRs with interleaved conversion passes,
+depending on the context of the optimization.
+
+.. note::
+
+ Currently there is no actual use/design of multiple IRs in the builtin Qiskit pass managers.
+ The implementation of the :mod:`passmanager` module is agnostic to
+ actual IR types (i.e. no strict type check is performed), and the pass manager works
+ as long as the IR implements all methods required by subsequent passes.
+ A concrete design for the use of multiple IRs might be provided in the future release.
+
+The passes may consume the hardware constraints that the Qiskit backend may provide.
+Finally, the IR is converted back to some Python object.
+Note that the input type and output type are not necessarily the same.
+
+Compilation in the pass manager is a chain of :class:`~.passmanager.Task` executions that
+take an IR and output a new IR with some optimization or data analysis.
+An atomic task is a *pass* which is a subclass of :class:`.GenericPass` that implements
+a :meth:`.~GenericPass.run` method that performs some work on the received IR.
+A set of passes may form a *flow controller*, which is a subclass of
+:class:`.BaseController`, which can implement arbitrary compilation-state-dependent logic for
+deciding which pass will get run next.
+Passes share intermediate data via the :class:`.PropertySet` object which is
+a free-form dictionary. A pass can populate the property set dictionary during the task execution.
+A flow controller can also consume the property set to control the pass execution,
+but this access must be read-only.
+The property set is portable and handed over from pass to pass at execution.
+In addition to the property set, tasks also receive a :class:`.WorkflowStatus` data structure.
+This object is initialized when the pass manager is run and handed over to underlying tasks.
+The status is updated after every pass is run, and contains information about the pipeline state
+(number of passes run, failure state, and so on) as opposed to the :class:`PropertySet`, which
+contains information about the IR being optimized.
+
+A pass manager is a wrapper of the flow controller, with responsibilities of
+
+* Scheduling optimization tasks,
+* Converting an input Python object to a particular Qiskit IR,
+* Initializing a property set and workflow status,
+* Running scheduled tasks to apply a series of transformations to the IR,
+* Converting the IR back to an output Python object.
+
+This indicates that the flow controller itself is type-agnostic, and a developer must
+implement a subclass of the :class:`BasePassManager` to manage the data conversion steps.
+This *veil of ignorance* allows us to choose the most efficient data representation
+for a particular pass manager task, while we can reuse the flow control machinery
for different input and output types.
+A single flow controller always takes a single IR object, and returns a single
+IR object. Parallelism for multiple input objects is supported by the
+:class:`BasePassManager` by broadcasting the flow controller via
+the :func:`qiskit.tools.parallel_map` function.
+
+
+Examples
+========
+
+We look into a toy optimization task, namely, preparing a row of numbers
+and remove a digit if the number is five.
+Such task might be easily done by converting the input numbers into string.
+We use the pass manager framework here, putting the efficiency aside for
+a moment to learn how to build a custom Qiskit compiler.
+
+.. code-block:: python
+
+ from qiskit.passmanager import BasePassManager, GenericPass, ConditionalController
+
+ class ToyPassManager(BasePassManager):
+
+ def _passmanager_frontend(self, input_program: int, **kwargs) -> str:
+ return str(input_program)
+
+ def _passmanager_backend(self, passmanager_ir: str, in_program: int, **kwargs) -> int:
+ return int(passmanager_ir)
+
+This pass manager inputs and outputs an integer number, while
+performing the optimization tasks on a string data.
+Hence, input, IR, output type are integer, string, integer, respectively.
+The :meth:`.~BasePassManager._passmanager_frontend` method defines the conversion from the
+input data to IR, and :meth:`.~BasePassManager._passmanager_backend` defines
+the conversion from the IR to output data.
+The pass manager backend is also given an :code:`in_program` parameter that contains the original
+``input_program`` to the front end, for referencing any original metadata of the input program for
+the final conversion.
+
+Next, we implement a pass that removes a digit when the number is five.
+
+.. code-block:: python
+
+ class RemoveFive(GenericPass):
+
+ def run(self, passmanager_ir: str):
+ return passmanager_ir.replace("5", "")
+
+ task = RemoveFive()
+
+Finally, we instantiate a pass manager and schedule the task with it.
+Running the pass manager with random row of numbers returns
+new numbers that don't contain five.
+
+.. code-block:: python
+
+ pm = ToyPassManager()
+ pm.append(task)
+
+ pm.run([123456789, 45654, 36785554])
+
+Output:
+
+.. parsed-literal::
+
+ [12346789, 464, 36784]
+
+Now we consider the case of conditional execution.
+We avoid execution of the "remove five" task when the input number is
+six digits or less. Such control can be implemented by a flow controller.
+We start from an analysis pass that provides the flow controller
+with information about the number of digits.
+
+.. code-block:: python
+
+ class CountDigits(GenericPass):
+
+ def run(self, passmanager_ir: str):
+ self.property_set["ndigits"] = len(passmanager_ir)
+
+ analysis_task = CountDigits()
+
+Then, we wrap the remove five task with the :class:`.ConditionalController`
+that runs the stored tasks only when the condition is met.
+
+.. code-block:: python
+
+ def digit_condition(property_set):
+ # Return True when condition is met.
+ return property_set["ndigits"] > 6
+
+ conditional_task = ConditionalController(
+ tasks=[RemoveFive()],
+ condition=digit_condition,
+ )
+
+As before, we schedule these passes with the pass manager and run.
+
+.. code-block:: python
+
+ pm = ToyPassManager()
+ pm.append(analysis_task)
+ pm.append(conditional_task)
+
+ pm.run([123456789, 45654, 36785554])
+
+Output:
+
+.. parsed-literal::
+
+ [12346789, 45654, 36784]
+
+The "remove five" task is triggered only for the first and third input
+values, which have more than six digits.
+
+With the pass manager framework, a developer can flexibly customize
+the optimization task by combining multiple passes and flow controllers.
+See details for following class API documentations.
+
+
+Interface
+=========
Base classes
------------
@@ -67,8 +199,9 @@
.. autosummary::
:toctree: ../stubs/
- BasePassRunner
BasePassManager
+ BaseController
+ GenericPass
Flow controllers
----------------
@@ -77,27 +210,33 @@
:toctree: ../stubs/
FlowController
+ FlowControllerLinear
ConditionalController
DoWhileController
-PropertySet
------------
+Compilation state
+-----------------
.. autosummary::
:toctree: ../stubs/
PropertySet
+ WorkflowStatus
+ PassManagerState
Exceptions
----------
.. autoexception:: PassManagerError
-
"""
-from .passrunner import BasePassRunner
from .passmanager import BasePassManager
-from .flow_controllers import FlowController, ConditionalController, DoWhileController
-from .base_pass import GenericPass
-from .propertyset import PropertySet
+from .flow_controllers import (
+ FlowController,
+ FlowControllerLinear,
+ ConditionalController,
+ DoWhileController,
+)
+from .base_tasks import GenericPass, BaseController
+from .compilation_status import PropertySet, WorkflowStatus, PassManagerState
from .exceptions import PassManagerError
diff --git a/qiskit/passmanager/base_pass.py b/qiskit/passmanager/base_pass.py
deleted file mode 100644
index f6cc27fc9b30..000000000000
--- a/qiskit/passmanager/base_pass.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# This code is part of Qiskit.
-#
-# (C) Copyright IBM 2023.
-#
-# This code is licensed under the Apache License, Version 2.0. You may
-# obtain a copy of this license in the LICENSE.txt file in the root directory
-# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
-#
-# Any modifications or derivative works of this code must retain this
-# copyright notice, and modified files need to carry a notice indicating
-# that they have been altered from the originals.
-
-"""Metaclass of Qiskit pass manager pass."""
-
-from abc import abstractmethod
-from collections.abc import Hashable
-from inspect import signature
-from typing import Any
-
-from .propertyset import PropertySet
-
-
-class MetaPass(type):
- """Metaclass for transpiler passes.
-
- Enforces the creation of some fields in the pass while allowing passes to
- override ``__init__``.
- """
-
- def __call__(cls, *args, **kwargs):
- pass_instance = type.__call__(cls, *args, **kwargs)
- pass_instance._hash = hash(MetaPass._freeze_init_parameters(cls, args, kwargs))
- return pass_instance
-
- @staticmethod
- def _freeze_init_parameters(class_, args, kwargs):
- self_guard = object()
- init_signature = signature(class_.__init__)
- bound_signature = init_signature.bind(self_guard, *args, **kwargs)
- arguments = [("class_.__name__", class_.__name__)]
- for name, value in bound_signature.arguments.items():
- if value == self_guard:
- continue
- if isinstance(value, Hashable):
- arguments.append((name, type(value), value))
- else:
- arguments.append((name, type(value), repr(value)))
- return frozenset(arguments)
-
-
-class GenericPass(metaclass=MetaPass):
- """Generic pass class with IR-agnostic minimal functionality."""
-
- def __init__(self):
- self.requires = [] # List of passes that requires
- self.preserves = [] # List of passes that preserves
- self.property_set = PropertySet() # This pass's pointer to the pass manager's property set.
- self._hash = hash(None)
-
- def __hash__(self):
- return self._hash
-
- def __eq__(self, other):
- return hash(self) == hash(other)
-
- def name(self):
- """Return the name of the pass."""
- return self.__class__.__name__
-
- @abstractmethod
- def run(self, passmanager_ir: Any):
- """Run a pass on the pass manager IR. This is implemented by the pass developer.
-
- Args:
- passmanager_ir: the dag on which the pass is run.
-
- Raises:
- NotImplementedError: when this is left unimplemented for a pass.
- """
- raise NotImplementedError
diff --git a/qiskit/passmanager/base_tasks.py b/qiskit/passmanager/base_tasks.py
new file mode 100644
index 000000000000..84a944d78042
--- /dev/null
+++ b/qiskit/passmanager/base_tasks.py
@@ -0,0 +1,228 @@
+# This code is part of Qiskit.
+#
+# (C) Copyright IBM 2023.
+#
+# This code is licensed under the Apache License, Version 2.0. You may
+# obtain a copy of this license in the LICENSE.txt file in the root directory
+# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
+#
+# Any modifications or derivative works of this code must retain this
+# copyright notice, and modified files need to carry a notice indicating
+# that they have been altered from the originals.
+
+"""Baseclasses for the Qiskit passmanager optimization tasks."""
+from __future__ import annotations
+
+import logging
+import time
+from abc import abstractmethod, ABC
+from collections.abc import Iterable, Callable, Generator
+from typing import Any
+
+from .compilation_status import RunState, PassManagerState
+
+logger = logging.getLogger(__name__)
+
+# Type alias
+PassManagerIR = Any
+
+
+class Task(ABC):
+ """An interface of the pass manager task.
+
+ The task takes a Qiskit IR, and outputs new Qiskit IR after some operation on it.
+ A task can rely on the :class:`.PropertySet` to communicate intermediate data among tasks.
+ """
+
+ @abstractmethod
+ def execute(
+ self,
+ passmanager_ir: PassManagerIR,
+ state: PassManagerState,
+ callback: Callable = None,
+ ) -> tuple[PassManagerIR, PassManagerState]:
+ """Execute optimization task for input Qiskit IR.
+
+ Args:
+ passmanager_ir: Qiskit IR to optimize.
+ state: State associated with workflow execution by the pass manager itself.
+ callback: A callback function which is caller per execution of optimization task.
+
+ Returns:
+ Optimized Qiskit IR and state of the workflow.
+ """
+ pass
+
+
+class GenericPass(Task, ABC):
+ """Base class of a single pass manager task.
+
+ A pass instance can read and write to the provided :class:`.PropertySet`,
+ and may modify the input pass manager IR.
+ """
+
+ def __init__(self):
+ self.requires: Iterable[Task] = []
+
+ def name(self) -> str:
+ """Name of the pass."""
+ return self.__class__.__name__
+
+ def execute(
+ self,
+ passmanager_ir: PassManagerIR,
+ state: PassManagerState,
+ callback: Callable = None,
+ ) -> tuple[PassManagerIR, PassManagerState]:
+ # Overriding this method is not safe.
+ # Pass subclass must keep current implementation.
+ # Especially, task execution may break when method signature is modified.
+
+ if self.requires:
+ # pylint: disable=cyclic-import
+ from .flow_controllers import FlowControllerLinear
+
+ passmanager_ir, state = FlowControllerLinear(self.requires).execute(
+ passmanager_ir=passmanager_ir,
+ state=state,
+ callback=callback,
+ )
+
+ run_state = None
+ ret = None
+ start_time = time.time()
+ try:
+ if self not in state.workflow_status.completed_passes:
+ ret = self.run(passmanager_ir)
+ run_state = RunState.SUCCESS
+ else:
+ run_state = RunState.SKIP
+ except Exception:
+ run_state = RunState.FAIL
+ raise
+ finally:
+ ret = ret or passmanager_ir
+ if run_state != RunState.SKIP:
+ running_time = time.time() - start_time
+ logger.info("Pass: %s - %.5f (ms)", self.name(), running_time * 1000)
+ if callback is not None:
+ callback(
+ task=self,
+ passmanager_ir=ret,
+ property_set=state.property_set,
+ running_time=running_time,
+ count=state.workflow_status.count,
+ )
+ return ret, self.update_status(state, run_state)
+
+ def update_status(
+ self,
+ state: PassManagerState,
+ run_state: RunState,
+ ) -> PassManagerState:
+ """Update workflow status.
+
+ Args:
+ state: Pass manager state to update.
+ run_state: Completion status of current task.
+
+ Returns:
+ Updated pass manager state.
+ """
+ state.workflow_status.previous_run = run_state
+ if run_state == RunState.SUCCESS:
+ state.workflow_status.count += 1
+ state.workflow_status.completed_passes.add(self)
+ return state
+
+ @abstractmethod
+ def run(
+ self,
+ passmanager_ir: PassManagerIR,
+ ) -> PassManagerIR:
+ """Run optimization task.
+
+ Args:
+ passmanager_ir: Qiskit IR to optimize.
+
+ Returns:
+ Optimized Qiskit IR.
+ """
+ pass
+
+
+class BaseController(Task, ABC):
+ """Base class of controller.
+
+ A controller is built with a collection of pass manager tasks,
+ and a subclass provides a custom logic to choose next task to run.
+ Note a controller can be nested into another controller,
+ and a controller itself doesn't provide any subroutine to modify the input IR.
+ """
+
+ def __init__(
+ self,
+ options: dict[str, Any] | None = None,
+ ):
+ """Create new flow controller.
+
+ Args:
+ options: Option for this flow controller.
+ """
+ self._options = options or {}
+
+ @abstractmethod
+ def iter_tasks(
+ self,
+ state: PassManagerState,
+ ) -> Generator[Task, PassManagerState, None]:
+ """A custom logic to choose a next task to run.
+
+ Controller subclass can consume the state to build a proper task pipeline. The updated
+ state after a task execution will be fed back in as the "return" value of any ``yield``
+ statements. This indicates the order of task execution is only determined at running time.
+ This method is not allowed to mutate the given state object.
+
+ Args:
+ state: The state of the passmanager workflow at the beginning of this flow controller's
+ execution.
+
+ Receives:
+ state: the state of pass manager after the execution of the last task that was yielded.
+ The generator does not need to inspect this if it is irrelevant to its logic, nor
+ update it.
+
+ Yields:
+ Task: Next task to run.
+ """
+ pass
+
+ def execute(
+ self,
+ passmanager_ir: PassManagerIR,
+ state: PassManagerState,
+ callback: Callable = None,
+ ) -> tuple[PassManagerIR, PassManagerState]:
+ # Overriding this method is not safe.
+ # Pass subclass must keep current implementation.
+ # Especially, task execution may break when method signature is modified.
+
+ task_generator = self.iter_tasks(state)
+ try:
+ next_task = task_generator.send(None)
+ except StopIteration:
+ return passmanager_ir, state
+ while True:
+ passmanager_ir, state = next_task.execute(
+ passmanager_ir=passmanager_ir,
+ state=state,
+ callback=callback,
+ )
+ try:
+ # Sending the object through the generator implies the custom controllers
+ # can always rely on the latest data to choose the next task to run.
+ next_task = task_generator.send(state)
+ except StopIteration:
+ break
+
+ return passmanager_ir, state
diff --git a/qiskit/passmanager/compilation_status.py b/qiskit/passmanager/compilation_status.py
new file mode 100644
index 000000000000..e38cdc9fad83
--- /dev/null
+++ b/qiskit/passmanager/compilation_status.py
@@ -0,0 +1,74 @@
+# This code is part of Qiskit.
+#
+# (C) Copyright IBM 2017, 2018, 2023.
+#
+# This code is licensed under the Apache License, Version 2.0. You may
+# obtain a copy of this license in the LICENSE.txt file in the root directory
+# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
+#
+# Any modifications or derivative works of this code must retain this
+# copyright notice, and modified files need to carry a notice indicating
+# that they have been altered from the originals.
+
+"""A property set dictionary that shared among optimization passes."""
+
+
+from dataclasses import dataclass, field
+from enum import Enum
+
+
+class PropertySet(dict):
+ """A default dictionary-like object."""
+
+ def __missing__(self, key):
+ return None
+
+
+class RunState(Enum):
+ """Allowed values for the result of a pass execution."""
+
+ SUCCESS = 0
+ FAIL = 1
+ SKIP = 2
+
+
+@dataclass
+class WorkflowStatus:
+ """Collection of compilation status of workflow, i.e. pass manager run.
+
+ This data structure is initialized when the pass manager is run,
+ and recursively handed over to underlying tasks.
+ Each pass will update this status once after being executed, and the lifetime of the
+ workflow status object is the time during which the pass manager is running.
+ """
+
+ count: int = 0
+ """Current number of pass execution."""
+
+ completed_passes: set = field(default_factory=set)
+ """Passes already run that have not been invalidated."""
+
+ previous_run: RunState = RunState.FAIL
+ """Status of the latest pass run."""
+
+
+@dataclass
+class PassManagerState:
+ """A portable container object that pass manager tasks communicate through generator.
+
+ This object can contain every information about the running pass manager workflow,
+ except for the IR object being optimized.
+ The data structure consists of two elements; one for the status of the
+ workflow itself, and another one for the additional information about the IR
+ analyzed through pass executions. This container aims at just providing
+ a robust interface for the :meth:`.Task.execute`, and no logic that modifies
+ the container elements must be implemented.
+
+ This object is mutable, and might be mutated by pass executions.
+ """
+
+ workflow_status: WorkflowStatus
+ """Status of the current compilation workflow."""
+
+ property_set: PropertySet
+ """Information about IR being optimized."""
diff --git a/qiskit/passmanager/flow_controllers.py b/qiskit/passmanager/flow_controllers.py
index 6fe795222ac8..bf831b94b35c 100644
--- a/qiskit/passmanager/flow_controllers.py
+++ b/qiskit/passmanager/flow_controllers.py
@@ -10,149 +10,325 @@
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
-"""Pass flow controllers to provide pass iterator conditioned on the property set."""
+"""Built-in pass flow controllers."""
from __future__ import annotations
-from collections import OrderedDict
-from collections.abc import Sequence
-from typing import Union, List
+
import logging
+from collections.abc import Callable, Iterable, Generator
+from typing import Type, Any
-from .base_pass import GenericPass
+from qiskit.utils.deprecation import deprecate_func
+from .base_tasks import BaseController, Task
+from .compilation_status import PassManagerState, PropertySet
from .exceptions import PassManagerError
logger = logging.getLogger(__name__)
-class FlowController:
- """Base class for multiple types of working list.
-
- This class is a base class for multiple types of working list. When you iterate on it, it
- returns the next pass to run.
- """
+class FlowControllerLinear(BaseController):
+ """A standard flow controller that runs tasks one after the other."""
- registered_controllers = OrderedDict()
+ def __init__(
+ self,
+ tasks: Task | Iterable[Task] = (),
+ *,
+ options: dict[str, Any] | None = None,
+ ):
+ super().__init__(options)
+
+ if not isinstance(tasks, Iterable):
+ tasks = [tasks]
+ self.tasks: tuple[Task] = tuple(tasks)
+
+ @property
+ def passes(self) -> list[Task]:
+ """Alias of tasks for backward compatibility."""
+ return list(self.tasks)
+
+ @deprecate_func(
+ since="0.45.0",
+ additional_msg="All tasks must be provided at construction time of the controller object.",
+ )
+ def append(
+ self,
+ passes: Task | list[Task],
+ ):
+ """Add new task to pipeline.
- def __init__(self, passes, options, **partial_controller):
- self._passes = passes
- self.passes = FlowController.controller_factory(passes, options, **partial_controller)
- self.options = options
+ Args:
+ passes: A new task or list of tasks to add.
+ """
+ if not isinstance(passes, Iterable):
+ passes = [passes]
+
+ tasks = list(self.tasks)
+ for task in passes:
+ if not isinstance(task, Task):
+ raise TypeError(
+ f"New task {task} is not a valid pass manager pass or flow controller."
+ )
+ tasks.append(task)
+ self.tasks = tuple(tasks)
+
+ def iter_tasks(self, state: PassManagerState) -> Generator[Task, PassManagerState, None]:
+ for task in self.tasks:
+ state = yield task
+
+
+class DoWhileController(BaseController):
+ """Run the given tasks in a loop until the ``do_while`` condition on the property set becomes
+ ``False``.
+
+ The given tasks will always run at least once, and on iteration of the loop, all the
+ tasks will be run (with the exception of a failure state being set)."""
+
+ def __init__(
+ self,
+ tasks: Task | Iterable[Task] = (),
+ do_while: Callable[[PropertySet], bool] = None,
+ *,
+ options: dict[str, Any] | None = None,
+ ):
+ super().__init__(options)
- def __iter__(self):
- yield from self.passes
+ if not isinstance(tasks, Iterable):
+ tasks = [tasks]
+ self.tasks: tuple[Task] = tuple(tasks)
+ self.do_while = do_while
- def dump_passes(self):
- """Fetches the passes added to this flow controller.
+ @property
+ def passes(self) -> list[Task]:
+ """Alias of tasks for backward compatibility."""
+ return list(self.tasks)
+
+ @deprecate_func(
+ since="0.45.0",
+ additional_msg="All tasks must be provided at construction time of the controller object.",
+ )
+ def append(
+ self,
+ passes: Task | list[Task],
+ ):
+ """Add new task to pipeline.
- Returns:
- dict: {'options': self.options, 'passes': [passes], 'type': type(self)}
+ Args:
+ passes: A new task or list of tasks to add.
"""
- # TODO remove
- ret = {"options": self.options, "passes": [], "type": type(self)}
- for pass_ in self._passes:
- if isinstance(pass_, FlowController):
- ret["passes"].append(pass_.dump_passes())
- else:
- ret["passes"].append(pass_)
- return ret
+ if not isinstance(passes, Iterable):
+ passes = [passes]
+
+ tasks = list(self.tasks)
+ for task in passes:
+ if not isinstance(task, Task):
+ raise TypeError(
+ f"New task {task} is not a valid pass manager pass or flow controller."
+ )
+ tasks.append(task)
+ self.tasks = tuple(tasks)
+
+ def iter_tasks(self, state: PassManagerState) -> Generator[Task, PassManagerState, None]:
+ max_iteration = self._options.get("max_iteration", 1000)
+ for _ in range(max_iteration):
+ for task in self.tasks:
+ state = yield task
+ if not self.do_while(state.property_set):
+ return
+ raise PassManagerError("Maximum iteration reached. max_iteration=%i" % max_iteration)
- @classmethod
- def add_flow_controller(cls, name, controller):
- """Adds a flow controller.
- Args:
- name (string): Name of the controller to add.
- controller (type(FlowController)): The class implementing a flow controller.
- """
- cls.registered_controllers[name] = controller
+class ConditionalController(BaseController):
+ """A flow controller runs the pipeline once if the condition is true, or does nothing if the
+ condition is false."""
- @classmethod
- def remove_flow_controller(cls, name):
- """Removes a flow controller.
+ def __init__(
+ self,
+ tasks: Task | Iterable[Task] = (),
+ condition: Callable[[PropertySet], bool] = None,
+ *,
+ options: dict[str, Any] | None = None,
+ ):
+ super().__init__(options)
+
+ if not isinstance(tasks, Iterable):
+ tasks = [tasks]
+ self.tasks: tuple[Task] = tuple(tasks)
+ self.condition = condition
+
+ @property
+ def passes(self) -> list[Task]:
+ """Alias of tasks for backward compatibility."""
+ return list(self.tasks)
+
+ @deprecate_func(
+ since="0.45.0",
+ additional_msg="All tasks must be provided at construction time of the controller object.",
+ )
+ def append(
+ self,
+ passes: Task | list[Task],
+ ):
+ """Add new task to pipeline.
Args:
- name (string): Name of the controller to remove.
- Raises:
- KeyError: If the controller to remove was not registered.
+ passes: A new task or list of tasks to add.
"""
- if name not in cls.registered_controllers:
- raise KeyError("Flow controller not found: %s" % name)
- del cls.registered_controllers[name]
+ if not isinstance(passes, Iterable):
+ passes = [passes]
+
+ tasks = list(self.tasks)
+ for task in passes:
+ if not isinstance(task, Task):
+ raise TypeError(
+ f"New task {task} is not a valid pass manager pass or flow controller."
+ )
+ tasks.append(task)
+ self.tasks = tuple(tasks)
+
+ def iter_tasks(self, state: PassManagerState) -> Generator[Task, PassManagerState, None]:
+ if self.condition(state.property_set):
+ for task in self.tasks:
+ state = yield task
+
+
+class FlowController(BaseController):
+ """A legacy factory for other flow controllers.
+
+ .. warning::
+
+ This class is primarily for compatibility with legacy versions of Qiskit, and in general,
+ you should prefer simply instantiating the controller you want, and adding it to the
+ relevant :class:`.PassManager` or other controller. Its use is deprecated.
+
+ This allows syntactic sugar for writing pipelines. For example::
+
+ FlowController.add_flow_controller("my_condition", CustomController)
+
+ controller = FlowController.controller_factory(
+ [PassA(), PassB()],
+ {"max_iteration": 1000},
+ condition=lambda prop_set: prop_set["x"] == 0,
+ do_while=lambda prop_set: prop_set["x"] < 100,
+ my_condition=lambda prop_set: prop_set["y"] = "abc",
+ )
+
+ This creates a nested flow controller that runs when the value :code:`x` in the
+ :class:`.PropertySet` is zero and repeats the pipeline until the value becomes 100.
+ In each innermost loop, the custom iteration condition provided by
+ the ``CustomController`` is also evaluated.
+
+ .. warning::
+
+ :class:`.BaseController` must be directly subclassed to define a custom flow controller.
+ This class provides a controller factory method, which consumes a class variable
+ :attr:`.registered_controllers`. Subclassing FlowController may cause
+ unexpected behavior in the factory method.
+ Note that factory method implicitly determines the priority of the builtin controllers
+ when multiple controllers are called together,
+ and the behavior of generated controller is hardly debugged.
+ """
+
+ registered_controllers = {
+ "condition": ConditionalController,
+ "do_while": DoWhileController,
+ }
+ hierarchy = [
+ "condition",
+ "do_while",
+ ]
@classmethod
+ @deprecate_func(
+ since="0.45.0",
+ additional_msg=(
+ "Controller object must be explicitly instantiated. "
+ "Building controller with keyword arguments may yield race condition when "
+ "multiple keyword arguments are provided together, which is likely unsafe."
+ ),
+ )
def controller_factory(
cls,
- passes: Sequence[GenericPass | "FlowController"],
+ passes: Task | list[Task],
options: dict,
- **partial_controller,
+ **controllers,
):
- """Constructs a flow controller based on the partially evaluated controller arguments.
+ """Create a new flow controller with normalization.
Args:
- passes: passes to add to the flow controller.
- options: PassManager options.
- **partial_controller: Partially evaluated controller arguments in the form `{name:partial}`
-
- Raises:
- PassManagerError: When partial_controller is not well-formed.
+ passes: A list of optimization tasks.
+ options: Option for this flow controller.
+ controllers: Dictionary of controller callables keyed on flow controller alias.
Returns:
- FlowController: A FlowController instance.
+ An instance of normalized flow controller.
"""
- if None in partial_controller.values():
- raise PassManagerError("The controller needs a condition.")
+ if None in controllers.values():
+ raise PassManagerError("The controller needs a callable. Value cannot be None.")
+
+ if isinstance(passes, BaseController):
+ instance = passes
+ else:
+ instance = FlowControllerLinear(passes, options=options)
+
+ if controllers:
+ # Alias in higher hierarchy becomes outer controller.
+ for alias in cls.hierarchy[::-1]:
+ if alias not in controllers:
+ continue
+ class_type = cls.registered_controllers[alias]
+ init_kwargs = {
+ "options": options,
+ alias: controllers.pop(alias),
+ }
+ instance = class_type([instance], **init_kwargs)
+
+ return instance
- if partial_controller:
- for registered_controller in cls.registered_controllers.keys():
- if registered_controller in partial_controller:
- return cls.registered_controllers[registered_controller](
- passes, options, **partial_controller
- )
- raise PassManagerError("The controllers for %s are not registered" % partial_controller)
-
- return FlowControllerLinear(passes, options)
-
-
-class FlowControllerLinear(FlowController):
- """The basic controller runs the passes one after the other."""
-
- def __init__(self, passes, options): # pylint: disable=super-init-not-called
- self.passes = self._passes = passes
- self.options = options
-
-
-class DoWhileController(FlowController):
- """Implements a set of passes in a do-while loop."""
-
- def __init__(self, passes, options=None, do_while=None, **partial_controller):
- self.do_while = do_while
- self.max_iteration = options["max_iteration"] if options else 1000
- super().__init__(passes, options, **partial_controller)
-
- def __iter__(self):
- for _ in range(self.max_iteration):
- yield from self.passes
-
- if not self.do_while():
- return
-
- raise PassManagerError("Maximum iteration reached. max_iteration=%i" % self.max_iteration)
-
-
-class ConditionalController(FlowController):
- """Implements a set of passes under a certain condition."""
-
- def __init__(self, passes, options=None, condition=None, **partial_controller):
- self.condition = condition
- super().__init__(passes, options, **partial_controller)
+ @classmethod
+ @deprecate_func(
+ since="0.45.0",
+ additional_msg=(
+ "Controller factory method is deprecated and managing the custom flow controllers "
+ "with alias no longer helps building the task pipeline. "
+ "Controllers must be explicitly instantiated and appended to the pipeline."
+ ),
+ )
+ def add_flow_controller(
+ cls,
+ name: str,
+ controller: Type[BaseController],
+ ):
+ """Adds a flow controller.
- def __iter__(self):
- if self.condition():
- yield from self.passes
+ Args:
+ name: Alias of controller class in the namespace.
+ controller: Flow controller class.
+ """
+ cls.registered_controllers[name] = controller
+ if name not in cls.hierarchy:
+ cls.hierarchy.append(name)
+ @classmethod
+ @deprecate_func(
+ since="0.45.0",
+ additional_msg=(
+ "Controller factory method is deprecated and managing the custom flow controllers "
+ "with alias no longer helps building the task pipeline. "
+ "Controllers must be explicitly instantiated and appended to the pipeline."
+ ),
+ )
+ def remove_flow_controller(
+ cls,
+ name: str,
+ ):
+ """Removes a flow controller.
-# Alias to a sequence of all kind of pass elements
-PassSequence = Union[Union[GenericPass, FlowController], List[Union[GenericPass, FlowController]]]
+ Args:
+ name: Alias of the controller to remove.
-# Default controllers
-FlowController.add_flow_controller("condition", ConditionalController)
-FlowController.add_flow_controller("do_while", DoWhileController)
+ Raises:
+ KeyError: If the controller to remove was not registered.
+ """
+ if name not in cls.hierarchy:
+ raise KeyError("Flow controller not found: %s" % name)
+ del cls.registered_controllers[name]
+ cls.hierarchy.remove(name)
diff --git a/qiskit/passmanager/passmanager.py b/qiskit/passmanager/passmanager.py
index 935a3afb3918..fb9c8b523ba6 100644
--- a/qiskit/passmanager/passmanager.py
+++ b/qiskit/passmanager/passmanager.py
@@ -12,17 +12,20 @@
"""Manager for a set of Passes and their scheduling during transpilation."""
from __future__ import annotations
-from abc import ABC
-from collections.abc import Callable, Sequence
-from typing import Any
+
import logging
+from abc import ABC, abstractmethod
+from collections.abc import Callable, Sequence, Iterable
+from itertools import chain
+from typing import Any
+
import dill
-from qiskit.tools.parallel import parallel_map
-from .base_pass import GenericPass
-from .passrunner import BasePassRunner
+from qiskit.tools.parallel import parallel_map
+from .base_tasks import Task, PassManagerIR
from .exceptions import PassManagerError
-from .flow_controllers import FlowController, PassSequence
+from .flow_controllers import FlowControllerLinear
+from .compilation_status import PropertySet, WorkflowStatus, PassManagerState
logger = logging.getLogger(__name__)
@@ -30,79 +33,61 @@
class BasePassManager(ABC):
"""Pass manager base class."""
- PASS_RUNNER = BasePassRunner
-
def __init__(
self,
- passes: PassSequence | None = None,
+ tasks: Task | list[Task] = (),
max_iteration: int = 1000,
):
"""Initialize an empty pass manager object.
Args:
- passes: A pass set to be added to the pass manager schedule.
+ tasks: A pass set to be added to the pass manager schedule.
max_iteration: The maximum number of iterations the schedule will be looped if the
condition is not met.
"""
- # the pass manager's schedule of passes, including any control-flow.
- # Populated via PassManager.append().
- self._pass_sets: list[dict[str, Any]] = []
+ self._tasks = []
self.max_iteration = max_iteration
+ self.property_set = PropertySet()
- if passes is not None:
- self.append(passes)
+ if tasks:
+ self.append(tasks)
def append(
self,
- passes: PassSequence,
- **flow_controller_conditions: Callable,
+ tasks: Task | list[Task],
) -> None:
- """Append a Pass Set to the schedule of passes.
+ """Append tasks to the schedule of passes.
Args:
- passes: A set of passes (a pass set) to be added to schedule. A pass set is a list of
- passes that are controlled by the same flow controller. If a single pass is
- provided, the pass set will only have that pass a single element.
- It is also possible to append a
- :class:`~qiskit.transpiler.runningpassmanager.FlowController` instance and the
- rest of the parameter will be ignored.
- flow_controller_conditions: Dictionary of control flow plugins. Default:
-
- * do_while (callable property_set -> boolean): The passes repeat until the
- callable returns False.
- Default: `lambda x: False # i.e. passes run once`
-
- * condition (callable property_set -> boolean): The passes run only if the
- callable returns True.
- Default: `lambda x: True # i.e. passes run`
+ tasks: A set of pass manager tasks to be added to schedule.
+
+ Raises:
+ TypeError: When any element of tasks is not a subclass of passmanager Task.
"""
- passes = self._normalize_passes(passes)
- self._pass_sets.append({"passes": passes, "flow_controllers": flow_controller_conditions})
+ if isinstance(tasks, Task):
+ tasks = [tasks]
+ if any(not isinstance(t, Task) for t in tasks):
+ raise TypeError("Added tasks are not all valid pass manager task types.")
+
+ self._tasks.append(tasks)
def replace(
self,
index: int,
- passes: PassSequence,
- **flow_controller_conditions: Any,
+ tasks: Task | list[Task],
) -> None:
"""Replace a particular pass in the scheduler.
Args:
- index: Pass index to replace, based on the position in passes().
- passes: A pass set (as defined in :py:func:`qiskit.transpiler.PassManager.append`)
- to be added to the pass manager schedule.
- flow_controller_conditions: control flow plugins.
+ index: Task index to replace, based on the position in :meth:`tasks`
+ tasks: A set of pass manager tasks to be added to schedule.
Raises:
- PassManagerError: if a pass in passes is not a proper pass or index not found.
+ TypeError: When any element of tasks is not a subclass of passmanager Task.
+ PassManagerError: If the index is not found.
"""
- passes = self._normalize_passes(passes)
-
try:
- self._pass_sets[index] = {
- "passes": passes,
- "flow_controllers": flow_controller_conditions,
- }
+ self._tasks[index] = tasks
except IndexError as ex:
raise PassManagerError(f"Index to replace {index} does not exists") from ex
@@ -110,13 +95,13 @@ def remove(self, index: int) -> None:
"""Removes a particular pass in the scheduler.
Args:
- index: Pass index to replace, based on the position in passes().
+ index: Pass index to remove, based on the position in :meth:`passes`.
Raises:
- PassManagerError: if the index is not found.
+ PassManagerError: If the index is not found.
"""
try:
- del self._pass_sets[index]
+ del self._tasks[index]
except IndexError as ex:
raise PassManagerError(f"Index to replace {index} does not exists") from ex
@@ -124,25 +109,21 @@ def __setitem__(self, index, item):
self.replace(index, item)
def __len__(self):
- return len(self._pass_sets)
+ return len(self._tasks)
def __getitem__(self, index):
new_passmanager = self.__class__(max_iteration=self.max_iteration)
- _pass_sets = self._pass_sets[index]
- if isinstance(_pass_sets, dict):
- _pass_sets = [_pass_sets]
- new_passmanager._pass_sets = _pass_sets
+ new_passmanager._tasks = self._tasks[index]
return new_passmanager
def __add__(self, other):
+ new_passmanager = self.__class__(max_iteration=self.max_iteration)
+ new_passmanager._tasks = self._tasks
if isinstance(other, self.__class__):
- new_passmanager = self.__class__(max_iteration=self.max_iteration)
- new_passmanager._pass_sets = self._pass_sets + other._pass_sets
+ new_passmanager._tasks += other._tasks
return new_passmanager
else:
try:
- new_passmanager = self.__class__(max_iteration=self.max_iteration)
- new_passmanager._pass_sets += self._pass_sets
new_passmanager.append(other)
return new_passmanager
except PassManagerError as ex:
@@ -150,45 +131,59 @@ def __add__(self, other):
"unsupported operand type + for %s and %s" % (self.__class__, other.__class__)
) from ex
- def _normalize_passes(
+ @abstractmethod
+ def _passmanager_frontend(
+ self,
+ input_program: Any,
+ **kwargs,
+ ) -> PassManagerIR:
+ """Convert input program into pass manager IR.
+
+ Args:
+ in_program: Input program.
+
+ Returns:
+ Pass manager IR.
+ """
+ pass
+
+ @abstractmethod
+ def _passmanager_backend(
self,
- passes: PassSequence,
- ) -> Sequence[GenericPass | FlowController] | FlowController:
- if isinstance(passes, FlowController):
- return passes
- if isinstance(passes, GenericPass):
- passes = [passes]
- for pass_ in passes:
- if isinstance(pass_, FlowController):
- # Normalize passes in nested FlowController.
- # TODO: Internal renormalisation should be the responsibility of the
- # `FlowController`, but the separation between `FlowController`,
- # `RunningPassManager` and `PassManager` is so muddled right now, it would be better
- # to do this as part of more top-down refactoring. ---Jake, 2022-10-03.
- pass_.passes = self._normalize_passes(pass_.passes)
- elif not isinstance(pass_, GenericPass):
- raise PassManagerError(
- "%s is not a pass or FlowController instance " % pass_.__class__
- )
- return passes
+ passmanager_ir: PassManagerIR,
+ in_program: Any,
+ **kwargs,
+ ) -> Any:
+ """Convert pass manager IR into output program.
+
+ Args:
+ passmanager_ir: Pass manager IR after optimization.
+ in_program: The input program, this can be used if you need
+ any metadata about the original input for the output.
+ It should not be mutated.
+
+ Returns:
+ Output program.
+ """
+ pass
def run(
self,
in_programs: Any,
- callback: Callable | None = None,
- **metadata,
+ callback: Callable = None,
+ **kwargs,
) -> Any:
"""Run all the passes on the specified ``circuits``.
Args:
in_programs: Input programs to transform via all the registered passes.
callback: A callback function that will be called after each pass execution. The
- function will be called with 5 keyword arguments::
+ function will be called with 4 keyword arguments::
- pass_ (Pass): the pass being run
+ task (GenericPass): the pass being run
passmanager_ir (Any): depending on pass manager subclass
- time (float): the time to execute the pass
property_set (PropertySet): the property set
+ running_time (float): the time to execute the pass
count (int): the index for the pass execution
The exact arguments pass expose the internals of the pass
@@ -201,68 +196,120 @@ def run(
take in kwargs dict and access the variables. For example::
def callback_func(**kwargs):
- pass_ = kwargs['pass_']
- dag = kwargs['dag']
- time = kwargs['time']
+ task = kwargs['task']
+ passmanager_ir = kwargs['passmanager_ir']
property_set = kwargs['property_set']
+ running_time = kwargs['running_time']
count = kwargs['count']
...
- metadata: Metadata which might be attached to output program.
+ kwargs: Arbitrary arguments passed to the compiler frontend and backend.
Returns:
The transformed program(s).
"""
- if not self._pass_sets and not metadata and callback is None:
+ if not self._tasks and not kwargs and callback is None:
return in_programs
is_list = True
- if isinstance(in_programs, self.PASS_RUNNER.IN_PROGRAM_TYPE):
+ if not isinstance(in_programs, Sequence):
in_programs = [in_programs]
is_list = False
if len(in_programs) == 1:
- out_program = self._run_single_circuit(in_programs[0], callback, **metadata)
+ out_program = _run_workflow(
+ program=in_programs[0],
+ pass_manager=self,
+ callback=callback,
+ **kwargs,
+ )
if is_list:
return [out_program]
return out_program
- # TODO support for List(output_name) and List(callback)
- del metadata
del callback
+ del kwargs
- return self._run_several_circuits(in_programs)
-
- def _create_running_passmanager(self) -> BasePassRunner:
- # Must be implemented by followup PR.
- # BasePassRunner.append assumes normalized pass input, which is not pass_sets.
- raise NotImplementedError
-
- def _run_single_circuit(
- self,
- input_program: Any,
- callback: Callable | None = None,
- **metadata,
- ) -> Any:
- pass_runner = self._create_running_passmanager()
- return pass_runner.run(input_program, callback=callback, **metadata)
-
- def _run_several_circuits(
- self,
- input_programs: Sequence[Any],
- ) -> Any:
- # Pass runner may contain callable and we need to serialize through dill rather than pickle.
+ # Pass manager may contain callable and we need to serialize through dill rather than pickle.
# See https://github.com/Qiskit/qiskit-terra/pull/3290
# Note that serialized object is deserialized as a different object.
- # Thus, we can resue the same runner without state collision, without building it per thread.
+ # Thus, we can resue the same manager without state collision, without building it per thread.
return parallel_map(
- self._in_parallel, input_programs, task_kwargs={"pm_dill": dill.dumps(self)}
+ _run_workflow_in_new_process,
+ values=in_programs,
+ task_kwargs={"pass_manager_bin": dill.dumps(self)},
)
- @staticmethod
- def _in_parallel(
- in_program: Any,
- pm_dill: bytes = None,
- ) -> Any:
- pass_runner = dill.loads(pm_dill)._create_running_passmanager()
- return pass_runner.run(in_program)
+ def to_flow_controller(self) -> FlowControllerLinear:
+ """Linearize this manager into a single :class:`.FlowControllerLinear`,
+ so that it can be nested inside another pass manager.
+
+ Returns:
+ A linearized pass manager.
+ """
+ flatten_tasks = list(self._flatten_tasks(self._tasks))
+ return FlowControllerLinear(flatten_tasks)
+
+ def _flatten_tasks(self, elements: Iterable | Task) -> Iterable:
+ """A helper method to recursively flatten a nested task chain."""
+ if not isinstance(elements, Iterable):
+ return [elements]
+ return chain(*map(self._flatten_tasks, elements))
+
+
+def _run_workflow(
+ program: Any,
+ pass_manager: BasePassManager,
+ **kwargs,
+) -> Any:
+ """Run single program optimization with a pass manager.
+
+ Args:
+ program: Arbitrary program to optimize.
+ pass_manager: Pass manager with scheduled passes.
+ **kwargs: Keyword arguments for IR conversion.
+
+ Returns:
+ Optimized program.
+ """
+ flow_controller = pass_manager.to_flow_controller()
+ initial_status = WorkflowStatus()
+
+ passmanager_ir = pass_manager._passmanager_frontend(
+ input_program=program,
+ **kwargs,
+ )
+ passmanager_ir, _ = flow_controller.execute(
+ passmanager_ir=passmanager_ir,
+ state=PassManagerState(
+ workflow_status=initial_status,
+ property_set=pass_manager.property_set,
+ ),
+ callback=kwargs.get("callback", None),
+ )
+ out_program = pass_manager._passmanager_backend(
+ passmanager_ir=passmanager_ir,
+ in_program=program,
+ **kwargs,
+ )
+
+ return out_program
+
+
+def _run_workflow_in_new_process(
+ program: Any,
+ pass_manager_bin: bytes,
+) -> Any:
+ """Run single program optimization in new process.
+
+ Args:
+ program: Arbitrary program to optimize.
+ pass_manager_bin: Binary of the pass manager with scheduled passes.
+
+ Returns:
+ Optimized program.
+ """
+ return _run_workflow(
+ program=program,
+ pass_manager=dill.loads(pass_manager_bin),
+ )
diff --git a/qiskit/passmanager/passrunner.py b/qiskit/passmanager/passrunner.py
deleted file mode 100644
index 16126b02fe18..000000000000
--- a/qiskit/passmanager/passrunner.py
+++ /dev/null
@@ -1,253 +0,0 @@
-# This code is part of Qiskit.
-#
-# (C) Copyright IBM 2023.
-#
-# This code is licensed under the Apache License, Version 2.0. You may
-# obtain a copy of this license in the LICENSE.txt file in the root directory
-# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
-#
-# Any modifications or derivative works of this code must retain this
-# copyright notice, and modified files need to carry a notice indicating
-# that they have been altered from the originals.
-
-"""Pass runner to apply transformation on passmanager IR."""
-from __future__ import annotations
-import logging
-import time
-from abc import ABC, abstractmethod
-from functools import partial
-from collections.abc import Callable
-from typing import Any
-
-from .base_pass import GenericPass
-from .exceptions import PassManagerError
-from .flow_controllers import FlowController, ConditionalController, DoWhileController
-from .propertyset import PropertySet
-
-logger = logging.getLogger(__name__)
-
-# NoneType is removed from types module in < Python3.10.
-NoneType = type(None)
-
-
-class BasePassRunner(ABC):
- """Pass runner base class."""
-
- IN_PROGRAM_TYPE = NoneType
- OUT_PROGRAM_TYPE = NoneType
- IR_TYPE = NoneType
-
- def __init__(self, max_iteration: int):
- """Initialize an empty pass runner object.
-
- Args:
- max_iteration: The schedule looping iterates until the condition is met or until
- max_iteration is reached.
- """
- self.callback = None
- self.count = None
- self.metadata = None
-
- # the pass manager's schedule of passes, including any control-flow.
- # Populated via PassManager.append().
- self.working_list = []
-
- # global property set is the context of the circuit held by the pass manager
- # as it runs through its scheduled passes. The flow controller
- # have read-only access (via the fenced_property_set).
- self.property_set = PropertySet()
-
- # passes already run that have not been invalidated
- self.valid_passes = set()
-
- # pass manager's overriding options for the passes it runs (for debugging)
- self.passmanager_options = {"max_iteration": max_iteration}
-
- def append(
- self,
- flow_controller: FlowController,
- ):
- """Append a flow controller to the schedule of controllers.
-
- Args:
- flow_controller: A normalized flow controller instance.
- """
- # We assume flow controller is already normalized.
- self.working_list.append(flow_controller)
-
- @abstractmethod
- def _to_passmanager_ir(self, in_program):
- """Convert input program into pass manager IR.
-
- Args:
- in_program: Input program.
-
- Returns:
- Pass manager IR.
- """
- pass
-
- @abstractmethod
- def _to_target(self, passmanager_ir, in_program):
- """Convert pass manager IR into output program.
-
- Args:
- passmanager_ir: Pass manager IR after optimization.
- in_program: The input program, this can be used if you need
- any metadata about the original input for the output. It
- should not be mutated.
-
- Returns:
- Output program.
- """
- pass
-
- @abstractmethod
- def _run_base_pass(
- self,
- pass_: GenericPass,
- passmanager_ir: Any,
- ) -> Any:
- """Do a single base pass.
-
- Args:
- pass_: A base pass to run.
- passmanager_ir: Pass manager IR.
-
- Returns:
- Pass manager IR with optimization.
- """
- pass
-
- def _run_pass_generic(
- self,
- pass_sequence: GenericPass | FlowController,
- passmanager_ir: Any,
- options: dict[str, Any] | None = None,
- ) -> Any:
- """Do either base pass or single flow controller.
-
- Args:
- pass_sequence: Base pass or flow controller to run.
- passmanager_ir: Pass manager IR.
- options: PassManager options.
-
- Returns:
- Pass manager IR with optimization.
-
- Raises:
- PassManagerError: When pass_sequence is not a valid class.
- TypeError: When IR type changed during transformation.
- """
- if isinstance(pass_sequence, GenericPass):
- # First, do the requirements of this pass
- for required_pass in pass_sequence.requires:
- passmanager_ir = self._run_pass_generic(
- pass_sequence=required_pass,
- passmanager_ir=passmanager_ir,
- options=options,
- )
- # Run the pass itself, if not already run
- if pass_sequence not in self.valid_passes:
- start_time = time.time()
- try:
- passmanager_ir = self._run_base_pass(
- pass_=pass_sequence,
- passmanager_ir=passmanager_ir,
- )
- finally:
- run_time = time.time() - start_time
- log_msg = f"Pass: {pass_sequence.name()} - {run_time * 1000:.5f} (ms)"
- logger.info(log_msg)
- if self.callback:
- self.callback(
- pass_=pass_sequence,
- passmanager_ir=passmanager_ir,
- time=run_time,
- property_set=self.property_set,
- count=self.count,
- )
- self.count += 1
- self._update_valid_passes(pass_sequence)
- if not isinstance(passmanager_ir, self.IR_TYPE):
- raise TypeError(
- f"A transformed object {passmanager_ir} is not valid IR in this pass manager. "
- "Object representation type must be preserved during transformation. "
- f"The pass {pass_sequence.name()} returns invalid object."
- )
- return passmanager_ir
-
- if isinstance(pass_sequence, FlowController):
- # This will be removed in followup PR. Code is temporary.
- fenced_property_set = getattr(self, "fenced_property_set")
-
- if isinstance(pass_sequence, ConditionalController) and not isinstance(
- pass_sequence.condition, partial
- ):
- pass_sequence.condition = partial(pass_sequence.condition, fenced_property_set)
- if isinstance(pass_sequence, DoWhileController) and not isinstance(
- pass_sequence.do_while, partial
- ):
- pass_sequence.do_while = partial(pass_sequence.do_while, fenced_property_set)
- for pass_ in pass_sequence:
- passmanager_ir = self._run_pass_generic(
- pass_sequence=pass_,
- passmanager_ir=passmanager_ir,
- options=pass_sequence.options,
- )
- return passmanager_ir
-
- raise PassManagerError(
- f"{pass_sequence.__class__} is not a valid base pass nor flow controller."
- )
-
- def run(
- self,
- in_program: Any,
- callback: Callable | None = None,
- **metadata,
- ) -> Any:
- """Run all the passes on an input program.
-
- Args:
- in_program: Input program to compile via all the registered passes.
- callback: A callback function that will be called after each pass execution.
- **metadata: Metadata attached to the output program.
-
- Returns:
- Compiled or optimized program.
-
- Raises:
- TypeError: When input or output object is unexpected type.
- """
- if not isinstance(in_program, self.IN_PROGRAM_TYPE):
- raise TypeError(
- f"Input object {in_program} is not valid type for this pass manager. "
- f"This pass manager accepts {self.IN_PROGRAM_TYPE}."
- )
-
- if callback:
- self.callback = callback
- self.count = 0
- self.metadata = metadata
-
- passmanager_ir = self._to_passmanager_ir(in_program)
-
- for controller in self.working_list:
- passmanager_ir = self._run_pass_generic(
- pass_sequence=controller,
- passmanager_ir=passmanager_ir,
- options=self.passmanager_options,
- )
- out_program = self._to_target(passmanager_ir, in_program)
- del in_program
-
- if not isinstance(out_program, self.OUT_PROGRAM_TYPE):
- raise TypeError(
- f"Output object {out_program} is not valid type for this pass manager. "
- f"This pass manager must return {self.OUT_PROGRAM_TYPE}."
- )
- return out_program
-
- def _update_valid_passes(self, pass_):
- self.valid_passes.add(pass_)
diff --git a/qiskit/passmanager/propertyset.py b/qiskit/passmanager/propertyset.py
deleted file mode 100644
index d13eeb7c0032..000000000000
--- a/qiskit/passmanager/propertyset.py
+++ /dev/null
@@ -1,24 +0,0 @@
-# This code is part of Qiskit.
-#
-# (C) Copyright IBM 2017, 2018, 2023.
-#
-# This code is licensed under the Apache License, Version 2.0. You may
-# obtain a copy of this license in the LICENSE.txt file in the root directory
-# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
-#
-# Any modifications or derivative works of this code must retain this
-# copyright notice, and modified files need to carry a notice indicating
-# that they have been altered from the originals.
-
-"""A property set is maintained by the pass runner.
-
-
-This is sort of shared memory space among passes.
-"""
-
-
-class PropertySet(dict):
- """A default dictionary-like object"""
-
- def __missing__(self, key):
- return None
diff --git a/qiskit/transpiler/__init__.py b/qiskit/transpiler/__init__.py
index 8d4dd6b157ab..2a234401b013 100644
--- a/qiskit/transpiler/__init__.py
+++ b/qiskit/transpiler/__init__.py
@@ -1222,8 +1222,8 @@
.. autosummary::
:toctree: ../stubs/
- FencedDAGCircuit
FencedPropertySet
+ FencedDAGCircuit
Abstract Passes
---------------
diff --git a/qiskit/transpiler/basepasses.py b/qiskit/transpiler/basepasses.py
index 646bc836f130..02593c0bb9fd 100644
--- a/qiskit/transpiler/basepasses.py
+++ b/qiskit/transpiler/basepasses.py
@@ -11,24 +11,89 @@
# that they have been altered from the originals.
"""Base transpiler passes."""
+from __future__ import annotations
+import abc
from abc import abstractmethod
+from collections.abc import Callable, Hashable, Iterable
+from inspect import signature
-from qiskit.passmanager.base_pass import GenericPass
-from qiskit.passmanager.propertyset import PropertySet
+from qiskit.circuit import QuantumCircuit
+from qiskit.converters import circuit_to_dag, dag_to_circuit
+from qiskit.dagcircuit import DAGCircuit
+from qiskit.passmanager.base_tasks import GenericPass, PassManagerIR
+from qiskit.passmanager.compilation_status import PropertySet, RunState, PassManagerState
+from .exceptions import TranspilerError
from .layout import TranspileLayout
-class BasePass(GenericPass):
+class MetaPass(abc.ABCMeta):
+ """Metaclass for transpiler passes.
+
+ Enforces the creation of some fields in the pass while allowing passes to
+ override ``__init__``.
+ """
+
+ # Drop this functionality in the future.
+ # This metaclass provides a pass equivalence evaluation based on the constructor arguments.
+ # This implicit fake-hash based equivalence is fragile, and the pass developer must
+ # explicitly implement equivalence check logic for each pass if necessary.
+ # Currently, this metaclass is just here for backward compatibility, because
+ # circuit pass manager has a functionality to avoid multiple execution of the
+ # same pass (even though they are scheduled so). This is managed by the valid_passes set,
+ # and executed passes are added to this collection to avoid future execution.
+ # Dropping this metaclass causes many unittest failures and thus this is
+ # considered as a breaking API change.
+ # For example, test.python.transpiler.test_pass_scheduler.TestLogPasses.test_passes_in_linear
+
+ def __call__(cls, *args, **kwargs):
+ pass_instance = type.__call__(cls, *args, **kwargs)
+ pass_instance._hash = hash(MetaPass._freeze_init_parameters(cls, args, kwargs))
+ return pass_instance
+
+ @staticmethod
+ def _freeze_init_parameters(class_, args, kwargs):
+ self_guard = object()
+ init_signature = signature(class_.__init__)
+ bound_signature = init_signature.bind(self_guard, *args, **kwargs)
+ arguments = [("class_.__name__", class_.__name__)]
+ for name, value in bound_signature.arguments.items():
+ if value == self_guard:
+ continue
+ if isinstance(value, Hashable):
+ arguments.append((name, type(value), value))
+ else:
+ arguments.append((name, type(value), repr(value)))
+ return frozenset(arguments)
+
+
+class BasePass(GenericPass, metaclass=MetaPass):
"""Base class for transpiler passes."""
+ def __init__(self):
+ super().__init__()
+ self.preserves: Iterable[GenericPass] = []
+ self.property_set = PropertySet()
+ self._hash = hash(None)
+
+ def __hash__(self):
+ return self._hash
+
+ def __eq__(self, other):
+ # Note that this implementation is incorrect.
+ # This must be reimplemented in the future release.
+ # See the discussion below for details.
+ # https://github.com/Qiskit/qiskit/pull/10127#discussion_r1329982732
+ return hash(self) == hash(other)
+
@abstractmethod
- def run(self, dag): # pylint: disable=arguments-differ
+ def run(self, dag: DAGCircuit): # pylint: disable=arguments-differ
"""Run a pass on the DAGCircuit. This is implemented by the pass developer.
Args:
- dag (DAGCircuit): the dag on which the pass is run.
+ dag: the dag on which the pass is run.
+
Raises:
NotImplementedError: when this is left unimplemented for a pass.
"""
@@ -53,26 +118,43 @@ def is_analysis_pass(self):
"""
return isinstance(self, AnalysisPass)
- def __call__(self, circuit, property_set=None):
+ def execute(
+ self,
+ passmanager_ir: PassManagerIR,
+ state: PassManagerState,
+ callback: Callable = None,
+ ) -> tuple[PassManagerIR, PassManagerState]:
+ # For backward compatibility.
+ # Circuit passes access self.property_set.
+ self.property_set = state.property_set
+ return super().execute(
+ passmanager_ir=passmanager_ir,
+ state=state,
+ callback=callback,
+ )
+
+ def __call__(
+ self,
+ circuit: QuantumCircuit,
+ property_set: PropertySet | dict | None = None,
+ ) -> QuantumCircuit:
"""Runs the pass on circuit.
Args:
- circuit (QuantumCircuit): the dag on which the pass is run.
- property_set (PropertySet or dict or None): input/output property set. An analysis pass
+ circuit: The dag on which the pass is run.
+ property_set: Input/output property set. An analysis pass
might change the property set in-place.
Returns:
- QuantumCircuit: If on transformation pass, the resulting QuantumCircuit. If analysis
- pass, the input circuit.
+ If on transformation pass, the resulting QuantumCircuit.
+ If analysis pass, the input circuit.
"""
- from qiskit.converters import circuit_to_dag, dag_to_circuit
- from qiskit.dagcircuit.dagcircuit import DAGCircuit
-
property_set_ = None
if isinstance(property_set, dict): # this includes (dict, PropertySet)
property_set_ = PropertySet(property_set)
if isinstance(property_set_, PropertySet):
+ # pylint: disable=attribute-defined-outside-init
self.property_set = property_set_
result = self.run(circuit_to_dag(circuit))
@@ -116,10 +198,40 @@ def __call__(self, circuit, property_set=None):
class AnalysisPass(BasePass): # pylint: disable=abstract-method
"""An analysis pass: change property set, not DAG."""
- pass
-
class TransformationPass(BasePass): # pylint: disable=abstract-method
"""A transformation pass: change DAG, not property set."""
- pass
+ def execute(
+ self,
+ passmanager_ir: PassManagerIR,
+ state: PassManagerState,
+ callback: Callable = None,
+ ) -> tuple[PassManagerIR, PassManagerState]:
+ new_dag, state = super().execute(
+ passmanager_ir=passmanager_ir,
+ state=state,
+ callback=callback,
+ )
+
+ if state.workflow_status.previous_run == RunState.SUCCESS:
+ if isinstance(new_dag, DAGCircuit):
+ # Copy calibration data from the original program
+ new_dag.calibrations = passmanager_ir.calibrations
+ else:
+ raise TranspilerError(
+ "Transformation passes should return a transformed dag."
+ f"The pass {self.__class__.__name__} is returning a {type(new_dag)}"
+ )
+
+ return new_dag, state
+
+ def update_status(
+ self,
+ state: PassManagerState,
+ run_state: RunState,
+ ) -> PassManagerState:
+ state = super().update_status(state, run_state)
+ if run_state == RunState.SUCCESS:
+ state.workflow_status.completed_passes.intersection_update(set(self.preserves))
+ return state
diff --git a/qiskit/transpiler/fencedobjs.py b/qiskit/transpiler/fencedobjs.py
index 927f2866e5b0..8113e58d76d9 100644
--- a/qiskit/transpiler/fencedobjs.py
+++ b/qiskit/transpiler/fencedobjs.py
@@ -12,6 +12,7 @@
"""Fenced objects are wraps for raising TranspilerError when they are modified."""
+from qiskit.utils.deprecation import deprecate_func
from .exceptions import TranspilerError
@@ -19,6 +20,15 @@ class FencedObject:
"""Given an instance and a list of attributes to fence, raises a TranspilerError when one
of these attributes is accessed."""
+ @deprecate_func(
+ since="0.45.0",
+ additional_msg=(
+ "Internal use of FencedObject is already removed from pass manager. "
+ "Implementation of a task subclass with protection for input object modification "
+ "is now responsibility of the developer."
+ ),
+ pending=True,
+ )
def __init__(self, instance, attributes_to_fence):
self._wrapped = instance
self._attributes_to_fence = attributes_to_fence
diff --git a/qiskit/transpiler/passmanager.py b/qiskit/transpiler/passmanager.py
index 0221f429cad7..93eee91e540a 100644
--- a/qiskit/transpiler/passmanager.py
+++ b/qiskit/transpiler/passmanager.py
@@ -12,19 +12,26 @@
"""Manager for a set of Passes and their scheduling during transpilation."""
from __future__ import annotations
+
import inspect
import io
import re
+import warnings
+from collections.abc import Iterator, Iterable, Callable
from functools import wraps
-from collections.abc import Iterator, Iterable, Callable, Sequence
from typing import Union, List, Any
from qiskit.circuit import QuantumCircuit
-from qiskit.passmanager import BasePassManager
-from qiskit.passmanager.flow_controllers import PassSequence, FlowController
+from qiskit.converters import circuit_to_dag, dag_to_circuit
+from qiskit.dagcircuit import DAGCircuit
+from qiskit.passmanager.passmanager import BasePassManager
+from qiskit.passmanager.base_tasks import Task, BaseController
+from qiskit.passmanager.flow_controllers import FlowController
from qiskit.passmanager.exceptions import PassManagerError
+from qiskit.utils.deprecation import deprecate_arg
from .basepasses import BasePass
from .exceptions import TranspilerError
+from .layout import TranspileLayout
from .runningpassmanager import RunningPassManager
_CircuitsT = Union[List[QuantumCircuit], QuantumCircuit]
@@ -33,27 +40,77 @@
class PassManager(BasePassManager):
"""Manager for a set of Passes and their scheduling during transpilation."""
- PASS_RUNNER = RunningPassManager
-
def __init__(
self,
- passes: PassSequence | None = None,
+ passes: Task | list[Task] = (),
max_iteration: int = 1000,
):
- """Initialize an empty `PassManager` object (with no passes scheduled).
+ """Initialize an empty pass manager object.
Args:
- passes: A pass set (as defined in :py:func:`qiskit.transpiler.PassManager.append`)
- to be added to the pass manager schedule.
+ passes: A pass set to be added to the pass manager schedule.
max_iteration: The maximum number of iterations the schedule will be looped if the
condition is not met.
"""
- super().__init__(passes, max_iteration)
- self.property_set = None
+ # For backward compatibility.
+ self._pass_sets = []
+
+ super().__init__(
+ tasks=passes,
+ max_iteration=max_iteration,
+ )
+
+ def _passmanager_frontend(
+ self,
+ input_program: QuantumCircuit,
+ **kwargs,
+ ) -> DAGCircuit:
+ return circuit_to_dag(input_program, copy_operations=False)
+ def _passmanager_backend(
+ self,
+ passmanager_ir: DAGCircuit,
+ in_program: QuantumCircuit,
+ **kwargs,
+ ) -> QuantumCircuit:
+ out_program = dag_to_circuit(passmanager_ir)
+
+ out_name = kwargs.get("output_name", None)
+ if out_name is not None:
+ out_program.name = out_name
+
+ if self.property_set["layout"] is not None:
+ out_program._layout = TranspileLayout(
+ initial_layout=self.property_set["layout"],
+ input_qubit_mapping=self.property_set["original_qubit_indices"],
+ final_layout=self.property_set["final_layout"],
+ _input_qubit_count=len(in_program.qubits),
+ _output_qubit_list=out_program.qubits,
+ )
+ out_program._clbit_write_latency = self.property_set["clbit_write_latency"]
+ out_program._conditional_latency = self.property_set["conditional_latency"]
+
+ if self.property_set["node_start_time"]:
+ # This is dictionary keyed on the DAGOpNode, which is invalidated once
+ # dag is converted into circuit. So this schedule information is
+ # also converted into list with the same ordering with circuit.data.
+ topological_start_times = []
+ start_times = self.property_set["node_start_time"]
+ for dag_node in passmanager_ir.topological_op_nodes():
+ topological_start_times.append(start_times[dag_node])
+ out_program._op_start_times = topological_start_times
+
+ return out_program
+
+ @deprecate_arg(
+ name="max_iteration",
+ since="0.25",
+ additional_msg="'max_iteration' can be set in the constructor.",
+ pending=True,
+ )
def append(
self,
- passes: PassSequence,
+ passes: Task | list[Task],
max_iteration: int = None,
**flow_controller_conditions: Any,
) -> None:
@@ -61,34 +118,56 @@ def append(
Args:
passes: A set of passes (a pass set) to be added to schedule. A pass set is a list of
- passes that are controlled by the same flow controller. If a single pass is
- provided, the pass set will only have that pass a single element.
- It is also possible to append a
- :class:`~qiskit.transpiler.runningpassmanager.FlowController` instance and the
- rest of the parameter will be ignored.
+ passes that are controlled by the same flow controller. If a single pass is
+ provided, the pass set will only have that pass a single element.
+ It is also possible to append a :class:`.BaseFlowController` instance and
+ the rest of the parameter will be ignored.
max_iteration: max number of iterations of passes.
- flow_controller_conditions: Dictionary of control flow plugins. Default:
+ flow_controller_conditions: Dictionary of control flow plugins.
+ Following built-in controllers are available by default:
- * do_while (callable property_set -> boolean): The passes repeat until the
- callable returns False.
- Default: `lambda x: False # i.e. passes run once`
+ * do_while: The passes repeat until the callable returns False. Corresponds to
+ :class:`.DoWhileController`.
+ * condition: The passes run only if the callable returns True. Corresponds to
+ :class:`.ConditionalController`.
- * condition (callable property_set -> boolean): The passes run only if the
- callable returns True.
- Default: `lambda x: True # i.e. passes run`
+ In general, you have more control simply by creating the controller you want and
+ passing it to :meth:`append`.
Raises:
TranspilerError: if a pass in passes is not a proper pass.
"""
if max_iteration:
- # TODO remove this argument from append
self.max_iteration = max_iteration
- super().append(passes, **flow_controller_conditions)
+ # Backward compatibility as of Terra 0.25
+ if isinstance(passes, Task):
+ passes = [passes]
+ self._pass_sets.append(
+ {
+ "passes": passes,
+ "flow_controllers": flow_controller_conditions,
+ }
+ )
+ if flow_controller_conditions:
+ passes = _legacy_build_flow_controller(
+ passes,
+ options={"max_iteration": self.max_iteration},
+ **flow_controller_conditions,
+ )
+
+ super().append(passes)
+
+ @deprecate_arg(
+ name="max_iteration",
+ since="0.25",
+ additional_msg="'max_iteration' can be set in the constructor.",
+ pending=True,
+ )
def replace(
self,
index: int,
- passes: PassSequence,
+ passes: Task | list[Task],
max_iteration: int = None,
**flow_controller_conditions: Any,
) -> None:
@@ -96,25 +175,72 @@ def replace(
Args:
index: Pass index to replace, based on the position in passes().
- passes: A pass set (as defined in :py:func:`qiskit.transpiler.PassManager.append`)
- to be added to the pass manager schedule.
+ passes: A pass set to be added to the pass manager schedule.
max_iteration: max number of iterations of passes.
- flow_controller_conditions: control flow plugins.
-
- Raises:
- TranspilerError: if a pass in passes is not a proper pass or index not found.
+ flow_controller_conditions: Dictionary of control flow plugins.
+ See :meth:`qiskit.transpiler.PassManager.append` for details.
"""
if max_iteration:
- # TODO remove this argument from append
self.max_iteration = max_iteration
- super().replace(index, passes, **flow_controller_conditions)
+
+ # Backward compatibility as of Terra 0.25
+ if isinstance(passes, Task):
+ passes = [passes]
+ try:
+ self._pass_sets[index] = {
+ "passes": passes,
+ "flow_controllers": flow_controller_conditions,
+ }
+ except IndexError as ex:
+ raise PassManagerError(f"Index to replace {index} does not exists") from ex
+ if flow_controller_conditions:
+ passes = _legacy_build_flow_controller(
+ passes,
+ options={"max_iteration": self.max_iteration},
+ **flow_controller_conditions,
+ )
+
+ super().replace(index, passes)
+
+ def remove(self, index: int) -> None:
+ super().remove(index)
+
+ # Backward compatibility as of Terra 0.25
+ del self._pass_sets[index]
+
+ def __getitem__(self, index):
+ new_passmanager = super().__getitem__(index)
+
+ # Backward compatibility as of Terra 0.25
+ _pass_sets = self._pass_sets[index]
+ if isinstance(_pass_sets, dict):
+ _pass_sets = [_pass_sets]
+ new_passmanager._pass_sets = _pass_sets
+ return new_passmanager
+
+ def __add__(self, other):
+ new_passmanager = super().__add__(other)
+
+ # Backward compatibility as of Terra 0.25
+ if isinstance(other, self.__class__):
+ new_passmanager._pass_sets = self._pass_sets
+ new_passmanager._pass_sets += other._pass_sets
+
+ # When other is not identical type, _pass_sets is also evaluated by self.append.
+ return new_passmanager
+
+ def to_flow_controller(self) -> RunningPassManager:
+ # For backward compatibility.
+ # This method will be resolved to the base class and return FlowControllerLinear
+ flatten_tasks = list(self._flatten_tasks(self._tasks))
+ return RunningPassManager(flatten_tasks)
# pylint: disable=arguments-differ
def run(
self,
circuits: _CircuitsT,
output_name: str | None = None,
- callback: Callable | None = None,
+ callback: Callable = None,
) -> _CircuitsT:
"""Run all the passes on the specified ``circuits``.
@@ -131,6 +257,12 @@ def run(
property_set (PropertySet): the property set
count (int): the index for the pass execution
+ .. note::
+
+ Beware that the keyword arguments here are different to those used by the
+ generic :class:`.BasePassManager`. This pass manager will translate those
+ arguments into the form described above.
+
The exact arguments pass expose the internals of the pass
manager and are subject to change as the pass manager internals
change. If you intend to reuse a callback function over
@@ -151,31 +283,15 @@ def callback_func(**kwargs):
Returns:
The transformed circuit(s).
"""
+ if callback is not None:
+ callback = _legacy_style_callback(callback)
+
return super().run(
in_programs=circuits,
callback=callback,
output_name=output_name,
)
- def _create_running_passmanager(self) -> RunningPassManager:
- running_passmanager = self.PASS_RUNNER(self.max_iteration)
- for pass_set in self._pass_sets:
- running_passmanager.append(pass_set["passes"], **pass_set["flow_controllers"])
- return running_passmanager
-
- def _run_single_circuit(
- self,
- input_program: QuantumCircuit,
- callback: Callable | None = None,
- **metadata,
- ) -> QuantumCircuit:
- pass_runner = self._create_running_passmanager()
- out_program = pass_runner.run(input_program, callback=callback, **metadata)
- # Store property set of pass runner for backward compatibility
- self.property_set = pass_runner.property_set
-
- return out_program
-
def draw(self, filename=None, style=None, raw=False):
"""Draw the pass manager.
@@ -218,19 +334,6 @@ def passes(self) -> list[dict[str, BasePass]]:
ret.append(item)
return ret
- def to_flow_controller(self) -> FlowController:
- """Linearize this manager into a single :class:`.FlowController`, so that it can be nested
- inside another :class:`.PassManager`."""
- return FlowController.controller_factory(
- [
- FlowController.controller_factory(
- pass_set["passes"], None, **pass_set["flow_controllers"]
- )
- for pass_set in self._pass_sets
- ],
- None,
- )
-
class StagedPassManager(PassManager):
"""A Pass manager pipeline built up of individual stages
@@ -354,10 +457,12 @@ def _generate_expanded_stages(self) -> Iterator[str]:
yield "post_" + stage
def _update_passmanager(self) -> None:
+ self._tasks = []
self._pass_sets = []
for stage in self.expanded_stages:
pm = getattr(self, stage, None)
if pm is not None:
+ self._tasks += pm._tasks
self._pass_sets.extend(pm._pass_sets)
def __setattr__(self, attr, value):
@@ -369,7 +474,7 @@ def __setattr__(self, attr, value):
def append(
self,
- passes: BasePass | Sequence[BasePass | FlowController],
+ passes: Task | list[Task],
max_iteration: int = None,
**flow_controller_conditions: Any,
) -> None:
@@ -394,6 +499,7 @@ def __getitem__(self, index):
# Do not inherit from the PassManager, i.e. super()
# It returns instance of self.__class__ which is StagedPassManager.
new_passmanager = PassManager(max_iteration=self.max_iteration)
+ new_passmanager._tasks = self._tasks[index]
_pass_sets = self._pass_sets[index]
if isinstance(_pass_sets, dict):
_pass_sets = [_pass_sets]
@@ -410,10 +516,6 @@ def __setitem__(self, index, item):
def __add__(self, other):
raise NotImplementedError
- def _create_running_passmanager(self) -> RunningPassManager:
- self._update_passmanager()
- return super()._create_running_passmanager()
-
def passes(self) -> list[dict[str, BasePass]]:
self._update_passmanager()
return super().passes()
@@ -458,3 +560,54 @@ def wrapper(*meth_args, **meth_kwargs):
continue
_wrapped = _replace_error(_method)
setattr(PassManager, _name, _wrapped)
+
+
+def _legacy_style_callback(callback: Callable):
+ def _wrapped_callable(task, passmanager_ir, property_set, running_time, count):
+ callback(
+ pass_=task,
+ dag=passmanager_ir,
+ time=running_time,
+ property_set=property_set,
+ count=count,
+ )
+
+ return _wrapped_callable
+
+
+def _legacy_build_flow_controller(
+ tasks: list[Task],
+ options: dict[str, Any],
+ **flow_controller_conditions,
+) -> BaseController:
+ """A legacy method to build flow controller with keyword arguments.
+
+ Args:
+ tasks: A list of tasks fed into custom flow controllers.
+ options: Option for flow controllers.
+ flow_controller_conditions: Callables keyed on the alias of the flow controller.
+
+ Returns:
+ A built controller.
+ """
+ warnings.warn(
+ "Building a flow controller with keyword arguments is going to be deprecated. "
+ "Custom controllers must be explicitly instantiated and appended to the task list.",
+ PendingDeprecationWarning,
+ stacklevel=3,
+ )
+ if isinstance(tasks, Task):
+ tasks = [tasks]
+ if any(not isinstance(t, Task) for t in tasks):
+ raise TypeError("Added tasks are not all valid pass manager task types.")
+ # Alias in higher hierarchy becomes outer controller.
+ for alias in FlowController.hierarchy[::-1]:
+ if alias not in flow_controller_conditions:
+ continue
+ class_type = FlowController.registered_controllers[alias]
+ init_kwargs = {
+ "options": options,
+ alias: flow_controller_conditions.pop(alias),
+ }
+ tasks = class_type(tasks, **init_kwargs)
+ return tasks
diff --git a/qiskit/transpiler/propertyset.py b/qiskit/transpiler/propertyset.py
index 6244a49d2c28..6ff160b14a43 100644
--- a/qiskit/transpiler/propertyset.py
+++ b/qiskit/transpiler/propertyset.py
@@ -14,9 +14,6 @@
about the current state of the circuit """
-from qiskit.passmanager import propertyset as passmanager_propertyset
-
-
-def __getattr__(name):
- # Just redirect to new module. This will be deprecated.
- return getattr(passmanager_propertyset, name)
+# For backward compatibility
+# pylint: disable=unused-import
+from qiskit.passmanager import PropertySet
diff --git a/qiskit/transpiler/runningpassmanager.py b/qiskit/transpiler/runningpassmanager.py
index 7962551e02d9..548b49833265 100644
--- a/qiskit/transpiler/runningpassmanager.py
+++ b/qiskit/transpiler/runningpassmanager.py
@@ -13,124 +13,89 @@
"""RunningPassManager class for the transpiler.
This object holds the state of a pass manager during running-time."""
from __future__ import annotations
-import logging
+
import inspect
-from functools import partial, wraps
+import logging
+from functools import wraps
from typing import Callable
from qiskit.circuit import QuantumCircuit
from qiskit.converters import circuit_to_dag, dag_to_circuit
-from qiskit.dagcircuit import DAGCircuit
-from qiskit.passmanager import BasePassRunner
+from qiskit.passmanager.compilation_status import PropertySet, WorkflowStatus, PassManagerState
+from qiskit.passmanager.base_tasks import Task
+from qiskit.passmanager.exceptions import PassManagerError
+from qiskit.utils.deprecation import deprecate_func
+
+# pylint: disable=unused-import
from qiskit.passmanager.flow_controllers import (
- PassSequence,
+ BaseController,
FlowController,
- DoWhileController,
+ FlowControllerLinear,
+ # for backward compatibility
ConditionalController,
+ DoWhileController,
)
-from qiskit.passmanager.exceptions import PassManagerError
-from qiskit.transpiler.basepasses import BasePass
+
from .exceptions import TranspilerError
-from .fencedobjs import FencedPropertySet, FencedDAGCircuit
from .layout import TranspileLayout
logger = logging.getLogger(__name__)
-class RunningPassManager(BasePassRunner):
- """A RunningPassManager is a running pass manager."""
+class RunningPassManager(FlowControllerLinear):
+ """A RunningPassManager is a running pass manager.
- IN_PROGRAM_TYPE = QuantumCircuit
- OUT_PROGRAM_TYPE = QuantumCircuit
- IR_TYPE = DAGCircuit
+ .. warning::
- def __init__(self, max_iteration: int):
- """Initialize an empty PassManager object (with no passes scheduled).
-
- Args:
- max_iteration: The schedule looping iterates until the condition is met or until
- max_iteration is reached.
- """
- super().__init__(max_iteration)
- self.fenced_property_set = FencedPropertySet(self.property_set)
+ :class:`.RunningPassManager` will be deprecated in the future release.
+ As of Qiskit Terra 0.25 this class becomes a subclass of the flow controller
+ with extra methods for backward compatibility.
+ Relying on a subclass of the running pass manager might break your code stack.
+ """
+ @deprecate_func(
+ since="0.45.0",
+ additional_msg=(
+ "Building the pipline of the tasks is responsibility of PassManager. "
+ "RunningPassManager should not modify prepared pipeline at running time."
+ ),
+ )
def append(
self,
- passes: PassSequence,
+ passes: Task | list[Task],
**flow_controller_conditions,
):
"""Append a passes to the schedule of passes.
Args:
- passes: passes to be added to schedule
- flow_controller_conditions: See add_flow_controller(): Dictionary of
- control flow plugins. Default:
-
- * do_while (callable property_set -> boolean): The passes repeat until the
- callable returns False.
- Default: `lambda x: False # i.e. passes run once`
-
- * condition (callable property_set -> boolean): The passes run only if the
- callable returns True.
- Default: `lambda x: True # i.e. passes run`
+ passes: A set of passes (a pass set) to be added to schedule. A pass set is a list of
+ passes that are controlled by the same flow controller. If a single pass is
+ provided, the pass set will only have that pass a single element.
+ It is also possible to append a :class:`.BaseFlowController` instance and
+ the rest of the parameter will be ignored.
+ flow_controller_conditions: Dictionary of control flow plugins.
+ Following built-in controllers are available by default:
+
+ * do_while: The passes repeat until the callable returns False.
+ * condition: The passes run only if the callable returns True.
"""
- # attaches the property set to the controller so it has access to it.
- if isinstance(passes, ConditionalController):
- passes.condition = partial(passes.condition, self.fenced_property_set)
- elif isinstance(passes, DoWhileController):
- if not isinstance(passes.do_while, partial):
- passes.do_while = partial(passes.do_while, self.fenced_property_set)
+ if not isinstance(passes, BaseController):
+ normalized_controller = passes
else:
- flow_controller_conditions = self._normalize_flow_controller(flow_controller_conditions)
- passes = FlowController.controller_factory(
- passes, self.passmanager_options, **flow_controller_conditions
- )
- super().append(passes)
-
- def _normalize_flow_controller(self, flow_controller):
- for name, param in flow_controller.items():
- if callable(param):
- flow_controller[name] = partial(param, self.fenced_property_set)
- else:
- raise TranspilerError("The flow controller parameter %s is not callable" % name)
- return flow_controller
-
- def _to_passmanager_ir(self, in_program: QuantumCircuit) -> DAGCircuit:
- if not isinstance(in_program, QuantumCircuit):
- raise TranspilerError(f"Input {in_program.__class__} is not QuantumCircuit.")
- return circuit_to_dag(in_program)
-
- def _to_target(self, passmanager_ir: DAGCircuit, in_program: QuantumCircuit) -> QuantumCircuit:
- if not isinstance(passmanager_ir, DAGCircuit):
- raise TranspilerError(f"Input {passmanager_ir.__class__} is not DAGCircuit.")
-
- circuit = dag_to_circuit(passmanager_ir, copy_operations=False)
- circuit.name = self.metadata["output_name"]
-
- if self.property_set["layout"] is not None:
- circuit._layout = TranspileLayout(
- initial_layout=self.property_set["layout"],
- input_qubit_mapping=self.property_set["original_qubit_indices"],
- final_layout=self.property_set["final_layout"],
- _input_qubit_count=len(in_program.qubits),
- _output_qubit_list=circuit.qubits,
+ # Backward compatibility. Will be deprecated.
+ normalized_controller = FlowController.controller_factory(
+ passes=passes,
+ options=self._options,
+ **flow_controller_conditions,
)
- circuit._clbit_write_latency = self.property_set["clbit_write_latency"]
- circuit._conditional_latency = self.property_set["conditional_latency"]
-
- if self.property_set["node_start_time"]:
- # This is dictionary keyed on the DAGOpNode, which is invalidated once
- # dag is converted into circuit. So this schedule information is
- # also converted into list with the same ordering with circuit.data.
- topological_start_times = []
- start_times = self.property_set["node_start_time"]
- for dag_node in passmanager_ir.topological_op_nodes():
- topological_start_times.append(start_times[dag_node])
- circuit._op_start_times = topological_start_times
-
- return circuit
+ super().append(normalized_controller)
# pylint: disable=arguments-differ
+ @deprecate_func(
+ since="0.45.0",
+ additional_msg="Now RunningPassManager is a subclass of flow controller.",
+ pending=True,
+ )
def run(
self,
circuit: QuantumCircuit,
@@ -147,72 +112,40 @@ def run(
Returns:
QuantumCircuit: Transformed circuit.
"""
- return super().run(
- in_program=circuit,
- callback=_rename_callback_args(callback),
- output_name=output_name or circuit.name,
+ initial_status = WorkflowStatus()
+ property_set = PropertySet()
+ state = PassManagerState(workflow_status=initial_status, property_set=property_set)
+
+ passmanager_ir = circuit_to_dag(circuit)
+ passmanager_ir, state = super().execute(
+ passmanager_ir=passmanager_ir,
+ state=state,
+ callback=callback,
)
- def _run_base_pass(
- self,
- pass_: BasePass,
- passmanager_ir: DAGCircuit,
- ) -> DAGCircuit:
- """Do either a pass and its "requires" or FlowController.
-
- Args:
- pass_: A base pass to run.
- passmanager_ir: Pass manager IR, i.e. DAGCircuit for this class.
+ out_circuit = dag_to_circuit(passmanager_ir, copy_operations=False)
+ out_circuit.name = output_name
- Returns:
- The transformed dag in case of a transformation pass.
- The same input dag in case of an analysis pass.
+ if state.property_set["layout"] is not None:
+ circuit._layout = TranspileLayout(
+ initial_layout=state.property_set["layout"],
+ input_qubit_mapping=state.property_set["original_qubit_indices"],
+ final_layout=state.property_set["final_layout"],
+ )
+ circuit._clbit_write_latency = state.property_set["clbit_write_latency"]
+ circuit._conditional_latency = state.property_set["conditional_latency"]
- Raises:
- TranspilerError: When transform pass returns non DAGCircuit.
- TranspilerError: When pass is neither transform pass nor analysis pass.
- """
- pass_.property_set = self.property_set
-
- if pass_.is_transformation_pass:
- # Measure time if we have a callback or logging set
- new_dag = pass_.run(passmanager_ir)
- if isinstance(new_dag, DAGCircuit):
- new_dag.calibrations = passmanager_ir.calibrations
- else:
- raise TranspilerError(
- "Transformation passes should return a transformed dag."
- "The pass %s is returning a %s" % (type(pass_).__name__, type(new_dag))
- )
- passmanager_ir = new_dag
- elif pass_.is_analysis_pass:
- # Measure time if we have a callback or logging set
- pass_.run(FencedDAGCircuit(passmanager_ir))
- else:
- raise TranspilerError("I dont know how to handle this type of pass")
- return passmanager_ir
-
- def _update_valid_passes(self, pass_):
- super()._update_valid_passes(pass_)
- if not pass_.is_analysis_pass: # Analysis passes preserve all
- self.valid_passes.intersection_update(set(pass_.preserves))
-
-
-def _rename_callback_args(callback):
- """A helper function to run callback with conventional argument names."""
- if callback is None:
- return callback
-
- def _call_with_dag(pass_, passmanager_ir, time, property_set, count):
- callback(
- pass_=pass_,
- dag=passmanager_ir,
- time=time,
- property_set=property_set,
- count=count,
- )
+ if state.property_set["node_start_time"]:
+ # This is dictionary keyed on the DAGOpNode, which is invalidated once
+ # dag is converted into circuit. So this schedule information is
+ # also converted into list with the same ordering with circuit.data.
+ topological_start_times = []
+ start_times = state.property_set["node_start_time"]
+ for dag_node in passmanager_ir.topological_op_nodes():
+ topological_start_times.append(start_times[dag_node])
+ circuit._op_start_times = topological_start_times
- return _call_with_dag
+ return circuit
# A temporary error handling with slight overhead at class loading.
diff --git a/releasenotes/notes/add-passmanager-module-3ae30cff52cb83f1.yaml b/releasenotes/notes/add-passmanager-module-3ae30cff52cb83f1.yaml
index a3df013fcf40..9e2947b7e530 100644
--- a/releasenotes/notes/add-passmanager-module-3ae30cff52cb83f1.yaml
+++ b/releasenotes/notes/add-passmanager-module-3ae30cff52cb83f1.yaml
@@ -3,24 +3,96 @@ features:
- |
A new module :mod:`qiskit.passmanager` is added.
This module implements a generic pass manager and flow controllers,
- and provides an infrastructure to manage execution of transform passes.
- The pass manager is a baseclass and not aware of the input and output object types,
- and you need to create a subclass of the pass manager
- for a particular program data to optimize.
+ and provides infrastructure to manage execution of pass manager tasks.
+ The pass manager is a base class and not aware of the input and output object types,
+ and subclass must be created for a particular program type to optimize.
The :mod:`qiskit.transpiler` module is also reorganized to rebuild the existing
- quantum circuit pass manager based off of new generic pass manager.
+ pass manager based off of the generic pass manager.
See upgrade notes for more details.
upgrade:
- |
- :class:`qiskit.transpiler.PassManager` is now a subclass of
- :class:`qiskit.passmanager.BasePassManager`. There is no functional modification
- due to this class hierarchy change.
- - |
- New error baseclass :class:`~qiskit.passmanager.PassManagerError` is introduced.
- This will replace :class:`~qiskit.transpiler.TranspilerError` raised in the
- pass handling machinery. The TranspilerError is now only used for the errors
- related to the failure in handling the quantum circuit or DAG circuit object.
- Note that the TranspilerError can be caught by the PassManagerError
- because of their class hierarchy. For backward compatibility,
- :class:`qiskit.transpiler.PassManager` catches PassManagerError and
- re-raises the TranspilerError. This error replacement will be dropped in future.
+ One new base class for passes (:class:`.GenericPass`) and one for flow controllers (:class:`.BaseController`)
+ are introduced in the :mod:`qiskit.passmanager` module.
+ Because the flow controller is a collection of passes and a controller can be recursively
+ nested into the task pipeline, new classes are designed with the idea of
+ the composite pattern, and the interface class :class:`.passmanager.Task`
+ is also introduced. This class defines the signature of a :meth:`.Task.execute` method.
+ This unified design eliminates complexity of the conventional pass manager;
+ the execution logic dispatch and task structure renormalization
+ are no longer necessary, whom the :class:`.RunningPassManager` used to be responsible for.
+ Existing flow controllers :class:`.FlowControllerLinear`,
+ :class:`.ConditionalController`, and :class:`.DoWhileController` are now subclasses of
+ the :class:`.BaseController`.
+ Note that these controllers are no longer iterable, as they drop the implementation of
+ :meth:`~object.__iter__` method; they are now only iterable in the context of a flow-controller
+ execution, which threads the compilation state through after each inner task is executed.
+ - |
+ The :class:`.RunningPassManager` becomes largely an alias of :class:`.FlowControllerLinear`,
+ and this class will be completely replaced with the flow controller in the feature release.
+ This means the running pass manager becomes a stateless flow controller, and
+ the pass manager framework consists of :class:`.BasePassManager` and :class:`.BaseController`.
+ The pass manager is responsible for the construction of task pipeline,
+ while the controller is responsible for the execution of associated tasks.
+ Subclassing the :class:`.RunningPassManager` is no longer recommended.
+ - |
+ A new class :class:`.WorkflowStatus` is introduced to track the status of pass manager workflow.
+ This portable object is created when the pass manager is run,
+ and handed over to the underlying tasks.
+ Such status was previously managed by the :class:`.RunningPassManager` with instance variables,
+ however, now running pass manager becomes a controller object.
+ - |
+ The transpiler-specific (:func:`.transpile`) :class:`.transpiler.PassManager` is now a subclass of
+ the :class:`.passmanager.BasePassManager`.
+ There is no API break at public member level due to this class hierarchy change.
+ - |
+ A new exception :exc:`~qiskit.passmanager.PassManagerError` is introduced as the base class of
+ exceptions raised during pass-manager execution. The transpiler-specific
+ :class:`.transpile.PassManager` continues to raise :exc:`.TranspilerError`, which is now a
+ subclass of :exc:`.PassManagerError`, for errors raised by specific tasks. A generic failure of
+ the pass-manager machinery, typically indicating programmer error and not recoverable, will
+ raise :exc:`.PassManagerError` for general pass managers, but :class:`.transpile.PassManager`
+ will currently wrap this in its specific :exc:`.TranspilerError` for backwards compatibility.
+ This wrapping will be removed in the future.
+ - |
+ Use of :class:`.FencedObject` in the pass manager framework is removed.
+ These wrapper class cannot protect mutable object attribute from modification,
+ and protection doesn't matter as long as the code is properly implemented;
+ analysis passes should not modify an input IR,
+ controllers should not update the property set, and so forth.
+ Implementation of the proper code is the responsibility of pass manager developer.
+deprecations:
+ - |
+ The flow controller factory method :meth:`.FlowController.controller_factory` is deprecated
+ along with :meth:`.FlowController.add_flow_controller` and
+ :meth:`.FlowController.remove_flow_controller`,
+ as we are also going to deprecate task construction with
+ keyword arguments in the :meth:`.BasePassManager.append` method.
+ Controllers must be explicitly instantiated and appended to the pass manager.
+ For example, conventional syntax
+
+ .. code-block:: python
+
+ pm.append([task1, task2], condition=lambda x: x["value1"] > 10)
+
+ must be replaced with
+
+ .. code-block:: python
+
+ controller = ConditionalController([task1, task2], condition=lambda x: x["value1"] > 10)
+ pm.append(controller)
+
+ The latter allows more precise control on the order of controllers
+ especially when multiple keyword arguments are specified together, and allows for the
+ construction of general flow controllers that may have more than one pipeline or do not take a
+ single simple conditional function in their constructors.
+ - |
+ The :meth:`.FlowControllerLinear.append`, :meth:`.DoWhileController.append`,
+ and :meth:`.ConditionalController.append` methods are all deprecated immediately.
+ The construction of pass manager task pipeline is the role of :class:`.BasePassManager`,
+ and individual flow controller do not need to implement method like this. For a flow
+ controller, you should pass all the passes in one go directly to the constructor.
+ - |
+ The general attribute and variable name :code:`passes` is replaced with :code:`tasks`
+ all over the :mod:`qiskit.passmanager` module. Note that a task must indicate a union of pass
+ and controller, and the singular form `pass` conflicts with the Python keyword.
+ In this sense, use of `tasks` is much preferable.
diff --git a/test/python/transpiler/test_pass_scheduler.py b/test/python/transpiler/test_pass_scheduler.py
index 0edd49d4583c..a52ad8b1ece2 100644
--- a/test/python/transpiler/test_pass_scheduler.py
+++ b/test/python/transpiler/test_pass_scheduler.py
@@ -32,7 +32,6 @@
PassD_TP_NR_NP,
PassE_AP_NR_NP,
PassF_reduce_dag_property,
- PassI_Bad_AP,
PassJ_Bad_NoReturn,
PassK_check_fixed_point_property,
PassM_AP_NR_NP,
@@ -241,8 +240,9 @@ def test_conditional_and_loop(self):
def test_loop_and_conditional(self):
"""Run a loop first, then a conditional."""
- FlowController.remove_flow_controller("condition")
- FlowController.add_flow_controller("condition", ConditionalController)
+ with self.assertWarns(DeprecationWarning):
+ FlowController.remove_flow_controller("condition")
+ FlowController.add_flow_controller("condition", ConditionalController)
self.passmanager.append(PassK_check_fixed_point_property())
self.passmanager.append(
@@ -339,23 +339,6 @@ def test_non_idempotent_pass(self):
],
)
- def test_fenced_dag(self):
- """Analysis passes are not allowed to modified the DAG."""
- qr = QuantumRegister(2)
- circ = QuantumCircuit(qr)
- circ.cx(qr[0], qr[1])
- circ.cx(qr[0], qr[1])
- circ.cx(qr[1], qr[0])
- circ.cx(qr[1], qr[0])
-
- self.passmanager.append(PassI_Bad_AP())
- self.assertSchedulerRaises(
- circ,
- self.passmanager,
- ["run analysis pass PassI_Bad_AP", "cx_runs: {(4, 5, 6, 7)}"],
- TranspilerError,
- )
-
def test_analysis_pass_is_idempotent(self):
"""Analysis passes are idempotent."""
passmanager = PassManager()
@@ -609,13 +592,16 @@ def test_nested_conditional_in_loop(self):
class DoXTimesController(FlowController):
"""A control-flow plugin for running a set of passes an X amount of times."""
- def __init__(self, passes, options, do_x_times=0, **_):
- self.do_x_times = do_x_times()
- super().__init__(passes, options)
+ def __init__(self, passes, options, do_x_times, **_):
+ super().__init__(options)
+ self.passes = passes
+ self.do_x_times = do_x_times
- def __iter__(self):
- for _ in range(self.do_x_times):
- yield from self.passes
+ # pylint: disable=missing-function-docstring
+ def iter_tasks(self, metadata):
+ for _ in range(self.do_x_times(metadata.property_set)):
+ for pass_ in self.passes:
+ metadata = yield pass_
class TestControlFlowPlugin(SchedulerTestCase):
@@ -628,7 +614,8 @@ def setUp(self):
def test_control_flow_plugin(self):
"""Adds a control flow plugin with a single parameter and runs it."""
- FlowController.add_flow_controller("do_x_times", DoXTimesController)
+ with self.assertWarns(DeprecationWarning):
+ FlowController.add_flow_controller("do_x_times", DoXTimesController)
self.passmanager.append([PassB_TP_RA_PA(), PassC_TP_RA_PA()], do_x_times=lambda x: 3)
self.assertScheduler(
self.circuit,
@@ -647,9 +634,11 @@ def test_control_flow_plugin(self):
def test_callable_control_flow_plugin(self):
"""Removes do_while, then adds it back. Checks max_iteration still working."""
controllers_length = len(FlowController.registered_controllers)
- FlowController.remove_flow_controller("do_while")
+ with self.assertWarns(DeprecationWarning):
+ FlowController.remove_flow_controller("do_while")
self.assertEqual(controllers_length - 1, len(FlowController.registered_controllers))
- FlowController.add_flow_controller("do_while", DoWhileController)
+ with self.assertWarns(DeprecationWarning):
+ FlowController.add_flow_controller("do_while", DoWhileController)
self.assertEqual(controllers_length, len(FlowController.registered_controllers))
self.passmanager.append(
[PassB_TP_RA_PA(), PassC_TP_RA_PA()],
@@ -671,16 +660,9 @@ def test_callable_control_flow_plugin(self):
def test_remove_nonexistent_plugin(self):
"""Tries to remove a plugin that does not exist."""
- self.assertRaises(KeyError, FlowController.remove_flow_controller, "foo")
-
- def test_bad_conditional(self):
- """Flow controller are not allowed to modify the property set."""
-
- def bad_condition(property_set):
- property_set["property"] = "forbidden write"
-
- self.passmanager.append(PassA_TP_NR_NP(), condition=bad_condition)
- self.assertRaises(TranspilerError, self.passmanager.run, self.circuit)
+ with self.assertRaises(KeyError):
+ with self.assertWarns(DeprecationWarning):
+ FlowController.remove_flow_controller("foo")
class TestDumpPasses(SchedulerTestCase):
@@ -725,7 +707,8 @@ def test_passes_in_linear(self):
def test_control_flow_plugin(self):
"""Dump passes in a custom flow controller."""
passmanager = PassManager()
- FlowController.add_flow_controller("do_x_times", DoXTimesController)
+ with self.assertWarns(DeprecationWarning):
+ FlowController.add_flow_controller("do_x_times", DoXTimesController)
passmanager.append([PassB_TP_RA_PA(), PassC_TP_RA_PA()], do_x_times=lambda x: 3)
expected = [
@@ -821,7 +804,8 @@ def test_passes_in_linear(self):
def test_control_flow_plugin(self):
"""Dump passes in a custom flow controller."""
passmanager = PassManager()
- FlowController.add_flow_controller("do_x_times", DoXTimesController)
+ with self.assertWarns(DeprecationWarning):
+ FlowController.add_flow_controller("do_x_times", DoXTimesController)
passmanager.append([PassB_TP_RA_PA(), PassC_TP_RA_PA()], do_x_times=lambda x: 3)
self.assertPassLog(
passmanager,
diff --git a/test/python/transpiler/test_passmanager.py b/test/python/transpiler/test_passmanager.py
index 871947937a5e..78d18f3fc4ed 100644
--- a/test/python/transpiler/test_passmanager.py
+++ b/test/python/transpiler/test_passmanager.py
@@ -21,7 +21,8 @@
from qiskit import QuantumRegister, QuantumCircuit
from qiskit.circuit.library import U2Gate
from qiskit.converters import circuit_to_dag
-from qiskit.transpiler import PassManager, PropertySet, TransformationPass, FlowController
+from qiskit.passmanager.flow_controllers import FlowControllerLinear
+from qiskit.transpiler import PassManager, PropertySet, TransformationPass
from qiskit.transpiler.passes import CommutativeCancellation
from qiskit.transpiler.passes import Optimize1qGates, Unroller
from qiskit.test import QiskitTestCase
@@ -147,7 +148,7 @@ def make_inner(prefix):
inner.append(DummyPass(f"{prefix} 4"), do_while=repeat(1))
return inner.to_flow_controller()
- self.assertIsInstance(make_inner("test"), FlowController)
+ self.assertIsInstance(make_inner("test"), FlowControllerLinear)
outer = PassManager()
outer.append(make_inner("first"))