diff --git a/.github/workflows/pythonpublish.yml b/.github/workflows/pythonpublish.yml index 7e3cb6e416..325800d928 100644 --- a/.github/workflows/pythonpublish.yml +++ b/.github/workflows/pythonpublish.yml @@ -141,3 +141,45 @@ jobs: file: ./plugins/flytekit-sqlalchemy/Dockerfile cache-from: type=gha cache-to: type=gha,mode=max + + build-and-push-spark-images: + runs-on: ubuntu-latest + needs: deploy + steps: + - uses: actions/checkout@v2 + with: + fetch-depth: "0" + - name: Set up QEMU + uses: docker/setup-qemu-action@v1 + - name: Set up Docker Buildx + id: buildx + uses: docker/setup-buildx-action@v1 + - name: Login to GitHub Container Registry + if: ${{ github.event_name == 'release' }} + uses: docker/login-action@v1 + with: + registry: ghcr.io + username: "${{ secrets.FLYTE_BOT_USERNAME }}" + password: "${{ secrets.FLYTE_BOT_PAT }}" + - name: Prepare Spark Image Names + id: spark-names + uses: docker/metadata-action@v3 + with: + images: | + ghcr.io/${{ github.repository_owner }}/flytekit + tags: | + spark-latest + spark-${{ github.sha }} + spark-${{ needs.deploy.outputs.version }} + - name: Push Spark Image to GitHub Registry + uses: docker/build-push-action@v2 + with: + context: "./plugins/flytekit-spark/" + platforms: linux/arm64, linux/amd64 + push: ${{ github.event_name == 'release' }} + tags: ${{ steps.spark-names.outputs.tags }} + build-args: | + VERSION=${{ needs.deploy.outputs.version }} + file: ./plugins/flytekit-spark/Dockerfile + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/flytekit/configuration/default_images.py b/flytekit/configuration/default_images.py index 8c01041eed..625e69d9ae 100644 --- a/flytekit/configuration/default_images.py +++ b/flytekit/configuration/default_images.py @@ -30,14 +30,19 @@ def default_image(cls) -> str: def find_image_for( cls, python_version: typing.Optional[PythonVersion] = None, flytekit_version: typing.Optional[str] = None ) -> str: + if python_version is None: + python_version = PythonVersion((sys.version_info.major, sys.version_info.minor)) + + return cls._DEFAULT_IMAGE_PREFIXES[python_version] + ( + flytekit_version.replace("v", "") if flytekit_version else cls.get_version_suffix() + ) + + @classmethod + def get_version_suffix(cls) -> str: from flytekit import __version__ if not __version__ or __version__ == "0.0.0+develop": version_suffix = "latest" else: version_suffix = __version__ - if python_version is None: - python_version = PythonVersion((sys.version_info.major, sys.version_info.minor)) - return cls._DEFAULT_IMAGE_PREFIXES[python_version] + ( - flytekit_version.replace("v", "") if flytekit_version else version_suffix - ) + return version_suffix diff --git a/flytekit/image_spec/image_spec.py b/flytekit/image_spec/image_spec.py index e35fd5c597..0e0ca6ea52 100644 --- a/flytekit/image_spec/image_spec.py +++ b/flytekit/image_spec/image_spec.py @@ -1,7 +1,6 @@ import base64 import hashlib import os -import sys import typing from abc import abstractmethod from copy import copy @@ -27,7 +26,7 @@ class ImageSpec: Args: name: name of the image. - python_version: python version of the image. + python_version: python version of the image. Use default python in the base image if None. builder: Type of plugin to build the image. Use envd by default. source_root: source root of the image. env: environment variables of the image. @@ -38,7 +37,7 @@ class ImageSpec: """ name: str = "flytekit" - python_version: str = f"{sys.version_info.major}.{sys.version_info.minor}" + python_version: str = None # Use default python in the base image if None. builder: str = "envd" source_root: Optional[str] = None env: Optional[typing.Dict[str, str]] = None @@ -94,6 +93,9 @@ def exist(self) -> bool: if response.status_code == 200: return True + if response.status_code == 404: + return False + click.secho(f"Failed to check if the image exists with error : {e}", fg="red") click.secho("Flytekit assumes that the image already exists.", fg="blue") return True diff --git a/plugins/flytekit-envd/flytekitplugins/envd/image_builder.py b/plugins/flytekit-envd/flytekitplugins/envd/image_builder.py index e861b69310..fec6647443 100644 --- a/plugins/flytekit-envd/flytekitplugins/envd/image_builder.py +++ b/plugins/flytekit-envd/flytekitplugins/envd/image_builder.py @@ -48,16 +48,20 @@ def build(): base(image="{base_image}", dev=False) install.python_packages(name = [{', '.join(map(str, map(lambda x: f'"{x}"', packages)))}]) install.apt_packages(name = [{', '.join(map(str, map(lambda x: f'"{x}"', apt_packages)))}]) - install.python(version="{image_spec.python_version}") runtime.environ(env={env}) """ + if image_spec.python_version: + # Indentation is required by envd + envd_config += f' install.python(version="{image_spec.python_version}")\n' + ctx = context_manager.FlyteContextManager.current_context() cfg_path = ctx.file_access.get_random_local_path("build.envd") pathlib.Path(cfg_path).parent.mkdir(parents=True, exist_ok=True) if image_spec.source_root: shutil.copytree(image_spec.source_root, pathlib.Path(cfg_path).parent, dirs_exist_ok=True) + # Indentation is required by envd envd_config += ' io.copy(host_path="./", envd_path="/root")' with open(cfg_path, "w+") as f: diff --git a/plugins/flytekit-envd/tests/test_image_spec.py b/plugins/flytekit-envd/tests/test_image_spec.py index 87b0668171..7c7ccd2151 100644 --- a/plugins/flytekit-envd/tests/test_image_spec.py +++ b/plugins/flytekit-envd/tests/test_image_spec.py @@ -25,7 +25,7 @@ def build(): base(image="cr.flyte.org/flyteorg/flytekit:py3.8-latest", dev=False) install.python_packages(name = ["pandas"]) install.apt_packages(name = ["git"]) - install.python(version="3.8") runtime.environ(env={'PYTHONPATH': '/root', '_F_IMG_ID': 'flytekit:yZ8jICcDTLoDArmNHbWNwg..'}) + install.python(version="3.8") """ ) diff --git a/plugins/flytekit-spark/Dockerfile b/plugins/flytekit-spark/Dockerfile new file mode 100644 index 0000000000..0789df45b7 --- /dev/null +++ b/plugins/flytekit-spark/Dockerfile @@ -0,0 +1,14 @@ +# https://github.com/apache/spark/blob/master/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile +FROM apache/spark-py:3.3.1 +LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit + +USER 0 +RUN ln -s /usr/bin/python3 /usr/bin/python + +ARG VERSION +RUN pip install flytekitplugins-spark==$VERSION +RUN pip install flytekit==$VERSION + +RUN chown -R ${spark_uid}:${spark_uid} /root +WORKDIR /root +USER ${spark_uid} diff --git a/plugins/flytekit-spark/flytekitplugins/spark/task.py b/plugins/flytekit-spark/flytekitplugins/spark/task.py index 7b32e9f28b..564e55778f 100644 --- a/plugins/flytekit-spark/flytekitplugins/spark/task.py +++ b/plugins/flytekit-spark/flytekitplugins/spark/task.py @@ -1,15 +1,15 @@ import os -import typing from dataclasses import dataclass -from typing import Any, Callable, Dict, Optional +from typing import Any, Callable, Dict, Optional, Union, cast from google.protobuf.json_format import MessageToDict from pyspark.sql import SparkSession from flytekit import FlyteContextManager, PythonFunctionTask -from flytekit.configuration import SerializationSettings +from flytekit.configuration import DefaultImages, SerializationSettings from flytekit.core.context_manager import ExecutionParameters from flytekit.extend import ExecutionState, TaskPlugins +from flytekit.image_spec import ImageSpec from .models import SparkJob, SparkType @@ -48,7 +48,7 @@ class Databricks(Spark): databricks_instance: Domain name of your deployment. Use the form .cloud.databricks.com. """ - databricks_conf: typing.Optional[Dict[str, typing.Union[str, dict]]] = None + databricks_conf: Optional[Dict[str, Union[str, dict]]] = None databricks_token: Optional[str] = None databricks_instance: Optional[str] = None @@ -56,7 +56,7 @@ class Databricks(Spark): # This method does not reset the SparkSession since it's a bit hard to handle multiple # Spark sessions in a single application as it's described in: # https://stackoverflow.com/questions/41491972/how-can-i-tear-down-a-sparksession-and-create-a-new-one-within-one-application. -def new_spark_session(name: str, conf: typing.Dict[str, str] = None): +def new_spark_session(name: str, conf: Dict[str, str] = None): """ Optionally creates a new spark session and returns it. In cluster mode (running in hosted flyte, this will disregard the spark conf passed in) @@ -99,26 +99,43 @@ class PysparkFunctionTask(PythonFunctionTask[Spark]): _SPARK_TASK_TYPE = "spark" - def __init__(self, task_config: Spark, task_function: Callable, **kwargs): + def __init__( + self, + task_config: Spark, + task_function: Callable, + container_image: Optional[Union[str, ImageSpec]] = None, + **kwargs, + ): + self.sess: Optional[SparkSession] = None + self._default_executor_path: Optional[str] = None + self._default_applications_path: Optional[str] = None + + if isinstance(container_image, ImageSpec): + if container_image.base_image is None: + img = f"cr.flyte.org/flyteorg/flytekit:spark-{DefaultImages.get_version_suffix()}" + container_image.base_image = img + # default executor path and applications path in apache/spark-py:3.3.1 + self._default_executor_path = "/usr/bin/python3" + self._default_applications_path = "local:///usr/local/bin/entrypoint.py" super(PysparkFunctionTask, self).__init__( task_config=task_config, task_type=self._SPARK_TASK_TYPE, task_function=task_function, + container_image=container_image, **kwargs, ) - self.sess: Optional[SparkSession] = None def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: job = SparkJob( spark_conf=self.task_config.spark_conf, hadoop_conf=self.task_config.hadoop_conf, - application_file="local://" + settings.entrypoint_settings.path, - executor_path=settings.python_interpreter, + application_file=self._default_applications_path or "local://" + settings.entrypoint_settings.path, + executor_path=self._default_executor_path or settings.python_interpreter, main_class="", spark_type=SparkType.PYTHON, ) if isinstance(self.task_config, Databricks): - cfg = typing.cast(Databricks, self.task_config) + cfg = cast(Databricks, self.task_config) job._databricks_conf = cfg.databricks_conf job._databricks_token = cfg.databricks_token job._databricks_instance = cfg.databricks_instance diff --git a/tests/flytekit/unit/configuration/test_image_config.py b/tests/flytekit/unit/configuration/test_image_config.py index 84c767f8fb..c14832df3c 100644 --- a/tests/flytekit/unit/configuration/test_image_config.py +++ b/tests/flytekit/unit/configuration/test_image_config.py @@ -60,3 +60,7 @@ def test_image_create(): ic = ImageConfig.from_images("cr.flyte.org/im/g:latest") assert ic.default_image.fqn == "cr.flyte.org/im/g" + + +def test_get_version_suffix(): + assert DefaultImages.get_version_suffix() == "latest"