diff --git a/pipeline_dp/__init__.py b/pipeline_dp/__init__.py
index 746b03ec..94485b0f 100644
--- a/pipeline_dp/__init__.py
+++ b/pipeline_dp/__init__.py
@@ -3,5 +3,6 @@
 from pipeline_dp.aggregate_params import Metrics
 from pipeline_dp.dp_engine import DataExtractors
 from pipeline_dp.dp_engine import DPEngine
+from pipeline_dp.pipeline_operations import LocalPipelineOperations
 from pipeline_dp.pipeline_operations import BeamOperations
 from pipeline_dp.pipeline_operations import SparkRDDOperations
diff --git a/pipeline_dp/dp_engine.py b/pipeline_dp/dp_engine.py
index 5ad4655a..5bcf7b0f 100644
--- a/pipeline_dp/dp_engine.py
+++ b/pipeline_dp/dp_engine.py
@@ -8,10 +8,10 @@
 from pipeline_dp.pipeline_operations import PipelineOperations
 from pipeline_dp.report_generator import ReportGenerator
 
+
 @dataclass
 class DataExtractors:
     """Data extractors
-
     A set of functions that, given an input, return the privacy id, partition key, and value.
     """
@@ -49,3 +49,50 @@ def aggregate(self, col, params: AggregateParams,
         # TODO: implement aggregate().
         # It returns input for now, just to ensure that the an example works.
         return col
+
+    def _bound_contributions(self, col,
+                             max_partitions_contributed: int,
+                             max_contributions_per_partition: int,
+                             aggregator_fn):
+        """Bounds the contributions of each privacy_id within and across partitions.
+
+        Args:
+          col: collection with elements of the form (privacy_id,
+            partition_key, value).
+          max_partitions_contributed: maximum number of partitions that one
+            privacy id can contribute to.
+          max_contributions_per_partition: maximum number of records that one
+            privacy id can contribute to one partition.
+          aggregator_fn: function that takes a list of values and returns an
+            aggregator object which handles all aggregation logic.
+
+        Returns:
+          collection with elements ((privacy_id, partition_key), aggregator).
+ """ + # per partition-contribution bounding with bounding of each contribution + col = self._ops.map_tuple(col, lambda pid, pk, v: ((pid, pk), v), + "Rekey to ( (privacy_id, partition_key), value))") + col = self._ops.sample_fixed_per_key(col, + max_contributions_per_partition, + "Sample per (privacy_id, partition_key)") + # ((privacy_id, partition_key), [value]) + col = self._ops.map_values(col, aggregator_fn, + "Apply aggregate_fn after per partition bounding") + # ((privacy_id, partition_key), aggregator) + + # Cross partition bounding + col = self._ops.map_tuple(col, lambda pid_pk, v: (pid_pk[0], + (pid_pk[1], v)), + "Rekey to (privacy_id, (partition_key, " + "aggregator))") + col = self._ops.sample_fixed_per_key(col, max_partitions_contributed, + "Sample per privacy_id") + # (privacy_id, [(partition_key, aggregator)]) + + def unnest_cross_partition_bound_sampled_per_key(pid_pk_v): + pid, pk_values = pid_pk_v + return (((pid, pk), v) for (pk, v) in pk_values) + + return self._ops.flat_map(col, + unnest_cross_partition_bound_sampled_per_key, + "Unnest") diff --git a/pipeline_dp/pipeline_operations.py b/pipeline_dp/pipeline_operations.py index 84200d8d..9d941031 100644 --- a/pipeline_dp/pipeline_operations.py +++ b/pipeline_dp/pipeline_operations.py @@ -1,6 +1,8 @@ """Adapters for working with pipeline frameworks.""" import random +import collections +import numpy as np import abc import apache_beam as beam @@ -16,6 +18,10 @@ class PipelineOperations(abc.ABC): def map(self, col, fn, stage_name: str): pass + @abc.abstractmethod + def flat_map(self, col, fn, stage_name: str): + pass + @abc.abstractmethod def map_tuple(self, col, fn, stage_name: str): pass @@ -55,8 +61,11 @@ class BeamOperations(PipelineOperations): def map(self, col, fn, stage_name: str): return col | stage_name >> beam.Map(fn) + def flat_map(self, col, fn, stage_name: str): + return col | stage_name >> beam.FlatMap(fn) + def map_tuple(self, col, fn, stage_name: str): - return col | stage_name >> beam.MapTuple(fn) + return col | stage_name >> beam.Map(lambda x: fn(*x)) def map_values(self, col, fn, stage_name: str): return col | stage_name >> beam.MapTuple(lambda k, v: (k, fn(v))) @@ -96,6 +105,9 @@ class SparkRDDOperations(PipelineOperations): def map(self, rdd, fn, stage_name: str = None): return rdd.map(fn) + def flat_map(self, rdd, fn, stage_name: str = None): + return rdd.flatMap(fn) + def map_tuple(self, rdd, fn, stage_name: str = None): return rdd.map(fn) @@ -152,8 +164,11 @@ class LocalPipelineOperations(PipelineOperations): def map(self, col, fn, stage_name: typing.Optional[str] = None): return map(fn, col) - def map_tuple(self, col, fn, stage_name: typing.Optional[str] = None): - return (fn(k, v) for k, v in col) + def flat_map(self, col, fn, stage_name: str = None): + return (x for el in col for x in fn(el)) + + def map_tuple(self, col, fn, stage_name: str = None): + return map(lambda x: fn(*x), col) def map_values(self, col, fn, stage_name: typing.Optional[str] = None): return ((k, fn(v)) for k, v in col) @@ -176,8 +191,19 @@ def keys(self, col, stage_name: str): def values(self, col, stage_name: typing.Optional[str] = None): return (v for k, v in col) - def sample_fixed_per_key(self, col, n: int, stage_name: str): - pass + def sample_fixed_per_key(self, col, n: int, + stage_name: typing.Optional[str] = None): + def sample_fixed_per_key_generator(): + for item in self.group_by_key(col): + key = item[0] + values = item[1] + if len(values) > n: + sampled_indices = np.random.choice(range(len(values)), n, + 
+                                                       replace=False)
+                    values = [values[i] for i in sampled_indices]
+                yield key, values
+
+        return sample_fixed_per_key_generator()
 
     def count_per_element(self, col, stage_name: typing.Optional[str] = None):
         yield from collections.Counter(col).items()
