diff --git a/CHANGES.md b/CHANGES.md
index f9ee5d289117..847532e85562 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -86,6 +86,8 @@
 ## Bugfixes
 
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Fixed MLTransform dropping duplicate elements in the output PCollection ([#29600](https://github.com/apache/beam/issues/29600)).
+
 ## Security Fixes
 
 * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
@@ -134,6 +136,10 @@ as a workaround, a copy of "old" `CountingSource` class should be placed into a
 * Fixed [CVE-2023-39325](https://www.cve.org/CVERecord?id=CVE-2023-39325) (Java/Python/Go) ([#29118](https://github.com/apache/beam/issues/29118)).
 * Mitigated [CVE-2023-47248](https://nvd.nist.gov/vuln/detail/CVE-2023-47248) (Python) [#29392](https://github.com/apache/beam/issues/29392).
 
+## Known Issues
+
+* MLTransform drops identical elements in the output PCollection. For any set of duplicate elements, only a single element is emitted downstream ([#29600](https://github.com/apache/beam/issues/29600)).
+
 # [2.51.0] - 2023-10-03
 
 ## New Features / Improvements
@@ -168,6 +174,8 @@ as a workaround, a copy of "old" `CountingSource` class should be placed into a
 
 * Long-running Python pipelines might experience a memory leak: [#28246](https://github.com/apache/beam/issues/28246).
 * Python pipelines using BigQuery Storage Read API might need to pin `fastavro` dependency to 1.8.3 or earlier on some runners that don't use Beam Docker containers: [#28811](https://github.com/apache/beam/issues/28811)
+* MLTransform drops identical elements in the output PCollection. For any set of duplicate elements, only a single element is emitted downstream ([#29600](https://github.com/apache/beam/issues/29600)).
+
 # [2.50.0] - 2023-08-30
 
@@ -229,6 +237,7 @@ as a workaround, a copy of "old" `CountingSource` class should be placed into a
 * Beam Python containers rely on a version of Debian/aom that has several security vulnerabilities: [CVE-2021-30474](https://nvd.nist.gov/vuln/detail/CVE-2021-30474), [CVE-2021-30475](https://nvd.nist.gov/vuln/detail/CVE-2021-30475), [CVE-2021-30473](https://nvd.nist.gov/vuln/detail/CVE-2021-30473), [CVE-2020-36133](https://nvd.nist.gov/vuln/detail/CVE-2020-36133), [CVE-2020-36131](https://nvd.nist.gov/vuln/detail/CVE-2020-36131), [CVE-2020-36130](https://nvd.nist.gov/vuln/detail/CVE-2020-36130), and [CVE-2020-36135](https://nvd.nist.gov/vuln/detail/CVE-2020-36135)
 * Python SDK's cross-language Bigtable sink mishandles records that don't have an explicit timestamp set: [#28632](https://github.com/apache/beam/issues/28632). To avoid this issue, set explicit timestamps for all records before writing to Bigtable.
 * Python SDK worker start-up logs, particularly PIP dependency installations, that are not logged at warning or higher are suppressed. This suppression is reverted in 2.51.0.
+* MLTransform drops identical elements in the output PCollection. For any set of duplicate elements, only a single element is emitted downstream ([#29600](https://github.com/apache/beam/issues/29600)).
 
 # [2.49.0] - 2023-07-17
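For reviewers, the fixed behavior described in the changelog entries above can be pictured with a small pipeline along the following lines. This is an illustrative sketch only and is not part of the patch: the column name `x`, the sample values, and the temporary artifact location are made up, and it assumes an environment where the `tensorflow_transform` (tft) extra dependencies are installed.

```python
# Sketch of https://github.com/apache/beam/issues/29600 (not part of the patch).
# The column name, sample data, and artifact location are example values.
import tempfile

import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([{'x': 'Beam'}, {'x': 'Beam'}, {'x': 'is'}, {'x': 'awesome'}])
      | MLTransform(write_artifact_location=tempfile.mkdtemp()).with_transform(
          ComputeAndApplyVocabulary(columns=['x']))
      # Before the fix the two identical 'Beam' rows collapsed into a single
      # output element; with the fix all four input rows are emitted.
      | beam.Map(print))
```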
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py
index 0db10718295b..261b480b1083 100644
--- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py
@@ -57,6 +57,7 @@ def check_mltransform_compute_and_apply_vocabulary_with_scalar():
 Row(x=array([4]))
 Row(x=array([1]))
 Row(x=array([0]))
+Row(x=array([0]))
 Row(x=array([2]))
 Row(x=array([3]))
 [END mltransform_compute_and_apply_vocabulary_with_scalar]
 '''.splitlines(
diff --git a/sdks/python/apache_beam/ml/transforms/handlers.py b/sdks/python/apache_beam/ml/transforms/handlers.py
index 8695d5146efa..e7d4f52ded85 100644
--- a/sdks/python/apache_beam/ml/transforms/handlers.py
+++ b/sdks/python/apache_beam/ml/transforms/handlers.py
@@ -17,9 +17,9 @@
 # pytype: skip-file
 
 import collections
-import hashlib
 import os
 import typing
+import uuid
 from typing import Dict
 from typing import List
 from typing import Optional
@@ -49,6 +49,8 @@
     'TFTProcessHandler',
 ]
 
+_ID_COLUMN = 'tmp_uuid'  # Name for a temporary column.
+
 RAW_DATA_METADATA_DIR = 'raw_data_metadata'
 SCHEMA_FILE = 'schema.pbtxt'
 # tensorflow transform doesn't support the types other than tf.int64,
@@ -80,12 +82,12 @@
 tft_process_handler_output_type = typing.Union[beam.Row, Dict[str, np.ndarray]]
 
 
-class ConvertScalarValuesToListValues(beam.DoFn):
+class _ConvertScalarValuesToListValues(beam.DoFn):
   def process(
       self,
       element,
   ):
-    hash_key, element = element
+    id, element = element
     new_dict = {}
     for key, value in element.items():
       if isinstance(value,
@@ -93,10 +95,10 @@ def process(
         new_dict[key] = [value]
       else:
         new_dict[key] = value
-    yield (hash_key, new_dict)
+    yield (id, new_dict)
 
 
-class ConvertNamedTupleToDict(
+class _ConvertNamedTupleToDict(
     beam.PTransform[beam.PCollection[typing.Union[beam.Row, typing.NamedTuple]],
                     beam.PCollection[Dict[str, common_types.InstanceDictType]]]):
@@ -121,76 +123,75 @@ def expand(
     return pcoll | beam.Map(lambda x: x._asdict())
 
 
-class ComputeAndAttachHashKey(beam.DoFn):
+class _ComputeAndAttachUniqueID(beam.DoFn):
   """
-  Computes and attaches a hash key to the element.
-  Only for internal use. No backwards compatibility guarantees.
+  Computes and attaches a unique id to each element in the PCollection.
   """
   def process(self, element):
-    hash_object = hashlib.sha256()
-    for _, value in element.items():
-      # handle the case where value is a list or numpy array
-      if isinstance(value, (list, np.ndarray)):
-        hash_object.update(str(list(value)).encode())
-      else:  # assume value is a primitive that can be turned into str
-        hash_object.update(str(value).encode())
-    yield (hash_object.hexdigest(), element)
+    # UUID1 includes machine-specific bits and has a counter. As long as not too
+    # many are generated at the same time, they should be unique.
+    # UUID4 generation should be unique in practice as long as the underlying
+    # random number generation is not compromised.
+    # A combination of both should avoid the anecdotal pitfalls where
+    # replacing one with the other has helped some users.
+    # A UUID collision will result in data loss, but we can detect that and fail.
+
+    # TODO(https://github.com/apache/beam/issues/29593): Evaluate MLTransform
+    # implementation without CoGBK.
+    unique_key = uuid.uuid1().bytes + uuid.uuid4().bytes
+    yield (unique_key, element)
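To make the motivation behind the new keying scheme concrete: the removed DoFn derived its key from the element's values, so two identical rows always received the same key and were later collapsed into one group by the CoGroupByKey join, while the UUID-based key is independent of the values. Below is a standalone sketch of that difference; the rows are hypothetical and the snippet is not part of the patch.

```python
# Hypothetical illustration of why value-derived keys collapse duplicates
# while per-element UUIDs do not; the rows below are made up.
import hashlib
import uuid

row_a = {'x': 'Beam'}
row_b = {'x': 'Beam'}  # an exact duplicate of row_a

def old_key(element):
  # Mirrors the removed ComputeAndAttachHashKey: the key depends only on the values.
  hash_object = hashlib.sha256()
  for _, value in element.items():
    hash_object.update(str(value).encode())
  return hash_object.hexdigest()

def new_key(_element):
  # Mirrors _ComputeAndAttachUniqueID: the key is independent of the values.
  return uuid.uuid1().bytes + uuid.uuid4().bytes

assert old_key(row_a) == old_key(row_b)  # duplicates share one key -> one group after CoGBK
assert new_key(row_a) != new_key(row_b)  # each element keeps its own group
```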
 
 
-class GetMissingColumnsPColl(beam.DoFn):
+class _GetMissingColumns(beam.DoFn):
   """
   Returns data containing only the columns that are not present in the schema.
   This is needed since TFT only outputs columns that are transformed by
   any of the data processing transforms.
-
-  Only for internal use. No backwards compatibility guarantees.
   """
   def __init__(self, existing_columns):
     self.existing_columns = existing_columns
 
   def process(self, element):
-    new_dict = {}
-    hash_key, element = element
-    for key, value in element.items():
-      if key not in self.existing_columns:
-        new_dict[key] = value
-    yield (hash_key, new_dict)
+    id, row_dict = element
+    new_dict = {
+        k: v
+        for k, v in row_dict.items() if k not in self.existing_columns
+    }
+    yield (id, new_dict)
 
 
-class MakeHashKeyAsColumn(beam.DoFn):
+class _MakeIdAsColumn(beam.DoFn):
   """
-  Extracts the hash key from the element and adds it as a column.
-
-  Only for internal use. No backwards compatibility guarantees.
+  Extracts the id from the element and adds it as a column instead.
   """
   def process(self, element):
-    hash_key, element = element
-    element['hash_key'] = hash_key
+    id, element = element
+    element[_ID_COLUMN] = id
     yield element
 
 
-class ExtractHashAndKeyPColl(beam.DoFn):
+class _ExtractIdAndKeyPColl(beam.DoFn):
   """
-  Extracts the hash key and return hashkey and element as a tuple.
-
-  Only for internal use. No backwards compatibility guarantees.
+  Extracts the id and returns the id and element as a tuple.
   """
   def process(self, element):
-    hashkey = element['hash_key'][0]
-    del element['hash_key']
-    yield (hashkey.decode('utf-8'), element)
+    id = element[_ID_COLUMN][0]
+    del element[_ID_COLUMN]
+    yield (id, element)
 
 
-class MergeDicts(beam.DoFn):
+class _MergeDicts(beam.DoFn):
   """
-  Merges the dictionaries in the PCollection.
-
-  Only for internal use. No backwards compatibility guarantees.
+  Merges the processed and unprocessed columns from the CoGBK result into a
+  single row.
   """
   def process(self, element):
-    _, element = element
+    unused_row_id, row_dicts_tuple = element
     new_dict = {}
-    for d in element:
+    for d in row_dicts_tuple:
+      # After CoGBK, the dicts with the processed and unprocessed portions of
+      # each row are wrapped in 1-element lists, since all rows have a unique
+      # id. The assertion could fail due to a UUID collision.
+      assert len(d) == 1, f"Expected 1 element, got: {len(d)}."
       new_dict.update(d[0])
     yield new_dict
 
@@ -323,7 +324,7 @@ def _get_raw_data_feature_spec_per_column(
   def get_raw_data_metadata(
       self, input_types: Dict[str, type]) -> dataset_metadata.DatasetMetadata:
     raw_data_feature_spec = self.get_raw_data_feature_spec(input_types)
-    raw_data_feature_spec['hash_key'] = tf.io.VarLenFeature(dtype=tf.string)
+    raw_data_feature_spec[_ID_COLUMN] = tf.io.VarLenFeature(dtype=tf.string)
     return self.convert_raw_data_feature_spec_to_dataset_metadata(
         raw_data_feature_spec)
 
@@ -417,14 +418,14 @@ def process_data(
       # convert Row or NamedTuple to Dict
       raw_data = (
           raw_data
-          | ConvertNamedTupleToDict().with_output_types(
+          | _ConvertNamedTupleToDict().with_output_types(
              Dict[str, typing.Union[tuple(column_type_mapping.values())]]))  # type: ignore
       # AnalyzeAndTransformDataset raise type hint since this is
       # schema'd PCollection and the current output type would be a
       # custom type(NamedTuple) or a beam.Row type.
     else:
       column_type_mapping = self._map_column_names_to_types_from_transforms()
-    # Add hash key so TFT can output hash_key as output but as a no-op.
+    # Add an id column so that TFT can pass it through to the output as a no-op.
     raw_data_metadata = self.get_raw_data_metadata(
         input_types=column_type_mapping)
     # Write untransformed metadata to a file so that it can be re-used
@@ -445,21 +446,21 @@ def process_data(
       raw_data_metadata = metadata_io.read_metadata(
           os.path.join(self.artifact_location, RAW_DATA_METADATA_DIR))
 
-    keyed_raw_data = (raw_data | beam.ParDo(ComputeAndAttachHashKey()))
+    keyed_raw_data = (raw_data | beam.ParDo(_ComputeAndAttachUniqueID()))
 
     feature_set = [feature.name for feature in raw_data_metadata.schema.feature]
-    columns_not_in_schema_with_hash = (
+    keyed_columns_not_in_schema = (
         keyed_raw_data
-        | beam.ParDo(GetMissingColumnsPColl(feature_set)))
+        | beam.ParDo(_GetMissingColumns(feature_set)))
 
     # To maintain consistency by outputting numpy array all the time,
     # whether a scalar value or list or np array is passed as input,
     # we will convert scalar values to list values and TFT will ouput
     # numpy array all the time.
     keyed_raw_data = keyed_raw_data | beam.ParDo(
-        ConvertScalarValuesToListValues())
+        _ConvertScalarValuesToListValues())
 
-    raw_data_list = (keyed_raw_data | beam.ParDo(MakeHashKeyAsColumn()))
+    raw_data_list = (keyed_raw_data | beam.ParDo(_MakeIdAsColumn()))
 
     with tft_beam.Context(temp_dir=self.artifact_location):
       data = (raw_data_list, raw_data_metadata)
@@ -467,7 +468,7 @@ def process_data(
         transform_fn = (
             data
             | "AnalyzeDataset" >> tft_beam.AnalyzeDataset(self.process_data_fn))
-        # TODO: Remove the 'hash_key' column from the transformed
+        # TODO: Remove the 'id' column from the transformed
         # dataset schema generated by TFT.
         self.write_transform_artifacts(transform_fn, self.artifact_location)
       else:
@@ -490,7 +491,7 @@ def process_data(
       # So we will use a RowTypeConstraint to create a schema'd PCollection.
       # this is needed since new columns are included in the
       # transformed_dataset.
-      del self.transformed_schema['hash_key']
+      del self.transformed_schema[_ID_COLUMN]
       row_type = RowTypeConstraint.from_fields(
           list(self.transformed_schema.items()))
 
@@ -498,17 +499,17 @@ def process_data(
       # is not transformed by any of the transforms, then the output will
       # not have that column. So we will join the missing columns from the
      # raw_data to the transformed_dataset.
-      transformed_dataset = (
-          transformed_dataset | beam.ParDo(ExtractHashAndKeyPColl()))
+      keyed_transformed_dataset = (
+          transformed_dataset | beam.ParDo(_ExtractIdAndKeyPColl()))
 
       # The grouping is needed here since tensorflow transform only outputs
       # columns that are transformed by any of the transforms. So we will
       # join the missing columns from the raw_data to the transformed_dataset
-      # using the hash key.
+      # using the id.
       transformed_dataset = (
-          (transformed_dataset, columns_not_in_schema_with_hash)
+          (keyed_transformed_dataset, keyed_columns_not_in_schema)
           | beam.CoGroupByKey()
-          | beam.ParDo(MergeDicts()))
+          | beam.ParDo(_MergeDicts()))
 
       # The schema only contains the columns that are transformed.
       transformed_dataset = (
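The comments in the hunk above describe a keyed join: TFT only emits the columns that were transformed, so the columns it never saw are re-attached per row id via CoGroupByKey. Here is a minimal, self-contained sketch of that pattern; it is not part of the patch, and the ids and column names are made up (the real pipeline keys rows by the generated UUID and merges with the `_MergeDicts` DoFn shown earlier).

```python
# Minimal sketch of the CoGroupByKey join used above; keys and columns are
# made up for illustration.
import apache_beam as beam

transformed = [(b'id-1', {'x': [4]}), (b'id-2', {'x': [1]})]  # columns produced by TFT
missing = [(b'id-1', {'z': 'keep'}), (b'id-2', {'z': 'me too'})]  # columns TFT never saw

def merge(element):
  _, (transformed_dicts, missing_dicts) = element
  # Each side holds exactly one dict per id, mirroring the assertion in _MergeDicts.
  merged = dict(transformed_dicts[0])
  merged.update(missing_dicts[0])
  return merged

with beam.Pipeline() as p:
  _ = ((p | 'Transformed' >> beam.Create(transformed),
        p | 'Missing' >> beam.Create(missing))
       | beam.CoGroupByKey()
       | beam.Map(merge)
       | beam.Map(print))  # {'x': [4], 'z': 'keep'} and {'x': [1], 'z': 'me too'}
```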
diff --git a/sdks/python/apache_beam/ml/transforms/handlers_test.py b/sdks/python/apache_beam/ml/transforms/handlers_test.py
index 327c8c76c0e9..d67d8ec3e705 100644
--- a/sdks/python/apache_beam/ml/transforms/handlers_test.py
+++ b/sdks/python/apache_beam/ml/transforms/handlers_test.py
@@ -569,6 +569,52 @@ def test_consume_mode_with_extra_columns_in_the_input(self):
         equal_to(expected_test_data_z, equals_fn=np.array_equal),
         label='unused column: z')
 
+  def test_handler_with_same_input_elements(self):
+    with beam.Pipeline() as p:
+      data = [
+          {
+              'x': 'I'
+          },
+          {
+              'x': 'love'
+          },
+          {
+              'x': 'Beam'
+          },
+          {
+              'x': 'Beam'
+          },
+          {
+              'x': 'is'
+          },
+          {
+              'x': 'awesome'
+          },
+      ]
+      raw_data = (p | beam.Create(data))
+      process_handler = handlers.TFTProcessHandler(
+          transforms=[tft.ComputeAndApplyVocabulary(columns=['x'])],
+          artifact_location=self.artifact_location,
+      )
+      transformed_data = process_handler.process_data(raw_data)
+
+      expected_data = [
+          beam.Row(x=np.array([4])),
+          beam.Row(x=np.array([1])),
+          beam.Row(x=np.array([0])),
+          beam.Row(x=np.array([0])),
+          beam.Row(x=np.array([2])),
+          beam.Row(x=np.array([3])),
+      ]
+
+      expected_data_x = [row.x for row in expected_data]
+      actual_data_x = transformed_data | beam.Map(lambda x: x.x)
+
+      assert_that(
+          actual_data_x,
+          equal_to(expected_data_x, equals_fn=np.array_equal),
+          label='transformed data')
+
 
 if __name__ == '__main__':
   unittest.main()