diff --git a/pipeline_dp/aggregate_params.py b/pipeline_dp/aggregate_params.py
index ff777f24..dbbc880d 100644
--- a/pipeline_dp/aggregate_params.py
+++ b/pipeline_dp/aggregate_params.py
@@ -254,6 +254,7 @@ class AggregateParams:
     partition_selection_strategy: PartitionSelectionStrategy = PartitionSelectionStrategy.TRUNCATED_GEOMETRIC
     pre_threshold: Optional[int] = None
     post_aggregation_thresholding: bool = False
+    perform_cross_partition_contribution_bounding: bool = True
 
     @property
     def metrics_str(self) -> str:
diff --git a/pipeline_dp/contribution_bounders.py b/pipeline_dp/contribution_bounders.py
index 2ac2eed8..2901cfe0 100644
--- a/pipeline_dp/contribution_bounders.py
+++ b/pipeline_dp/contribution_bounders.py
@@ -195,6 +195,56 @@ def rekey_per_privacy_id_per_partition_key(pid_pk_v_values):
             "Apply aggregate_fn after cross-partition contribution bounding")
 
 
+class SamplingPerPartitionContributionBounder(ContributionBounder):
+    """Bounds the contribution of privacy_id per partition.
+
+    It ensures that each privacy_id contributes to each partition not more
+    than max_contributions_per_partition records (per-partition contribution
+    bounding), by performing sampling if needed. Cross-partition contributions
+    are not bounded.
+    """
+
+    def bound_contributions(self, col, params, backend, report_generator,
+                            aggregate_fn):
+        col = backend.map_tuple(
+            col, lambda pid, pk, v: ((pid, pk), v),
+            "Rekey to ((privacy_id, partition_key), value)")
+
+        col = backend.sample_fixed_per_key(
+            col, params.max_contributions_per_partition,
+            "Sample per (privacy_id, partition_key)")
+        # ((privacy_id, partition_key), [value])
+
+        return backend.map_values(
+            col, aggregate_fn,
+            "Apply aggregate_fn after per-partition contribution bounding")
+
+
+class NoOpContributionBounder(ContributionBounder):
+    """Performs no contribution bounding.
+
+    It does not bound cross-partition or per-partition contributions by
+    sampling: it only groups values per (privacy_id, partition_key) and
+    applies aggregate_fn to each group. Any bounding has to be done by the
+    provided aggregate_fn function.
+    """
+
+    def bound_contributions(self, col, params, backend, report_generator,
+                            aggregate_fn):
+        col = backend.map_tuple(
+            col, lambda pid, pk, v: ((pid, pk), v),
+            "Rekey to ((privacy_id, partition_key), value)")
+        # ((privacy_id, partition_key), value)
+
+        col = backend.group_by_key(
+            col, "Group by (privacy_id, partition_key)")
+        # ((privacy_id, partition_key), [value])
+
+        return backend.map_values(
+            col, aggregate_fn,
+            "Apply aggregate_fn without contribution bounding")
+
+
 def collect_values_per_partition_key_per_privacy_id(
         col, backend: pipeline_backend.PipelineBackend):
     """Collects values into a list for each privacy_id and partition_key.