Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 1036 | Add program upload #1041

Merged
merged 8 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 127 additions & 1 deletion client/quantum_serverless/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import tarfile
import time
from pathlib import Path
from typing import Dict, Any, Optional, List
from typing import Dict, Any, Optional, List, Union
from uuid import uuid4

import ray.runtime_env
Expand Down Expand Up @@ -73,6 +73,16 @@ def run(
"""Runs program."""
raise NotImplementedError

def upload(self, program: Program):
    """Uploads program.

    Base-class placeholder: concrete job clients override this method.

    Args:
        program: program to upload.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError

def run_existing(
    self, program: Union[str, Program], arguments: Optional[Dict[str, Any]] = None
):
    """Executes existing program.

    Base-class placeholder: concrete job clients override this method.

    Args:
        program: either a ``Program`` instance or a string identifying it.
        arguments: optional mapping of arguments for the execution.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError

def get(self, job_id) -> Optional["Job"]:
    """Returns job by job id.

    Base-class placeholder: concrete job clients override this method.

    Args:
        job_id: identifier of the job to look up.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
Expand All @@ -97,6 +107,10 @@ def result(self, job_id: str):
"""Return results."""
raise NotImplementedError

def get_programs(self, **kwargs):
    """Returns list of programs.

    Base-class placeholder: concrete job clients override this method.

    Args:
        **kwargs: implementation-specific listing options.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError


class RayJobClient(BaseJobClient):
"""RayJobClient."""
Expand Down Expand Up @@ -152,6 +166,14 @@ def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None):
)
return Job(job_id=job_id, job_client=self)

def upload(self, program: Program):
    """Rejects program upload, which this client does not provide.

    Args:
        program: ignored.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError("Upload is not available for RayJobClient.")

def run_existing(
    self, program: Union[str, Program], arguments: Optional[Dict[str, Any]] = None
):
    """Rejects running an existing program, which this client does not provide.

    Args:
        program: ignored.
        arguments: ignored.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError("Run existing is not available for RayJobClient.")


class GatewayJobClient(BaseJobClient):
"""GatewayJobClient."""
Expand Down Expand Up @@ -227,6 +249,93 @@ def run( # pylint: disable=too-many-locals

return Job(job_id, job_client=self)

def upload(self, program: Program):
    """Packs the program working directory and uploads it to the gateway.

    Every entry of ``program.working_dir`` is added to a temporary
    ``artifact.tar`` created inside that directory. The archive is posted to
    the ``programs/upload/`` endpoint together with the program title,
    entrypoint and dependencies, and is removed afterwards whether or not
    the upload succeeded.

    Args:
        program: program to upload; ``entrypoint`` must name a file that
            exists inside ``working_dir``.

    Returns:
        The ``title`` field of the gateway response, or ``"na"`` when the
        response carries no title.

    Raises:
        QuantumServerlessException: if the program has no entrypoint, the
            entrypoint file does not exist, or the archive is larger than
            ``MAX_ARTIFACT_FILE_SIZE_MB``.
    """
    tracer = trace.get_tracer("client.tracer")
    # Name the span after this operation, not after `run`.
    with tracer.start_as_current_span("job.upload") as span:
        span.set_attribute("program", program.title)

        # `entrypoint` is optional on Program; fail clearly here rather than
        # with a TypeError from os.path.join below.
        if program.entrypoint is None:
            raise QuantumServerlessException(
                f"Program [{program.title}] has no entrypoint to upload."
            )

        url = f"{self.host}/api/{self.version}/programs/upload/"
        artifact_file_path = os.path.join(program.working_dir, "artifact.tar")

        # check if entrypoint exists
        if not os.path.exists(
            os.path.join(program.working_dir, program.entrypoint)
        ):
            raise QuantumServerlessException(
                f"Entrypoint file [{program.entrypoint}] does not exist "
                f"in [{program.working_dir}] working directory."
            )

        try:
            with tarfile.open(artifact_file_path, "w") as tar:
                for filename in os.listdir(program.working_dir):
                    fpath = os.path.join(program.working_dir, filename)
                    tar.add(fpath, arcname=filename)

            # check file size
            size_in_mb = Path(artifact_file_path).stat().st_size / 1024**2
            if size_in_mb > MAX_ARTIFACT_FILE_SIZE_MB:
                raise QuantumServerlessException(
                    f"{artifact_file_path} is {int(size_in_mb)} Mb, "
                    f"which is greater than {MAX_ARTIFACT_FILE_SIZE_MB} allowed. "
                    f"Try to reduce size of `working_dir`."
                )

            with open(artifact_file_path, "rb") as file:
                response_data = safe_json_request(
                    request=lambda: requests.post(
                        url=url,
                        data={
                            "title": program.title,
                            "entrypoint": program.entrypoint,
                            "arguments": json.dumps({}),
                            "dependencies": json.dumps(program.dependencies or []),
                        },
                        files={"artifact": file},
                        headers={"Authorization": f"Bearer {self._token}"},
                        timeout=REQUESTS_TIMEOUT,
                    )
                )
            program_title = response_data.get("title", "na")
            span.set_attribute("program.title", program_title)
        finally:
            # Remove the temporary archive on failure too, so a rejected or
            # oversized upload does not leave artifact.tar in working_dir.
            if os.path.exists(artifact_file_path):
                os.remove(artifact_file_path)

    return program_title

def run_existing(
    self, program: Union[str, Program], arguments: Optional[Dict[str, Any]] = None
):
    """Starts a job for a program that already exists on the gateway.

    Args:
        program: a ``Program`` (its ``title`` is used) or any other value,
            which is converted with ``str`` and used as the title.
        arguments: optional arguments for the run; serialized to JSON with
            ``QiskitObjectsEncoder``. ``None`` is sent as an empty object.

    Returns:
        A ``Job`` bound to this client, built from the ``id`` field of the
        gateway response.
    """
    program_title = program.title if isinstance(program, Program) else str(program)

    with trace.get_tracer("client.tracer").start_as_current_span(
        "job.run_existing"
    ) as current_span:
        current_span.set_attribute("program", program_title)
        current_span.set_attribute("arguments", str(arguments))

        endpoint = f"{self.host}/api/{self.version}/programs/run_existing/"

        def _post():
            # Serialization stays inside the request callable so it runs
            # under safe_json_request, as before.
            return requests.post(
                url=endpoint,
                data={
                    "title": program_title,
                    "arguments": json.dumps(
                        arguments or {}, cls=QiskitObjectsEncoder
                    ),
                },
                headers={"Authorization": f"Bearer {self._token}"},
                timeout=REQUESTS_TIMEOUT,
            )

        reply = safe_json_request(request=_post)
        new_job_id = reply.get("id")
        current_span.set_attribute("job.id", new_job_id)

    return Job(new_job_id, job_client=self)

def status(self, job_id: str):
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.status"):
Expand Down Expand Up @@ -319,6 +428,23 @@ def list(self, **kwargs) -> List["Job"]:
for job in response_data.get("results", [])
]

