Skip to content

Commit

Permalink
Add dask plugin #patch (#1366)
Browse files Browse the repository at this point in the history
* Add dummy task type to test backend plugin

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Add docs page

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Add dask models

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Add function to convert resources

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Add tests to `dask` task

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Remove namespace

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Update setup.py

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Add dask to `plugin/README.md`

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Add README.md for `dask`

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Top level export of `JopPodSpec` and `DaskCluster`

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Update docs for images

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Update README.md

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Update models after `flyteidl` change

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Update task after `flyteidl` change

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Raise error when less than 1 worker

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Update flyteidl to >= 1.3.2

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Update doc requirements

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Update doc-requirements.txt

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Re-lock dependencies on linux

Signed-off-by: Bernhard Stadlbauer <bernhard@pachama.com>

* Update dask API docs

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Fix documentation links

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Default optional model constructor arguments to `None`

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Refactor `convert_resources_to_resource_model` to `core.resources`

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Use `convert_resources_to_resource_model` in `core.node`

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>

* Incorporate review feedback

Signed-off-by: Eduardo Apolinario <eapolinario@users.noreply.github.com>

* Lint

Signed-off-by: Eduardo Apolinario <eapolinario@users.noreply.github.com>

Signed-off-by: Bernhard Stadlbauer <b.stadlbauer@gmx.net>
Signed-off-by: Bernhard Stadlbauer <bernhard@pachama.com>
Signed-off-by: Eduardo Apolinario <eapolinario@users.noreply.github.com>
Co-authored-by: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com>
Co-authored-by: Eduardo Apolinario <eapolinario@users.noreply.github.com>
  • Loading branch information
3 people committed Jan 12, 2023
1 parent 581a5c6 commit 531db3c
Show file tree
Hide file tree
Showing 19 changed files with 906 additions and 26 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ jobs:
- flytekit-aws-batch
- flytekit-aws-sagemaker
- flytekit-bigquery
- flytekit-dask
- flytekit-data-fsspec
- flytekit-dbt
- flytekit-deck-standard
Expand Down
1 change: 1 addition & 0 deletions doc-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ whylogs # whylogs
whylabs-client # whylogs
ray # ray
scikit-learn # scikit-learn
dask[distributed] # dask
vaex # vaex
mlflow # mlflow
12 changes: 12 additions & 0 deletions docs/source/plugins/dask.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
.. _dask:

###################################################
Dask API reference
###################################################

.. tags:: Integration, DistributedComputing, KubernetesOperator

.. automodule:: flytekitplugins.dask
:no-members:
:no-inherited-members:
:no-special-members:
2 changes: 2 additions & 0 deletions docs/source/plugins/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Plugin API reference
* :ref:`AWS Sagemaker <awssagemaker>` - AWS Sagemaker plugin reference
* :ref:`Google Bigquery <bigquery>` - Google Bigquery plugin reference
* :ref:`FS Spec <fsspec>` - FS Spec API reference
* :ref:`Dask <dask>` - Dask standard API reference
* :ref:`Deck standard <deck>` - Deck standard API reference
* :ref:`Dolt standard <dolt>` - Dolt standard API reference
* :ref:`Great expectations <greatexpectations>` - Great expectations API reference
Expand Down Expand Up @@ -40,6 +41,7 @@ Plugin API reference
AWS Sagemaker <awssagemaker>
Google Bigquery <bigquery>
FS Spec <fsspec>
Dask <dask>
Deck standard <deck>
Dolt standard <dolt>
Great expectations <greatexpectations>
Expand Down
16 changes: 10 additions & 6 deletions flytekit/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import typing
from typing import Any, List

from flytekit.core.resources import Resources
from flytekit.core.resources import Resources, convert_resources_to_resource_model
from flytekit.core.utils import _dnsify
from flytekit.models import literals as _literal_models
from flytekit.models.core import workflow as _workflow_model
Expand Down Expand Up @@ -92,9 +92,14 @@ def with_overrides(self, *args, **kwargs):
for k, v in alias_dict.items():
self._aliases.append(_workflow_model.Alias(var=k, alias=v))
if "requests" in kwargs or "limits" in kwargs:
requests = _convert_resource_overrides(kwargs.get("requests"), "requests")
limits = _convert_resource_overrides(kwargs.get("limits"), "limits")
self._resources = _resources_model(requests=requests, limits=limits)
requests = kwargs.get("requests")
if requests and not isinstance(requests, Resources):
raise AssertionError("requests should be specified as flytekit.Resources")
limits = kwargs.get("limits")
if limits and not isinstance(limits, Resources):
raise AssertionError("limits should be specified as flytekit.Resources")

