From 5c8fecb4c017787974638c699354e8fc7760af10 Mon Sep 17 00:00:00 2001 From: Akihiko Kuroda Date: Fri, 20 Oct 2023 20:08:36 -0400 Subject: [PATCH 1/5] add workers config Signed-off-by: Akihiko Kuroda --- .../charts/gateway/templates/deployment.yaml | 6 ++ .../gateway/templates/rayclustertemplate.yaml | 9 +-- client/quantum_serverless/__init__.py | 5 +- client/quantum_serverless/core/__init__.py | 2 +- client/quantum_serverless/core/job.py | 62 ++++++++++++----- client/quantum_serverless/core/pattern.py | 19 +++++ client/quantum_serverless/core/provider.py | 14 ++-- .../quantum_serverless/quantum_serverless.py | 10 ++- gateway/README.md | 7 +- .../0011_programconfig_program_config.py | 47 +++++++++++++ ...lter_programconfig_max_workers_and_more.py | 38 ++++++++++ ...lter_programconfig_max_workers_and_more.py | 69 +++++++++++++++++++ gateway/api/models.py | 52 +++++++++++++- gateway/api/ray.py | 16 ++++- gateway/api/schedule.py | 50 +++++++++++++- gateway/api/serializers.py | 11 ++- gateway/api/v1/serializers.py | 24 ++++++- gateway/api/views.py | 14 +++- gateway/main/settings.py | 12 ++++ 19 files changed, 419 insertions(+), 48 deletions(-) create mode 100644 gateway/api/migrations/0011_programconfig_program_config.py create mode 100644 gateway/api/migrations/0012_alter_programconfig_max_workers_and_more.py create mode 100644 gateway/api/migrations/0013_alter_programconfig_max_workers_and_more.py diff --git a/charts/quantum-serverless/charts/gateway/templates/deployment.yaml b/charts/quantum-serverless/charts/gateway/templates/deployment.yaml index bcd1ea858..3d1b97bc8 100644 --- a/charts/quantum-serverless/charts/gateway/templates/deployment.yaml +++ b/charts/quantum-serverless/charts/gateway/templates/deployment.yaml @@ -111,6 +111,12 @@ spec: value: {{ .Values.application.auth.keycloak.realm | quote }} - name: SETTINGS_KEYCLOAK_CLIENT_SECRET value: {{ .Values.application.auth.keycloak.clientSecret | quote }} + - name: RAY_CLUSTER_WORKER_REPLICAS + value: {{ 
.Values.application.ray.replicas | quote }} + - name: RAY_CLUSTER_WORKER_MIN_REPLICAS + value: {{ .Values.application.ray.minReplicas | quote }} + - name: RAY_CLUSTER_WORKER_MAX_REPLICAS + value: {{ .Values.application.ray.maxReplicas | quote }} {{- if .Values.application.superuser.enable }} - name: DJANGO_SUPERUSER_USERNAME valueFrom: diff --git a/charts/quantum-serverless/charts/gateway/templates/rayclustertemplate.yaml b/charts/quantum-serverless/charts/gateway/templates/rayclustertemplate.yaml index 27f21d9cd..bc7bc10c7 100644 --- a/charts/quantum-serverless/charts/gateway/templates/rayclustertemplate.yaml +++ b/charts/quantum-serverless/charts/gateway/templates/rayclustertemplate.yaml @@ -17,7 +17,8 @@ data: {{- if .Values.application.ray.scrapeWithPrometheus }} headServiceAnnotations: prometheus.io/scrape: "true" -{{- end }} +{{- end }} + enableInTreeAutoscaling: {{`{{ auto_scaling }}`}} headGroupSpec: rayStartParams: dashboard-host: 0.0.0.0 @@ -193,11 +194,11 @@ data: claimName: {{ .Values.cos.claimName }} workerGroupSpecs: - groupName: g - maxReplicas: {{ .Values.application.ray.maxReplicas }} - minReplicas: {{ .Values.application.ray.minReplicas }} + maxReplicas: {{`{{ max_workers }}`}} + minReplicas: {{`{{ min_workers }}`}} rayStartParams: block: 'true' - replicas: {{ .Values.application.ray.replicas }} + replicas: {{`{{ workers }}`}} template: {{- if .Values.application.ray.scrapeWithPrometheus }} metadata: diff --git a/client/quantum_serverless/__init__.py b/client/quantum_serverless/__init__.py index d8cb2bc88..057755bd1 100644 --- a/client/quantum_serverless/__init__.py +++ b/client/quantum_serverless/__init__.py @@ -20,7 +20,7 @@ QuantumServerlessException get_auto_discovered_provider """ - +# pylint: disable=W0404 from importlib_metadata import version as metadata_version, PackageNotFoundError from .core import ( @@ -36,13 +36,14 @@ RayProvider, LocalProvider, save_result, + Configuration, ) from .quantum_serverless import ( QuantumServerless, 
get_auto_discovered_provider, QuantumServerlessException, ) -from .core.pattern import QiskitPattern +from .core.pattern import QiskitPattern, Configuration from .serializers import get_arguments try: diff --git a/client/quantum_serverless/core/__init__.py b/client/quantum_serverless/core/__init__.py index 015c5b3c5..aa896df5e 100644 --- a/client/quantum_serverless/core/__init__.py +++ b/client/quantum_serverless/core/__init__.py @@ -74,10 +74,10 @@ ) from .pattern import ( QiskitPattern, - Program, ProgramStorage, ProgramRepository, download_and_unpack_artifact, + Configuration, ) from .decorators import ( remote, diff --git a/client/quantum_serverless/core/job.py b/client/quantum_serverless/core/job.py index 16d4c93cb..4a4acb8a1 100644 --- a/client/quantum_serverless/core/job.py +++ b/client/quantum_serverless/core/job.py @@ -27,6 +27,7 @@ RuntimeEnv Job """ +# pylint: disable=duplicate-code import json import logging import os @@ -36,6 +37,7 @@ from pathlib import Path from typing import Dict, Any, Optional, List, Union from uuid import uuid4 +from dataclasses import asdict import subprocess from subprocess import Popen @@ -58,7 +60,8 @@ MAX_ARTIFACT_FILE_SIZE_MB, ENV_JOB_ARGUMENTS, ) -from quantum_serverless.core.pattern import QiskitPattern + +from quantum_serverless.core.pattern import QiskitPattern, Configuration from quantum_serverless.exception import QuantumServerlessException from quantum_serverless.serializers.program_serializers import ( QiskitObjectsEncoder, @@ -73,7 +76,10 @@ class BaseJobClient: """Base class for Job clients.""" def run( - self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None + self, + program: QiskitPattern, + arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, ) -> "Job": """Runs program.""" raise NotImplementedError @@ -151,7 +157,12 @@ def list(self, **kwargs) -> List["Job"]: Job(job.job_id, job_client=self) for job in self._job_client.list_jobs() ] - def run(self, program: 
QiskitPattern, arguments: Optional[Dict[str, Any]] = None): + def run( + self, + program: QiskitPattern, + arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, + ): arguments = arguments or {} entrypoint = f"python {program.entrypoint}" @@ -333,7 +344,10 @@ def __init__(self, host: str, token: str, version: str): self._token = token def run( # pylint: disable=too-many-locals - self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None + self, + program: QiskitPattern, + arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, ) -> "Job": tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("job.run") as span: @@ -367,21 +381,26 @@ def run( # pylint: disable=too-many-locals ) with open(artifact_file_path, "rb") as file: + data = { + "title": program.title, + "entrypoint": program.entrypoint, + "arguments": json.dumps(arguments or {}, cls=QiskitObjectsEncoder), + "dependencies": json.dumps(program.dependencies or []), + } + if config: + data["config"] = json.dumps(asdict(config)) + else: + data["config"] = "{}" + response_data = safe_json_request( request=lambda: requests.post( url=url, - data={ - "title": program.title, - "entrypoint": program.entrypoint, - "arguments": json.dumps( - arguments or {}, cls=QiskitObjectsEncoder - ), - "dependencies": json.dumps(program.dependencies or []), - }, + data=data, files={"artifact": file}, headers={"Authorization": f"Bearer {self._token}"}, timeout=REQUESTS_TIMEOUT, - ) + ), + verbose=True, ) job_id = response_data.get("id") span.set_attribute("job.id", job_id) @@ -449,6 +468,7 @@ def run_existing( self, program: Union[str, QiskitPattern], arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, ): if isinstance(program, QiskitPattern): title = program.title @@ -462,15 +482,19 @@ def run_existing( url = f"{self.host}/api/{self.version}/programs/run_existing/" + data = { + "title": title, + 
"arguments": json.dumps(arguments or {}, cls=QiskitObjectsEncoder), + } + if config: + data["config"] = json.dumps(asdict(config)) + else: + data["config"] = "{}" + response_data = safe_json_request( request=lambda: requests.post( url=url, - data={ - "title": title, - "arguments": json.dumps( - arguments or {}, cls=QiskitObjectsEncoder - ), - }, + data=data, headers={"Authorization": f"Bearer {self._token}"}, timeout=REQUESTS_TIMEOUT, ) diff --git a/client/quantum_serverless/core/pattern.py b/client/quantum_serverless/core/pattern.py index ae0fe7956..468a817c3 100644 --- a/client/quantum_serverless/core/pattern.py +++ b/client/quantum_serverless/core/pattern.py @@ -84,6 +84,25 @@ def __repr__(self): return self.__str__() +@dataclass +class Configuration: # pylint: disable=too-many-instance-attributes + """Program Configuration. + + Args: + workers: number of worker pod when auto scaling is NOT enabled + auto_scaling: set True to enable auto scating of the workers + min_workers: minimum number of workers when auto scaling is enabled + max_workers: maxmum number of workers when auto scaling is enabled + """ + + workers: Optional[int] = None + min_workers: Optional[int] = None + max_workers: Optional[int] = None + auto_scaling: Optional[bool] = False + worker_cpu: Optional[int] = None + worker_mem: Optional[int] = None + + class ProgramStorage(ABC): """Base program backend to save and load programs from.""" diff --git a/client/quantum_serverless/core/provider.py b/client/quantum_serverless/core/provider.py index 146877f0a..cd21ae805 100644 --- a/client/quantum_serverless/core/provider.py +++ b/client/quantum_serverless/core/provider.py @@ -26,6 +26,7 @@ ComputeResource ServerlessProvider """ +# pylint: disable=duplicate-code import logging import warnings import os.path @@ -54,7 +55,7 @@ LocalJobClient, BaseJobClient, ) -from quantum_serverless.core.pattern import QiskitPattern +from quantum_serverless.core.pattern import QiskitPattern, Configuration from 
quantum_serverless.core.tracing import _trace_env_vars from quantum_serverless.exception import QuantumServerlessException from quantum_serverless.utils import JsonSerializable @@ -265,6 +266,7 @@ def run( self, program: Union[QiskitPattern, str], arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, ) -> Job: """Execute a program as a async job. @@ -295,7 +297,7 @@ def run( ) return None - return job_client.run(program, arguments) + return job_client.run(program, arguments, config) def upload(self, program: QiskitPattern): """Uploads program.""" @@ -415,13 +417,14 @@ def run( self, program: Union[QiskitPattern, str], arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, ) -> Job: tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("Provider.run"): if isinstance(program, QiskitPattern) and program.entrypoint is not None: - job = self._job_client.run(program, arguments) + job = self._job_client.run(program, arguments, config) else: - job = self._job_client.run_existing(program, arguments) + job = self._job_client.run_existing(program, arguments, config) return job def upload(self, program: QiskitPattern): @@ -584,11 +587,12 @@ def run( self, program: Union[QiskitPattern, str], arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, ) -> Job: if isinstance(program, str): raise NotImplementedError("Ray provider only supports full Programs.") - return self.client.run(program, arguments) + return self.client.run(program, arguments, config) def get_job_by_id(self, job_id: str) -> Optional[Job]: return self.client.get(job_id) diff --git a/client/quantum_serverless/quantum_serverless.py b/client/quantum_serverless/quantum_serverless.py index 6dfb5e0d2..65d6905bf 100644 --- a/client/quantum_serverless/quantum_serverless.py +++ b/client/quantum_serverless/quantum_serverless.py @@ -42,7 +42,7 @@ from opentelemetry.instrumentation.requests import 
RequestsInstrumentor from quantum_serverless.core.job import Job -from quantum_serverless.core.pattern import QiskitPattern +from quantum_serverless.core.pattern import QiskitPattern, Configuration from quantum_serverless.core.provider import BaseProvider, ComputeResource from quantum_serverless.exception import QuantumServerlessException @@ -103,7 +103,10 @@ def job_client(self): return self._selected_provider.job_client() def run( - self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None + self, + program: QiskitPattern, + arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, ) -> Optional[Job]: """Execute a program as a async job @@ -119,13 +122,14 @@ def run( Args: arguments: arguments to run program with program: Program object + config: Configuration object Returns: Job """ tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("QuantumServerless.run"): - job = self._selected_provider.run(program, arguments) + job = self._selected_provider.run(program, arguments, config) return job def upload(self, program: QiskitPattern): diff --git a/gateway/README.md b/gateway/README.md index 9f5794f63..a3a61f76b 100644 --- a/gateway/README.md +++ b/gateway/README.md @@ -38,8 +38,11 @@ docker build -t qiskit/quantum-serverless-gateway: . 
| RAY_CLUSTER_TEMPLATE_CPU | default compute kuberay template cpu setting | | RAY_CLUSTER_TEMPLATE_MEM | default compute kuberay template memory setting | | RAY_CLUSTER_WORKER_REPLICAS | worker replicas per cluster | -| RAY_CLUSTER_WORKER_MIN_REPLICAS | min worker replicas per cluster | -| RAY_CLUSTER_WORKER_MAX_REPLICAS | max replicas per cluster | +| RAY_CLUSTER_WORKER_REPLICAS_MAX | maximum number of worker replicas per cluster | +| RAY_CLUSTER_WORKER_MIN_REPLICAS | min worker replicas per cluster for auto scaling | +| RAY_CLUSTER_WORKER_MIN_REPLICAS_MAX | maximum number of min worker replicas per cluster for auto scaling | +| RAY_CLUSTER_WORKER_MAX_REPLICAS | max replicas per cluster for auto scaling | +| RAY_CLUSTER_WORKER_MAX_REPLICAS_MAX | maximum number of max worker replicas per cluster for auto scaling | | RAY_CLUSTER_MAX_READINESS_TIME | max time in seconds to wait for cluster readiness. Will fail job if cluster is not ready in time. | | QISKIT_IBM_CHANNEL | Channel that will be set in env variables in jobs for QiskitRuntimeService client | | QISKIT_IBM_URL | Authentication url for QiskitRuntimeService that will be set for each job | diff --git a/gateway/api/migrations/0011_programconfig_program_config.py b/gateway/api/migrations/0011_programconfig_program_config.py new file mode 100644 index 000000000..68e0f5452 --- /dev/null +++ b/gateway/api/migrations/0011_programconfig_program_config.py @@ -0,0 +1,47 @@ +# Generated by Django 4.2.2 on 2023-10-19 16:02 + +from django.db import migrations, models +import django.db.models.deletion +import uuid + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0010_job_version"), + ] + + operations = [ + migrations.CreateModel( + name="ProgramConfig", + fields=[ + ( + "id", + models.UUIDField( + default=uuid.uuid4, + editable=False, + primary_key=True, + serialize=False, + ), + ), + ("created", models.DateTimeField(auto_now_add=True)), + ("auto_scaling", models.BooleanField(default=False, 
null=True)), + ("workers", models.IntegerField()), + ("min_workers", models.IntegerField()), + ("max_workers", models.IntegerField()), + ("worker_cpu", models.IntegerField()), + ("worker_mem", models.IntegerField()), + ], + ), + migrations.AddField( + model_name="program", + name="config", + field=models.ForeignKey( + blank=True, + default=None, + null=True, + on_delete=django.db.models.deletion.CASCADE, + to="api.programconfig", + ), + ), + ] diff --git a/gateway/api/migrations/0012_alter_programconfig_max_workers_and_more.py b/gateway/api/migrations/0012_alter_programconfig_max_workers_and_more.py new file mode 100644 index 000000000..baa7a6f0b --- /dev/null +++ b/gateway/api/migrations/0012_alter_programconfig_max_workers_and_more.py @@ -0,0 +1,38 @@ +# Generated by Django 4.2.2 on 2023-10-20 18:30 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0011_programconfig_program_config"), + ] + + operations = [ + migrations.AlterField( + model_name="programconfig", + name="max_workers", + field=models.IntegerField(null=True), + ), + migrations.AlterField( + model_name="programconfig", + name="min_workers", + field=models.IntegerField(null=True), + ), + migrations.AlterField( + model_name="programconfig", + name="worker_cpu", + field=models.IntegerField(null=True), + ), + migrations.AlterField( + model_name="programconfig", + name="worker_mem", + field=models.IntegerField(null=True), + ), + migrations.AlterField( + model_name="programconfig", + name="workers", + field=models.IntegerField(null=True), + ), + ] diff --git a/gateway/api/migrations/0013_alter_programconfig_max_workers_and_more.py b/gateway/api/migrations/0013_alter_programconfig_max_workers_and_more.py new file mode 100644 index 000000000..b1f75a584 --- /dev/null +++ b/gateway/api/migrations/0013_alter_programconfig_max_workers_and_more.py @@ -0,0 +1,69 @@ +# Generated by Django 4.2.2 on 2023-10-22 19:17 + +import django.core.validators 
+from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0012_alter_programconfig_max_workers_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="programconfig", + name="max_workers", + field=models.IntegerField( + null=True, + validators=[ + django.core.validators.MinValueValidator(0), + django.core.validators.MaxValueValidator(5), + ], + ), + ), + migrations.AlterField( + model_name="programconfig", + name="min_workers", + field=models.IntegerField( + null=True, + validators=[ + django.core.validators.MinValueValidator(0), + django.core.validators.MaxValueValidator(5), + ], + ), + ), + migrations.AlterField( + model_name="programconfig", + name="worker_cpu", + field=models.IntegerField( + null=True, + validators=[ + django.core.validators.MinValueValidator(0), + django.core.validators.MaxValueValidator(5), + ], + ), + ), + migrations.AlterField( + model_name="programconfig", + name="worker_mem", + field=models.IntegerField( + null=True, + validators=[ + django.core.validators.MinValueValidator(0), + django.core.validators.MaxValueValidator(5), + ], + ), + ), + migrations.AlterField( + model_name="programconfig", + name="workers", + field=models.IntegerField( + null=True, + validators=[ + django.core.validators.MinValueValidator(0), + django.core.validators.MaxValueValidator(5), + ], + ), + ), + ] diff --git a/gateway/api/models.py b/gateway/api/models.py index b78aa7a52..d71849cf2 100644 --- a/gateway/api/models.py +++ b/gateway/api/models.py @@ -2,7 +2,11 @@ import uuid from concurrency.fields import IntegerVersionField -from django.core.validators import FileExtensionValidator +from django.core.validators import ( + FileExtensionValidator, + MinValueValidator, + MaxValueValidator, +) from django.db import models from django.conf import settings from django_prometheus.models import ExportModelOperationsMixin @@ -13,6 +17,45 @@ def get_upload_path(instance, filename): return 
f"{instance.author.username}/{instance.id}/{filename}" +class ProgramConfig(models.Model): + """Program Configuration model.""" + + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + created = models.DateTimeField(auto_now_add=True) + + auto_scaling = models.BooleanField(default=False, null=True) + workers = models.IntegerField( + null=True, + validators=[ + MinValueValidator(0), + MaxValueValidator(settings.RAY_CLUSTER_WORKER_REPLICAS_MAX), + ], + ) + min_workers = models.IntegerField( + null=True, + validators=[ + MinValueValidator(0), + MaxValueValidator(settings.RAY_CLUSTER_WORKER_MIN_REPLICAS_MAX), + ], + ) + max_workers = models.IntegerField( + null=True, + validators=[ + MinValueValidator(0), + MaxValueValidator(settings.RAY_CLUSTER_WORKER_MAX_REPLICAS_MAX), + ], + ) + worker_cpu = models.IntegerField( + null=True, validators=[MinValueValidator(0), MaxValueValidator(5)] + ) + worker_mem = models.IntegerField( + null=True, validators=[MinValueValidator(0), MaxValueValidator(5)] + ) + + def __str__(self): + return self.id + + class Program(ExportModelOperationsMixin("program"), models.Model): """Program model.""" @@ -35,6 +78,13 @@ class Program(ExportModelOperationsMixin("program"), models.Model): arguments = models.TextField(null=False, blank=True, default="{}") env_vars = models.TextField(null=False, blank=True, default="{}") dependencies = models.TextField(null=False, blank=True, default="[]") + config = models.ForeignKey( + to=ProgramConfig, + on_delete=models.CASCADE, + default=None, + null=True, + blank=True, + ) def __str__(self): return f"{self.title}" diff --git a/gateway/api/ray.py b/gateway/api/ray.py index c0e6be5a3..f9ac5c0e5 100644 --- a/gateway/api/ray.py +++ b/gateway/api/ray.py @@ -20,7 +20,7 @@ from opentelemetry import trace from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from api.models import ComputeResource, Job +from api.models import ComputeResource, Job, ProgramConfig 
from api.utils import try_json_loads, retry_function, decrypt_env_vars from main import settings @@ -157,7 +157,10 @@ def submit_job(job: Job) -> Job: def create_ray_cluster( - user: Any, cluster_name: Optional[str] = None, cluster_data: Optional[str] = None + user: Any, + cluster_name: Optional[str] = None, + cluster_data: Optional[str] = None, + program_config: Optional[ProgramConfig] = None, ) -> Optional[ComputeResource]: """Creates ray cluster. @@ -176,7 +179,14 @@ def create_ray_cluster( if not cluster_data: cluster = get_template("rayclustertemplate.yaml") manifest = cluster.render( - {"cluster_name": cluster_name, "user_id": user.username} + { + "cluster_name": cluster_name, + "user_id": user.username, + "workers": program_config.workers, + "min_workers": program_config.min_workers, + "max_workers": program_config.max_workers, + "auto_scaling": program_config.auto_scaling, + } ) cluster_data = yaml.safe_load(manifest) diff --git a/gateway/api/schedule.py b/gateway/api/schedule.py index 98b83fa7d..7285ab799 100644 --- a/gateway/api/schedule.py +++ b/gateway/api/schedule.py @@ -1,5 +1,6 @@ """Scheduling related functions.""" import logging +import json import random import uuid from typing import List @@ -13,23 +14,63 @@ from opentelemetry import trace -from api.models import Job, Program, ComputeResource +from api.models import Job, Program, ComputeResource, ProgramConfig from api.ray import submit_job, create_ray_cluster, kill_ray_cluster from main import settings as config + User: Model = get_user_model() logger = logging.getLogger("commands") +def save_programconfig(request) -> ProgramConfig: + """Save programconfig. + + Args: + request: request data. 
+ + Returns: + saved programconfig + """ + programconfig = ProgramConfig( + workers=settings.RAY_CLUSTER_WORKER_REPLICAS, + min_workers=settings.RAY_CLUSTER_WORKER_MIN_REPLICAS, + max_workers=settings.RAY_CLUSTER_WORKER_MAX_REPLICAS, + worker_cpu=2, + worker_mem=3, + auto_scaling=settings.RAY_CLUSTER_WORKER_AUTO_SCALING, + ) + if request.data.get("config"): + config_data = json.loads(request.data.get("config")) + if "workers" in config_data and config_data["workers"]: + programconfig.workers = config_data["workers"] + if "min_workers" in config_data and config_data["min_workers"]: + programconfig.min_workers = config_data["min_workers"] + if "max_workers" in config_data and config_data["max_workers"]: + programconfig.max_workers = config_data["max_workers"] + if "worker_cpu" in config_data and config_data["worker_cpu"]: + programconfig.worker_cpu = config_data["worker_cpu"] + if "worker_mem" in config_data and config_data["worker_mem"]: + programconfig.worker_mem = config_data["worker_mem"] + if "auto_scaling" in config_data and config_data["auto_scaling"]: + programconfig.auto_scaling = config_data["auto_scaling"] + programconfig.full_clean() + programconfig.save() + return programconfig + + def save_program(serializer, request) -> Program: """Save program. Args: - serializer: program serializer with data attached. + request: request data. Returns: saved program """ + + programconfig = save_programconfig(request) + existing_program = ( Program.objects.filter(title=serializer.data.get("title"), author=request.user) .order_by("-created") @@ -46,6 +87,7 @@ def save_program(serializer, request) -> Program: program = Program(**serializer.data) program.artifact = request.FILES.get("artifact") program.author = request.user + program.config = programconfig program.save() return program @@ -97,7 +139,9 @@ def execute_job(job: Job) -> Job: job.status = Job.FAILED job.logs = "Compute resource was not found." 
else: - compute_resource = create_ray_cluster(job.author, cluster_name=cluster_name) + compute_resource = create_ray_cluster( + job.author, cluster_name=cluster_name, program_config=job.program.config + ) if compute_resource: # if compute resource was created in time with no problems job.compute_resource = compute_resource diff --git a/gateway/api/serializers.py b/gateway/api/serializers.py index cc8b7d3a6..97be7b862 100644 --- a/gateway/api/serializers.py +++ b/gateway/api/serializers.py @@ -8,7 +8,16 @@ from rest_framework import serializers -from .models import Program, Job +from .models import Program, Job, ProgramConfig + + +class ProgramConfigSerializer(serializers.ModelSerializer): + """ + Serializer for the Program Config model. + """ + + class Meta: + model = ProgramConfig class ProgramSerializer(serializers.ModelSerializer): diff --git a/gateway/api/v1/serializers.py b/gateway/api/v1/serializers.py index 19ed41698..0159cb89b 100644 --- a/gateway/api/v1/serializers.py +++ b/gateway/api/v1/serializers.py @@ -5,13 +5,35 @@ from api import serializers +class ProgramConfigSerializer(serializers.ProgramSerializer): + """ + Program Config serializer first version. Include basic fields from the initial model. + """ + + class Meta(serializers.ProgramConfigSerializer.Meta): + fields = [ + "workers", + "min_workers", + "max_workers", + "worker_cpu", + "worker_mem", + "auto_scaling", + ] + + class ProgramSerializer(serializers.ProgramSerializer): """ Program serializer first version. Include basic fields from the initial model. 
""" class Meta(serializers.ProgramSerializer.Meta): - fields = ["title", "entrypoint", "artifact", "dependencies", "arguments"] + fields = [ + "title", + "entrypoint", + "artifact", + "dependencies", + "arguments", + ] class JobSerializer(serializers.JobSerializer): diff --git a/gateway/api/views.py b/gateway/api/views.py index 54f2f0c3d..ee9d626ca 100644 --- a/gateway/api/views.py +++ b/gateway/api/views.py @@ -20,7 +20,8 @@ from dj_rest_auth.registration.views import SocialLoginView from django.conf import settings from django.contrib.auth import get_user_model -from django.http import StreamingHttpResponse +from django.core.exceptions import ValidationError +from django.http import StreamingHttpResponse, HttpResponseBadRequest from rest_framework import viewsets, permissions, status from rest_framework.decorators import action from rest_framework.generics import get_object_or_404 @@ -36,7 +37,7 @@ from .models import Program, Job from .ray import get_job_handler -from .schedule import save_program +from .schedule import save_program, save_programconfig from .serializers import JobSerializer, ExistingProgramSerializer from .utils import build_env_variables, encrypt_env_vars @@ -115,6 +116,10 @@ def run_existing(self, request): status=status.HTTP_404_NOT_FOUND, ) + programconfig = save_programconfig(request) + program.config = programconfig + program.save() + job = Job( program=program, arguments=serializer.data.get("arguments"), @@ -150,7 +155,10 @@ def run(self, request): if not serializer.is_valid(): return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - program = save_program(serializer=serializer, request=request) + try: + program = save_program(serializer=serializer, request=request) + except ValidationError as exp: + return HttpResponseBadRequest(f"Bad Request: {exp}") job = Job( program=program, arguments=program.arguments, diff --git a/gateway/main/settings.py b/gateway/main/settings.py index fd6ce8c53..a13290634 100644 --- 
a/gateway/main/settings.py +++ b/gateway/main/settings.py @@ -301,12 +301,24 @@ ), } RAY_CLUSTER_WORKER_REPLICAS = int(os.environ.get("RAY_CLUSTER_WORKER_REPLICAS", "0")) +RAY_CLUSTER_WORKER_REPLICAS_MAX = int( + os.environ.get("RAY_CLUSTER_WORKER_REPLICAS_MAX", "5") +) RAY_CLUSTER_WORKER_MIN_REPLICAS = int( os.environ.get("RAY_CLUSTER_WORKER_MIN_REPLICAS", "0") ) +RAY_CLUSTER_WORKER_MIN_REPLICAS_MAX = int( + os.environ.get("RAY_CLUSTER_WORKER_MIN_REPLICAS_MAX", "2") +) RAY_CLUSTER_WORKER_MAX_REPLICAS = int( os.environ.get("RAY_CLUSTER_WORKER_MAX_REPLICAS", "4") ) +RAY_CLUSTER_WORKER_MAX_REPLICAS_MAX = int( + os.environ.get("RAY_CLUSTER_WORKER_MAX_REPLICAS_MAX", "10") +) +RAY_CLUSTER_WORKER_AUTO_SCALING = bool( + os.environ.get("RAY_CLUSTER_WORKER_AUTO_SCALING", False) +) RAY_CLUSTER_MAX_READINESS_TIME = int( os.environ.get("RAY_CLUSTER_MAX_READINESS_TIME", "120") ) From bcc8ce1d62eabdf038d730cb81483adf072a2785 Mon Sep 17 00:00:00 2001 From: Akihiko Kuroda Date: Fri, 3 Nov 2023 17:35:27 -0400 Subject: [PATCH 2/5] change ProgramConfig to JobConfig Signed-off-by: Akihiko Kuroda --- client/quantum_serverless/__init__.py | 2 +- client/quantum_serverless/core/__init__.py | 2 +- client/quantum_serverless/core/job.py | 21 +++++- client/quantum_serverless/core/pattern.py | 19 ----- client/quantum_serverless/core/provider.py | 3 +- .../quantum_serverless/quantum_serverless.py | 4 +- .../migrations/0011_jobconfig_job_config.py | 73 +++++++++++++++++++ .../0011_programconfig_program_config.py | 47 ------------ ...lter_programconfig_max_workers_and_more.py | 38 ---------- ...lter_programconfig_max_workers_and_more.py | 69 ------------------ gateway/api/models.py | 25 +++---- gateway/api/ray.py | 12 +-- gateway/api/schedule.py | 35 ++++----- gateway/api/serializers.py | 8 +- gateway/api/v1/serializers.py | 8 +- gateway/api/views.py | 15 ++-- 16 files changed, 144 insertions(+), 237 deletions(-) create mode 100644 gateway/api/migrations/0011_jobconfig_job_config.py delete mode 
100644 gateway/api/migrations/0011_programconfig_program_config.py delete mode 100644 gateway/api/migrations/0012_alter_programconfig_max_workers_and_more.py delete mode 100644 gateway/api/migrations/0013_alter_programconfig_max_workers_and_more.py diff --git a/client/quantum_serverless/__init__.py b/client/quantum_serverless/__init__.py index 057755bd1..36a491c14 100644 --- a/client/quantum_serverless/__init__.py +++ b/client/quantum_serverless/__init__.py @@ -43,7 +43,7 @@ get_auto_discovered_provider, QuantumServerlessException, ) -from .core.pattern import QiskitPattern, Configuration +from .core.pattern import QiskitPattern from .serializers import get_arguments try: diff --git a/client/quantum_serverless/core/__init__.py b/client/quantum_serverless/core/__init__.py index aa896df5e..2f0511922 100644 --- a/client/quantum_serverless/core/__init__.py +++ b/client/quantum_serverless/core/__init__.py @@ -71,13 +71,13 @@ LocalJobClient, Job, save_result, + Configuration, ) from .pattern import ( QiskitPattern, ProgramStorage, ProgramRepository, download_and_unpack_artifact, - Configuration, ) from .decorators import ( remote, diff --git a/client/quantum_serverless/core/job.py b/client/quantum_serverless/core/job.py index 4a4acb8a1..69acb5fde 100644 --- a/client/quantum_serverless/core/job.py +++ b/client/quantum_serverless/core/job.py @@ -37,7 +37,7 @@ from pathlib import Path from typing import Dict, Any, Optional, List, Union from uuid import uuid4 -from dataclasses import asdict +from dataclasses import asdict, dataclass import subprocess from subprocess import Popen @@ -61,7 +61,7 @@ ENV_JOB_ARGUMENTS, ) -from quantum_serverless.core.pattern import QiskitPattern, Configuration +from quantum_serverless.core.pattern import QiskitPattern from quantum_serverless.exception import QuantumServerlessException from quantum_serverless.serializers.program_serializers import ( QiskitObjectsEncoder, @@ -72,6 +72,23 @@ RuntimeEnv = ray.runtime_env.RuntimeEnv +@dataclass 
+class Configuration: # pylint: disable=too-many-instance-attributes + """Program Configuration. + + Args: + workers: number of worker pods when auto scaling is NOT enabled + auto_scaling: set True to enable auto scaling of the workers + min_workers: minimum number of workers when auto scaling is enabled + max_workers: maximum number of workers when auto scaling is enabled + """ + + workers: Optional[int] = None + min_workers: Optional[int] = None + max_workers: Optional[int] = None + auto_scaling: Optional[bool] = False + + class BaseJobClient: """Base class for Job clients.""" diff --git a/client/quantum_serverless/core/pattern.py b/client/quantum_serverless/core/pattern.py index 468a817c3..ae0fe7956 100644 --- a/client/quantum_serverless/core/pattern.py +++ b/client/quantum_serverless/core/pattern.py @@ -84,25 +84,6 @@ def __repr__(self): return self.__str__() -@dataclass -class Configuration: # pylint: disable=too-many-instance-attributes - """Program Configuration. - - Args: - workers: number of worker pod when auto scaling is NOT enabled - auto_scaling: set True to enable auto scating of the workers - min_workers: minimum number of workers when auto scaling is enabled - max_workers: maxmum number of workers when auto scaling is enabled - """ - - workers: Optional[int] = None - min_workers: Optional[int] = None - max_workers: Optional[int] = None - auto_scaling: Optional[bool] = False - worker_cpu: Optional[int] = None - worker_mem: Optional[int] = None - - class ProgramStorage(ABC): """Base program backend to save and load programs from.""" diff --git a/client/quantum_serverless/core/provider.py b/client/quantum_serverless/core/provider.py index cd21ae805..bb32f7d34 100644 --- a/client/quantum_serverless/core/provider.py +++ b/client/quantum_serverless/core/provider.py @@ -54,8 +54,9 @@ GatewayJobClient, LocalJobClient, BaseJobClient, + Configuration, ) -from quantum_serverless.core.pattern import QiskitPattern, Configuration +from
quantum_serverless.core.pattern import QiskitPattern from quantum_serverless.core.tracing import _trace_env_vars from quantum_serverless.exception import QuantumServerlessException from quantum_serverless.utils import JsonSerializable diff --git a/client/quantum_serverless/quantum_serverless.py b/client/quantum_serverless/quantum_serverless.py index 65d6905bf..aaa578d5c 100644 --- a/client/quantum_serverless/quantum_serverless.py +++ b/client/quantum_serverless/quantum_serverless.py @@ -41,8 +41,8 @@ from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.instrumentation.requests import RequestsInstrumentor -from quantum_serverless.core.job import Job -from quantum_serverless.core.pattern import QiskitPattern, Configuration +from quantum_serverless.core.job import Job, Configuration +from quantum_serverless.core.pattern import QiskitPattern from quantum_serverless.core.provider import BaseProvider, ComputeResource from quantum_serverless.exception import QuantumServerlessException diff --git a/gateway/api/migrations/0011_jobconfig_job_config.py b/gateway/api/migrations/0011_jobconfig_job_config.py new file mode 100644 index 000000000..5a3856bad --- /dev/null +++ b/gateway/api/migrations/0011_jobconfig_job_config.py @@ -0,0 +1,73 @@ +# Generated by Django 4.2.2 on 2023-11-03 17:54 + +import django.core.validators +from django.db import migrations, models +import django.db.models.deletion +import uuid + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0010_job_version"), + ] + + operations = [ + migrations.CreateModel( + name="JobConfig", + fields=[ + ( + "id", + models.UUIDField( + default=uuid.uuid4, + editable=False, + primary_key=True, + serialize=False, + ), + ), + ("created", models.DateTimeField(auto_now_add=True)), + ("auto_scaling", models.BooleanField(default=False, null=True)), + ( + "workers", + models.IntegerField( + null=True, + validators=[ + django.core.validators.MinValueValidator(0), + 
django.core.validators.MaxValueValidator(5), + ], + ), + ), + ( + "min_workers", + models.IntegerField( + null=True, + validators=[ + django.core.validators.MinValueValidator(0), + django.core.validators.MaxValueValidator(2), + ], + ), + ), + ( + "max_workers", + models.IntegerField( + null=True, + validators=[ + django.core.validators.MinValueValidator(0), + django.core.validators.MaxValueValidator(10), + ], + ), + ), + ], + ), + migrations.AddField( + model_name="job", + name="config", + field=models.ForeignKey( + blank=True, + default=None, + null=True, + on_delete=django.db.models.deletion.CASCADE, + to="api.jobconfig", + ), + ), + ] diff --git a/gateway/api/migrations/0011_programconfig_program_config.py b/gateway/api/migrations/0011_programconfig_program_config.py deleted file mode 100644 index 68e0f5452..000000000 --- a/gateway/api/migrations/0011_programconfig_program_config.py +++ /dev/null @@ -1,47 +0,0 @@ -# Generated by Django 4.2.2 on 2023-10-19 16:02 - -from django.db import migrations, models -import django.db.models.deletion -import uuid - - -class Migration(migrations.Migration): - - dependencies = [ - ("api", "0010_job_version"), - ] - - operations = [ - migrations.CreateModel( - name="ProgramConfig", - fields=[ - ( - "id", - models.UUIDField( - default=uuid.uuid4, - editable=False, - primary_key=True, - serialize=False, - ), - ), - ("created", models.DateTimeField(auto_now_add=True)), - ("auto_scaling", models.BooleanField(default=False, null=True)), - ("workers", models.IntegerField()), - ("min_workers", models.IntegerField()), - ("max_workers", models.IntegerField()), - ("worker_cpu", models.IntegerField()), - ("worker_mem", models.IntegerField()), - ], - ), - migrations.AddField( - model_name="program", - name="config", - field=models.ForeignKey( - blank=True, - default=None, - null=True, - on_delete=django.db.models.deletion.CASCADE, - to="api.programconfig", - ), - ), - ] diff --git 
a/gateway/api/migrations/0012_alter_programconfig_max_workers_and_more.py b/gateway/api/migrations/0012_alter_programconfig_max_workers_and_more.py deleted file mode 100644 index baa7a6f0b..000000000 --- a/gateway/api/migrations/0012_alter_programconfig_max_workers_and_more.py +++ /dev/null @@ -1,38 +0,0 @@ -# Generated by Django 4.2.2 on 2023-10-20 18:30 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("api", "0011_programconfig_program_config"), - ] - - operations = [ - migrations.AlterField( - model_name="programconfig", - name="max_workers", - field=models.IntegerField(null=True), - ), - migrations.AlterField( - model_name="programconfig", - name="min_workers", - field=models.IntegerField(null=True), - ), - migrations.AlterField( - model_name="programconfig", - name="worker_cpu", - field=models.IntegerField(null=True), - ), - migrations.AlterField( - model_name="programconfig", - name="worker_mem", - field=models.IntegerField(null=True), - ), - migrations.AlterField( - model_name="programconfig", - name="workers", - field=models.IntegerField(null=True), - ), - ] diff --git a/gateway/api/migrations/0013_alter_programconfig_max_workers_and_more.py b/gateway/api/migrations/0013_alter_programconfig_max_workers_and_more.py deleted file mode 100644 index b1f75a584..000000000 --- a/gateway/api/migrations/0013_alter_programconfig_max_workers_and_more.py +++ /dev/null @@ -1,69 +0,0 @@ -# Generated by Django 4.2.2 on 2023-10-22 19:17 - -import django.core.validators -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("api", "0012_alter_programconfig_max_workers_and_more"), - ] - - operations = [ - migrations.AlterField( - model_name="programconfig", - name="max_workers", - field=models.IntegerField( - null=True, - validators=[ - django.core.validators.MinValueValidator(0), - django.core.validators.MaxValueValidator(5), - ], - ), - ), - 
migrations.AlterField( - model_name="programconfig", - name="min_workers", - field=models.IntegerField( - null=True, - validators=[ - django.core.validators.MinValueValidator(0), - django.core.validators.MaxValueValidator(5), - ], - ), - ), - migrations.AlterField( - model_name="programconfig", - name="worker_cpu", - field=models.IntegerField( - null=True, - validators=[ - django.core.validators.MinValueValidator(0), - django.core.validators.MaxValueValidator(5), - ], - ), - ), - migrations.AlterField( - model_name="programconfig", - name="worker_mem", - field=models.IntegerField( - null=True, - validators=[ - django.core.validators.MinValueValidator(0), - django.core.validators.MaxValueValidator(5), - ], - ), - ), - migrations.AlterField( - model_name="programconfig", - name="workers", - field=models.IntegerField( - null=True, - validators=[ - django.core.validators.MinValueValidator(0), - django.core.validators.MaxValueValidator(5), - ], - ), - ), - ] diff --git a/gateway/api/models.py b/gateway/api/models.py index d71849cf2..df27d3fd8 100644 --- a/gateway/api/models.py +++ b/gateway/api/models.py @@ -17,8 +17,8 @@ def get_upload_path(instance, filename): return f"{instance.author.username}/{instance.id}/{filename}" -class ProgramConfig(models.Model): - """Program Configuration model.""" +class JobConfig(models.Model): + """Job Configuration model.""" id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) created = models.DateTimeField(auto_now_add=True) @@ -45,12 +45,6 @@ class ProgramConfig(models.Model): MaxValueValidator(settings.RAY_CLUSTER_WORKER_MAX_REPLICAS_MAX), ], ) - worker_cpu = models.IntegerField( - null=True, validators=[MinValueValidator(0), MaxValueValidator(5)] - ) - worker_mem = models.IntegerField( - null=True, validators=[MinValueValidator(0), MaxValueValidator(5)] - ) def __str__(self): return self.id @@ -78,13 +72,6 @@ class Program(ExportModelOperationsMixin("program"), models.Model): arguments = 
models.TextField(null=False, blank=True, default="{}") env_vars = models.TextField(null=False, blank=True, default="{}") dependencies = models.TextField(null=False, blank=True, default="[]") - config = models.ForeignKey( - to=ProgramConfig, - on_delete=models.CASCADE, - default=None, - null=True, - blank=True, - ) def __str__(self): return f"{self.title}" @@ -159,6 +146,14 @@ class Job(models.Model): version = IntegerVersionField() + config = models.ForeignKey( + to=JobConfig, + on_delete=models.CASCADE, + default=None, + null=True, + blank=True, + ) + def __str__(self): return f"" diff --git a/gateway/api/ray.py b/gateway/api/ray.py index f9ac5c0e5..3e2878504 100644 --- a/gateway/api/ray.py +++ b/gateway/api/ray.py @@ -20,7 +20,7 @@ from opentelemetry import trace from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from api.models import ComputeResource, Job, ProgramConfig +from api.models import ComputeResource, Job, JobConfig from api.utils import try_json_loads, retry_function, decrypt_env_vars from main import settings @@ -160,7 +160,7 @@ def create_ray_cluster( user: Any, cluster_name: Optional[str] = None, cluster_data: Optional[str] = None, - program_config: Optional[ProgramConfig] = None, + job_config: Optional[JobConfig] = None, ) -> Optional[ComputeResource]: """Creates ray cluster. 
@@ -182,10 +182,10 @@ def create_ray_cluster( { "cluster_name": cluster_name, "user_id": user.username, - "workers": program_config.workers, - "min_workers": program_config.min_workers, - "max_workers": program_config.max_workers, - "auto_scaling": program_config.auto_scaling, + "workers": job_config.workers, + "min_workers": job_config.min_workers, + "max_workers": job_config.max_workers, + "auto_scaling": job_config.auto_scaling, } ) cluster_data = yaml.safe_load(manifest) diff --git a/gateway/api/schedule.py b/gateway/api/schedule.py index 7285ab799..40f9241d3 100644 --- a/gateway/api/schedule.py +++ b/gateway/api/schedule.py @@ -14,7 +14,7 @@ from opentelemetry import trace -from api.models import Job, Program, ComputeResource, ProgramConfig +from api.models import Job, Program, ComputeResource, JobConfig from api.ray import submit_job, create_ray_cluster, kill_ray_cluster from main import settings as config @@ -23,40 +23,34 @@ logger = logging.getLogger("commands") -def save_programconfig(request) -> ProgramConfig: - """Save programconfig. +def save_jobconfig(request) -> JobConfig: + """Save jobconfig. Args: request: request data. 
Returns: - saved programconfig + saved jobconfig """ - programconfig = ProgramConfig( + jobconfig = JobConfig( workers=settings.RAY_CLUSTER_WORKER_REPLICAS, min_workers=settings.RAY_CLUSTER_WORKER_MIN_REPLICAS, max_workers=settings.RAY_CLUSTER_WORKER_MAX_REPLICAS, - worker_cpu=2, - worker_mem=3, auto_scaling=settings.RAY_CLUSTER_WORKER_AUTO_SCALING, ) if request.data.get("config"): config_data = json.loads(request.data.get("config")) if "workers" in config_data and config_data["workers"]: - programconfig.workers = config_data["workers"] + jobconfig.workers = config_data["workers"] if "min_workers" in config_data and config_data["min_workers"]: - programconfig.min_workers = config_data["min_workers"] + jobconfig.min_workers = config_data["min_workers"] if "max_workers" in config_data and config_data["max_workers"]: - programconfig.max_workers = config_data["max_workers"] - if "worker_cpu" in config_data and config_data["worker_cpu"]: - programconfig.worker_cpu = config_data["worker_cpu"] - if "worker_mem" in config_data and config_data["worker_mem"]: - programconfig.worker_mem = config_data["worker_mem"] + jobconfig.max_workers = config_data["max_workers"] if "auto_scaling" in config_data and config_data["auto_scaling"]: - programconfig.auto_scaling = config_data["auto_scaling"] - programconfig.full_clean() - programconfig.save() - return programconfig + jobconfig.auto_scaling = config_data["auto_scaling"] + jobconfig.full_clean() + jobconfig.save() + return jobconfig def save_program(serializer, request) -> Program: @@ -69,8 +63,6 @@ def save_program(serializer, request) -> Program: saved program """ - programconfig = save_programconfig(request) - existing_program = ( Program.objects.filter(title=serializer.data.get("title"), author=request.user) .order_by("-created") @@ -87,7 +79,6 @@ def save_program(serializer, request) -> Program: program = Program(**serializer.data) program.artifact = request.FILES.get("artifact") program.author = request.user - program.config 
= programconfig program.save() return program @@ -140,7 +131,7 @@ def execute_job(job: Job) -> Job: job.logs = "Compute resource was not found." else: compute_resource = create_ray_cluster( - job.author, cluster_name=cluster_name, program_config=job.program.config + job.author, cluster_name=cluster_name, job_config=job.config ) if compute_resource: # if compute resource was created in time with no problems diff --git a/gateway/api/serializers.py b/gateway/api/serializers.py index 97be7b862..dd39357af 100644 --- a/gateway/api/serializers.py +++ b/gateway/api/serializers.py @@ -8,16 +8,16 @@ from rest_framework import serializers -from .models import Program, Job, ProgramConfig +from .models import Program, Job, JobConfig -class ProgramConfigSerializer(serializers.ModelSerializer): +class JobConfigSerializer(serializers.ModelSerializer): """ - Serializer for the Program Config model. + Serializer for the Job Config model. """ class Meta: - model = ProgramConfig + model = JobConfig class ProgramSerializer(serializers.ModelSerializer): diff --git a/gateway/api/v1/serializers.py b/gateway/api/v1/serializers.py index 0159cb89b..14fa2663b 100644 --- a/gateway/api/v1/serializers.py +++ b/gateway/api/v1/serializers.py @@ -5,18 +5,16 @@ from api import serializers -class ProgramConfigSerializer(serializers.ProgramSerializer): +class JobConfigSerializer(serializers.JobConfigSerializer): """ - Program Config serializer first version. Include basic fields from the initial model. + Job Config serializer first version. Include basic fields from the initial model. 
""" - class Meta(serializers.ProgramConfigSerializer.Meta): + class Meta(serializers.JobConfigSerializer.Meta): fields = [ "workers", "min_workers", "max_workers", - "worker_cpu", - "worker_mem", "auto_scaling", ] diff --git a/gateway/api/views.py b/gateway/api/views.py index ee9d626ca..9d880eb6e 100644 --- a/gateway/api/views.py +++ b/gateway/api/views.py @@ -37,7 +37,7 @@ from .models import Program, Job from .ray import get_job_handler -from .schedule import save_program, save_programconfig +from .schedule import save_program, save_jobconfig from .serializers import JobSerializer, ExistingProgramSerializer from .utils import build_env_variables, encrypt_env_vars @@ -116,15 +116,17 @@ def run_existing(self, request): status=status.HTTP_404_NOT_FOUND, ) - programconfig = save_programconfig(request) - program.config = programconfig - program.save() + try: + jobconfig = save_jobconfig(request) + except ValidationError as exp: + return HttpResponseBadRequest(f"Bad Request: {exp}") job = Job( program=program, arguments=serializer.data.get("arguments"), author=request.user, status=Job.QUEUED, + config=jobconfig, ) job.save() @@ -156,14 +158,17 @@ def run(self, request): return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) try: - program = save_program(serializer=serializer, request=request) + jobconfig = save_jobconfig(request) except ValidationError as exp: return HttpResponseBadRequest(f"Bad Request: {exp}") + program = save_program(serializer=serializer, request=request) + job = Job( program=program, arguments=program.arguments, author=request.user, status=Job.QUEUED, + config=jobconfig, ) job.save() From a03ada2d4ddce8cd917d4000550591743e1e4801 Mon Sep 17 00:00:00 2001 From: Akihiko Kuroda Date: Fri, 3 Nov 2023 17:57:20 -0400 Subject: [PATCH 3/5] fix merge error --- client/quantum_serverless/core/job.py | 12 ++++++++++-- client/quantum_serverless/core/provider.py | 1 + 2 files changed, 11 insertions(+), 2 deletions(-) diff --git 
a/client/quantum_serverless/core/job.py b/client/quantum_serverless/core/job.py index 69acb5fde..05350d6a3 100644 --- a/client/quantum_serverless/core/job.py +++ b/client/quantum_serverless/core/job.py @@ -109,6 +109,7 @@ def run_existing( self, program: Union[str, QiskitPattern], arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, ): """Executes existing program.""" raise NotImplementedError @@ -208,6 +209,7 @@ def run_existing( self, program: Union[str, QiskitPattern], arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, ): raise NotImplementedError("Run existing is not available for RayJobClient.") @@ -242,7 +244,12 @@ def get(self, job_id) -> Optional["Job"]: def list(self, **kwargs) -> List["Job"]: return [job["job"] for job in list(self._jobs.values())] - def run(self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None): + def run( + self, + program: QiskitPattern, + arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, + ): if program.dependencies: for dependency in program.dependencies: subprocess.check_call( @@ -294,10 +301,11 @@ def upload(self, program: QiskitPattern): } return program.title - def run_existing( + def run_existing( # pylint: disable=too-many-locals self, program: Union[str, QiskitPattern], arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, ): if isinstance(program, QiskitPattern): title = program.title diff --git a/client/quantum_serverless/core/provider.py b/client/quantum_serverless/core/provider.py index bb32f7d34..7a86c8307 100644 --- a/client/quantum_serverless/core/provider.py +++ b/client/quantum_serverless/core/provider.py @@ -620,6 +620,7 @@ def run( self, program: Union[QiskitPattern, str], arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, ) -> Job: if isinstance(program, QiskitPattern) and program.entrypoint is not None: job = 
self.client.run(program, arguments) From 6196936b2c96acd816d591ae033e25ab51de7370 Mon Sep 17 00:00:00 2001 From: Akihiko Kuroda Date: Tue, 7 Nov 2023 12:05:44 -0500 Subject: [PATCH 4/5] review comments --- .../migrations/0011_jobconfig_job_config.py | 12 ------ gateway/api/models.py | 14 ------- gateway/api/ray.py | 11 ++++++ gateway/api/schedule.py | 33 +---------------- gateway/api/serializers.py | 27 +++++++++++++- gateway/api/v1/serializers.py | 14 ------- gateway/api/views.py | 37 +++++++++++++------ gateway/main/settings.py | 4 +- 8 files changed, 65 insertions(+), 87 deletions(-) diff --git a/gateway/api/migrations/0011_jobconfig_job_config.py b/gateway/api/migrations/0011_jobconfig_job_config.py index 5a3856bad..458144316 100644 --- a/gateway/api/migrations/0011_jobconfig_job_config.py +++ b/gateway/api/migrations/0011_jobconfig_job_config.py @@ -31,30 +31,18 @@ class Migration(migrations.Migration): "workers", models.IntegerField( null=True, - validators=[ - django.core.validators.MinValueValidator(0), - django.core.validators.MaxValueValidator(5), - ], ), ), ( "min_workers", models.IntegerField( null=True, - validators=[ - django.core.validators.MinValueValidator(0), - django.core.validators.MaxValueValidator(2), - ], ), ), ( "max_workers", models.IntegerField( null=True, - validators=[ - django.core.validators.MinValueValidator(0), - django.core.validators.MaxValueValidator(10), - ], ), ), ], diff --git a/gateway/api/models.py b/gateway/api/models.py index df27d3fd8..636171526 100644 --- a/gateway/api/models.py +++ b/gateway/api/models.py @@ -4,8 +4,6 @@ from django.core.validators import ( FileExtensionValidator, - MinValueValidator, - MaxValueValidator, ) from django.db import models from django.conf import settings @@ -26,24 +24,12 @@ class JobConfig(models.Model): auto_scaling = models.BooleanField(default=False, null=True) workers = models.IntegerField( null=True, - validators=[ - MinValueValidator(0), - 
MaxValueValidator(settings.RAY_CLUSTER_WORKER_REPLICAS_MAX), - ], ) min_workers = models.IntegerField( null=True, - validators=[ - MinValueValidator(0), - MaxValueValidator(settings.RAY_CLUSTER_WORKER_MIN_REPLICAS_MAX), - ], ) max_workers = models.IntegerField( null=True, - validators=[ - MinValueValidator(0), - MaxValueValidator(settings.RAY_CLUSTER_WORKER_MAX_REPLICAS_MAX), - ], ) def __str__(self): diff --git a/gateway/api/ray.py b/gateway/api/ray.py index 3e2878504..fe217efcb 100644 --- a/gateway/api/ray.py +++ b/gateway/api/ray.py @@ -177,6 +177,17 @@ def create_ray_cluster( namespace = settings.RAY_KUBERAY_NAMESPACE cluster_name = cluster_name or f"{user.username}-{str(uuid.uuid4())[:8]}" if not cluster_data: + if not job_config: + job_config = JobConfig() + if not job_config.workers: + job_config.workers = settings.RAY_CLUSTER_WORKER_REPLICAS + if not job_config.min_workers: + job_config.min_workers = settings.RAY_CLUSTER_WORKER_MIN_REPLICAS + if not job_config.max_workers: + job_config.max_workers = settings.RAY_CLUSTER_WORKER_MAX_REPLICAS + if not job_config.auto_scaling: + job_config.auto_scaling = settings.RAY_CLUSTER_WORKER_AUTO_SCALING + cluster = get_template("rayclustertemplate.yaml") manifest = cluster.render( { diff --git a/gateway/api/schedule.py b/gateway/api/schedule.py index 40f9241d3..1ac00dde7 100644 --- a/gateway/api/schedule.py +++ b/gateway/api/schedule.py @@ -1,6 +1,5 @@ """Scheduling related functions.""" import logging -import json import random import uuid from typing import List @@ -14,7 +13,7 @@ from opentelemetry import trace -from api.models import Job, Program, ComputeResource, JobConfig +from api.models import Job, Program, ComputeResource from api.ray import submit_job, create_ray_cluster, kill_ray_cluster from main import settings as config @@ -23,36 +22,6 @@ logger = logging.getLogger("commands") -def save_jobconfig(request) -> JobConfig: - """Save jobconfig. - - Args: - request: request data. 
- - Returns: - saved jobconfig - """ - jobconfig = JobConfig( - workers=settings.RAY_CLUSTER_WORKER_REPLICAS, - min_workers=settings.RAY_CLUSTER_WORKER_MIN_REPLICAS, - max_workers=settings.RAY_CLUSTER_WORKER_MAX_REPLICAS, - auto_scaling=settings.RAY_CLUSTER_WORKER_AUTO_SCALING, - ) - if request.data.get("config"): - config_data = json.loads(request.data.get("config")) - if "workers" in config_data and config_data["workers"]: - jobconfig.workers = config_data["workers"] - if "min_workers" in config_data and config_data["min_workers"]: - jobconfig.min_workers = config_data["min_workers"] - if "max_workers" in config_data and config_data["max_workers"]: - jobconfig.max_workers = config_data["max_workers"] - if "auto_scaling" in config_data and config_data["auto_scaling"]: - jobconfig.auto_scaling = config_data["auto_scaling"] - jobconfig.full_clean() - jobconfig.save() - return jobconfig - - def save_program(serializer, request) -> Program: """Save program. diff --git a/gateway/api/serializers.py b/gateway/api/serializers.py index dd39357af..8cd57935d 100644 --- a/gateway/api/serializers.py +++ b/gateway/api/serializers.py @@ -6,8 +6,8 @@ Version serializers inherit from the different serializers. 
""" +from django.conf import settings from rest_framework import serializers - from .models import Program, Job, JobConfig @@ -18,6 +18,31 @@ class JobConfigSerializer(serializers.ModelSerializer): class Meta: model = JobConfig + fields = [ + "workers", + "min_workers", + "max_workers", + "auto_scaling", + ] + + workers = serializers.IntegerField( + max_value=settings.RAY_CLUSTER_WORKER_REPLICAS_MAX, + required=False, + allow_null=True, + ) + min_workers = serializers.IntegerField( + max_value=settings.RAY_CLUSTER_WORKER_MIN_REPLICAS_MAX, + required=False, + allow_null=True, + ) + max_workers = serializers.IntegerField( + max_value=settings.RAY_CLUSTER_WORKER_MAX_REPLICAS_MAX, + required=False, + allow_null=True, + ) + auto_scaling = serializers.BooleanField( + default=False, required=False, allow_null=True + ) class ProgramSerializer(serializers.ModelSerializer): diff --git a/gateway/api/v1/serializers.py b/gateway/api/v1/serializers.py index 14fa2663b..d5c557b1a 100644 --- a/gateway/api/v1/serializers.py +++ b/gateway/api/v1/serializers.py @@ -5,20 +5,6 @@ from api import serializers -class JobConfigSerializer(serializers.JobConfigSerializer): - """ - Job Config serializer first version. Include basic fields from the initial model. - """ - - class Meta(serializers.JobConfigSerializer.Meta): - fields = [ - "workers", - "min_workers", - "max_workers", - "auto_scaling", - ] - - class ProgramSerializer(serializers.ProgramSerializer): """ Program serializer first version. Include basic fields from the initial model. 
diff --git a/gateway/api/views.py b/gateway/api/views.py index 9d880eb6e..6b5918360 100644 --- a/gateway/api/views.py +++ b/gateway/api/views.py @@ -20,8 +20,7 @@ from dj_rest_auth.registration.views import SocialLoginView from django.conf import settings from django.contrib.auth import get_user_model -from django.core.exceptions import ValidationError -from django.http import StreamingHttpResponse, HttpResponseBadRequest +from django.http import StreamingHttpResponse from rest_framework import viewsets, permissions, status from rest_framework.decorators import action from rest_framework.generics import get_object_or_404 @@ -37,8 +36,8 @@ from .models import Program, Job from .ray import get_job_handler -from .schedule import save_program, save_jobconfig -from .serializers import JobSerializer, ExistingProgramSerializer +from .schedule import save_program +from .serializers import JobSerializer, ExistingProgramSerializer, JobConfigSerializer from .utils import build_env_variables, encrypt_env_vars logger = logging.getLogger("gateway") @@ -116,10 +115,17 @@ def run_existing(self, request): status=status.HTTP_404_NOT_FOUND, ) - try: - jobconfig = save_jobconfig(request) - except ValidationError as exp: - return HttpResponseBadRequest(f"Bad Request: {exp}") + jobconfig = None + config_data = request.data.get("config") + if config_data: + config_serializer = JobConfigSerializer(data=json.loads(config_data)) + if not config_serializer.is_valid(): + print(config_serializer.errors) + return Response( + config_serializer.errors, status=status.HTTP_400_BAD_REQUEST + ) + + jobconfig = config_serializer.save() job = Job( program=program, @@ -157,10 +163,17 @@ def run(self, request): if not serializer.is_valid(): return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - try: - jobconfig = save_jobconfig(request) - except ValidationError as exp: - return HttpResponseBadRequest(f"Bad Request: {exp}") + jobconfig = None + config_data = request.data.get("config") + 
if config_data: + config_serializer = JobConfigSerializer(data=json.loads(config_data)) + if not config_serializer.is_valid(): + return Response( + config_serializer.errors, status=status.HTTP_400_BAD_REQUEST + ) + + jobconfig = config_serializer.save() + program = save_program(serializer=serializer, request=request) job = Job( diff --git a/gateway/main/settings.py b/gateway/main/settings.py index a13290634..608facef7 100644 --- a/gateway/main/settings.py +++ b/gateway/main/settings.py @@ -300,12 +300,12 @@ "RAY_CLUSTER_MODE_LOCAL_HOST", "http://localhost:8265" ), } -RAY_CLUSTER_WORKER_REPLICAS = int(os.environ.get("RAY_CLUSTER_WORKER_REPLICAS", "0")) +RAY_CLUSTER_WORKER_REPLICAS = int(os.environ.get("RAY_CLUSTER_WORKER_REPLICAS", "1")) RAY_CLUSTER_WORKER_REPLICAS_MAX = int( os.environ.get("RAY_CLUSTER_WORKER_REPLICAS_MAX", "5") ) RAY_CLUSTER_WORKER_MIN_REPLICAS = int( - os.environ.get("RAY_CLUSTER_WORKER_MIN_REPLICAS", "0") + os.environ.get("RAY_CLUSTER_WORKER_MIN_REPLICAS", "1") ) RAY_CLUSTER_WORKER_MIN_REPLICAS_MAX = int( os.environ.get("RAY_CLUSTER_WORKER_MIN_REPLICAS_MAX", "2") From a85c93060641e2c7df33a7e3c5be66fec7e49b75 Mon Sep 17 00:00:00 2001 From: Akihiko Kuroda Date: Wed, 8 Nov 2023 12:17:54 -0500 Subject: [PATCH 5/5] add tests --- gateway/api/views.py | 1 - gateway/tests/api/test_serializer.py | 33 ++++++++++++++++++++++++++++ gateway/tests/api/test_v1_program.py | 29 +++++++++++++++++++++++- 3 files changed, 61 insertions(+), 2 deletions(-) create mode 100644 gateway/tests/api/test_serializer.py diff --git a/gateway/api/views.py b/gateway/api/views.py index 6b5918360..6fc312769 100644 --- a/gateway/api/views.py +++ b/gateway/api/views.py @@ -120,7 +120,6 @@ def run_existing(self, request): if config_data: config_serializer = JobConfigSerializer(data=json.loads(config_data)) if not config_serializer.is_valid(): - print(config_serializer.errors) return Response( config_serializer.errors, status=status.HTTP_400_BAD_REQUEST ) diff --git 
a/gateway/tests/api/test_serializer.py b/gateway/tests/api/test_serializer.py new file mode 100644 index 000000000..db6a6cdda --- /dev/null +++ b/gateway/tests/api/test_serializer.py @@ -0,0 +1,33 @@ +"""Tests for serializer functions.""" + +import json +from rest_framework.test import APITestCase +from api.serializers import JobConfigSerializer +from api.models import JobConfig + + +class SerializerTest(APITestCase): + """Tests for serializer.""" + + def test_JobConfigSerializer(self): + data = '{"workers": null, "min_workers": 1, "max_workers": 5, "auto_scaling": true}' + config_serializer = JobConfigSerializer(data=json.loads(data)) + assert config_serializer.is_valid() + jobconfig = config_serializer.save() + + entry = JobConfig.objects.get(id=jobconfig.id) + assert not entry.workers + assert entry.min_workers == 1 + assert entry.max_workers == 5 + assert entry.auto_scaling + + data = '{"workers": 3, "min_workers": null, "max_workers": null, "auto_scaling": null}' + config_serializer = JobConfigSerializer(data=json.loads(data)) + assert config_serializer.is_valid() + jobconfig = config_serializer.save() + + entry = JobConfig.objects.get(id=jobconfig.id) + assert entry.workers == 3 + assert not entry.min_workers + assert not entry.max_workers + assert not entry.auto_scaling diff --git a/gateway/tests/api/test_v1_program.py b/gateway/tests/api/test_v1_program.py index ac23a5220..757ce7e30 100644 --- a/gateway/tests/api/test_v1_program.py +++ b/gateway/tests/api/test_v1_program.py @@ -1,8 +1,8 @@ """Tests program APIs.""" - from django.urls import reverse from rest_framework import status from rest_framework.test import APITestCase +from api.models import Job, JobConfig class TestProgramApi(APITestCase): @@ -54,3 +54,30 @@ def test_program_detail(self): self.assertEqual(programs_response.status_code, status.HTTP_200_OK) self.assertEqual(programs_response.data.get("title"), "Program") self.assertEqual(programs_response.data.get("entrypoint"), "program.py") + + def 
test_run_existing(self): + """Tests run existing authorized.""" + auth = reverse("rest_login") + response = self.client.post( + auth, {"username": "test_user", "password": "123"}, format="json" + ) + token = response.data.get("access") + self.client.credentials(HTTP_AUTHORIZATION="Bearer " + token) + + programs_response = self.client.post( + "/api/v1/programs/run_existing/", + data={ + "title": "Program", + "entrypoint": "program.py", + "arguments": {}, + "dependencies": [], + "config": '{"workers": null, "min_workers": 1, "max_workers": 5, "auto_scaling": true}', + }, + format="json", + ) + job_id = programs_response.data.get("id") + job = Job.objects.get(id=job_id) + self.assertEqual(job.config.min_workers, 1) + self.assertEqual(job.config.max_workers, 5) + self.assertEqual(job.config.workers, None) + self.assertEqual(job.config.auto_scaling, True)