[ML][Pipelines] Fix validation bug for init/finalize (Azure#27466)
* fix: validate init/finalize when _data is str

* test: add test for bug fix logic

* test: move spark test back to dsl_pipeline class
zhengfeiwang authored Nov 14, 2022
1 parent e426989 commit c8ebd81
Showing 6 changed files with 175 additions and 135 deletions.
@@ -329,21 +329,26 @@ def _validate_init_finalize_job(self) -> MutableValidationResult:
validation_result.append_error(yaml_path="jobs", message="No other job except for on_init/on_finalize job.")

def _is_isolated_job(_validate_job_name: str) -> bool:
def _try_get_data_binding(_input_output_data) -> Union[str, None]:
"""Try to get data binding from input/output data, return None if not found."""
if isinstance(_input_output_data, str):
return _input_output_data
if not hasattr(_input_output_data, "_data_binding"):
return None
return _input_output_data._data_binding()

_validate_job = self.jobs[_validate_job_name]
# no input to validate job
for _input_name in _validate_job.inputs:
if not hasattr(_validate_job.inputs[_input_name]._data, "_data_binding"):
continue
_data_binding = _validate_job.inputs[_input_name]._data._data_binding()
if is_data_binding_expression(_data_binding, ["parent", "jobs"]):
_data_binding = _try_get_data_binding(_validate_job.inputs[_input_name]._data)
if _data_binding is not None and is_data_binding_expression(_data_binding, ["parent", "jobs"]):
return False
# no output from validate job
for _job_name, _job in self.jobs.items():
for _input_name in _job.inputs:
if not hasattr(_job.inputs[_input_name]._data, "_data_binding"):
continue
_data_binding = _job.inputs[_input_name]._data._data_binding()
if is_data_binding_expression(_data_binding, ["parent", "jobs", _validate_job_name]):
_data_binding = _try_get_data_binding(_job.inputs[_input_name]._data)
if _data_binding is not None \
and is_data_binding_expression(_data_binding, ["parent", "jobs", _validate_job_name]):
return False
return True
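
The heart of the fix is the new _try_get_data_binding helper: previously, any input whose _data did not expose a _data_binding() method was skipped by the hasattr check, so bindings stored as plain strings (for example "${{parent.jobs.a.outputs.out}}" in a pipeline loaded from YAML) never reached is_data_binding_expression, and connections to or from the on_init/on_finalize jobs went undetected. A minimal, self-contained sketch of the before/after behavior; FakeInput and the flush-left helpers below are illustrative stand-ins, not the SDK's real classes:

from typing import Optional, Union


class FakeInput:
    """Illustrative stand-in for an input object whose data exposes _data_binding()."""

    def __init__(self, binding: str):
        self._binding = binding

    def _data_binding(self) -> str:
        return self._binding


def try_get_data_binding(data: Union[str, FakeInput, None]) -> Optional[str]:
    """Mirrors the new helper: a plain string is itself treated as a potential binding."""
    if isinstance(data, str):
        return data
    if not hasattr(data, "_data_binding"):
        return None
    return data._data_binding()


def old_try_get_data_binding(data) -> Optional[str]:
    """Pre-fix behavior: plain strings were silently skipped."""
    if not hasattr(data, "_data_binding"):
        return None
    return data._data_binding()


# A binding that connects a downstream job to the on_init job "init_job".
yaml_style_binding = "${{parent.jobs.init_job.outputs.out}}"

assert old_try_get_data_binding(yaml_style_binding) is None               # bug: connection missed
assert try_get_data_binding(yaml_style_binding) == yaml_style_binding     # fixed: string recognized
assert try_get_data_binding(FakeInput(yaml_style_binding)) == yaml_style_binding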

110 changes: 109 additions & 1 deletion sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py
@@ -15,6 +15,7 @@
from azure.ai.ml.constants._common import (
AZUREML_PRIVATE_FEATURES_ENV_VAR,
AZUREML_RESOURCE_PROVIDER,
InputOutputModes,
NAMED_RESOURCE_ID_FORMAT,
VERSIONED_RESOURCE_ID_FORMAT,
AssetTypes,
@@ -26,7 +27,7 @@
JobResourceConfiguration,
PipelineJob,
)
from azure.ai.ml.entities._builders import Command
from azure.ai.ml.entities._builders import Command, Spark
from azure.ai.ml.entities._job.pipeline._io import PipelineInput
from azure.ai.ml.entities._job.pipeline._load_component import _generate_component_function
from azure.ai.ml.exceptions import UserErrorException, ValidationException, ParamValueNotExistsError
@@ -2023,3 +2024,110 @@ def pipeline_func_consume_invalid_component():
with pytest.raises(UserErrorException) as e:
pipeline_func_consume_invalid_component()
assert str(e.value) == "Exactly one output is expected for condition node, 0 outputs found."

def test_dsl_pipeline_with_spark_hobo(self) -> None:
add_greeting_column_func = load_component(
"./tests/test_configs/dsl_pipeline/spark_job_in_pipeline/add_greeting_column_component.yml"
)
count_by_row_func = load_component(
"./tests/test_configs/dsl_pipeline/spark_job_in_pipeline/count_by_row_component.yml"
)

@dsl.pipeline(description="submit a pipeline with spark job")
def spark_pipeline_from_yaml(iris_data):
add_greeting_column = add_greeting_column_func(file_input=iris_data)
add_greeting_column.resources = {"instance_type": "Standard_E8S_V3", "runtime_version": "3.1.0"}
count_by_row = count_by_row_func(file_input=iris_data)
count_by_row.resources = {"instance_type": "Standard_E8S_V3", "runtime_version": "3.1.0"}
count_by_row.identity = {"type": "managed"}

return {"output": count_by_row.outputs.output}

dsl_pipeline: PipelineJob = spark_pipeline_from_yaml(
iris_data=Input(
path="https://azuremlexamples.blob.core.windows.net/datasets/iris.csv",
type=AssetTypes.URI_FILE,
mode=InputOutputModes.DIRECT,
),
)
dsl_pipeline.outputs.output.mode = "Direct"

spark_node = dsl_pipeline.jobs["add_greeting_column"]
job_data_path_input = spark_node.inputs["file_input"]._meta
assert job_data_path_input
# spark_node.component._id = "azureml:test_component:1"
spark_node_dict = spark_node._to_dict()

spark_node_rest_obj = spark_node._to_rest_object()
regenerated_spark_node = Spark._from_rest_object(spark_node_rest_obj)

spark_node_dict_from_rest = regenerated_spark_node._to_dict()
omit_fields = []
assert pydash.omit(spark_node_dict, *omit_fields) == pydash.omit(spark_node_dict_from_rest, *omit_fields)
omit_fields = [
"jobs.add_greeting_column.componentId",
"jobs.count_by_row.componentId",
"jobs.add_greeting_column.properties",
"jobs.count_by_row.properties",
]
actual_job = pydash.omit(dsl_pipeline._to_rest_object().properties.as_dict(), *omit_fields)
assert actual_job == {
"description": "submit a pipeline with spark job",
"properties": {},
"tags": {},
"display_name": "spark_pipeline_from_yaml",
"is_archived": False,
"job_type": "Pipeline",
"inputs": {
"iris_data": {
"mode": "Direct",
"uri": "https://azuremlexamples.blob.core.windows.net/datasets/iris.csv",
"job_input_type": "uri_file",
}
},
"jobs": {
"add_greeting_column": {
"type": "spark",
"resources": {"instance_type": "Standard_E8S_V3", "runtime_version": "3.1.0"},
"entry": {"file": "add_greeting_column.py", "spark_job_entry_type": "SparkJobPythonEntry"},
"py_files": ["utils.zip"],
"files": ["my_files.txt"],
"identity": {"identity_type": "UserIdentity"},
"conf": {
"spark.driver.cores": 2,
"spark.driver.memory": "1g",
"spark.executor.cores": 1,
"spark.executor.memory": "1g",
"spark.executor.instances": 1,
},
"args": "--file_input ${{inputs.file_input}}",
"name": "add_greeting_column",
"inputs": {
"file_input": {"job_input_type": "literal", "value": "${{parent.inputs.iris_data}}"},
},
"_source": "YAML.COMPONENT",
},
"count_by_row": {
"_source": "YAML.COMPONENT",
"args": "--file_input ${{inputs.file_input}} " "--output ${{outputs.output}}",
"conf": {
"spark.driver.cores": 2,
"spark.driver.memory": "1g",
"spark.executor.cores": 1,
"spark.executor.instances": 1,
"spark.executor.memory": "1g",
},
"entry": {"file": "count_by_row.py", "spark_job_entry_type": "SparkJobPythonEntry"},
"files": ["my_files.txt"],
"identity": {"identity_type": "Managed"},
"inputs": {"file_input": {"job_input_type": "literal", "value": "${{parent.inputs.iris_data}}"}},
"jars": ["scalaproj.jar"],
"name": "count_by_row",
"outputs": {"output": {"type": "literal", "value": "${{parent.outputs.output}}"}},
"resources": {"instance_type": "Standard_E8S_V3", "runtime_version": "3.1.0"},
"type": "spark",
},
},
"outputs": {"output": {"job_output_type": "uri_folder", "mode": "Direct"}},
"settings": {"_source": "DSL"},
}
137 changes: 21 additions & 116 deletions sdk/ml/azure-ai-ml/tests/dsl/unittests/test_init_finalize_job.py
@@ -1,15 +1,9 @@
from functools import partial
from pathlib import Path

import pydash
import pytest
from azure.ai.ml import Input, dsl, load_component
from azure.ai.ml.constants._common import (
AssetTypes,
InputOutputModes,
)
from azure.ai.ml import dsl, load_component, load_job
from azure.ai.ml.entities import PipelineJob
from azure.ai.ml.entities._builders import Spark

from .._util import _DSL_TIMEOUT_SECOND

@@ -28,7 +22,16 @@ class TestInitFinalizeJob:
)
hello_world_func = load_component(str(components_dir / "helloworld_component.yml"))

def test_init_finalize_job(self) -> None:
def test_init_finalize_job_from_yaml(self) -> None:
pipeline_job = load_job("./tests/test_configs/pipeline_jobs/pipeline_job_init_finalize.yaml")
assert pipeline_job._validate_init_finalize_job().passed
assert pipeline_job.settings.on_init == "a"
assert pipeline_job.settings.on_finalize == "c"
pipeline_job_dict = pipeline_job._to_rest_object().as_dict()
assert pipeline_job_dict["properties"]["settings"]["on_init"] == "a"
assert pipeline_job_dict["properties"]["settings"]["on_finalize"] == "c"

def test_init_finalize_job_from_sdk(self) -> None:
from azure.ai.ml._internal.dsl import set_pipeline_settings
from azure.ai.ml.dsl import pipeline

@@ -80,7 +83,16 @@ def in_decorator_func():
pipeline3 = in_decorator_func()
assert_pipeline_job_init_finalize_job(pipeline3)

def test_invalid_init_finalize_job(self) -> None:
def test_invalid_init_finalize_job_from_yaml(self) -> None:
pipeline_job = load_job("./tests/test_configs/pipeline_jobs/pipeline_job_init_finalize_invalid.yaml")
validation_result = pipeline_job._validate_init_finalize_job()
assert not validation_result.passed
assert (
validation_result.error_messages["settings.on_finalize"]
== "On_finalize job should not have connection to other execution node."
)

def test_invalid_init_finalize_job_from_sdk(self) -> None:
# invalid case: job name not exists
@dsl.pipeline()
def invalid_init_finalize_job_func():
@@ -164,110 +176,3 @@ def subgraph_init_finalize_job_func():
assert valid_pipeline._validate().passed
assert valid_pipeline.settings.on_init == "init_job"
assert valid_pipeline.settings.on_finalize == "finalize_job"

def test_dsl_pipeline_with_spark_hobo(self) -> None:
add_greeting_column_func = load_component(
"./tests/test_configs/dsl_pipeline/spark_job_in_pipeline/add_greeting_column_component.yml"
)
count_by_row_func = load_component(
"./tests/test_configs/dsl_pipeline/spark_job_in_pipeline/count_by_row_component.yml"
)

@dsl.pipeline(description="submit a pipeline with spark job")
def spark_pipeline_from_yaml(iris_data):
add_greeting_column = add_greeting_column_func(file_input=iris_data)
add_greeting_column.resources = {"instance_type": "Standard_E8S_V3", "runtime_version": "3.1.0"}
count_by_row = count_by_row_func(file_input=iris_data)
count_by_row.resources = {"instance_type": "Standard_E8S_V3", "runtime_version": "3.1.0"}
count_by_row.identity = {"type": "managed"}

return {"output": count_by_row.outputs.output}

dsl_pipeline: PipelineJob = spark_pipeline_from_yaml(
iris_data=Input(
path="https://azuremlexamples.blob.core.windows.net/datasets/iris.csv",
type=AssetTypes.URI_FILE,
mode=InputOutputModes.DIRECT,
),
)
dsl_pipeline.outputs.output.mode = "Direct"

spark_node = dsl_pipeline.jobs["add_greeting_column"]
job_data_path_input = spark_node.inputs["file_input"]._meta
assert job_data_path_input
# spark_node.component._id = "azureml:test_component:1"
spark_node_dict = spark_node._to_dict()

spark_node_rest_obj = spark_node._to_rest_object()
regenerated_spark_node = Spark._from_rest_object(spark_node_rest_obj)

spark_node_dict_from_rest = regenerated_spark_node._to_dict()
omit_fields = []
assert pydash.omit(spark_node_dict, *omit_fields) == pydash.omit(spark_node_dict_from_rest, *omit_fields)
omit_fields = [
"jobs.add_greeting_column.componentId",
"jobs.count_by_row.componentId",
"jobs.add_greeting_column.properties",
"jobs.count_by_row.properties",
]
actual_job = pydash.omit(dsl_pipeline._to_rest_object().properties.as_dict(), *omit_fields)
assert actual_job == {
"description": "submit a pipeline with spark job",
"properties": {},
"tags": {},
"display_name": "spark_pipeline_from_yaml",
"is_archived": False,
"job_type": "Pipeline",
"inputs": {
"iris_data": {
"mode": "Direct",
"uri": "https://azuremlexamples.blob.core.windows.net/datasets/iris.csv",
"job_input_type": "uri_file",
}
},
"jobs": {
"add_greeting_column": {
"type": "spark",
"resources": {"instance_type": "Standard_E8S_V3", "runtime_version": "3.1.0"},
"entry": {"file": "add_greeting_column.py", "spark_job_entry_type": "SparkJobPythonEntry"},
"py_files": ["utils.zip"],
"files": ["my_files.txt"],
"identity": {"identity_type": "UserIdentity"},
"conf": {
"spark.driver.cores": 2,
"spark.driver.memory": "1g",
"spark.executor.cores": 1,
"spark.executor.memory": "1g",
"spark.executor.instances": 1,
},
"args": "--file_input ${{inputs.file_input}}",
"name": "add_greeting_column",
"inputs": {
"file_input": {"job_input_type": "literal", "value": "${{parent.inputs.iris_data}}"},
},
"_source": "YAML.COMPONENT",
},
"count_by_row": {
"_source": "YAML.COMPONENT",
"args": "--file_input ${{inputs.file_input}} " "--output ${{outputs.output}}",
"conf": {
"spark.driver.cores": 2,
"spark.driver.memory": "1g",
"spark.executor.cores": 1,
"spark.executor.instances": 1,
"spark.executor.memory": "1g",
},
"entry": {"file": "count_by_row.py", "spark_job_entry_type": "SparkJobPythonEntry"},
"files": ["my_files.txt"],
"identity": {"identity_type": "Managed"},
"inputs": {"file_input": {"job_input_type": "literal", "value": "${{parent.inputs.iris_data}}"}},
"jars": ["scalaproj.jar"],
"name": "count_by_row",
"outputs": {"output": {"type": "literal", "value": "${{parent.outputs.output}}"}},
"resources": {"instance_type": "Standard_E8S_V3", "runtime_version": "3.1.0"},
"type": "spark",
},
},
"outputs": {"output": {"job_output_type": "uri_folder", "mode": "Direct"}},
"settings": {"_source": "DSL"},
}
@@ -1321,14 +1321,6 @@ def test_pipeline_job_with_parameter_group(self):
},
}

def test_pipeline_with_init_finalize(self) -> None:
pipeline_job = load_job("./tests/test_configs/pipeline_jobs/pipeline_job_init_finalize.yaml")
assert pipeline_job.settings.on_init == "a"
assert pipeline_job.settings.on_finalize == "c"
pipeline_job_dict = pipeline_job._to_rest_object().as_dict()
assert pipeline_job_dict["properties"]["settings"]["on_init"] == "a"
assert pipeline_job_dict["properties"]["settings"]["on_finalize"] == "c"

def test_non_string_pipeline_node_input(self):
test_path = "./tests/test_configs/pipeline_jobs/rest_non_string_input_pipeline.json"
with open(test_path, "r") as f:
@@ -24,7 +24,7 @@ jobs:
outputs:
world_output:
c:
command: echo ${{inputs.world_input}}/world.txt
command: echo hello ${{inputs.hello_string}}
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
inputs:
world_input: ${{parent.jobs.b.outputs.world_output}}
hello_string: ${{parent.inputs.hello_string_top_level_input}}
@@ -0,0 +1,30 @@
$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline
display_name: hello_pipeline_abc

inputs:
hello_string_top_level_input: "hello world"

settings:
continue_on_step_failure: True
default_compute: azureml:cpu-cluster
default_datastore: azureml:workspacefilestore
on_init: a
on_finalize: c

jobs:
a:
command: echo hello ${{inputs.hello_string}}
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
inputs:
hello_string: ${{parent.inputs.hello_string_top_level_input}}
b:
command: echo "world" >> ${{outputs.world_output}}/world.txt
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
outputs:
world_output:
c:
command: echo ${{inputs.world_input}}/world.txt
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
inputs:
world_input: ${{parent.jobs.b.outputs.world_output}}
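
Assuming the 30-line file added above is the pipeline_job_init_finalize_invalid.yaml consumed by the new YAML-based tests (job c still reads job b's output, so the on_finalize job is not isolated), and that the earlier two-line YAML change updates the valid pipeline_job_init_finalize.yaml so job c only consumes a top-level input, a short usage sketch mirroring the assertions in test_init_finalize_job.py:

from azure.ai.ml import load_job

# Valid config: on_init ("a") and on_finalize ("c") have no edges to other jobs.
valid_job = load_job("./tests/test_configs/pipeline_jobs/pipeline_job_init_finalize.yaml")
assert valid_job._validate_init_finalize_job().passed

# Invalid config: job "c" consumes ${{parent.jobs.b.outputs.world_output}}.
invalid_job = load_job("./tests/test_configs/pipeline_jobs/pipeline_job_init_finalize_invalid.yaml")
result = invalid_job._validate_init_finalize_job()
assert not result.passed
assert result.error_messages["settings.on_finalize"] == (
    "On_finalize job should not have connection to other execution node."
)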
