Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym committed May 17, 2024
1 parent c591590 commit e56ceca
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 8 deletions.
18 changes: 11 additions & 7 deletions pipeline_dp/contribution_bounders.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@
from pipeline_dp import pipeline_backend
from pipeline_dp import sampling_utils

# TODO(dvadym):
# 1. Rename ContributionBounder -> ContributionSampler, because all these
#    classes do contribution bounding only by sampling.
# 2. Introduce L0/Linf/L1 sampling in names (the current names are too long
#    and not readable).


class ContributionBounder(abc.ABC):
"""Interface for objects which perform contribution bounding."""

Expand Down Expand Up @@ -197,12 +202,11 @@ def rekey_per_privacy_id_per_partition_key(pid_pk_v_values):


class LinfSampler(ContributionBounder):
"""Bounds the contribution of privacy_id cross partitions.
"""Bounds the contribution of privacy_id per partition.
It ensures that each privacy_id contributes to not more than
max_partitions_contributed partitions (cross-partition contribution
bounding), by performing sampling if needed. It is assumed that the provided
aggregate_fn function bounds per-partition contributions.
It ensures that each privacy_id contributes to each partition not more than
max_contributions_per_partition records (per-partition contribution
bounding), by performing sampling if needed.
"""

def bound_contributions(self, col, params, backend, report_generator,
Expand All @@ -226,7 +230,7 @@ def bound_contributions(self, col, params, backend, report_generator,
"Apply aggregate_fn after cross-partition contribution bounding")


class NoOpContributionBounder(ContributionBounder):
class NoOpSampler(ContributionBounder):
"""Does no sampling."""

def bound_contributions(self, col, params, backend, report_generator,
Expand Down
2 changes: 1 addition & 1 deletion pipeline_dp/dp_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ def _create_contribution_bounder(
return contribution_bounders.LinfSampler()
# No sampling, but combiners themselves do per partition contribution
# bounding.
return contribution_bounders.NoOpContributionBounder()
return contribution_bounders.NoOpSampler()

def _extract_columns(self, col,
data_extractors: pipeline_dp.DataExtractors):
Expand Down
57 changes: 57 additions & 0 deletions tests/contribution_bounders_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
CrossAndPerPartitionContributionParams = collections.namedtuple(
"CrossAndPerPartitionContributionParams",
["max_partitions_contributed", "max_contributions_per_partition"])
PerPartitionContributionParams = collections.namedtuple(
"PerPartitionContributionParams", ["max_contributions_per_partition"])

aggregate_fn = lambda input_value: (len(input_value), np.sum(input_value),
np.sum(np.square(input_value)))
Expand Down Expand Up @@ -150,3 +152,58 @@ def test_contribution_bounding_empty_col(self):
bound_result = self._run_contribution_bounding(input, max_contributions)

self.assertEmpty(bound_result)


class LinfSampler(parameterized.TestCase):

def _run_sampling(self, input, max_contributions_per_partition):
params = PerPartitionContributionParams(max_contributions_per_partition)

bounder = contribution_bounders.LinfSampler()
return list(
bounder.bound_contributions(input, params,
pipeline_dp.LocalBackend(),
_create_report_generator(),
lambda x: x))

def test_samping_applied(self):
input = [('pid1', 'pk1', 1), ('pid1', 'pk1', 2), ('pid2', 'pk1', 3),
('pid2', 'pk1', 4)]
max_contributions_per_partition = 1
bound_result = self._run_sampling(input,
max_contributions_per_partition)
bound_result = dict(bound_result)
# {(privacy_id, partition_key), [values])
self.assertLen(bound_result, 2)
self.assertLen(bound_result[('pid1', 'pk1')], 1)
self.assertLen(bound_result[('pid2', 'pk1')], 1)

def test_sampling_applied_nothing_dropped(self):
input = [('pid1', 'pk1', 1), ('pid1', 'pk1', 2), ('pid1', 'pk1', 3)]
max_contributions_per_partition = 3

bound_result = self._run_sampling(input,
max_contributions_per_partition)
print(bound_result)
# self.assertEqual(set(expected_result), set(bound_result))

def test_empty_col(self):
self.assertEmpty(
self._run_sampling([], max_contributions_per_partition=1))


class NoOpContributionBounderTest(parameterized.TestCase):

def test_contribution_bounding_applied(self):
input = [('pid1', 'pk1', 1), ('pid1', 'pk1', 2), ('pid2', 'pk1', 3),
('pid3', 'pk2', 4)]
bounder = contribution_bounders.NoOpSampler()
bound_result = bounder.bound_contributions(
input,
params=(),
backend=pipeline_dp.LocalBackend(),
report_generator=_create_report_generator(),
aggregate_fn=lambda x: x)
self.assertEqual(list(bound_result), [(('pid1', 'pk1'), [1, 2]),
(('pid2', 'pk1'), [3]),
(('pid3', 'pk2'), [4])])
54 changes: 54 additions & 0 deletions tests/dp_engine_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,60 @@ def test_aggregate_computation_graph_per_partition_bounding(
unittest.mock.ANY,
unittest.mock.ANY)

@patch('pipeline_dp.contribution_bounders.LinfSampler.bound_contributions')
def test_aggregate_computation_graph_only_linf_sampling(
self, mock_bound_contributions):
# Arrange
aggregate_params = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
metrics=[pipeline_dp.Metrics.SUM],
min_value=0,
max_value=1,
max_partitions_contributed=1,
max_contributions_per_partition=1,
perform_cross_partition_contribution_bounding=False)

engine = self._create_dp_engine_default()
mock_bound_contributions.return_value = []

engine.aggregate(col=[0],
params=aggregate_params,
data_extractors=self._get_default_extractors())

# Assert
mock_bound_contributions.assert_called_with(unittest.mock.ANY,
aggregate_params,
unittest.mock.ANY,
unittest.mock.ANY,
unittest.mock.ANY)

@patch('pipeline_dp.contribution_bounders.NoOpSampler.bound_contributions')
def test_aggregate_computation_graph_only_no_sampling_for_sum_when_no_cross_partition(
self, mock_bound_contributions):
# Arrange
aggregate_params = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
metrics=[pipeline_dp.Metrics.SUM],
min_sum_per_partition=0,
max_sum_per_partition=1,
max_partitions_contributed=1,
max_contributions_per_partition=1,
perform_cross_partition_contribution_bounding=False)

engine = self._create_dp_engine_default()
mock_bound_contributions.return_value = []

engine.aggregate(col=[0],
params=aggregate_params,
data_extractors=self._get_default_extractors())

# Assert
mock_bound_contributions.assert_called_with(unittest.mock.ANY,
aggregate_params,
unittest.mock.ANY,
unittest.mock.ANY,
unittest.mock.ANY)

@patch('pipeline_dp.dp_engine.DPEngine._drop_partitions',)
def test_aggregate_no_partition_filtering_public_partitions(
self, mock_drop_partitions):
Expand Down

0 comments on commit e56ceca

Please sign in to comment.