Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add open batches to pulser-pasqal #701

Merged
merged 33 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
b92789c
rework sdk to not require batch_id as an argument
Jul 18, 2024
019afa8
rework sdk to not require batch_id as an argument
Jul 18, 2024
869f646
rework sdk to not require batch_id as an argument
Jul 18, 2024
46d197d
rework sdk to not require batch_id as an argument
Jul 18, 2024
21d4182
rework sdk to not require batch_id as an argument
Jul 18, 2024
78b430a
change to context manager interface for open batches
Jul 9, 2024
ce767d4
fix rebase
Jul 19, 2024
5d8f7a5
fix rebase, and linting
Jul 19, 2024
2c15015
fix type
Jul 19, 2024
5fbc030
complete test coverage for method calls
Jul 19, 2024
266a66a
context management class, update tests
Jul 22, 2024
be75021
inside return is ignored with _
Jul 23, 2024
2004ce2
mr feedback
Jul 23, 2024
b158850
boolean condition for open batch support
Jul 24, 2024
41fef08
test coverage
Jul 24, 2024
11bc771
flake8
Jul 24, 2024
d307051
MR feedback
Jul 31, 2024
36afb03
comment on arg name
Jul 31, 2024
bef4074
support complete -> open keyword change for batches
Sep 6, 2024
96aed7a
support complete -> open keyword change for batches
Sep 6, 2024
8431b57
lint
Sep 6, 2024
14d95b1
Merge branch 'develop' into og/open-submissions
HGSilveri Sep 16, 2024
14ba4a9
Bump pasqal-cloud to v0.12
HGSilveri Sep 16, 2024
c92a02d
Include only the new jobs in the RemoteResults of each call to submit()
HGSilveri Sep 17, 2024
305745c
Merge branch 'develop' into og/open-submissions
HGSilveri Sep 17, 2024
35938a9
Merge branch 'develop' into og/open-submissions
HGSilveri Sep 18, 2024
8fb595c
Merge branch 'develop' into og/open-submissions
HGSilveri Sep 18, 2024
f98a1f6
Give stored batch ID to get available results
HGSilveri Sep 19, 2024
6d4a764
Submission -> Batch outside of RemoteResults
HGSilveri Sep 19, 2024
d89132d
Including backend specific kwargs to RemoteConnection.submit() when o…
HGSilveri Sep 19, 2024
547c6d1
Fully deprecate 'submission' for 'batch'
HGSilveri Sep 20, 2024
0b841d5
Relax `pasqal-cloud` requirement
HGSilveri Sep 20, 2024
c1f0e2a
Consistency updates to the tutorial
HGSilveri Sep 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ dist/
env*
*.egg-info/
__venv__/
venv
5 changes: 1 addition & 4 deletions pulser-core/pulser/backend/qpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ def run(
self.validate_job_params(
job_params or [], self._sequence.device.max_runs
)
results = self._connection.submit(
self._sequence, job_params=job_params, wait=wait
)
return cast(RemoteResults, results)
return cast(RemoteResults, super().run(job_params, wait))

@staticmethod
def validate_job_params(
Expand Down
157 changes: 131 additions & 26 deletions pulser-core/pulser/backend/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base classes for remote backend execution."""

from __future__ import annotations

import typing
from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import Any, Mapping, TypedDict
from types import TracebackType
from typing import Any, Mapping, Type, TypedDict, cast

from pulser.backend.abc import Backend
from pulser.devices import Device
Expand All @@ -44,6 +46,22 @@ class SubmissionStatus(Enum):
PAUSED = auto()


class BatchStatus(Enum):
"""Status of a batch.

Same as SubmissionStatus, this was needed because we renamed
Submission -> Batch.
"""

PENDING = auto()
RUNNING = auto()
DONE = auto()
CANCELED = auto()
TIMED_OUT = auto()
ERROR = auto()
PAUSED = auto()


class JobStatus(Enum):
"""Status of a remote job."""

Expand All @@ -66,7 +84,7 @@ class RemoteResults(Results):

Args:
submission_id: The ID that identifies the submission linked to
the results.
the results (aka the batch ID).
connection: The remote connection over which to get the submission's
status and fetch the results.
job_ids: If given, specifies which jobs within the submission should
Expand All @@ -81,14 +99,14 @@ def __init__(
job_ids: list[str] | None = None,
HGSilveri marked this conversation as resolved.
Show resolved Hide resolved
):
"""Instantiates a new collection of remote results."""
self._submission_id = submission_id
self._batch_id = submission_id
self._connection = connection
if job_ids is not None and not set(job_ids).issubset(
all_job_ids := self._connection._get_job_ids(self._submission_id)
all_job_ids := self._connection._get_job_ids(self._batch_id)
):
unknown_ids = [id_ for id_ in job_ids if id_ not in all_job_ids]
raise RuntimeError(
f"Submission {self._submission_id!r} does not contain jobs "
f"Submission {self._batch_id!r} does not contain jobs "
HGSilveri marked this conversation as resolved.
Show resolved Hide resolved
f"{unknown_ids}."
)
self._job_ids = job_ids
Expand All @@ -98,23 +116,30 @@ def results(self) -> tuple[Result, ...]:
"""The actual results, obtained after execution is done."""
return self._results

@property
def _submission_id(self) -> str:
"""The same as the batch ID, kept for backwards compatibility."""
return self._batch_id

@property
def batch_id(self) -> str:
"""The ID of the batch containing these results."""
return self._submission_id
return self._batch_id

@property
def job_ids(self) -> list[str]:
"""The IDs of the jobs within this results submission."""
"""The IDs of the jobs within these results' submission."""
if self._job_ids is None:
return self._connection._get_job_ids(self._submission_id)
return self._connection._get_job_ids(self._batch_id)
return self._job_ids

def get_status(self) -> SubmissionStatus:
HGSilveri marked this conversation as resolved.
Show resolved Hide resolved
"""Gets the status of the remote submission."""
return self._connection._get_submission_status(self._submission_id)
return SubmissionStatus[
self._connection._get_batch_status(self._batch_id).name
]

def get_available_results(self, submission_id: str) -> dict[str, Result]:
def get_available_results(self) -> dict[str, Result]:
"""Returns the available results of a submission.

Unlike the `results` property, this method does not raise an error if
Expand All @@ -127,7 +152,7 @@ def get_available_results(self, submission_id: str) -> dict[str, Result]:
results = {
k: v[1]
for k, v in self._connection._query_job_progress(
submission_id
self.batch_id
).items()
if v[1] is not None
}
Expand All @@ -141,7 +166,7 @@ def __getattr__(self, name: str) -> Any:
try:
self._results = tuple(
self._connection._fetch_result(
self._submission_id, self._job_ids
self.batch_id, self._job_ids
)
)
return self._results
Expand All @@ -161,42 +186,43 @@ class RemoteConnection(ABC):

@abstractmethod
def submit(
self, sequence: Sequence, wait: bool = False, **kwargs: Any
self,
sequence: Sequence,
wait: bool = False,
open: bool = True,
batch_id: str | None = None,
HGSilveri marked this conversation as resolved.
Show resolved Hide resolved
**kwargs: Any,
) -> RemoteResults | tuple[RemoteResults, ...]:
"""Submit a job for execution."""
pass

@abstractmethod
def _fetch_result(
self, submission_id: str, job_ids: list[str] | None
self, batch_id: str, job_ids: list[str] | None
) -> typing.Sequence[Result]:
"""Fetches the results of a completed submission."""
"""Fetches the results of a completed batch."""
pass

@abstractmethod
def _query_job_progress(
self, submission_id: str
self, batch_id: str
) -> Mapping[str, tuple[JobStatus, Result | None]]:
"""Fetches the status and results of all the jobs in a submission.
"""Fetches the status and results of all the jobs in a batch.

Unlike `_fetch_result`, this method does not raise an error if some
jobs associated to the submission do not have results.
jobs in the batch do not have results.

It returns a dictionnary mapping the job ID to its status and results.
"""
pass

@abstractmethod
def _get_submission_status(self, submission_id: str) -> SubmissionStatus:
"""Gets the status of a submission from its ID.

Not all SubmissionStatus values must be covered, but at least
SubmissionStatus.DONE is expected.
"""
def _get_batch_status(self, batch_id: str) -> BatchStatus:
"""Gets the status of a batch from its ID."""
pass

def _get_job_ids(self, submission_id: str) -> list[str]:
"""Gets all the job IDs within a submission."""
def _get_job_ids(self, batch_id: str) -> list[str]:
"""Gets all the job IDs within a batch."""
raise NotImplementedError(
"Unable to find job IDs through this remote connection."
)
Expand All @@ -208,6 +234,17 @@ def fetch_available_devices(self) -> dict[str, Device]:
"remote connection."
)

def _close_batch(self, batch_id: str) -> None:
"""Closes a batch using its ID."""
raise NotImplementedError( # pragma: no cover
"Unable to close batch through this remote connection"
)

@abstractmethod
def supports_open_batch(self) -> bool:
"""Flag to confirm this class can support creating an open batch."""
pass
HGSilveri marked this conversation as resolved.
Show resolved Hide resolved


class RemoteBackend(Backend):
"""A backend for sequence execution through a remote connection.
Expand All @@ -234,6 +271,39 @@ def __init__(
"'connection' must be a valid RemoteConnection instance."
)
self._connection = connection
self._batch_id: str | None = None

def run(
self, job_params: list[JobParams] | None = None, wait: bool = False
) -> RemoteResults | tuple[RemoteResults, ...]:
"""Runs the sequence on the remote backend and returns the result.

Args:
job_params: A list of parameters for each job to execute. Each
mapping must contain a defined 'runs' field specifying
the number of times to run the same sequence. If the sequence
is parametrized, the values for all the variables necessary
to build the sequence must be given in it's own mapping, for
each job, under the 'variables' field.
wait: Whether to wait until the results of the jobs become
available. If set to False, the call is non-blocking and the
obtained results' status can be checked using their `status`
property.

Returns:
The results, which can be accessed once all sequences have been
successfully executed.
"""
return self._connection.submit(
self._sequence,
job_params=job_params,
wait=wait,
**self._submit_kwargs(),
)

def _submit_kwargs(self) -> dict[str, Any]:
"""Keyword arguments given to any call to RemoteConnection.submit()."""
return dict(batch_id=self._batch_id)

@staticmethod
def _type_check_job_params(job_params: list[JobParams] | None) -> None:
Expand All @@ -247,3 +317,38 @@ def _type_check_job_params(job_params: list[JobParams] | None) -> None:
"All elements of 'job_params' must be dictionaries; "
f"got {type(d)} instead."
)

def open_batch(self) -> _OpenBatchContextManager:
"""Creates an open batch within a context manager object."""
if not self._connection.supports_open_batch():
raise NotImplementedError(
"Unable to execute open_batch using this remote connection"
)
return _OpenBatchContextManager(self)


class _OpenBatchContextManager:
def __init__(self, backend: RemoteBackend) -> None:
self.backend = backend

def __enter__(self) -> _OpenBatchContextManager:
batch = cast(
RemoteResults,
self.backend._connection.submit(
self.backend._sequence,
open=True,
**self.backend._submit_kwargs(),
),
)
self.backend._batch_id = batch.batch_id
return self

def __exit__(
self,
exc_type: Type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
if self.backend._batch_id:
self.backend._connection._close_batch(self.backend._batch_id)
self.backend._batch_id = None
12 changes: 7 additions & 5 deletions pulser-pasqal/pulser_pasqal/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from __future__ import annotations

from dataclasses import fields
from typing import ClassVar
from typing import Any, ClassVar

import pasqal_cloud

Expand Down Expand Up @@ -88,12 +88,14 @@ def run(
"All elements of 'job_params' must specify 'runs'" + suffix
)

return self._connection.submit(
self._sequence,
job_params=job_params,
return super().run(job_params, wait)

def _submit_kwargs(self) -> dict[str, Any]:
"""Keyword arguments given to any call to RemoteConnection.submit()."""
return dict(
batch_id=self._batch_id,
emulator=self.emulator,
config=self._config,
wait=wait,
mimic_qpu=self._mimic_qpu,
)

Expand Down
Loading