Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy pub/sub code into qiskit-ibm-runtime #1349

Merged
merged 4 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions qiskit_ibm_runtime/ibm_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,8 @@
from .api.exceptions import RequestsApiError
from .utils import local_to_utc, are_circuits_dynamic

# If using a new-enough version of the IBM Provider, access the pub/sub
# mechanism from it as a broker, but fall back to Qiskit if we're using
# an old version (in which case it will also be falling back to Qiskit).
try:
from qiskit_ibm_provider.utils.pubsub import Publisher
except ImportError:
from qiskit.tools.events.pubsub import Publisher # pylint: disable=ungrouped-imports
from .utils.pubsub import Publisher


logger = logging.getLogger(__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from qiskit_ibm_runtime.transpiler.passes.scheduling import DynamicCircuitInstructionDurations
from qiskit_ibm_runtime.transpiler.passes.scheduling import ALAPScheduleAnalysis
from qiskit_ibm_runtime.transpiler.passes.scheduling import PadDelay
from qiskit.providers.fake_provider import FakeJakarta
from qiskit_ibm_runtime.fake_provider import FakeJakarta


backend = FakeJakarta()
Expand Down
1 change: 1 addition & 0 deletions qiskit_ibm_runtime/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@
)
from .utils import to_python_identifier, is_crn, get_runtime_api_base_url, resolve_crn
from .json import RuntimeEncoder, RuntimeDecoder, to_base64_string
from . import pubsub
182 changes: 182 additions & 0 deletions qiskit_ibm_runtime/utils/pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# This code is part of Qiskit.
#
# (C) Copyright IBM 2017, 2024.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
Message broker for the Publisher / Subscriber mechanism
"""

from __future__ import annotations

import typing

from qiskit.exceptions import QiskitError

try:
from qiskit.tools.events.pubsub import _Broker as _QiskitBroker
except ImportError:
_QiskitBroker = None

_Callback = typing.Callable[..., None]


class _Broker:
"""The event/message broker. It's a singleton.

In order to keep consistency across all the components, it would be great to
have a specific format for new events, documenting their usage.
It's the responsibility of the component emitting an event to document it's usage in
the component docstring.

Event format::

"<namespace>.<component>.<action>"

Examples:

* "ibm.job.start"
"""

_instance: _Broker | None = None
_subscribers: dict[str, list[_Subscription]] = {}

@staticmethod
def __new__(cls: type[_Broker]) -> _Broker:
if _Broker._instance is None:
# Backwards compatibility for Qiskit pre-1.0; if the Qiskit-internal broker
# singleton exists then we use that instead of defining a new one, so that
# the event streams will be unified even if someone is still using the
# Qiskit entry points to subscribe.
#
# This dynamic switch assumes that the interface of this vendored `Broker`
# code remains identical to the Qiskit 0.45 version.
_Broker._instance = object.__new__(_QiskitBroker or cls)
return _Broker._instance

class _Subscription:
def __init__(self, event: str, callback: _Callback):
self.event: str = event
self.callback: _Callback = callback

def __eq__(self, other: object) -> bool:
"""Overrides the default implementation"""
if isinstance(other, self.__class__):
return self.event == other.event and id(self.callback) == id(
other.callback
) # Allow 1:N subscribers
return False

def subscribe(self, event: str, callback: _Callback) -> bool:
"""Subscribes to an event, so when it's emitted all the callbacks subscribed,
will be executed. We are not allowing double registration.

Args:
event (string): The event to subscribed in the form of:
"terra.<component>.<method>.<action>"
callback (callable): The callback that will be executed when an event is
emitted.
"""
if not callable(callback):
raise QiskitError("Callback is not a callable!")

if event not in self._subscribers:
self._subscribers[event] = []

new_subscription = self._Subscription(event, callback)
if new_subscription in self._subscribers[event]:
# We are not allowing double subscription
return False

self._subscribers[event].append(new_subscription)
return True

def dispatch(self, event: str, *args: typing.Any, **kwargs: typing.Any) -> None:
"""Emits an event if there are any subscribers.

Args:
event (String): The event to be emitted
args: Arguments linked with the event
kwargs: Named arguments linked with the event
"""
# No event, no subscribers.
if event not in self._subscribers:
return

for subscriber in self._subscribers[event]:
subscriber.callback(*args, **kwargs)

def unsubscribe(self, event: str, callback: _Callback) -> bool:
"""Unsubscribe the specific callback to the event.

Args
event (String): The event to unsubscribe
callback (callable): The callback that won't be executed anymore

Returns
True: if we have successfully unsubscribed to the event
False: if there's no callback previously registered
"""

try:
self._subscribers[event].remove(self._Subscription(event, callback))
except KeyError:
return False

return True

def clear(self) -> None:
"""Unsubscribe everything, leaving the Broker without subscribers/events."""
self._subscribers.clear()


class Publisher:
"""Represents a "publisher".

Every component (class) can become a :class:`Publisher` and send events by
inheriting this class. Functions can call this class like::

Publisher().publish("event", args, ... )
"""

def __init__(self) -> None:
self._broker: _Broker = _Broker()

def publish(self, event: str, *args: typing.Any, **kwargs: typing.Any) -> None:
"""Triggers an event, and associates some data to it, so if there are any
subscribers, their callback will be called synchronously."""
return self._broker.dispatch(event, *args, **kwargs)


class Subscriber:
"""Represents a "subscriber".

Every component (class) can become a :class:`Subscriber` and subscribe to events,
that will call callback functions when they are emitted.
"""

def __init__(self) -> None:
self._broker: _Broker = _Broker()

def subscribe(self, event: str, callback: _Callback) -> bool:
"""Subscribes to an event, associating a callback function to that event, so
when the event occurs, the callback will be called.

This is a blocking call, so try to keep callbacks as lightweight as possible."""
return self._broker.subscribe(event, callback)

def unsubscribe(self, event: str, callback: _Callback) -> bool:
"""Unsubscribe a pair event-callback, so the callback will not be called anymore
when the event occurs."""
return self._broker.unsubscribe(event, callback)

def clear(self) -> None:
"""Unsubscribe everything"""
self._broker.clear()
Loading