self._resources = convert_resources_to_resource_model(requests=requests, limits=limits)
if "timeout" in kwargs:
timeout = kwargs["timeout"]
if timeout is None:
Expand Down Expand Up @@ -122,8 +127,7 @@ def _convert_resource_overrides(
) -> [_resources_model.ResourceEntry]:
if resources is None:
return []
if not isinstance(resources, Resources):
raise AssertionError(f"{resource_name} should be specified as flytekit.Resources")

resource_entries = []
if resources.cpu is not None:
resource_entries.append(_resources_model.ResourceEntry(_resources_model.ResourceName.CPU, resources.cpu))
Expand Down
43 changes: 42 additions & 1 deletion flytekit/core/resources.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from dataclasses import dataclass
from typing import Optional
from typing import List, Optional

from flytekit.models import task as task_models


@dataclass
Expand Down Expand Up @@ -35,3 +37,42 @@ class Resources(object):
class ResourceSpec(object):
requests: Optional[Resources] = None
limits: Optional[Resources] = None


_ResouceName = task_models.Resources.ResourceName
_ResourceEntry = task_models.Resources.ResourceEntry


def _convert_resources_to_resource_entries(resources: Resources) -> List[_ResourceEntry]:
resource_entries = []
if resources.cpu is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.CPU, value=resources.cpu))
if resources.mem is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.MEMORY, value=resources.mem))
if resources.gpu is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.GPU, value=resources.gpu))
if resources.storage is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.STORAGE, value=resources.storage))
if resources.ephemeral_storage is not None:
resource_entries.append(_ResourceEntry(name=_ResouceName.EPHEMERAL_STORAGE, value=resources.ephemeral_storage))
return resource_entries


def convert_resources_to_resource_model(
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
) -> task_models.Resources:
"""
Convert flytekit ``Resources`` objects to a Resources model
:param requests: Resource requests. Optional, defaults to ``None``
:param limits: Resource limits. Optional, defaults to ``None``
:return: The given resources as requests and limits
"""
request_entries = []
limit_entries = []
if requests is not None:
request_entries = _convert_resources_to_resource_entries(requests)
if limits is not None:
limit_entries = _convert_resources_to_resource_entries(limits)
return task_models.Resources(requests=request_entries, limits=limit_entries)
37 changes: 18 additions & 19 deletions flytekit/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Dict, List, Optional

from flytekit.loggers import logger
from flytekit.models import task as _task_models
from flytekit.models import task as task_models


def _dnsify(value: str) -> str:
Expand Down Expand Up @@ -52,7 +52,7 @@ def _get_container_definition(
image: str,
command: List[str],
args: List[str],
data_loading_config: Optional[_task_models.DataLoadingConfig] = None,
data_loading_config: Optional[task_models.DataLoadingConfig] = None,
storage_request: Optional[str] = None,
ephemeral_storage_request: Optional[str] = None,
cpu_request: Optional[str] = None,
Expand All @@ -64,7 +64,7 @@ def _get_container_definition(
gpu_limit: Optional[str] = None,
memory_limit: Optional[str] = None,
environment: Optional[Dict[str, str]] = None,
) -> _task_models.Container:
) -> task_models.Container:
storage_limit = storage_limit
storage_request = storage_request
ephemeral_storage_limit = ephemeral_storage_limit
Expand All @@ -76,50 +76,49 @@ def _get_container_definition(
memory_limit = memory_limit
memory_request = memory_request

# TODO: Use convert_resources_to_resource_model instead of manually fixing the resources.
requests = []
if storage_request:
requests.append(
_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.STORAGE, storage_request)
task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.STORAGE, storage_request)
)
if ephemeral_storage_request:
requests.append(
_task_models.Resources.ResourceEntry(
_task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_request
task_models.Resources.ResourceEntry(
task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_request
)
)
if cpu_request:
requests.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.CPU, cpu_request))
requests.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.CPU, cpu_request))
if gpu_request:
requests.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.GPU, gpu_request))
requests.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.GPU, gpu_request))
if memory_request:
requests.append(
_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.MEMORY, memory_request)
)
requests.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.MEMORY, memory_request))

