
Commit

Contribution Bounding - per partition and cross partition bounding - …
…continued from #26 pull request (#32)
preethiraghavan1 authored May 27, 2021
1 parent 0b5505d commit 8db30eb
Showing 5 changed files with 266 additions and 17 deletions.
1 change: 1 addition & 0 deletions pipeline_dp/__init__.py
@@ -3,5 +3,6 @@
from pipeline_dp.aggregate_params import Metrics
from pipeline_dp.dp_engine import DataExtractors
from pipeline_dp.dp_engine import DPEngine
from pipeline_dp.pipeline_operations import LocalPipelineOperations
from pipeline_dp.pipeline_operations import BeamOperations
from pipeline_dp.pipeline_operations import SparkRDDOperations
49 changes: 48 additions & 1 deletion pipeline_dp/dp_engine.py
@@ -8,10 +8,10 @@
from pipeline_dp.pipeline_operations import PipelineOperations
from pipeline_dp.report_generator import ReportGenerator


@dataclass
class DataExtractors:
"""Data extractors
A set of functions that, given an input, return the privacy id, partition key,
and value.
"""
@@ -49,3 +49,50 @@ def aggregate(self, col, params: AggregateParams,
# TODO: implement aggregate().
        # It returns the input for now, just to ensure that an example works.
return col

def _bound_contributions(self, col,
max_partitions_contributed: int,
max_contributions_per_partition: int,
aggregator_fn):
"""
        Bounds the contributions of each privacy_id within and across partitions.
Args:
col: collection, with types of each element: (privacy_id,
partition_key, value).
max_partitions_contributed: maximum number of partitions that one
privacy id can contribute to.
max_contributions_per_partition: maximum number of records that one
privacy id can contribute to one partition.
aggregator_fn: function that takes a list of values and returns an
aggregator object which handles all aggregation logic.
        Returns: collection with elements ((privacy_id, partition_key),
aggregator).
"""
        # Per-partition contribution bounding: keep at most
        # max_contributions_per_partition records per (privacy_id, partition_key).
col = self._ops.map_tuple(col, lambda pid, pk, v: ((pid, pk), v),
"Rekey to ( (privacy_id, partition_key), value))")
col = self._ops.sample_fixed_per_key(col,
max_contributions_per_partition,
"Sample per (privacy_id, partition_key)")
# ((privacy_id, partition_key), [value])
col = self._ops.map_values(col, aggregator_fn,
"Apply aggregate_fn after per partition bounding")
# ((privacy_id, partition_key), aggregator)

# Cross partition bounding
col = self._ops.map_tuple(col, lambda pid_pk, v: (pid_pk[0],
(pid_pk[1], v)),
"Rekey to (privacy_id, (partition_key, "
"aggregator))")
col = self._ops.sample_fixed_per_key(col, max_partitions_contributed,
"Sample per privacy_id")
# (privacy_id, [(partition_key, aggregator)])

def unnest_cross_partition_bound_sampled_per_key(pid_pk_v):
pid, pk_values = pid_pk_v
return (((pid, pk), v) for (pk, v) in pk_values)

return self._ops.flat_map(col,
unnest_cross_partition_bound_sampled_per_key,
"Unnest")
36 changes: 31 additions & 5 deletions pipeline_dp/pipeline_operations.py
@@ -1,6 +1,8 @@
"""Adapters for working with pipeline frameworks."""

import random
import collections
import numpy as np

import abc
import apache_beam as beam
@@ -16,6 +18,10 @@ class PipelineOperations(abc.ABC):
def map(self, col, fn, stage_name: str):
pass

@abc.abstractmethod
def flat_map(self, col, fn, stage_name: str):
pass

@abc.abstractmethod
def map_tuple(self, col, fn, stage_name: str):
pass
@@ -55,8 +61,11 @@ class BeamOperations(PipelineOperations):
def map(self, col, fn, stage_name: str):
return col | stage_name >> beam.Map(fn)

def flat_map(self, col, fn, stage_name: str):
return col | stage_name >> beam.FlatMap(fn)

def map_tuple(self, col, fn, stage_name: str):
return col | stage_name >> beam.MapTuple(fn)
return col | stage_name >> beam.Map(lambda x: fn(*x))

def map_values(self, col, fn, stage_name: str):
return col | stage_name >> beam.MapTuple(lambda k, v: (k, fn(v)))
@@ -96,6 +105,9 @@ class SparkRDDOperations(PipelineOperations):
def map(self, rdd, fn, stage_name: str = None):
return rdd.map(fn)

def flat_map(self, rdd, fn, stage_name: str = None):
return rdd.flatMap(fn)

def map_tuple(self, rdd, fn, stage_name: str = None):
return rdd.map(fn)

@@ -152,8 +164,11 @@ class LocalPipelineOperations(PipelineOperations):
def map(self, col, fn, stage_name: typing.Optional[str] = None):
return map(fn, col)

def map_tuple(self, col, fn, stage_name: typing.Optional[str] = None):
return (fn(k, v) for k, v in col)
    def flat_map(self, col, fn, stage_name: typing.Optional[str] = None):
return (x for el in col for x in fn(el))

    def map_tuple(self, col, fn, stage_name: typing.Optional[str] = None):
return map(lambda x: fn(*x), col)

def map_values(self, col, fn, stage_name: typing.Optional[str] = None):
return ((k, fn(v)) for k, v in col)
@@ -176,8 +191,19 @@ def keys(self, col, stage_name: str):
def values(self, col, stage_name: typing.Optional[str] = None):
return (v for k, v in col)

def sample_fixed_per_key(self, col, n: int, stage_name: str):
pass
def sample_fixed_per_key(self, col, n: int,
stage_name: typing.Optional[str] = None):
def sample_fixed_per_key_generator():
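            # For each key, keep at most n values, chosen uniformly without replacement.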
for item in self.group_by_key(col):
key = item[0]
values = item[1]
if len(values) > n:
sampled_indices = np.random.choice(range(len(values)), n,
replace=False)
values = [values[i] for i in sampled_indices]
yield key, values

return sample_fixed_per_key_generator()

def count_per_element(self, col, stage_name: typing.Optional[str] = None):
yield from collections.Counter(col).items()
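
A quick sketch of the new local semantics; it only exercises the methods added above and assumes group_by_key groups (key, value) pairs into (key, [values]), as the sampling tests below rely on:

from pipeline_dp.pipeline_operations import LocalPipelineOperations

ops = LocalPipelineOperations()

# flat_map flattens one level of nesting, lazily.
print(list(ops.flat_map([[1, 2], [3]], lambda x: x)))  # [1, 2, 3]

# map_tuple now unpacks tuples of any arity into fn's arguments.
print(list(ops.map_tuple([(1, 2, 3)], lambda a, b, c: a + b + c)))  # [6]

# sample_fixed_per_key keeps at most n values per key (here n=2), sampled
# without replacement; which values survive is random.
print(list(ops.sample_fixed_per_key([("k", 1), ("k", 2), ("k", 3)], 2)))
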
129 changes: 118 additions & 11 deletions tests/dp_engine_test.py
@@ -1,34 +1,141 @@
"""DPEngine Test"""

import collections
import numpy as np
import unittest

from pipeline_dp.aggregate_params import AggregateParams, Metrics
from pipeline_dp.dp_engine import DPEngine
import pipeline_dp

"""DPEngine Test"""


class dp_engineTest(unittest.TestCase):
aggregator_fn = lambda input_values: (len(input_values),
np.sum(input_values),
np.sum(np.square(input_values)))

def test_contribution_bounding_empty_col(self):
input_col = []
max_partitions_contributed = 2
max_contributions_per_partition = 2

dp_engine = pipeline_dp.DPEngine(
pipeline_dp.BudgetAccountant(epsilon=1, delta=1e-10),
pipeline_dp.LocalPipelineOperations())
bound_result = list(dp_engine._bound_contributions(
input_col,
max_partitions_contributed=max_partitions_contributed,
max_contributions_per_partition=max_contributions_per_partition,
aggregator_fn=dp_engineTest.aggregator_fn))

self.assertFalse(bound_result)

def test_contribution_bounding_bound_input_nothing_dropped(self):
input_col = [("pid1", 'pk1', 1),
("pid1", 'pk1', 2),
("pid1", 'pk2', 3),
("pid1", 'pk2', 4)]
max_partitions_contributed = 2
max_contributions_per_partition = 2

dp_engine = pipeline_dp.DPEngine(
pipeline_dp.BudgetAccountant(epsilon=1, delta=1e-10),
pipeline_dp.LocalPipelineOperations())
bound_result = list(dp_engine._bound_contributions(
input_col,
max_partitions_contributed=max_partitions_contributed,
max_contributions_per_partition=max_contributions_per_partition,
aggregator_fn=dp_engineTest.aggregator_fn))

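        # aggregator_fn maps each partition's values to (count, sum, sum of squares).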
expected_result = [(('pid1', 'pk2'), (2, 7, 25)),
(('pid1', 'pk1'), (2, 3, 5))]
self.assertEqual(set(expected_result), set(bound_result))

def test_contribution_bounding_per_partition_bounding_applied(self):
input_col = [("pid1", 'pk1', 1),
("pid1", 'pk1', 2),
("pid1", 'pk2', 3),
("pid1", 'pk2', 4),
("pid1", 'pk2', 5),
("pid2", 'pk2', 6)]
max_partitions_contributed = 5
max_contributions_per_partition = 2

dp_engine = pipeline_dp.DPEngine(
pipeline_dp.BudgetAccountant(epsilon=1, delta=1e-10),
pipeline_dp.LocalPipelineOperations())
bound_result = list(dp_engine._bound_contributions(
input_col,
max_partitions_contributed=max_partitions_contributed,
max_contributions_per_partition=max_contributions_per_partition,
aggregator_fn=dp_engineTest.aggregator_fn))

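        # Three (privacy_id, partition_key) groups remain:
        # (pid1, pk1), (pid1, pk2) and (pid2, pk2).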
self.assertEqual(len(bound_result), 3)
        # Check contributions per partition.
self.assertTrue(all(map(
lambda op_val: op_val[1][0] <= max_contributions_per_partition,
bound_result)))

def test_contribution_bounding_cross_partition_bounding_applied(self):
input_col = [("pid1", 'pk1', 1),
("pid1", 'pk1', 2),
("pid1", 'pk2', 3),
("pid1", 'pk2', 4),
("pid1", 'pk2', 5),
("pid1", 'pk3', 6),
("pid1", 'pk4', 7),
("pid2", 'pk4', 8)]
max_partitions_contributed = 3
max_contributions_per_partition = 5

dp_engine = pipeline_dp.DPEngine(
pipeline_dp.BudgetAccountant(epsilon=1, delta=1e-10),
pipeline_dp.LocalPipelineOperations())
bound_result = list(dp_engine._bound_contributions(
input_col,
max_partitions_contributed=max_partitions_contributed,
max_contributions_per_partition=max_contributions_per_partition,
aggregator_fn=dp_engineTest.aggregator_fn))

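        # pid1 contributed to 4 partitions (capped at 3) and pid2 to 1,
        # so 3 + 1 = 4 groups remain.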
self.assertEqual(len(bound_result), 4)
        # Check contributions per partition.
self.assertTrue(all(map(
lambda op_val: op_val[1][0] <= max_contributions_per_partition,
bound_result)))
        # Check cross-partition contributions.
dict_of_pid_to_pk = collections.defaultdict(lambda: [])
for key, _ in bound_result:
dict_of_pid_to_pk[key[0]].append(key[1])
self.assertEqual(len(dict_of_pid_to_pk), 2)
self.assertTrue(
all(map(lambda key: len(
dict_of_pid_to_pk[key]) <= max_partitions_contributed,
dict_of_pid_to_pk)))

class DPEngineTest(unittest.TestCase):
def test_aggregate_none(self):
self.assertIsNone(DPEngine(None, None).aggregate(None, None, None))
self.assertIsNone(pipeline_dp.DPEngine(None, None).aggregate(None, None,
None))

def test_aggregate_report(self):
params1 = AggregateParams(
params1 = pipeline_dp.AggregateParams(
max_partitions_contributed=3,
max_contributions_per_partition=2,
low=1,
high=5,
metrics=[Metrics.PRIVACY_ID_COUNT, Metrics.COUNT, Metrics.MEAN],
metrics=[pipeline_dp.Metrics.PRIVACY_ID_COUNT, pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.MEAN],
)
params2 = AggregateParams(
params2 = pipeline_dp.AggregateParams(
max_partitions_contributed=1,
max_contributions_per_partition=3,
low=2,
high=10,
metrics=[Metrics.VAR, Metrics.SUM, Metrics.MEAN],
public_partitions = list(range(1,40)),
metrics=[pipeline_dp.Metrics.VAR, pipeline_dp.Metrics.SUM, pipeline_dp.Metrics.MEAN],
public_partitions = list(range(1, 40)),
)
engine = DPEngine(None, None)
engine = pipeline_dp.DPEngine(None, None)
engine.aggregate(None, params1, None)
engine.aggregate(None, params2, None)
self.assertEqual(len(engine._report_generators), 2) # pylint: disable=protected-access


if __name__ == '__main__':
unittest.main()
68 changes: 68 additions & 0 deletions tests/pipeline_operations_test.py
@@ -39,6 +39,26 @@ def test_count_per_element(self):
def tearDownClass(cls):
cls.sc.stop()

def test_flat_map(self):
spark_operations = SparkRDDOperations()
data = [[1, 2, 3, 4], [5, 6, 7, 8]]
dist_data = SparkRDDOperationsTest.sc.parallelize(data)
        self.assertEqual(
            spark_operations.flat_map(dist_data, lambda x: x).collect(),
            [1, 2, 3, 4, 5, 6, 7, 8])

        data = [("a", [1, 2, 3, 4]), ("b", [5, 6, 7, 8])]
        dist_data = SparkRDDOperationsTest.sc.parallelize(data)
        self.assertEqual(
            spark_operations.flat_map(dist_data, lambda x: x[1]).collect(),
            [1, 2, 3, 4, 5, 6, 7, 8])
        self.assertEqual(
            spark_operations.flat_map(
                dist_data, lambda x: [(x[0], y) for y in x[1]]).collect(),
            [("a", 1), ("a", 2), ("a", 3), ("a", 4),
             ("b", 5), ("b", 6), ("b", 7), ("b", 8)])


class LocalPipelineOperationsTest(unittest.TestCase):
@classmethod
@@ -132,6 +152,54 @@ def assert_laziness(operator, *args):
assert_laziness(self.ops.filter, bool)
assert_laziness(self.ops.values)
assert_laziness(self.ops.count_per_element)
assert_laziness(self.ops.flat_map, str)
assert_laziness(self.ops.sample_fixed_per_key, int)

def test_local_sample_fixed_per_key_requires_no_discarding(self):
input_col = [("pid1", ('pk1', 1)),
("pid1", ('pk2', 1)),
("pid1", ('pk3', 1)),
("pid2", ('pk4', 1))]
n = 3

sample_fixed_per_key_result = list(self.ops.sample_fixed_per_key(
input_col, n))

expected_result = [("pid1", [('pk1', 1), ('pk2', 1), ('pk3', 1)]),
("pid2", [('pk4', 1)])]
self.assertEqual(sample_fixed_per_key_result, expected_result)

def test_local_sample_fixed_per_key_with_sampling(self):
input_col = [(("pid1", "pk1"), 1),
(("pid1", "pk1"), 1),
(("pid1", "pk1"), 1),
(("pid1", "pk1"), 1),
(("pid1", "pk1"), 1),
(("pid1", "pk2"), 1),
(("pid1", "pk2"), 1)]
n = 3

sample_fixed_per_key_result = list(self.ops.sample_fixed_per_key(
input_col, n))

self.assertTrue(
all(map(lambda pid_pk_v: len(pid_pk_v[1]) <= n,
sample_fixed_per_key_result)))

def test_local_flat_map(self):
input_col = [[1, 2, 3, 4], [5, 6, 7, 8]]
self.assertEqual(list(self.ops.flat_map(input_col, lambda x: x)),
[1, 2, 3, 4, 5, 6, 7, 8])

input_col = [("a", [1, 2, 3, 4]), ("b", [5, 6, 7, 8])]
self.assertEqual(list(self.ops.flat_map(input_col, lambda x: x[1])),
[1, 2, 3, 4, 5, 6, 7, 8])
        self.assertEqual(
            list(self.ops.flat_map(input_col,
                                   lambda x: [(x[0], y) for y in x[1]])),
            [("a", 1), ("a", 2), ("a", 3), ("a", 4),
             ("b", 5), ("b", 6), ("b", 7), ("b", 8)])


if __name__ == '__main__':
