From c59159046cfce4a881c31aa046f2b93b51bcdde3 Mon Sep 17 00:00:00 2001 From: Vadym Doroshenko Date: Fri, 17 May 2024 15:32:51 +0200 Subject: [PATCH] perform_cross_partition --- pipeline_dp/aggregate_params.py | 5 +++++ pipeline_dp/contribution_bounders.py | 27 ++++++++++++++------------- pipeline_dp/dp_engine.py | 14 +++++++++++--- 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/pipeline_dp/aggregate_params.py b/pipeline_dp/aggregate_params.py index dbbc880d..7d78b3ee 100644 --- a/pipeline_dp/aggregate_params.py +++ b/pipeline_dp/aggregate_params.py @@ -234,6 +234,11 @@ class AggregateParams: is ignored when public partitions are used. More details on pre-thresholding are in https://github.com/google/differential-privacy/blob/main/common_docs/pre_thresholding.md + perform_cross_partition_contribution_bounding: whether to perform cross + partition contribution bounding. + Warning: turn off cross partition contribution bounding only when the + number of contributed partitions per privacy unit is already bounded + by max_partitions_contributed. """ metrics: List[Metric] noise_kind: NoiseKind = NoiseKind.LAPLACE diff --git a/pipeline_dp/contribution_bounders.py b/pipeline_dp/contribution_bounders.py index 2901cfe0..fcf54f0f 100644 --- a/pipeline_dp/contribution_bounders.py +++ b/pipeline_dp/contribution_bounders.py @@ -22,6 +22,7 @@ from pipeline_dp import sampling_utils +# TODO: rename ContributionBounder -> Sampler class ContributionBounder(abc.ABC): """Interface for objects which perform contribution bounding.""" @@ -76,8 +77,8 @@ def bound_contributions(self, col, params, backend, report_generator, "Sample per (privacy_id, partition_key)") report_generator.add_stage( f"Per-partition contribution bounding: for each privacy_id and each" - f"partition, randomly select max(actual_contributions_per_partition" - f", {max_contributions_per_partition}) contributions.") + f" partition, randomly select max(actual_contributions_per_partitio" + f"n, {max_contributions_per_partition}) contributions.") # ((privacy_id, partition_key), [value]) col = backend.map_values( col, aggregate_fn, @@ -195,7 +196,7 @@ def rekey_per_privacy_id_per_partition_key(pid_pk_v_values): "Apply aggregate_fn after cross-partition contribution bounding") -class SamplingPerPartitionContributionBounder(ContributionBounder): +class LinfSampler(ContributionBounder): """Bounds the contribution of privacy_id cross partitions. It ensures that each privacy_id contributes to not more than @@ -215,19 +216,18 @@ def bound_contributions(self, col, params, backend, report_generator, "Sample per (privacy_id, partition_key)") # ((privacy_id, partition_key), value) + report_generator.add_stage( + f"Per-partition contribution bounding: for each privacy_id and each" + f" partition, randomly select max(actual_contributions_per_partitio" + f"n, {params.max_contributions_per_partition}) contributions.") + return backend.map_values( col, aggregate_fn, "Apply aggregate_fn after cross-partition contribution bounding") class NoOpContributionBounder(ContributionBounder): - """Bounds the contribution of privacy_id cross partitions. - - It ensures that each privacy_id contributes to not more than - max_partitions_contributed partitions (cross-partition contribution - bounding), by performing sampling if needed. It is assumed that the provided - aggregate_fn function bounds per-partition contributions. - """ + """Does no sampling.""" def bound_contributions(self, col, params, backend, report_generator, aggregate_fn): @@ -236,9 +236,10 @@ def bound_contributions(self, col, params, backend, report_generator, "Rekey to ((privacy_id, partition_key), value)") # ((privacy_id, partition_key), value) - return backend.group_by_key( - col, aggregate_fn, - "Apply aggregate_fn after cross-partition contribution bounding") + col = backend.group_by_key(col, "Group by (privacy_id, partition_key)") + # ((privacy_id, partition_key), [value]) + + return backend.map_values(col, aggregate_fn, "Apply aggregate_fn") def collect_values_per_partition_key_per_privacy_id( diff --git a/pipeline_dp/dp_engine.py b/pipeline_dp/dp_engine.py index 346dc7bf..5f4d2d4a 100644 --- a/pipeline_dp/dp_engine.py +++ b/pipeline_dp/dp_engine.py @@ -386,10 +386,18 @@ def _create_contribution_bounder( return \ contribution_bounders.SamplingPerPrivacyIdContributionBounder( ) - if expects_per_partition_sampling: - return contribution_bounders.SamplingCrossAndPerPartitionContributionBounder( + if params.perform_cross_partition_contribution_bounding: + if expects_per_partition_sampling: + return contribution_bounders.SamplingCrossAndPerPartitionContributionBounder( + ) + return contribution_bounders.SamplingCrossPartitionContributionBounder( ) - return contribution_bounders.SamplingCrossPartitionContributionBounder() + # no cross partition contribution + if expects_per_partition_sampling: + return contribution_bounders.LinfSampler() + # No sampling, but combiners themselves do per partition contribution + # bounding. + return contribution_bounders.NoOpContributionBounder() def _extract_columns(self, col, data_extractors: pipeline_dp.DataExtractors):