Skip to content

Commit

Permalink
add workers config
Browse files Browse the repository at this point in the history
Signed-off-by: Akihiko Kuroda <akihikokuroda2020@gmail.com>
  • Loading branch information
akihikokuroda committed Oct 29, 2023
1 parent e60d0ee commit 9d48845
Show file tree
Hide file tree
Showing 19 changed files with 419 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ spec:
value: {{ .Values.application.auth.keycloak.realm | quote }}
- name: SETTINGS_KEYCLOAK_CLIENT_SECRET
value: {{ .Values.application.auth.keycloak.clientSecret | quote }}
- name: RAY_CLUSTER_WORKER_REPLICAS
value: {{ .Values.application.ray.replicas | quote }}
- name: RAY_CLUSTER_WORKER_MIN_REPLICAS
value: {{ .Values.application.ray.minReplicas | quote }}
- name: RAY_CLUSTER_WORKER_MAX_REPLICAS
value: {{ .Values.application.ray.maxReplicas | quote }}
{{- if .Values.application.superuser.enable }}
- name: DJANGO_SUPERUSER_USERNAME
valueFrom:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ data:
{{- if .Values.application.ray.scrapeWithPrometheus }}
headServiceAnnotations:
prometheus.io/scrape: "true"
{{- end }}
{{- end }}
enableInTreeAutoscaling: {{`{{ auto_scaling }}`}}
headGroupSpec:
rayStartParams:
dashboard-host: 0.0.0.0
Expand Down Expand Up @@ -193,11 +194,11 @@ data:
claimName: {{ .Values.cos.claimName }}
workerGroupSpecs:
- groupName: g
maxReplicas: {{ .Values.application.ray.maxReplicas }}
minReplicas: {{ .Values.application.ray.minReplicas }}
maxReplicas: {{`{{ max_workers }}`}}
minReplicas: {{`{{ min_workers }}`}}
rayStartParams:
block: 'true'
replicas: {{ .Values.application.ray.replicas }}
replicas: {{`{{ workers }}`}}
template:
{{- if .Values.application.ray.scrapeWithPrometheus }}
metadata:
Expand Down
5 changes: 3 additions & 2 deletions client/quantum_serverless/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
QuantumServerlessException
get_auto_discovered_provider
"""

# pylint: disable=W0404
from importlib_metadata import version as metadata_version, PackageNotFoundError

from .core import (
Expand All @@ -35,13 +35,14 @@
IBMServerlessProvider,
RayProvider,
save_result,
Configuration,
)
from .quantum_serverless import (
QuantumServerless,
get_auto_discovered_provider,
QuantumServerlessException,
)
from .core.pattern import QiskitPattern
from .core.pattern import QiskitPattern, Configuration
from .serializers import get_arguments

try:
Expand Down
3 changes: 1 addition & 2 deletions client/quantum_serverless/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
RayJobClient
save_result
QiskitPattern
Program
ProgramStorage
ProgramRepository
download_and_unpack_artifact
Expand All @@ -65,10 +64,10 @@
from .job import BaseJobClient, RayJobClient, GatewayJobClient, Job, save_result
from .pattern import (
QiskitPattern,
Program,
ProgramStorage,
ProgramRepository,
download_and_unpack_artifact,
Configuration,
)
from .decorators import (
remote,
Expand Down
62 changes: 43 additions & 19 deletions client/quantum_serverless/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
RuntimeEnv
Job
"""
# pylint: disable=duplicate-code
import json
import logging
import os
Expand All @@ -35,6 +36,7 @@
from pathlib import Path
from typing import Dict, Any, Optional, List, Union
from uuid import uuid4
from dataclasses import asdict

import ray.runtime_env
import requests
Expand All @@ -53,7 +55,8 @@
MAX_ARTIFACT_FILE_SIZE_MB,
ENV_JOB_ARGUMENTS,
)
from quantum_serverless.core.pattern import QiskitPattern

