From 7ed0aab8c658dc7cd777640bbea79dd0d2da85bf Mon Sep 17 00:00:00 2001 From: Iskandar Sitdikov Date: Fri, 6 Oct 2023 11:59:12 -0400 Subject: [PATCH] Issue 1013 | Client: Ray provider (#1021) --- client/quantum_serverless/__init__.py | 1 + client/quantum_serverless/core/__init__.py | 2 ++ client/quantum_serverless/core/job.py | 16 +++++--------- client/quantum_serverless/core/provider.py | 25 ++++++++++++++++++++++ 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/client/quantum_serverless/__init__.py b/client/quantum_serverless/__init__.py index 1554f92d6..86303d964 100644 --- a/client/quantum_serverless/__init__.py +++ b/client/quantum_serverless/__init__.py @@ -33,6 +33,7 @@ Provider, ServerlessProvider, IBMServerlessProvider, + RayProvider, save_result, ) from .quantum_serverless import ( diff --git a/client/quantum_serverless/core/__init__.py b/client/quantum_serverless/core/__init__.py index e41d9f985..79d9b7fdb 100644 --- a/client/quantum_serverless/core/__init__.py +++ b/client/quantum_serverless/core/__init__.py @@ -30,6 +30,7 @@ ServerlessProvider IBMServerlessProvider BaseProvider + RayProvider ComputeResource Job GatewayJobClient @@ -57,6 +58,7 @@ Provider, ServerlessProvider, IBMServerlessProvider, + RayProvider, ) from .job import BaseJobClient, RayJobClient, GatewayJobClient, Job, save_result from .program import ( diff --git a/client/quantum_serverless/core/job.py b/client/quantum_serverless/core/job.py index ae75fd59e..fe3b393fb 100644 --- a/client/quantum_serverless/core/job.py +++ b/client/quantum_serverless/core/job.py @@ -51,6 +51,7 @@ ENV_GATEWAY_PROVIDER_VERSION, GATEWAY_PROVIDER_VERSION_DEFAULT, MAX_ARTIFACT_FILE_SIZE_MB, + ENV_JOB_ARGUMENTS, ) from quantum_serverless.core.program import Program from quantum_serverless.exception import QuantumServerlessException @@ -131,21 +132,13 @@ def list(self, **kwargs) -> List["Job"]: def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None): arguments = arguments or {} - arguments_string = "" - if arguments is not None: - arg_list = [] - for key, value in arguments.items(): - if isinstance(value, dict): - arg_list.append(f"--{key}='{json.dumps(value)}'") - else: - arg_list.append(f"--{key}={value}") - arguments_string = " ".join(arg_list) - entrypoint = f"python {program.entrypoint} {arguments_string}" + entrypoint = f"python {program.entrypoint}" # set program name so OT can use it as parent span name env_vars = { **(program.env_vars or {}), **{OT_PROGRAM_NAME: program.title}, + **{ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder)}, } job_id = self._job_client.submit_job( @@ -405,10 +398,11 @@ def save_result(result: Dict[str, Any]): token = os.environ.get(ENV_JOB_GATEWAY_TOKEN) if token is None: logging.warning( - "Results will not be saved as " + "Results will be saved as logs since" "there is no information about the" "authorization token in the environment." ) + logging.info("Result: %s", result) return False if not is_jsonable(result, cls=QiskitObjectsEncoder): diff --git a/client/quantum_serverless/core/provider.py b/client/quantum_serverless/core/provider.py index 2c8139dc8..10d07ab45 100644 --- a/client/quantum_serverless/core/provider.py +++ b/client/quantum_serverless/core/provider.py @@ -476,3 +476,28 @@ def create_compute_resource(self, resource) -> int: def delete_compute_resource(self, resource) -> int: raise NotImplementedError("GatewayProvider does not support resources api yet.") + + +class RayProvider(BaseProvider): + """RayProvider.""" + + def __init__(self, host: str): + """Ray provider + + Args: + host: ray head node host + + Example: + >>> ray_provider = RayProvider("http://localhost:8265") + """ + super().__init__("ray-provider", host) + self.client = RayJobClient(JobSubmissionClient(host)) + + def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None) -> Job: + return self.client.run(program, arguments) + + def get_job_by_id(self, job_id: str) -> Optional[Job]: + return self.client.get(job_id) + + def get_jobs(self, **kwargs) -> List[Job]: + return self.client.list()