Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow users to indicate they are done submitting jobs to a session #1123

Closed
wants to merge 18 commits into from
5 changes: 2 additions & 3 deletions docs/how_to/backends.rst
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,8 @@ If you are using a runtime session, add the ``backend`` option when starting you
with Session(service=service, backend="ibmq_qasm_simulator") as session:
estimator = Estimator(session=session, options=options)
job = estimator.run(circuit, observable)
result = job.result()
# Close the session only if all jobs are finished, and you don't need to run more in the session
session.close() # Closes the session

result = job.result()

display(circuit.draw("mpl"))
print(f" > Observable: {observable.paulis}")
Expand Down
4 changes: 0 additions & 4 deletions docs/how_to/error-mitigation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ The Estimator interface lets users seamlessly work with the variety of error mit
estimator = Estimator(session=session, options=options)
job = estimator.run(circuits=[psi1], observables=[H1], parameter_values=[theta1])
psi1_H1 = job.result()
# Close the session only if all jobs are finished, and you don't need to run more in the session
session.close()

.. note::
As you increase the resilience level, you will be able to use additional methods to improve the accuracy of your result. However, because the methods become more advanced with each level, they require additional sampling overhead (time) to generate more accurate expectation values.
Expand Down Expand Up @@ -236,6 +234,4 @@ Example of adding ``resilience_options`` into your estimator session
estimator = Estimator(session=session, options=options)
job = estimator.run(circuits=[psi1], observables=[H1], parameter_values=[theta1])
psi1_H1 = job.result()
# Close the session only if all jobs are finished, and you don't need to run more in the session
session.close()

2 changes: 0 additions & 2 deletions docs/how_to/error-suppression.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ Example: configure Estimator with optimization levels
estimator = Estimator(session=session, options=options)
job = estimator.run(circuits=[psi], observables=[H], parameter_values=[theta])
psi1_H1 = job.result()
# Close the session only if all jobs are finished, and you don't need to run more in the session
session.close()

.. note::
If optimization level is not specified, the service uses ``optimization_level = 3``.
Expand Down
11 changes: 3 additions & 8 deletions docs/how_to/run_session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,17 @@ There is also an interactive timeout value (5 minutes), which is not configurabl
Close a session
---------------

When jobs are all done, it is recommended that you use `session.close()` to close the session. This allows the scheduler to run the next job without waiting for the session timeout, therefore making it easier for everyone. You cannot submit jobs to a closed session.
When the context manager exits or when `session.close()` is explicitly called, the session won't accept any more jobs. Any queued jobs will run until completion or until the max time expires.

.. warning::
Close a session only after all session jobs **complete**, rather than immediately after they have all been submitted. Session jobs that are not completed will fail.
When a session that is not accepting jobs has run out of jobs to run, it is immediately closed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@beckykd should docs updates be in a separate PR? I'll probably need help with the wording on a lot of these - it will also be a bit tricky since once this is released, older versions of qiskit-ibm-runtime will behave differently

.. code-block:: python

with Session(service=service, backend=backend) as session:
estimator = Estimator()
job = estimator.run(...)
# Do not close here, the job might not be completed!
# results can also be retrieved after the context manager exits
result = job.result()
# job.result() is blocking, so this job is now finished and the session can be safely closed.
session.close()

Full example
------------
Expand All @@ -126,8 +123,6 @@ In this example, we start a session, run an Estimator job, and output the result
estimator = Estimator(options=options)
job = estimator.run(circuit, observable)
result = job.result()
# Close the session only if all jobs are finished, and you don't need to run more in the session
session.close()

display(circuit.draw("mpl"))
print(f" > Observable: {observable.paulis}")
Expand Down
2 changes: 0 additions & 2 deletions qiskit_ibm_runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@
circuits=[psi], observables=[H1], parameter_values=[theta]
)
print(f"Estimator results: {job.result()}")
# Close the session only if all jobs are finished and you don't need to run more in the session.
session.close()

Backend data
------------
Expand Down
8 changes: 6 additions & 2 deletions qiskit_ibm_runtime/api/clients/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,12 +337,16 @@ def job_metadata(self, job_id: str) -> Dict[str, Any]:
"""
return self._api.program_job(job_id).metadata()

def close_session(self, session_id: str) -> None:
"""Close the runtime session.
def cancel_session(self, session_id: str) -> None:
"""Close all jobs in the runtime session.

Args:
session_id: Session ID.
"""
self._api.runtime_session(session_id=session_id).cancel()

def close_session(self, session_id: str) -> None:
"""Update session so jobs can no longer be submitted."""
self._api.runtime_session(session_id=session_id).close()

