Skip to content

Commit

Permalink
Add node pool label (#327)
Browse files Browse the repository at this point in the history
This PR adds a configurable node pool label in the pipeline
specification. This is needed because the default label name differs per
cloud provider.
  • Loading branch information
shayorshay authored Aug 2, 2023
1 parent 7fef744 commit 30533bc
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 10 deletions.
6 changes: 5 additions & 1 deletion docs/pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ def build_pipeline():
"batch_size": 2,
"max_new_tokens": 50,
},
number_of_gpus=1,
number_of_gpus=1,
node_pool_label="node_pool",
node_pool_name="model-inference-pool",
)
pipeline.add_op(caption_images_op, dependencies=load_from_hub_op)
Expand All @@ -46,6 +47,9 @@ Next, we define two operations: `load_from_hub_op`, which is a based from a reus
!!! note "IMPORTANT"
Currently Fondant supports linear DAGs with single dependencies. Support for non-linear DAGs will be available in future releases.

## Setting Custom node pool parameters
Each component can optionally be constrained to run on particular node(s) using `node_pool_label` and `node_pool_name`. You can find these under the Kubernetes labels of your cluster. You can use the default node label provided by Kubernetes or attach your own. Note that the value of these labels is cloud provider specific.

## Setting Custom partitioning parameters

When working with Fondant, each component deals with datasets, and Dask is used internally
Expand Down
40 changes: 31 additions & 9 deletions src/fondant/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class ComponentOp:
output_partition_size: the size of the output written dataset. Defaults to 250MB,
set to "disable" to disable automatic repartitioning of the output,
number_of_gpus: The number of gpus to assign to the operation
node_pool_label: The label of the node pool to which the operation will be assigned.
node_pool_name: The name of the node pool to which the operation will be assigned.
p_volumes: Collection of persistent volumes in a Kubernetes cluster. Keys are mount paths,
values are Kubernetes volumes or inherited types(e.g. PipelineVolumes).
Expand All @@ -47,12 +48,12 @@ class ComponentOp:
Note:
- A Fondant Component operation is created by defining a Fondant Component and its input
arguments.
- The `number_of_gpus`, `node_pool_name`,`p_volumes` and `ephemeral_storage_size`
attributes are optional and can be used to specify additional configurations for
the operation. More information on the optional attributes that can be assigned to
kfp components here:
https://kubeflow-pipelines.readthedocs.io/en/1.8.13/source/kfp.dsl.html
arguments.
- The `number_of_gpus`, `node_pool_label`, `node_pool_name`,`p_volumes` and
`ephemeral_storage_size` attributes are optional and can be used to specify additional
configurations for the operation. More information on the optional attributes that can
be assigned to kfp components here:
https://kubeflow-pipelines.readthedocs.io/en/1.8.13/source/kfp.dsl.html
"""

COMPONENT_SPEC_NAME = "fondant_component.yaml"
Expand All @@ -65,6 +66,7 @@ def __init__(
input_partition_rows: t.Optional[t.Union[str, int]] = None,
output_partition_size: t.Optional[str] = None,
number_of_gpus: t.Optional[int] = None,
node_pool_label: t.Optional[str] = None,
node_pool_name: t.Optional[str] = None,
p_volumes: t.Optional[t.Dict[str, k8s_client.V1Volume]] = None,
ephemeral_storage_size: t.Optional[str] = None,
Expand All @@ -80,7 +82,10 @@ def __init__(
self.arguments.setdefault("component_spec", self.component_spec.specification)

self.number_of_gpus = number_of_gpus
self.node_pool_name = node_pool_name
self.node_pool_label, self.node_pool_name = self._validate_node_pool_spec(
node_pool_label,
node_pool_name,
)
self.p_volumes = p_volumes
self.ephemeral_storage_size = ephemeral_storage_size

Expand All @@ -101,6 +106,19 @@ def _set_arguments(

return arguments

def _validate_node_pool_spec(
self,
node_pool_label,
node_pool_name,
) -> t.Tuple[t.Optional[str], t.Optional[str]]:
"""Validate node pool specification."""
if bool(node_pool_label) != bool(node_pool_name):
msg = "Both node_pool_label and node_pool_name must be specified or both must be None."
raise InvalidPipelineDefinition(
msg,
)
return node_pool_label, node_pool_name

@property
def dockerfile_path(self) -> t.Optional[Path]:
path = self.component_dir / "Dockerfile"
Expand All @@ -115,6 +133,7 @@ def from_registry(
input_partition_rows: t.Optional[t.Union[int, str]] = None,
output_partition_size: t.Optional[str] = None,
number_of_gpus: t.Optional[int] = None,
node_pool_label: t.Optional[str] = None,
node_pool_name: t.Optional[str] = None,
p_volumes: t.Optional[t.Dict[str, k8s_client.V1Volume]] = None,
ephemeral_storage_size: t.Optional[str] = None,
Expand All @@ -129,6 +148,7 @@ def from_registry(
output_partition_size: the size of the output written dataset. Defaults to 250MB,
set to "disable" to disable automatic repartitioning of the output,
number_of_gpus: The number of gpus to assign to the operation
node_pool_label: The label of the node pool to which the operation will be assigned.
node_pool_name: The name of the node pool to which the operation will be assigned.
p_volumes: Collection of persistent volumes in a Kubernetes cluster. Keys are mount
paths, values are Kubernetes volumes or inherited types(e.g. PipelineVolumes).
Expand All @@ -148,6 +168,7 @@ def from_registry(
input_partition_rows=input_partition_rows,
output_partition_size=output_partition_size,
number_of_gpus=number_of_gpus,
node_pool_label=node_pool_label,
node_pool_name=node_pool_name,
p_volumes=p_volumes,
ephemeral_storage_size=ephemeral_storage_size,
Expand Down Expand Up @@ -356,15 +377,16 @@ def _get_component_function(
def _set_task_configuration(task, fondant_component_operation):
# Unpack optional specifications
number_of_gpus = fondant_component_operation.number_of_gpus
node_pool_label = fondant_component_operation.node_pool_label
node_pool_name = fondant_component_operation.node_pool_name
p_volumes = fondant_component_operation.p_volumes
ephemeral_storage_size = fondant_component_operation.ephemeral_storage_size

# Assign optional specification
if number_of_gpus is not None:
task.set_gpu_limit(number_of_gpus)
if node_pool_name is not None:
task.add_node_selector_constraint("node_pool", node_pool_name)
if node_pool_label is not None and node_pool_name is not None:
task.add_node_selector_constraint(node_pool_label, node_pool_name)
if p_volumes is not None:
task.add_pvolumes(p_volumes)
if ephemeral_storage_size is not None:
Expand Down
14 changes: 14 additions & 0 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ def test_component_op(
output_partition_size="250 MB",
)

with pytest.raises(InvalidPipelineDefinition):
ComponentOp(
Path(components_path / component_names[0]),
arguments=component_args,
node_pool_label="dummy_label",
)

with pytest.raises(InvalidPipelineDefinition):
ComponentOp(
Path(components_path / component_names[0]),
arguments=component_args,
node_pool_name="dummy_name",
)


@pytest.mark.parametrize(
"valid_pipeline_example",
Expand Down

0 comments on commit 30533bc

Please sign in to comment.