def get_programs(self, **kwargs):
    """Lists programs available on the gateway.

    Args:
        **kwargs: optional paging controls. ``limit`` (default 10) and
            ``offset`` (default 0) are sent as query parameters; other
            keys are ignored.

    Returns:
        A list of ``Program`` objects, one per entry of the response's
        ``results`` field, each keeping the raw entry in ``raw_data``.
    """
    tracer = trace.get_tracer("client.tracer")
    with tracer.start_as_current_span("program.list"):
        # Pass paging values via `params` so they are URL-encoded by
        # requests, not interpolated verbatim into the URL.
        query = {
            "limit": kwargs.get("limit", 10),
            "offset": kwargs.get("offset", 0),
        }
        response_data = safe_json_request(
            request=lambda: requests.get(
                f"{self.host}/api/{self.version}/programs/",
                params=query,
                headers={"Authorization": f"Bearer {self._token}"},
                timeout=REQUESTS_TIMEOUT,
            )
        )
    return [
        Program(program.get("title"), raw_data=program)
        for program in response_data.get("results", [])
    ]


class Job:
"""Job."""
Expand Down
11 changes: 9 additions & 2 deletions client/quantum_serverless/core/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,27 @@ class Program: # pylint: disable=too-many-instance-attributes
"""

title: str
entrypoint: str
working_dir: str = "./"
entrypoint: Optional[str] = None
working_dir: Optional[str] = "./"
env_vars: Optional[Dict[str, str]] = None
dependencies: Optional[List[str]] = None
description: Optional[str] = None
version: Optional[str] = None
tags: Optional[List[str]] = None
raw_data: Optional[Dict[str, Any]] = None

@classmethod
def from_json(cls, data: Dict[str, Any]):
    """Reconstructs a program from a dictionary.

    Keys of ``data`` that are not dataclass fields of ``cls`` are ignored.

    Args:
        data: mapping of field names to values.

    Returns:
        An instance of ``cls`` built from the recognized keys.

    Raises:
        TypeError: if a field without a default is missing from ``data``.
    """
    # Use `cls`, not the hard-coded base class, so subclasses rebuild
    # themselves and keep their own fields.
    field_names = {f.name for f in dataclasses.fields(cls)}
    return cls(**{k: v for k, v in data.items() if k in field_names})

def __str__(self):
    """Returns a short label containing only the program title."""
    return f"Program({self.title})"

def __repr__(self):
    """Returns the same text as ``__str__``."""
    return self.__str__()


class ProgramStorage(ABC):
"""Base program backend to save and load programs from."""
Expand Down
61 changes: 50 additions & 11 deletions client/quantum_serverless/core/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import warnings
import os.path
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from typing import Optional, List, Dict, Any, Union

import ray
import requests
Expand Down Expand Up @@ -260,7 +260,9 @@ def get_job_by_id(self, job_id: str) -> Optional[Job]:
return None
return Job(job_id=job_id, job_client=job_client)

def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None) -> Job:
def run(
self, program: Union[Program, str], arguments: Optional[Dict[str, Any]] = None
) -> Job:
"""Execute a program as a async job.

