Skip to content

Commit

Permalink
DP Count combiner (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym authored Jan 17, 2022
1 parent 6172e7e commit 53e7a60
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 1 deletion.
74 changes: 73 additions & 1 deletion pipeline_dp/combiners.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import abc
import copy

import pipeline_dp
from pipeline_dp import dp_computations
from pipeline_dp import budget_accounting


class Combiner(abc.ABC):
Expand All @@ -9,6 +14,20 @@ class Combiner(abc.ABC):
aggregation state. Combiners contain logic, while accumulators contain data.
The API of combiners is inspired by the Apache Beam CombineFn class.
https://beam.apache.org/documentation/transforms/python/aggregation/combineperkey/#example-5-combining-with-a-combinefn
Here's how PipelineDP uses combiners to perform an aggregation on some
dataset X:
1. Split dataset X into sub-datasets that can be kept in memory.
2. Call create_accumulator() for each sub-dataset and keep the resulting accumulators.
3. Choose any pair of accumulators and merge them by calling merge_accumulators().
4. Repeat step 3 until only 1 accumulator is left.
5. Call compute_metrics() for the remaining accumulator.
Assumption: merge_accumulators is an associative binary operation.
The type of accumulator depends on the aggregation performed by this Combiner.
For example, this can be a primitive type (e.g. int for a simple "count"
aggregation) or a more complex structure (e.g. for "mean")
"""

@abc.abstractmethod
Expand All @@ -22,6 +41,59 @@ def merge_accumulators(self, accumulator1, accumulator2):
pass

@abc.abstractmethod
def compute_metrics(self, accumulator: 'Accumulator'):
def compute_metrics(self, accumulator):
"""Computes and returns the result of aggregation."""
pass


class CombinerParams:
    """Bundle of settings a combiner needs for its DP computation.

    Groups the mechanism spec (the source of eps and delta, i.e. the privacy
    budget) with the aggregation parameters.

    Note: a shallow copy of 'aggregate_params' is stored rather than the
    object that was passed in.
    """

    def __init__(self, spec: budget_accounting.MechanismSpec,
                 aggregate_params: pipeline_dp.AggregateParams):
        """Stores 'spec' as is and a shallow copy of 'aggregate_params'."""
        self._mechanism_spec = spec
        self.aggregate_params = copy.copy(aggregate_params)

    @property
    def eps(self):
        """Epsilon, read from the mechanism spec on each access."""
        spec = self._mechanism_spec
        return spec.eps

    @property
    def delta(self):
        """Delta, read from the mechanism spec on each access."""
        spec = self._mechanism_spec
        return spec.delta

    @property
    def mean_var_params(self):
        """Builds a dp_computations.MeanVarParams from the stored settings.

        A new object is constructed on every access; arguments are passed
        positionally in the order: eps, delta, low, high,
        max_partitions_contributed, max_contributions_per_partition,
        noise_kind.
        """
        eps, delta = self.eps, self.delta
        agg = self.aggregate_params
        return dp_computations.MeanVarParams(
            eps, delta, agg.low, agg.high, agg.max_partitions_contributed,
            agg.max_contributions_per_partition, agg.noise_kind)


class CountCombiner(Combiner):
    """DP Count combiner.

    Accumulators are plain ints: the number of elements in the part of the
    dataset from which the accumulator was built.
    """

    def __init__(self, params: CombinerParams):
        """Keeps 'params' for use in compute_metrics()."""
        self._params = params

    def create_accumulator(self, values) -> int:
        """Returns the number of elements in 'values' (via len())."""
        element_count = len(values)
        return element_count

    def merge_accumulators(self, count1: int, count2: int):
        """Returns the sum of the two counts."""
        merged_count = count1 + count2
        return merged_count

    def compute_metrics(self, count: int) -> float:
        """Returns dp_computations.compute_dp_count() applied to 'count'.

        The DP parameters come from the mean_var_params of the stored
        CombinerParams.
        """
        dp_params = self._params.mean_var_params
        return dp_computations.compute_dp_count(count, dp_params)
61 changes: 61 additions & 0 deletions tests/combiners_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,66 @@
import pipeline_dp
import pipeline_dp.combiners as combiners
import pipeline_dp.budget_accounting as ba

import numpy as np
import unittest


def _create_mechism_spec(no_noise):
    """Builds a Gaussian MechanismSpec for the tests.

    Args:
      no_noise: if truthy, uses eps=1e5 and delta=1 - 1e-5 (presumably large
        enough that the added noise is negligible - the no-noise test relies
        on this); otherwise uses eps=10 and delta=1e-5.
    """
    eps, delta = (1e5, 1.0 - 1e-5) if no_noise else (10, 1e-5)
    return ba.MechanismSpec(ba.MechanismType.GAUSSIAN, None, eps, delta)


def _create_aggregate_params():
    """Builds the AggregateParams shared by the tests in this file.

    Requests the COUNT metric with Gaussian noise, bounds [0, 1], at most 1
    contributed partition and at most 3 contributions per partition.
    """
    aggregate_params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
        metrics=[pipeline_dp.Metrics.COUNT],
        max_partitions_contributed=1,
        max_contributions_per_partition=3,
        low=0,
        high=1)
    return aggregate_params


class CountAccumulatorTest(unittest.TestCase):
    """Tests for combiners.CountCombiner."""

    def _create_combiner(self, no_noise):
        """Returns a CountCombiner built from the shared test parameters.

        Args:
          no_noise: forwarded to _create_mechism_spec to choose the budget.
        """
        mechanism_spec = _create_mechism_spec(no_noise)
        aggregate_params = _create_aggregate_params()
        params = combiners.CombinerParams(mechanism_spec, aggregate_params)
        return combiners.CountCombiner(params)

    def test_create_accumulator(self):
        for no_noise in [False, True]:
            # subTest labels a failure with the variant that caused it and
            # lets the other variant still run.
            with self.subTest(no_noise=no_noise):
                combiner = self._create_combiner(no_noise)
                self.assertEqual(0, combiner.create_accumulator([]))
                self.assertEqual(2, combiner.create_accumulator([1, 2]))

    def test_merge_accumulators(self):
        for no_noise in [False, True]:
            with self.subTest(no_noise=no_noise):
                combiner = self._create_combiner(no_noise)
                self.assertEqual(0, combiner.merge_accumulators(0, 0))
                self.assertEqual(5, combiner.merge_accumulators(1, 4))

    def test_compute_metrics_no_noise(self):
        combiner = self._create_combiner(no_noise=True)
        self.assertAlmostEqual(3, combiner.compute_metrics(3), delta=1e-5)

    def test_compute_metrics_with_noise(self):
        combiner = self._create_combiner(no_noise=False)
        accumulator = 5
        noisified_values = [
            combiner.compute_metrics(accumulator) for _ in range(1000)
        ]
        # The sample mean over 1000 runs is expected to stay close to the
        # true count.
        self.assertAlmostEqual(accumulator,
                               np.mean(noisified_values),
                               delta=1e-1)
        # Check that noise is added. assertGreater reports the observed
        # variance on failure, unlike assertTrue(var > 1).
        self.assertGreater(np.var(noisified_values), 1)


# Run the tests in this file when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()

0 comments on commit 53e7a60

Please sign in to comment.