diff --git a/examples/restaurant_visits.ipynb b/examples/restaurant_visits.ipynb index bd27231d..cee4a6e7 100644 --- a/examples/restaurant_visits.ipynb +++ b/examples/restaurant_visits.ipynb @@ -159,7 +159,7 @@ "\n", "# 1. Create PipelineBackend\n", "# PipelineBackend is an object which encapsulates pipeline framework.\n", - "ops = pipeline_dp.LocalBackend()\n", + "backend = pipeline_dp.LocalBackend()\n", "\n", "# 2. Set the total privacy budget.\n", "# BudgetAccountant automatically splits over all DP aggregations.\n", @@ -170,7 +170,7 @@ "\n", "# 3. Create a DPEngine instance.\n", "# DPEngine object performs all DP aggregations.\n", - "dp_engine = pipeline_dp.DPEngine(budget_accountant, ops)\n", + "dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)\n", "\n", "# 4. Specify which DP aggregated metrics to compute.\n", "# AggregateParams constains specifications of metrics that we would like \n", @@ -228,10 +228,10 @@ }, "source": [ "# Framework independent function\n", - "def run_pipeline(data, ops):\n", + "def run_pipeline(data, backend):\n", " budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1, total_delta=1e-7)\n", "\n", - " dp_engine = pipeline_dp.DPEngine(budget_accountant, ops)\n", + " dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)\n", "\n", " params = pipeline_dp.AggregateParams(noise_kind = pipeline_dp.NoiseKind.LAPLACE,\n", " metrics=[pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM],\n", @@ -266,8 +266,8 @@ "cell_type": "code", "source": [ "# Local demo with run_pipeline\n", - "ops = pipeline_dp.LocalBackend()\n", - "dp_result = list(run_pipeline(rows, ops))\n", + "backend = pipeline_dp.LocalBackend()\n", + "dp_result = list(run_pipeline(rows, backend))\n", "print(dp_result)" ], "metadata": { @@ -296,8 +296,8 @@ "\n", "with beam.Pipeline(runner=runner) as pipeline:\n", " beam_data = pipeline | beam.Create(rows)\n", - " ops = pipeline_dp.BeamBackend()\n", - " dp_result = run_pipeline(beam_data, ops) \n", + " backend = pipeline_dp.BeamBackend()\n", + " dp_result = run_pipeline(beam_data, backend) \n", " dp_result | beam.Map(print)\n" ], "execution_count": null, diff --git a/pipeline_dp/combiners.py b/pipeline_dp/combiners.py index 840b6dea..3d2da6bb 100644 --- a/pipeline_dp/combiners.py +++ b/pipeline_dp/combiners.py @@ -161,8 +161,9 @@ def __init__(self, params: CombinerParams): self._params = params def create_accumulator(self, values: Iterable[float]) -> AccumulatorType: - return len(values), np.clip(values, self._params.aggregate_params.min_value, - self._params.aggregate_params.max_value).sum() + return len(values), np.clip( + values, self._params.aggregate_params.min_value, + self._params.aggregate_params.max_value).sum() def merge_accumulators(self, accum1: AccumulatorType, accum2: AccumulatorType): diff --git a/pipeline_dp/dp_computations.py b/pipeline_dp/dp_computations.py index 33e27ab5..1567bc78 100644 --- a/pipeline_dp/dp_computations.py +++ b/pipeline_dp/dp_computations.py @@ -30,14 +30,14 @@ def squares_interval(self): return 0, max(self.min_value**2, self.max_value**2) return self.min_value**2, self.max_value**2 - + def compute_middle(min_value: float, max_value: float): - """"Returns the middle point of the interval [min_value, max_value].""" + """"Returns the middle point of the interval [min_value, max_value].""" # (min_value + max_value) / 2 may cause an overflow or loss of precision if # min_value and max_value are large. 
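    # For example, with float64 inputs min_value = max_value = 1e308,
    # (min_value + max_value) / 2 overflows to inf, whereas
    # min_value + (max_value - min_value) / 2 returns 1e308.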
return min_value + (max_value - min_value) / 2 - + def compute_l1_sensitivity(l0_sensitivity: float, linf_sensitivity: float): """Calculates the L1 sensitivity based on the L0 and Linf sensitivities. diff --git a/pipeline_dp/dp_engine.py b/pipeline_dp/dp_engine.py index 0ed8e9b8..b8dcaf91 100644 --- a/pipeline_dp/dp_engine.py +++ b/pipeline_dp/dp_engine.py @@ -44,9 +44,9 @@ class DPEngine: """Performs DP aggregations.""" def __init__(self, budget_accountant: BudgetAccountant, - ops: PipelineBackend): + backend: PipelineBackend): self._budget_accountant = budget_accountant - self._ops = ops + self._backend = backend self._report_generators = [] def _add_report_stage(self, text): @@ -80,7 +80,7 @@ def _aggregate(self, col, params: AggregateParams, data_extractors) # Extract the columns. - col = self._ops.map( + col = self._backend.map( col, lambda row: (data_extractors.privacy_id_extractor(row), data_extractors.partition_extractor(row), data_extractors.value_extractor(row)), @@ -91,8 +91,8 @@ def _aggregate(self, col, params: AggregateParams, combiner.create_accumulator) # col : ((privacy_id, partition_key), accumulator) - col = self._ops.map_tuple(col, lambda pid_pk, v: (pid_pk[1], v), - "Drop privacy id") + col = self._backend.map_tuple(col, lambda pid_pk, v: (pid_pk[1], v), + "Drop privacy id") # col : (partition_key, accumulator) if params.public_partitions: @@ -101,7 +101,7 @@ def _aggregate(self, col, params: AggregateParams, combiner.create_accumulator) # col : (partition_key, accumulator) - col = self._ops.combine_accumulators_per_key( + col = self._backend.combine_accumulators_per_key( col, combiner, "Reduce accumulators per partition key") # col : (partition_key, accumulator) @@ -115,11 +115,11 @@ def _aggregate(self, col, params: AggregateParams, # col = self._fix_budget_accounting_if_needed(col, accumulator_factory) # Compute DP metrics. - col = self._ops.map_values(col, combiner.compute_metrics, - "Compute DP` metrics") + col = self._backend.map_values(col, combiner.compute_metrics, + "Compute DP` metrics") - col = self._ops.map_values(col, lambda result: result[1], - "Extract results") + col = self._backend.map_values(col, lambda result: result[1], + "Extract results") return col @@ -198,14 +198,14 @@ def select_private_partitions(self, col, max_partitions_contributed = params.max_partitions_contributed # Extract the columns. - col = self._ops.map( + col = self._backend.map( col, lambda row: (data_extractors.privacy_id_extractor(row), data_extractors.partition_extractor(row)), "Extract (privacy_id, partition_key))") # col : (privacy_id, partition_key) # Apply cross-partition contribution bounding - col = self._ops.group_by_key(col, "Group by privacy_id") + col = self._backend.group_by_key(col, "Group by privacy_id") # col : (privacy_id, [partition_key]) @@ -232,38 +232,39 @@ def sample_unique_elements_fn(pid_and_pks): return ((pid, pk) for pk in sampled_elements) - col = self._ops.flat_map(col, sample_unique_elements_fn, - "Sample cross-partition contributions") + col = self._backend.flat_map(col, sample_unique_elements_fn, + "Sample cross-partition contributions") # col : (privacy_id, partition_key) # A compound accumulator without any child accumulators is used to calculate the raw privacy ID count. 
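        # Each surviving (privacy_id, partition_key) pair contributes one such
        # accumulator, so combining them per partition key yields the number of
        # contributing privacy IDs per partition, which is what private
        # partition selection consumes.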
compound_combiner = CompoundCombiner([]) - col = self._ops.map_tuple( + col = self._backend.map_tuple( col, lambda pid, pk: (pk, compound_combiner.create_accumulator([])), "Drop privacy id and add accumulator") # col : (partition_key, accumulator) - col = self._ops.combine_accumulators_per_key( + col = self._backend.combine_accumulators_per_key( col, compound_combiner, "Combine accumulators per partition key") # col : (partition_key, accumulator) col = self._select_private_partitions_internal( col, max_partitions_contributed) - col = self._ops.keys(col, "Drop accumulators, keep only partition keys") + col = self._backend.keys(col, + "Drop accumulators, keep only partition keys") return col def _drop_not_public_partitions(self, col, public_partitions, data_extractors: DataExtractors): """Drops partitions in `col` which are not in `public_partitions`.""" - col = self._ops.map( + col = self._backend.map( col, lambda row: (data_extractors.partition_extractor(row), row), "Extract partition id") - col = self._ops.filter_by_key(col, public_partitions, - "Filtering out non-public partitions") + col = self._backend.filter_by_key( + col, public_partitions, "Filtering out non-public partitions") self._add_report_stage( f"Public partition selection: dropped non public partitions") - return self._ops.map_tuple(col, lambda k, v: v, "Drop key") + return self._backend.map_tuple(col, lambda k, v: v, "Drop key") def _add_empty_public_partitions(self, col, public_partitions, aggregator_fn): @@ -272,11 +273,11 @@ def _add_empty_public_partitions(self, col, public_partitions, self._add_report_stage( "Adding empty partitions to public partitions that are missing in " "data") - empty_accumulators = self._ops.map( + empty_accumulators = self._backend.map( public_partitions, lambda partition_key: (partition_key, aggregator_fn([])), "Build empty accumulators") - return self._ops.flatten( + return self._backend.flatten( col, empty_accumulators, "Join public partitions with partitions from data") @@ -299,28 +300,29 @@ def _bound_contributions(self, col, max_partitions_contributed: int, accumulator). 
""" # per partition-contribution bounding with bounding of each contribution - col = self._ops.map_tuple( + col = self._backend.map_tuple( col, lambda pid, pk, v: ((pid, pk), v), "Rekey to ( (privacy_id, partition_key), value))") - col = self._ops.sample_fixed_per_key( + col = self._backend.sample_fixed_per_key( col, max_contributions_per_partition, "Sample per (privacy_id, partition_key)") self._add_report_stage( f"Per-partition contribution bounding: randomly selected not " f"more than {max_contributions_per_partition} contributions") # ((privacy_id, partition_key), [value]) - col = self._ops.map_values( + col = self._backend.map_values( col, aggregator_fn, "Apply aggregate_fn after per partition bounding") # ((privacy_id, partition_key), accumulator) # Cross partition bounding - col = self._ops.map_tuple( + col = self._backend.map_tuple( col, lambda pid_pk, v: (pid_pk[0], (pid_pk[1], v)), "Rekey to (privacy_id, (partition_key, " "accumulator))") - col = self._ops.sample_fixed_per_key(col, max_partitions_contributed, - "Sample per privacy_id") + col = self._backend.sample_fixed_per_key(col, + max_partitions_contributed, + "Sample per privacy_id") self._add_report_stage( f"Cross-partition contribution bounding: randomly selected not more than " @@ -331,9 +333,8 @@ def unnest_cross_partition_bound_sampled_per_key(pid_pk_v): pid, pk_values = pid_pk_v return (((pid, pk), v) for (pk, v) in pk_values) - return self._ops.flat_map(col, - unnest_cross_partition_bound_sampled_per_key, - "Unnest") + return self._backend.flat_map( + col, unnest_cross_partition_bound_sampled_per_key, "Unnest") def _select_private_partitions_internal(self, col, max_partitions_contributed: int): @@ -371,7 +372,7 @@ def filter_fn( f"Private Partition selection: using {budget.mechanism_type.value} " f"method with (eps= {budget.eps}, delta = {budget.delta})") - return self._ops.filter(col, filter_fn, "Filter private partitions") + return self._backend.filter(col, filter_fn, "Filter private partitions") def _fix_budget_accounting_if_needed(self, col, accumulator_factory): """Adds MechanismSpec to accumulators. @@ -391,11 +392,11 @@ def _fix_budget_accounting_if_needed(self, col, accumulator_factory): Returns: col: collection with elements (key, accumulator). """ - if not self._ops.is_serialization_immediate_on_reduce_by_key(): + if not self._backend.is_serialization_immediate_on_reduce_by_key(): # No need to fix, since accumulators contain correct MechanismSpec. return col mechanism_specs = accumulator_factory.get_mechanism_specs() - return self._ops.map_values( + return self._backend.map_values( col, lambda acc: acc.set_mechanism_specs(mechanism_specs)) def _not_a_proper_number(self, num): diff --git a/pipeline_dp/private_beam.py b/pipeline_dp/private_beam.py index 28a18c64..e897a504 100644 --- a/pipeline_dp/private_beam.py +++ b/pipeline_dp/private_beam.py @@ -78,8 +78,8 @@ def __init__(self, self._sum_params = sum_params def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection: - ops = pipeline_dp.BeamBackend() - dp_engine = pipeline_dp.DPEngine(self._budget_accountant, ops) + backend = pipeline_dp.BeamBackend() + dp_engine = pipeline_dp.DPEngine(self._budget_accountant, backend) params = pipeline_dp.AggregateParams( noise_kind=self._sum_params.noise_kind, @@ -103,7 +103,7 @@ def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection: # aggregate() returns a list of metrics for each partition key. # Here is only one metric - sum. Remove list. 
- dp_result = ops.map_values(dp_result, lambda v: v[0], "Unnest list") + dp_result = backend.map_values(dp_result, lambda v: v[0], "Unnest list") # dp_result : (partition_key, dp_sum) return dp_result @@ -119,8 +119,8 @@ def __init__(self, self._count_params = count_params def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection: - ops = pipeline_dp.BeamBackend() - dp_engine = pipeline_dp.DPEngine(self._budget_accountant, ops) + backend = pipeline_dp.BeamBackend() + dp_engine = pipeline_dp.DPEngine(self._budget_accountant, backend) params = pipeline_dp.AggregateParams( noise_kind=self._count_params.noise_kind, @@ -144,7 +144,7 @@ def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection: # aggregate() returns a list of metrics for each partition key. # Here is only one metric - count. Remove list. - dp_result = ops.map_values(dp_result, lambda v: v[0], "Unnest list") + dp_result = backend.map_values(dp_result, lambda v: v[0], "Unnest list") # dp_result : (partition_key, dp_count) return dp_result @@ -160,8 +160,8 @@ def __init__(self, self._privacy_id_count_params = privacy_id_count_params def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection: - ops = pipeline_dp.BeamBackend() - dp_engine = pipeline_dp.DPEngine(self._budget_accountant, ops) + backend = pipeline_dp.BeamBackend() + dp_engine = pipeline_dp.DPEngine(self._budget_accountant, backend) params = pipeline_dp.AggregateParams( noise_kind=self._privacy_id_count_params.noise_kind, @@ -183,7 +183,7 @@ def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection: # aggregate() returns a list of metrics for each partition key. # Here is only one metric - privacy_id_count. Remove list. - dp_result = ops.map_values(dp_result, lambda v: v[0], "Unnest list") + dp_result = backend.map_values(dp_result, lambda v: v[0], "Unnest list") # dp_result : (partition_key, dp_privacy_id_count) return dp_result diff --git a/pipeline_dp/private_spark.py b/pipeline_dp/private_spark.py index b7b3ffb0..84b882ad 100644 --- a/pipeline_dp/private_spark.py +++ b/pipeline_dp/private_spark.py @@ -52,8 +52,8 @@ def sum(self, sum_params: aggregate_params.SumParams) -> RDD: sum_params: parameters for calculation """ - ops = pipeline_dp.SparkRDDBackend() - dp_engine = pipeline_dp.DPEngine(self._budget_accountant, ops) + backend = pipeline_dp.SparkRDDBackend() + dp_engine = pipeline_dp.DPEngine(self._budget_accountant, backend) params = pipeline_dp.AggregateParams( noise_kind=sum_params.noise_kind, @@ -76,7 +76,7 @@ def sum(self, sum_params: aggregate_params.SumParams) -> RDD: # aggregate() returns a list of metrics for each partition key. # Here is only one metric - sum. Remove list. - dp_result = ops.map_values(dp_result, lambda v: v[0], "Unnest list") + dp_result = backend.map_values(dp_result, lambda v: v[0], "Unnest list") # dp_result : (partition_key, dp_sum) return dp_result @@ -88,8 +88,8 @@ def count(self, count_params: aggregate_params.CountParams) -> RDD: count_params: parameters for calculation """ - ops = pipeline_dp.SparkRDDBackend() - dp_engine = pipeline_dp.DPEngine(self._budget_accountant, ops) + backend = pipeline_dp.SparkRDDBackend() + dp_engine = pipeline_dp.DPEngine(self._budget_accountant, backend) params = pipeline_dp.AggregateParams( noise_kind=count_params.noise_kind, @@ -111,7 +111,7 @@ def count(self, count_params: aggregate_params.CountParams) -> RDD: # aggregate() returns a list of metrics for each partition key. # Here is only one metric - count. Remove list. 
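        # i.e. the single-element list [dp_count] is replaced by dp_count itself.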
- dp_result = ops.map_values(dp_result, lambda v: v[0], "Unnest list") + dp_result = backend.map_values(dp_result, lambda v: v[0], "Unnest list") # dp_result : (partition_key, dp_count) return dp_result @@ -125,8 +125,8 @@ def privacy_id_count( privacy_id_count_params: parameters for calculation """ - ops = pipeline_dp.SparkRDDBackend() - dp_engine = pipeline_dp.DPEngine(self._budget_accountant, ops) + backend = pipeline_dp.SparkRDDBackend() + dp_engine = pipeline_dp.DPEngine(self._budget_accountant, backend) params = pipeline_dp.AggregateParams( noise_kind=privacy_id_count_params.noise_kind, @@ -148,7 +148,7 @@ def privacy_id_count( # aggregate() returns a list of metrics for each partition key. # Here is only one metric - privacy_id_count. Remove list. - dp_result = ops.map_values(dp_result, lambda v: v[0], "Unnest list") + dp_result = backend.map_values(dp_result, lambda v: v[0], "Unnest list") # dp_result : (partition_key, dp_privacy_id_count) return dp_result diff --git a/tests/combiners_test.py b/tests/combiners_test.py index 8d475f31..bcda005f 100644 --- a/tests/combiners_test.py +++ b/tests/combiners_test.py @@ -21,7 +21,7 @@ def _create_mechanism_spec(no_noise): def _create_aggregate_params(max_value: float = 1): return pipeline_dp.AggregateParams( min_value=0, - max_value=high, + max_value=max_value, max_partitions_contributed=1, max_contributions_per_partition=3, noise_kind=pipeline_dp.NoiseKind.GAUSSIAN, @@ -219,7 +219,7 @@ class MeanCombinerTest(parameterized.TestCase): def _create_combiner(self, no_noise): mechanism_spec = _create_mechanism_spec(no_noise) - aggregate_params = _create_aggregate_params(high=4) + aggregate_params = _create_aggregate_params(max_value=4) params = dp_combiners.CombinerParams(mechanism_spec, aggregate_params) return dp_combiners.MeanCombiner(params) diff --git a/tests/dp_engine_test.py b/tests/dp_engine_test.py index cc15c98f..3de06e8f 100644 --- a/tests/dp_engine_test.py +++ b/tests/dp_engine_test.py @@ -251,7 +251,7 @@ def test_check_aggregate_params(self): }, { "desc": - "unspecified low", + "unspecified min_value", "col": [0], "params": pipeline_dp.AggregateParams( @@ -264,7 +264,7 @@ def test_check_aggregate_params(self): }, { "desc": - "unspecified high", + "unspecified max_value", "col": [0], "params": pipeline_dp.AggregateParams( @@ -310,7 +310,7 @@ def test_check_aggregate_params(self): total_delta=1e-10) engine = pipeline_dp.DPEngine( budget_accountant=budget_accountant, - ops=pipeline_dp.LocalPipelineOperations()) + backend=pipeline_dp.LocalPipelineBackend()) engine.aggregate(test_case["col"], test_case["params"], test_case["data_extractor"]) @@ -350,7 +350,7 @@ def test_aggregate_report(self): budget_accountant = NaiveBudgetAccountant(total_epsilon=1, total_delta=1e-10) engine = pipeline_dp.DPEngine(budget_accountant=budget_accountant, - ops=pipeline_dp.LocalBackend()) + backend=pipeline_dp.LocalBackend()) engine.aggregate(col, params1, data_extractor) engine.aggregate(col, params2, data_extractor) engine.select_private_partitions(col, select_partitions_params, @@ -403,7 +403,7 @@ def test_aggregate_computation_graph_verification(self, ] engine = pipeline_dp.DPEngine(budget_accountant=budget_accountant, - ops=pipeline_dp.LocalBackend()) + backend=pipeline_dp.LocalBackend()) col = engine.aggregate(col=col, params=aggregator_params, data_extractors=data_extractor) @@ -462,7 +462,7 @@ def test_aggregate_private_partition_selection_keep_everything(self): value_extractor=lambda x: None) engine = 
pipeline_dp.DPEngine(budget_accountant=budget_accountant, - ops=pipeline_dp.LocalBackend()) + backend=pipeline_dp.LocalBackend()) col = engine.aggregate(col=col, params=aggregator_params, @@ -500,7 +500,7 @@ def test_aggregate_private_partition_selection_drop_many(self): value_extractor=lambda x: None) engine = pipeline_dp.DPEngine(budget_accountant=budget_accountant, - ops=pipeline_dp.LocalBackend()) + backend=pipeline_dp.LocalBackend()) col = engine.aggregate(col=col, params=aggregator_params, @@ -552,7 +552,7 @@ def test_select_private_partitions(self): partition_extractor=lambda x: x[1]) engine = pipeline_dp.DPEngine(budget_accountant=budget_accountant, - ops=pipeline_dp.LocalBackend()) + backend=pipeline_dp.LocalBackend()) col = engine.select_private_partitions(col=col, params=params, @@ -650,7 +650,7 @@ def test_check_select_private_partitions(self): total_delta=1e-10) engine = pipeline_dp.DPEngine( budget_accountant=budget_accountant, - ops=pipeline_dp.LocalPipelineOperations()) + backend=pipeline_dp.LocalPipelineBackend()) engine.select_private_partitions(test_case["col"], test_case["params"], test_case["data_extractor"]) @@ -682,7 +682,7 @@ def test_aggregate_public_partitions_drop_non_public(self): value_extractor=lambda x: x) engine = pipeline_dp.DPEngine(budget_accountant=budget_accountant, - ops=pipeline_dp.LocalBackend()) + backend=pipeline_dp.LocalBackend()) col = engine.aggregate(col=col, params=aggregator_params, @@ -723,7 +723,7 @@ def test_aggregate_public_partitions_add_empty_public_partitions(self): value_extractor=lambda x: 1) engine = pipeline_dp.DPEngine(budget_accountant=budget_accountant, - ops=pipeline_dp.LocalBackend()) + backend=pipeline_dp.LocalBackend()) col = engine.aggregate(col=col, params=aggregator_params, @@ -746,13 +746,13 @@ def test_aggregate_public_partitions_add_empty_public_partitions(self): @staticmethod def create_dp_engine_default(accountant: NaiveBudgetAccountant = None, - ops: PipelineBackend = None): + backend: PipelineBackend = None): if not accountant: accountant = NaiveBudgetAccountant(total_epsilon=1, total_delta=1e-10) - if not ops: - ops = pipeline_dp.LocalBackend() - dp_engine = pipeline_dp.DPEngine(accountant, ops) + if not backend: + backend = pipeline_dp.LocalBackend() + dp_engine = pipeline_dp.DPEngine(accountant, backend) aggregator_params = pipeline_dp.AggregateParams( noise_kind=pipeline_dp.NoiseKind.LAPLACE, metrics=[], @@ -763,7 +763,7 @@ def create_dp_engine_default(accountant: NaiveBudgetAccountant = None, return dp_engine @staticmethod - def run_e2e_private_partition_selection_large_budget(col, ops): + def run_e2e_private_partition_selection_large_budget(col, backend): # Arrange aggregator_params = pipeline_dp.AggregateParams( noise_kind=pipeline_dp.NoiseKind.LAPLACE, @@ -783,7 +783,7 @@ def run_e2e_private_partition_selection_large_budget(col, ops): partition_extractor=lambda x: f"pk{x//2}", value_extractor=lambda x: x) - engine = pipeline_dp.DPEngine(budget_accountant, ops) + engine = pipeline_dp.DPEngine(budget_accountant, backend) col = engine.aggregate(col=col, params=aggregator_params, diff --git a/tests/pipeline_backend_test.py b/tests/pipeline_backend_test.py index 3dc5ec0d..6672c519 100644 --- a/tests/pipeline_backend_test.py +++ b/tests/pipeline_backend_test.py @@ -22,7 +22,7 @@ class BeamBackendTest(parameterized.TestCase): @classmethod def setUpClass(cls): - cls.ops = BeamBackend() + cls.backend = BeamBackend() cls.data_extractors = DataExtractors( partition_extractor=lambda x: x[1], privacy_id_extractor=lambda 
x: x[0], @@ -34,8 +34,8 @@ def test_filter_by_key_must_not_be_none(self): col = p | "Create PCollection" >> beam.Create(data) key_to_keep = None with self.assertRaises(TypeError): - result = self.ops.filter_by_key(col, key_to_keep, - "filte_by_key") + result = self.backend.filter_by_key(col, key_to_keep, + "filter_by_key") @parameterized.parameters( {'in_memory': True}, @@ -49,7 +49,8 @@ def test_filter_by_key_remove(self, in_memory): expected_result = [(7, 1), (9, 10)] if not in_memory: keys_to_keep = p | "To PCollection" >> beam.Create(keys_to_keep) - result = self.ops.filter_by_key(col, keys_to_keep, "filte_by_key") + result = self.backend.filter_by_key(col, keys_to_keep, + "filter_by_key") beam_util.assert_that(result, beam_util.equal_to(expected_result)) @parameterized.parameters( @@ -64,7 +65,8 @@ def test_filter_by_key_empty_keys_to_keep(self, in_memory): keys_to_keep = [] if not in_memory: keys_to_keep = p | "To PCollection" >> beam.Create(keys_to_keep) - result = self.ops.filter_by_key(col, keys_to_keep, "filter_by_key") + result = self.backend.filter_by_key(col, keys_to_keep, + "filter_by_key") beam_util.assert_that(result, beam_util.equal_to([])) def test_reduce_accumulators_per_key(self): @@ -72,9 +74,9 @@ def test_reduce_accumulators_per_key(self): col = p | "Create PCollection" >> beam.Create([(6, 1), (7, 1), (6, 1), (7, 1), (8, 1)]) - col = self.ops.map_values(col, SumAccumulator, - "Wrap into accumulators") - col = self.ops.reduce_accumulators_per_key( + col = self.backend.map_values(col, SumAccumulator, + "Wrap into accumulators") + col = self.backend.reduce_accumulators_per_key( col, "Reduce accumulators per key") result = col | "Get accumulated values" >> beam.Map( lambda row: (row[0], row[1].get_metrics())) @@ -88,13 +90,13 @@ def test_combine_accumulators_per_key(self): (6, 1), (7, 1), (8, 1)]) sum_combiner = SumCombiner() - col = self.ops.group_by_key(col, "group_by_key") - col = self.ops.map_values(col, sum_combiner.create_accumulator, - "Wrap into accumulators") - col = self.ops.combine_accumulators_per_key( + col = self.backend.group_by_key(col, "group_by_key") + col = self.backend.map_values(col, sum_combiner.create_accumulator, + "Wrap into accumulators") + col = self.backend.combine_accumulators_per_key( col, sum_combiner, "Reduce accumulators per key") - result = self.ops.map_values(col, sum_combiner.compute_metrics, - "Compute metrics") + result = self.backend.map_values(col, sum_combiner.compute_metrics, + "Compute metrics") beam_util.assert_that(result, beam_util.equal_to([(6, 2), (7, 2), (8, 1)])) @@ -116,109 +118,112 @@ def _create_mock_pcollection(): @staticmethod def _test_helper(): mock_pcollection = BeamBackendStageNameTest._create_mock_pcollection() - ops = BeamBackend() - ops._ulg = BeamBackendStageNameTest.MockUniqueLabelGenerators() - return mock_pcollection, ops + backend = BeamBackend() + backend._ulg = BeamBackendStageNameTest.MockUniqueLabelGenerators() + return mock_pcollection, backend @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__") def test_map(self, mock_rrshift): - mock_pcollection, ops = self._test_helper() - ops.map(mock_pcollection, lambda x: x, "stage_name") + mock_pcollection, backend = self._test_helper() + backend.map(mock_pcollection, lambda x: x, "stage_name") mock_rrshift.assert_called_once_with("unique_label") @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__") def test_map_values(self, mock_rrshift): - mock_pcollection, ops = self._test_helper() - ops.map_values(mock_pcollection, lambda x: x, 
"stage_name") + mock_pcollection, backend = self._test_helper() + backend.map_values(mock_pcollection, lambda x: x, "stage_name") mock_rrshift.assert_called_once_with("unique_label") @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__") def test_flat_map(self, mock_rrshift): - mock_pcollection, ops = self._test_helper() - ops.flat_map(mock_pcollection, lambda x: x, "stage_name") + mock_pcollection, backend = self._test_helper() + backend.flat_map(mock_pcollection, lambda x: x, "stage_name") mock_rrshift.assert_called_once_with("unique_label") @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__") def test_map_tuple(self, mock_rrshift): - mock_pcollection, ops = self._test_helper() - ops.map_tuple(mock_pcollection, lambda x: x, "stage_name") + mock_pcollection, backend = self._test_helper() + backend.map_tuple(mock_pcollection, lambda x: x, "stage_name") mock_rrshift.assert_called_once_with("unique_label") @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__") def test_group_by_key(self, mock_rrshift): - mock_pcollection, ops = self._test_helper() - ops.group_by_key(mock_pcollection, "stage_name") + mock_pcollection, backend = self._test_helper() + backend.group_by_key(mock_pcollection, "stage_name") mock_rrshift.assert_called_once_with("unique_label") @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__") def test_filter(self, mock_rrshift): - mock_pcollection, ops = self._test_helper() - ops.filter(mock_pcollection, lambda x: True, "stage_name") + mock_pcollection, backend = self._test_helper() + backend.filter(mock_pcollection, lambda x: True, "stage_name") mock_rrshift.assert_called_once_with("unique_label") @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__") def test_filter_by_key(self, mock_rrshift): - mock_pcollection, ops = self._test_helper() - ops.filter_by_key(mock_pcollection, [1], "stage_name") + mock_pcollection, backend = self._test_helper() + backend.filter_by_key(mock_pcollection, [1], "stage_name") mock_rrshift.assert_called_once_with("unique_label") @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__") def test_keys(self, mock_rrshift): - mock_pcollection, ops = self._test_helper() - ops.keys(mock_pcollection, "stage_name") + mock_pcollection, backend = self._test_helper() + backend.keys(mock_pcollection, "stage_name") mock_rrshift.assert_called_once_with("unique_label") @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__") def test_values(self, mock_rrshift): - mock_pcollection, ops = self._test_helper() - ops.values(mock_pcollection, "stage_name") + mock_pcollection, backend = self._test_helper() + backend.values(mock_pcollection, "stage_name") mock_rrshift.assert_called_once_with("unique_label") @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__") def test_sample_fixed_per_key(self, mock_rrshift): - mock_pcollection, ops = self._test_helper() - ops.sample_fixed_per_key(mock_pcollection, 1, "stage_name") + mock_pcollection, backend = self._test_helper() + backend.sample_fixed_per_key(mock_pcollection, 1, "stage_name") mock_rrshift.assert_called_once_with("unique_label") @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__") def test_count_per_element(self, mock_rrshift): - mock_pcollection, ops = self._test_helper() - ops.count_per_element(mock_pcollection, "stage_name") + mock_pcollection, backend = self._test_helper() + backend.count_per_element(mock_pcollection, "stage_name") mock_rrshift.assert_called_once_with("unique_label") 
@patch("apache_beam.transforms.ptransform.PTransform.__rrshift__") def test_reduce_accumulators_per_key(self, mock_rrshift): - mock_pcollection, ops = self._test_helper() - ops.reduce_accumulators_per_key(mock_pcollection, "stage_name") + mock_pcollection, backend = self._test_helper() + backend.reduce_accumulators_per_key(mock_pcollection, "stage_name") mock_rrshift.assert_called_once_with("unique_label") - def test_ops_stage_name_must_be_unique(self): - ops_1 = BeamBackend("SAME_OPS_SUFFIX") - ops_2 = BeamBackend("SAME_OPS_SUFFIX") + def test_backend_stage_name_must_be_unique(self): + backend_1 = BeamBackend("SAME_backend_SUFFIX") + backend_2 = BeamBackend("SAME_backend_SUFFIX") with test_pipeline.TestPipeline() as p: col = p | f"UNIQUE_BEAM_CREATE_NAME" >> beam.Create([(6, 1), (6, 2)]) - ops_1.map(col, lambda x: x, "SAME_MAP_NAME") + backend_1.map(col, lambda x: x, "SAME_MAP_NAME") with self.assertRaisesRegex(RuntimeError, expected_regex="A transform with label " - "\"SAME_MAP_NAME_SAME_OPS_SUFFIX\" " + "\"SAME_MAP_NAME_SAME_backend_SUFFIX\" " "already exists in the pipeline"): - ops_2.map(col, lambda x: x, "SAME_MAP_NAME") + backend_2.map(col, lambda x: x, "SAME_MAP_NAME") def test_one_suffix_multiple_same_stage_name(self): - ops = BeamBackend("UNIQUE_OPS_SUFFIX") + backend = BeamBackend("UNIQUE_BACKEND_SUFFIX") with test_pipeline.TestPipeline() as p: col = p | f"UNIQUE_BEAM_CREATE_NAME" >> beam.Create([(6, 1), (6, 2)]) - ops.map(col, lambda x: x, "SAME_MAP_NAME") - ops.map(col, lambda x: x, "SAME_MAP_NAME") - ops.map(col, lambda x: x, "SAME_MAP_NAME") + backend.map(col, lambda x: x, "SAME_MAP_NAME") + backend.map(col, lambda x: x, "SAME_MAP_NAME") + backend.map(col, lambda x: x, "SAME_MAP_NAME") - self.assertEqual("UNIQUE_OPS_SUFFIX", ops._ulg._suffix) - self.assertEqual(3, len(ops._ulg._labels)) - self.assertIn("SAME_MAP_NAME_UNIQUE_OPS_SUFFIX", ops._ulg._labels) - self.assertIn("SAME_MAP_NAME_1_UNIQUE_OPS_SUFFIX", ops._ulg._labels) - self.assertIn("SAME_MAP_NAME_2_UNIQUE_OPS_SUFFIX", ops._ulg._labels) + self.assertEqual("UNIQUE_BACKEND_SUFFIX", backend._ulg._suffix) + self.assertEqual(3, len(backend._ulg._labels)) + self.assertIn("SAME_MAP_NAME_UNIQUE_BACKEND_SUFFIX", + backend._ulg._labels) + self.assertIn("SAME_MAP_NAME_1_UNIQUE_BACKEND_SUFFIX", + backend._ulg._labels) + self.assertIn("SAME_MAP_NAME_2_UNIQUE_BACKEND_SUFFIX", + backend._ulg._labels) @unittest.skipIf(sys.platform == "win32" or sys.platform == 'darwin' or ( @@ -236,14 +241,14 @@ def setUpClass(cls): partition_extractor=lambda x: x[1], privacy_id_extractor=lambda x: x[0], value_extractor=lambda x: x[2]) - cls.ops = SparkRDDBackend() + cls.backend = SparkRDDBackend() def test_filter_by_key_none_keys_to_keep(self): data = [(1, 11), (2, 22)] dist_data = self.sc.parallelize(data) key_to_keep = None with self.assertRaises(TypeError): - self.ops.filter_by_key(dist_data, key_to_keep) + self.backend.filter_by_key(dist_data, key_to_keep) @parameterized.parameters({'distributed': False}, {'distributed': True}) def test_filter_by_key_empty_keys_to_keep(self, distributed): @@ -252,7 +257,7 @@ def test_filter_by_key_empty_keys_to_keep(self, distributed): keys_to_keep = [] if distributed: keys_to_keep = self.sc.parallelize(keys_to_keep) - result = self.ops.filter_by_key(dist_data, keys_to_keep).collect() + result = self.backend.filter_by_key(dist_data, keys_to_keep).collect() self.assertListEqual(result, []) @parameterized.parameters({'distributed': False}, {'distributed': True}) @@ -262,13 +267,13 @@ def 
test_filter_by_key_nonempty_keys_to_keep(self, distributed): keys_to_keep = [1, 3, 3] if distributed: keys_to_keep = self.sc.parallelize(keys_to_keep) - result = self.ops.filter_by_key(dist_data, keys_to_keep).collect() + result = self.backend.filter_by_key(dist_data, keys_to_keep).collect() self.assertListEqual(result, [(1, 11)]) def test_sample_fixed_per_key(self): data = [(1, 11), (2, 22), (3, 33), (1, 14), (2, 25), (1, 16)] dist_data = self.sc.parallelize(data) - rdd = self.ops.sample_fixed_per_key(dist_data, 2) + rdd = self.backend.sample_fixed_per_key(dist_data, 2) result = dict(rdd.collect()) self.assertEqual(len(result[1]), 2) self.assertTrue(set(result[1]).issubset({11, 14, 16})) @@ -278,7 +283,7 @@ def test_sample_fixed_per_key(self): def test_count_per_element(self): data = ['a', 'b', 'a'] dist_data = self.sc.parallelize(data) - rdd = self.ops.count_per_element(dist_data) + rdd = self.backend.count_per_element(dist_data) result = rdd.collect() result = dict(result) self.assertDictEqual(result, {'a': 2, 'b': 1}) @@ -286,9 +291,9 @@ def test_count_per_element(self): def test_reduce_accumulators_per_key(self): data = [(1, 11), (2, 22), (3, 33), (1, 14), (2, 25), (1, 16)] dist_data = self.sc.parallelize(data) - rdd = self.ops.map_values(dist_data, SumAccumulator, - "Wrap into accumulators") - result = self.ops\ + rdd = self.backend.map_values(dist_data, SumAccumulator, + "Wrap into accumulators") + result = self.backend\ .reduce_accumulators_per_key(rdd, "Reduce accumulator per key")\ .map(lambda row: (row[0], row[1].get_metrics()))\ .collect() @@ -297,37 +302,43 @@ def test_reduce_accumulators_per_key(self): def test_combine_accumulators_per_key(self): data = self.sc.parallelize([(1, 2), (2, 1), (1, 4), (3, 8), (2, 3)]) - rdd = self.ops.group_by_key(data) + rdd = self.backend.group_by_key(data) sum_combiner = SumCombiner() - rdd = self.ops.map_values(rdd, sum_combiner.create_accumulator) - rdd = self.ops.combine_accumulators_per_key(rdd, sum_combiner) - rdd = self.ops.map_values(rdd, sum_combiner.compute_metrics) + rdd = self.backend.map_values(rdd, sum_combiner.create_accumulator) + rdd = self.backend.combine_accumulators_per_key(rdd, sum_combiner) + rdd = self.backend.map_values(rdd, sum_combiner.compute_metrics) result = dict(rdd.collect()) self.assertDictEqual(result, {1: 6, 2: 4, 3: 8}) def test_map_tuple(self): data = [(1, 2), (3, 4)] dist_data = self.sc.parallelize(data) - result = self.ops.map_tuple(dist_data, lambda a, b: a + b).collect() + result = self.backend.map_tuple(dist_data, lambda a, b: a + b).collect() self.assertEqual(result, [3, 7]) def test_flat_map(self): data = [[1, 2, 3, 4], [5, 6, 7, 8]] dist_data = self.sc.parallelize(data) self.assertEqual( - self.ops.flat_map(dist_data, lambda x: x).collect(), + self.backend.flat_map(dist_data, lambda x: x).collect(), [1, 2, 3, 4, 5, 6, 7, 8]) data = [("a", [1, 2, 3, 4]), ("b", [5, 6, 7, 8])] dist_data = self.sc.parallelize(data) self.assertEqual( - self.ops.flat_map(dist_data, lambda x: x[1]).collect(), + self.backend.flat_map(dist_data, lambda x: x[1]).collect(), [1, 2, 3, 4, 5, 6, 7, 8]) self.assertEqual( - self.ops.flat_map(dist_data, - lambda x: [(x[0], y) for y in x[1]]).collect(), - [("a", 1), ("a", 2), ("a", 3), ("a", 4), ("b", 5), ("b", 6), - ("b", 7), ("b", 8)]) + self.backend.flat_map( + dist_data, + lambda x: [(x[0], y) for y in x[1]]).collect(), [("a", 1), + ("a", 2), + ("a", 3), + ("a", 4), + ("b", 5), + ("b", 6), + ("b", 7), + ("b", 8)]) @classmethod def tearDownClass(cls): @@ -338,88 +349,94 @@ class 
LocalBackendTest(unittest.TestCase): @classmethod def setUpClass(cls): - cls.ops = LocalBackend() + cls.backend = LocalBackend() cls.data_extractors = DataExtractors( partition_extractor=lambda x: x[1], privacy_id_extractor=lambda x: x[0], value_extractor=lambda x: x[2]) def test_local_map(self): - self.assertEqual(list(self.ops.map([], lambda x: x / 0)), []) + self.assertEqual(list(self.backend.map([], lambda x: x / 0)), []) - self.assertEqual(list(self.ops.map([1, 2, 3], str)), ["1", "2", "3"]) - self.assertEqual(list(self.ops.map(range(5), lambda x: x**2)), + self.assertEqual(list(self.backend.map([1, 2, 3], str)), + ["1", "2", "3"]) + self.assertEqual(list(self.backend.map(range(5), lambda x: x**2)), [0, 1, 4, 9, 16]) def test_local_map_tuple(self): tuple_list = [(1, 2), (2, 3), (3, 4)] self.assertEqual( - list(self.ops.map_tuple(tuple_list, lambda k, v: k + v)), [3, 5, 7]) + list(self.backend.map_tuple(tuple_list, lambda k, v: k + v)), + [3, 5, 7]) self.assertEqual( - list(self.ops.map_tuple(tuple_list, lambda k, v: (str(k), str(v)))), - [("1", "2"), ("2", "3"), ("3", "4")]) + list( + self.backend.map_tuple(tuple_list, lambda k, v: + (str(k), str(v)))), [("1", "2"), + ("2", "3"), + ("3", "4")]) def test_local_map_values(self): - self.assertEqual(list(self.ops.map_values([], lambda x: x / 0)), []) + self.assertEqual(list(self.backend.map_values([], lambda x: x / 0)), []) tuple_list = [(1, 2), (2, 3), (3, 4)] - self.assertEqual(list(self.ops.map_values(tuple_list, str)), [(1, "2"), - (2, "3"), - (3, "4")]) - self.assertEqual(list(self.ops.map_values(tuple_list, lambda x: x**2)), - [(1, 4), (2, 9), (3, 16)]) + self.assertEqual(list(self.backend.map_values(tuple_list, str)), + [(1, "2"), (2, "3"), (3, "4")]) + self.assertEqual( + list(self.backend.map_values(tuple_list, lambda x: x**2)), + [(1, 4), (2, 9), (3, 16)]) def test_local_group_by_key(self): some_dict = [("cheese", "brie"), ("bread", "sourdough"), ("cheese", "swiss")] - self.assertEqual(list(self.ops.group_by_key(some_dict)), + self.assertEqual(list(self.backend.group_by_key(some_dict)), [("cheese", ["brie", "swiss"]), ("bread", ["sourdough"])]) def test_local_filter(self): - self.assertEqual(list(self.ops.filter([], lambda x: True)), []) - self.assertEqual(list(self.ops.filter([], lambda x: False)), []) + self.assertEqual(list(self.backend.filter([], lambda x: True)), []) + self.assertEqual(list(self.backend.filter([], lambda x: False)), []) example_list = [1, 2, 2, 3, 3, 4, 2] - self.assertEqual(list(self.ops.filter(example_list, lambda x: x % 2)), - [1, 3, 3]) - self.assertEqual(list(self.ops.filter(example_list, lambda x: x < 3)), - [1, 2, 2, 2]) + self.assertEqual( + list(self.backend.filter(example_list, lambda x: x % 2)), [1, 3, 3]) + self.assertEqual( + list(self.backend.filter(example_list, lambda x: x < 3)), + [1, 2, 2, 2]) def test_local_filter_by_key_empty_keys_to_keep(self): col = [(7, 1), (2, 1), (3, 9), (4, 1), (9, 10)] keys_to_keep = [] - result = self.ops.filter_by_key(col, keys_to_keep, "filte_by_key") + result = self.backend.filter_by_key(col, keys_to_keep, "filter_by_key") self.assertEqual(result, []) def test_local_filter_by_key_remove(self): col = [(7, 1), (2, 1), (3, 9), (4, 1), (9, 10)] keys_to_keep = [7, 9] - result = self.ops.filter_by_key(col, keys_to_keep, "filte_by_key") + result = self.backend.filter_by_key(col, keys_to_keep, "filter_by_key") self.assertEqual(result, [(7, 1), (9, 10)]) def test_local_keys(self): - self.assertEqual(list(self.ops.keys([])), []) + 
self.assertEqual(list(self.backend.keys([])), []) example_list = [(1, 2), (2, 3), (3, 4), (4, 8)] - self.assertEqual(list(self.ops.keys(example_list)), [1, 2, 3, 4]) + self.assertEqual(list(self.backend.keys(example_list)), [1, 2, 3, 4]) def test_local_values(self): - self.assertEqual(list(self.ops.values([])), []) + self.assertEqual(list(self.backend.values([])), []) example_list = [(1, 2), (2, 3), (3, 4), (4, 8)] - self.assertEqual(list(self.ops.values(example_list)), [2, 3, 4, 8]) + self.assertEqual(list(self.backend.values(example_list)), [2, 3, 4, 8]) def test_local_count_per_element(self): example_list = [1, 2, 3, 4, 5, 6, 1, 4, 0, 1] - result = self.ops.count_per_element(example_list) + result = self.backend.count_per_element(example_list) self.assertEqual(dict(result), { 1: 3, @@ -433,18 +450,18 @@ def test_local_count_per_element(self): def test_local_reduce_accumulators_per_key(self): example_list = [(1, 2), (2, 1), (1, 4), (3, 8), (2, 3)] - col = self.ops.map_values(example_list, SumAccumulator) - col = self.ops.reduce_accumulators_per_key(col) + col = self.backend.map_values(example_list, SumAccumulator) + col = self.backend.reduce_accumulators_per_key(col) result = list(map(lambda row: (row[0], row[1].get_metrics()), col)) self.assertEqual(result, [(1, 6), (2, 4), (3, 8)]) def test_local_combine_accumulators_per_key(self): data = [(1, 2), (2, 1), (1, 4), (3, 8), (2, 3)] - col = self.ops.group_by_key(data) + col = self.backend.group_by_key(data) sum_combiner = SumCombiner() - col = self.ops.map_values(col, sum_combiner.create_accumulator) - col = self.ops.combine_accumulators_per_key(col, sum_combiner) - col = self.ops.map_values(col, sum_combiner.compute_metrics) + col = self.backend.map_values(col, sum_combiner.create_accumulator) + col = self.backend.combine_accumulators_per_key(col, sum_combiner) + col = self.backend.map_values(col, sum_combiner.compute_metrics) result = list(col) self.assertEqual(result, [(1, 6), (2, 4), (3, 8)]) @@ -465,15 +482,15 @@ def assert_laziness(operator, *args): # lazy operators accept exceptions_generator_function() # as argument without raising errors: - assert_laziness(self.ops.map, str) - assert_laziness(self.ops.map_values, str) - assert_laziness(self.ops.filter, bool) - assert_laziness(self.ops.values) - assert_laziness(self.ops.keys) - assert_laziness(self.ops.count_per_element) - assert_laziness(self.ops.flat_map, str) - assert_laziness(self.ops.sample_fixed_per_key, int) - assert_laziness(self.ops.reduce_accumulators_per_key) + assert_laziness(self.backend.map, str) + assert_laziness(self.backend.map_values, str) + assert_laziness(self.backend.filter, bool) + assert_laziness(self.backend.values) + assert_laziness(self.backend.keys) + assert_laziness(self.backend.count_per_element) + assert_laziness(self.backend.flat_map, str) + assert_laziness(self.backend.sample_fixed_per_key, int) + assert_laziness(self.backend.reduce_accumulators_per_key) def test_local_sample_fixed_per_key_requires_no_discarding(self): input_col = [("pid1", ('pk1', 1)), ("pid1", ('pk2', 1)), @@ -481,7 +498,7 @@ def test_local_sample_fixed_per_key_requires_no_discarding(self): n = 3 sample_fixed_per_key_result = list( - self.ops.sample_fixed_per_key(input_col, n)) + self.backend.sample_fixed_per_key(input_col, n)) expected_result = [("pid1", [('pk1', 1), ('pk2', 1), ('pk3', 1)]), ("pid2", [('pk4', 1)])] @@ -495,7 +512,7 @@ def test_local_sample_fixed_per_key_with_sampling(self): n = 3 sample_fixed_per_key_result = list( - self.ops.sample_fixed_per_key(input_col, n)) + 
self.backend.sample_fixed_per_key(input_col, n)) self.assertTrue( all( @@ -504,16 +521,16 @@ def test_local_sample_fixed_per_key_with_sampling(self): def test_local_flat_map(self): input_col = [[1, 2, 3, 4], [5, 6, 7, 8]] - self.assertEqual(list(self.ops.flat_map(input_col, lambda x: x)), + self.assertEqual(list(self.backend.flat_map(input_col, lambda x: x)), [1, 2, 3, 4, 5, 6, 7, 8]) input_col = [("a", [1, 2, 3, 4]), ("b", [5, 6, 7, 8])] - self.assertEqual(list(self.ops.flat_map(input_col, lambda x: x[1])), + self.assertEqual(list(self.backend.flat_map(input_col, lambda x: x[1])), [1, 2, 3, 4, 5, 6, 7, 8]) self.assertEqual( list( - self.ops.flat_map(input_col, - lambda x: [(x[0], y) for y in x[1]])), + self.backend.flat_map(input_col, + lambda x: [(x[0], y) for y in x[1]])), [("a", 1), ("a", 2), ("a", 3), ("a", 4), ("b", 5), ("b", 6), ("b", 7), ("b", 8)]) @@ -521,7 +538,7 @@ def test_local_group_by_key(self): some_dict = [("cheese", "brie"), ("bread", "sourdough"), ("cheese", "swiss")] - self.assertEqual(list(self.ops.group_by_key(some_dict)), + self.assertEqual(list(self.backend.group_by_key(some_dict)), [("cheese", ["brie", "swiss"]), ("bread", ["sourdough"])]) @@ -544,7 +561,7 @@ def value_extract(x): @classmethod def setUpClass(cls): - cls.ops = MultiProcLocalBackend(n_jobs=1) + cls.backend = MultiProcLocalBackend(n_jobs=1) cls.data_extractors = DataExtractors( partition_extractor=cls.partition_extract, privacy_id_extractor=cls.privacy_id_extract, @@ -591,94 +608,101 @@ def assertDatasetsEqual(self, first, second, toplevel=True): @pytest.mark.timeout(10) def test_multiproc_map(self): - self.assertDatasetsEqual(list(self.ops.map([], lambda x: x / 0)), []) - self.assertDatasetsEqual(list(self.ops.map([1, 2, 3], str)), + self.assertDatasetsEqual(list(self.backend.map([], lambda x: x / 0)), + []) + self.assertDatasetsEqual(list(self.backend.map([1, 2, 3], str)), ["1", "2", "3"]) - self.assertDatasetsEqual(list(self.ops.map(range(5), lambda x: x**2)), - [0, 1, 4, 9, 16]) + self.assertDatasetsEqual( + list(self.backend.map(range(5), lambda x: x**2)), [0, 1, 4, 9, 16]) @pytest.mark.timeout(10) def test_multiproc_map_tuple(self): tuple_list = [(1, 2), (2, 3), (3, 4)] self.assertDatasetsEqual( - list(self.ops.map_tuple(tuple_list, lambda k, v: k + v)), [3, 5, 7]) + list(self.backend.map_tuple(tuple_list, lambda k, v: k + v)), + [3, 5, 7]) self.assertDatasetsEqual( - list(self.ops.map_tuple(tuple_list, lambda k, v: (str(k), str(v)))), - [("1", "2"), ("2", "3"), ("3", "4")]) + list( + self.backend.map_tuple(tuple_list, lambda k, v: + (str(k), str(v)))), [("1", "2"), + ("2", "3"), + ("3", "4")]) @pytest.mark.timeout(10) def test_multiproc_map_values(self): - self.assertDatasetsEqual(list(self.ops.map_values([], lambda x: x / 0)), - []) + self.assertDatasetsEqual( + list(self.backend.map_values([], lambda x: x / 0)), []) tuple_list = [(1, 2), (2, 3), (3, 4)] - self.assertDatasetsEqual(list(self.ops.map_values(tuple_list, str)), + self.assertDatasetsEqual(list(self.backend.map_values(tuple_list, str)), [(1, "2"), (2, "3"), (3, "4")]) self.assertDatasetsEqual( - list(self.ops.map_values(tuple_list, lambda x: x**2)), [(1, 4), - (2, 9), - (3, 16)]) + list(self.backend.map_values(tuple_list, lambda x: x**2)), + [(1, 4), (2, 9), (3, 16)]) @pytest.mark.timeout(10) def test_multiproc_group_by_key(self): some_dict = [("cheese", "brie"), ("bread", "sourdough"), ("cheese", "swiss")] - self.assertDatasetsEqual(list(self.ops.group_by_key(some_dict)), + 
self.assertDatasetsEqual(list(self.backend.group_by_key(some_dict)), [("cheese", ["brie", "swiss"]), ("bread", ["sourdough"])]) @pytest.mark.timeout(10) def test_multiproc_filter(self): - self.assertDatasetsEqual(list(self.ops.filter([], lambda x: True)), []) - self.assertDatasetsEqual(list(self.ops.filter([], lambda x: False)), []) + self.assertDatasetsEqual(list(self.backend.filter([], lambda x: True)), + []) + self.assertDatasetsEqual(list(self.backend.filter([], lambda x: False)), + []) example_list = [1, 2, 2, 3, 3, 4, 2] self.assertDatasetsEqual( - list(self.ops.filter(example_list, lambda x: x % 2)), [1, 3, 3]) + list(self.backend.filter(example_list, lambda x: x % 2)), [1, 3, 3]) self.assertDatasetsEqual( - list(self.ops.filter(example_list, lambda x: x < 3)), [1, 2, 2, 2]) + list(self.backend.filter(example_list, lambda x: x < 3)), + [1, 2, 2, 2]) @pytest.mark.timeout(10) def test_multiproc_filter_by_key_empty_keys_to_keep(self): col = [(7, 1), (2, 1), (3, 9), (4, 1), (9, 10)] keys_to_keep = [] - result = self.ops.filter_by_key(col, keys_to_keep, "filter_by_key") + result = self.backend.filter_by_key(col, keys_to_keep, "filter_by_key") self.assertEqual(list(result), []) @pytest.mark.timeout(10) def test_multiproc_filter_by_key_remove(self): col = [(7, 1), (2, 1), (3, 9), (4, 1), (9, 10)] keys_to_keep = [7, 9] - result = self.ops.filter_by_key(col, keys_to_keep, "filter_by_keys") + result = self.backend.filter_by_key(col, keys_to_keep, "filter_by_keys") self.assertDatasetsEqual(list(result), [(7, 1), (9, 10)]) @pytest.mark.timeout(10) def test_multiproc_keys(self): - self.assertEqual(list(self.ops.keys([])), []) + self.assertEqual(list(self.backend.keys([])), []) example_list = [(1, 2), (2, 3), (3, 4), (4, 8)] - self.assertDatasetsEqual(list(self.ops.keys(example_list)), + self.assertDatasetsEqual(list(self.backend.keys(example_list)), [1, 2, 3, 4]) @pytest.mark.timeout(10) def test_multiproc_values(self): - self.assertEqual(list(self.ops.values([])), []) + self.assertEqual(list(self.backend.values([])), []) example_list = [(1, 2), (2, 3), (3, 4), (4, 8)] - self.assertDatasetsEqual(list(self.ops.values(example_list)), + self.assertDatasetsEqual(list(self.backend.values(example_list)), [2, 3, 4, 8]) @pytest.mark.timeout(10) def test_multiproc_count_per_element(self): example_list = [1, 2, 3, 4, 5, 6, 1, 4, 0, 1] - result = dict(self.ops.count_per_element(example_list)) + result = dict(self.backend.count_per_element(example_list)) self.assertDictEqual(result, {1: 3, 2: 1, 3: 1, 4: 2, 5: 1, 6: 1, 0: 1}) @@ -689,7 +713,7 @@ def test_multiproc_sample_fixed_per_key_requires_no_discarding(self): n = 3 sample_fixed_per_key_result = list( - self.ops.sample_fixed_per_key(input_col, n)) + self.backend.sample_fixed_per_key(input_col, n)) expected_result = [("pid1", [('pk1', 1), ('pk2', 1), ('pk3', 1)]), ("pid2", [('pk4', 1)])] @@ -704,7 +728,7 @@ def test_multiproc_sample_fixed_per_key_with_sampling(self): n = 3 sample_fixed_per_key_result = list( - self.ops.sample_fixed_per_key(input_col, n)) + self.backend.sample_fixed_per_key(input_col, n)) self.assertTrue( all( @@ -715,17 +739,17 @@ def test_multiproc_sample_fixed_per_key_with_sampling(self): def test_multiproc_flat_map(self): input_col = [[1, 2, 3, 4], [5, 6, 7, 8]] self.assertDatasetsEqual( - list(self.ops.flat_map(input_col, lambda x: x)), + list(self.backend.flat_map(input_col, lambda x: x)), [1, 2, 3, 4, 5, 6, 7, 8]) input_col = [("a", [1, 2, 3, 4]), ("b", [5, 6, 7, 8])] self.assertDatasetsEqual( - list(self.ops.flat_map(input_col, 
lambda x: x[1])), + list(self.backend.flat_map(input_col, lambda x: x[1])), [1, 2, 3, 4, 5, 6, 7, 8]) self.assertDatasetsEqual( list( - self.ops.flat_map(input_col, - lambda x: [(x[0], y) for y in x[1]])), + self.backend.flat_map(input_col, + lambda x: [(x[0], y) for y in x[1]])), [("a", 1), ("a", 2), ("a", 3), ("a", 4), ("b", 5), ("b", 6), ("b", 7), ("b", 8)]) @@ -734,7 +758,7 @@ def test_multiproc_group_by_key(self): some_dict = [("cheese", "brie"), ("bread", "sourdough"), ("cheese", "swiss")] - result = sorted(self.ops.group_by_key(some_dict)) + result = sorted(self.backend.group_by_key(some_dict)) result = [(k, sorted(v)) for k, v in result] self.assertEqual(result, [ @@ -759,14 +783,14 @@ def assert_laziness(operator, *args): # lazy operators accept exceptions_generator_function() # as argument without raising errors: - assert_laziness(self.ops.map, str) - assert_laziness(self.ops.map_values, str) - assert_laziness(self.ops.filter, bool) - assert_laziness(self.ops.values) - assert_laziness(self.ops.keys) - assert_laziness(self.ops.count_per_element) - assert_laziness(self.ops.flat_map, str) - assert_laziness(self.ops.sample_fixed_per_key, int) + assert_laziness(self.backend.map, str) + assert_laziness(self.backend.map_values, str) + assert_laziness(self.backend.filter, bool) + assert_laziness(self.backend.values) + assert_laziness(self.backend.keys) + assert_laziness(self.backend.count_per_element) + assert_laziness(self.backend.flat_map, str) + assert_laziness(self.backend.sample_fixed_per_key, int) # TODO: Extend the proper Accumulator class once it's available.
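
A minimal local usage of the renamed API, pieced together from the notebook and test code above, looks roughly like the following sketch. The sample data, extractor lambdas, and the epsilon/delta and contribution-bound values are illustrative only, and compute_budgets() is assumed to be the usual BudgetAccountant finalization step.

import pipeline_dp

# Illustrative input: (privacy_id, partition_key, value) tuples.
rows = [(uid, f"pk{uid % 3}", 1.0) for uid in range(100)]

backend = pipeline_dp.LocalBackend()
budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1,
                                                      total_delta=1e-7)
dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)

params = pipeline_dp.AggregateParams(
    noise_kind=pipeline_dp.NoiseKind.LAPLACE,
    metrics=[pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM],
    max_partitions_contributed=1,
    max_contributions_per_partition=1,
    min_value=0,
    max_value=10)

data_extractors = pipeline_dp.DataExtractors(
    privacy_id_extractor=lambda row: row[0],
    partition_extractor=lambda row: row[1],
    value_extractor=lambda row: row[2])

dp_result = dp_engine.aggregate(rows, params, data_extractors)
budget_accountant.compute_budgets()

# LocalBackend is lazy; materializing the iterable runs the pipeline.
# Partitions may still be dropped by private partition selection.
print(list(dp_result))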