limits = []
if storage_limit:
limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.STORAGE, storage_limit))
limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.STORAGE, storage_limit))
if ephemeral_storage_limit:
limits.append(
_task_models.Resources.ResourceEntry(
_task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_limit
task_models.Resources.ResourceEntry(
task_models.Resources.ResourceName.EPHEMERAL_STORAGE, ephemeral_storage_limit
)
)
if cpu_limit:
limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.CPU, cpu_limit))
limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.CPU, cpu_limit))
if gpu_limit:
limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.GPU, gpu_limit))
limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.GPU, gpu_limit))
if memory_limit:
limits.append(_task_models.Resources.ResourceEntry(_task_models.Resources.ResourceName.MEMORY, memory_limit))
limits.append(task_models.Resources.ResourceEntry(task_models.Resources.ResourceName.MEMORY, memory_limit))

if environment is None:
environment = {}

return _task_models.Container(
return task_models.Container(
image=image,
command=command,
args=args,
resources=_task_models.Resources(limits=limits, requests=requests),
resources=task_models.Resources(limits=limits, requests=requests),
env=environment,
config={},
data_loading_config=data_loading_config,
Expand Down
1 change: 1 addition & 0 deletions plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ All the Flytekit plugins maintained by the core team are added here. It is not n
| Plugin | Installation | Description | Version | Type |
|------------------------------|-----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| AWS Sagemaker Training | ```bash pip install flytekitplugins-awssagemaker ``` | Installs SDK to author Sagemaker built-in and custom training jobs in python | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-awssagemaker.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend |
| dask | ```bash pip install flytekitplugins-dask ``` | Installs SDK to author dask jobs that can be executed natively on Kubernetes using the Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-awssagemaker.svg)](https://pypi.python.org/pypi/flytekitplugins-dask/) | Backend |
| Hive Queries | ```bash pip install flytekitplugins-hive ``` | Installs SDK to author Hive Queries that can be executed on a configured hive backend using Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-hive.svg)](https://pypi.python.org/pypi/flytekitplugins-hive/) | Backend |
| K8s distributed PyTorch Jobs | ```bash pip install flytekitplugins-kfpytorch ``` | Installs SDK to author Distributed pyTorch Jobs in python using Kubeflow PyTorch Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kfpytorch.svg)](https://pypi.python.org/pypi/flytekitplugins-kfpytorch/) | Backend |
| K8s native tensorflow Jobs | ```bash pip install flytekitplugins-kftensorflow ``` | Installs SDK to author Distributed tensorflow Jobs in python using Kubeflow Tensorflow Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kftensorflow.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
Expand Down
21 changes: 21 additions & 0 deletions plugins/flytekit-dask/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Flytekit Dask Plugin

Flyte can execute `dask` jobs natively on a Kubernetes Cluster, which manages the virtual `dask` cluster's lifecycle
(spin-up and tear down). It leverages the open-source Kubernetes Dask Operator and can be enabled without signing up
for any service. This is like running a transient (ephemeral) `dask` cluster - a type of cluster spun up for a specific
task and torn down after completion. This helps in making sure that the Python environment is the same on the job-runner
(driver), scheduler and the workers.

To install the plugin, run the following command:

```bash
pip install flytekitplugins-dask
```

To configure Dask in the Flyte deployment's backed, follow
[step 1](https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/k8s_dask/index.html#step-1-deploy-the-dask-plugin-in-the-flyte-backend)
and
[step 2](https://docs.flyte.org/projects/cookbook/en/latest/auto/auto/integrations/kubernetes/k8s_dask/index.html#step-2-environment-setup)

An [example](https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/k8s_dask/index.html)
can be found in the documentation.
15 changes: 15 additions & 0 deletions plugins/flytekit-dask/flytekitplugins/dask/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""
.. currentmodule:: flytekitplugins.dask
This package contains the Python related side of the Dask Plugin
.. autosummary::
:template: custom.rst
:toctree: generated/
Dask
Scheduler
WorkerGroup
"""

from flytekitplugins.dask.task import Dask, Scheduler, WorkerGroup
Loading

0 comments on commit 531db3c

Please sign in to comment.