Contribution Bounding - per partition and cross partition bounding - continued from #26 (pull request #32)

Merged: 26 commits, May 27, 2021
1 change: 1 addition & 0 deletions pipeline_dp/__init__.py
@@ -3,5 +3,6 @@
from pipeline_dp.aggregate_params import Metrics
from pipeline_dp.dp_engine import DataExtractors
from pipeline_dp.dp_engine import DPEngine
from pipeline_dp.pipeline_operations import LocalPipelineOperations
from pipeline_dp.pipeline_operations import BeamOperations
from pipeline_dp.pipeline_operations import SparkRDDOperations
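For reference, these package-level exports let the engine be constructed directly from the pipeline_dp namespace. A minimal sketch, mirroring the construction used in the tests below (the epsilon/delta values are arbitrary):

```python
import pipeline_dp

# Build a DPEngine backed by the local (in-memory) pipeline operations.
budget_accountant = pipeline_dp.BudgetAccountant(epsilon=1, delta=1e-10)
ops = pipeline_dp.LocalPipelineOperations()
engine = pipeline_dp.DPEngine(budget_accountant, ops)
```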
44 changes: 43 additions & 1 deletion pipeline_dp/dp_engine.py
@@ -8,10 +8,10 @@
from pipeline_dp.pipeline_operations import PipelineOperations
from pipeline_dp.report_generator import ReportGenerator


@dataclass
class DataExtractors:
"""Data extractors

A set of functions that, given an input, return the privacy id, partition key,
and value.
"""
@@ -49,3 +49,45 @@ def aggregate(self, col, params: AggregateParams,
# TODO: implement aggregate().
# It returns input for now, just to ensure that an example works.
return col

def _bound_cross_partition_contributions(self, col,
Collaborator: Could you please rename to _bound_contributions (sorry, I realized that the name I provided is not correct).

Contributor Author: Done.

max_partitions_contributed: int,
max_contributions_per_partition: int,
aggregator_fn):
"""
Bounds the contributions by privacy_id within and across partitions.
Collaborator: Please add "." after this phrase and then after each args description and return description.

Contributor Author: Done.

Args:
col: collection, with types of each element: (privacy_id,
partition_key, value)
max_partitions_contributed: maximum number of partitions that one
privacy id can contribute to
max_contributions_per_partition: maximum number of records that one
privacy id can contribute to one partition
aggregator_fn: function that takes a list of values and returns an
aggregator object which handles all aggregation logic.

return: collection with elements ((privacy_id, partition_key),
aggregator)
"""
# per partition-contribution bounding with bounding of each contribution
col = self._ops.map_tuple(col, lambda pid, pk, v: ((pid, pk), v),
"To (privacy_id, partition_key), value))")
col = self._ops.sample_fixed_per_key(col,
max_contributions_per_partition,
"Sample per (privacy_id, partition_key)")
# ((privacy_id, partition_key), [value])
col = self._ops.map(col, lambda pid_pk: (pid_pk[0], aggregator_fn(
Collaborator: Please apply aggregator_fn after per-partition contribution bounding; no need to do any aggregation after cross-partition bounding.

Clarification: eventually we need to aggregate per pk, and we do it in 2 steps:
1. Aggregate per (pid, pk) (after per-partition contribution bounding, this PR).
2. Aggregate per pk (after cross-partition contribution bounding), but that's outside of the scope of this task.

Aggregation in step 2 is slightly more complicated, because in step 1 we can assume that the data per key fits in memory (since we've done bounding), but in step 2 we can't assume that, so a group by key is needed.

Contributor Author: That makes sense! Thanks! Moved it to after per-partition bounding.

Collaborator: Thanks!
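To illustrate the two-step plan described in the clarification above, a rough sketch follows (purely illustrative, not part of this PR's diff; merge_aggregators is a hypothetical helper and the stage names are made up):

```python
# Step 1 (this PR): after sampling per (privacy_id, partition_key), all values
# for a key fit in memory, so aggregation is a simple per-key map.
col = self._ops.map_values(col, aggregator_fn,
                           "Aggregate per (privacy_id, partition_key)")

# Step 2 (outside the scope of this task): after cross-partition bounding,
# aggregating per partition_key needs a group-by-key, because one partition
# key can collect contributions from many privacy ids and the data per key
# can no longer be assumed to fit in memory.
# col = self._ops.map_tuple(col, lambda pid_pk, agg: (pid_pk[1], agg), "Rekey to pk")
# col = self._ops.group_by_key(col, "Group by partition key")
# col = self._ops.map_values(col, merge_aggregators, "Merge aggregators per pk")
```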

Collaborator: Nit (optional style improvement): using map_values() instead of map() is simpler.

Contributor Author: Agreed! Done.

pid_pk[1])), "Apply aggregate_fn after per partition bounding")
# ((privacy_id, partition_key), aggregator)
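For reference, the map_values() variant suggested in the nit above would look roughly like this (a sketch; the exact stage name in the final commits may differ):

```python
# Equivalent to the map() call above: map_values() keeps the
# (privacy_id, partition_key) key and applies aggregator_fn to the value only.
col = self._ops.map_values(col, aggregator_fn,
                           "Apply aggregator_fn after per-partition bounding")
# ((privacy_id, partition_key), aggregator)
```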

# Cross partition bounding
col = self._ops.map_tuple(col, lambda pid_pk, v: (pid_pk[0],
(pid_pk[1], v)),
"To (privacy_id, (partition_key, aggregator))")
Collaborator: Please update the stage name to "Rekey to ...".

Clarification: the operation of changing keys is needed very often, and we usually use the name "Rekey" for it.

Contributor Author: Done.

col = self._ops.sample_fixed_per_key(col, max_partitions_contributed,
"Sample per privacy_id")
# (privacy_id, [(partition_key, aggregator)])
return self._ops.flat_map(col, lambda pid: [((pid[0], pk_v[0]), pk_v[1])
Collaborator: Nit: the lambda in this line is pretty complicated; maybe try a local function instead of a lambda. And please use lazy evaluation (there are 2 options: "yield" or a "(...)" generator expression).

Contributor Author: Done.

for pk_v in pid[1]],
"Unnest")

33 changes: 28 additions & 5 deletions pipeline_dp/pipeline_operations.py
@@ -1,6 +1,8 @@
"""Adapters for working with pipeline frameworks."""

import random
import collections
import numpy as np

import abc
import apache_beam as beam
@@ -16,6 +18,10 @@ class PipelineOperations(abc.ABC):
def map(self, col, fn, stage_name: str):
pass

@abc.abstractmethod
def flat_map(self, col, fn, stage_name: str):
pass

@abc.abstractmethod
def map_tuple(self, col, fn, stage_name: str):
pass
@@ -55,8 +61,11 @@ class BeamOperations(PipelineOperations):
def map(self, col, fn, stage_name: str):
return col | stage_name >> beam.Map(fn)

def flat_map(self, col, fn, stage_name: str):
return col | stage_name >> beam.FlatMap(fn)

def map_tuple(self, col, fn, stage_name: str):
return col | stage_name >> beam.MapTuple(fn)
return col | stage_name >> beam.Map(lambda x: fn(*x))

def map_values(self, col, fn, stage_name: str):
return col | stage_name >> beam.MapTuple(lambda k, v: (k, fn(v)))
@@ -152,8 +161,11 @@ class LocalPipelineOperations(PipelineOperations):
def map(self, col, fn, stage_name: typing.Optional[str] = None):
return map(fn, col)

def map_tuple(self, col, fn, stage_name: typing.Optional[str] = None):
return (fn(k, v) for k, v in col)
def flat_map(self, col, fn, stage_name: str):
return (x for el in col for x in fn(el))

def map_tuple(self, col, fn, stage_name: str = None):
return map(lambda x: fn(*x), col)

def map_values(self, col, fn, stage_name: typing.Optional[str] = None):
return ((k, fn(v)) for k, v in col)
@@ -176,8 +188,19 @@ def keys(self, col, stage_name: str):
def values(self, col, stage_name: typing.Optional[str] = None):
return (v for k, v in col)

def sample_fixed_per_key(self, col, n: int, stage_name: str):
pass
def sample_fixed_per_key(self, col, n: int,
Collaborator: Please add a test for this function.

Contributor Author: Done.

Collaborator: Thanks!
stage_name: typing.Optional[str] = None):
def sample_fixed_per_key_generator():
for item in self.group_by_key(col):
key = item[0]
values = item[1]
if len(values) > n:
sampled_indices = np.random.choice(range(len(values)), n,
replace=False)
values = [values[i] for i in sampled_indices]
yield key, values

return sample_fixed_per_key_generator()

def count_per_element(self, col, stage_name: typing.Optional[str] = None):
yield from collections.Counter(col).items()
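For reference, a minimal test of LocalPipelineOperations.sample_fixed_per_key along the lines requested in the review could look like this (a sketch with made-up class and test names; the test actually added in the final commits may differ):

```python
import unittest
import pipeline_dp

class LocalSampleFixedPerKeyTest(unittest.TestCase):
    def test_sample_fixed_per_key_caps_values_per_key(self):
        ops = pipeline_dp.LocalPipelineOperations()
        data = [("pid1", 1), ("pid1", 2), ("pid1", 3), ("pid2", 4)]
        result = dict(ops.sample_fixed_per_key(data, 2, "Sample per key"))
        # "pid1" has 3 values, so exactly 2 survive (sampled without replacement);
        # "pid2" has fewer than 2 values and is left untouched.
        self.assertEqual(len(result["pid1"]), 2)
        self.assertTrue(set(result["pid1"]).issubset({1, 2, 3}))
        self.assertEqual(list(result["pid2"]), [4])
```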
129 changes: 118 additions & 11 deletions tests/dp_engine_test.py
@@ -1,34 +1,141 @@
"""DPEngine Test"""

import collections
import numpy as np
import unittest

from pipeline_dp.aggregate_params import AggregateParams, Metrics
from pipeline_dp.dp_engine import DPEngine
import pipeline_dp

"""DPEngine Test"""


class dp_engineTest(unittest.TestCase):
aggregator_fn = lambda input_values: (len(input_values),
np.sum(input_values),
np.sum(np.square(input_values)))
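This aggregator_fn returns the tuple (count, sum, sum of squares) for a list of values; for example, the "pk1" values [1, 2] map to (2, 3, 5) and the "pk2" values [3, 4] map to (2, 7, 25), which is where the expected tuples in the tests below come from.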

def test_contribution_bounding_empty_col(self):
input_col = []
max_partitions_contributed = 2
max_contributions_per_partition = 2

dp_engine = pipeline_dp.DPEngine(
pipeline_dp.BudgetAccountant(epsilon=1, delta=1e-10),
pipeline_dp.LocalPipelineOperations())
bound_result = list(dp_engine._bound_cross_partition_contributions(
input_col,
max_partitions_contributed=max_partitions_contributed,
max_contributions_per_partition=max_contributions_per_partition,
aggregator_fn=dp_engineTest.aggregator_fn))

self.assertFalse(bound_result)

def test_contribution_bounding_bound_input_nothing_dropped(self):
input_col = [("pid1", 'pk1', 1),
("pid1", 'pk1', 2),
("pid1", 'pk2', 3),
("pid1", 'pk2', 4)]
max_partitions_contributed = 2
max_contributions_per_partition = 2

dp_engine = pipeline_dp.DPEngine(
pipeline_dp.BudgetAccountant(epsilon=1, delta=1e-10),
pipeline_dp.LocalPipelineOperations())
bound_result = list(dp_engine._bound_cross_partition_contributions(
input_col,
max_partitions_contributed=max_partitions_contributed,
max_contributions_per_partition=max_contributions_per_partition,
aggregator_fn=dp_engineTest.aggregator_fn))

expected_result = [(('pid1', 'pk2'), (2, 7, 25)),
(('pid1', 'pk1'), (2, 3, 5))]
self.assertEqual(set(expected_result), set(bound_result))

def test_contribution_bounding_per_partition_bounding_applied(self):
input_col = [("pid1", 'pk1', 1),
("pid1", 'pk1', 2),
("pid1", 'pk2', 3),
("pid1", 'pk2', 4),
("pid1", 'pk2', 5),
("pid2", 'pk2', 6)]
max_partitions_contributed = 5
max_contributions_per_partition = 2

dp_engine = pipeline_dp.DPEngine(
pipeline_dp.BudgetAccountant(epsilon=1, delta=1e-10),
pipeline_dp.LocalPipelineOperations())
bound_result = list(dp_engine._bound_cross_partition_contributions(
input_col,
max_partitions_contributed=max_partitions_contributed,
max_contributions_per_partition=max_contributions_per_partition,
aggregator_fn=dp_engineTest.aggregator_fn))

self.assertEqual(len(bound_result), 3)
# Check contributions per partitions
self.assertTrue(all(map(
lambda op_val: op_val[1][0] <= max_contributions_per_partition,
bound_result)))

def test_contribution_bounding_cross_partition_bounding_applied(self):
input_col = [("pid1", 'pk1', 1),
("pid1", 'pk1', 2),
("pid1", 'pk2', 3),
("pid1", 'pk2', 4),
("pid1", 'pk2', 5),
("pid1", 'pk3', 6),
("pid1", 'pk4', 7),
("pid2", 'pk4', 8)]
max_partitions_contributed = 3
max_contributions_per_partition = 5

dp_engine = pipeline_dp.DPEngine(
pipeline_dp.BudgetAccountant(epsilon=1, delta=1e-10),
pipeline_dp.LocalPipelineOperations())
bound_result = list(dp_engine._bound_cross_partition_contributions(
input_col,
max_partitions_contributed=max_partitions_contributed,
max_contributions_per_partition=max_contributions_per_partition,
aggregator_fn=dp_engineTest.aggregator_fn))

self.assertEqual(len(bound_result), 4)
# Check contributions per partitions
self.assertTrue(all(map(
lambda op_val: op_val[1][0] <= max_contributions_per_partition,
bound_result)))
# Check cross partition contributions
dict_of_pid_to_pk = collections.defaultdict(lambda: [])
for key, _ in bound_result:
dict_of_pid_to_pk[key[0]].append(key[1])
self.assertEqual(len(dict_of_pid_to_pk), 2)
self.assertTrue(
all(map(lambda key: len(
dict_of_pid_to_pk[key]) <= max_partitions_contributed,
dict_of_pid_to_pk)))

class DPEngineTest(unittest.TestCase):
def test_aggregate_none(self):
self.assertIsNone(DPEngine(None, None).aggregate(None, None, None))
self.assertIsNone(pipeline_dp.DPEngine(None, None).aggregate(None, None,
None))

def test_aggregate_report(self):
params1 = AggregateParams(
params1 = pipeline_dp.AggregateParams(
max_partitions_contributed=3,
max_contributions_per_partition=2,
low=1,
high=5,
metrics=[Metrics.PRIVACY_ID_COUNT, Metrics.COUNT, Metrics.MEAN],
metrics=[pipeline_dp.Metrics.PRIVACY_ID_COUNT, pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.MEAN],
)
params2 = AggregateParams(
params2 = pipeline_dp.AggregateParams(
max_partitions_contributed=1,
max_contributions_per_partition=3,
low=2,
high=10,
metrics=[Metrics.VAR, Metrics.SUM, Metrics.MEAN],
public_partitions = list(range(1,40)),
metrics=[pipeline_dp.Metrics.VAR, pipeline_dp.Metrics.SUM, pipeline_dp.Metrics.MEAN],
public_partitions = list(range(1, 40)),
)
engine = DPEngine(None, None)
engine = pipeline_dp.DPEngine(None, None)
engine.aggregate(None, params1, None)
engine.aggregate(None, params2, None)
self.assertEqual(len(engine._report_generators), 2) # pylint: disable=protected-access


if __name__ == '__main__':
unittest.main()