Skip to content

Commit

Permalink
perform_cross_partition
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym committed May 17, 2024
1 parent 32d6920 commit c591590
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 16 deletions.
5 changes: 5 additions & 0 deletions pipeline_dp/aggregate_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ class AggregateParams:
is ignored when public partitions are used.
More details on pre-thresholding are in
https://github.com/google/differential-privacy/blob/main/common_docs/pre_thresholding.md
perform_cross_partition_contribution_bounding: whether to perform cross
partition contribution bounding.
Warning: turn off cross partition contribution bounding only when the
number of contributed partitions per privacy unit is already bounded
by max_partitions_contributed.
"""
metrics: List[Metric]
noise_kind: NoiseKind = NoiseKind.LAPLACE
Expand Down
27 changes: 14 additions & 13 deletions pipeline_dp/contribution_bounders.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from pipeline_dp import sampling_utils


# TODO: rename ContributionBounder -> Sampler
class ContributionBounder(abc.ABC):
"""Interface for objects which perform contribution bounding."""

Expand Down Expand Up @@ -76,8 +77,8 @@ def bound_contributions(self, col, params, backend, report_generator,
"Sample per (privacy_id, partition_key)")
report_generator.add_stage(
f"Per-partition contribution bounding: for each privacy_id and each"
f"partition, randomly select max(actual_contributions_per_partition"
f", {max_contributions_per_partition}) contributions.")
f" partition, randomly select max(actual_contributions_per_partitio"
f"n, {max_contributions_per_partition}) contributions.")
# ((privacy_id, partition_key), [value])
col = backend.map_values(
col, aggregate_fn,
Expand Down Expand Up @@ -195,7 +196,7 @@ def rekey_per_privacy_id_per_partition_key(pid_pk_v_values):
"Apply aggregate_fn after cross-partition contribution bounding")


class SamplingPerPartitionContributionBounder(ContributionBounder):
class LinfSampler(ContributionBounder):
"""Bounds the contribution of privacy_id cross partitions.
It ensures that each privacy_id contributes to not more than
Expand All @@ -215,19 +216,18 @@ def bound_contributions(self, col, params, backend, report_generator,
"Sample per (privacy_id, partition_key)")
# ((privacy_id, partition_key), value)

report_generator.add_stage(
f"Per-partition contribution bounding: for each privacy_id and each"
f" partition, randomly select max(actual_contributions_per_partitio"
f"n, {params.max_contributions_per_partition}) contributions.")

return backend.map_values(
col, aggregate_fn,
"Apply aggregate_fn after cross-partition contribution bounding")


class NoOpContributionBounder(ContributionBounder):
"""Bounds the contribution of privacy_id cross partitions.
It ensures that each privacy_id contributes to not more than
max_partitions_contributed partitions (cross-partition contribution
bounding), by performing sampling if needed. It is assumed that the provided
aggregate_fn function bounds per-partition contributions.
"""
"""Does no sampling."""

def bound_contributions(self, col, params, backend, report_generator,
aggregate_fn):
Expand All @@ -236,9 +236,10 @@ def bound_contributions(self, col, params, backend, report_generator,
"Rekey to ((privacy_id, partition_key), value)")
# ((privacy_id, partition_key), value)

return backend.group_by_key(
col, aggregate_fn,
"Apply aggregate_fn after cross-partition contribution bounding")
col = backend.group_by_key(col, "Group by (privacy_id, partition_key)")
# ((privacy_id, partition_key), [value])

return backend.map_values(col, aggregate_fn, "Apply aggregate_fn")


def collect_values_per_partition_key_per_privacy_id(
Expand Down
14 changes: 11 additions & 3 deletions pipeline_dp/dp_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,18 @@ def _create_contribution_bounder(
return \
contribution_bounders.SamplingPerPrivacyIdContributionBounder(
)
if expects_per_partition_sampling:
return contribution_bounders.SamplingCrossAndPerPartitionContributionBounder(
if params.perform_cross_partition_contribution_bounding:
if expects_per_partition_sampling:
return contribution_bounders.SamplingCrossAndPerPartitionContributionBounder(
)
return contribution_bounders.SamplingCrossPartitionContributionBounder(
)
return contribution_bounders.SamplingCrossPartitionContributionBounder()
# no cross partition contribution
if expects_per_partition_sampling:
return contribution_bounders.LinfSampler()
# No sampling, but combiners themselves do per partition contribution
# bounding.
return contribution_bounders.NoOpContributionBounder()

def _extract_columns(self, col,
data_extractors: pipeline_dp.DataExtractors):
Expand Down

0 comments on commit c591590

Please sign in to comment.