diff --git a/tests/dp_engine_test.py b/tests/dp_engine_test.py
index 7a6fc1c7..af6e3933 100644
--- a/tests/dp_engine_test.py
+++ b/tests/dp_engine_test.py
@@ -1,34 +1,141 @@
-"""DPEngine Test"""
+import collections
+import numpy as np
 import unittest
 
-from pipeline_dp.aggregate_params import AggregateParams, Metrics
-from pipeline_dp.dp_engine import DPEngine
+import pipeline_dp
+
+"""DPEngine Test"""
+
+
+class DPEngineTest(unittest.TestCase):
+    aggregator_fn = lambda input_values: (len(input_values),
+                                          np.sum(input_values),
+                                          np.sum(np.square(input_values)))
+
+    def test_contribution_bounding_empty_col(self):
+        input_col = []
+        max_partitions_contributed = 2
+        max_contributions_per_partition = 2
+
+        dp_engine = pipeline_dp.DPEngine(
+            pipeline_dp.BudgetAccountant(epsilon=1, delta=1e-10),
+            pipeline_dp.LocalPipelineOperations())
+        bound_result = list(dp_engine._bound_contributions(
+            input_col,
+            max_partitions_contributed=max_partitions_contributed,
+            max_contributions_per_partition=max_contributions_per_partition,
+            aggregator_fn=DPEngineTest.aggregator_fn))
+
+        self.assertFalse(bound_result)
+
+    def test_contribution_bounding_bound_input_nothing_dropped(self):
+        input_col = [("pid1", 'pk1', 1),
+                     ("pid1", 'pk1', 2),
+                     ("pid1", 'pk2', 3),
+                     ("pid1", 'pk2', 4)]
+        max_partitions_contributed = 2
+        max_contributions_per_partition = 2
+
+        dp_engine = pipeline_dp.DPEngine(
+            pipeline_dp.BudgetAccountant(epsilon=1, delta=1e-10),
+            pipeline_dp.LocalPipelineOperations())
+        bound_result = list(dp_engine._bound_contributions(
+            input_col,
+            max_partitions_contributed=max_partitions_contributed,
+            max_contributions_per_partition=max_contributions_per_partition,
+            aggregator_fn=DPEngineTest.aggregator_fn))
+
+        expected_result = [(('pid1', 'pk2'), (2, 7, 25)),
+                           (('pid1', 'pk1'), (2, 3, 5))]
+        self.assertEqual(set(expected_result), set(bound_result))
+
+    def test_contribution_bounding_per_partition_bounding_applied(self):
+        input_col = [("pid1", 'pk1', 1),
+                     ("pid1", 'pk1', 2),
+                     ("pid1", 'pk2', 3),
+                     ("pid1", 'pk2', 4),
+                     ("pid1", 'pk2', 5),
+                     ("pid2", 'pk2', 6)]
+        max_partitions_contributed = 5
+        max_contributions_per_partition = 2
+
+        dp_engine = pipeline_dp.DPEngine(
+            pipeline_dp.BudgetAccountant(epsilon=1, delta=1e-10),
+            pipeline_dp.LocalPipelineOperations())
+        bound_result = list(dp_engine._bound_contributions(
+            input_col,
+            max_partitions_contributed=max_partitions_contributed,
+            max_contributions_per_partition=max_contributions_per_partition,
+            aggregator_fn=DPEngineTest.aggregator_fn))
+
+        self.assertEqual(len(bound_result), 3)
+        # Check contributions per partition.
+        self.assertTrue(all(map(
+            lambda op_val: op_val[1][0] <= max_contributions_per_partition,
+            bound_result)))
+
+    def test_contribution_bounding_cross_partition_bounding_applied(self):
+        input_col = [("pid1", 'pk1', 1),
+                     ("pid1", 'pk1', 2),
+                     ("pid1", 'pk2', 3),
+                     ("pid1", 'pk2', 4),
+                     ("pid1", 'pk2', 5),
+                     ("pid1", 'pk3', 6),
+                     ("pid1", 'pk4', 7),
+                     ("pid2", 'pk4', 8)]
+        max_partitions_contributed = 3
+        max_contributions_per_partition = 5
+
+        dp_engine = pipeline_dp.DPEngine(
+            pipeline_dp.BudgetAccountant(epsilon=1, delta=1e-10),
+            pipeline_dp.LocalPipelineOperations())
+        bound_result = list(dp_engine._bound_contributions(
+            input_col,
+            max_partitions_contributed=max_partitions_contributed,
+            max_contributions_per_partition=max_contributions_per_partition,
+            aggregator_fn=DPEngineTest.aggregator_fn))
+
+        self.assertEqual(len(bound_result), 4)
+        # Check contributions per partition.
+        self.assertTrue(all(map(
+            lambda op_val: op_val[1][0] <= max_contributions_per_partition,
+            bound_result)))
+        # Check cross-partition contributions.
+        dict_of_pid_to_pk = collections.defaultdict(list)
+        for key, _ in bound_result:
+            dict_of_pid_to_pk[key[0]].append(key[1])
+        self.assertEqual(len(dict_of_pid_to_pk), 2)
+        self.assertTrue(
+            all(map(lambda key: len(
+                dict_of_pid_to_pk[key]) <= max_partitions_contributed,
+                    dict_of_pid_to_pk)))
 