Example:
Expand Down Expand Up @@ -292,26 +294,44 @@ def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None) -> J

return job_client.run(program, arguments)

def upload(self, program: Program):
    """Uploads program.

    Base-class placeholder: providers that support upload override it.

    Args:
        program: program to upload.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError

def files(self) -> List[str]:
    """Returns list of available files produced by programs to download.

    Base-class placeholder: providers that support files override it.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError

def download(self, file: str, download_location: str):
def download(self, file: str, download_location: str = "./"):
"""Download file."""
warnings.warn(
"`download` method has been deprecated. "
"And will be removed in future releases. "
"Please, use `file_download` instead.",
DeprecationWarning,
)
return self.file_download(file, download_location)

def file_download(self, file: str, download_location: str):
Copy link
Collaborator

@psschwei psschwei Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think anyone has implemented their own provider, so this is probably more academic than anything else, but for the sake of completeness: should we deprecate the download function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is an "experimental" feature :)

But anyway, it is good practice. Deprecated download in 6e690da

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to deprecate the other functions too? (*hides*)

Copy link
Member Author

@IceKhan13 IceKhan13 Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not upload, though. That is why I thought it was a strange thing to deprecate half and not the other.

"""Download file."""
raise NotImplementedError

def delete(self, file: str):
def file_delete(self, file: str):
"""Deletes file uploaded or produced by the programs."""
raise NotImplementedError

def upload(self, file: str):
def file_upload(self, file: str):
"""Upload file."""
raise NotImplementedError

def widget(self):
    """Widget for information about provider and jobs.

    Returns:
        Whatever ``Widget(self).show()`` returns.
    """
    return Widget(self).show()

def get_programs(self, **kwargs):
    """Returns list of available programs.

    Base-class placeholder: providers that can list programs override it.

    Args:
        **kwargs: implementation-specific listing options.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError


class ServerlessProvider(BaseProvider):
"""
Expand Down Expand Up @@ -388,27 +408,41 @@ def delete_compute_resource(self, resource) -> int:
def get_job_by_id(self, job_id: str) -> Optional[Job]:
    """Returns the job with the given id, as reported by the job client.

    Args:
        job_id: identifier of the job to look up.
    """
    return self._job_client.get(job_id)

def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None) -> Job:
def run(
self, program: Union[Program, str], arguments: Optional[Dict[str, Any]] = None
) -> Job:
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("Provider.run"):
job = self._job_client.run(program, arguments)
if isinstance(program, Program) and program.entrypoint is not None:
job = self._job_client.run(program, arguments)
else:
job = self._job_client.run_existing(program, arguments)
return job

def upload(self, program: Program):
    """Uploads a program through the job client inside a tracing span.

    Args:
        program: program to upload.

    Returns:
        Whatever the job client's ``upload`` returns.
    """
    with trace.get_tracer("client.tracer").start_as_current_span("Provider.upload"):
        return self._job_client.upload(program)

def get_jobs(self, **kwargs) -> List[Job]:
    """Returns jobs listed by the job client.

    Args:
        **kwargs: forwarded unchanged to the job client's ``list``.
    """
    return self._job_client.list(**kwargs)

def files(self) -> List[str]:
    """Returns the file listing reported by the files client."""
    return self._files_client.list()

def download(self, file: str, download_location: str = "./"):
def file_download(self, file: str, download_location: str = "./"):
return self._files_client.download(file, download_location)

def delete(self, file: str):
def file_delete(self, file: str):
return self._files_client.delete(file)

def upload(self, file: str):
def file_upload(self, file: str):
return self._files_client.upload(file)

def get_programs(self, **kwargs) -> List[Program]:
    """Returns programs listed by the job client.

    Args:
        **kwargs: forwarded unchanged to the job client's ``get_programs``.
    """
    return self._job_client.get_programs(**kwargs)

def _fetch_token(self, username: str, password: str):
response_data = safe_json_request(
request=lambda: requests.post(
Expand Down Expand Up @@ -493,7 +527,12 @@ def __init__(self, host: str):
super().__init__("ray-provider", host)
self.client = RayJobClient(JobSubmissionClient(host))

def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None) -> Job:
def run(
self, program: Union[Program, str], arguments: Optional[Dict[str, Any]] = None
) -> Job:
if isinstance(program, str):
raise NotImplementedError("Ray provider only supports full Programs.")

return self.client.run(program, arguments)

def get_job_by_id(self, job_id: str) -> Optional[Job]:
Expand Down
Loading