Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add worker config options #1051

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ spec:
value: {{ .Values.application.auth.keycloak.realm | quote }}
- name: SETTINGS_KEYCLOAK_CLIENT_SECRET
value: {{ .Values.application.auth.keycloak.clientSecret | quote }}
- name: RAY_CLUSTER_WORKER_REPLICAS
value: {{ .Values.application.ray.replicas | quote }}
- name: RAY_CLUSTER_WORKER_MIN_REPLICAS
value: {{ .Values.application.ray.minReplicas | quote }}
- name: RAY_CLUSTER_WORKER_MAX_REPLICAS
value: {{ .Values.application.ray.maxReplicas | quote }}
{{- if .Values.application.superuser.enable }}
- name: DJANGO_SUPERUSER_USERNAME
valueFrom:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ data:
{{- if .Values.application.ray.scrapeWithPrometheus }}
headServiceAnnotations:
prometheus.io/scrape: "true"
{{- end }}
{{- end }}
enableInTreeAutoscaling: {{`{{ auto_scaling }}`}}
headGroupSpec:
rayStartParams:
dashboard-host: 0.0.0.0
Expand Down Expand Up @@ -193,11 +194,11 @@ data:
claimName: {{ .Values.cos.claimName }}
workerGroupSpecs:
- groupName: g
maxReplicas: {{ .Values.application.ray.maxReplicas }}
minReplicas: {{ .Values.application.ray.minReplicas }}
maxReplicas: {{`{{ max_workers }}`}}
minReplicas: {{`{{ min_workers }}`}}
rayStartParams:
block: 'true'
replicas: {{ .Values.application.ray.replicas }}
replicas: {{`{{ workers }}`}}
template:
{{- if .Values.application.ray.scrapeWithPrometheus }}
metadata:
Expand Down
3 changes: 2 additions & 1 deletion client/quantum_serverless/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
QuantumServerlessException
get_auto_discovered_provider
"""

# pylint: disable=W0404
from importlib_metadata import version as metadata_version, PackageNotFoundError

from .core import (
Expand All @@ -36,6 +36,7 @@
RayProvider,
LocalProvider,
save_result,
Configuration,
)
from .quantum_serverless import (
QuantumServerless,
Expand Down
2 changes: 1 addition & 1 deletion client/quantum_serverless/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@
LocalJobClient,
Job,
save_result,
Configuration,
)
from .pattern import (
QiskitPattern,
Program,
ProgramStorage,
ProgramRepository,
download_and_unpack_artifact,
Expand Down
89 changes: 69 additions & 20 deletions client/quantum_serverless/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
RuntimeEnv
Job
"""
# pylint: disable=duplicate-code
import json
import logging
import os
Expand All @@ -36,6 +37,7 @@
from pathlib import Path
from typing import Dict, Any, Optional, List, Union
from uuid import uuid4
from dataclasses import asdict, dataclass

import subprocess
from subprocess import Popen
Expand All @@ -58,6 +60,7 @@
MAX_ARTIFACT_FILE_SIZE_MB,
ENV_JOB_ARGUMENTS,
)

