Add option to setup preemptible VMs (#408)
This PR adds an option to run a component on a preemptible VM, configured at the component spec level.
Enable it by setting `preemptible` to `True` at the `Op` level.
Note that you also need to assign a preemptible node pool. I have already
set up some preemptible node pools in [this PR](https://github.com/ml6team/Express-infra/compare/add-preemtibles-node-pools?expand=1)

Caveat: it seems that kfp currently only supports preemptibles on
a GCP cluster, since this functionality is tightly integrated
with GKE. More info [here](
Nodes in preemptible node pools can only run for up to 24 hours
[link](https://cloud.google.com/compute/docs/instances/preemptible#:~:text=Compute%20Engine%20always%20stops%20preemptible%20instances%20after%20they%20run%20for%2024%20hours.%20Certain%20actions%20reset%20this%2024%2Dhour%20counter.)

We can look into adding a retry mechanism later, once the issue
with Dask is fixed. For now, these VMs can be used to reduce development
costs.
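
For reference, the scheduling effect of the flag can be sketched from the compiled test manifest changed in this commit: the template gains a soft node affinity (a preference with weight 50, not a hard requirement) for nodes labelled `cloud.google.com/gke-preemptible` and a toleration for a `preemptible=true:NoSchedule` taint. The snippet below is a minimal illustration in plain Python. The helper `add_preemptible_scheduling` and the `example-component` template are hypothetical and are not part of Fondant or kfp; only the affinity and toleration values are copied from the manifest.

```python
import copy

# Values copied from the compiled test manifest in this commit.
PREEMPTIBLE_AFFINITY = {
    "nodeAffinity": {
        "preferredDuringSchedulingIgnoredDuringExecution": [
            {
                "preference": {
                    "matchExpressions": [
                        {
                            "key": "cloud.google.com/gke-preemptible",
                            "operator": "In",
                            "values": ["true"],
                        }
                    ]
                },
                "weight": 50,
            }
        ]
    }
}

PREEMPTIBLE_TOLERATION = {
    "effect": "NoSchedule",
    "key": "preemptible",
    "operator": "Equal",
    "value": "true",
}


def add_preemptible_scheduling(template: dict) -> dict:
    """Return a copy of a template dict with the preemptible stanzas added.

    Hypothetical helper for illustration only; not part of Fondant or kfp.
    """
    result = copy.deepcopy(template)
    result["affinity"] = copy.deepcopy(PREEMPTIBLE_AFFINITY)
    # Keep any tolerations that are already present and append the new one.
    result.setdefault("tolerations", []).append(dict(PREEMPTIBLE_TOLERATION))
    return result


# Hypothetical, stripped-down template used only to exercise the helper.
template = {"name": "example-component", "container": {"args": [], "command": ["fondant"]}}
patched = add_preemptible_scheduling(template)
```

Because the affinity is only a preference, the toleration is what allows the pod onto tainted preemptible nodes, so the node pool still has to be created with preemptible VMs and the matching taint for the flag to have any effect.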
PhilippeMoussalli authored Sep 13, 2023
1 parent c23189b commit 7ad42aa
Showing 5 changed files with 51 additions and 2 deletions.
11 changes: 10 additions & 1 deletion docs/pipeline.md
@@ -48,7 +48,16 @@ Next, we define two operations: `load_from_hub_op`, which is a based from a reus
Currently Fondant supports linear DAGs with single dependencies. Support for non-linear DAGs will be available in future releases.

## Setting Custom node pool parameters
Each component can optionally be constrained to run on particular node(s) using `node_pool_label` and `node_pool_name`. You can find these under the Kubernetes labels of your cluster. You can use the default node label provided by Kubernetes or attach your own. Note that the value of these labels is cloud provider specific.
Each component can optionally be constrained to run on particular node(s) using `node_pool_label` and `node_pool_name`. You can find these under the Kubernetes labels of your cluster.
You can use the default node label provided by Kubernetes or attach your own. Note that the value of these labels is cloud provider specific.

You can also set up a component to use a preemptible VM by setting `preemptible` to `True`.
This requires the setup and assignment of a preemptible node pool. Note that preemptibles only work
when KFP is set up on GCP.

More info here: https://v1-6-branch.kubeflow.org/docs/distributions/gke/pipelines/preemptible/



## Setting Custom partitioning parameters

11 changes: 11 additions & 0 deletions src/fondant/compiler.py
@@ -222,8 +222,10 @@ def _resolve_imports(self):
"""Resolve imports for the Kubeflow compiler."""
try:
import kfp
import kfp.gcp as kfp_gcp

self.kfp = kfp
self.kfp_gcp = kfp_gcp
except ImportError:
msg = """You need to install kfp to use the Kubeflow compiler,\n
you can install it with `pip install fondant[kfp]`"""
@@ -307,11 +309,20 @@ def _set_configuration(self, task, fondant_component_operation):
number_of_gpus = fondant_component_operation.number_of_gpus
node_pool_label = fondant_component_operation.node_pool_label
node_pool_name = fondant_component_operation.node_pool_name
preemptible = fondant_component_operation.preemptible

# Assign optional specification
if number_of_gpus is not None:
task.set_gpu_limit(number_of_gpus)
if node_pool_name is not None and node_pool_label is not None:
task.add_node_selector_constraint(node_pool_label, node_pool_name)
if preemptible is True:
logger.warning(
f"Preemptible VM enabled on component `{fondant_component_operation.name}`. Please"
" note that preemptible node pools only work on clusters set up on GCP and "
"with node pools pre-configured with preemptible VMs. More info here:"
" https://v1-6-branch.kubeflow.org/docs/distributions/gke/pipelines/preemptible/",
)
task.apply(self.kfp_gcp.use_preemptible_nodepool())

return task
13 changes: 13 additions & 0 deletions src/fondant/pipeline.py
@@ -35,6 +35,10 @@ class ComponentOp:
node_pool_label: The label of the node pool to which the operation will be assigned.
node_pool_name: The name of the node pool to which the operation will be assigned.
cache: Set to False to disable caching, True by default.
preemptible: False by default. Set to True to use a preemptible VM.
Requires the setup and assignment of a preemptible node pool. Note that preemptibles only
work when KFP is set up on GCP. More info here:
https://v1-6-branch.kubeflow.org/docs/distributions/gke/pipelines/preemptible/
Note:
- A Fondant Component operation is created by defining a Fondant Component and its input
@@ -58,6 +62,7 @@ def __init__(
node_pool_label: t.Optional[str] = None,
node_pool_name: t.Optional[str] = None,
cache: t.Optional[bool] = True,
preemptible: t.Optional[bool] = False,
) -> None:
self.component_dir = Path(component_dir)
self.input_partition_rows = input_partition_rows
@@ -74,6 +79,7 @@ def __init__(
node_pool_label,
node_pool_name,
)
self.preemptible = preemptible

def _configure_caching_from_image_tag(
self,
@@ -152,6 +158,7 @@ def from_registry(
node_pool_label: t.Optional[str] = None,
node_pool_name: t.Optional[str] = None,
cache: t.Optional[bool] = True,
preemptible: t.Optional[bool] = False,
) -> "ComponentOp":
"""Load a reusable component by its name.
@@ -164,6 +171,11 @@ def from_registry(
node_pool_label: The label of the node pool to which the operation will be assigned.
node_pool_name: The name of the node pool to which the operation will be assigned.
cache: Set to False to disable caching, True by default.
preemptible: False by default. Set to True to use a preemptible VM.
Requires the setup and assignment of a preemptible node pool. Note that preemptibles
only work when KFP is set up on GCP. More info here:
https://v1-6-branch.kubeflow.org/docs/distributions/gke/pipelines/preemptible/
"""
components_dir: Path = t.cast(Path, files("fondant") / f"components/{name}")

@@ -179,6 +191,7 @@ def from_registry(
node_pool_label=node_pool_label,
node_pool_name=node_pool_name,
cache=cache,
preemptible=preemptible,
)

def get_component_cache_key(self) -> str:
@@ -15,7 +15,17 @@ spec:
entrypoint: test-pipeline
serviceAccountName: pipeline-runner
templates:
- container:
- affinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- preference:
matchExpressions:
- key: cloud.google.com/gke-preemptible
operator: In
values:
- 'true'
weight: 50
container:
args: []
command:
- fondant
@@ -91,6 +101,11 @@ spec:
artifacts:
- name: first-component-output_manifest_path
path: /tmp/outputs/output_manifest_path/data
tolerations:
- effect: NoSchedule
key: preemptible
operator: Equal
value: 'true'
- container:
args: []
command:
1 change: 1 addition & 0 deletions tests/test_compiler.py
@@ -22,6 +22,7 @@
arguments={"storage_args": "a dummy string arg"},
input_partition_rows="disable",
number_of_gpus=1,
preemptible=True,
),
"cache_key": "1",
},