Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DP Count combiner #144

Merged
merged 5 commits into from
Jan 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion pipeline_dp/combiners.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import abc
import copy

import pipeline_dp
from pipeline_dp import dp_computations
from pipeline_dp import budget_accounting


class Combiner(abc.ABC):
Expand All @@ -9,6 +14,20 @@ class Combiner(abc.ABC):
aggregation state. Combiners contain logic, while accumulators contain data.
The API of combiners is inspired by the Apache Beam CombineFn class.
https://beam.apache.org/documentation/transforms/python/aggregation/combineperkey/#example-5-combining-with-a-combinefn

Here's how PipelineDP uses combiners to perform an aggregation on some
dataset X:
1. Split dataset X into sub-datasets which might be kept in memory.
2. Call create_accumulators() for each sub-dataset and keep the resulting accumulators.
3. Choose any pair of accumulators and merge them by calling merge_accumulators().
4. Repeat step 3 until only 1 accumulator is left.
5. Call compute_metrics() on the remaining accumulator.

Assumption: merge_accumulators is an associative binary operation.

The type of the accumulator depends on the aggregation performed by this
Combiner. For example, it can be a primitive type (e.g. int for a simple
"count" aggregation) or a more complex structure (e.g. for "mean").
"""

@abc.abstractmethod
Expand All @@ -22,6 +41,59 @@ def merge_accumulators(self, accumulator1, accumulator2):
pass

@abc.abstractmethod
def compute_metrics(self, accumulator: 'Accumulator'):
def compute_metrics(self, accumulator):
"""Computes and returns the result of aggregation."""
pass


class CombinerParams:
    """Parameters for a combiner.

    Bundles everything a combiner needs to do the differentially-private
    computation: the privacy budget (via a MechanismSpec) and the
    aggregation parameters.

    Note: 'aggregate_params' is (shallow-)copied on construction.
    """

    def __init__(self, spec: budget_accounting.MechanismSpec,
                 aggregate_params: pipeline_dp.AggregateParams):
        self._mechanism_spec = spec
        # Shallow copy so later top-level mutations of the caller's object
        # do not leak into this combiner's configuration.
        self.aggregate_params = copy.copy(aggregate_params)

    @property
    def eps(self):
        """Epsilon portion of the privacy budget."""
        return self._mechanism_spec.eps

    @property
    def delta(self):
        """Delta portion of the privacy budget."""
        return self._mechanism_spec.delta

    @property
    def mean_var_params(self):
        """Parameters for dp_computations, rebuilt on each access."""
        agg = self.aggregate_params
        return dp_computations.MeanVarParams(
            self.eps, self.delta, agg.low, agg.high,
            agg.max_partitions_contributed,
            agg.max_contributions_per_partition, agg.noise_kind)


class CountCombiner(Combiner):
    """Combiner for computing a DP Count.

    The accumulator type is a plain int: the number of elements observed
    in the (sub-)dataset this accumulator was computed for.
    """

    def __init__(self, params: CombinerParams):
        self._params = params

    def create_accumulator(self, values) -> int:
        # One unit per element; DP noise is added later, in compute_metrics.
        return len(values)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this return length of the array? Maybe we can add some docs to the parent method to document the expectation for the return type.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added more comments on Combiner based class on how the Combiner framework works and on CountCombiner class. Please check whether it becomes more clear

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's much clearer, thank you!


def merge_accumulators(self, count1: int, count2: int) -> int:
    """Merges two count accumulators by summing them."""
    merged = count1 + count2
    return merged

def compute_metrics(self, count: int) -> float:
    """Returns the differentially-private (noisy) count for 'count'."""
    dp_params = self._params.mean_var_params
    return dp_computations.compute_dp_count(count, dp_params)
61 changes: 61 additions & 0 deletions tests/combiners_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,66 @@
import pipeline_dp
import pipeline_dp.combiners as combiners
import pipeline_dp.budget_accounting as ba

import numpy as np
import unittest


def _create_mechism_spec(no_noise):
    # NOTE(review): name has a typo ("mechism" -> "mechanism"); kept to avoid
    # breaking callers. A near-infinite budget (huge eps, delta close to 1)
    # makes the added noise negligible for deterministic tests.
    eps, delta = (1e5, 1.0 - 1e-5) if no_noise else (10, 1e-5)
    return ba.MechanismSpec(ba.MechanismType.GAUSSIAN, None, eps, delta)


def _create_aggregate_params():
    """Builds AggregateParams suitable for the COUNT tests."""
    return pipeline_dp.AggregateParams(
        metrics=[pipeline_dp.Metrics.COUNT],
        noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
        low=0,
        high=1,
        max_partitions_contributed=1,
        max_contributions_per_partition=3)


class CountAccumulatorTest(unittest.TestCase):
    """Tests for combiners.CountCombiner."""

    def _create_combiner(self, no_noise):
        """Creates a CountCombiner; noise is ~negligible when no_noise=True."""
        mechanism_spec = _create_mechism_spec(no_noise)
        aggregate_params = _create_aggregate_params()
        params = combiners.CombinerParams(mechanism_spec, aggregate_params)
        return combiners.CountCombiner(params)

    def test_create_accumulator(self):
        for no_noise in [False, True]:
            # subTest so a failure reports which no_noise case broke and
            # the remaining case still runs.
            with self.subTest(no_noise=no_noise):
                combiner = self._create_combiner(no_noise)
                self.assertEqual(0, combiner.create_accumulator([]))
                self.assertEqual(2, combiner.create_accumulator([1, 2]))

    def test_merge_accumulators(self):
        for no_noise in [False, True]:
            with self.subTest(no_noise=no_noise):
                combiner = self._create_combiner(no_noise)
                self.assertEqual(0, combiner.merge_accumulators(0, 0))
                self.assertEqual(5, combiner.merge_accumulators(1, 4))

    def test_compute_metrics_no_noise(self):
        combiner = self._create_combiner(no_noise=True)
        self.assertAlmostEqual(3, combiner.compute_metrics(3), delta=1e-5)

    def test_compute_metrics_with_noise(self):
        combiner = self._create_combiner(no_noise=False)
        accumulator = 5
        noisified_values = [
            combiner.compute_metrics(accumulator) for _ in range(1000)
        ]
        # The noise is centered at the true count...
        self.assertAlmostEqual(accumulator,
                               np.mean(noisified_values),
                               delta=1e-1)
        # ...and is actually present (non-trivial variance). assertGreater
        # (unlike assertTrue(... > 1)) shows the observed variance on failure.
        self.assertGreater(np.var(noisified_values), 1)


if __name__ == "__main__":
    # Allow running this test module directly as a script.
    unittest.main()