diff --git a/qiskit/pulse/builder.py b/qiskit/pulse/builder.py index f0ed02eaa006..4656c442dc64 100644 --- a/qiskit/pulse/builder.py +++ b/qiskit/pulse/builder.py @@ -444,6 +444,7 @@ import contextvars import functools import itertools +import sys import uuid import warnings from contextlib import contextmanager @@ -481,6 +482,12 @@ from qiskit.pulse.schedule import Schedule, ScheduleBlock from qiskit.pulse.transforms.alignments import AlignmentKind +if sys.version_info >= (3, 8): + from functools import singledispatchmethod # pylint: disable=no-name-in-module +else: + from singledispatchmethod import singledispatchmethod + + #: contextvars.ContextVar[BuilderContext]: active builder BUILDER_CONTEXTVAR = contextvars.ContextVar("backend") @@ -585,8 +592,7 @@ def __init__( if isinstance(block, ScheduleBlock): root_block = block elif isinstance(block, Schedule): - root_block = ScheduleBlock() - root_block.append(instructions.Call(subroutine=block)) + root_block = self._naive_typecast_schedule(block) else: raise exceptions.PulseError( f"Input `block` type {block.__class__.__name__} is " @@ -650,7 +656,7 @@ def get_context(self) -> ScheduleBlock: """Get current context. Notes: - New instruction can be added by `.append_block` or `.append_instruction` method. + New instruction can be added by `.append_subroutine` or `.append_instruction` method. Use above methods rather than directly accessing to the current context. """ return self._context_stack[-1] @@ -692,7 +698,7 @@ def compile(self) -> ScheduleBlock: while len(self._context_stack) > 1: current = self.pop_context() - self.append_block(current) + self.append_subroutine(current) return self._context_stack[0] @@ -706,7 +712,7 @@ def _compile_lazy_circuit(self): lazy_circuit = self._lazy_circuit # reset lazy circuit self._lazy_circuit = self._new_circuit() - self.call_subroutine(subroutine=self._compile_circuit(lazy_circuit)) + self.call_subroutine(self._compile_circuit(lazy_circuit)) def _compile_circuit(self, circ) -> Schedule: """Take a QuantumCircuit and output the pulse schedule associated with the circuit.""" @@ -731,17 +737,6 @@ def append_instruction(self, instruction: instructions.Instruction): """ self._context_stack[-1].append(instruction) - @_compile_lazy_circuit_before - def append_block(self, context_block: ScheduleBlock): - """Add a :class:`ScheduleBlock` to the builder's context schedule. - - Args: - context_block: ScheduleBlock to append to the current context block. - """ - # ignore empty context - if len(context_block) > 0: - self._context_stack[-1].append(context_block) - def append_reference(self, name: str, *extra_keys: str): """Add external program as a :class:`~qiskit.pulse.instructions.Reference` instruction. @@ -752,6 +747,32 @@ def append_reference(self, name: str, *extra_keys: str): inst = instructions.Reference(name, *extra_keys) self.append_instruction(inst) + @_compile_lazy_circuit_before + def append_subroutine(self, subroutine: Union[Schedule, ScheduleBlock]): + """Append a :class:`ScheduleBlock` to the builder's context schedule. + + This operation doesn't create a reference. Subroutine is directly + appended to current context schedule. + + Args: + subroutine: ScheduleBlock to append to the current context block. + + Raises: + PulseError: When subroutine is not Schedule nor ScheduleBlock. + """ + if not isinstance(subroutine, (ScheduleBlock, Schedule)): + raise exceptions.PulseError( + f"'{subroutine.__class__.__name__}' is not valid data format in the builder. " + "'Schedule' and 'ScheduleBlock' can be appended to the builder context." + ) + + if len(subroutine) == 0: + return + if isinstance(subroutine, Schedule): + subroutine = self._naive_typecast_schedule(subroutine) + self._context_stack[-1].append(subroutine) + + @singledispatchmethod def call_subroutine( self, subroutine: Union[circuit.QuantumCircuit, Schedule, ScheduleBlock], @@ -778,34 +799,37 @@ def call_subroutine( Raises: PulseError: - - When specified parameter is not contained in the subroutine - When input subroutine is not valid data format. """ - if isinstance(subroutine, circuit.QuantumCircuit): - self._compile_lazy_circuit() - subroutine = self._compile_circuit(subroutine) - - if not isinstance(subroutine, (Schedule, ScheduleBlock)): - raise exceptions.PulseError( - f"Subroutine type {subroutine.__class__.__name__} is " - "not valid data format. Call QuantumCircuit, " - "Schedule, or ScheduleBlock." - ) + raise exceptions.PulseError( + f"Subroutine type {subroutine.__class__.__name__} is " + "not valid data format. Call QuantumCircuit, " + "Schedule, or ScheduleBlock." + ) - if len(subroutine) == 0: + @call_subroutine.register + def _( + self, + target_block: ScheduleBlock, + name: Optional[str] = None, + value_dict: Optional[Dict[ParameterExpression, ParameterValueType]] = None, + **kw_params: ParameterValueType, + ): + if len(target_block) == 0: return # Create local parameter assignment local_assignment = dict() for param_name, value in kw_params.items(): - params = subroutine.get_parameters(param_name) + params = target_block.get_parameters(param_name) if not params: raise exceptions.PulseError( f"Parameter {param_name} is not defined in the target subroutine. " - f'{", ".join(map(str, subroutine.parameters))} can be specified.' + f'{", ".join(map(str, target_block.parameters))} can be specified.' ) for param in params: local_assignment[param] = value + if value_dict: if local_assignment.keys() & value_dict.keys(): warnings.warn( @@ -816,22 +840,54 @@ def call_subroutine( ) local_assignment.update(value_dict) - if isinstance(subroutine, ScheduleBlock): - # If subroutine is schedule block, use reference mechanism. - if local_assignment: - subroutine = subroutine.assign_parameters(local_assignment, inplace=False) - if name is None: - # Add unique string, not to accidentally override existing reference entry. - keys = (subroutine.name, uuid.uuid4().hex) - else: - keys = (name,) - self.append_reference(*keys) - self.get_context().assign_references({keys: subroutine}, inplace=True) + if local_assignment: + target_block = target_block.assign_parameters(local_assignment, inplace=False) + + if name is None: + # Add unique string, not to accidentally override existing reference entry. + keys = (target_block.name, uuid.uuid4().hex) else: - # If subroutine is schedule, use Call instruction. - name = name or subroutine.name - call_instruction = instructions.Call(subroutine, local_assignment, name) - self.append_instruction(call_instruction) + keys = (name,) + + self.append_reference(*keys) + self.get_context().assign_references({keys: target_block}, inplace=True) + + @call_subroutine.register + def _( + self, + target_schedule: Schedule, + name: Optional[str] = None, + value_dict: Optional[Dict[ParameterExpression, ParameterValueType]] = None, + **kw_params: ParameterValueType, + ): + if len(target_schedule) == 0: + return + + self.call_subroutine( + self._naive_typecast_schedule(target_schedule), + name=name, + value_dict=value_dict, + **kw_params, + ) + + @call_subroutine.register + def _( + self, + target_circuit: circuit.QuantumCircuit, + name: Optional[str] = None, + value_dict: Optional[Dict[ParameterExpression, ParameterValueType]] = None, + **kw_params: ParameterValueType, + ): + if len(target_circuit) == 0: + return + + self._compile_lazy_circuit() + self.call_subroutine( + self._compile_circuit(target_circuit), + name=name, + value_dict=value_dict, + **kw_params, + ) @_requires_backend def call_gate(self, gate: circuit.Gate, qubits: Tuple[int, ...], lazy: bool = True): @@ -870,6 +926,21 @@ def _call_gate(self, gate, qargs): self._lazy_circuit.append(gate, qargs=qargs) + @staticmethod + def _naive_typecast_schedule(schedule: Schedule): + # Naively convert into ScheduleBlock + from qiskit.pulse.transforms import inline_subroutines, flatten, pad + + preprocessed_schedule = inline_subroutines(flatten(schedule)) + pad(preprocessed_schedule, inplace=True, pad_with=instructions.TimeBlockade) + + # default to left alignment, namely ASAP scheduling + target_block = ScheduleBlock(name=schedule.name) + for _, inst in preprocessed_schedule.instructions: + target_block.append(inst, inplace=True) + + return target_block + def build( backend=None, @@ -973,21 +1044,9 @@ def append_schedule(schedule: Union[Schedule, ScheduleBlock]): """Call a schedule by appending to the active builder's context block. Args: - schedule: Schedule to append. - - Raises: - PulseError: When input `schedule` is invalid data format. + schedule: Schedule or ScheduleBlock to append. """ - if isinstance(schedule, Schedule): - _active_builder().append_instruction(instructions.Call(subroutine=schedule)) - elif isinstance(schedule, ScheduleBlock): - _active_builder().append_block(schedule) - else: - raise exceptions.PulseError( - f"Input program {schedule.__class__.__name__} is not " - "acceptable program format. Input `Schedule` or " - "`ScheduleBlock`." - ) + _active_builder().append_subroutine(schedule) def append_instruction(instruction: instructions.Instruction): @@ -1181,7 +1240,7 @@ def align_left() -> ContextManager[None]: yield finally: current = builder.pop_context() - builder.append_block(current) + builder.append_subroutine(current) @contextmanager @@ -1219,7 +1278,7 @@ def align_right() -> AlignmentKind: yield finally: current = builder.pop_context() - builder.append_block(current) + builder.append_subroutine(current) @contextmanager @@ -1257,7 +1316,7 @@ def align_sequential() -> AlignmentKind: yield finally: current = builder.pop_context() - builder.append_block(current) + builder.append_subroutine(current) @contextmanager @@ -1308,7 +1367,7 @@ def align_equispaced(duration: Union[int, ParameterExpression]) -> AlignmentKind yield finally: current = builder.pop_context() - builder.append_block(current) + builder.append_subroutine(current) @contextmanager @@ -1369,7 +1428,7 @@ def udd10_pos(j): yield finally: current = builder.pop_context() - builder.append_block(current) + builder.append_subroutine(current) @contextmanager @@ -1395,7 +1454,7 @@ def general_transforms(alignment_context: AlignmentKind) -> ContextManager[None] yield finally: current = builder.pop_context() - builder.append_block(current) + builder.append_subroutine(current) @contextmanager @@ -1986,16 +2045,8 @@ def call( the parameters having the same name are all updated together. If you want to avoid name collision, use ``value_dict`` with :class:`~.Parameter` objects instead. - - Raises: - exceptions.PulseError: If the input ``target`` type is not supported. """ - if not isinstance(target, (circuit.QuantumCircuit, Schedule, ScheduleBlock)): - raise exceptions.PulseError(f"'{target.__class__.__name__}' is not a valid target object.") - - _active_builder().call_subroutine( - subroutine=target, name=name, value_dict=value_dict, **kw_params - ) + _active_builder().call_subroutine(target, name, value_dict, **kw_params) def reference(name: str, *extra_keys: str): @@ -2237,7 +2288,10 @@ def measure( # note this is not a subroutine. # just a macro to automate combination of stimulus and acquisition. - _active_builder().call_subroutine(measure_sched) + # prepare unique reference name based on qubit and memory slot index. + qubits_repr = "&".join(map(str, qubits)) + mslots_repr = "&".join(map(lambda r: str(r.index), registers)) + _active_builder().call_subroutine(measure_sched, name=f"measure_{qubits_repr}..{mslots_repr}") if len(qubits) == 1: return registers[0] @@ -2283,7 +2337,7 @@ def measure_all() -> List[chans.MemorySlot]: # note this is not a subroutine. # just a macro to automate combination of stimulus and acquisition. - _active_builder().call_subroutine(measure_sched) + _active_builder().call_subroutine(measure_sched, name="measure_all") return registers diff --git a/qiskit/pulse/instructions/__init__.py b/qiskit/pulse/instructions/__init__.py index 142a6e96e857..3e0de5ab6f29 100644 --- a/qiskit/pulse/instructions/__init__.py +++ b/qiskit/pulse/instructions/__init__.py @@ -45,11 +45,13 @@ Reference Delay Play + RelativeBarrier SetFrequency ShiftFrequency SetPhase ShiftPhase Snapshot + TimeBlockade These are all instances of the same base class: @@ -57,7 +59,7 @@ """ from .acquire import Acquire from .delay import Delay -from .directives import Directive, RelativeBarrier +from .directives import Directive, RelativeBarrier, TimeBlockade from .call import Call from .instruction import Instruction from .frequency import SetFrequency, ShiftFrequency diff --git a/qiskit/pulse/instructions/directives.py b/qiskit/pulse/instructions/directives.py index d09b69823332..4ca5656ad3a7 100644 --- a/qiskit/pulse/instructions/directives.py +++ b/qiskit/pulse/instructions/directives.py @@ -11,12 +11,12 @@ # that they have been altered from the originals. """Directives are hints to the pulse compiler for how to process its input programs.""" - from abc import ABC from typing import Optional, Tuple from qiskit.pulse import channels as chans from qiskit.pulse.instructions import instruction +from qiskit.pulse.exceptions import PulseError class Directive(instruction.Instruction, ABC): @@ -55,3 +55,91 @@ def channels(self) -> Tuple[chans.Channel]: def __eq__(self, other): """Verify two barriers are equivalent.""" return isinstance(other, type(self)) and set(self.channels) == set(other.channels) + + +class TimeBlockade(Directive): + """Pulse ``TimeBlockade`` directive. + + This instruction is intended to be used internally within the pulse builder, + to convert :class:`.Schedule` into :class:`.ScheduleBlock`. + Because :class:`.ScheduleBlock` cannot take an absolute instruction time interval, + this directive helps the block representation to find the starting time of an instruction. + + Example: + + This schedule plays constant pulse at t0 = 120. + + .. code-block:: python + + schedule = Schedule() + schedule.insert(120, Play(Constant(10, 0.1), DriveChannel(0))) + + This schedule block is expected to be identical to above at a time of execution. + + .. code-block:: python + + block = ScheduleBlock() + block.append(TimeBlockade(120, DriveChannel(0))) + block.append(Play(Constant(10, 0.1), DriveChannel(0))) + + Such conversion may be done by + + .. code-block:: python + + from qiskit.pulse.transforms import block_to_schedule, remove_directives + + schedule = remove_directives(block_to_schedule(block)) + + + .. note:: + + The TimeBlockade instruction behaves almost identically + to :class:`~qiskit.pulse.instructions.Delay` instruction. + However, the TimeBlockade is just a compiler directive and must be removed before execution. + This may be done by :func:`~qiskit.pulse.transforms.remove_directives` transform. + Once these directives are removed, occupied timeslots are released and + user can insert another instruction without timing overlap. + """ + + def __init__( + self, + duration: int, + channel: chans.Channel, + name: Optional[str] = None, + ): + """Create a time blockade directive. + + Args: + duration: Length of time of the occupation in terms of dt. + channel: The channel that will be the occupied. + name: Name of the time blockade for display purposes. + """ + super().__init__(operands=(duration, channel), name=name) + + def _validate(self): + """Called after initialization to validate instruction data. + + Raises: + PulseError: If the input ``duration`` is not integer value. + """ + if not isinstance(self.duration, int): + raise PulseError( + "TimeBlockade duration cannot be parameterized. Specify an integer duration value." + ) + + @property + def channel(self) -> chans.Channel: + """Return the :py:class:`~qiskit.pulse.channels.Channel` that this instruction is + scheduled on. + """ + return self.operands[1] + + @property + def channels(self) -> Tuple[chans.Channel]: + """Returns the channels that this schedule uses.""" + return (self.channel,) + + @property + def duration(self) -> int: + """Duration of this instruction.""" + return self.operands[0] diff --git a/qiskit/pulse/transforms/canonicalization.py b/qiskit/pulse/transforms/canonicalization.py index 4e2409ffa354..7cb7236b5910 100644 --- a/qiskit/pulse/transforms/canonicalization.py +++ b/qiskit/pulse/transforms/canonicalization.py @@ -13,7 +13,7 @@ import warnings from collections import defaultdict -from typing import List, Optional, Iterable, Union +from typing import List, Optional, Iterable, Union, Type import numpy as np @@ -457,6 +457,7 @@ def pad( channels: Optional[Iterable[chans.Channel]] = None, until: Optional[int] = None, inplace: bool = False, + pad_with: Optional[Type[instructions.Instruction]] = None, ) -> Schedule: """Pad the input Schedule with ``Delay``s on all unoccupied timeslots until ``schedule.duration`` or ``until`` if not ``None``. @@ -468,37 +469,49 @@ def pad( of ``schedule`` it will be added. until: Time to pad until. Defaults to ``schedule.duration`` if not provided. inplace: Pad this schedule by mutating rather than returning a new schedule. + pad_with: Pulse ``Instruction`` subclass to be used for padding. + Default to :class:`~qiskit.pulse.instructions.Delay` instruction. Returns: The padded schedule. + + Raises: + PulseError: When non pulse instruction is set to `pad_with`. """ until = until or schedule.duration channels = channels or schedule.channels + if pad_with: + if issubclass(pad_with, instructions.Instruction): + pad_cls = pad_with + else: + raise PulseError( + f"'{pad_with.__class__.__name__}' is not valid pulse instruction to pad with." + ) + else: + pad_cls = instructions.Delay + for channel in channels: if isinstance(channel, ClassicalIOChannel): continue + if channel not in schedule.channels: - schedule |= instructions.Delay(until, channel) + schedule = schedule.insert(0, instructions.Delay(until, channel), inplace=inplace) continue - curr_time = 0 - # Use the copy of timeslots. When a delay is inserted before the current interval, - # current timeslot is pointed twice and the program crashes with the wrong pointer index. - timeslots = schedule.timeslots[channel].copy() - # TODO: Replace with method of getting instructions on a channel - for interval in timeslots: - if curr_time >= until: + prev_time = 0 + timeslots = iter(schedule.timeslots[channel]) + to_pad = [] + while prev_time < until: + try: + t0, t1 = next(timeslots) + except StopIteration: + to_pad.append((prev_time, until - prev_time)) break - if interval[0] != curr_time: - end_time = min(interval[0], until) - schedule = schedule.insert( - curr_time, instructions.Delay(end_time - curr_time, channel), inplace=inplace - ) - curr_time = interval[1] - if curr_time < until: - schedule = schedule.insert( - curr_time, instructions.Delay(until - curr_time, channel), inplace=inplace - ) + if prev_time < t0: + to_pad.append((prev_time, min(t0, until) - prev_time)) + prev_time = t1 + for t0, duration in to_pad: + schedule = schedule.insert(t0, pad_cls(duration, channel), inplace=inplace) return schedule diff --git a/qiskit/qpy/__init__.py b/qiskit/qpy/__init__.py index 9df5806e5f4e..f4279a9b73dc 100644 --- a/qiskit/qpy/__init__.py +++ b/qiskit/qpy/__init__.py @@ -207,6 +207,7 @@ - ``q``: :class:`~qiskit.pulse.instructions.SetPhase` instruction - ``r``: :class:`~qiskit.pulse.instructions.ShiftPhase` instruction - ``b``: :class:`~qiskit.pulse.instructions.RelativeBarrier` instruction +- ``t``: :class:`~qiskit.pulse.instructions.TimeBlockade` instruction The operands of these instances can be serialized through the standard QPY value serialization mechanism, however there are special object types that only appear in the schedule operands. diff --git a/qiskit/qpy/type_keys.py b/qiskit/qpy/type_keys.py index 38b22addf8d0..53093b96dbb7 100644 --- a/qiskit/qpy/type_keys.py +++ b/qiskit/qpy/type_keys.py @@ -44,6 +44,7 @@ SetPhase, ShiftPhase, RelativeBarrier, + TimeBlockade, ) from qiskit.pulse.library import Waveform, SymbolicPulse from qiskit.pulse.schedule import ScheduleBlock @@ -231,10 +232,11 @@ class ScheduleInstruction(TypeKeyBase): SET_PHASE = b"q" SHIFT_PHASE = b"r" BARRIER = b"b" + TIME_BLOCKADE = b"t" # 's' is reserved by ScheduleBlock, i.e. block can be nested as an element. # Call instructon is not supported by QPY. - # This instruction is excluded from ScheduleBlock instructions with + # This instruction has been excluded from ScheduleBlock instructions with # qiskit-terra/#8005 and new instruction Reference will be added instead. # Call is only applied to Schedule which is not supported by QPY. # Also snapshot is not suppored because of its limited usecase. @@ -257,6 +259,8 @@ def assign(cls, obj): return cls.SHIFT_PHASE if isinstance(obj, RelativeBarrier): return cls.BARRIER + if isinstance(obj, TimeBlockade): + return cls.TIME_BLOCKADE raise exceptions.QpyError( f"Object type '{type(obj)}' is not supported in {cls.__name__} namespace." @@ -280,6 +284,8 @@ def retrieve(cls, type_key): return ShiftPhase if type_key == cls.BARRIER: return RelativeBarrier + if type_key == cls.TIME_BLOCKADE: + return TimeBlockade raise exceptions.QpyError( f"A class corresponding to type key '{type_key}' is not found in {cls.__name__} namespace." diff --git a/qiskit/transpiler/passes/calibration/rzx_builder.py b/qiskit/transpiler/passes/calibration/rzx_builder.py index b17af6e4c51a..8b1312a37d0c 100644 --- a/qiskit/transpiler/passes/calibration/rzx_builder.py +++ b/qiskit/transpiler/passes/calibration/rzx_builder.py @@ -12,18 +12,17 @@ """RZX calibration builders.""" -import math +import enum import warnings +from math import pi, erf # pylint: disable=no-name-in-module from typing import List, Tuple, Union -import enum import numpy as np from qiskit.circuit import Instruction as CircuitInst from qiskit.circuit.library.standard_gates import RZXGate from qiskit.exceptions import QiskitError from qiskit.pulse import ( Play, - Delay, Schedule, ScheduleBlock, ControlChannel, @@ -31,8 +30,9 @@ GaussianSquare, Waveform, ) +from qiskit.pulse import builder from qiskit.pulse.filters import filter_instructions -from qiskit.pulse.instruction_schedule_map import InstructionScheduleMap, CalibrationPublisher +from qiskit.pulse.instruction_schedule_map import InstructionScheduleMap from .base_builder import CalibrationBuilder from .exceptions import CalibrationNotAvailable @@ -104,8 +104,10 @@ def supported(self, node_op: CircuitInst, qubits: List) -> bool: return isinstance(node_op, RZXGate) and self._inst_map.has("cx", qubits) @staticmethod - def rescale_cr_inst(instruction: Play, theta: float, sample_mult: int = 16) -> Play: - """ + @builder.macro + def rescale_cr_inst(instruction: Play, theta: float, sample_mult: int = 16) -> int: + """A builder macro to play stretched pulse. + Args: instruction: The instruction from which to create a new shortened or lengthened pulse. theta: desired angle, pi/2 is assumed to be the angle that the pulse in the given @@ -113,8 +115,7 @@ def rescale_cr_inst(instruction: Play, theta: float, sample_mult: int = 16) -> P sample_mult: All pulses must be a multiple of sample_mult. Returns: - qiskit.pulse.Play: The play instruction with the stretched compressed - GaussianSquare pulse. + Duration of stretched pulse. Raises: QiskitError: if rotation angle is not assigned. @@ -125,32 +126,37 @@ def rescale_cr_inst(instruction: Play, theta: float, sample_mult: int = 16) -> P raise QiskitError("Target rotation angle is not assigned.") from ex # This method is called for instructions which are guaranteed to play GaussianSquare pulse - amp = instruction.pulse.amp - width = instruction.pulse.width - sigma = instruction.pulse.sigma - n_sigmas = (instruction.pulse.duration - width) / sigma + params = instruction.pulse.parameters.copy() + risefall_sigma_ratio = (params["duration"] - params["width"]) / params["sigma"] # The error function is used because the Gaussian may have chopped tails. - gaussian_area = abs(amp) * sigma * np.sqrt(2 * np.pi) * math.erf(n_sigmas) - area = gaussian_area + abs(amp) * width - - target_area = abs(theta) / (np.pi / 2.0) * area - sign = np.sign(theta) - - if target_area > gaussian_area: - width = (target_area - gaussian_area) / abs(amp) - duration = round((width + n_sigmas * sigma) / sample_mult) * sample_mult - return Play( - GaussianSquare(amp=sign * amp, width=width, sigma=sigma, duration=duration), - channel=instruction.channel, - ) + # Area is normalized by amplitude. + # This makes widths robust to the rounding error. + risefall_area = params["sigma"] * np.sqrt(2 * pi) * erf(risefall_sigma_ratio) + full_area = params["width"] + risefall_area + + # Get estimate of target area. Assume this is pi/2 controlled rotation. + cal_angle = pi / 2 + target_area = abs(theta) / cal_angle * full_area + new_width = target_area - risefall_area + + if new_width >= 0: + width = new_width + params["amp"] *= np.sign(theta) else: - amp_scale = sign * target_area / gaussian_area - duration = round(n_sigmas * sigma / sample_mult) * sample_mult - return Play( - GaussianSquare(amp=amp * amp_scale, width=0, sigma=sigma, duration=duration), - channel=instruction.channel, - ) + width = 0 + params["amp"] *= np.sign(theta) * target_area / risefall_area + + round_duration = ( + round((width + risefall_sigma_ratio * params["sigma"]) / sample_mult) * sample_mult + ) + params["duration"] = round_duration + params["width"] = width + + stretched_pulse = GaussianSquare(**params) + builder.play(stretched_pulse, instruction.channel) + + return round_duration def get_calibration(self, node_op: CircuitInst, qubits: List) -> Union[Schedule, ScheduleBlock]: """Builds the calibration schedule for the RZXGate(theta) with echos. @@ -175,11 +181,8 @@ def get_calibration(self, node_op: CircuitInst, qubits: List) -> Union[Schedule, except TypeError as ex: raise QiskitError("Target rotation angle is not assigned.") from ex - rzx_theta = Schedule(name="rzx(%.3f)" % theta) - rzx_theta.metadata["publisher"] = CalibrationPublisher.QISKIT - if np.isclose(theta, 0.0): - return rzx_theta + return ScheduleBlock(name="rzx(0.000)") cx_sched = self._inst_map.get("cx", qubits=qubits) cal_type, cr_tones, comp_tones = _check_calibration_type(cx_sched) @@ -202,57 +205,45 @@ def get_calibration(self, node_op: CircuitInst, qubits: List) -> Union[Schedule, # Determine native direction, assuming only single drive channel per qubit. # This guarantees channel and qubit index equality. - is_native = comp_tones[0].channel.index == qubits[1] - - stretched_cr_tones = list(map(lambda p: self.rescale_cr_inst(p, theta), cr_tones)) - stretched_comp_tones = list(map(lambda p: self.rescale_cr_inst(p, theta), comp_tones)) - - if is_native: + if comp_tones[0].channel.index == qubits[1]: xgate = self._inst_map.get("x", qubits[0]) - - for cr, comp in zip(stretched_cr_tones, stretched_comp_tones): - current_dur = rzx_theta.duration - rzx_theta.insert(current_dur, cr, inplace=True) - rzx_theta.insert(current_dur, comp, inplace=True) - rzx_theta.append(xgate, inplace=True) - - else: - # Add hadamard gate to flip - xgate = self._inst_map.get("x", qubits[1]) - szc = self._inst_map.get("rz", qubits[1], np.pi / 2) - sxc = self._inst_map.get("sx", qubits[1]) - szt = self._inst_map.get("rz", qubits[0], np.pi / 2) - sxt = self._inst_map.get("sx", qubits[0]) - - # Hadamard to control - rzx_theta.insert(0, szc, inplace=True) - rzx_theta.insert(0, sxc, inplace=True) - rzx_theta.insert(sxc.duration, szc, inplace=True) - - # Hadamard to target - rzx_theta.insert(0, szt, inplace=True) - rzx_theta.insert(0, sxt, inplace=True) - rzx_theta.insert(sxt.duration, szt, inplace=True) - - for cr, comp in zip(stretched_cr_tones, stretched_comp_tones): - current_dur = rzx_theta.duration - rzx_theta.insert(current_dur, cr, inplace=True) - rzx_theta.insert(current_dur, comp, inplace=True) - rzx_theta.append(xgate, inplace=True) - - current_dur = rzx_theta.duration - - # Hadamard to control - rzx_theta.insert(current_dur, szc, inplace=True) - rzx_theta.insert(current_dur, sxc, inplace=True) - rzx_theta.insert(current_dur + sxc.duration, szc, inplace=True) - - # Hadamard to target - rzx_theta.insert(current_dur, szt, inplace=True) - rzx_theta.insert(current_dur, sxt, inplace=True) - rzx_theta.insert(current_dur + sxt.duration, szt, inplace=True) - - return rzx_theta + with builder.build( + default_alignment="sequential", name="rzx(%.3f)" % theta + ) as rzx_theta_native: + for cr_tone, comp_tone in zip(cr_tones, comp_tones): + with builder.align_left(): + self.rescale_cr_inst(cr_tone, theta) + self.rescale_cr_inst(comp_tone, theta) + builder.call(xgate) + return rzx_theta_native + + # The direction is not native. Add Hadamard gates to flip the direction. + xgate = self._inst_map.get("x", qubits[1]) + szc = self._inst_map.get("rz", qubits[1], pi / 2) + sxc = self._inst_map.get("sx", qubits[1]) + szt = self._inst_map.get("rz", qubits[0], pi / 2) + sxt = self._inst_map.get("sx", qubits[0]) + with builder.build(name="hadamard") as hadamard: + # Control qubit + builder.call(szc, name="szc") + builder.call(sxc, name="sxc") + builder.call(szc, name="szc") + # Target qubit + builder.call(szt, name="szt") + builder.call(sxt, name="sxt") + builder.call(szt, name="szt") + + with builder.build( + default_alignment="sequential", name="rzx(%.3f)" % theta + ) as rzx_theta_flip: + builder.call(hadamard, name="hadamard") + for cr_tone, comp_tone in zip(cr_tones, comp_tones): + with builder.align_left(): + self.rescale_cr_inst(cr_tone, theta) + self.rescale_cr_inst(comp_tone, theta) + builder.call(xgate) + builder.call(hadamard, name="hadamard") + return rzx_theta_flip class RZXCalibrationBuilderNoEcho(RZXCalibrationBuilder): @@ -293,11 +284,8 @@ def get_calibration(self, node_op: CircuitInst, qubits: List) -> Union[Schedule, except TypeError as ex: raise QiskitError("Target rotation angle is not assigned.") from ex - rzx_theta = Schedule(name="rzx(%.3f)" % theta) - rzx_theta.metadata["publisher"] = CalibrationPublisher.QISKIT - if np.isclose(theta, 0.0): - return rzx_theta + return ScheduleBlock(name="rzx(0.000)") cx_sched = self._inst_map.get("cx", qubits=qubits) cal_type, cr_tones, comp_tones = _check_calibration_type(cx_sched) @@ -320,21 +308,12 @@ def get_calibration(self, node_op: CircuitInst, qubits: List) -> Union[Schedule, # Determine native direction, assuming only single drive channel per qubit. # This guarantees channel and qubit index equality. - is_native = comp_tones[0].channel.index == qubits[1] - - stretched_cr_tone = self.rescale_cr_inst(cr_tones[0], 2 * theta) - stretched_comp_tone = self.rescale_cr_inst(comp_tones[0], 2 * theta) - - if is_native: - # Placeholder to make pulse gate work - delay = Delay(stretched_cr_tone.duration, DriveChannel(qubits[0])) - - # This doesn't remove unwanted instruction such as ZI - # These terms are eliminated along with other gates around the pulse gate. - rzx_theta = rzx_theta.insert(0, stretched_cr_tone, inplace=True) - rzx_theta = rzx_theta.insert(0, stretched_comp_tone, inplace=True) - rzx_theta = rzx_theta.insert(0, delay, inplace=True) - + if comp_tones[0].channel.index == qubits[1]: + with builder.build(default_alignment="left", name="rzx(%.3f)" % theta) as rzx_theta: + stretched_dur = self.rescale_cr_inst(cr_tones[0], 2 * theta) + self.rescale_cr_inst(comp_tones[0], 2 * theta) + # Placeholder to make pulse gate work + builder.delay(stretched_dur, DriveChannel(qubits[0])) return rzx_theta raise QiskitError("RZXCalibrationBuilderNoEcho only supports hardware-native RZX gates.") diff --git a/releasenotes/notes/add-timeblockade-instruction-9469a5e9e0218adc.yaml b/releasenotes/notes/add-timeblockade-instruction-9469a5e9e0218adc.yaml new file mode 100644 index 000000000000..ff458906a46e --- /dev/null +++ b/releasenotes/notes/add-timeblockade-instruction-9469a5e9e0218adc.yaml @@ -0,0 +1,10 @@ +--- +features: + - | + New pulse directive :class:`~qiskit.pulse.instructions.TimeBlockade` has been added. + This instruction is QPY compatible. This directive behaves almost identically to + the delay instruction, but will be removed before execution. + This directive is intended to be used internally within the pulse builder + and helps :class:`.ScheduleBlock` represent instructions with + absolute time intervals. This allows the pulse builder to convert + :class:`Schedule` into :class:`ScheduleBlock`, rather than wrapping with call instruction. diff --git a/releasenotes/notes/upgrade-pulse-builder-and-rzx-builder-033ac8ad8ad2a192.yaml b/releasenotes/notes/upgrade-pulse-builder-and-rzx-builder-033ac8ad8ad2a192.yaml new file mode 100644 index 000000000000..8023d54b383f --- /dev/null +++ b/releasenotes/notes/upgrade-pulse-builder-and-rzx-builder-033ac8ad8ad2a192.yaml @@ -0,0 +1,31 @@ +--- +upgrade: + - | + The behavior of the pulse builder when a :class:`.Schedule` is called + has been upgraded. Called schedules are internally converted into + :class:`.ScheduleBlock` representation and now reference mechanism is + always applied rather than appending the schedules wrapped by + the :class:`~qiskit.pulse.instructions.Call` instruction. + Note that the converted block doesn't necessary recover the original alignment context. + This is simply an ASAP aligned sequence of pulse instructions with absolute time intervals. + This is an upgrade of internal representation of called pulse programs and thus no API changes. + However the :class:`~qiskit.pulse.instructions.Call` instruction and :class:`.Schedule` + no longer appear in the builder's pulse program. + This change guarantees the generated schedule blocks are always QPY compatible. + If you are filtering the output schedule instructions by :class:`~qiskit.pulse.instructions.Call`, + you can access to the :attr:`.ScheduleBlock.references` instead to retrieve the called program. + - | + :class:`~qiskit.transpiler.passes.RZXCalibrationBuilder` + and :class:`~qiskit.transpiler.passes.RZXCalibrationBuilderNoEcho` transpiler pass + have been upgraded to generate :class:`.ScheduleBlock`. + This change guarantees the transpiled circuits are always QPY compatible. + If you are directly using :meth:`~qiskit.transpiler.passes.RZXCalibrationBuilder.rescale_cr_inst`, + method from another program or a pass subclass to rescale cross resonance pulse of the device, + now this method is turned into a pulse builder macro, and you need to use this method + within the pulse builder context to adopts to new release. + The method call injects a play instruction to the context pulse program, + instead of returning a :class:`.Play` instruction with the stretched pulse. +fixes: + - | + The ECR pulse sequence misalignment of :class:`~qiskit.transpiler.passes.RZXCalibrationBuilder` + pass has been fixed. See Qiskit/qiskit-terra/#9013 for details. diff --git a/test/python/pulse/test_calibrationbuilder.py b/test/python/pulse/test_calibrationbuilder.py deleted file mode 100644 index ba108ca9f64b..000000000000 --- a/test/python/pulse/test_calibrationbuilder.py +++ /dev/null @@ -1,197 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2020. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Test the RZXCalibrationBuilderNoEcho.""" - -from math import erf, pi - -import numpy as np -from ddt import data, ddt - -from qiskit import circuit, schedule -from qiskit.pulse import ( - ControlChannel, - Delay, - DriveChannel, - GaussianSquare, - Waveform, - Play, - ShiftPhase, - InstructionScheduleMap, - Schedule, -) -from qiskit.test import QiskitTestCase -from qiskit.providers.fake_provider import FakeAthens -from qiskit.transpiler import PassManager -from qiskit.transpiler.passes.calibration.builders import ( - RZXCalibrationBuilder, - RZXCalibrationBuilderNoEcho, -) - - -class TestCalibrationBuilder(QiskitTestCase): - """Test the Calibration Builder.""" - - def setUp(self): - super().setUp() - self.backend = FakeAthens() - self.inst_map = self.backend.defaults().instruction_schedule_map - - -@ddt -class TestRZXCalibrationBuilder(TestCalibrationBuilder): - """Test RZXCalibrationBuilder.""" - - @data(np.pi / 4, np.pi / 2, np.pi) - def test_rzx_calibration_builder_duration(self, theta: float): - """Test that pulse durations are computed correctly.""" - width = 512.00000001 - sigma = 64 - n_sigmas = 4 - duration = width + n_sigmas * sigma - sample_mult = 16 - amp = 1.0 - pulse = GaussianSquare(duration=duration, amp=amp, sigma=sigma, width=width) - instruction = Play(pulse, ControlChannel(1)) - scaled = RZXCalibrationBuilder.rescale_cr_inst(instruction, theta, sample_mult=sample_mult) - gaussian_area = abs(amp) * sigma * np.sqrt(2 * np.pi) * erf(n_sigmas) - area = gaussian_area + abs(amp) * width - target_area = abs(theta) / (np.pi / 2.0) * area - width = (target_area - gaussian_area) / abs(amp) - expected_duration = round((width + n_sigmas * sigma) / sample_mult) * sample_mult - self.assertEqual(scaled.duration, expected_duration) - - def test_pass_alive_with_dcx_ish(self): - """Test if the pass is not terminated by error with direct CX input.""" - cx_sched = Schedule() - # Fake direct cr - cx_sched.insert(0, Play(GaussianSquare(800, 0.2, 64, 544), ControlChannel(1)), inplace=True) - # Fake direct compensation tone - # Compensation tone doesn't have dedicated pulse class. - # So it's reported as a waveform now. - compensation_tone = Waveform(0.1 * np.ones(800, dtype=complex)) - cx_sched.insert(0, Play(compensation_tone, DriveChannel(0)), inplace=True) - - inst_map = InstructionScheduleMap() - inst_map.add("cx", (1, 0), schedule=cx_sched) - - theta = pi / 3 - rzx_qc = circuit.QuantumCircuit(2) - rzx_qc.rzx(theta, 1, 0) - - pass_ = RZXCalibrationBuilder(instruction_schedule_map=inst_map) - with self.assertWarns(UserWarning): - # User warning that says q0 q1 is invalid - cal_qc = PassManager(pass_).run(rzx_qc) - self.assertEqual(cal_qc, rzx_qc) - - -class TestRZXCalibrationBuilderNoEcho(TestCalibrationBuilder): - """Test RZXCalibrationBuilderNoEcho.""" - - def test_rzx_calibration_builder(self): - """Test whether RZXCalibrationBuilderNoEcho scales pulses correctly.""" - - # Define a circuit with one RZX gate and an angle theta. - theta = pi / 3 - rzx_qc = circuit.QuantumCircuit(2) - rzx_qc.rzx(theta / 2, 1, 0) - - # Verify that there are no calibrations for this circuit yet. - self.assertEqual(rzx_qc.calibrations, {}) - - # apply the RZXCalibrationBuilderNoEcho. - pass_ = RZXCalibrationBuilderNoEcho( - instruction_schedule_map=self.backend.defaults().instruction_schedule_map - ) - cal_qc = PassManager(pass_).run(rzx_qc) - rzx_qc_duration = schedule(cal_qc, self.backend).duration - - # Check that the calibrations contain the correct instructions - # and pulses on the correct channels. - rzx_qc_instructions = cal_qc.calibrations["rzx"][((1, 0), (theta / 2,))].instructions - self.assertEqual(rzx_qc_instructions[0][1].channel, DriveChannel(0)) - self.assertTrue(isinstance(rzx_qc_instructions[0][1], Play)) - self.assertEqual(rzx_qc_instructions[0][1].pulse.pulse_type, "GaussianSquare") - self.assertEqual(rzx_qc_instructions[1][1].channel, DriveChannel(1)) - self.assertTrue(isinstance(rzx_qc_instructions[1][1], Delay)) - self.assertEqual(rzx_qc_instructions[2][1].channel, ControlChannel(1)) - self.assertTrue(isinstance(rzx_qc_instructions[2][1], Play)) - self.assertEqual(rzx_qc_instructions[2][1].pulse.pulse_type, "GaussianSquare") - - # Calculate the duration of one scaled Gaussian square pulse from the CX gate. - cx_sched = self.inst_map.get("cx", qubits=(1, 0)) - - crs = [] - for time, inst in cx_sched.instructions: - - # Identify the CR pulses. - if isinstance(inst, Play) and not isinstance(inst, ShiftPhase): - if isinstance(inst.channel, ControlChannel): - crs.append((time, inst)) - - pulse_ = crs[0][1].pulse - amp = pulse_.amp - width = pulse_.width - sigma = pulse_.sigma - n_sigmas = (pulse_.duration - width) / sigma - sample_mult = 16 - - gaussian_area = abs(amp) * sigma * np.sqrt(2 * np.pi) * erf(n_sigmas) - area = gaussian_area + abs(amp) * width - target_area = abs(theta) / (np.pi / 2.0) * area - width = (target_area - gaussian_area) / abs(amp) - duration = round((width + n_sigmas * sigma) / sample_mult) * sample_mult - - # Check whether the durations of the RZX pulse and - # the scaled CR pulse from the CX gate match. - self.assertEqual(rzx_qc_duration, duration) - - def test_pulse_amp_typecasted(self): - """Test if scaled pulse amplitude is complex type.""" - fake_play = Play( - GaussianSquare(duration=800, amp=0.1, sigma=64, risefall_sigma_ratio=2), - ControlChannel(0), - ) - fake_theta = circuit.Parameter("theta") - assigned_theta = fake_theta.assign(fake_theta, 0.01) - - scaled = RZXCalibrationBuilderNoEcho.rescale_cr_inst( - instruction=fake_play, theta=assigned_theta - ) - scaled_pulse = scaled.pulse - - self.assertIsInstance(scaled_pulse.amp, complex) - - def test_pass_alive_with_dcx_ish(self): - """Test if the pass is not terminated by error with direct CX input.""" - cx_sched = Schedule() - # Fake direct cr - cx_sched.insert(0, Play(GaussianSquare(800, 0.2, 64, 544), ControlChannel(1)), inplace=True) - # Fake direct compensation tone - # Compensation tone doesn't have dedicated pulse class. - # So it's reported as a waveform now. - compensation_tone = Waveform(0.1 * np.ones(800, dtype=complex)) - cx_sched.insert(0, Play(compensation_tone, DriveChannel(0)), inplace=True) - - inst_map = InstructionScheduleMap() - inst_map.add("cx", (1, 0), schedule=cx_sched) - - theta = pi / 3 - rzx_qc = circuit.QuantumCircuit(2) - rzx_qc.rzx(theta, 1, 0) - - pass_ = RZXCalibrationBuilderNoEcho(instruction_schedule_map=inst_map) - with self.assertWarns(UserWarning): - # User warning that says q0 q1 is invalid - cal_qc = PassManager(pass_).run(rzx_qc) - self.assertEqual(cal_qc, rzx_qc) diff --git a/test/python/qpy/test_block_load_from_qpy.py b/test/python/qpy/test_block_load_from_qpy.py index ef814aecce61..ea5dac578cc2 100644 --- a/test/python/qpy/test_block_load_from_qpy.py +++ b/test/python/qpy/test_block_load_from_qpy.py @@ -17,7 +17,7 @@ import numpy as np -from qiskit.pulse import builder +from qiskit.pulse import builder, Schedule from qiskit.pulse.library import ( SymbolicPulse, Gaussian, @@ -34,6 +34,7 @@ MemorySlot, RegisterSlot, ) +from qiskit.pulse.instructions import Play, TimeBlockade from qiskit.circuit import Parameter, QuantumCircuit, Gate from qiskit.test import QiskitTestCase from qiskit.qpy import dump, load @@ -151,6 +152,12 @@ def test_barrier(self): builder.barrier(DriveChannel(0), DriveChannel(1), ControlChannel(2)) self.assert_roundtrip_equal(test_sched) + def test_time_blockade(self): + """Test time blockade.""" + with builder.build() as test_sched: + builder.append_instruction(TimeBlockade(10, DriveChannel(0))) + self.assert_roundtrip_equal(test_sched) + def test_measure(self): """Test measurement.""" with builder.build() as test_sched: @@ -187,6 +194,20 @@ def test_nested_blocks(self): builder.delay(200, DriveChannel(1)) self.assert_roundtrip_equal(test_sched) + def test_called_schedule(self): + """Test referenced pulse Schedule object. + + Referenced object is naively converted into ScheduleBlock with TimeBlockade instructions. + Thus referenced Schedule is still QPY compatible. + """ + refsched = Schedule() + refsched.insert(20, Play(Constant(100, 0.1), DriveChannel(0))) + refsched.insert(50, Play(Constant(100, 0.1), DriveChannel(1))) + + with builder.build() as test_sched: + builder.call(refsched, name="test_ref") + self.assert_roundtrip_equal(test_sched) + def test_bell_schedule(self): """Test complex schedule to create a Bell state.""" with builder.build() as test_sched: diff --git a/test/python/qpy/test_circuit_load_from_qpy.py b/test/python/qpy/test_circuit_load_from_qpy.py new file mode 100644 index 000000000000..4174e59c792d --- /dev/null +++ b/test/python/qpy/test_circuit_load_from_qpy.py @@ -0,0 +1,69 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2022. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Test cases for the schedule block qpy loading and saving.""" + +import io + +from ddt import ddt, data + +from qiskit.circuit import QuantumCircuit +from qiskit.providers.fake_provider import FakeHanoi +from qiskit.qpy import dump, load +from qiskit.test import QiskitTestCase +from qiskit.transpiler import PassManager +from qiskit.transpiler import passes + + +class QpyCircuitTestCase(QiskitTestCase): + """QPY schedule testing platform.""" + + def assert_roundtrip_equal(self, circuit): + """QPY roundtrip equal test.""" + qpy_file = io.BytesIO() + dump(circuit, qpy_file) + qpy_file.seek(0) + new_circuit = load(qpy_file)[0] + + self.assertEqual(circuit, new_circuit) + + +@ddt +class TestCalibrationPasses(QpyCircuitTestCase): + """QPY round-trip test case of transpiled circuits with pulse level optimization.""" + + def setUp(self): + super().setUp() + # This backend provides CX(0,1) with native ECR direction. + self.inst_map = FakeHanoi().defaults().instruction_schedule_map + + @data(0.1, 0.7, 1.5) + def test_rzx_calibration(self, angle): + """RZX builder calibration pass with echo.""" + pass_ = passes.RZXCalibrationBuilder(self.inst_map) + pass_manager = PassManager(pass_) + test_qc = QuantumCircuit(2) + test_qc.rzx(angle, 0, 1) + rzx_qc = pass_manager.run(test_qc) + + self.assert_roundtrip_equal(rzx_qc) + + @data(0.1, 0.7, 1.5) + def test_rzx_calibration_echo(self, angle): + """RZX builder calibration pass without echo.""" + pass_ = passes.RZXCalibrationBuilderNoEcho(self.inst_map) + pass_manager = PassManager(pass_) + test_qc = QuantumCircuit(2) + test_qc.rzx(angle, 0, 1) + rzx_qc = pass_manager.run(test_qc) + + self.assert_roundtrip_equal(rzx_qc) diff --git a/test/python/transpiler/test_calibrationbuilder.py b/test/python/transpiler/test_calibrationbuilder.py new file mode 100644 index 000000000000..f70c2b942851 --- /dev/null +++ b/test/python/transpiler/test_calibrationbuilder.py @@ -0,0 +1,335 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2020. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Test the RZXCalibrationBuilderNoEcho.""" + +from math import pi, erf # pylint: disable=no-name-in-module + +import numpy as np +from ddt import data, ddt + +from qiskit import circuit, schedule +from qiskit.circuit.library.standard_gates import SXGate, RZGate +from qiskit.providers.fake_provider import FakeHanoi +from qiskit.pulse import ( + ControlChannel, + DriveChannel, + GaussianSquare, + Waveform, + Play, + InstructionScheduleMap, + Schedule, +) +from qiskit.pulse import builder +from qiskit.pulse.transforms import target_qobj_transform +from qiskit.test import QiskitTestCase +from qiskit.transpiler import PassManager +from qiskit.transpiler.passes.calibration.builders import ( + RZXCalibrationBuilder, + RZXCalibrationBuilderNoEcho, +) + + +class TestCalibrationBuilder(QiskitTestCase): + """Test the Calibration Builder.""" + + # CR parameters + __risefall = 4 + __angle = np.pi / 2 + __granularity = 16 + + def setUp(self): + super().setUp() + # This backend provides CX(0,1) with native ECR direction. + self.backend = FakeHanoi() + self.inst_map = self.backend.defaults().instruction_schedule_map + + # Get positive CR tone + def _get_cr(time_inst, name): + return isinstance(time_inst[1], Play) and time_inst[1].pulse.name.startswith(name) + + cx_sched = self.inst_map.get("cx", (0, 1)) + + # CR tone + self.u0p_play = cx_sched.filter(lambda elm: _get_cr(elm, "CR90p_u")).instructions[0][1] + self.u0m_play = cx_sched.filter(lambda elm: _get_cr(elm, "CR90m_u")).instructions[0][1] + + # Rotary tone + self.d1p_play = cx_sched.filter(lambda elm: _get_cr(elm, "CR90p_d")).instructions[0][1] + self.d1m_play = cx_sched.filter(lambda elm: _get_cr(elm, "CR90m_d")).instructions[0][1] + + def compute_stretch_duration(self, play_gaussian_square_pulse, theta): + """Compute duration of stretched Gaussian Square pulse.""" + pulse = play_gaussian_square_pulse.pulse + sigma = pulse.sigma + width = self.compute_stretch_width(play_gaussian_square_pulse, theta) + + duration = width + sigma * self.__risefall + return round(duration / self.__granularity) * self.__granularity + + def compute_stretch_width(self, play_gaussian_square_pulse, theta): + """Compute width of stretched Gaussian Square pulse.""" + pulse = play_gaussian_square_pulse.pulse + sigma = pulse.sigma + width = pulse.width + + risefall_area = sigma * np.sqrt(2 * np.pi) * erf(self.__risefall) + full_area = risefall_area + width + + target_area = abs(theta) / self.__angle * full_area + return max(0, target_area - risefall_area) + + +@ddt +class TestRZXCalibrationBuilder(TestCalibrationBuilder): + """Test RZXCalibrationBuilder.""" + + @data(-np.pi / 4, 0.1, np.pi / 4, np.pi / 2, np.pi) + def test_rzx_calibration_cr_pulse_stretch(self, theta: float): + """Test that cross resonance pulse durations are computed correctly.""" + with builder.build() as test_sched: + RZXCalibrationBuilder.rescale_cr_inst(self.u0p_play, theta) + + self.assertEqual(test_sched.duration, self.compute_stretch_duration(self.u0p_play, theta)) + + @data(-np.pi / 4, 0.1, np.pi / 4, np.pi / 2, np.pi) + def test_rzx_calibration_rotary_pulse_stretch(self, theta: float): + """Test that rotary pulse durations are computed correctly.""" + with builder.build() as test_sched: + RZXCalibrationBuilder.rescale_cr_inst(self.d1p_play, theta) + + self.assertEqual(test_sched.duration, self.compute_stretch_duration(self.d1p_play, theta)) + + def test_native_cr(self): + """Test that correct pulse sequence is generated for native CR pair.""" + # Sufficiently large angle to avoid minimum duration, i.e. amplitude rescaling + theta = np.pi / 4 + + qc = circuit.QuantumCircuit(2) + qc.rzx(theta, 0, 1) + + _pass = RZXCalibrationBuilder(self.inst_map) + test_qc = PassManager(_pass).run(qc) + + duration = self.compute_stretch_duration(self.u0p_play, theta) + with builder.build( + self.backend, + default_alignment="sequential", + default_transpiler_settings={"optimization_level": 0}, + ) as ref_sched: + with builder.align_left(): + # Positive CRs + u0p_params = self.u0p_play.pulse.parameters + u0p_params["duration"] = duration + u0p_params["width"] = self.compute_stretch_width(self.u0p_play, theta) + builder.play( + GaussianSquare(**u0p_params), + ControlChannel(0), + ) + d1p_params = self.d1p_play.pulse.parameters + d1p_params["duration"] = duration + d1p_params["width"] = self.compute_stretch_width(self.d1p_play, theta) + builder.play( + GaussianSquare(**d1p_params), + DriveChannel(1), + ) + builder.x(0) + with builder.align_left(): + # Negative CRs + u0m_params = self.u0m_play.pulse.parameters + u0m_params["duration"] = duration + u0m_params["width"] = self.compute_stretch_width(self.u0m_play, theta) + builder.play( + GaussianSquare(**u0m_params), + ControlChannel(0), + ) + d1m_params = self.d1m_play.pulse.parameters + d1m_params["duration"] = duration + d1m_params["width"] = self.compute_stretch_width(self.d1m_play, theta) + builder.play( + GaussianSquare(**d1m_params), + DriveChannel(1), + ) + builder.x(0) + + self.assertEqual(schedule(test_qc, self.backend), target_qobj_transform(ref_sched)) + + def test_non_native_cr(self): + """Test that correct pulse sequence is generated for non-native CR pair.""" + # Sufficiently large angle to avoid minimum duration, i.e. amplitude rescaling + theta = np.pi / 4 + + qc = circuit.QuantumCircuit(2) + qc.rzx(theta, 1, 0) + + _pass = RZXCalibrationBuilder(self.inst_map) + test_qc = PassManager(_pass).run(qc) + + duration = self.compute_stretch_duration(self.u0p_play, theta) + with builder.build( + self.backend, + default_alignment="sequential", + default_transpiler_settings={"optimization_level": 0}, + ) as ref_sched: + # Hadamard gates + builder.call_gate(RZGate(np.pi / 2), qubits=(0,)) + builder.call_gate(SXGate(), qubits=(0,)) + builder.call_gate(RZGate(np.pi / 2), qubits=(0,)) + builder.call_gate(RZGate(np.pi / 2), qubits=(1,)) + builder.call_gate(SXGate(), qubits=(1,)) + builder.call_gate(RZGate(np.pi / 2), qubits=(1,)) + with builder.align_left(): + # Positive CRs + u0p_params = self.u0p_play.pulse.parameters + u0p_params["duration"] = duration + u0p_params["width"] = self.compute_stretch_width(self.u0p_play, theta) + builder.play( + GaussianSquare(**u0p_params), + ControlChannel(0), + ) + d1p_params = self.d1p_play.pulse.parameters + d1p_params["duration"] = duration + d1p_params["width"] = self.compute_stretch_width(self.d1p_play, theta) + builder.play( + GaussianSquare(**d1p_params), + DriveChannel(1), + ) + builder.x(0) + with builder.align_left(): + # Negative CRs + u0m_params = self.u0m_play.pulse.parameters + u0m_params["duration"] = duration + u0m_params["width"] = self.compute_stretch_width(self.u0m_play, theta) + builder.play( + GaussianSquare(**u0m_params), + ControlChannel(0), + ) + d1m_params = self.d1m_play.pulse.parameters + d1m_params["duration"] = duration + d1m_params["width"] = self.compute_stretch_width(self.d1m_play, theta) + builder.play( + GaussianSquare(**d1m_params), + DriveChannel(1), + ) + builder.x(0) + # Hadamard gates + builder.call_gate(RZGate(np.pi / 2), qubits=(0,)) + builder.call_gate(SXGate(), qubits=(0,)) + builder.call_gate(RZGate(np.pi / 2), qubits=(0,)) + builder.call_gate(RZGate(np.pi / 2), qubits=(1,)) + builder.call_gate(SXGate(), qubits=(1,)) + builder.call_gate(RZGate(np.pi / 2), qubits=(1,)) + + self.assertEqual(schedule(test_qc, self.backend), target_qobj_transform(ref_sched)) + + def test_pass_alive_with_dcx_ish(self): + """Test if the pass is not terminated by error with direct CX input.""" + cx_sched = Schedule() + # Fake direct cr + cx_sched.insert(0, Play(GaussianSquare(800, 0.2, 64, 544), ControlChannel(1)), inplace=True) + # Fake direct compensation tone + # Compensation tone doesn't have dedicated pulse class. + # So it's reported as a waveform now. + compensation_tone = Waveform(0.1 * np.ones(800, dtype=complex)) + cx_sched.insert(0, Play(compensation_tone, DriveChannel(0)), inplace=True) + + inst_map = InstructionScheduleMap() + inst_map.add("cx", (1, 0), schedule=cx_sched) + + theta = pi / 3 + rzx_qc = circuit.QuantumCircuit(2) + rzx_qc.rzx(theta, 1, 0) + + pass_ = RZXCalibrationBuilder(instruction_schedule_map=inst_map) + with self.assertWarns(UserWarning): + # User warning that says q0 q1 is invalid + cal_qc = PassManager(pass_).run(rzx_qc) + self.assertEqual(cal_qc, rzx_qc) + + +class TestRZXCalibrationBuilderNoEcho(TestCalibrationBuilder): + """Test RZXCalibrationBuilderNoEcho.""" + + def test_native_cr(self): + """Test that correct pulse sequence is generated for native CR pair. + + .. notes:: + No echo builder only supports native direction. + """ + # Sufficiently large angle to avoid minimum duration, i.e. amplitude rescaling + theta = np.pi / 4 + + qc = circuit.QuantumCircuit(2) + qc.rzx(theta, 0, 1) + + _pass = RZXCalibrationBuilderNoEcho(self.inst_map) + test_qc = PassManager(_pass).run(qc) + + duration = self.compute_stretch_duration(self.u0p_play, 2.0 * theta) + with builder.build() as ref_sched: + # Positive CRs + u0p_params = self.u0p_play.pulse.parameters + u0p_params["duration"] = duration + u0p_params["width"] = self.compute_stretch_width(self.u0p_play, 2.0 * theta) + builder.play( + GaussianSquare(**u0p_params), + ControlChannel(0), + ) + d1p_params = self.d1p_play.pulse.parameters + d1p_params["duration"] = duration + d1p_params["width"] = self.compute_stretch_width(self.d1p_play, 2.0 * theta) + builder.play( + GaussianSquare(**d1p_params), + DriveChannel(1), + ) + builder.delay(duration, DriveChannel(0)) + + self.assertEqual(schedule(test_qc, self.backend), target_qobj_transform(ref_sched)) + + def test_pulse_amp_typecasted(self): + """Test if scaled pulse amplitude is complex type.""" + fake_play = Play( + GaussianSquare(duration=800, amp=0.1, sigma=64, risefall_sigma_ratio=2), + ControlChannel(0), + ) + fake_theta = circuit.Parameter("theta") + assigned_theta = fake_theta.assign(fake_theta, 0.01) + + with builder.build() as test_sched: + RZXCalibrationBuilderNoEcho.rescale_cr_inst(instruction=fake_play, theta=assigned_theta) + scaled_pulse = test_sched.blocks[0].blocks[0].pulse + + self.assertIsInstance(scaled_pulse.amp, complex) + + def test_pass_alive_with_dcx_ish(self): + """Test if the pass is not terminated by error with direct CX input.""" + cx_sched = Schedule() + # Fake direct cr + cx_sched.insert(0, Play(GaussianSquare(800, 0.2, 64, 544), ControlChannel(1)), inplace=True) + # Fake direct compensation tone + # Compensation tone doesn't have dedicated pulse class. + # So it's reported as a waveform now. + compensation_tone = Waveform(0.1 * np.ones(800, dtype=complex)) + cx_sched.insert(0, Play(compensation_tone, DriveChannel(0)), inplace=True) + + inst_map = InstructionScheduleMap() + inst_map.add("cx", (1, 0), schedule=cx_sched) + + theta = pi / 3 + rzx_qc = circuit.QuantumCircuit(2) + rzx_qc.rzx(theta, 1, 0) + + pass_ = RZXCalibrationBuilderNoEcho(instruction_schedule_map=inst_map) + with self.assertWarns(UserWarning): + # User warning that says q0 q1 is invalid + cal_qc = PassManager(pass_).run(rzx_qc) + self.assertEqual(cal_qc, rzx_qc)