from quantum_serverless.core.pattern import QiskitPattern, Configuration
from quantum_serverless.exception import QuantumServerlessException
from quantum_serverless.serializers.program_serializers import (
QiskitObjectsEncoder,
Expand All @@ -68,7 +71,10 @@ class BaseJobClient:
"""Base class for Job clients."""

def run(
self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None
self,
program: QiskitPattern,
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> "Job":
"""Runs program."""
raise NotImplementedError
Expand Down Expand Up @@ -146,7 +152,12 @@ def list(self, **kwargs) -> List["Job"]:
Job(job.job_id, job_client=self) for job in self._job_client.list_jobs()
]

def run(self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None):
def run(
self,
program: QiskitPattern,
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
):
arguments = arguments or {}
entrypoint = f"python {program.entrypoint}"

Expand Down Expand Up @@ -195,7 +206,10 @@ def __init__(self, host: str, token: str, version: str):
self._token = token

def run( # pylint: disable=too-many-locals
self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None
self,
program: QiskitPattern,
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> "Job":
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.run") as span:
Expand Down Expand Up @@ -229,21 +243,26 @@ def run( # pylint: disable=too-many-locals
)

with open(artifact_file_path, "rb") as file:
data = {
"title": program.title,
"entrypoint": program.entrypoint,
"arguments": json.dumps(arguments or {}, cls=QiskitObjectsEncoder),
"dependencies": json.dumps(program.dependencies or []),
}
if config:
data["config"] = json.dumps(asdict(config))
else:
data["config"] = "{}"

response_data = safe_json_request(
request=lambda: requests.post(
url=url,
data={
"title": program.title,
"entrypoint": program.entrypoint,
"arguments": json.dumps(
arguments or {}, cls=QiskitObjectsEncoder
),
"dependencies": json.dumps(program.dependencies or []),
},
data=data,
files={"artifact": file},
headers={"Authorization": f"Bearer {self._token}"},
timeout=REQUESTS_TIMEOUT,
)
),
verbose=True,
)
job_id = response_data.get("id")
span.set_attribute("job.id", job_id)
Expand Down Expand Up @@ -311,6 +330,7 @@ def run_existing(
self,
program: Union[str, QiskitPattern],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
):
if isinstance(program, QiskitPattern):
title = program.title
Expand All @@ -324,15 +344,19 @@ def run_existing(

url = f"{self.host}/api/{self.version}/programs/run_existing/"

data = {
"title": title,
"arguments": json.dumps(arguments or {}, cls=QiskitObjectsEncoder),
}
if config:
data["config"] = json.dumps(asdict(config))
else:
data["config"] = "{}"

response_data = safe_json_request(
request=lambda: requests.post(
url=url,
data={
"title": title,
"arguments": json.dumps(
arguments or {}, cls=QiskitObjectsEncoder
),
},
data=data,
headers={"Authorization": f"Bearer {self._token}"},
timeout=REQUESTS_TIMEOUT,
)
Expand Down
19 changes: 19 additions & 0 deletions client/quantum_serverless/core/pattern.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@ def __repr__(self):
return self.__str__()


@dataclass
class Configuration: # pylint: disable=too-many-instance-attributes
"""Program Configuration.
Args:
workers: number of worker pod when auto scaling is NOT enabled
auto_scaling: set True to enable auto scating of the workers
min_workers: minimum number of workers when auto scaling is enabled
max_workers: maxmum number of workers when auto scaling is enabled
"""

workers: Optional[int] = None
min_workers: Optional[int] = None
max_workers: Optional[int] = None
auto_scaling: Optional[bool] = False
worker_cpu: Optional[int] = None
worker_mem: Optional[int] = None


class ProgramStorage(ABC):
"""Base program backend to save and load programs from."""

Expand Down
14 changes: 9 additions & 5 deletions client/quantum_serverless/core/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
ComputeResource
ServerlessProvider
"""
# pylint: disable=duplicate-code
import logging
import warnings
import os.path
Expand Down Expand Up @@ -53,7 +54,7 @@
GatewayJobClient,
BaseJobClient,
)
from quantum_serverless.core.pattern import QiskitPattern
from quantum_serverless.core.pattern import QiskitPattern, Configuration
from quantum_serverless.core.tracing import _trace_env_vars
from quantum_serverless.exception import QuantumServerlessException
from quantum_serverless.utils import JsonSerializable
Expand Down Expand Up @@ -264,6 +265,7 @@ def run(
self,
program: Union[QiskitPattern, str],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> Job:
"""Execute a program as a async job.
Expand Down Expand Up @@ -294,7 +296,7 @@ def run(
)
return None

return job_client.run(program, arguments)
return job_client.run(program, arguments, config)

def upload(self, program: QiskitPattern):
"""Uploads program."""
Expand Down Expand Up @@ -414,13 +416,14 @@ def run(
self,
program: Union[QiskitPattern, str],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> Job:
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("Provider.run"):
if isinstance(program, QiskitPattern) and program.entrypoint is not None:
job = self._job_client.run(program, arguments)
job = self._job_client.run(program, arguments, config)
else:
job = self._job_client.run_existing(program, arguments)
job = self._job_client.run_existing(program, arguments, config)
return job

def upload(self, program: QiskitPattern):
Expand Down Expand Up @@ -583,11 +586,12 @@ def run(
self,
program: Union[QiskitPattern, str],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> Job:
if isinstance(program, str):
raise NotImplementedError("Ray provider only supports full Programs.")

return self.client.run(program, arguments)
return self.client.run(program, arguments, config)

def get_job_by_id(self, job_id: str) -> Optional[Job]:
return self.client.get(job_id)
Expand Down
10 changes: 7 additions & 3 deletions client/quantum_serverless/quantum_serverless.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from opentelemetry.instrumentation.requests import RequestsInstrumentor

from quantum_serverless.core.job import Job
from quantum_serverless.core.pattern import QiskitPattern
from quantum_serverless.core.pattern import QiskitPattern, Configuration
from quantum_serverless.core.provider import BaseProvider, ComputeResource
from quantum_serverless.exception import QuantumServerlessException

Expand Down Expand Up @@ -103,7 +103,10 @@ def job_client(self):
return self._selected_provider.job_client()

def run(
self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None
self,
program: QiskitPattern,
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> Optional[Job]:
"""Execute a program as a async job
Expand All @@ -119,13 +122,14 @@ def run(
Args:
arguments: arguments to run program with
program: Program object
config: Configuration object
Returns:
Job
"""
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("QuantumServerless.run"):
job = self._selected_provider.run(program, arguments)
job = self._selected_provider.run(program, arguments, config)
return job

def upload(self, program: QiskitPattern):
Expand Down
7 changes: 5 additions & 2 deletions gateway/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ docker build -t qiskit/quantum-serverless-gateway:<VERSION> .
| RAY_CLUSTER_TEMPLATE_CPU | default compute kuberay template cpu setting |
| RAY_CLUSTER_TEMPLATE_MEM | default compute kuberay template memory setting |
| RAY_CLUSTER_WORKER_REPLICAS | worker replicas per cluster |
| RAY_CLUSTER_WORKER_MIN_REPLICAS | min worker replicas per cluster |
| RAY_CLUSTER_WORKER_MAX_REPLICAS | max replicas per cluster |
| RAY_CLUSTER_WORKER_REPLICAS_MAX | maximum number of worker replicas per cluster |
| RAY_CLUSTER_WORKER_MIN_REPLICAS | min worker replicas per cluster for auto scaling |
| RAY_CLUSTER_WORKER_MIN_REPLICAS_MAX | maximum number of min worker replicas per cluster for auto scaling |
| RAY_CLUSTER_WORKER_MAX_REPLICAS | max replicas per cluster for auto scaling |
| RAY_CLUSTER_WORKER_MAX_REPLICAS_MAX | maximum number of max worker replicas per cluster for auto scaling |
| RAY_CLUSTER_MAX_READINESS_TIME | max time in seconds to wait for cluster readiness. Will fail job if cluster is not ready in time. |
| QISKIT_IBM_CHANNEL | Channel that will be set in env variables in jobs for QiskitRuntimeService client |
| QISKIT_IBM_URL | Authentication url for QiskitRuntimeService that will be set for each job |
Loading

0 comments on commit 9d48845

Please sign in to comment.