[WIP] Implement Accumulator Framework and Compound Accumulator for aggregation #47

Merged: 41 commits merged into OpenMined:main on Jul 2, 2021
Conversation

preethiraghavan1
Contributor

Description

This PR implements an abstract base class, Accumulator, and a CompoundAccumulator class. Serialization is not implemented yet. I wanted to share this early to check whether it is heading in the right direction.

How has this been tested?

  • Unit tests

Checklist


@classmethod
def merge(cls, accumulators: typing.Iterable[
'Accumulator'] = None) -> 'Accumulator':
Contributor Author

What do you think of using a variadic (*args) parameter here?

merge(cls, *accumulators: 'Accumulator')

With this signature, no list needs to be created when only a single accumulator is involved.
When a list is involved, however, the caller has to unpack it: merge(*list_of_acc)

May not be super helpful, but wanted to know your thoughts on this.
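
To make the trade-off concrete, here is a minimal sketch of the two call styles. `ListAccumulator`, `merge_iterable`, and `merge_varargs` are hypothetical names used only for illustration; they are not part of this PR. Note that a variadic parameter cannot carry a default value, so the variadic form has no `= None`.

```python
import typing


class ListAccumulator:
    """Hypothetical toy accumulator that just collects values."""

    def __init__(self, values=None):
        self.values = list(values or [])

    def add_accumulator(self, other: 'ListAccumulator') -> 'ListAccumulator':
        self.values.extend(other.values)
        return self


def merge_iterable(
        accumulators: typing.Iterable[ListAccumulator]) -> ListAccumulator:
    """Iterable style: the caller always passes one collection."""
    merged = ListAccumulator()
    for accumulator in accumulators:
        merged.add_accumulator(accumulator)
    return merged


def merge_varargs(*accumulators: ListAccumulator) -> ListAccumulator:
    """Variadic style: accumulators are passed as separate arguments."""
    return merge_iterable(accumulators)


list_of_acc = [ListAccumulator([1]), ListAccumulator([2, 3])]
from_iterable = merge_iterable(list_of_acc)
from_varargs = merge_varargs(*list_of_acc)  # a list has to be unpacked
single = merge_varargs(ListAccumulator([7]))  # no list needed for one item
```

Both styles produce the same result; the difference is only in who builds or unpacks the collection at the call site.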

Collaborator

That's a nice idea. As I foresee it, the main use case for this function is a call with a list of accumulators, so let's keep the iterable for now. If the variadic form turns out to be more convenient in the future, we can change it.

Contributor Author

Got it! Thanks!

Collaborator

@dvadym left a comment

Thanks, it looks great! I left some comments.




class Accumulator(abc.ABC):
"""
Performs aggregations
Collaborator

Nit:

"""Base class for all accumulators.

Accumulators are objects that encapsulate aggregations and computations of differentially private metrics.
"""

Contributor Author

Done

return cls(accumulators)

@abc.abstractmethod
def add_value(self, v):
Collaborator

Nit: v -> value

Contributor Author

Done

def add_value(self, v):
"""
Adds the value to each of the accumulator constituting the
CompoundAccumulator
Collaborator

Please add a '.' at the end.

Contributor Author

Done

Adds the value to each of the accumulator constituting the
CompoundAccumulator
Args:
v: value to be added
Collaborator

Please add a '.' at the end.

Contributor Author

Done


class CompoundAccumulator(Accumulator):
"""
Performs aggregations for a compund accumulators
Collaborator

Please update the comment to

"""Accumulator for computing multiple metrics.

CompoundAccumulator contains one or more accumulators of other types for computing multiple metrics. For example it can contain [CountAccumulator, SumAccumulator]. CompoundAccumulator delegates all operation to the internal accumulators.
"""

Contributor Author

Updated
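
The delegation described in the suggested docstring can be sketched roughly as follows. This is an illustration under assumptions, not the code from this PR: the base class is a simplified stand-in, the `MeanAccumulator` and `SumOfSquaresAccumulator` bodies are hypothetical, and the input values 4 and 5 are chosen only for the example.

```python
import abc


class Accumulator(abc.ABC):
    """Simplified stand-in for the base class discussed in this PR."""

    @abc.abstractmethod
    def add_value(self, value):
        pass

    @abc.abstractmethod
    def compute_metrics(self):
        pass


class MeanAccumulator(Accumulator):
    """Hypothetical accumulator that tracks a running mean."""

    def __init__(self):
        self.total = 0
        self.count = 0

    def add_value(self, value):
        self.total += value
        self.count += 1
        return self

    def compute_metrics(self):
        return self.total / self.count


class SumOfSquaresAccumulator(Accumulator):
    """Hypothetical accumulator that tracks a sum of squares."""

    def __init__(self):
        self.sum_of_squares = 0

    def add_value(self, value):
        self.sum_of_squares += value * value
        return self

    def compute_metrics(self):
        return self.sum_of_squares


class CompoundAccumulator(Accumulator):
    """Delegates every operation to its internal accumulators."""

    def __init__(self, accumulators):
        self.accumulators = list(accumulators)

    def add_value(self, value):
        # Forward the value to each internal accumulator.
        for accumulator in self.accumulators:
            accumulator.add_value(value)
        return self

    def compute_metrics(self):
        # One metric per internal accumulator, in the same order.
        return [a.compute_metrics() for a in self.accumulators]


compound = CompoundAccumulator([MeanAccumulator(), SumOfSquaresAccumulator()])
compound.add_value(4).add_value(5)
computed_metrics = compound.compute_metrics()  # [4.5, 41]
```

The compound object holds no metric state of its own; it only fans each call out to its members and collects their results in order.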

Performs aggregations for a compund accumulators
"""

def __init__(self, accumulators: typing.Iterable['Accumulator'] = None):
Collaborator

Nit: remove = None

self.accumulators = []
if accumulators:
# flatten the accumulators if the input is a CompundAccumulator
self.accumulators = [accumulator_expanded for accumulator in accumulators
Collaborator

Flattening the accumulators is an interesting idea (I hadn't thought of that). Let's keep it simpler for now and remove the flattening.
I think we probably won't have nested accumulators for now. When we need them, let's implement flattening.

Contributor Author

Got it! Done!

def add_accumulator(self, accumulator: 'CompoundAccumulator') -> \
'CompoundAccumulator':
# merges the accumulators of the CompoundAccumulators
self.accumulators.extend(accumulator.accumulators)
Collaborator

Sorry, I didn't explain that. The idea is that we merge CompoundAccumulators that contain accumulators of the same types, in the same order.
add_accumulator() will be called when we have more than one accumulator, each holding results from a different worker machine.

E.g. let's assume that

compound_accumulator1.accumulators = [count_accumulator1, sum_accumulator1]
compound_accumulator2.accumulators = [count_accumulator2, sum_accumulator2]

then compound_accumulator1.add_accumulator(compound_accumulator2) should do

count_accumulator1.add_accumulator(count_accumulator2)
sum_accumulator1.add_accumulator(sum_accumulator2)

i.e. add_accumulator(acc) accumulates data from acc.
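
A minimal sketch of that position-wise merge, assuming hypothetical `CountAccumulator` and `SumAccumulator` classes with made-up starting values (neither class is defined in this PR):

```python
class CountAccumulator:
    """Hypothetical accumulator holding a count."""

    def __init__(self, count=0):
        self.count = count

    def add_accumulator(self, other):
        self.count += other.count
        return self


class SumAccumulator:
    """Hypothetical accumulator holding a sum."""

    def __init__(self, total=0):
        self.total = total

    def add_accumulator(self, other):
        self.total += other.total
        return self


class CompoundAccumulator:
    """Sketch only: merges internal accumulators pairwise by position."""

    def __init__(self, accumulators):
        self.accumulators = list(accumulators)

    def add_accumulator(self, other):
        # Pair accumulators by position and merge each pair.
        for mine, theirs in zip(self.accumulators, other.accumulators):
            mine.add_accumulator(theirs)
        return self


compound_accumulator1 = CompoundAccumulator(
    [CountAccumulator(2), SumAccumulator(10)])
compound_accumulator2 = CompoundAccumulator(
    [CountAccumulator(3), SumAccumulator(5)])
compound_accumulator1.add_accumulator(compound_accumulator2)
```

After the call, `compound_accumulator1` still holds two internal accumulators, now carrying the combined count and the combined sum, rather than four accumulators as the `extend` version would produce.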

Contributor Author

Thanks for explaining! I misunderstood the first time!

self.assertEqual(computed_metrics, [4.5, 41])


class MeanAccumulator(pipeline_dp.Accumulator):
Collaborator

I like the idea of using Mean and SumOfSquares accumulators for testing!

Contributor Author

:)

Collaborator

@dvadym left a comment

Thanks! It looks great! I left some minor comments.

return self

def add_accumulator(self,
accumulator: 'MeanAccumulator') -> 'MeanAccumulator':
Collaborator

Nit: MeanAccumulator -> SumOfSquaresAccumulator

Contributor Author

Done.

pipeline_dp/accumulator.py (outdated, resolved)

def add_accumulator(self, accumulator: 'CompoundAccumulator') -> \
'CompoundAccumulator':
# merges the accumulators of the CompoundAccumulators.
Collaborator

Nit:
"""Merges the accumulators of ...

The expectation ...
"""

Contributor Author

Done

# the expectation is that the input accumulators are of the same type and
# are in the same order.

if (len(accumulator.accumulators) != len(self.accumulators)
Collaborator

I like these thorough checks!

Nit: split these checks in two:
1. The check of len()
2. The check of types:
for ... in zip():
    if type() != type():
        ....  # here we can tell exactly which types mismatch

This would allow us to provide more detailed error information.
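
One way the split could look, as a sketch under assumptions: `check_mergeable` is a hypothetical helper, the exception types (`ValueError`, `TypeError`) and message wording are guesses rather than what the PR uses, and the two accumulator classes are empty placeholders.

```python
def check_mergeable(expected, received):
    """Hypothetical helper: raises if two accumulator lists cannot be merged."""
    # Check 1: both lists must have the same length.
    if len(expected) != len(received):
        raise ValueError(
            "Accumulators should have the same length."
            f" Expected size = {len(expected)},"
            f" received size = {len(received)}.")
    # Check 2: accumulators at the same position must have the same type.
    for position, (acc1, acc2) in enumerate(zip(expected, received)):
        if type(acc1) is not type(acc2):
            raise TypeError(
                f"Type mismatch at position {position}:"
                f" expected {type(acc1).__name__},"
                f" received {type(acc2).__name__}.")


class CountAccumulator:
    """Hypothetical placeholder type."""


class SumAccumulator:
    """Hypothetical placeholder type."""


try:
    check_mergeable([CountAccumulator(), SumAccumulator()],
                    [CountAccumulator()])
except ValueError as error:
    length_message = str(error)

try:
    check_mergeable([CountAccumulator(), SumAccumulator()],
                    [SumAccumulator(), SumAccumulator()])
except TypeError as error:
    type_message = str(error)
```

Because the type check runs per pair, the error can name the first mismatching position and both type names instead of dumping the full type lists.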

Contributor Author

Let me know if the change looks good.

return self

def compute_metrics(self):
# Computes the metrics for individual accumulator
Collaborator

Nit:
"""Computes and returns a list of metrics computed by internal accumulators."""

Contributor Author

Done

pipeline_dp/accumulator.py (outdated, resolved)
def add_accumulator(self, accumulator: 'Accumulator') -> 'Accumulator':
"""
Merges the accumulator.
The difference between this and the merge function is that here it
Collaborator

After moving the merge function to CompoundAccumulator, the comment "The difference..." is no longer needed. Please remove it (a one-line comment is probably enough).

Contributor Author

Removed the line.

pipeline_dp/accumulator.py (outdated, resolved)

class CompoundAccumulator(Accumulator):
"""
Accumulator for computing multiple metrics.
Collaborator

Nit:
"""Accumulator ...

Contributor Author

Done

@@ -1,7 +1,6 @@
"""Adapters for working with pipeline frameworks."""

import random
import collections
Collaborator

This is probably the result of an incorrect merge in git. Please revert it.

Contributor Author

Done.

differential private metrics.
"""

@classmethod
Collaborator

While reviewing the PR, I realized that it's more convenient to have merge as a standalone function.

Could you please make merge a standalone function?
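
As a rough sketch of what a standalone merge could look like, the iterable can be folded with `functools.reduce`, relying on `add_accumulator` returning `self`. `SumAccumulator` is a hypothetical class used only for this example.

```python
import functools
import typing


class SumAccumulator:
    """Hypothetical accumulator holding a sum."""

    def __init__(self, total=0):
        self.total = total

    def add_accumulator(self, other: 'SumAccumulator') -> 'SumAccumulator':
        self.total += other.total
        return self


def merge(accumulators: typing.Iterable[SumAccumulator]) -> SumAccumulator:
    """Merges the accumulators."""
    # reduce folds left to right, so every accumulator is merged into the
    # first one, which is also the object that gets returned.
    return functools.reduce(
        lambda acc1, acc2: acc1.add_accumulator(acc2), accumulators)


first = SumAccumulator(1)
merged = merge([first, SumAccumulator(2), SumAccumulator(3)])
```

Two properties of this approach are worth keeping in mind: the first accumulator is mutated in place, and `functools.reduce` raises `TypeError` on an empty iterable when no initial value is given.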

Contributor Author

Made it a standalone function. What do you think of making it a staticmethod in Accumulator?

Collaborator

@dvadym left a comment

Thanks, it looks great! Just some minor comments.

CompoundAccumulator contains one or more accumulators of other types for
computing multiple metrics.
For example it can contain [CountAccumulator, SumAccumulator].
CompoundAccumulator delegates all operation to the internal accumulators.
Collaborator

Nit: operations

Contributor Author

Done

"Accumulators should all be of the same type. Found accumulators of "
+ f"different types: ({','.join(unique_accumulator_types)}).")

return reduce(lambda acc1, acc2: acc1.add_accumulator(acc2), accumulators)
Collaborator

Nice implementation!

Contributor Author

Thanks! :)


Returns: Accumulator instance with merged values.
"""
unique_accumulator_types = {type(accumulator).__name__ for accumulator in
Collaborator

Let's assume that add_accumulator checks the types, because:

  1. The correctness of add_accumulator depends on whether the types are the same, and
  2. add_accumulator might be called without merge.

Nit: remove the type checks here.

Contributor Author

Done

'Accumulator'] = None) -> 'Accumulator':
"""Merges the accumulators.

Args:
Collaborator

Nit: the short description is enough; remove everything starting from "Args:...".

Contributor Author

Done



def merge(accumulators: typing.Iterable[
'Accumulator'] = None) -> 'Accumulator':
Collaborator

Nit: remove = None

Contributor Author

Done


def add_accumulator(self, accumulator: 'CompoundAccumulator') -> \
'CompoundAccumulator':
"""Merges the accumulators of the CompoundAccumulators.
Collaborator

Nit: add a blank line after this line

Contributor Author

Done

+ f" Expected size = {len(self.accumulators)}"
+ f" received size = {len(accumulator.accumulators)}.")

expected_type_order = ",".join([type(accumulator).__name__ for
Collaborator

Nit: the type check might be done in a slightly shorter and more efficient way (no need to create lists):

for acc1, acc2 in zip(..., ...):
    if type(acc1) is not type(acc2):
        raise ....  # here we can just report the types of the first mismatched accumulators

Contributor Author

Got it! Let me know if this looks good!

str(context.exception))

def test_adding_mismatched_accumulator_length_raises_exception(self):
mean_acc1 = MeanAccumulator().add_value(11)
Collaborator

Thanks, I like thorough testing!

Contributor Author

Thanks for your thorough review!


@abc.abstractmethod
def add_accumulator(self, accumulator: 'Accumulator') -> 'Accumulator':
"""Merges the accumulator to self and returns self.
Collaborator

Please add a comment:

The sub-class implementation is responsible for checking that the types of self and accumulator are the same.
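
For illustration only, a sub-class honoring that contract might look like the sketch below. The `Accumulator` base here is a trimmed stand-in, `CountAccumulator` and `SumAccumulator` are hypothetical, and raising `TypeError` is an assumption about how a mismatch would be reported.

```python
import abc


class Accumulator(abc.ABC):
    """Trimmed stand-in for the base class in this PR."""

    @abc.abstractmethod
    def add_accumulator(self, accumulator: 'Accumulator') -> 'Accumulator':
        """Merges the accumulator to self and returns self."""


class CountAccumulator(Accumulator):
    """Hypothetical sub-class that validates the incoming type itself."""

    def __init__(self, count=0):
        self.count = count

    def add_accumulator(self, accumulator):
        # The sub-class, not the caller, verifies that the types match.
        if type(accumulator) is not type(self):
            raise TypeError(
                f"Expected {type(self).__name__},"
                f" received {type(accumulator).__name__}.")
        self.count += accumulator.count
        return self


class SumAccumulator(Accumulator):
    """Hypothetical second type, used only to trigger the mismatch."""

    def add_accumulator(self, accumulator):
        return self


count = CountAccumulator(2).add_accumulator(CountAccumulator(3))

try:
    count.add_accumulator(SumAccumulator())
except TypeError as error:
    mismatch_message = str(error)
```

Keeping the check inside `add_accumulator` means it also applies when the method is called directly, without going through merge.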

Contributor Author

Added

@@ -6,3 +6,4 @@
from pipeline_dp.pipeline_operations import LocalPipelineOperations
from pipeline_dp.pipeline_operations import BeamOperations
from pipeline_dp.pipeline_operations import SparkRDDOperations
from pipeline_dp.accumulator import CompoundAccumulator
Collaborator

Nit: remove from __init__.py

Explanation: CompoundAccumulator is not a part of Public API

Contributor Author

Sorry, I missed that.

Collaborator

@dvadym left a comment

Thanks for implementing Accumulator framework!

@dvadym merged commit 4d4fd45 into OpenMined:main on Jul 2, 2021