Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hierarchical accountant #123

Merged
merged 7 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pipeline_dp/aggregate_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ class AggregateParams:
metrics: Iterable[Metrics]
max_partitions_contributed: int
max_contributions_per_partition: int
budget_weight: float = 1
low: float = None
high: float = None
budget_weight: float = 1
sushkoy marked this conversation as resolved.
Show resolved Hide resolved
public_partitions: Any = None


Expand Down Expand Up @@ -90,7 +90,6 @@ class SumParams:
max_contributions_per_partition: int
low: float
high: float
budget_weight: float
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please return back this as well

public_partitions: Union[list, 'PCollection', 'RDD']
partition_extractor: Callable
value_extractor: Callable
87 changes: 83 additions & 4 deletions pipeline_dp/budget_accounting.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ class MechanismSpecInternal:
class BudgetAccountant(abc.ABC):
"""Base class for budget accountants."""

def __init__(self):
self._scopes_stack = []
self._mechanisms = []

@abc.abstractmethod
def request_budget(
self,
Expand All @@ -104,6 +108,69 @@ def request_budget(
def compute_budgets(self):
pass

def scope(self, weight: float):
"""Defines a scope for DP operations that should consume no more than "weight" proportion of the budget
of the parent scope.

The accountant will automatically scale the budgets of all sub-operations accordingly.

Example usage:
with accountant.scope(weight = 0.5):
... some code that consumes DP budget ...

Args:
weight: budget weight of all operations made within this scope as compared to.

Returns:
the scope that should be used in a "with" block enclosing the operations consuming the budget.
"""
return BudgetAccountantScope(self, weight)

def _register_mechanism(self, mechanism: MechanismSpecInternal):
"""Registers this mechanism for the future normalisation."""

# Register in the global list of mechanisms
self._mechanisms.append(mechanism)

# Register in all of the current scopes
for scope in self._scopes_stack:
scope.mechanisms.append(mechanism)

return mechanism

def _enter_scope(self, scope):
self._scopes_stack.append(scope)

def _exit_scope(self):
self._scopes_stack.pop()


@dataclass
class BudgetAccountantScope:

def __init__(self, accountant, weight):
self.weight = weight
self.accountant = accountant
self.mechanisms = []

def __enter__(self):
self.accountant._enter_scope(self)

def __exit__(self, exc_type, exc_val, exc_tb):
self.accountant._exit_scope()
self._normalise_mechanism_weights()

def _normalise_mechanism_weights(self):
"""Normalise all mechanism weights so that they sum up to the weight of the current scope."""

if not self.mechanisms:
return

total_weight = sum([m.weight for m in self.mechanisms])
normalisation_factor = self.weight / total_weight
for mechanism in self.mechanisms:
mechanism.weight *= normalisation_factor


class NaiveBudgetAccountant(BudgetAccountant):
"""Manages the privacy budget."""
Expand All @@ -118,12 +185,12 @@ def __init__(self, total_epsilon: float, total_delta: float):
Raises:
A ValueError if either argument is out of range.
"""
super().__init__()

_validate_epsilon_delta(total_epsilon, total_delta)

self._total_epsilon = total_epsilon
self._total_delta = total_delta
self._mechanisms = []

def request_budget(
self,
Expand Down Expand Up @@ -162,7 +229,8 @@ def request_budget(
mechanism_spec=mechanism_spec,
sensitivity=sensitivity,
weight=weight)
self._mechanisms.append(mechanism_spec_internal)

self._register_mechanism(mechanism_spec_internal)
return mechanism_spec

def compute_budgets(self):
Expand All @@ -171,6 +239,10 @@ def compute_budgets(self):
logging.warning("No budgets were requested.")
return

if self._scopes_stack:
raise Exception(
"Cannot call compute_budgets from within a budget scope.")

total_weight_eps = total_weight_delta = 0
for mechanism in self._mechanisms:
total_weight_eps += mechanism.weight * mechanism.mechanism_spec.count
Expand Down Expand Up @@ -213,11 +285,12 @@ def __init__(self,
ValueError: Arguments are missing or out of range.
"""

super().__init__()

_validate_epsilon_delta(total_epsilon, total_delta)

self._total_epsilon = total_epsilon
self._total_delta = total_delta
self._mechanisms = []
self.minimum_noise_std = None
self._pld_discretization = pld_discretization

Expand Down Expand Up @@ -258,7 +331,7 @@ def request_budget(
mechanism_spec=mechanism_spec,
sensitivity=sensitivity,
weight=weight)
self._mechanisms.append(mechanism_spec_internal)
self._register_mechanism(mechanism_spec_internal)
return mechanism_spec

def compute_budgets(self):
Expand All @@ -269,7 +342,13 @@ def compute_budgets(self):
entire pipeline.
"""
if not self._mechanisms:
logging.warning("No budgets were requested.")
return

if self._scopes_stack:
raise Exception(
"Cannot call compute_budgets from within a budget scope.")

if self._total_delta == 0:
sum_weights = 0
for mechanism in self._mechanisms:
Expand Down
9 changes: 8 additions & 1 deletion pipeline_dp/dp_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ def aggregate(self, col, params: AggregateParams,
"""
if params is None:
return None

with self._budget_accountant.scope(weight=params.budget_weight):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like an idea to create internal aggregate instead of having with on the entire function!

return self._aggregate(col, params, data_extractors)

def _aggregate(self, col, params: AggregateParams,
data_extractors: DataExtractors):

self._report_generators.append(ReportGenerator(params))

accumulator_factory = AccumulatorFactory(
Expand Down Expand Up @@ -210,4 +217,4 @@ def _fix_budget_accounting_if_needed(self, col, accumulator_factory):
return col
mechanism_specs = accumulator_factory.get_mechanism_specs()
return self._ops.map_values(
col, lambda acc: acc.set_mechanism_specs(mechanism_specs))
col, lambda acc: acc.set_mechanism_specs(mechanism_specs))
23 changes: 23 additions & 0 deletions tests/budget_accounting_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,29 @@ def test_compute_budgets(self):
self.assertEqual(budget2.eps, 0.75)
self.assertEqual(budget2.delta, 1e-6)

def test_budget_scopes(self):
budget_accountant = NaiveBudgetAccountant(total_epsilon=1,
total_delta=1e-6)

with budget_accountant.scope(weight=0.4):
budget1 = budget_accountant.request_budget(
mechanism_type=MechanismType.LAPLACE)
budget2 = budget_accountant.request_budget(
mechanism_type=MechanismType.LAPLACE, weight=3)

with budget_accountant.scope(weight=0.6):
budget3 = budget_accountant.request_budget(
mechanism_type=MechanismType.LAPLACE)
budget4 = budget_accountant.request_budget(
mechanism_type=MechanismType.LAPLACE, weight=4)

budget_accountant.compute_budgets()

self.assertEqual(budget1.eps, 0.4 * (1 / 4))
self.assertEqual(budget2.eps, 0.4 * (3 / 4))
self.assertEqual(budget3.eps, 0.6 * (1 / 5))
self.assertEqual(budget4.eps, 0.6 * (4 / 5))

def test_count(self):
budget_accountant = NaiveBudgetAccountant(total_epsilon=1,
total_delta=1e-6)
Expand Down