Support Parallel Run Step partition by keys #26844

Merged · 28 commits · Oct 26, 2022
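For orientation, here is a minimal usage sketch of the capability this PR adds: passing `partition_keys` to `parallel_run_function` so mini-batches are formed by key rather than by size. The names, paths, input, and environment reference below are illustrative, not taken from this PR.

```python
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.parallel import RunFunction, parallel_run_function

# Illustrative parallel step: group the tabular input by key columns instead of
# splitting it by mini_batch_size.
score_by_partition = parallel_run_function(
    name="score_by_partition",
    inputs=dict(
        job_data_path=Input(type=AssetTypes.MLTABLE, mode="eval_mount"),
    ),
    input_data="${{inputs.job_data_path}}",
    instance_count=2,
    max_concurrency_per_instance=2,
    partition_keys=["user_id", "region"],  # rows sharing these key values form one mini-batch
    task=RunFunction(
        code="./src",
        entry_script="score.py",
        environment="azureml:my-scoring-env:1",
    ),
)
```

Per the docstrings added in this PR, when both `partition_keys` and `mini_batch_size` are given, `partition_keys` takes precedence.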
10 changes: 10 additions & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/parallel.py
@@ -24,6 +24,7 @@ def __init__(self, **kwargs):
self._max_concurrency_per_instance = kwargs.pop("max_concurrency_per_instance", None)
self._error_threshold = kwargs.pop("error_threshold", None)
self._mini_batch_size = kwargs.pop("mini_batch_size", None)
self._partition_keys = kwargs.pop("partition_keys", None)
self._logging_level = kwargs.pop("logging_level", None)
self._retry_settings = kwargs.pop("retry_settings", BatchRetrySettings())
self._init = False
@@ -60,6 +61,15 @@ def mini_batch_size(self) -> int:
def mini_batch_size(self, value: int):
self._mini_batch_size = value

@property
def partition_keys(self) -> List:
"""The keys used to partition dataset into mini-batches."""
return self._partition_keys

@partition_keys.setter
def partition_keys(self, value: List):
self._partition_keys = value

@property
def logging_level(self) -> str:
"""A string of the logging level name"""
@@ -35,6 +35,11 @@ class ParallelComponentSchema(ComponentSchema):
mini_batch_size = fields.Str(
metadata={"description": "The batch size of the current job."},
)
partition_keys = fields.List(
fields.Str(),
metadata={"description": "The keys used to partition input data into mini-batches"}
)

input_data = fields.Str()
retry_settings = NestedField(RetrySettingsSchema, unknown=INCLUDE)
max_concurrency_per_instance = fields.Integer(
@@ -28,6 +28,10 @@ class ParameterizedParallelSchema(PathAwareSchema):
mini_batch_size = fields.Str(
metadata={"description": "The batch size of the current job."},
)
partition_keys = fields.List(
fields.Str(),
metadata={"description": "The keys used to partition input data into mini-batches"}
)
input_data = fields.Str()
resources = NestedField(JobResourceConfigurationSchema)
retry_settings = NestedField(RetrySettingsSchema, unknown=INCLUDE)
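Both schema additions above declare `partition_keys` the same way: an optional list of strings. A minimal marshmallow sketch (the schema class name and values are made up) of how such a field loads:

```python
from marshmallow import Schema, fields


class PartitionKeysDemoSchema(Schema):
    # Same field shape as added to ParallelComponentSchema / ParameterizedParallelSchema.
    partition_keys = fields.List(
        fields.Str(),
        metadata={"description": "The keys used to partition input data into mini-batches"},
    )


print(PartitionKeysDemoSchema().load({"partition_keys": ["user_id", "region"]}))
# {'partition_keys': ['user_id', 'region']}
```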
18 changes: 17 additions & 1 deletion sdk/ml/azure-ai-ml/azure/ai/ml/entities/_builders/parallel.py
@@ -7,6 +7,7 @@
import copy
import logging
import re
import json
from enum import Enum
from typing import Dict, List, Union

@@ -69,6 +70,12 @@ class Parallel(BaseNode):
(optional, default value is 10 files for FileDataset and 1MB for TabularDataset.) This value could be set
through PipelineParameter
:type mini_batch_size: str
:param partition_keys: The keys used to partition the dataset into mini-batches.
If specified, data sharing the same key values is grouped into the same mini-batch.
If both partition_keys and mini_batch_size are specified, partition_keys takes precedence.
The input(s) must be partitioned dataset(s),
and partition_keys must be a subset of the keys of every input dataset for this to work.
:type partition_keys: List
:param input_data: The input data.
:type input_data: str
:param inputs: Inputs of the component/job.
@@ -104,6 +111,7 @@ def __init__(
mini_batch_error_threshold: int = None,
input_data: str = None,
task: Dict[str, Union[ParallelTask, str]] = None,
partition_keys: List = None,
mini_batch_size: int = None,
resources: JobResourceConfiguration = None,
environment_variables: Dict = None,
@@ -147,6 +155,7 @@ def __init__(
raise ValueError("mini_batch_size unit must be kb, mb or gb")

self.mini_batch_size = mini_batch_size
self.partition_keys = partition_keys
self.input_data = input_data
self._retry_settings = retry_settings
self.logging_level = logging_level
@@ -166,6 +175,8 @@ def __init__(
self.mini_batch_error_threshold or self.component.mini_batch_error_threshold
)
self.mini_batch_size = self.mini_batch_size or self.component.mini_batch_size
self.partition_keys = self.partition_keys or self.component.partition_keys

if not self.task:
self.task = self.component.task
# task.code is based on self.component.base_path
@@ -270,6 +281,7 @@ def _to_job(self) -> ParallelJob:
properties=self.properties,
compute=self.compute,
resources=self.resources,
partition_keys=self.partition_keys,
mini_batch_size=self.mini_batch_size,
task=self.task,
retry_settings=self.retry_settings,
@@ -318,6 +330,8 @@ def _to_rest_object(self, **kwargs) -> dict:
retry_settings=get_rest_dict_for_node_attrs(self.retry_settings),
logging_level=self.logging_level,
mini_batch_size=self.mini_batch_size,
partition_keys=json.dumps(self.partition_keys)
if self.partition_keys is not None else self.partition_keys,
resources=get_rest_dict_for_node_attrs(self.resources),
)
)
@@ -353,7 +367,8 @@ def _from_rest_object(cls, obj: dict) -> "Parallel":
# distribution, sweep won't have distribution
if "distribution" in obj and obj["distribution"]:
obj["distribution"] = DistributionConfiguration._from_rest_object(obj["distribution"])

if "partition_keys" in obj and obj["partition_keys"]:
obj["partition_keys"] = json.dumps(obj["partition_keys"])
return Parallel(**obj)

def _build_inputs(self):
@@ -392,6 +407,7 @@ def __call__(self, *args, **kwargs) -> "Parallel":
node.tags = self.tags
node.display_name = self.display_name
node.mini_batch_size = self.mini_batch_size
node.partition_keys = self.partition_keys
node.logging_level = self.logging_level
node.max_concurrency_per_instance = self.max_concurrency_per_instance
node.error_threshold = self.error_threshold
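Within the node, `partition_keys` stays a Python list; the REST payload carries it as a JSON string, so it is dumped on the way out and loaded back on the way in. A standalone sketch of that round trip, with illustrative values:

```python
import json

partition_keys = ["user_id", "region"]

# Outgoing (_to_rest_object): list -> JSON string, None passes through unchanged.
rest_value = json.dumps(partition_keys) if partition_keys is not None else None
assert rest_value == '["user_id", "region"]'

# Incoming (_from_rest_object / load_from_rest): JSON string -> list.
restored = json.loads(rest_value) if rest_value else None
assert restored == partition_keys
```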
@@ -2,7 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import os
from typing import Dict, Union
from typing import Dict, Union, List

from azure.ai.ml._restclient.v2022_02_01_preview.models import AmlToken, ManagedIdentity
from azure.ai.ml.constants._component import ComponentSource
@@ -31,6 +31,7 @@ def parallel_run_function(
mini_batch_error_threshold: int = None,
task: RunFunction = None,
mini_batch_size: str = None,
partition_keys: List = None,
input_data: str = None,
inputs: Dict = None,
outputs: Dict = None,
@@ -136,6 +137,12 @@
(optional, default value is 10 files for FileDataset and 1MB for TabularDataset.) This value could be set
through PipelineParameter.
:type mini_batch_size: str
:param partition_keys: The keys used to partition the dataset into mini-batches.
If specified, data sharing the same key values is grouped into the same mini-batch.
If both partition_keys and mini_batch_size are specified, partition_keys takes precedence.
The input(s) must be partitioned dataset(s),
and partition_keys must be a subset of the keys of every input dataset for this to work.
:type partition_keys: List
:param input_data: The input data.
:type input_data: str
:param inputs: a dict of inputs used by this parallel.
@@ -190,6 +197,7 @@
mini_batch_error_threshold=mini_batch_error_threshold,
task=task,
mini_batch_size=mini_batch_size,
partition_keys=partition_keys,
input_data=input_data,
_source=ComponentSource.BUILDER,
is_deterministic=is_deterministic,
@@ -216,6 +224,7 @@
mini_batch_error_threshold=mini_batch_error_threshold,
task=task,
mini_batch_size=mini_batch_size,
partition_keys=partition_keys,
input_data=input_data,
**kwargs,
)
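A rough sketch of what an entry script might look like for a partition-keyed run. It assumes tabular input where each mini-batch arrives as a pandas DataFrame whose rows share one combination of the key columns; that runtime behavior and the column names are assumptions, not shown in this PR.

```python
# score.py -- illustrative entry script for a partition-keyed parallel run.
import pandas as pd


def init():
    # Per-worker setup, e.g. loading a model; omitted in this sketch.
    pass


def run(mini_batch: pd.DataFrame):
    # With partition_keys=["user_id", "region"], every row in this mini_batch is
    # expected to share the same (user_id, region) pair.
    results = []
    for _, row in mini_batch.iterrows():
        results.append(f"{row['user_id']},{row['region']},processed")
    return results
```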
@@ -183,6 +183,11 @@ def load_from_rest(self, *, obj: ComponentVersionData, _type: str = None) -> Com
if distribution:
distribution = DistributionConfiguration._from_rest_object(distribution)

if _type == "parallel":
import json
if "partition_keys" in rest_component_version.component_spec:
rest_component_version.component_spec["partition_keys"] \
= json.loads(rest_component_version.component_spec["partition_keys"])
# Note: we need to refine the logic here if more type-specific logic is added.
jobs = rest_component_version.component_spec.pop("jobs", None)
if _type == NodeType.PIPELINE and jobs:
@@ -4,7 +4,7 @@

import os
import re
from typing import Any, Dict, Union
from typing import Any, Dict, Union, List

from marshmallow import Schema

@@ -16,9 +16,10 @@
from azure.ai.ml.entities._job.parallel.parameterized_parallel import ParameterizedParallel
from azure.ai.ml.entities._job.parallel.retry_settings import RetrySettings
from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationException
from azure.ai.ml._restclient.v2022_05_01.models import ComponentVersionData

from ..._schema import PathAwareSchema
from .._util import convert_ordered_dict_to_dict, validate_attribute_type
from .._util import validate_attribute_type
from .component import Component


@@ -53,6 +54,12 @@ class ParallelComponent(Component, ParameterizedParallel): # pylint: disable=to
(optional, default value is 10 files for FileDataset and 1MB for TabularDataset.) This value could be set
through PipelineParameter.
:type mini_batch_size: str
:param partition_keys: The keys used to partition the dataset into mini-batches.
If specified, data sharing the same key values is grouped into the same mini-batch.
If both partition_keys and mini_batch_size are specified, partition_keys takes precedence.
The input(s) must be partitioned dataset(s),
and partition_keys must be a subset of the keys of every input dataset for this to work.
:type partition_keys: list
:param input_data: The input data.
:type input_data: str
:param resources: Compute Resource configuration for the component.
@@ -86,6 +93,7 @@ def __init__(
mini_batch_error_threshold: int = None,
task: ParallelTask = None,
mini_batch_size: str = None,
partition_keys: List = None,
input_data: str = None,
resources: JobResourceConfiguration = None,
inputs: Dict = None,
@@ -116,6 +124,7 @@ def __init__(
# and fill in later with job defaults.
self.task = task
self.mini_batch_size = mini_batch_size
self.partition_keys = partition_keys
self.input_data = input_data
self.retry_settings = retry_settings
self.logging_level = logging_level
@@ -225,9 +234,13 @@ def _attr_type_map(cls) -> dict:
"resources": (dict, JobResourceConfiguration),
}

def _to_dict(self) -> Dict:
"""Dump the parallel component content into a dictionary."""
return convert_ordered_dict_to_dict({**self._other_parameter, **super(ParallelComponent, self)._to_dict()})
def _to_rest_object(self) -> ComponentVersionData:
rest_object = super()._to_rest_object()
if self.partition_keys:
import json
rest_object.properties.component_spec["partition_keys"] = json.dumps(self.partition_keys)
return rest_object

@classmethod
def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
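The docstrings in this PR require `partition_keys` to be a subset of the keys of every input dataset. A small standalone check expressing that rule; the helper name and column lists are made up for illustration:

```python
from typing import Iterable, List


def partition_keys_are_valid(partition_keys: List[str], datasets_columns: Iterable[Iterable[str]]) -> bool:
    """Return True when every input dataset exposes all of the partition keys."""
    return all(set(partition_keys) <= set(columns) for columns in datasets_columns)


# Valid: both datasets expose user_id.
assert partition_keys_are_valid(["user_id"], [["user_id", "region", "score"], ["user_id", "ts"]])
# Invalid: the second dataset lacks region.
assert not partition_keys_are_valid(["user_id", "region"], [["user_id", "region"], ["user_id", "ts"]])
```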
@@ -54,6 +54,8 @@ class ParallelJob(Job, ParameterizedParallel, JobIOMixin):
:type task: ParallelTask
:param mini_batch_size: The mini batch size.
:type mini_batch_size: str
:param partition_keys: The partition keys.
:type partition_keys: list
:param input_data: The input data.
:type input_data: str
:param inputs: Inputs of the job.
@@ -107,6 +109,7 @@ def _to_component(self, context: Dict = None, **kwargs):
return ParallelComponent(
base_path=context[BASE_PATH_CONTEXT_KEY],
mini_batch_size=self.mini_batch_size,
partition_keys=self.partition_keys,
input_data=self.input_data,
task=self.task,
retry_settings=self.retry_settings,
@@ -137,6 +140,7 @@ def _to_node(self, context: Dict = None, **kwargs):
inputs=self.inputs,
outputs=self.outputs,
mini_batch_size=self.mini_batch_size,
partition_keys=self.partition_keys,
input_data=self.input_data,
# task will be inherited from component & base_path will be set correctly.
retry_settings=self.retry_settings,
@@ -3,7 +3,7 @@
# ---------------------------------------------------------

import logging
from typing import Dict, Union
from typing import Dict, Union, List

from ..job_resource_configuration import JobResourceConfiguration
from .parallel_task import ParallelTask
@@ -47,10 +47,12 @@ def __init__(
input_data: str = None,
task: ParallelTask = None,
mini_batch_size: int = None,
partition_keys: List = None,
resources: Union[dict, JobResourceConfiguration] = None,
environment_variables: Dict = None,
):
self.mini_batch_size = mini_batch_size
self.partition_keys = partition_keys
self.task = task
self.retry_settings = retry_settings
self.input_data = input_data
@@ -99,6 +99,7 @@ def test_parallel_component_version_as_a_function_with_inputs(self):
"error_threshold": None,
"logging_level": None,
"max_concurrency_per_instance": None,
"partition_keys": None,
"mini_batch_error_threshold": None,
"mini_batch_size": 10485760,
"retry_settings": None,
@@ -112,3 +112,30 @@ def test_serialize_deserialize_basic(self, mock_machinelearning_client: MLClient

assert component_entity.code
assert component_entity.code == f"{str(Path('./tests/test_configs/python').resolve())}:1"

def test_serialize_deserialize_partition_keys(self, mock_machinelearning_client: MLClient):
test_path = "./tests/test_configs/components/parallel_component_with_partition_keys.yml"
component_entity = load_component_entity_from_yaml(test_path, mock_machinelearning_client)
rest_path = "./tests/test_configs/components/parallel_component_with_partition_keys_rest.json"
target_entity = load_component_entity_from_rest_json(rest_path)

# skip check code and environment
component_dict = component_entity._to_dict()
assert component_dict["id"]
component_dict = pydash.omit(
dict(component_dict),
"task.code",
"id",
)
expected_dict = pydash.omit(
dict(target_entity._to_dict()),
"task.code",
"creation_context",
"id",
)

assert component_dict == expected_dict
assert component_dict["partition_keys"] == ["foo", "bar"]

assert component_entity.code
assert component_entity.code == f"{str(Path('./tests/test_configs/python').resolve())}:1"
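The new test above loads `parallel_component_with_partition_keys.yml` and expects `["foo", "bar"]` to survive the YAML-to-entity-to-dict round trip. A related usage sketch, overriding that value on a node built from such a component; the input name, paths, and data location are illustrative:

```python
from azure.ai.ml import Input, load_component
from azure.ai.ml.constants import AssetTypes

# Illustrative paths and input name.
parallel_func = load_component("./parallel_component_with_partition_keys.yml")
node = parallel_func(
    job_data_path=Input(type=AssetTypes.MLTABLE, path="./sample_data", mode="eval_mount"),
)
node.partition_keys = ["foo", "bar"]  # node-level value; otherwise the component default applies
```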
3 changes: 3 additions & 0 deletions sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py
@@ -1726,6 +1726,7 @@ def parallel_in_pipeline(job_data_path):
},
"name": "node1",
"mini_batch_size": 5,
"partition_keys": None,
"retry_settings": None,
"logging_level": "DEBUG",
"max_concurrency_per_instance": 1,
@@ -1945,6 +1946,7 @@ def parallel_in_pipeline(job_data_path):
},
"outputs": {},
"mini_batch_size": 1,
"partition_keys": None,
"task": {
"type": "run_function",
"entry_script": "score.py",
@@ -1992,6 +1994,7 @@ def parallel_in_pipeline(job_data_path):
},
"outputs": {"job_output_path": {"value": "${{parent.outputs.job_out_data}}", "type": "literal"}},
"mini_batch_size": 1,
"partition_keys": None,
"task": {
"type": "run_function",
"entry_script": "score.py",