From 9ce1e27889460a8685111f32ff0c7bc0a3af1fdc Mon Sep 17 00:00:00 2001 From: Fiona Waters Date: Sun, 6 Oct 2024 22:31:16 +0100 Subject: [PATCH] Further refactoring of pytorch launcher for pipelines v2 and training operator Signed-off-by: Fiona-Waters fiwaters6@gmail.com Signed-off-by: Fiona Waters --- .../kubeflow/pytorch-launcher/Dockerfile | 2 +- .../pytorch-launcher/requirements.txt | 2 + .../kubeflow/pytorch-launcher/sample.py | 223 ++++++++---------- .../pytorch-launcher/src/launch_pytorchjob.py | 34 +-- 4 files changed, 122 insertions(+), 139 deletions(-) diff --git a/components/kubeflow/pytorch-launcher/Dockerfile b/components/kubeflow/pytorch-launcher/Dockerfile index 378b37fd523..16c105acddc 100644 --- a/components/kubeflow/pytorch-launcher/Dockerfile +++ b/components/kubeflow/pytorch-launcher/Dockerfile @@ -20,4 +20,4 @@ RUN pip install --no-cache-dir -r requirements.txt ADD build /ml -ENTRYPOINT ["python", "/ml/launch_pytorchjob.py"] +ENTRYPOINT ["python", "/ml/src/launch_pytorchjob.py"] diff --git a/components/kubeflow/pytorch-launcher/requirements.txt b/components/kubeflow/pytorch-launcher/requirements.txt index 2fc0c670117..607b033ad28 100644 --- a/components/kubeflow/pytorch-launcher/requirements.txt +++ b/components/kubeflow/pytorch-launcher/requirements.txt @@ -1,4 +1,6 @@ pyyaml kubernetes kubeflow-pytorchjob +kubeflow.training retrying +str2bool diff --git a/components/kubeflow/pytorch-launcher/sample.py b/components/kubeflow/pytorch-launcher/sample.py index 2d21adf3d22..588a5f25b6c 100644 --- a/components/kubeflow/pytorch-launcher/sample.py +++ b/components/kubeflow/pytorch-launcher/sample.py @@ -1,8 +1,7 @@ -from typing import NamedTuple -from collections import namedtuple import kfp from kfp import dsl -from kfp import components +from typing import Optional + def get_current_namespace(): """Returns current namespace if available, else kubeflow""" @@ -14,12 +13,52 @@ def get_current_namespace(): current_namespace = "kubeflow" return current_namespace -@dsl.component(base_image="python:slim") + +@dsl.component() +def create_master_spec() -> dict: + # Define master spec + master = { + "replicas": 1, + "restartPolicy": "OnFailure", + "template": { + "metadata": { + "annotations": { + # See https://github.com/kubeflow/website/issues/2011 + "sidecar.istio.io/inject": "false" + } + }, + "spec": { + "containers": [ + { + "args": [ + "--backend", + "gloo", + ], + "image": "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest", + "name": "pytorch", + "resources": { + "requests": { + "memory": "4Gi", + "cpu": "2000m", + }, + "limits": { + "memory": "4Gi", + "cpu": "2000m", + }, + }, + } + ], + }, + }, + } + + return master + + +@dsl.component def create_worker_spec( worker_num: int = 0 -) -> NamedTuple( - "CreatWorkerSpec", [("worker_spec", dict)] -): # type: ignore +) -> dict: """ Creates pytorch-job worker spec """ @@ -63,158 +102,96 @@ def create_worker_spec( }, } - worker_spec_output = namedtuple( - "MyWorkerOutput", ["worker_spec"] - ) - return worker_spec_output(worker) + return worker # container component description setting inputs and implementation @dsl.container_component def pytorch_job_launcher( name: str, - namespace: str = 'kubeflow', + kind: str = "PyTorchJob", + namespace: str = "kubeflow", version: str = 'v1', master_spec: dict = {}, worker_spec: dict = {}, job_timeout_minutes: int = 1440, delete_after_done: bool = True, clean_pod_policy: str = 'Running', - active_deadline_seconds: int = None, - backoff_limit: int = None, - ttl_seconds_after_finished: int 
= None,
+    active_deadline_seconds: Optional[int] = None,
+    backoff_limit: Optional[int] = None,
+    ttl_seconds_after_finished: Optional[int] = None,
 ):
