Skip to content

Commit

Permalink
fix(sdk): Relax the requirement that component inputs/outputs must ap…
Browse files Browse the repository at this point in the history
…pear on the command line. (#6268)

* Relax the requirement that component inputs/outputs must appear
on the command line.

This change enables component builders to define their implementation
(i.e. the container arg and command) independent of the actual
component inputs/outputs defined in component.yaml. The latter will be
used when determining inputs/outputs during compilation.

Only works for tasks that turn into ContainerOps during compilation.
Naturally, does not work for ContainerOps (which have no inputs/outputs
anyway).

* fix bugs, update goldens.

* Fix issue with inputs.

* merge and update.

* Restore compiler yamls.

* Disambiguate parameters and artifacts in component bridge.

* Restore goldens from HEAD.

* restore compiler_tests.py to HEAD.

* str-ify pipeline params.

* cr comments.
  • Loading branch information
neuromage authored Aug 10, 2021
1 parent 1feb9a0 commit 911562f
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 146 deletions.
31 changes: 1 addition & 30 deletions sdk/python/kfp/components/_python_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,35 +529,6 @@ def _func_to_component_spec_v2(
from kfp.components._structures import ExecutorInputPlaceholder
component_spec = _extract_component_interface(func)

component_inputs = component_spec.inputs or []
component_outputs = component_spec.outputs or []

outputs_passed_using_func_parameters = [
output for output in component_outputs
if output._passing_style is not None
]
arguments = []
for input in component_inputs + outputs_passed_using_func_parameters:
flag = "--{}-output-path".format(input.name.replace("_", "-"))

if input._passing_style in [InputPath, io_types.InputAnnotation]:
arguments_for_input = [flag, InputPathPlaceholder(input.name)]
elif input._passing_style in [OutputPath, io_types.OutputAnnotation]:
arguments_for_input = [flag, OutputPathPlaceholder(input.name)]
else:
arguments_for_input = [flag, InputValuePlaceholder(input.name)]

arguments.extend(arguments_for_input)

# Add output placeholders for return values from func.
func_outputs = [
output for output in component_outputs
if output._passing_style is None
]
for output in func_outputs:
flag = "--" + output.name.replace("_", "-")
arguments.extend([flag, OutputPathPlaceholder(output.name)])

component_spec.implementation=ContainerImplementation(
container=ContainerSpec(
image=base_image,
Expand All @@ -578,7 +549,7 @@ def _func_to_component_spec_v2(
"--executor_input",
ExecutorInputPlaceholder(),
"--function_to_execute", func.__name__,
] + arguments,
]
)
)
return component_spec
Expand Down
21 changes: 20 additions & 1 deletion sdk/python/kfp/dsl/_component_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,32 @@ def _create_container_op_from_component_and_arguments(
_container_op.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = old_warn_value

component_meta = copy.copy(component_spec)
task._set_metadata(component_meta)
task._set_metadata(component_meta, original_arguments)
if component_ref:
component_ref_without_spec = copy.copy(component_ref)
component_ref_without_spec.spec = None
task._component_ref = component_ref_without_spec

task._parameter_arguments = resolved_cmd.inputs_consumed_by_value
name_to_spec_type = {}
if component_meta.inputs:
name_to_spec_type = {
input.name: input.type
for input in component_meta.inputs
}
if kfp.COMPILING_FOR_V2:
for name, spec_type in name_to_spec_type.items():
if (name in original_arguments and
type_utils.is_parameter_type(spec_type)):
task._parameter_arguments[name] = str(original_arguments[name])

for name in list(task.artifact_arguments.keys()):
if name in task._parameter_arguments:
del task.artifact_arguments[name]

for name in list(task.input_artifact_paths.keys()):
if name in task._parameter_arguments:
del task.input_artifact_paths[name]

# Previously, ContainerOp had strict requirements for the output names, so we
# had to convert all the names before passing them to the ContainerOp
Expand Down
53 changes: 43 additions & 10 deletions sdk/python/kfp/dsl/_container_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import inspect
import re
import warnings
from typing import Any, Dict, Iterable, List, TypeVar, Union, Callable, Optional, Sequence
Expand All @@ -26,7 +24,7 @@
V1Lifecycle, V1Volume)

import kfp
from kfp.components import _structures
from kfp.components import _components, _structures
from kfp.dsl import _pipeline_param
from kfp.dsl import dsl_utils
from kfp.pipeline_spec import pipeline_spec_pb2
Expand Down Expand Up @@ -396,7 +394,7 @@ def set_gpu_limit(self, gpu: Union[str, _pipeline_param.PipelineParam], vendor:
ignored in v2.
"""

if not isinstance(gpu,_pipeline_param.PipelineParam) or not isinstance(gpu,_pipeline_param.PipelineParam):
if not isinstance(gpu,_pipeline_param.PipelineParam) or not isinstance(gpu,_pipeline_param.PipelineParam):
self._validate_positive_number(gpu, 'gpu')

if self._container_spec:
Expand Down Expand Up @@ -832,6 +830,11 @@ def __init__(self,
# used to mark this op with loop arguments
self.loop_args = None

# Placeholder for inputs when adding ComponentSpec metadata to this
# ContainerOp. This holds inputs defined in ComponentSpec that have
# a corresponding PipelineParam.
self._component_spec_inputs_with_pipeline_params = []

# attributes specific to `BaseOp`
self._inputs = []
self.dependent_names = []
Expand All @@ -848,7 +851,7 @@ def inputs(self):
# called the 1st time (because there are in-place updates to `PipelineParam`
# during compilation - remove in-place updates for easier debugging?)
if not self._inputs:
self._inputs = []
self._inputs = self._component_spec_inputs_with_pipeline_params or []
# TODO replace with proper k8s obj?
for key in self.attrs_with_pipelineparams:
self._inputs += _pipeline_param.extract_pipelineparams_from_any(
Expand Down Expand Up @@ -1280,6 +1283,12 @@ def _decorated(*args, **kwargs):
for name in file_outputs.keys()
}

self._set_single_output_attribute()

self.pvolumes = {}
self.add_pvolumes(pvolumes)

def _set_single_output_attribute(self):
# Syntactic sugar: Add task.output attribute if the component has a single
# output.
# TODO: Currently the "MLPipeline UI Metadata" output is removed from
Expand All @@ -1291,9 +1300,6 @@ def _decorated(*args, **kwargs):
else:
self.output = _MultipleOutputsError()

self.pvolumes = {}
self.add_pvolumes(pvolumes)

@property
def is_v2(self):
return self._is_v2
Expand Down Expand Up @@ -1351,17 +1357,42 @@ def immediate_value_pipeline():
"""
return self._container

def _set_metadata(self, metadata):
def _set_metadata(self, metadata, arguments: Optional[Dict[str, Any]] = None):
"""Passes the ContainerOp the metadata information and configures the right output.
Args:
metadata (ComponentSpec): component metadata
arguments: Dictionary of input arguments to the component.
"""
if not isinstance(metadata, _structures.ComponentSpec):
raise ValueError('_set_metadata is expecting ComponentSpec.')

self._metadata = metadata

if self._metadata.outputs:
declared_outputs = {
output.name: _pipeline_param.PipelineParam(
output.name, op_name=self.name)
for output in self._metadata.outputs
}
self.outputs.update(declared_outputs)

for output in self._metadata.outputs:
if output.name not in self.file_outputs:
output_filename = _components._generate_output_file_name(output.name)
self.file_outputs[output.name] = output_filename

if arguments is not None:
for input_name, value in arguments.items():
self.artifact_arguments[input_name] = str(value)
if (isinstance(value, _pipeline_param.PipelineParam)):
self._component_spec_inputs_with_pipeline_params.append(value)

if input_name not in self.input_artifact_paths:
input_artifact_path = _components._generate_input_file_name(
input_name)
self.input_artifact_paths[input_name] = input_artifact_path

if self.file_outputs:
for output in self.file_outputs.keys():
output_type = self.outputs[output].param_type
Expand All @@ -1370,6 +1401,8 @@ def _set_metadata(self, metadata):
output_type = output_meta.type
self.outputs[output].param_type = output_type

self._set_single_output_attribute()

def add_pvolumes(self, pvolumes: Dict[str, V1Volume] = None):
"""Updates the existing pvolumes dict, extends volumes and volume_mounts and redefines the pvolume attribute.
Expand Down Expand Up @@ -1411,7 +1444,7 @@ def add_node_selector_constraint(self, label_name: Union[str, _pipeline_param.P
Returns:
self return to allow chained call with other resource specification.
"""
if self.container_spec and not(isinstance(label_name, _pipeline_param.PipelineParam) or isinstance(value, _pipeline_param.PipelineParam)):
if self.container_spec and not(isinstance(label_name, _pipeline_param.PipelineParam) or isinstance(value, _pipeline_param.PipelineParam)):
accelerator_cnt = 1
if self.container_spec.resources.accelerator.count > 1:
# Reserve the number if already set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"preprocess",
"--message-output-path",
"{{$.inputs.parameters['message']}}",
"--output-dataset-one-output-path",
"{{$.outputs.artifacts['output_dataset_one'].path}}",
"--output-dataset-two-output-path",
"{{$.outputs.artifacts['output_dataset_two'].path}}",
"--output-parameter-output-path",
"{{$.outputs.parameters['output_parameter'].output_file}}",
"--output-bool-parameter-output-path",
"{{$.outputs.parameters['output_bool_parameter'].output_file}}",
"--output-dict-parameter-output-path",
"{{$.outputs.parameters['output_dict_parameter'].output_file}}",
"--output-list-parameter-output-path",
"{{$.outputs.parameters['output_list_parameter'].output_file}}"
"preprocess"
],
"command": [
"sh",
Expand All @@ -125,23 +111,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"train",
"--dataset-one-output-path",
"{{$.inputs.artifacts['dataset_one'].path}}",
"--dataset-two-output-path",
"{{$.inputs.artifacts['dataset_two'].path}}",
"--message-output-path",
"{{$.inputs.parameters['message']}}",
"--input-bool-output-path",
"{{$.inputs.parameters['input_bool']}}",
"--input-dict-output-path",
"{{$.inputs.parameters['input_dict']}}",
"--input-list-output-path",
"{{$.inputs.parameters['input_list']}}",
"--num-steps-output-path",
"{{$.inputs.parameters['num_steps']}}",
"--model-output-path",
"{{$.outputs.artifacts['model'].path}}"
"train"
],
"command": [
"sh",
Expand Down Expand Up @@ -260,4 +230,4 @@
"runtimeConfig": {
"gcsOutputDirectory": "dummy_root"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"add_numbers",
"--first-output-path",
"{{$.inputs.parameters['first']}}",
"--second-output-path",
"{{$.inputs.parameters['second']}}",
"--Output",
"{{$.outputs.parameters['Output'].output_file}}"
"add_numbers"
],
"command": [
"sh",
Expand All @@ -129,13 +123,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"concat_message",
"--first-output-path",
"{{$.inputs.parameters['first']}}",
"--second-output-path",
"{{$.inputs.parameters['second']}}",
"--Output",
"{{$.outputs.parameters['Output'].output_file}}"
"concat_message"
],
"command": [
"sh",
Expand All @@ -155,13 +143,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"output_artifact",
"--number-output-path",
"{{$.inputs.parameters['number']}}",
"--message-output-path",
"{{$.inputs.parameters['message']}}",
"--Output",
"{{$.outputs.artifacts['Output'].path}}"
"output_artifact"
],
"command": [
"sh",
Expand All @@ -181,15 +163,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"output_named_tuple",
"--artifact-output-path",
"{{$.inputs.artifacts['artifact'].path}}",
"--scalar",
"{{$.outputs.parameters['scalar'].output_file}}",
"--metrics",
"{{$.outputs.artifacts['metrics'].path}}",
"--model",
"{{$.outputs.artifacts['model'].path}}"
"output_named_tuple"
],
"command": [
"sh",
Expand Down Expand Up @@ -354,4 +328,4 @@
"runtimeConfig": {
"gcsOutputDirectory": "dummy_root"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"training_op",
"--input1-output-path",
"{{$.inputs.parameters['input1']}}"
"training_op"
],
"command": [
"sh",
Expand All @@ -62,9 +60,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"training_op",
"--input1-output-path",
"{{$.inputs.parameters['input1']}}"
"training_op"
],
"command": [
"sh",
Expand Down Expand Up @@ -174,4 +170,4 @@
"runtimeConfig": {
"gcsOutputDirectory": "dummy_root"
}
}
}
Loading

0 comments on commit 911562f

Please sign in to comment.