diff --git a/client/quantum_serverless/__init__.py b/client/quantum_serverless/__init__.py index 86303d964..5d9f6e74a 100644 --- a/client/quantum_serverless/__init__.py +++ b/client/quantum_serverless/__init__.py @@ -34,6 +34,7 @@ ServerlessProvider, IBMServerlessProvider, RayProvider, + LocalProvider, save_result, ) from .quantum_serverless import ( diff --git a/client/quantum_serverless/core/__init__.py b/client/quantum_serverless/core/__init__.py index 79d9b7fdb..c7c245aee 100644 --- a/client/quantum_serverless/core/__init__.py +++ b/client/quantum_serverless/core/__init__.py @@ -31,6 +31,7 @@ IBMServerlessProvider BaseProvider RayProvider + LocalProvider ComputeResource Job GatewayJobClient @@ -58,9 +59,17 @@ Provider, ServerlessProvider, IBMServerlessProvider, + LocalProvider, RayProvider, ) -from .job import BaseJobClient, RayJobClient, GatewayJobClient, Job, save_result +from .job import ( + BaseJobClient, + RayJobClient, + GatewayJobClient, + LocalJobClient, + Job, + save_result, +) from .program import ( Program, ProgramStorage, diff --git a/client/quantum_serverless/core/job.py b/client/quantum_serverless/core/job.py index fe3b393fb..96200ecdb 100644 --- a/client/quantum_serverless/core/job.py +++ b/client/quantum_serverless/core/job.py @@ -32,10 +32,15 @@ import os import tarfile import time +import sys from pathlib import Path from typing import Dict, Any, Optional, List from uuid import uuid4 +import subprocess +from subprocess import Popen +import re + import ray.runtime_env import requests from ray.dashboard.modules.job.sdk import JobSubmissionClient @@ -153,6 +158,71 @@ def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None): return Job(job_id=job_id, job_client=self) +class LocalJobClient(BaseJobClient): + """LocalJobClient.""" + + def __init__(self): + """Local job client. + + Args: + """ + self._jobs = {} + + def status(self, job_id: str): + return self._jobs[job_id]["status"] + + def stop(self, job_id: str): + """Stops job/program.""" + return f"job:{job_id} has already stopped" + + def logs(self, job_id: str): + return self._jobs[job_id]["logs"] + + def result(self, job_id: str): + return self._jobs[job_id]["result"] + + def get(self, job_id) -> Optional["Job"]: + return self._jobs[job_id]["job"] + + def list(self, **kwargs) -> List["Job"]: + return [job["job"] for job in list(self._jobs.values())] + + def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None): + if program.dependencies: + for dependency in program.dependencies: + subprocess.check_call( + [sys.executable, "-m", "pip", "install", dependency] + ) + arguments = arguments or {} + env_vars = { + **(program.env_vars or {}), + **{OT_PROGRAM_NAME: program.title}, + **{"PATH": os.environ["PATH"]}, + **{ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder)}, + } + + with Popen( + ["python", program.working_dir + program.entrypoint], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + env=env_vars, + ) as pipe: + status = "SUCCEEDED" + if pipe.wait(): + status = "FAILED" + output, _ = pipe.communicate() + results = re.search("\nSaved Result:(.+?):End Saved Result\n", output) + result = "" + if results: + result = results.group(1) + + job = Job(job_id=uuid4(), job_client=self) + entry = {"status": status, "logs": output, "result": result, "job": job} + self._jobs[job.job_id] = entry + return job + + class GatewayJobClient(BaseJobClient): """GatewayJobClient.""" @@ -403,6 +473,7 @@ def save_result(result: Dict[str, Any]): "authorization token in the environment." ) logging.info("Result: %s", result) + print(f"\nSaved Result:{result}:End Saved Result\n") return False if not is_jsonable(result, cls=QiskitObjectsEncoder): diff --git a/client/quantum_serverless/core/provider.py b/client/quantum_serverless/core/provider.py index 10d07ab45..d85463f66 100644 --- a/client/quantum_serverless/core/provider.py +++ b/client/quantum_serverless/core/provider.py @@ -51,6 +51,7 @@ Job, RayJobClient, GatewayJobClient, + LocalJobClient, BaseJobClient, ) from quantum_serverless.core.program import Program @@ -501,3 +502,27 @@ def get_job_by_id(self, job_id: str) -> Optional[Job]: def get_jobs(self, **kwargs) -> List[Job]: return self.client.list() + + +class LocalProvider(BaseProvider): + """RayProvider.""" + + def __init__(self): + """Local provider + + Args: + + Example: + >>> local = LocalProvider()) + """ + super().__init__("local-provier") + self.client = LocalJobClient() + + def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None) -> Job: + return self.client.run(program, arguments) + + def get_job_by_id(self, job_id: str) -> Optional[Job]: + return self.client.get(job_id) + + def get_jobs(self, **kwargs) -> List[Job]: + return self.client.list()