+    command_args = [
+        '--name', name,
+        '--kind', kind,
+        '--namespace', namespace,
+        '--version', version,
+        '--masterSpec', master_spec,
+        '--workerSpec', worker_spec,
+        '--jobTimeoutMinutes', job_timeout_minutes,
+        '--deleteAfterDone', delete_after_done,
+        '--cleanPodPolicy', clean_pod_policy,]
+    if active_deadline_seconds is not None and isinstance(active_deadline_seconds, int):
+        command_args.extend(['--activeDeadlineSeconds', str(active_deadline_seconds)])
+    if backoff_limit is not None and isinstance(backoff_limit, int):
+        command_args.extend(['--backoffLimit', str(backoff_limit)])
+    if ttl_seconds_after_finished is not None and isinstance(ttl_seconds_after_finished, int):
+        command_args.extend(['--ttlSecondsAfterFinished', str(ttl_seconds_after_finished)])
+
     return dsl.ContainerSpec(
         image='quay.io/rh_ee_fwaters/kubeflow-pytorchjob-launcher:v2',
-        command=['python', '/ml/launch_pytorchjob.py'],
-        args=[
-            '--name', name,
-            '--namespace', namespace,
-            '--version', version,
-            '--masterSpec', master_spec,
-            '--workerSpec', worker_spec,
-            '--jobTimeoutMinutes', job_timeout_minutes,
-            '--deleteAfterDone', delete_after_done,
-            '--cleanPodPolicy', clean_pod_policy,
-            dsl.IfPresentPlaceholder(input_name='active_deadline_seconds', then=['--activeDeadlineSeconds', active_deadline_seconds]),
-            dsl.IfPresentPlaceholder(input_name='backoff_limit', then=['--backoffLimit', backoff_limit]),
-            dsl.IfPresentPlaceholder(input_name='ttl_seconds_after_finished', then=['--ttlSecondsAfterFinished', ttl_seconds_after_finished])
-        ]
+        command=['python', '/ml/src/launch_pytorchjob.py'],
+        args=command_args
     )
 
+
 @dsl.pipeline(
     name="launch-kubeflow-pytorchjob",
     description="An example to launch pytorch.",
 )