from quantum_serverless.core.pattern import QiskitPattern
from quantum_serverless.exception import QuantumServerlessException
from quantum_serverless.serializers.program_serializers import (
Expand All @@ -69,11 +72,31 @@
RuntimeEnv = ray.runtime_env.RuntimeEnv


@dataclass
class Configuration:
    """Worker-resource configuration attached to a program run.

    Serialized with ``dataclasses.asdict`` and sent to the gateway as the
    ``config`` form field; ``None`` fields mean "use the server-side default".

    Args:
        workers: fixed number of worker pods when auto scaling is NOT enabled
        min_workers: minimum number of workers when auto scaling is enabled
        max_workers: maximum number of workers when auto scaling is enabled
        auto_scaling: set True to enable auto scaling of the workers
    """

    workers: Optional[int] = None
    min_workers: Optional[int] = None
    max_workers: Optional[int] = None
    auto_scaling: Optional[bool] = False


class BaseJobClient:
"""Base class for Job clients."""

def run(
self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None
self,
program: QiskitPattern,
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> "Job":
"""Runs program."""
raise NotImplementedError
Expand All @@ -86,6 +109,7 @@ def run_existing(
self,
program: Union[str, QiskitPattern],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
):
"""Executes existing program."""
raise NotImplementedError
Expand Down Expand Up @@ -151,7 +175,12 @@ def list(self, **kwargs) -> List["Job"]:
Job(job.job_id, job_client=self) for job in self._job_client.list_jobs()
]

def run(self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None):
def run(
self,
program: QiskitPattern,
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
):
arguments = arguments or {}
entrypoint = f"python {program.entrypoint}"

Expand Down Expand Up @@ -180,6 +209,7 @@ def run_existing(
self,
program: Union[str, QiskitPattern],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
):
raise NotImplementedError("Run existing is not available for RayJobClient.")

Expand Down Expand Up @@ -214,7 +244,12 @@ def get(self, job_id) -> Optional["Job"]:
def list(self, **kwargs) -> List["Job"]:
return [job["job"] for job in list(self._jobs.values())]

def run(self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None):
def run(
self,
program: QiskitPattern,
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
):
if program.dependencies:
for dependency in program.dependencies:
subprocess.check_call(
Expand Down Expand Up @@ -266,10 +301,11 @@ def upload(self, program: QiskitPattern):
}
return program.title

def run_existing(
def run_existing( # pylint: disable=too-many-locals
self,
program: Union[str, QiskitPattern],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
):
if isinstance(program, QiskitPattern):
title = program.title
Expand Down Expand Up @@ -333,7 +369,10 @@ def __init__(self, host: str, token: str, version: str):
self._token = token

def run( # pylint: disable=too-many-locals
self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None
self,
program: QiskitPattern,
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> "Job":
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.run") as span:
Expand Down Expand Up @@ -367,21 +406,26 @@ def run( # pylint: disable=too-many-locals
)

with open(artifact_file_path, "rb") as file:
data = {
"title": program.title,
"entrypoint": program.entrypoint,
"arguments": json.dumps(arguments or {}, cls=QiskitObjectsEncoder),
"dependencies": json.dumps(program.dependencies or []),
}
if config:
data["config"] = json.dumps(asdict(config))
else:
data["config"] = "{}"

response_data = safe_json_request(
request=lambda: requests.post(
url=url,
data={
"title": program.title,
"entrypoint": program.entrypoint,
"arguments": json.dumps(
arguments or {}, cls=QiskitObjectsEncoder
),
"dependencies": json.dumps(program.dependencies or []),
},
data=data,
files={"artifact": file},
headers={"Authorization": f"Bearer {self._token}"},
timeout=REQUESTS_TIMEOUT,
)
),
verbose=True,
)
job_id = response_data.get("id")
span.set_attribute("job.id", job_id)
Expand Down Expand Up @@ -449,6 +493,7 @@ def run_existing(
self,
program: Union[str, QiskitPattern],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
):
if isinstance(program, QiskitPattern):
title = program.title
Expand All @@ -462,15 +507,19 @@ def run_existing(

url = f"{self.host}/api/{self.version}/programs/run_existing/"

data = {
"title": title,
"arguments": json.dumps(arguments or {}, cls=QiskitObjectsEncoder),
}
if config:
data["config"] = json.dumps(asdict(config))
else:
data["config"] = "{}"

response_data = safe_json_request(
request=lambda: requests.post(
url=url,
data={
"title": title,
"arguments": json.dumps(
arguments or {}, cls=QiskitObjectsEncoder
),
},
data=data,
headers={"Authorization": f"Bearer {self._token}"},
timeout=REQUESTS_TIMEOUT,
)
Expand Down
14 changes: 10 additions & 4 deletions client/quantum_serverless/core/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
ComputeResource
ServerlessProvider
"""
# pylint: disable=duplicate-code
import logging
import warnings
import os.path
Expand Down Expand Up @@ -53,6 +54,7 @@
GatewayJobClient,
LocalJobClient,
BaseJobClient,
Configuration,
)
from quantum_serverless.core.pattern import QiskitPattern
from quantum_serverless.core.tracing import _trace_env_vars
Expand Down Expand Up @@ -265,6 +267,7 @@ def run(
self,
program: Union[QiskitPattern, str],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> Job:
"""Execute a program as a async job.

Expand Down Expand Up @@ -295,7 +298,7 @@ def run(
)
return None

return job_client.run(program, arguments)
return job_client.run(program, arguments, config)

def upload(self, program: QiskitPattern):
"""Uploads program."""
Expand Down Expand Up @@ -415,13 +418,14 @@ def run(
self,
program: Union[QiskitPattern, str],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> Job:
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("Provider.run"):
if isinstance(program, QiskitPattern) and program.entrypoint is not None:
job = self._job_client.run(program, arguments)
job = self._job_client.run(program, arguments, config)
else:
job = self._job_client.run_existing(program, arguments)
job = self._job_client.run_existing(program, arguments, config)
return job

def upload(self, program: QiskitPattern):
Expand Down Expand Up @@ -584,11 +588,12 @@ def run(
self,
program: Union[QiskitPattern, str],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> Job:
if isinstance(program, str):
raise NotImplementedError("Ray provider only supports full Programs.")

return self.client.run(program, arguments)
return self.client.run(program, arguments, config)

def get_job_by_id(self, job_id: str) -> Optional[Job]:
return self.client.get(job_id)
Expand All @@ -615,6 +620,7 @@ def run(
self,
program: Union[QiskitPattern, str],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> Job:
if isinstance(program, QiskitPattern) and program.entrypoint is not None:
job = self.client.run(program, arguments)
Expand Down
10 changes: 7 additions & 3 deletions client/quantum_serverless/quantum_serverless.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

from quantum_serverless.core.job import Job
from quantum_serverless.core.job import Job, Configuration
from quantum_serverless.core.pattern import QiskitPattern
from quantum_serverless.core.provider import BaseProvider, ComputeResource
from quantum_serverless.exception import QuantumServerlessException
Expand Down Expand Up @@ -103,7 +103,10 @@ def job_client(self):
return self._selected_provider.job_client()

def run(
self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None
self,
program: QiskitPattern,
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> Optional[Job]:
"""Execute a program as a async job

Expand All @@ -119,13 +122,14 @@ def run(
Args:
arguments: arguments to run program with
program: Program object
config: Configuration object

Returns:
Job
"""
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("QuantumServerless.run"):
job = self._selected_provider.run(program, arguments)
job = self._selected_provider.run(program, arguments, config)
return job

def upload(self, program: QiskitPattern):
Expand Down
7 changes: 5 additions & 2 deletions gateway/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ docker build -t qiskit/quantum-serverless-gateway:<VERSION> .
| RAY_CLUSTER_TEMPLATE_CPU | default compute kuberay template cpu setting |
| RAY_CLUSTER_TEMPLATE_MEM | default compute kuberay template memory setting |
| RAY_CLUSTER_WORKER_REPLICAS | worker replicas per cluster |
| RAY_CLUSTER_WORKER_MIN_REPLICAS | min worker replicas per cluster |
| RAY_CLUSTER_WORKER_MAX_REPLICAS | max worker replicas per cluster |
| RAY_CLUSTER_WORKER_REPLICAS_MAX | maximum number of worker replicas per cluster |
| RAY_CLUSTER_WORKER_MIN_REPLICAS | min worker replicas per cluster for auto scaling |
| RAY_CLUSTER_WORKER_MIN_REPLICAS_MAX | maximum number of min worker replicas per cluster for auto scaling |
| RAY_CLUSTER_WORKER_MAX_REPLICAS | max replicas per cluster for auto scaling |
| RAY_CLUSTER_WORKER_MAX_REPLICAS_MAX | maximum number of max worker replicas per cluster for auto scaling |
| RAY_CLUSTER_MAX_READINESS_TIME | max time in seconds to wait for cluster readiness. Will fail job if cluster is not ready in time. |
| QISKIT_IBM_CHANNEL | Channel that will be set in env variables in jobs for QiskitRuntimeService client |
| QISKIT_IBM_URL | Authentication url for QiskitRuntimeService that will be set for each job |
Loading