Skip to content

Commit

Permalink
Add default spark image (#1616)
Browse files Browse the repository at this point in the history
* Add default spark image

Signed-off-by: Kevin Su <pingsutw@apache.org>

* nit

Signed-off-by: Kevin Su <pingsutw@apache.org>

* fix tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Fix tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* nit

Signed-off-by: Kevin Su <pingsutw@apache.org>

---------

Signed-off-by: Kevin Su <pingsutw@apache.org>
  • Loading branch information
pingsutw authored and eapolinario committed May 16, 2023
1 parent b2fb4bf commit d778232
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 20 deletions.
42 changes: 42 additions & 0 deletions .github/workflows/pythonpublish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,45 @@ jobs:
file: ./plugins/flytekit-sqlalchemy/Dockerfile
cache-from: type=gha
cache-to: type=gha,mode=max

  # Builds the flytekit Spark plugin image (multi-arch) and pushes it to GHCR on release.
  build-and-push-spark-images:
    runs-on: ubuntu-latest
    needs: deploy  # waits on the deploy job; its `version` output is consumed below
    steps:
      - uses: actions/checkout@v2
        with:
          fetch-depth: "0"  # full clone, not shallow
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v1  # required to cross-build the linux/arm64 variant
      - name: Set up Docker Buildx
        id: buildx
        uses: docker/setup-buildx-action@v1
      - name: Login to GitHub Container Registry
        if: ${{ github.event_name == 'release' }}  # only authenticate when we will actually push
        uses: docker/login-action@v1
        with:
          registry: ghcr.io
          username: "${{ secrets.FLYTE_BOT_USERNAME }}"
          password: "${{ secrets.FLYTE_BOT_PAT }}"
      # Derive the image name and the three tags (latest, commit SHA, release version).
      - name: Prepare Spark Image Names
        id: spark-names
        uses: docker/metadata-action@v3
        with:
          images: |
            ghcr.io/${{ github.repository_owner }}/flytekit
          tags: |
            spark-latest
            spark-${{ github.sha }}
            spark-${{ needs.deploy.outputs.version }}
      - name: Push Spark Image to GitHub Registry
        uses: docker/build-push-action@v2
        with:
          context: "./plugins/flytekit-spark/"
          platforms: linux/arm64, linux/amd64
          push: ${{ github.event_name == 'release' }}  # build-only (no push) on other events
          tags: ${{ steps.spark-names.outputs.tags }}
          build-args: |
            VERSION=${{ needs.deploy.outputs.version }}
          file: ./plugins/flytekit-spark/Dockerfile
          cache-from: type=gha
          cache-to: type=gha,mode=max
15 changes: 10 additions & 5 deletions flytekit/configuration/default_images.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,19 @@ def default_image(cls) -> str:
def find_image_for(
cls, python_version: typing.Optional[PythonVersion] = None, flytekit_version: typing.Optional[str] = None
) -> str:
if python_version is None:
python_version = PythonVersion((sys.version_info.major, sys.version_info.minor))

return cls._DEFAULT_IMAGE_PREFIXES[python_version] + (
flytekit_version.replace("v", "") if flytekit_version else cls.get_version_suffix()
)

@classmethod
def get_version_suffix(cls) -> str:
from flytekit import __version__

if not __version__ or __version__ == "0.0.0+develop":
version_suffix = "latest"
else:
version_suffix = __version__
if python_version is None:
python_version = PythonVersion((sys.version_info.major, sys.version_info.minor))
return cls._DEFAULT_IMAGE_PREFIXES[python_version] + (
flytekit_version.replace("v", "") if flytekit_version else version_suffix
)
return version_suffix
8 changes: 5 additions & 3 deletions flytekit/image_spec/image_spec.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import base64
import hashlib
import os
import sys
import typing
from abc import abstractmethod
from copy import copy
Expand All @@ -27,7 +26,7 @@ class ImageSpec:
Args:
name: name of the image.
python_version: python version of the image.
python_version: python version of the image. Use default python in the base image if None.
builder: Type of plugin to build the image. Use envd by default.
source_root: source root of the image.
env: environment variables of the image.
Expand All @@ -38,7 +37,7 @@ class ImageSpec:
"""

name: str = "flytekit"
python_version: str = f"{sys.version_info.major}.{sys.version_info.minor}"
python_version: str = None # Use default python in the base image if None.
builder: str = "envd"
source_root: Optional[str] = None
env: Optional[typing.Dict[str, str]] = None
Expand Down Expand Up @@ -94,6 +93,9 @@ def exist(self) -> bool:
if response.status_code == 200:
return True

if response.status_code == 404:
return False

click.secho(f"Failed to check if the image exists with error : {e}", fg="red")
click.secho("Flytekit assumes that the image already exists.", fg="blue")
return True
Expand Down
6 changes: 5 additions & 1 deletion plugins/flytekit-envd/flytekitplugins/envd/image_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,20 @@ def build():
base(image="{base_image}", dev=False)
install.python_packages(name = [{', '.join(map(str, map(lambda x: f'"{x}"', packages)))}])
install.apt_packages(name = [{', '.join(map(str, map(lambda x: f'"{x}"', apt_packages)))}])
install.python(version="{image_spec.python_version}")
runtime.environ(env={env})
"""

if image_spec.python_version:
# Indentation is required by envd
envd_config += f' install.python(version="{image_spec.python_version}")\n'

ctx = context_manager.FlyteContextManager.current_context()
cfg_path = ctx.file_access.get_random_local_path("build.envd")
pathlib.Path(cfg_path).parent.mkdir(parents=True, exist_ok=True)

if image_spec.source_root:
shutil.copytree(image_spec.source_root, pathlib.Path(cfg_path).parent, dirs_exist_ok=True)
# Indentation is required by envd
envd_config += ' io.copy(host_path="./", envd_path="/root")'

with open(cfg_path, "w+") as f:
Expand Down
2 changes: 1 addition & 1 deletion plugins/flytekit-envd/tests/test_image_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def build():
base(image="cr.flyte.org/flyteorg/flytekit:py3.8-latest", dev=False)
install.python_packages(name = ["pandas"])
install.apt_packages(name = ["git"])
install.python(version="3.8")
runtime.environ(env={'PYTHONPATH': '/root', '_F_IMG_ID': 'flytekit:yZ8jICcDTLoDArmNHbWNwg..'})
install.python(version="3.8")
"""
)
14 changes: 14 additions & 0 deletions plugins/flytekit-spark/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Default Spark image for flytekit Spark tasks.
# Base image reference:
# https://github.com/apache/spark/blob/master/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
FROM apache/spark-py:3.3.1
LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit

# Become root to install packages and create the `python` symlink.
USER 0
RUN ln -s /usr/bin/python3 /usr/bin/python

# Pin flytekit and the Spark plugin to the release version being built.
ARG VERSION
RUN pip install flytekitplugins-spark==$VERSION
RUN pip install flytekit==$VERSION

# FIX: ${spark_uid} was never declared in this image — ARGs do not survive
# across FROM, so `chown`/`USER` previously expanded to an empty string.
# 185 is the default spark_uid of the apache/spark-py base image; it can
# still be overridden at build time with --build-arg spark_uid=...
ARG spark_uid=185
RUN chown -R ${spark_uid}:${spark_uid} /root
WORKDIR /root
USER ${spark_uid}
37 changes: 27 additions & 10 deletions plugins/flytekit-spark/flytekitplugins/spark/task.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import os
import typing
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional
from typing import Any, Callable, Dict, Optional, Union, cast

from google.protobuf.json_format import MessageToDict
from pyspark.sql import SparkSession

from flytekit import FlyteContextManager, PythonFunctionTask
from flytekit.configuration import SerializationSettings
from flytekit.configuration import DefaultImages, SerializationSettings
from flytekit.core.context_manager import ExecutionParameters
from flytekit.extend import ExecutionState, TaskPlugins
from flytekit.image_spec import ImageSpec

from .models import SparkJob, SparkType

Expand Down Expand Up @@ -48,15 +48,15 @@ class Databricks(Spark):
databricks_instance: Domain name of your deployment. Use the form <account>.cloud.databricks.com.
"""

databricks_conf: typing.Optional[Dict[str, typing.Union[str, dict]]] = None
databricks_conf: Optional[Dict[str, Union[str, dict]]] = None
databricks_token: Optional[str] = None
databricks_instance: Optional[str] = None


# This method does not reset the SparkSession since it's a bit hard to handle multiple
# Spark sessions in a single application as it's described in:
# https://stackoverflow.com/questions/41491972/how-can-i-tear-down-a-sparksession-and-create-a-new-one-within-one-application.
def new_spark_session(name: str, conf: typing.Dict[str, str] = None):
def new_spark_session(name: str, conf: Dict[str, str] = None):
"""
Optionally creates a new spark session and returns it.
In cluster mode (running in hosted flyte, this will disregard the spark conf passed in)
Expand Down Expand Up @@ -99,26 +99,43 @@ class PysparkFunctionTask(PythonFunctionTask[Spark]):

_SPARK_TASK_TYPE = "spark"

def __init__(self, task_config: Spark, task_function: Callable, **kwargs):
def __init__(
self,
task_config: Spark,
task_function: Callable,
container_image: Optional[Union[str, ImageSpec]] = None,
**kwargs,
):
self.sess: Optional[SparkSession] = None
self._default_executor_path: Optional[str] = None
self._default_applications_path: Optional[str] = None

if isinstance(container_image, ImageSpec):
if container_image.base_image is None:
img = f"cr.flyte.org/flyteorg/flytekit:spark-{DefaultImages.get_version_suffix()}"
container_image.base_image = img
# default executor path and applications path in apache/spark-py:3.3.1
self._default_executor_path = "/usr/bin/python3"
self._default_applications_path = "local:///usr/local/bin/entrypoint.py"
super(PysparkFunctionTask, self).__init__(
task_config=task_config,
task_type=self._SPARK_TASK_TYPE,
task_function=task_function,
container_image=container_image,
**kwargs,
)
self.sess: Optional[SparkSession] = None

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
job = SparkJob(
spark_conf=self.task_config.spark_conf,
hadoop_conf=self.task_config.hadoop_conf,
application_file="local://" + settings.entrypoint_settings.path,
executor_path=settings.python_interpreter,
application_file=self._default_applications_path or "local://" + settings.entrypoint_settings.path,
executor_path=self._default_executor_path or settings.python_interpreter,
main_class="",
spark_type=SparkType.PYTHON,
)
if isinstance(self.task_config, Databricks):
cfg = typing.cast(Databricks, self.task_config)
cfg = cast(Databricks, self.task_config)
job._databricks_conf = cfg.databricks_conf
job._databricks_token = cfg.databricks_token
job._databricks_instance = cfg.databricks_instance
Expand Down
4 changes: 4 additions & 0 deletions tests/flytekit/unit/configuration/test_image_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,7 @@ def test_image_create():

ic = ImageConfig.from_images("cr.flyte.org/im/g:latest")
assert ic.default_image.fqn == "cr.flyte.org/im/g"


def test_get_version_suffix():
    """A dev build of flytekit (version ``0.0.0+develop``) maps to the ``latest`` suffix."""
    suffix = DefaultImages.get_version_suffix()
    assert suffix == "latest"

0 comments on commit d778232

Please sign in to comment.