diff --git a/pipeline_dp/pipeline_operations.py b/pipeline_dp/pipeline_operations.py index 3ebf423a..fd49a712 100644 --- a/pipeline_dp/pipeline_operations.py +++ b/pipeline_dp/pipeline_operations.py @@ -1,20 +1,15 @@ """Adapters for working with pipeline frameworks.""" -from enum import Enum from functools import partial -import os import multiprocessing as mp -from pipeline_dp import accumulator import random import numpy as np import abc import apache_beam as beam import apache_beam.transforms.combiners as combiners -import collections import pipeline_dp.accumulator as accumulator import typing -from typing import Any, Optional, Callable, Tuple import collections import itertools @@ -95,20 +90,50 @@ def is_serialization_immediate_on_reduce_by_key(self): return False +class UniqueLabelsGenerator: + """Generate unique labels for each pipeline aggregation.""" + + def __init__(self, suffix): + self._labels = set() + self._suffix = suffix + + def _add_if_unique(self, label): + if label in self._labels: + return False + self._labels.add(label) + return True + + def unique(self, label): + if not label: + label = "UNDEFINED_STAGE_NAME" + suffix_label = label + "_" + self._suffix + if self._add_if_unique(suffix_label): + return suffix_label + for i in itertools.count(1): + label_candidate = label + "_" + str(i) + "_" + self._suffix + if self._add_if_unique(label_candidate): + return label_candidate + + class BeamOperations(PipelineOperations): """Apache Beam adapter.""" + def __init__(self, suffix): + super().__init__() + self._ulb = UniqueLabelsGenerator(suffix) + def map(self, col, fn, stage_name: str): - return col | stage_name >> beam.Map(fn) + return col | self._ulb.unique(stage_name) >> beam.Map(fn) def flat_map(self, col, fn, stage_name: str): - return col | stage_name >> beam.FlatMap(fn) + return col | self._ulb.unique(stage_name) >> beam.FlatMap(fn) def map_tuple(self, col, fn, stage_name: str): - return col | stage_name >> beam.Map(lambda x: fn(*x)) + return col | self._ulb.unique(stage_name) >> beam.Map(lambda x: fn(*x)) def map_values(self, col, fn, stage_name: str): - return col | stage_name >> beam.MapTuple(lambda k, v: (k, fn(v))) + return col | self._ulb.unique(stage_name) >> beam.MapTuple(lambda k, v: + (k, fn(v))) def group_by_key(self, col, stage_name: str): """Group the values for each key in the PCollection into a single sequence. @@ -121,10 +146,10 @@ def group_by_key(self, col, stage_name: str): An PCollection of tuples in which the type of the second item is list. """ - return col | stage_name >> beam.GroupByKey() + return col | self._ulb.unique(stage_name) >> beam.GroupByKey() def filter(self, col, fn, stage_name: str): - return col | stage_name >> beam.Filter(fn) + return col | self._ulb.unique(stage_name) >> beam.Filter(fn) def filter_by_key(self, col, keys_to_keep, stage_name: str): @@ -154,29 +179,32 @@ def does_keep(pk_val): # Keys to keep are in memory. if not isinstance(keys_to_keep, set): keys_to_keep = set(keys_to_keep) - return col | "Filtering out" >> beam.Filter(does_keep) + return col | self._ulb.unique("Filtering out") >> beam.Filter( + does_keep) # `keys_to_keep` are not in memory. Filter out with a join. 
- keys_to_keep = (keys_to_keep | - "Reformat PCollection" >> beam.Map(lambda x: (x, True))) + keys_to_keep = (keys_to_keep | self._ulb.unique("Reformat PCollection") + >> beam.Map(lambda x: (x, True))) return ({ VALUES: col, TO_KEEP: keys_to_keep } | "CoGroup by values and to_keep partition flag " >> - beam.CoGroupByKey() | - "Filtering out" >> beam.ParDo(PartitionsFilterJoin())) + beam.CoGroupByKey() | self._ulb.unique("Partitions Filter Join") + >> beam.ParDo(PartitionsFilterJoin())) def keys(self, col, stage_name: str): - return col | stage_name >> beam.Keys() + return col | self._ulb.unique(stage_name) >> beam.Keys() def values(self, col, stage_name: str): - return col | stage_name >> beam.Values() + return col | self._ulb.unique(stage_name) >> beam.Values() def sample_fixed_per_key(self, col, n: int, stage_name: str): - return col | stage_name >> combiners.Sample.FixedSizePerKey(n) + return col | self._ulb.unique( + stage_name) >> combiners.Sample.FixedSizePerKey(n) def count_per_element(self, col, stage_name: str): - return col | stage_name >> combiners.Count.PerElement() + return col | self._ulb.unique( + stage_name) >> combiners.Count.PerElement() def reduce_accumulators_per_key(self, col, stage_name: str = None): # TODO: Use merge function from the accumulator framework. @@ -189,7 +217,8 @@ def merge_accumulators(accumulators): res = acc return res - return col | stage_name >> beam.CombinePerKey(merge_accumulators) + return col | self._ulb.unique(stage_name) >> beam.CombinePerKey( + merge_accumulators) class SparkRDDOperations(PipelineOperations): diff --git a/pipeline_dp/private_beam.py b/pipeline_dp/private_beam.py index 593b060d..1579c2db 100644 --- a/pipeline_dp/private_beam.py +++ b/pipeline_dp/private_beam.py @@ -83,7 +83,7 @@ def __init__(self, self._sum_params = sum_params def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection: - beam_operations = pipeline_dp.BeamOperations() + beam_operations = pipeline_dp.BeamOperations("Expand") dp_engine = pipeline_dp.DPEngine(self._budget_accountant, beam_operations) diff --git a/tests/dp_engine_test.py b/tests/dp_engine_test.py index 40cb9438..d17c2381 100644 --- a/tests/dp_engine_test.py +++ b/tests/dp_engine_test.py @@ -526,7 +526,7 @@ def test_run_e2e_beam(self): input = p | "Create input" >> beam.Create(list(range(10))) output = self.run_e2e_private_partition_selection_large_budget( - input, pipeline_dp.BeamOperations()) + input, pipeline_dp.BeamOperations("e2e_test")) beam_util.assert_that(output, beam_util.is_not_empty()) diff --git a/tests/pipeline_operations_test.py b/tests/pipeline_operations_test.py index 887c4a6b..60252f67 100644 --- a/tests/pipeline_operations_test.py +++ b/tests/pipeline_operations_test.py @@ -20,7 +20,7 @@ class BeamOperationsTest(parameterized.TestCase): @classmethod def setUpClass(cls): - cls.ops = BeamOperations() + cls.ops = BeamOperations("test_ops") cls.data_extractors = DataExtractors( partition_extractor=lambda x: x[1], privacy_id_extractor=lambda x: x[0], @@ -72,7 +72,8 @@ def test_reduce_accumulators_per_key(self): (8, 1)]) col = self.ops.map_values(col, SumAccumulator, "Wrap into accumulators") - col = self.ops.reduce_accumulators_per_key(col) + col = self.ops.reduce_accumulators_per_key( + col, "Reduce accumulators per key") result = col | "Get accumulated values" >> beam.Map( lambda row: (row[0], row[1].get_metrics())) @@ -80,6 +81,98 @@ def test_reduce_accumulators_per_key(self): beam_util.equal_to([(6, 2), (7, 2), (8, 1)])) +class BeamOperationsStageNameTest(unittest.TestCase): 
+ + def test_beam_create_stage_name_must_be_unique(self): + with test_pipeline.TestPipeline() as p: + p | f"SAME_BEAM_CREATE_NAME" >> beam.Create([(6, 1), (6, 2)]) + with self.assertRaisesRegex( + RuntimeError, + expected_regex="A transform with label " + "\"SAME_BEAM_CREATE_NAME\" already exists " + "in the pipeline."): + p | f"SAME_BEAM_CREATE_NAME" >> beam.Create([(6, 1), (6, 2)]) + + def test_ops_stage_name_must_be_unique(self): + ops_1 = BeamOperations("SAME_OPS_SUFFIX") + ops_2 = BeamOperations("SAME_OPS_SUFFIX") + with test_pipeline.TestPipeline() as p: + col = p | f"UNIQUE_BEAM_CREATE_NAME" >> beam.Create([(6, 1), + (6, 2)]) + ops_1.map(col, SumAccumulator, "SAME_MAP_NAME") + with self.assertRaisesRegex(RuntimeError, + expected_regex="A transform with label " + "\"SAME_MAP_NAME_SAME_OPS_SUFFIX\" " + "already exists in the pipeline"): + ops_2.map(col, SumAccumulator, "SAME_MAP_NAME") + + def test_empty_stage_names(self): + ops = BeamOperations("UNIQUE_OPS_SUFFIX") + with test_pipeline.TestPipeline() as p: + col = p | f"UNIQUE_BEAM_CREATE_NAME" >> beam.Create([(6, 1), + (6, 2)]) + col = ops.map_values(col, SumAccumulator, "Wrap into accumulators") + ops.reduce_accumulators_per_key(col) + ops.reduce_accumulators_per_key(col) + ops.reduce_accumulators_per_key(col) + + self.assertEqual("UNIQUE_OPS_SUFFIX", ops._ulb._suffix) + self.assertEqual(4, len(ops._ulb._labels)) + self.assertIn("UNDEFINED_STAGE_NAME_UNIQUE_OPS_SUFFIX", + ops._ulb._labels) + self.assertIn("UNDEFINED_STAGE_NAME_1_UNIQUE_OPS_SUFFIX", + ops._ulb._labels) + self.assertIn("UNDEFINED_STAGE_NAME_2_UNIQUE_OPS_SUFFIX", + ops._ulb._labels) + + def test_one_suffix_multiple_same_stage_name(self): + ops = BeamOperations("UNIQUE_OPS_SUFFIX") + with test_pipeline.TestPipeline() as p: + col = p | f"UNIQUE_BEAM_CREATE_NAME" >> beam.Create([(6, 1), + (6, 2)]) + ops.map(col, SumAccumulator, "SAME_MAP_NAME") + ops.map(col, SumAccumulator, "SAME_MAP_NAME") + ops.map(col, SumAccumulator, "SAME_MAP_NAME") + + self.assertEqual("UNIQUE_OPS_SUFFIX", ops._ulb._suffix) + self.assertEqual(3, len(ops._ulb._labels)) + self.assertIn("SAME_MAP_NAME_UNIQUE_OPS_SUFFIX", ops._ulb._labels) + self.assertIn("SAME_MAP_NAME_1_UNIQUE_OPS_SUFFIX", ops._ulb._labels) + self.assertIn("SAME_MAP_NAME_2_UNIQUE_OPS_SUFFIX", ops._ulb._labels) + + def test_multiple_suffix_same_ops_stage_name(self): + ops = [] + with test_pipeline.TestPipeline() as p: + for i in range(0, 2): + ops.insert(i, BeamOperations(f"UNIQUE_OPS_SUFFIX_{i}")) + col = p | f"UNIQUE_BEAM_CREATE_NAME_{i}" >> beam.Create([(6, 1), + (6, 2) + ]) + ops[i].map(col, SumAccumulator, "SAME_MAP_NAME") + ops[i].map_values(col, SumAccumulator, "SAME_MAP_VALUES_NAME") + ops[i].flat_map(col, lambda x: x, "SAME_FLAT_MAP_NAME") + ops[i].map_tuple(col, lambda k, v: k + v, + "SAME_MAP_TUPLES_NAME") + ops[i].group_by_key(col, "SAME_GROUP_BY_KEY_NAME") + ops[i].filter(col, lambda x: True, "SAME_FILTER_NAME") + ops[i].filter_by_key(col, [1], "SAME_FILTER_BY_KEY_NAME") + ops[i].keys(col, "SAME_KEYS_NAME") + ops[i].values(col, "SAME_VALUES_NAME") + ops[i].sample_fixed_per_key(col, 1, "SAME_SAMPLE_NAME") + ops[i].count_per_element(col, "SAME_COUNT_NAME") + col_2 = p | f"UNIQUE_BEAM_CREATE_NAME_FOR_REDUCE_{i}" >> beam.Create( + [(6, 1), (6, 2)]) + col_2 = ops[i].map_values(col_2, SumAccumulator, + "SAME_MAP_VALUES_NAME") + ops[i].reduce_accumulators_per_key(col_2, + "SAME_REDUCE_ACC_NAME") + + self.assertEqual("UNIQUE_OPS_SUFFIX_0", ops[0]._ulb._suffix) + self.assertEqual("UNIQUE_OPS_SUFFIX_1", ops[1]._ulb._suffix) + 
+        self.assertEqual(13, len(ops[0]._ulb._labels))
+        self.assertEqual(13, len(ops[1]._ulb._labels))
+
+
 @unittest.skipIf(sys.platform == "win32",
                  "There are some problems with PySpark setup on Windows")
 class SparkRDDOperationsTest(parameterized.TestCase):
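
For reference, a minimal sketch (not part of the patch) of how the new UniqueLabelsGenerator de-duplicates stage names; it assumes the class is importable from pipeline_dp.pipeline_operations as added above:

# Sketch only: label de-duplication for a single suffix-scoped generator.
from pipeline_dp.pipeline_operations import UniqueLabelsGenerator

gen = UniqueLabelsGenerator("my_suffix")
print(gen.unique("Map"))  # "Map_my_suffix"
print(gen.unique("Map"))  # "Map_1_my_suffix" (collision, counter appended)
print(gen.unique("Map"))  # "Map_2_my_suffix"
print(gen.unique(""))     # "UNDEFINED_STAGE_NAME_my_suffix" (empty name placeholder)

Because BeamOperations now routes every stage_name through unique(), repeated stage names within one instance get a numeric counter and distinct instances are separated by their suffix, so Beam no longer raises "A transform with label ... already exists in the pipeline".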