-class DPEngineTest(unittest.TestCase):
     def test_aggregate_none(self):
-        self.assertIsNone(DPEngine(None, None).aggregate(None, None, None))
+        self.assertIsNone(
+            pipeline_dp.DPEngine(None, None).aggregate(None, None, None))
 
     def test_aggregate_report(self):
-        params1 = AggregateParams(
+        params1 = pipeline_dp.AggregateParams(
             max_partitions_contributed=3,
             max_contributions_per_partition=2,
             low=1,
             high=5,
-            metrics=[Metrics.PRIVACY_ID_COUNT, Metrics.COUNT, Metrics.MEAN],
+            metrics=[pipeline_dp.Metrics.PRIVACY_ID_COUNT,
+                     pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.MEAN],
         )
-        params2 = AggregateParams(
+        params2 = pipeline_dp.AggregateParams(
             max_partitions_contributed=1,
             max_contributions_per_partition=3,
             low=2,
             high=10,
-            metrics=[Metrics.VAR, Metrics.SUM, Metrics.MEAN],
-            public_partitions = list(range(1,40)),
+            metrics=[pipeline_dp.Metrics.VAR, pipeline_dp.Metrics.SUM,
+                     pipeline_dp.Metrics.MEAN],
+            public_partitions=list(range(1, 40)),
         )
-        engine = DPEngine(None, None)
+        engine = pipeline_dp.DPEngine(None, None)
         engine.aggregate(None, params1, None)
         engine.aggregate(None, params2, None)
         self.assertEqual(len(engine._report_generators), 2)  # pylint: disable=protected-access
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/tests/pipeline_operations_test.py b/tests/pipeline_operations_test.py
index a5e824d3..4590872e 100644
--- a/tests/pipeline_operations_test.py
+++ b/tests/pipeline_operations_test.py
@@ -39,6 +39,26 @@ def test_count_per_element(self):
     def tearDownClass(cls):
         cls.sc.stop()
 
+    def test_flat_map(self):
+        spark_operations = SparkRDDOperations()
+        data = [[1, 2, 3, 4], [5, 6, 7, 8]]
+        dist_data = SparkRDDOperationsTest.sc.parallelize(data)
+        self.assertEqual(
+            spark_operations.flat_map(dist_data, lambda x: x).collect(),
+            [1, 2, 3, 4, 5, 6, 7, 8])
+
+        data = [("a", [1, 2, 3, 4]), ("b", [5, 6, 7, 8])]
+        dist_data = SparkRDDOperationsTest.sc.parallelize(data)
+        self.assertEqual(
+            spark_operations.flat_map(dist_data, lambda x: x[1]).collect(),
+            [1, 2, 3, 4, 5, 6, 7, 8])
+        self.assertEqual(
+            spark_operations.flat_map(
+                dist_data,
+                lambda x: [(x[0], y) for y in x[1]]).collect(),
+            [("a", 1), ("a", 2), ("a", 3), ("a", 4),
+             ("b", 5), ("b", 6), ("b", 7), ("b", 8)])
+
 
 class LocalPipelineOperationsTest(unittest.TestCase):
     @classmethod
@@ -132,6 +152,54 @@ def assert_laziness(operator, *args):
         assert_laziness(self.ops.filter, bool)
         assert_laziness(self.ops.values)
         assert_laziness(self.ops.count_per_element)
+        assert_laziness(self.ops.flat_map, str)
+        assert_laziness(self.ops.sample_fixed_per_key, int)
+
+    def test_local_sample_fixed_per_key_requires_no_discarding(self):
+        input_col = [("pid1", ('pk1', 1)),
+                     ("pid1", ('pk2', 1)),
+                     ("pid1", ('pk3', 1)),
+                     ("pid2", ('pk4', 1))]
+        n = 3
+
+        sample_fixed_per_key_result = list(self.ops.sample_fixed_per_key(
+            input_col, n))
+
expected_result = [("pid1", [('pk1', 1), ('pk2', 1), ('pk3', 1)]), + ("pid2", [('pk4', 1)])] + self.assertEqual(sample_fixed_per_key_result, expected_result) + + def test_local_sample_fixed_per_key_with_sampling(self): + input_col = [(("pid1", "pk1"), 1), + (("pid1", "pk1"), 1), + (("pid1", "pk1"), 1), + (("pid1", "pk1"), 1), + (("pid1", "pk1"), 1), + (("pid1", "pk2"), 1), + (("pid1", "pk2"), 1)] + n = 3 + + sample_fixed_per_key_result = list(self.ops.sample_fixed_per_key( + input_col, n)) + + self.assertTrue( + all(map(lambda pid_pk_v: len(pid_pk_v[1]) <= n, + sample_fixed_per_key_result))) + + def test_local_flat_map(self): + input_col = [[1, 2, 3, 4], [5, 6, 7, 8]] + self.assertEqual(list(self.ops.flat_map(input_col, lambda x: x)), + [1, 2, 3, 4, 5, 6, 7, 8]) + + input_col = [("a", [1, 2, 3, 4]), ("b", [5, 6, 7, 8])] + self.assertEqual(list(self.ops.flat_map(input_col, lambda x: x[1])), + [1, 2, 3, 4, 5, 6, 7, 8]) + self.assertEqual(list(self.ops.flat_map(input_col, + lambda x: [(x[0], y) for + y in x[1]] + )), + [("a", 1), ("a", 2), ("a", 3), ("a", 4), + ("b", 5), ("b", 6), ("b", 7), ("b", 8)]) if __name__ == '__main__':