Skip to content

Commit

Permalink
Leftovers of PipeineOperations -> PipelineBackend renaming (#162)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym authored Jan 22, 2022
1 parent 177e015 commit 1da549f
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 251 deletions.
16 changes: 8 additions & 8 deletions examples/restaurant_visits.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@
"\n",
"# 1. Create PipelineBackend\n",
"# PipelineBackend is an object which encapsulates pipeline framework.\n",
"ops = pipeline_dp.LocalBackend()\n",
"backend = pipeline_dp.LocalBackend()\n",
"\n",
"# 2. Set the total privacy budget.\n",
"# BudgetAccountant automatically splits over all DP aggregations.\n",
Expand All @@ -170,7 +170,7 @@
"\n",
"# 3. Create a DPEngine instance.\n",
"# DPEngine object performs all DP aggregations.\n",
"dp_engine = pipeline_dp.DPEngine(budget_accountant, ops)\n",
"dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)\n",
"\n",
"# 4. Specify which DP aggregated metrics to compute.\n",
"# AggregateParams constains specifications of metrics that we would like \n",
Expand Down Expand Up @@ -228,10 +228,10 @@
},
"source": [
"# Framework independent function\n",
"def run_pipeline(data, ops):\n",
"def run_pipeline(data, backend):\n",
" budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1, total_delta=1e-7)\n",
"\n",
" dp_engine = pipeline_dp.DPEngine(budget_accountant, ops)\n",
" dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)\n",
"\n",
" params = pipeline_dp.AggregateParams(noise_kind = pipeline_dp.NoiseKind.LAPLACE,\n",
" metrics=[pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM],\n",
Expand Down Expand Up @@ -266,8 +266,8 @@
"cell_type": "code",
"source": [
"# Local demo with run_pipeline\n",
"ops = pipeline_dp.LocalBackend()\n",
"dp_result = list(run_pipeline(rows, ops))\n",
"backend = pipeline_dp.LocalBackend()\n",
"dp_result = list(run_pipeline(rows, backend))\n",
"print(dp_result)"
],
"metadata": {
Expand Down Expand Up @@ -296,8 +296,8 @@
"\n",
"with beam.Pipeline(runner=runner) as pipeline:\n",
" beam_data = pipeline | beam.Create(rows)\n",
" ops = pipeline_dp.BeamBackend()\n",
" dp_result = run_pipeline(beam_data, ops) \n",
" backend = pipeline_dp.BeamBackend()\n",
" dp_result = run_pipeline(beam_data, backend) \n",
" dp_result | beam.Map(print)\n"
],
"execution_count": null,
Expand Down
5 changes: 3 additions & 2 deletions pipeline_dp/combiners.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ def __init__(self, params: CombinerParams):
self._params = params

def create_accumulator(self, values: Iterable[float]) -> AccumulatorType:
return len(values), np.clip(values, self._params.aggregate_params.min_value,
self._params.aggregate_params.max_value).sum()
return len(values), np.clip(
values, self._params.aggregate_params.min_value,
self._params.aggregate_params.max_value).sum()

def merge_accumulators(self, accum1: AccumulatorType,
accum2: AccumulatorType):
Expand Down
6 changes: 3 additions & 3 deletions pipeline_dp/dp_computations.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ def squares_interval(self):
return 0, max(self.min_value**2, self.max_value**2)
return self.min_value**2, self.max_value**2


def compute_middle(min_value: float, max_value: float):
""""Returns the middle point of the interval [min_value, max_value]."""
""""Returns the middle point of the interval [min_value, max_value]."""
# (min_value + max_value) / 2 may cause an overflow or loss of precision if
# min_value and max_value are large.
return min_value + (max_value - min_value) / 2


def compute_l1_sensitivity(l0_sensitivity: float, linf_sensitivity: float):
"""Calculates the L1 sensitivity based on the L0 and Linf sensitivities.
Expand Down
71 changes: 36 additions & 35 deletions pipeline_dp/dp_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ class DPEngine:
"""Performs DP aggregations."""

def __init__(self, budget_accountant: BudgetAccountant,
ops: PipelineBackend):
backend: PipelineBackend):
self._budget_accountant = budget_accountant
self._ops = ops
self._backend = backend
self._report_generators = []

def _add_report_stage(self, text):
Expand Down Expand Up @@ -80,7 +80,7 @@ def _aggregate(self, col, params: AggregateParams,
data_extractors)

# Extract the columns.
col = self._ops.map(
col = self._backend.map(
col, lambda row: (data_extractors.privacy_id_extractor(row),
data_extractors.partition_extractor(row),
data_extractors.value_extractor(row)),
Expand All @@ -91,8 +91,8 @@ def _aggregate(self, col, params: AggregateParams,
combiner.create_accumulator)
# col : ((privacy_id, partition_key), accumulator)

col = self._ops.map_tuple(col, lambda pid_pk, v: (pid_pk[1], v),
"Drop privacy id")
col = self._backend.map_tuple(col, lambda pid_pk, v: (pid_pk[1], v),
"Drop privacy id")
# col : (partition_key, accumulator)

if params.public_partitions:
Expand All @@ -101,7 +101,7 @@ def _aggregate(self, col, params: AggregateParams,
combiner.create_accumulator)
# col : (partition_key, accumulator)

col = self._ops.combine_accumulators_per_key(
col = self._backend.combine_accumulators_per_key(
col, combiner, "Reduce accumulators per partition key")
# col : (partition_key, accumulator)

Expand All @@ -115,11 +115,11 @@ def _aggregate(self, col, params: AggregateParams,
# col = self._fix_budget_accounting_if_needed(col, accumulator_factory)

# Compute DP metrics.
col = self._ops.map_values(col, combiner.compute_metrics,
"Compute DP` metrics")
col = self._backend.map_values(col, combiner.compute_metrics,
"Compute DP` metrics")

col = self._ops.map_values(col, lambda result: result[1],
"Extract results")
col = self._backend.map_values(col, lambda result: result[1],
"Extract results")

return col

Expand Down Expand Up @@ -198,14 +198,14 @@ def select_private_partitions(self, col,
max_partitions_contributed = params.max_partitions_contributed

# Extract the columns.
col = self._ops.map(
col = self._backend.map(
col, lambda row: (data_extractors.privacy_id_extractor(row),
data_extractors.partition_extractor(row)),
"Extract (privacy_id, partition_key))")
# col : (privacy_id, partition_key)

# Apply cross-partition contribution bounding
col = self._ops.group_by_key(col, "Group by privacy_id")
col = self._backend.group_by_key(col, "Group by privacy_id")

# col : (privacy_id, [partition_key])

Expand All @@ -232,38 +232,39 @@ def sample_unique_elements_fn(pid_and_pks):

return ((pid, pk) for pk in sampled_elements)

col = self._ops.flat_map(col, sample_unique_elements_fn,
"Sample cross-partition contributions")
col = self._backend.flat_map(col, sample_unique_elements_fn,
"Sample cross-partition contributions")
# col : (privacy_id, partition_key)

# A compound accumulator without any child accumulators is used to calculate the raw privacy ID count.
compound_combiner = CompoundCombiner([])
col = self._ops.map_tuple(
col = self._backend.map_tuple(
col, lambda pid, pk: (pk, compound_combiner.create_accumulator([])),
"Drop privacy id and add accumulator")
# col : (partition_key, accumulator)

col = self._ops.combine_accumulators_per_key(
col = self._backend.combine_accumulators_per_key(
col, compound_combiner, "Combine accumulators per partition key")
# col : (partition_key, accumulator)

col = self._select_private_partitions_internal(
col, max_partitions_contributed)
col = self._ops.keys(col, "Drop accumulators, keep only partition keys")
col = self._backend.keys(col,
"Drop accumulators, keep only partition keys")

return col

def _drop_not_public_partitions(self, col, public_partitions,
data_extractors: DataExtractors):
"""Drops partitions in `col` which are not in `public_partitions`."""
col = self._ops.map(
col = self._backend.map(
col, lambda row: (data_extractors.partition_extractor(row), row),
"Extract partition id")
col = self._ops.filter_by_key(col, public_partitions,
"Filtering out non-public partitions")
col = self._backend.filter_by_key(
col, public_partitions, "Filtering out non-public partitions")
self._add_report_stage(
f"Public partition selection: dropped non public partitions")
return self._ops.map_tuple(col, lambda k, v: v, "Drop key")
return self._backend.map_tuple(col, lambda k, v: v, "Drop key")

def _add_empty_public_partitions(self, col, public_partitions,
aggregator_fn):
Expand All @@ -272,11 +273,11 @@ def _add_empty_public_partitions(self, col, public_partitions,
self._add_report_stage(
"Adding empty partitions to public partitions that are missing in "
"data")
empty_accumulators = self._ops.map(
empty_accumulators = self._backend.map(
public_partitions, lambda partition_key:
(partition_key, aggregator_fn([])), "Build empty accumulators")

return self._ops.flatten(
return self._backend.flatten(
col, empty_accumulators,
"Join public partitions with partitions from data")

Expand All @@ -299,28 +300,29 @@ def _bound_contributions(self, col, max_partitions_contributed: int,
accumulator).
"""
# per partition-contribution bounding with bounding of each contribution
col = self._ops.map_tuple(
col = self._backend.map_tuple(
col, lambda pid, pk, v: ((pid, pk), v),
"Rekey to ( (privacy_id, partition_key), value))")
col = self._ops.sample_fixed_per_key(
col = self._backend.sample_fixed_per_key(
col, max_contributions_per_partition,
"Sample per (privacy_id, partition_key)")
self._add_report_stage(
f"Per-partition contribution bounding: randomly selected not "
f"more than {max_contributions_per_partition} contributions")
# ((privacy_id, partition_key), [value])
col = self._ops.map_values(
col = self._backend.map_values(
col, aggregator_fn,
"Apply aggregate_fn after per partition bounding")
# ((privacy_id, partition_key), accumulator)

# Cross partition bounding
col = self._ops.map_tuple(
col = self._backend.map_tuple(
col, lambda pid_pk, v: (pid_pk[0], (pid_pk[1], v)),
"Rekey to (privacy_id, (partition_key, "
"accumulator))")
col = self._ops.sample_fixed_per_key(col, max_partitions_contributed,
"Sample per privacy_id")
col = self._backend.sample_fixed_per_key(col,
max_partitions_contributed,
"Sample per privacy_id")

self._add_report_stage(
f"Cross-partition contribution bounding: randomly selected not more than "
Expand All @@ -331,9 +333,8 @@ def unnest_cross_partition_bound_sampled_per_key(pid_pk_v):
pid, pk_values = pid_pk_v
return (((pid, pk), v) for (pk, v) in pk_values)

return self._ops.flat_map(col,
unnest_cross_partition_bound_sampled_per_key,
"Unnest")
return self._backend.flat_map(
col, unnest_cross_partition_bound_sampled_per_key, "Unnest")

def _select_private_partitions_internal(self, col,
max_partitions_contributed: int):
Expand Down Expand Up @@ -371,7 +372,7 @@ def filter_fn(
f"Private Partition selection: using {budget.mechanism_type.value} "
f"method with (eps= {budget.eps}, delta = {budget.delta})")

return self._ops.filter(col, filter_fn, "Filter private partitions")
return self._backend.filter(col, filter_fn, "Filter private partitions")

def _fix_budget_accounting_if_needed(self, col, accumulator_factory):
"""Adds MechanismSpec to accumulators.
Expand All @@ -391,11 +392,11 @@ def _fix_budget_accounting_if_needed(self, col, accumulator_factory):
Returns:
col: collection with elements (key, accumulator).
"""
if not self._ops.is_serialization_immediate_on_reduce_by_key():
if not self._backend.is_serialization_immediate_on_reduce_by_key():
# No need to fix, since accumulators contain correct MechanismSpec.
return col
mechanism_specs = accumulator_factory.get_mechanism_specs()
return self._ops.map_values(
return self._backend.map_values(
col, lambda acc: acc.set_mechanism_specs(mechanism_specs))

def _not_a_proper_number(self, num):
Expand Down
18 changes: 9 additions & 9 deletions pipeline_dp/private_beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ def __init__(self,
self._sum_params = sum_params

def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
ops = pipeline_dp.BeamBackend()
dp_engine = pipeline_dp.DPEngine(self._budget_accountant, ops)
backend = pipeline_dp.BeamBackend()
dp_engine = pipeline_dp.DPEngine(self._budget_accountant, backend)

params = pipeline_dp.AggregateParams(
noise_kind=self._sum_params.noise_kind,
Expand All @@ -103,7 +103,7 @@ def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:

# aggregate() returns a list of metrics for each partition key.
# Here is only one metric - sum. Remove list.
dp_result = ops.map_values(dp_result, lambda v: v[0], "Unnest list")
dp_result = backend.map_values(dp_result, lambda v: v[0], "Unnest list")
# dp_result : (partition_key, dp_sum)

return dp_result
Expand All @@ -119,8 +119,8 @@ def __init__(self,
self._count_params = count_params

def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
ops = pipeline_dp.BeamBackend()
dp_engine = pipeline_dp.DPEngine(self._budget_accountant, ops)
backend = pipeline_dp.BeamBackend()
dp_engine = pipeline_dp.DPEngine(self._budget_accountant, backend)

params = pipeline_dp.AggregateParams(
noise_kind=self._count_params.noise_kind,
Expand All @@ -144,7 +144,7 @@ def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:

# aggregate() returns a list of metrics for each partition key.
# Here is only one metric - count. Remove list.
dp_result = ops.map_values(dp_result, lambda v: v[0], "Unnest list")
dp_result = backend.map_values(dp_result, lambda v: v[0], "Unnest list")
# dp_result : (partition_key, dp_count)

return dp_result
Expand All @@ -160,8 +160,8 @@ def __init__(self,
self._privacy_id_count_params = privacy_id_count_params

def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
ops = pipeline_dp.BeamBackend()
dp_engine = pipeline_dp.DPEngine(self._budget_accountant, ops)
backend = pipeline_dp.BeamBackend()
dp_engine = pipeline_dp.DPEngine(self._budget_accountant, backend)

params = pipeline_dp.AggregateParams(
noise_kind=self._privacy_id_count_params.noise_kind,
Expand All @@ -183,7 +183,7 @@ def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:

# aggregate() returns a list of metrics for each partition key.
# Here is only one metric - privacy_id_count. Remove list.
dp_result = ops.map_values(dp_result, lambda v: v[0], "Unnest list")
dp_result = backend.map_values(dp_result, lambda v: v[0], "Unnest list")
# dp_result : (partition_key, dp_privacy_id_count)

return dp_result
Expand Down
Loading

0 comments on commit 1da549f

Please sign in to comment.