Skip to content

Commit

Permalink
Issue 1013 | Client: Ray provider (#1021)
Browse files Browse the repository at this point in the history
  • Loading branch information
IceKhan13 authored Oct 6, 2023
1 parent d02efa3 commit 7ed0aab
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 11 deletions.
1 change: 1 addition & 0 deletions client/quantum_serverless/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
Provider,
ServerlessProvider,
IBMServerlessProvider,
RayProvider,
save_result,
)
from .quantum_serverless import (
Expand Down
2 changes: 2 additions & 0 deletions client/quantum_serverless/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
ServerlessProvider
IBMServerlessProvider
BaseProvider
RayProvider
ComputeResource
Job
GatewayJobClient
Expand Down Expand Up @@ -57,6 +58,7 @@
Provider,
ServerlessProvider,
IBMServerlessProvider,
RayProvider,
)
from .job import BaseJobClient, RayJobClient, GatewayJobClient, Job, save_result
from .program import (
Expand Down
16 changes: 5 additions & 11 deletions client/quantum_serverless/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
ENV_GATEWAY_PROVIDER_VERSION,
GATEWAY_PROVIDER_VERSION_DEFAULT,
MAX_ARTIFACT_FILE_SIZE_MB,
ENV_JOB_ARGUMENTS,
)
from quantum_serverless.core.program import Program
from quantum_serverless.exception import QuantumServerlessException
Expand Down Expand Up @@ -131,21 +132,13 @@ def list(self, **kwargs) -> List["Job"]:

def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None):
arguments = arguments or {}
arguments_string = ""
if arguments is not None:
arg_list = []
for key, value in arguments.items():
if isinstance(value, dict):
arg_list.append(f"--{key}='{json.dumps(value)}'")
else:
arg_list.append(f"--{key}={value}")
arguments_string = " ".join(arg_list)
entrypoint = f"python {program.entrypoint} {arguments_string}"
entrypoint = f"python {program.entrypoint}"

# set program name so OT can use it as parent span name
env_vars = {
**(program.env_vars or {}),
**{OT_PROGRAM_NAME: program.title},
**{ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder)},
}

job_id = self._job_client.submit_job(
Expand Down Expand Up @@ -405,10 +398,11 @@ def save_result(result: Dict[str, Any]):
token = os.environ.get(ENV_JOB_GATEWAY_TOKEN)
if token is None:
logging.warning(
"Results will not be saved as "
"Results will be saved as logs since"
"there is no information about the"
"authorization token in the environment."
)
logging.info("Result: %s", result)
return False

if not is_jsonable(result, cls=QiskitObjectsEncoder):
Expand Down
25 changes: 25 additions & 0 deletions client/quantum_serverless/core/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,3 +476,28 @@ def create_compute_resource(self, resource) -> int:

def delete_compute_resource(self, resource) -> int:
raise NotImplementedError("GatewayProvider does not support resources api yet.")


class RayProvider(BaseProvider):
"""RayProvider."""

def __init__(self, host: str):
"""Ray provider
Args:
host: ray head node host
Example:
>>> ray_provider = RayProvider("http://localhost:8265")
"""
super().__init__("ray-provider", host)
self.client = RayJobClient(JobSubmissionClient(host))

def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None) -> Job:
return self.client.run(program, arguments)

def get_job_by_id(self, job_id: str) -> Optional[Job]:
return self.client.get(job_id)

def get_jobs(self, **kwargs) -> List[Job]:
return self.client.list()

0 comments on commit 7ed0aab

Please sign in to comment.