diff --git a/pipeline_dp/combiners.py b/pipeline_dp/combiners.py
index 83b1542b..d66b298e 100644
--- a/pipeline_dp/combiners.py
+++ b/pipeline_dp/combiners.py
@@ -1,4 +1,9 @@
 import abc
+import copy
+
+import pipeline_dp
+from pipeline_dp import dp_computations
+from pipeline_dp import budget_accounting
 
 
 class Combiner(abc.ABC):
@@ -9,6 +14,20 @@ class Combiner(abc.ABC):
     aggregation state. Combiners contain logic, while accumulators contain data.
     The API of combiners are inspired by Apache Beam CombineFn class.
     https://beam.apache.org/documentation/transforms/python/aggregation/combineperkey/#example-5-combining-with-a-combinefn
+
+    Here's how PipelineDP uses combiners to perform an aggregation on some
+    dataset X:
+    1. Split dataset X into sub-datasets which might be kept in memory.
+    2. Call create_accumulator() for each sub-dataset and keep resulting accumulators.
+    3. Choose any pair of accumulators and merge them by calling merge_accumulators().
+    4. Continue 3 until 1 accumulator is left.
+    5. Call compute_metrics() for the accumulator that is left.
+
+    Assumption: merge_accumulators is an associative binary operation.
+
+    The type of accumulator depends on the aggregation performed by this Combiner.
+    For example, this can be a primitive type (e.g. int for a simple "count"
+    aggregation) or a more complex structure (e.g. for "mean").
     """
 
     @abc.abstractmethod
@@ -22,6 +41,59 @@ def merge_accumulators(self, accumulator1, accumulator2):
         pass
 
     @abc.abstractmethod
-    def compute_metrics(self, accumulator: 'Accumulator'):
+    def compute_metrics(self, accumulator):
         """Computes and returns the result of aggregation."""
         pass
+
+
+class CombinerParams:
+    """Parameters for a combiner.
+
+    Wraps all the information needed by the combiner to do the
+    differentially-private computation, e.g. privacy budget and mechanism.
+
+    Note: 'aggregate_params' is copied.
+    """
+
+    def __init__(self, spec: budget_accounting.MechanismSpec,
+                 aggregate_params: pipeline_dp.AggregateParams):
+        self._mechanism_spec = spec
+        self.aggregate_params = copy.copy(aggregate_params)
+
+    @property
+    def eps(self):
+        return self._mechanism_spec.eps
+
+    @property
+    def delta(self):
+        return self._mechanism_spec.delta
+
+    @property
+    def mean_var_params(self):
+        return dp_computations.MeanVarParams(
+            self.eps, self.delta, self.aggregate_params.low,
+            self.aggregate_params.high,
+            self.aggregate_params.max_partitions_contributed,
+            self.aggregate_params.max_contributions_per_partition,
+            self.aggregate_params.noise_kind)
+
+
+class CountCombiner(Combiner):
+    """Combiner for computing DP Count.
+
+    The type of the accumulator is int, which represents the count of the
+    elements in the dataset for which this accumulator is computed.
+    """
+
+    def __init__(self, params: CombinerParams):
+        self._params = params
+
+    def create_accumulator(self, values) -> int:
+        return len(values)
+
+    def merge_accumulators(self, count1: int, count2: int):
+        return count1 + count2
+
+    def compute_metrics(self, count: int) -> float:
+        return dp_computations.compute_dp_count(count,
+                                                self._params.mean_var_params)
diff --git a/tests/combiners_test.py b/tests/combiners_test.py
index 1fc35ac7..5631b706 100644
--- a/tests/combiners_test.py
+++ b/tests/combiners_test.py
@@ -1,5 +1,66 @@
+import pipeline_dp
 import pipeline_dp.combiners as combiners
+import pipeline_dp.budget_accounting as ba
+
+import numpy as np
 import unittest
 
+
+def _create_mechanism_spec(no_noise):
+    if no_noise:
+        eps, delta = 1e5, 1.0 - 1e-5
+    else:
+        eps, delta = 10, 1e-5
+
+    return ba.MechanismSpec(ba.MechanismType.GAUSSIAN, None, eps, delta)
+
+
+def _create_aggregate_params():
+    return pipeline_dp.AggregateParams(
+        low=0,
+        high=1,
+        max_partitions_contributed=1,
+        max_contributions_per_partition=3,
+        noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
+        metrics=[pipeline_dp.Metrics.COUNT])
+
+
+class CountCombinerTest(unittest.TestCase):
+
+    def _create_combiner(self, no_noise):
+        mechanism_spec = _create_mechanism_spec(no_noise)
+        aggregate_params = _create_aggregate_params()
+        params = combiners.CombinerParams(mechanism_spec, aggregate_params)
+        return combiners.CountCombiner(params)
+
+    def test_create_accumulator(self):
+        for no_noise in [False, True]:
+            combiner = self._create_combiner(no_noise)
+            self.assertEqual(0, combiner.create_accumulator([]))
+            self.assertEqual(2, combiner.create_accumulator([1, 2]))
+
+    def test_merge_accumulators(self):
+        for no_noise in [False, True]:
+            combiner = self._create_combiner(no_noise)
+            self.assertEqual(0, combiner.merge_accumulators(0, 0))
+            self.assertEqual(5, combiner.merge_accumulators(1, 4))
+
+    def test_compute_metrics_no_noise(self):
+        combiner = self._create_combiner(no_noise=True)
+        self.assertAlmostEqual(3, combiner.compute_metrics(3), delta=1e-5)
+
+    def test_compute_metrics_with_noise(self):
+        combiner = self._create_combiner(no_noise=False)
+        accumulator = 5
+        noisified_values = [
+            combiner.compute_metrics(accumulator) for _ in range(1000)
+        ]
+        self.assertAlmostEqual(accumulator,
+                               np.mean(noisified_values),
+                               delta=1e-1)
+        self.assertTrue(
+            np.var(noisified_values) > 1)  # check that noise is added
+
+
 if __name__ == '__main__':
     unittest.main()