def list_backends(
Expand Down
57 changes: 57 additions & 0 deletions qiskit_ibm_runtime/api/rest/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# This code is part of Qiskit.
#
# (C) Copyright IBM 2021.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Base REST adapter."""

from ..session import RetrySession


class RestAdapterBase:
"""Base class for REST adapters."""

URL_MAP = {} # type: ignore[var-annotated]
"""Mapping between the internal name of an endpoint and the actual URL."""

_HEADER_JSON_CONTENT = {"Content-Type": "application/json"}

def __init__(self, session: RetrySession, prefix_url: str = "") -> None:
"""RestAdapterBase constructor.

Args:
session: Session to be used in the adapter.
prefix_url: String to be prepend to all URLs.
"""
self.session = session
self.prefix_url = prefix_url

def get_url(self, identifier: str) -> str:
"""Return the resolved URL for the specified identifier.

Args:
identifier: Internal identifier of the endpoint.

Returns:
The resolved URL of the endpoint (relative to the session base URL).
"""
return "{}{}".format(self.prefix_url, self.URL_MAP[identifier])

def get_prefixed_url(self, prefix: str, identifier: str) -> str:
"""Return an adjusted URL for the specified identifier.

Args:
prefix: string to be prepended to the URL.
identifier: Internal identifier of the endpoint.

Returns:
The resolved facade URL of the endpoint.
"""
return "{}{}{}".format(prefix, self.prefix_url, self.URL_MAP[identifier])
2 changes: 1 addition & 1 deletion qiskit_ibm_runtime/api/rest/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

from qiskit_ibm_provider.api.rest.base import RestAdapterBase
from qiskit_ibm_provider.api.rest.program_job import ProgramJob
from qiskit_ibm_provider.api.rest.runtime_session import RuntimeSession
from qiskit_ibm_provider.utils import local_to_utc

from .runtime_session import RuntimeSession
from .program import Program
from ...utils import RuntimeEncoder
from .cloud_backend import CloudBackend
Expand Down
46 changes: 46 additions & 0 deletions qiskit_ibm_runtime/api/rest/runtime_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# This code is part of Qiskit.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Runtime Session REST adapter."""

from .base import RestAdapterBase
from ..session import RetrySession


class RuntimeSession(RestAdapterBase):
"""Rest adapter for session related endpoints."""

URL_MAP = {
"self": "",
"close": "/close",
}

def __init__(self, session: RetrySession, session_id: str, url_prefix: str = "") -> None:
"""Job constructor.

Args:
session: RetrySession to be used in the adapter.
session_id: Job ID of the first job in a runtime session.
url_prefix: Prefix to use in the URL.
"""
super().__init__(session, "{}/sessions/{}".format(url_prefix, session_id))

def cancel(self) -> None:
"""Cancel all jobs in the session."""
url = self.get_url("close")
self.session.delete(url)

def close(self) -> None:
"""Set accepting_jobs flag to false, so no more jobs can be submitted."""
payload = {"accepting_jobs": False}
url = self.get_url("self")
self.session.patch(url, json=payload)
3 changes: 0 additions & 3 deletions qiskit_ibm_runtime/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ class Estimator(BasePrimitive, BaseEstimator):
parameter_values=[theta1]*2
)
print(psi1_H23.result())
# Close the session only if all jobs are finished
# and you don't need to run more in the session
session.close()
"""

_PROGRAM_ID = "estimator"
Expand Down
3 changes: 0 additions & 3 deletions qiskit_ibm_runtime/qiskit_runtime_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ class QiskitRuntimeService(Provider):
circuits=[psi], observables=[H1], parameter_values=[theta]
)
print(f"Estimator results: {job.result()}")
# Close the session only if all jobs are finished
# and you don't need to run more in the session.
session.close()

The example above uses the dedicated :class:`~qiskit_ibm_runtime.Sampler`
and :class:`~qiskit_ibm_runtime.Estimator` classes. You can also
Expand Down
4 changes: 0 additions & 4 deletions qiskit_ibm_runtime/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ class Sampler(BasePrimitive, BaseSampler):
print(f"Job result: {job.result()}")

# You can run more jobs inside the session

# Close the session only if all jobs are finished
# and you don't need to run more in the session.
session.close()
"""

