Use UUIDs instead of object hashes to avoid collisions (#29542)
* Add uuid

fix test

fix test

Fix test

* Change class name to reflect its functionality

* Add PID to the unique string

* Change the unique id to be bytes

* remove decode

* Replace hash computation with a combined uuid. Resulting key should have the same length.

* Mark internal classes as such.

* misc fixup.

* raise RuntimeError when more than 1 element is observed per key during CoGroupByKey

* Add MLTransform dropping elements to known issues

* Remove internal use comments since it is now evident from naming.

* Remove references to hash

* Remove references to hash

* Remove references to hash

* Remove references to hash

* Edit for clarity

* Clarify helper code.

* yapf

---------

Co-authored-by: Valentyn Tymofieiev <valentyn@google.com>
Co-authored-by: tvalentyn <tvalentyn@users.noreply.github.com>
3 people authored Dec 4, 2023
1 parent 367e4ec commit c9c89fe
Showing 4 changed files with 116 additions and 59 deletions.
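
Before the per-file diffs, a brief illustrative sketch of the failure mode this commit fixes (not code from the commit; `rows`, `hash_key`, and `unique_key` are assumed names): keying each row by a hash of its contents gives identical rows the same key, so a downstream CoGroupByKey-based join emits only one element for them, while a per-element id built from UUIDs keeps duplicates distinct.

import hashlib
import uuid
from collections import defaultdict

rows = [{'x': 'Beam'}, {'x': 'Beam'}, {'x': 'is'}]

def hash_key(row):
  # Simplified version of the old keying: the key depends only on the row's
  # contents, so identical rows collapse onto the same key.
  digest = hashlib.sha256()
  for value in row.values():
    digest.update(str(value).encode())
  return digest.hexdigest()

def unique_key(row):
  # Simplified version of the new keying: a 32-byte id combining UUID1
  # (time/machine based) and UUID4 (random), independent of the row contents.
  return uuid.uuid1().bytes + uuid.uuid4().bytes

for key_fn in (hash_key, unique_key):
  grouped = defaultdict(list)
  for row in rows:
    grouped[key_fn(row)].append(row)
  # hash_key yields 2 groups (the duplicate 'Beam' rows merge);
  # unique_key yields 3 groups (every row keeps its own key).
  print(key_fn.__name__, len(grouped))

In the handler below, the unprocessed columns are rejoined to the TFT output by this key, which is why content-hash keys caused duplicate rows to be merged into one.
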
9 changes: 9 additions & 0 deletions CHANGES.md
@@ -86,6 +86,8 @@
## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed MLTransform dropping duplicated elements in the output PCollection ([#29600](https://github.com/apache/beam/issues/29600)).


## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
@@ -134,6 +136,10 @@ as a workaround, a copy of "old" `CountingSource` class should be placed into a
* Fixed [CVE-2023-39325](https://www.cve.org/CVERecord?id=CVE-2023-39325) (Java/Python/Go) ([#29118](https://github.com/apache/beam/issues/29118)).
* Mitigated [CVE-2023-47248](https://nvd.nist.gov/vuln/detail/CVE-2023-47248) (Python) [#29392](https://github.com/apache/beam/issues/29392).

## Known issues

* MLTransform drops identical elements in the output PCollection. For any duplicate elements, only a single element is emitted downstream. ([#29600](https://github.com/apache/beam/issues/29600)).

# [2.51.0] - 2023-10-03

## New Features / Improvements
@@ -168,6 +174,8 @@ as a workaround, a copy of "old" `CountingSource` class should be placed into a
* Long-running Python pipelines might experience a memory leak: [#28246](https://github.com/apache/beam/issues/28246).
* Python pipelines using BigQuery Storage Read API might need to pin `fastavro`
dependency to 1.8.3 or earlier on some runners that don't use Beam Docker containers: [#28811](https://github.com/apache/beam/issues/28811)
* MLTransform drops identical elements in the output PCollection. For any duplicate elements, only a single element is emitted downstream. ([#29600](https://github.com/apache/beam/issues/29600)).


# [2.50.0] - 2023-08-30

@@ -229,6 +237,7 @@ as a workaround, a copy of "old" `CountingSource` class should be placed into a
* Beam Python containers rely on a version of Debian/aom that has several security vulnerabilities: [CVE-2021-30474](https://nvd.nist.gov/vuln/detail/CVE-2021-30474), [CVE-2021-30475](https://nvd.nist.gov/vuln/detail/CVE-2021-30475), [CVE-2021-30473](https://nvd.nist.gov/vuln/detail/CVE-2021-30473), [CVE-2020-36133](https://nvd.nist.gov/vuln/detail/CVE-2020-36133), [CVE-2020-36131](https://nvd.nist.gov/vuln/detail/CVE-2020-36131), [CVE-2020-36130](https://nvd.nist.gov/vuln/detail/CVE-2020-36130), and [CVE-2020-36135](https://nvd.nist.gov/vuln/detail/CVE-2020-36135)
* Python SDK's cross-language Bigtable sink mishandles records that don't have an explicit timestamp set: [#28632](https://github.com/apache/beam/issues/28632). To avoid this issue, set explicit timestamps for all records before writing to Bigtable.
* Python SDK worker start-up logs, particularly PIP dependency installations, that are not logged at warning or higher are suppressed. This suppression is reverted in 2.51.0.
* MLTransform drops identical elements in the output PCollection. For any duplicate elements, only a single element is emitted downstream. ([#29600](https://github.com/apache/beam/issues/29600)).

# [2.49.0] - 2023-07-17

@@ -57,6 +57,7 @@ def check_mltransform_compute_and_apply_vocabulary_with_scalar():
Row(x=array([4]))
Row(x=array([1]))
Row(x=array([0]))
Row(x=array([0]))
Row(x=array([2]))
Row(x=array([3]))
[END mltransform_compute_and_apply_vocabulary_with_scalar] '''.splitlines(
119 changes: 60 additions & 59 deletions sdks/python/apache_beam/ml/transforms/handlers.py
@@ -17,9 +17,9 @@
# pytype: skip-file

import collections
import hashlib
import os
import typing
import uuid
from typing import Dict
from typing import List
from typing import Optional
@@ -49,6 +49,8 @@
'TFTProcessHandler',
]

_ID_COLUMN = 'tmp_uuid' # Name for a temporary column.

RAW_DATA_METADATA_DIR = 'raw_data_metadata'
SCHEMA_FILE = 'schema.pbtxt'
# tensorflow transform doesn't support the types other than tf.int64,
@@ -80,23 +82,23 @@
tft_process_handler_output_type = typing.Union[beam.Row, Dict[str, np.ndarray]]


class ConvertScalarValuesToListValues(beam.DoFn):
class _ConvertScalarValuesToListValues(beam.DoFn):
def process(
self,
element,
):
hash_key, element = element
id, element = element
new_dict = {}
for key, value in element.items():
if isinstance(value,
tuple(_primitive_types_to_typing_container_type.keys())):
new_dict[key] = [value]
else:
new_dict[key] = value
yield (hash_key, new_dict)
yield (id, new_dict)


class ConvertNamedTupleToDict(
class _ConvertNamedTupleToDict(
beam.PTransform[beam.PCollection[typing.Union[beam.Row, typing.NamedTuple]],
beam.PCollection[Dict[str,
common_types.InstanceDictType]]]):
@@ -121,76 +123,75 @@ def expand(
return pcoll | beam.Map(lambda x: x._asdict())


class ComputeAndAttachHashKey(beam.DoFn):
class _ComputeAndAttachUniqueID(beam.DoFn):
"""
Computes and attaches a hash key to the element.
Only for internal use. No backwards compatibility guarantees.
Computes and attaches a unique id to each element in the PCollection.
"""
def process(self, element):
hash_object = hashlib.sha256()
for _, value in element.items():
# handle the case where value is a list or numpy array
if isinstance(value, (list, np.ndarray)):
hash_object.update(str(list(value)).encode())
else: # assume value is a primitive that can be turned into str
hash_object.update(str(value).encode())
yield (hash_object.hexdigest(), element)
# UUID1 includes machine-specific bits and has a counter. As long as not too
# many are generated at the same time, they should be unique.
# UUID4 generation should be unique in practice as long as underlying random
# number generation is not compromised.
# A combination of both should avoid the anecdotal pitfalls where
# replacing one with the other has helped some users.
# UUID collision will result in data loss, but we can detect that and fail.

# TODO(https://github.com/apache/beam/issues/29593): Evaluate MLTransform
# implementation without CoGBK.
unique_key = uuid.uuid1().bytes + uuid.uuid4().bytes
yield (unique_key, element)


class GetMissingColumnsPColl(beam.DoFn):
class _GetMissingColumns(beam.DoFn):
"""
Returns data containing only the columns that are not
present in the schema. This is needed since TFT only outputs
columns that are transformed by any of the data processing transforms.
Only for internal use. No backwards compatibility guarantees.
"""
def __init__(self, existing_columns):
self.existing_columns = existing_columns

def process(self, element):
new_dict = {}
hash_key, element = element
for key, value in element.items():
if key not in self.existing_columns:
new_dict[key] = value
yield (hash_key, new_dict)
id, row_dict = element
new_dict = {
k: v
for k, v in row_dict.items() if k not in self.existing_columns
}
yield (id, new_dict)


class MakeHashKeyAsColumn(beam.DoFn):
class _MakeIdAsColumn(beam.DoFn):
"""
Extracts the hash key from the element and adds it as a column.
Only for internal use. No backwards compatibility guarantees.
Extracts the id from the element and adds it as a column instead.
"""
def process(self, element):
hash_key, element = element
element['hash_key'] = hash_key
id, element = element
element[_ID_COLUMN] = id
yield element


class ExtractHashAndKeyPColl(beam.DoFn):
class _ExtractIdAndKeyPColl(beam.DoFn):
"""
Extracts the hash key and return hashkey and element as a tuple.
Only for internal use. No backwards compatibility guarantees.
Extracts the id and returns the id and element as a tuple.
"""
def process(self, element):
hashkey = element['hash_key'][0]
del element['hash_key']
yield (hashkey.decode('utf-8'), element)
id = element[_ID_COLUMN][0]
del element[_ID_COLUMN]
yield (id, element)


class MergeDicts(beam.DoFn):
class _MergeDicts(beam.DoFn):
"""
Merges the dictionaries in the PCollection.
Only for internal use. No backwards compatibility guarantees.
Merges processed and unprocessed columns from CoGBK result into a single row.
"""
def process(self, element):
_, element = element
unused_row_id, row_dicts_tuple = element
new_dict = {}
for d in element:
for d in row_dicts_tuple:
# After CoGBK, dicts with processed and unprocessed portions of each row
# are wrapped in 1-element lists, since all rows have a unique id.
# Assertion could fail due to UUID collision.
assert len(d) == 1, f"Expected 1 element, got: {len(d)}."
new_dict.update(d[0])
yield new_dict
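
As an aside, a minimal runnable sketch of the join pattern _MergeDicts participates in (assumed names such as `merge`, `processed`, and `unprocessed`; not code from handlers.py): both the processed and the unprocessed columns are keyed by the same per-row id, CoGroupByKey brings them back together, and the merge expects exactly one dict on each side because ids are unique.

import uuid
import apache_beam as beam

def merge(element):
  _, (processed, unprocessed) = element
  processed, unprocessed = list(processed), list(unprocessed)
  # With unique per-row ids, each side of the CoGroupByKey result holds
  # exactly one dict; a key collision would surface here as extra elements.
  assert len(processed) == 1 and len(unprocessed) == 1
  row = dict(processed[0])
  row.update(unprocessed[0])
  return row

with beam.Pipeline() as p:
  keyed = (
      p
      | beam.Create([{'x': 'Beam', 'y': 1}, {'x': 'Beam', 'y': 2}])
      | beam.Map(lambda row: (uuid.uuid1().bytes + uuid.uuid4().bytes, row)))
  processed = keyed | 'Processed' >> beam.MapTuple(
      lambda k, row: (k, {'x': row['x'].lower()}))
  unprocessed = keyed | 'Unprocessed' >> beam.MapTuple(
      lambda k, row: (k, {'y': row['y']}))
  _ = ((processed, unprocessed)
       | beam.CoGroupByKey()
       | beam.Map(merge)
       | beam.Map(print))
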

@@ -323,7 +324,7 @@ def _get_raw_data_feature_spec_per_column(
def get_raw_data_metadata(
self, input_types: Dict[str, type]) -> dataset_metadata.DatasetMetadata:
raw_data_feature_spec = self.get_raw_data_feature_spec(input_types)
raw_data_feature_spec['hash_key'] = tf.io.VarLenFeature(dtype=tf.string)
raw_data_feature_spec[_ID_COLUMN] = tf.io.VarLenFeature(dtype=tf.string)
return self.convert_raw_data_feature_spec_to_dataset_metadata(
raw_data_feature_spec)

@@ -417,14 +418,14 @@ def process_data(
# convert Row or NamedTuple to Dict
raw_data = (
raw_data
| ConvertNamedTupleToDict().with_output_types(
| _ConvertNamedTupleToDict().with_output_types(
Dict[str, typing.Union[tuple(column_type_mapping.values())]])) # type: ignore
# AnalyzeAndTransformDataset raise type hint since this is
# schema'd PCollection and the current output type would be a
# custom type(NamedTuple) or a beam.Row type.
else:
column_type_mapping = self._map_column_names_to_types_from_transforms()
# Add hash key so TFT can output hash_key as output but as a no-op.
# Add the id column so TFT can pass it through to the output as a no-op.
raw_data_metadata = self.get_raw_data_metadata(
input_types=column_type_mapping)
# Write untransformed metadata to a file so that it can be re-used
@@ -445,29 +446,29 @@ def process_data(
raw_data_metadata = metadata_io.read_metadata(
os.path.join(self.artifact_location, RAW_DATA_METADATA_DIR))

keyed_raw_data = (raw_data | beam.ParDo(ComputeAndAttachHashKey()))
keyed_raw_data = (raw_data | beam.ParDo(_ComputeAndAttachUniqueID()))

feature_set = [feature.name for feature in raw_data_metadata.schema.feature]
columns_not_in_schema_with_hash = (
keyed_columns_not_in_schema = (
keyed_raw_data
| beam.ParDo(GetMissingColumnsPColl(feature_set)))
| beam.ParDo(_GetMissingColumns(feature_set)))

# To maintain consistency by outputting numpy array all the time,
# whether a scalar value or list or np array is passed as input,
# we will convert scalar values to list values and TFT will output
# numpy array all the time.
keyed_raw_data = keyed_raw_data | beam.ParDo(
ConvertScalarValuesToListValues())
_ConvertScalarValuesToListValues())

raw_data_list = (keyed_raw_data | beam.ParDo(MakeHashKeyAsColumn()))
raw_data_list = (keyed_raw_data | beam.ParDo(_MakeIdAsColumn()))

with tft_beam.Context(temp_dir=self.artifact_location):
data = (raw_data_list, raw_data_metadata)
if self.artifact_mode == ArtifactMode.PRODUCE:
transform_fn = (
data
| "AnalyzeDataset" >> tft_beam.AnalyzeDataset(self.process_data_fn))
# TODO: Remove the 'hash_key' column from the transformed
# TODO: Remove the 'id' column from the transformed
# dataset schema generated by TFT.
self.write_transform_artifacts(transform_fn, self.artifact_location)
else:
@@ -490,25 +491,25 @@ def process_data(
# So we will use a RowTypeConstraint to create a schema'd PCollection.
# this is needed since new columns are included in the
# transformed_dataset.
del self.transformed_schema['hash_key']
del self.transformed_schema[_ID_COLUMN]
row_type = RowTypeConstraint.from_fields(
list(self.transformed_schema.items()))

# If a non schema PCollection is passed, and one of the input columns
# is not transformed by any of the transforms, then the output will
# not have that column. So we will join the missing columns from the
# raw_data to the transformed_dataset.
transformed_dataset = (
transformed_dataset | beam.ParDo(ExtractHashAndKeyPColl()))
keyed_transformed_dataset = (
transformed_dataset | beam.ParDo(_ExtractIdAndKeyPColl()))

# The grouping is needed here since tensorflow transform only outputs
# columns that are transformed by any of the transforms. So we will
# join the missing columns from the raw_data to the transformed_dataset
# using the hash key.
# using the id.
transformed_dataset = (
(transformed_dataset, columns_not_in_schema_with_hash)
(keyed_transformed_dataset, keyed_columns_not_in_schema)
| beam.CoGroupByKey()
| beam.ParDo(MergeDicts()))
| beam.ParDo(_MergeDicts()))

# The schema only contains the columns that are transformed.
transformed_dataset = (
46 changes: 46 additions & 0 deletions sdks/python/apache_beam/ml/transforms/handlers_test.py
@@ -569,6 +569,52 @@ def test_consume_mode_with_extra_columns_in_the_input(self):
equal_to(expected_test_data_z, equals_fn=np.array_equal),
label='unused column: z')

def test_handler_with_same_input_elements(self):
with beam.Pipeline() as p:
data = [
{
'x': 'I'
},
{
'x': 'love'
},
{
'x': 'Beam'
},
{
'x': 'Beam'
},
{
'x': 'is'
},
{
'x': 'awesome'
},
]
raw_data = (p | beam.Create(data))
process_handler = handlers.TFTProcessHandler(
transforms=[tft.ComputeAndApplyVocabulary(columns=['x'])],
artifact_location=self.artifact_location,
)
transformed_data = process_handler.process_data(raw_data)

expected_data = [
beam.Row(x=np.array([4])),
beam.Row(x=np.array([1])),
beam.Row(x=np.array([0])),
beam.Row(x=np.array([0])),
beam.Row(x=np.array([2])),
beam.Row(x=np.array([3])),
]

expected_data_x = [row.x for row in expected_data]
actual_data_x = transformed_data | beam.Map(lambda x: x.x)

assert_that(
actual_data_x,
equal_to(expected_data_x, equals_fn=np.array_equal),
label='transformed data')


if __name__ == '__main__':
unittest.main()
