
Contribution Bounding: per partition and cross partition bounding (continued from #26) · Pull Request #32

Merged: 26 commits, May 27, 2021

Conversation

preethiraghavan1 (Contributor)

Description

Implements per-partition bounding of each contribution, as well as cross-partition bounding. Added a flat map operation in pipeline_operations.py to accommodate this functionality. Continuation of #26.
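For context, a minimal standalone sketch of the two bounding steps on in-memory data (names and data shapes here are illustrative, not the PR's exact API):

import random
from collections import defaultdict

def bound_contributions(rows, max_partitions_contributed,
                        max_contributions_per_partition):
    """rows: iterable of (privacy_id, partition_key, value) tuples."""
    # Per-partition bounding: keep at most max_contributions_per_partition
    # values for each (privacy_id, partition_key) pair.
    per_partition = defaultdict(list)
    for pid, pk, v in rows:
        per_partition[(pid, pk)].append(v)
    for key, values in per_partition.items():
        if len(values) > max_contributions_per_partition:
            per_partition[key] = random.sample(
                values, max_contributions_per_partition)

    # Cross-partition bounding: keep at most max_partitions_contributed
    # partitions for each privacy_id.
    per_pid = defaultdict(list)
    for (pid, pk), values in per_partition.items():
        per_pid[pid].append((pk, values))
    bounded = {}
    for pid, pk_values in per_pid.items():
        if len(pk_values) > max_partitions_contributed:
            pk_values = random.sample(pk_values, max_partitions_contributed)
        for pk, values in pk_values:
            bounded[(pid, pk)] = values
    return bounded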

How has this been tested?

  • Unit tests for bounding


col = self._ops.sample_fixed_per_key(col,
                                     max_contributions_per_partition,
                                     "Sample per (privacy_id, partition_key)")
# ((privacy_id, partition_key), [value])
col = self._ops.map(col, lambda pid_pk: (pid_pk[0], aggregator_fn(pid_pk[1])))
Collaborator:

Please apply aggregator_fn after per-partition contribution bounding; there is no need to do any aggregation after cross-partition bounding.

Clarification:
Eventually we need to aggregate per pk; we do it in two steps:
1. Aggregate per (pid, pk) (after per-partition contribution bounding; this PR).
2. Aggregate per pk (after cross-partition contribution bounding), but that's outside the scope of this task.

Aggregation in step 2 is slightly more complicated: in step 1 we can assume that the data for each key fits in memory (because we've done bounding), but in step 2 we can't assume that, so a group by key is needed.
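To make the two steps concrete, here is a small local-Python sketch (aggregator_fn and the data are illustrative):

from collections import defaultdict

aggregator_fn = sum  # illustrative stand-in for the real aggregator
bounded = [(("alice", "pk1"), [1, 2]), (("bob", "pk1"), [3])]

# Step 1: after per-partition bounding each value list is capped, so it fits
# in memory and aggregation is a plain per-element map.
per_pid_pk = [((pid, pk), aggregator_fn(values))
              for (pid, pk), values in bounded]

# Step 2 (out of scope here): one pk's aggregates are spread across many
# privacy_ids, so a group-by-key is required before aggregating per pk.
per_pk = defaultdict(list)
for (pid, pk), agg in per_pid_pk:
    per_pk[pk].append(agg)
assert dict(per_pk) == {"pk1": [3, 3]}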

Contributor Author:
That makes sense! Thanks!
Moved it to after per-partition bounding.

Collaborator:
Thanks!

Collaborator @dvadym left a comment:

Thanks, it looks great! I've left comments (mostly minor ones).

@@ -49,3 +49,45 @@ def aggregate(self, col, params: AggregateParams,
        # TODO: implement aggregate().
        # It returns the input for now, just to ensure that an example works.
        return col

def _bound_cross_partition_contributions(self, col,
Collaborator:
Could you please rename it to _bound_contributions? (Sorry, I realized that the name I provided is not correct.)

Contributor Author:
Done

col = self._ops.sample_fixed_per_key(col,
                                     max_contributions_per_partition,
                                     "Sample per (privacy_id, partition_key)")
# ((privacy_id, partition_key), [value])
col = self._ops.map(col, lambda pid_pk: (pid_pk[0], aggregator_fn(pid_pk[1])))
Collaborator:
Thanks!

pipeline_dp/dp_engine.py (resolved thread)
col = self._ops.sample_fixed_per_key(col,
                                     max_contributions_per_partition,
                                     "Sample per (privacy_id, partition_key)")
# ((privacy_id, partition_key), [value])
col = self._ops.map(col, lambda pid_pk: (pid_pk[0], aggregator_fn(pid_pk[1])))
Collaborator:
Nit (optional/style improvements): using map_values() instead of map() is simpler

Contributor Author:
Agreed! Done
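To illustrate the nit, a sketch comparing the two (map_values here is a local stand-in with the assumed semantics of applying a function to each value of a (key, value) pair):

pairs = [(("alice", "pk1"), [1, 2]), (("bob", "pk1"), [3])]
aggregator_fn = sum

# With map(), the lambda has to rebuild the whole (key, value) tuple:
with_map = [(kv[0], aggregator_fn(kv[1])) for kv in pairs]

# With map_values()-style semantics, only the value transform is written:
def map_values(col, fn):
    return ((k, fn(v)) for k, v in col)

assert list(map_values(pairs, aggregator_fn)) == with_map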

# Cross partition bounding
col = self._ops.map_tuple(
    col, lambda pid_pk, v: (pid_pk[0], (pid_pk[1], v)),
    "To (privacy_id, (partition_key, aggregator))")
Collaborator:
Please update the stage name to "Rekey to ...".

Clarification: the operation of changing keys is needed very often, and we usually use the name "Rekey" for it.

Contributor Author:
Done
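A tiny sketch of what this rekey stage does to each element (data is illustrative):

# ((privacy_id, partition_key), aggregate) -> (privacy_id, (partition_key, aggregate))
col = [(("alice", "pk1"), 3), (("alice", "pk2"), 5)]
rekeyed = [(pid, (pk, v)) for (pid, pk), v in col]
assert rekeyed == [("alice", ("pk1", 3)), ("alice", ("pk2", 5))]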

@@ -176,8 +188,19 @@ def keys(self, col, stage_name: str):
     def values(self, col, stage_name: typing.Optional[str] = None):
         return (v for k, v in col)

-    def sample_fixed_per_key(self, col, n: int, stage_name: str):
-        pass
+    def sample_fixed_per_key(self, col, n: int,
Collaborator:
Please add a test for this function

Contributor Author:
Done
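A hedged sketch of what the local sample_fixed_per_key and the requested test could look like (the PR's actual implementation and signature may differ):

import random
from collections import defaultdict

def sample_fixed_per_key(col, n):
    """Groups (key, value) pairs and keeps at most n values per key."""
    def generator():
        grouped = defaultdict(list)
        for k, v in col:
            grouped[k].append(v)
        for k, values in grouped.items():
            yield k, values if len(values) <= n else random.sample(values, n)
    return generator()

def test_sample_fixed_per_key_requires_no_discarding():
    # With fewer than n values per key, nothing may be dropped.
    data = [(1, 11), (2, 22), (1, 12)]
    assert dict(sample_fixed_per_key(data, n=3)) == {1: [11, 12], 2: [22]}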

Collaborator:
thanks!

col = self._ops.sample_fixed_per_key(col, max_partitions_contributed,
                                     "Sample per privacy_id")
# (privacy_id, [(partition_key, aggregator)])
return self._ops.flat_map(col, lambda pid: [((pid[0], pk_v[0]), pk_v[1])
                                            for pk_v in pid[1]],
                          "Unnest")
Collaborator:
Nit: the lambda in this line is pretty complicated; maybe try a local function instead of a lambda.

Also, please use lazy evaluation (there are two options: "yield" or a "(...)" generator expression).

Contributor Author:
Done
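A sketch of what the suggestion amounts to, with illustrative names (flat_map here is a local stand-in):

def flat_map(col, fn):
    # A "(...)" generator expression keeps the flat map lazy.
    return (x for item in col for x in fn(item))

def unnest_sampled_per_key(pid_pk_v):
    # "yield" keeps the helper lazy too:
    # (privacy_id, [(partition_key, aggregate)]) ->
    # ((privacy_id, partition_key), aggregate)
    for pk_v in pid_pk_v[1]:
        yield (pid_pk_v[0], pk_v[0]), pk_v[1]

col = [("alice", [("pk1", 3), ("pk2", 5)])]
assert list(flat_map(col, unnest_sampled_per_key)) == [
    (("alice", "pk1"), 3), (("alice", "pk2"), 5)]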

tests/dp_engine_test.py (resolved thread)
def _bound_contributions(self, col, max_partitions_contributed: int,
                         max_contributions_per_partition: int,
                         aggregator_fn):
    """
    Bounds the contribution by privacy_id in and cross partitions
Collaborator:
Please add "." after this phrase and then after each args description and return description

Contributor Author:
Done
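For reference, a sketch of the docstring shape being requested (shown as a free function for brevity; field descriptions are paraphrased, not the PR's exact text):

def _bound_contributions(col, max_partitions_contributed: int,
                         max_contributions_per_partition: int, aggregator_fn):
    """Bounds the contributions by privacy_id within and across partitions.

    Args:
        col: collection of (privacy_id, partition_key, value) tuples.
        max_partitions_contributed: cap on partitions per privacy_id.
        max_contributions_per_partition: cap on contributions per
            (privacy_id, partition_key) pair.
        aggregator_fn: function applied to the list of values per key.

    Returns:
        Collection of ((privacy_id, partition_key), aggregated_value).
    """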

Collaborator @dvadym left a comment:

a few minor suggestions

return self._ops.flat_map(col,
                          self._unnest_cross_partition_bound_sampled_per_key,
                          "Unnest")

def _unnest_cross_partition_bound_sampled_per_key(self, pid):
Collaborator:
I'd suggest making it an internal function of _bound_contributions; then there's no need for comments.

Contributor Author:
I agree, it's better as a nested function. The style guide suggested using nested functions only for closing over a variable, so I had added it at module level. Moved it to an inner function.

return self._ops.flat_map(col,
                          self._unnest_cross_partition_bound_sampled_per_key,
                          "Unnest")

def _unnest_cross_partition_bound_sampled_per_key(self, pid):
Collaborator:
Please rename pid to pid_pk_v (since it's not a pid, but a tuple).

Contributor Author:
Done


Returns: tuple of the form ((privacy_id, partition_key), values)

"""
Collaborator:
Please unpack the arguments for readability:

pid, pk_values = pid_pk_v

Contributor Author:
Done
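With the rename and unpacking applied (and the helper moved inside _bound_contributions per the earlier suggestion), it might look like this sketch, not the PR's exact code:

def unnest_cross_partition_bound_sampled_per_key(pid_pk_v):
    # Unpacking up front makes the element shape obvious at a glance.
    pid, pk_values = pid_pk_v
    return (((pid, pk), v) for pk, v in pk_values)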

Collaborator @dvadym left a comment:

Just a minor test improvement suggestion

@@ -176,8 +188,19 @@ def keys(self, col, stage_name: str):
     def values(self, col, stage_name: typing.Optional[str] = None):
         return (v for k, v in col)

-    def sample_fixed_per_key(self, col, n: int, stage_name: str):
-        pass
+    def sample_fixed_per_key(self, col, n: int,
Collaborator:
thanks!

@@ -133,6 +153,52 @@ def assert_laziness(operator, *args):
        assert_laziness(self.ops.values)
        assert_laziness(self.ops.count_per_element)

    def test_local_sample_fixed_per_key_requires_no_discarding(self):
Collaborator:
Please add

assert_laziness(self.ops.sample_fixed_per_key)
assert_laziness(self.ops.flat_map)

in test_laziness a few lines above.

Contributor Author:
Done!
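For the record, one way such a laziness assertion can work, sketched with illustrative names (the real test helper may differ): pass the operator a generator that raises as soon as it is consumed; a lazy operator returns without consuming it.

def assert_laziness(operator, *args):
    def raising_gen():
        raise AssertionError(f"{operator.__name__} is not lazy")
        yield  # unreachable; makes this function a generator

    operator(raising_gen(), *args)  # must not raise if nothing is consumed

# e.g. (argument values are illustrative):
# assert_laziness(ops.sample_fixed_per_key, 3)
# assert_laziness(ops.flat_map, lambda x: [x])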

Collaborator:

thanks!

Collaborator @dvadym left a comment:

Thanks a lot for contributing!

@@ -133,6 +153,52 @@ def assert_laziness(operator, *args):
        assert_laziness(self.ops.values)
        assert_laziness(self.ops.count_per_element)

    def test_local_sample_fixed_per_key_requires_no_discarding(self):
Collaborator:
thanks!

@dvadym merged commit 8db30eb into OpenMined:main on May 27, 2021.
@preethiraghavan1 linked an issue on May 27, 2021 that may be closed by this pull request.
Successfully merging this pull request may close these issues: Contribution bounding.

2 participants