[WIP] Implement Accumulator Framework and Compound Accumulator for aggregation #47

Merged 41 commits on Jul 2, 2021
Commits
53d5241
Implementing Per partition - bounding of each contribution and cross …
May 20, 2021
161ee09
empty test case
May 20, 2021
98efeaf
Comments
May 20, 2021
8763009
fixed the abstract method having beam operations in it.
May 20, 2021
d78506f
format the files
May 20, 2021
cf54652
format the files
May 20, 2021
2a3dc08
comment fixes
May 20, 2021
a0051da
misc - Fixes for the comments on cl
May 20, 2021
7605b1c
misc - Fixes for the comments on cl
May 20, 2021
6360c90
added
May 20, 2021
52d0577
formatting the files
May 20, 2021
2ca2235
I screwed up the formatting. Fixing now.
May 21, 2021
7dfffb0
nit fix
May 21, 2021
54144fc
fixing merge conflict
May 21, 2021
b5353bf
comment fixes
May 21, 2021
33aed25
add sample_fixed_per_key_generator for lazy impl
May 21, 2021
992e0ef
applying aggregate function after per partion bounding
May 24, 2021
22b4077
adding types of the values
May 24, 2021
0eb50cb
resolving conflicts
May 24, 2021
f59e2c7
applying group by key and a few fixes
May 25, 2021
ed0f632
function name change, comment change and extracting lambda to a function
May 26, 2021
591b620
test cases for sample_fixed_per_key and flat_map
May 27, 2021
0546975
1. create nested function, 2. rename variable pid to pid_pk_v to repr…
May 27, 2021
b2f951b
comment in the nested function
May 27, 2021
411fc06
removing the comment from the nested fn. It looks unnecessary.
May 27, 2021
e277898
laziness test for sample_fixed_per_key and flat_map
May 27, 2021
d7567f8
Merge branch 'OpenMined:main' into main
preethiraghavan1 May 27, 2021
963d943
Merge https://github.com/OpenMined/PipelineDP
Jun 16, 2021
8d7f420
Basic Accumulator and test cases
Jun 18, 2021
d41bef8
Adding comments
Jun 18, 2021
99ade2e
removing the unncessary formatting done
Jun 18, 2021
05717b4
removing the unncessary formatting done
Jun 18, 2021
041746c
Compoud accumulator add_accumulator fix, remove flattening in merging…
Jun 22, 2021
12c08c9
Comment fix
Jun 23, 2021
daefd5f
Compound Accumulator cleanup
Jun 23, 2021
58abd55
serialization
Jun 25, 2021
f029d51
1. Merge as standalone function. 2. Comment fixes. 3. import fix
Jul 1, 2021
3dbe745
format fix
Jul 1, 2021
24d9869
nit fixes
Jul 2, 2021
cb62ffd
merge function fixes
Jul 2, 2021
d77c286
merge function test fixes
Jul 2, 2021
102 changes: 102 additions & 0 deletions pipeline_dp/accumulator.py
@@ -0,0 +1,102 @@
import abc
import typing
import pickle
from functools import reduce


def merge(accumulators: typing.Iterable['Accumulator']) -> 'Accumulator':
    """Merges the accumulators."""
    return reduce(lambda acc1, acc2: acc1.add_accumulator(acc2), accumulators)
Collaborator

Nice implementation!

Contributor Author

Thanks! :)



class Accumulator(abc.ABC):
    """Base class for all accumulators.

    Accumulators are objects that encapsulate aggregations and computations of
    differentially private metrics.
    """

    @abc.abstractmethod
    def add_value(self, value):
        """Adds the value to the accumulator.

        Args:
          value: value to be added.

        Returns: self.
        """
        pass

    @abc.abstractmethod
    def add_accumulator(self, accumulator: 'Accumulator') -> 'Accumulator':
        """Merges the accumulator into self and returns self.
Collaborator

Please add comment:

That sub-class implementation is responsible for checking that types of self and accumulator are the same.

Contributor Author

Added


        Sub-class implementation is responsible for checking that types of
        self and accumulator are the same.

        Args:
          accumulator: the accumulator to merge into self.

        Returns: self.
        """
        pass

    @abc.abstractmethod
    def compute_metrics(self):
        pass

    def serialize(self):
        return pickle.dumps(self)

    @classmethod
    def deserialize(cls, serialized_obj: bytes):
        # pickle.dumps returns bytes, so that is what deserialize receives.
        deserialized_obj = pickle.loads(serialized_obj)
        if not isinstance(deserialized_obj, cls):
            raise TypeError("The deserialized object is not of the right type.")
        return deserialized_obj


class CompoundAccumulator(Accumulator):
    """Accumulator for computing multiple metrics.

    CompoundAccumulator contains one or more accumulators of other types for
    computing multiple metrics.
    For example it can contain [CountAccumulator, SumAccumulator].
    CompoundAccumulator delegates all operations to the internal accumulators.
    """

    def __init__(self, accumulators: typing.Iterable['Accumulator']):
        self.accumulators = accumulators

    def add_value(self, value):
        for accumulator in self.accumulators:
            accumulator.add_value(value)
        return self

    def add_accumulator(self, accumulator: 'CompoundAccumulator') -> \
            'CompoundAccumulator':
        """Merges the accumulators of the CompoundAccumulators.
Collaborator

Nit: add a blank line after this line

Contributor Author

Done


        The expectation is that the internal accumulators are of the same type and
        are in the same order."""

        if len(accumulator.accumulators) != len(self.accumulators):
            raise ValueError(
                "Accumulators in the input are not of the same size."
                + f" Expected size = {len(self.accumulators)}"
                + f" received size = {len(accumulator.accumulators)}.")

        for pos, (base_accumulator_type, to_add_accumulator_type) in enumerate(
                zip(self.accumulators, accumulator.accumulators)):
            if type(base_accumulator_type) != type(to_add_accumulator_type):
                raise TypeError("The type of the accumulators don't match at "
                                f"index {pos}. {type(base_accumulator_type).__name__} "
                                f"!= {type(to_add_accumulator_type).__name__}.")

        for (base_accumulator, to_add_accumulator) in zip(self.accumulators,
                                                           accumulator.accumulators):
            base_accumulator.add_accumulator(to_add_accumulator)
        return self

    def compute_metrics(self):
        """Computes and returns a list of metrics computed by internal
        accumulators."""
        return [accumulator.compute_metrics() for accumulator in self.accumulators]
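
To illustrate how a concrete accumulator might plug into this framework, here is a minimal sketch. It is not part of the PR: `CountAccumulator` is a hypothetical subclass (the docstring above mentions the name only as an example), and the `Accumulator` base is a condensed stand-in for the one in the diff so that the snippet runs on its own. Serialization is left out.

```python
import abc
from functools import reduce


class Accumulator(abc.ABC):
    """Condensed stand-in for the base class in the diff (no serialization)."""

    @abc.abstractmethod
    def add_value(self, value):
        pass

    @abc.abstractmethod
    def add_accumulator(self, accumulator):
        pass

    @abc.abstractmethod
    def compute_metrics(self):
        pass


class CountAccumulator(Accumulator):
    """Hypothetical accumulator that counts the values it has seen."""

    def __init__(self):
        self.count = 0

    def add_value(self, value):
        self.count += 1
        return self

    def add_accumulator(self, accumulator):
        # The sub-class is responsible for the type check.
        if not isinstance(accumulator, CountAccumulator):
            raise TypeError(
                "The accumulator to be added is not of the same type.")
        self.count += accumulator.count
        return self

    def compute_metrics(self):
        return self.count


# One accumulator per chunk of data, folded together the same way
# merge() does it: reduce over add_accumulator.
partials = [
    CountAccumulator().add_value("a").add_value("b"),
    CountAccumulator().add_value("c"),
]
total = reduce(lambda acc1, acc2: acc1.add_accumulator(acc2), partials)

print(total.compute_metrics())  # 3
print(total is partials[0])     # True: the first accumulator is mutated
```

Two properties of the `reduce`-based fold are worth keeping in mind: the result is the first accumulator, updated in place, and `reduce` called without an initial value raises `TypeError` on an empty iterable.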
1 change: 0 additions & 1 deletion pipeline_dp/pipeline_operations.py
@@ -1,7 +1,6 @@
"""Adapters for working with pipeline frameworks."""

import random
import collections
Collaborator

This is probably the result of an incorrect git merge. Please restore it.

Contributor Author

Done.

import numpy as np

import abc
191 changes: 191 additions & 0 deletions tests/accumulator_test.py
@@ -0,0 +1,191 @@
import unittest
import pipeline_dp
import typing
import numpy as np
from pipeline_dp.accumulator import Accumulator
from pipeline_dp.accumulator import merge
from pipeline_dp.accumulator import CompoundAccumulator


class CompoundAccumulatorTest(unittest.TestCase):

    def test_with_mean_and_sum_squares(self):
        mean_acc = MeanAccumulator()
        sum_squares_acc = SumOfSquaresAccumulator()
        compound_accumulator = CompoundAccumulator(
            [mean_acc, sum_squares_acc])

        compound_accumulator.add_value(3)
        compound_accumulator.add_value(4)

        computed_metrics = compound_accumulator.compute_metrics()
        self.assertTrue(
            isinstance(compound_accumulator, CompoundAccumulator))
        self.assertEqual(len(computed_metrics), 2)
        self.assertEqual(computed_metrics, [3.5, 25])

    def test_adding_accumulator(self):
        mean_acc1 = MeanAccumulator().add_value(5)
        sum_squares_acc1 = SumOfSquaresAccumulator().add_value(5)
        compound_accumulator = CompoundAccumulator([mean_acc1,
                                                    sum_squares_acc1])

        mean_acc2 = MeanAccumulator()
        sum_squares_acc2 = SumOfSquaresAccumulator()
        to_be_added_compound_accumulator = CompoundAccumulator(
            [mean_acc2, sum_squares_acc2])

        to_be_added_compound_accumulator.add_value(4)

        compound_accumulator.add_accumulator(to_be_added_compound_accumulator)
        compound_accumulator.add_value(3)

        computed_metrics = compound_accumulator.compute_metrics()
        self.assertEqual(len(computed_metrics), 2)
        self.assertEqual(computed_metrics, [4, 50])

    def test_adding_mismatched_accumulator_order_raises_exception(self):
        mean_acc1 = MeanAccumulator().add_value(11)
        sum_squares_acc1 = SumOfSquaresAccumulator().add_value(1)
        mean_acc2 = MeanAccumulator().add_value(22)
        sum_squares_acc2 = SumOfSquaresAccumulator().add_value(2)

        base_compound_accumulator = CompoundAccumulator(
            [mean_acc1, sum_squares_acc1])
        to_add_compound_accumulator = CompoundAccumulator(
            [sum_squares_acc2, mean_acc2])

        with self.assertRaises(TypeError) as context:
            base_compound_accumulator.add_accumulator(to_add_compound_accumulator)
        self.assertEqual("The type of the accumulators don't match at index 0. "
                         "MeanAccumulator != SumOfSquaresAccumulator.",
                         str(context.exception))

    def test_adding_mismatched_accumulator_length_raises_exception(self):
        mean_acc1 = MeanAccumulator().add_value(11)
Collaborator

Thanks, I like thorough testing!

Contributor Author

Thanks for your thorough review!

        sum_squares_acc1 = SumOfSquaresAccumulator().add_value(1)
        mean_acc2 = MeanAccumulator().add_value(22)

        base_compound_accumulator = CompoundAccumulator(
            [mean_acc1, sum_squares_acc1])
        to_add_compound_accumulator = CompoundAccumulator(
            [mean_acc2])

        with self.assertRaises(ValueError) as context:
            base_compound_accumulator.add_accumulator(to_add_compound_accumulator)
        self.assertEqual("Accumulators in the input are not of the same size. "
                         "Expected size = 2 received size = 1.",
                         str(context.exception))

    def test_serialization_single_accumulator(self):
        accumulator = MeanAccumulator().add_value(5).add_value(6)

        serialized_obj = accumulator.serialize()
        deserialized_obj = Accumulator.deserialize(serialized_obj)

        self.assertIsInstance(deserialized_obj, MeanAccumulator)
        self.assertEqual(accumulator.sum, deserialized_obj.sum)
        self.assertEqual(accumulator.count, deserialized_obj.count)

    def test_serialization_compound_accumulator(self):
        mean_acc = MeanAccumulator().add_value(15)
        sum_squares_acc = SumOfSquaresAccumulator().add_value(1)
        compound_accumulator = CompoundAccumulator(
            [mean_acc, sum_squares_acc])

        serialized_obj = compound_accumulator.serialize()
        deserialized_obj = Accumulator.deserialize(serialized_obj)

        self.assertIsInstance(deserialized_obj, CompoundAccumulator)
        self.assertEqual(len(deserialized_obj.accumulators), 2)
        self.assertIsInstance(deserialized_obj.accumulators[0], MeanAccumulator)
        self.assertIsInstance(deserialized_obj.accumulators[1],
                              SumOfSquaresAccumulator)
        self.assertEqual(deserialized_obj.compute_metrics(),
                         compound_accumulator.compute_metrics())

    def test_serialization_with_incompatible_serialized_object(self):
        mean_accumulator = MeanAccumulator().add_value(15)

        serialized_obj = mean_accumulator.serialize()

        with self.assertRaises(TypeError) as context:
            SumOfSquaresAccumulator.deserialize(serialized_obj)
        self.assertEqual("The deserialized object is not of the right type.",
                         str(context.exception))


class GenericAccumulatorTest(unittest.TestCase):

    def test_merge_accumulators(self):
        mean_accumulator1 = MeanAccumulator().add_value(15)
        mean_accumulator2 = MeanAccumulator().add_value(5)

        merged_accumulator = merge([mean_accumulator1, mean_accumulator2])

        self.assertEqual(merged_accumulator.compute_metrics(), 10)

    def test_merge_diff_type_throws_type_error(self):
        mean_accumulator1 = MeanAccumulator().add_value(15)
        sum_squares_acc = SumOfSquaresAccumulator().add_value(1)

        with self.assertRaises(TypeError) as context:
            merge([mean_accumulator1, sum_squares_acc])
        self.assertIn("The accumulator to be added is not of the same type.",
                      str(context.exception))


class MeanAccumulator(Accumulator):

    def __init__(self, accumulators: typing.Iterable[
        'MeanAccumulator'] = None):
        self.sum = np.sum([concat_acc.sum
                           for concat_acc in accumulators]) if accumulators else 0
        self.count = np.sum([concat_acc.count
                             for concat_acc in accumulators]) if accumulators else 0

    def add_value(self, v):
        self.sum += v
        self.count += 1
        return self

    def add_accumulator(self,
                        accumulator: 'MeanAccumulator') -> 'MeanAccumulator':
        if not isinstance(accumulator, MeanAccumulator):
            raise TypeError("The accumulator to be added is not of the same type.")
        self.sum += accumulator.sum
        self.count += accumulator.count
        return self

    def compute_metrics(self):
        if self.count == 0:
            return float('NaN')
        return self.sum / self.count


# Accumulator classes for testing
class SumOfSquaresAccumulator(Accumulator):

    def __init__(self, accumulators: typing.Iterable[
        'SumOfSquaresAccumulator'] = None):
        self.sum_squares = np.sum([concat_acc.sum_squares
                                   for concat_acc in
                                   accumulators]) if accumulators else 0

    def add_value(self, v):
        self.sum_squares += v * v
        return self

    def add_accumulator(self,
                        accumulator: 'SumOfSquaresAccumulator') -> 'SumOfSquaresAccumulator':
        if not isinstance(accumulator, SumOfSquaresAccumulator):
            raise TypeError("The accumulator to be added is not of the same type.")
        self.sum_squares += accumulator.sum_squares
        return self

    def compute_metrics(self):
        return self.sum_squares


if __name__ == '__main__':
    unittest.main()
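
The expected values in `test_adding_accumulator` can be checked by hand. The compound accumulator ends up having seen the values 5 (added up front), 4 (merged in from the second compound accumulator) and 3 (added last), so the mean and the sum of squares follow directly. A small arithmetic check in plain Python, independent of the accumulator classes:

```python
# Values that reach the compound accumulator in test_adding_accumulator.
values = [5, 4, 3]

mean = sum(values) / len(values)             # (5 + 4 + 3) / 3
sum_of_squares = sum(v * v for v in values)  # 25 + 16 + 9

print([mean, sum_of_squares])  # [4.0, 50]
```

The test compares against `[4, 50]`, which passes because `4.0 == 4` in Python.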