diff --git a/client/quantum_serverless/__init__.py b/client/quantum_serverless/__init__.py index f1d78d06b..d8cb2bc88 100644 --- a/client/quantum_serverless/__init__.py +++ b/client/quantum_serverless/__init__.py @@ -34,6 +34,7 @@ ServerlessProvider, IBMServerlessProvider, RayProvider, + LocalProvider, save_result, ) from .quantum_serverless import ( diff --git a/client/quantum_serverless/core/__init__.py b/client/quantum_serverless/core/__init__.py index c808caee8..53d9edf3a 100644 --- a/client/quantum_serverless/core/__init__.py +++ b/client/quantum_serverless/core/__init__.py @@ -31,6 +31,7 @@ IBMServerlessProvider BaseProvider RayProvider + LocalProvider ComputeResource Job GatewayJobClient @@ -60,12 +61,20 @@ Provider, ServerlessProvider, IBMServerlessProvider, + LocalProvider, RayProvider, ) -from .job import BaseJobClient, RayJobClient, GatewayJobClient, Job, save_result + +from .job import ( + BaseJobClient, + RayJobClient, + GatewayJobClient, + LocalJobClient, + Job, + save_result, +) from .pattern import ( QiskitPattern, - Program, ProgramStorage, ProgramRepository, download_and_unpack_artifact, diff --git a/client/quantum_serverless/core/job.py b/client/quantum_serverless/core/job.py index 8e693e4c2..fb57b1d4c 100644 --- a/client/quantum_serverless/core/job.py +++ b/client/quantum_serverless/core/job.py @@ -32,10 +32,15 @@ import os import tarfile import time +import sys from pathlib import Path from typing import Dict, Any, Optional, List, Union from uuid import uuid4 +import subprocess +from subprocess import Popen +import re + import ray.runtime_env import requests from ray.dashboard.modules.job.sdk import JobSubmissionClient @@ -179,6 +184,139 @@ def run_existing( raise NotImplementedError("Run existing is not available for RayJobClient.") +class LocalJobClient(BaseJobClient): + """LocalJobClient.""" + + def __init__(self): + """Local job client. + + Args: + """ + self._jobs = {} + self._patterns = {} + + def status(self, job_id: str): + return self._jobs[job_id]["status"] + + def stop(self, job_id: str): + """Stops job/program.""" + return f"job:{job_id} has already stopped" + + def logs(self, job_id: str): + return self._jobs[job_id]["logs"] + + def result(self, job_id: str): + return self._jobs[job_id]["result"] + + def get(self, job_id) -> Optional["Job"]: + return self._jobs[job_id]["job"] + + def list(self, **kwargs) -> List["Job"]: + return [job["job"] for job in list(self._jobs.values())] + + def run(self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None): + if program.dependencies: + for dependency in program.dependencies: + subprocess.check_call( + [sys.executable, "-m", "pip", "install", dependency] + ) + arguments = arguments or {} + env_vars = { + **(program.env_vars or {}), + **{OT_PROGRAM_NAME: program.title}, + **{"PATH": os.environ["PATH"]}, + **{ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder)}, + } + + with Popen( + ["python", program.working_dir + program.entrypoint], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + env=env_vars, + ) as pipe: + status = "SUCCEEDED" + if pipe.wait(): + status = "FAILED" + output, _ = pipe.communicate() + results = re.search("\nSaved Result:(.+?):End Saved Result\n", output) + result = "" + if results: + result = results.group(1) + + job = Job(job_id=str(uuid4()), job_client=self) + entry = {"status": status, "logs": output, "result": result, "job": job} + self._jobs[job.job_id] = entry + return job + + def upload(self, program: QiskitPattern): + # check if entrypoint exists + if not os.path.exists(os.path.join(program.working_dir, program.entrypoint)): + raise QuantumServerlessException( + f"Entrypoint file [{program.entrypoint}] does not exist " + f"in [{program.working_dir}] working directory." + ) + self._patterns[program.title] = { + "title": program.title, + "entrypoint": program.entrypoint, + "working_dir": program.working_dir, + "env_vars": program.env_vars, + "arguments": json.dumps({}), + "dependencies": json.dumps(program.dependencies or []), + } + return program.title + + def run_existing( + self, + program: Union[str, QiskitPattern], + arguments: Optional[Dict[str, Any]] = None, + ): + if isinstance(program, QiskitPattern): + title = program.title + else: + title = str(program) + + saved_program = self._patterns[title] + if saved_program["dependencies"]: + dept = json.loads(saved_program["dependencies"]) + for dependency in dept: + subprocess.check_call( + [sys.executable, "-m", "pip", "install", dependency] + ) + arguments = arguments or {} + env_vars = { + **(saved_program["env_vars"] or {}), + **{OT_PROGRAM_NAME: saved_program["title"]}, + **{"PATH": os.environ["PATH"]}, + **{ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder)}, + } + + with Popen( + ["python", saved_program["working_dir"] + saved_program["entrypoint"]], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + env=env_vars, + ) as pipe: + status = "SUCCEEDED" + if pipe.wait(): + status = "FAILED" + output, _ = pipe.communicate() + results = re.search("\nSaved Result:(.+?):End Saved Result\n", output) + result = "" + if results: + result = results.group(1) + + job = Job(job_id=str(uuid4()), job_client=self) + entry = {"status": status, "logs": output, "result": result, "job": job} + self._jobs[job.job_id] = entry + return job + + def get_programs(self, **kwargs): + """Returns list of programs.""" + raise NotImplementedError + + class GatewayJobClient(BaseJobClient): """GatewayJobClient.""" @@ -535,6 +673,8 @@ def save_result(result: Dict[str, Any]): "authorization token in the environment." ) logging.info("Result: %s", result) + result_record = json.dumps(result or {}, cls=QiskitObjectsEncoder) + print(f"\nSaved Result:{result_record}:End Saved Result\n") return False if not is_jsonable(result, cls=QiskitObjectsEncoder): diff --git a/client/quantum_serverless/core/provider.py b/client/quantum_serverless/core/provider.py index 4f7e8cba4..146877f0a 100644 --- a/client/quantum_serverless/core/provider.py +++ b/client/quantum_serverless/core/provider.py @@ -51,6 +51,7 @@ Job, RayJobClient, GatewayJobClient, + LocalJobClient, BaseJobClient, ) from quantum_serverless.core.pattern import QiskitPattern @@ -594,3 +595,38 @@ def get_job_by_id(self, job_id: str) -> Optional[Job]: def get_jobs(self, **kwargs) -> List[Job]: return self.client.list() + + +class LocalProvider(BaseProvider): + """RayProvider.""" + + def __init__(self): + """Local provider + + Args: + + Example: + >>> local = LocalProvider()) + """ + super().__init__("local-provier") + self.client = LocalJobClient() + + def run( + self, + program: Union[QiskitPattern, str], + arguments: Optional[Dict[str, Any]] = None, + ) -> Job: + if isinstance(program, QiskitPattern) and program.entrypoint is not None: + job = self.client.run(program, arguments) + else: + job = self.client.run_existing(program, arguments) + return job + + def get_job_by_id(self, job_id: str) -> Optional[Job]: + return self.client.get(job_id) + + def get_jobs(self, **kwargs) -> List[Job]: + return self.client.list() + + def upload(self, program: QiskitPattern): + return self.client.upload(program)