def __init__(
Expand Down
17 changes: 12 additions & 5 deletions qiskit_ibm_runtime/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ class Session:
job = sampler.run(ReferenceCircuits.bell())
print(f"Sampler job ID: {job.job_id()}")
print(f"Sampler job result: {job.result()}")
# Close the session only if all jobs are finished and
# you don't need to run more in the session.
session.close()

"""

Expand Down Expand Up @@ -179,10 +176,19 @@ def run(

return job

def close(self) -> None:
"""Close the session."""
def cancel(self) -> None:
"""Cancel all jobs in a session."""
self._active = False
if self._session_id:
self._service._api_client.cancel_session(self._session_id)

def close(self) -> None:
"""Update the session so new jobs will not be accepted, but existing
queued or running jobs will run to completion. The session will be closed when there
are no more jobs to run."""
backend = self._service.backend(self._backend)
# There is a 500 internal error on IQP if the backend is a simulator
if not backend.simulator and self._session_id:
self._service._api_client.close_session(self._session_id)

def backend(self) -> Optional[str]:
Expand Down Expand Up @@ -235,6 +241,7 @@ def __exit__(
exc_tb: Optional[TracebackType],
) -> None:
set_cm_session(None)
self.close()


# Default session
Expand Down
14 changes: 14 additions & 0 deletions releasenotes/notes/sessions-accepting-jobs-d77e3bba150a20f5.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
upgrade:
- |
:meth:`qiskit_ibm_runtime.Session.close` has been updated to mark a ``Session`` as no longer
accepting new jobs. The session won't accept more jobs but it will continue to run any
queued jobs until they are done or the max time expires. This will also happen
automatically when the session context manager is exited. When a session that is not accepting
jobs has run out of jobs to run, it's immediately closed, freeing up the backend to run more jobs rather
than wait for the interactive timeout.

The old close method behaviour has been moved to a new method,
:meth:`qiskit_ibm_runtime.Session.cancel`, where all jobs within a session are
cancelled and terminated.

3 changes: 0 additions & 3 deletions test/integration/test_estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ def test_estimator_session(self, service):
self.assertIsInstance(result5, EstimatorResult)
self.assertEqual(len(result5.values), len(circuits5))
self.assertEqual(len(result5.metadata), len(circuits5))
session.close()

@run_integration_test
def test_estimator_callback(self, service):
Expand All @@ -131,7 +130,6 @@ def _callback(job_id_, result_):
self.assertTrue((result.values == ws_result_values).all())
self.assertEqual(len(job_ids), 1)
self.assertEqual(job.job_id(), job_ids.pop())
session.close()

@run_integration_test
def test_estimator_coeffs(self, service):
Expand Down Expand Up @@ -181,7 +179,6 @@ def test_estimator_coeffs(self, service):

chsh1_runtime = job1.result()
chsh2_runtime = job2.result()
session.close()

self.assertTrue(np.allclose(chsh1_terra.values, chsh1_runtime.values, rtol=0.3))
self.assertTrue(np.allclose(chsh2_terra.values, chsh2_runtime.values, rtol=0.3))
Expand Down
2 changes: 1 addition & 1 deletion test/integration/test_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_noise_model(self, service):
@run_integration_test
def test_simulator_transpile(self, service):
"""Test simulator transpile options."""
backend = service.backends(simulator=True)[0]
backend = service.backend("ibmq_qasm_simulator")
self.log.info("Using backend %s", backend.name)

circ = QuantumCircuit(2, 2)
Expand Down
5 changes: 0 additions & 5 deletions test/integration/test_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ def test_sampler_non_parameterized_circuits(self, service):
for i in range(len(circuits3)):
self.assertAlmostEqual(result3.quasi_dists[i][3], 0.5, delta=0.1)
self.assertAlmostEqual(result3.quasi_dists[i][0], 0.5, delta=0.1)
session.close()

@run_integration_test
def test_sampler_primitive_parameterized_circuits(self, service):
Expand Down Expand Up @@ -98,7 +97,6 @@ def test_sampler_primitive_parameterized_circuits(self, service):
self.assertIsInstance(result, SamplerResult)
self.assertEqual(len(result.quasi_dists), len(circuits0))
self.assertEqual(len(result.metadata), len(circuits0))
session.close()

@run_integration_test
def test_sampler_skip_transpile(self, service):
Expand All @@ -114,7 +112,6 @@ def test_sampler_skip_transpile(self, service):
sampler.run(circuits=circ, skip_transpilation=True).result()
# If transpilation not skipped the error would be something about cannot expand.
self.assertIn("invalid instructions", err.exception.message)
session.close()

@run_integration_test
def test_sampler_optimization_level(self, service):
Expand All @@ -129,7 +126,6 @@ def test_sampler_optimization_level(self, service):
)
self.assertAlmostEqual(result.quasi_dists[0][3], 0.5, delta=0.1)
self.assertAlmostEqual(result.quasi_dists[0][0], 0.5, delta=0.1)
session.close()

@run_integration_test
def test_sampler_callback(self, service):
Expand All @@ -154,7 +150,6 @@ def _callback(job_id_, result_):
self.assertEqual(result.quasi_dists, ws_result_quasi)
self.assertEqual(len(job_ids), 1)
self.assertEqual(job.job_id(), job_ids.pop())
session.close()

@run_integration_test
def test_sampler_no_session(self, service):
Expand Down
1 change: 0 additions & 1 deletion test/integration/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def test_session_from_id(self, service):
sampler = Sampler(session=session)
job = sampler.run(ReferenceCircuits.bell(), shots=400)
session_id = job.session_id
session.close()
new_session = Session.from_id(backend=backend, session_id=session_id)
sampler = Sampler(session=new_session)
job = sampler.run(ReferenceCircuits.bell(), shots=400)
Expand Down
Loading
Loading