-def pytorch_job_pipeline():
-    pytorch_job = pytorch_job_launcher(
-        name="sample-pytorch-job",
-        namespace="kubeflow",
-        version="v1",
-        master_spec={},
-        worker_spec={},
-        job_timeout_minutes=1440,
-        delete_after_done=True,
-        clean_pod_policy="Running"
-    )
-
-@dsl.component()
-def mnist_train(
-    namespace: str = get_current_namespace(),
+def pytorch_job_pipeline(
+    kind: str = "PyTorchJob",
     worker_replicas: int = 1,
-    ttl_seconds_after_finished: int = -1,
-    job_timeout_minutes: int = 600,
-    delete_after_done: bool = False,
+    ttl_seconds_after_finished: int = 3600,
+    job_timeout_minutes: int = 1440,
+    delete_after_done: bool = True,
+    clean_pod_policy: str = "Running"
 ):
-    pytorchjob_launcher_op = pytorch_job_pipeline
+
+    namespace = get_current_namespace()
+    worker_spec = create_worker_spec(worker_num=worker_replicas)
+    master_spec = create_master_spec()
 
-    master = {
-        "replicas": 1,
-        "restartPolicy": "OnFailure",
-        "template": {
-            "metadata": {
-                "annotations": {
-                    # See https://github.com/kubeflow/website/issues/2011
-                    "sidecar.istio.io/inject": "false"
-                }
-            },
-            "spec": {
-                "containers": [
-                    {
-                        # To override default command
-                        # "command": [
-                        #   "python",
-                        #   "/opt/mnist/src/mnist.py"
-                        # ],
-                        "args": [
-                            "--backend",
-                            "gloo",
-                        ],
-                        # Or, create your own image from
-                        # https://github.com/kubeflow/pytorch-operator/tree/master/examples/mnist
-                        "image": "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest",
-                        "name": "pytorch",
-                        "resources": {
-                            "requests": {
-                                "memory": "4Gi",
-                                "cpu": "2000m",
-                                # Uncomment for GPU
-                                # "nvidia.com/gpu": 1,
-                            },
-                            "limits": {
-                                "memory": "4Gi",
-                                "cpu": "2000m",
-                                # Uncomment for GPU
-                                # "nvidia.com/gpu": 1,
-                            },
-                        },
-                    }
-                ],
-                # If imagePullSecrets required
-                # "imagePullSecrets": [
-                #     {"name": "image-pull-secret"},
-                # ],
-            },
-        },
-    }
-    create_worker_spec(worker_replicas)
-
-    # Launch and monitor the job with the launcher
-    pytorchjob_launcher_op(
-        # Note: name needs to be a unique pytorchjob name in the namespace.
-        # Using RUN_ID_PLACEHOLDER is one way of getting something unique.
-        name=f"name-{kfp.dsl.RUN_ID_PLACEHOLDER}",
+    result = pytorch_job_launcher(
+        name=f"mnist-train-{kfp.dsl.PIPELINE_JOB_ID_PLACEHOLDER}",
+        kind=kind,
         namespace=namespace,
-        master_spec=master,
-        # pass worker_spec as a string because the JSON serializer will convert
-        # the placeholder for worker_replicas (which it sees as a string) into
-        # a quoted variable (eg a string) instead of an unquoted variable
-        # (number). If worker_replicas is quoted in the spec, it will break in
-        # k8s. See https://github.com/kubeflow/pipelines/issues/4776
-        worker_spec=create_worker_spec.outputs[
-            "worker_spec"
-        ],
+        version="v1",
+        worker_spec=worker_spec.output,
+        master_spec=master_spec.output,
         ttl_seconds_after_finished=ttl_seconds_after_finished,
         job_timeout_minutes=job_timeout_minutes,
         delete_after_done=delete_after_done,
+        clean_pod_policy=clean_pod_policy,
     )
 
 
 if __name__ == "__main__":
     import kfp.compiler as compiler
 
-    pipeline_file = "test.tar.gz"
+    pipeline_file = "test.yaml"
     print(
         f"Compiling pipeline as {pipeline_file}"
     )
     compiler.Compiler().compile(
-        mnist_train, pipeline_file
+        pytorch_job_pipeline, pipeline_file
     )
-# # To run:
-# client = kfp.Client()
-# run = client.create_run_from_pipeline_package(
-#     pipeline_file,
-#     arguments={},
-#     run_name="test pytorchjob run"
-# )
-# print(f"Created run {run}")
+    # To run:
+    host = "http://localhost:8080"
+    client = kfp.Client(host=host)
+    run = client.create_run_from_pipeline_package(
+        pipeline_file,
+        arguments={},
+        run_name="test pytorchjob run"
+    )
+    print(f"Created run {run}")
diff --git a/components/kubeflow/pytorch-launcher/src/launch_pytorchjob.py b/components/kubeflow/pytorch-launcher/src/launch_pytorchjob.py
index 0b9aba1ff69..70c661dac20 100644
--- a/components/kubeflow/pytorch-launcher/src/launch_pytorchjob.py
+++ b/components/kubeflow/pytorch-launcher/src/launch_pytorchjob.py
@@ -1,16 +1,16 @@
 import argparse
 import datetime
-from distutils.util import strtobool
+from str2bool import str2bool
 import logging
 import yaml
 
 from kubernetes import client as k8s_client
 from kubernetes import config
 
-import launch_crd
-from kubeflow.pytorchjob import V1PyTorchJob as V1PyTorchJob_original
-from kubeflow.pytorchjob import V1PyTorchJobSpec as V1PyTorchJobSpec_original
 from kubeflow.training import TrainingClient
+from kubeflow.training import KubeflowOrgV1RunPolicy
+from kubeflow.training import KubeflowOrgV1PyTorchJob
+from kubeflow.training import KubeflowOrgV1PyTorchJobSpec
 
 
 def yamlOrJsonStr(string):
@@ -30,16 +30,16 @@ def get_current_namespace():
 
 
 # Patch PyTorchJob APIs to align with k8s usage
-class V1PyTorchJob(V1PyTorchJob_original):
+class V1PyTorchJob(KubeflowOrgV1PyTorchJob):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self.openapi_types = self.swagger_types
+        # self.openapi_types = self.swagger_types
 
 
-class V1PyTorchJobSpec(V1PyTorchJobSpec_original):
+class V1PyTorchJobSpec(KubeflowOrgV1PyTorchJobSpec):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self.openapi_types = self.swagger_types
+        # self.openapi_types = self.swagger_types
 
 
 def get_arg_parser():
@@ -71,7 +71,7 @@ def get_arg_parser():
     parser.add_argument('--workerSpec', type=yamlOrJsonStr,
                         default={},
                         help='Job worker replicaSpecs.')
-    parser.add_argument('--deleteAfterDone', type=strtobool,
+    parser.add_argument('--deleteAfterDone', type=str2bool,
                         default=True,
                         help='When Job done, delete the Job automatically if it is True.')
     parser.add_argument('--jobTimeoutMinutes', type=int,
@@ -100,7 +100,7 @@ def main(args):
             'Master': args.masterSpec,
             'Worker': args.workerSpec,
         },
-        run_policy=k8s_client.V1RunPolicy(
+        run_policy=KubeflowOrgV1RunPolicy(
            active_deadline_seconds=args.activeDeadlineSeconds,
            backoff_limit=args.backoffLimit,
            clean_pod_policy=args.cleanPodPolicy,
@@ -119,16 +119,20 @@ def main(args):
         ),
         spec=jobSpec,
     )
-
-    serialized_job = k8s_client.ApiClient().sanitize_for_serialization(job)
 
     logging.info('Creating TrainingClient.')
-    config.load_incluster_config()
+    # Prefer the in-cluster service account config; fall back to the local
+    # kubeconfig when the launcher is run outside the cluster.
+    try:
+        config.load_incluster_config()
+    except config.ConfigException:
+        config.load_kube_config()
+
+    training_client = TrainingClient()
 
-    logging.info('Submitting PyTorchJob.')
-    training_client.create_job(serialized_job)
+    logging.info(f"Creating PyTorchJob in namespace: {args.namespace}")
+    training_client.create_job(job, namespace=args.namespace)
 
     expected_conditions = ["Succeeded", "Failed"]
     logging.info(