Skip to content

Commit

Permalink
samplers
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym committed May 16, 2024
1 parent 1333a00 commit 32d6920
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 0 deletions.
1 change: 1 addition & 0 deletions pipeline_dp/aggregate_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ class AggregateParams:
partition_selection_strategy: PartitionSelectionStrategy = PartitionSelectionStrategy.TRUNCATED_GEOMETRIC
pre_threshold: Optional[int] = None
post_aggregation_thresholding: bool = False
perform_cross_partition_contribution_bounding: bool = True

@property
def metrics_str(self) -> str:
Expand Down
46 changes: 46 additions & 0 deletions pipeline_dp/contribution_bounders.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,52 @@ def rekey_per_privacy_id_per_partition_key(pid_pk_v_values):
"Apply aggregate_fn after cross-partition contribution bounding")


class SamplingPerPartitionContributionBounder(ContributionBounder):
    """Bounds per-partition contributions of each privacy_id.

    It ensures that each privacy_id contributes to each partition not more
    than max_contributions_per_partition values (per-partition contribution
    bounding), by performing sampling if needed. No cross-partition
    contribution bounding is performed: a privacy_id may contribute to any
    number of partitions.
    """

    def bound_contributions(self, col, params, backend, report_generator,
                            aggregate_fn):
        """Samples per-partition contributions and applies aggregate_fn.

        Args:
          col: collection of (privacy_id, partition_key, value) tuples.
          params: AggregateParams; only max_contributions_per_partition is
            used, as the per-partition sampling cap.
          backend: PipelineBackend that runs the transformations.
          report_generator: unused; kept for the ContributionBounder
            interface.
          aggregate_fn: function applied to the list of sampled values of
            each (privacy_id, partition_key) pair.

        Returns:
          Collection of ((privacy_id, partition_key), aggregate_fn(values)).
        """
        col = backend.map_tuple(
            col, lambda pid, pk, v: ((pid, pk), v),
            "Rekey to ((privacy_id, partition_key), value)")

        # Per-partition bounding: keep at most
        # max_contributions_per_partition values per
        # (privacy_id, partition_key) pair.
        col = backend.sample_fixed_per_key(
            col, params.max_contributions_per_partition,
            "Sample per (privacy_id, partition_key)")
        # ((privacy_id, partition_key), [values])

        return backend.map_values(
            col, aggregate_fn,
            "Apply aggregate_fn after per-partition contribution bounding")


class NoOpContributionBounder(ContributionBounder):
    """Performs no contribution bounding.

    It only groups values by (privacy_id, partition_key) and applies
    aggregate_fn. It is assumed that contribution bounding is either not
    required or is performed elsewhere (e.g. by the caller or by
    aggregate_fn itself).
    """

    def bound_contributions(self, col, params, backend, report_generator,
                            aggregate_fn):
        """Groups values per (privacy_id, partition_key) and aggregates them.

        Args:
          col: collection of (privacy_id, partition_key, value) tuples.
          params: unused; kept for the ContributionBounder interface.
          backend: PipelineBackend that runs the transformations.
          report_generator: unused; kept for the ContributionBounder
            interface.
          aggregate_fn: function applied to the grouped values of each
            (privacy_id, partition_key) pair.

        Returns:
          Collection of ((privacy_id, partition_key), aggregate_fn(values)).
        """
        col = backend.map_tuple(
            col, lambda pid, pk, v: ((pid, pk), v),
            "Rekey to ((privacy_id, partition_key), value)")
        # ((privacy_id, partition_key), value)

        # Group first, then apply aggregate_fn via map_values, consistent
        # with the other bounders in this file: group_by_key takes
        # (col, stage_name) and must not receive aggregate_fn positionally.
        col = backend.group_by_key(
            col, "Group by (privacy_id, partition_key)")
        # ((privacy_id, partition_key), [values])

        return backend.map_values(col, aggregate_fn,
                                  "Apply aggregate_fn without bounding")


def collect_values_per_partition_key_per_privacy_id(
col, backend: pipeline_backend.PipelineBackend):
"""Collects values into a list for each privacy_id and partition_key.
Expand Down

0 comments on commit 32d6920

Please sign in to comment.