Multiple stage names generator
jspacek committed Jan 11, 2022
1 parent 8af40fb commit 990743c
Showing 4 changed files with 112 additions and 25 deletions.
73 changes: 51 additions & 22 deletions pipeline_dp/pipeline_operations.py
@@ -1,20 +1,15 @@
"""Adapters for working with pipeline frameworks."""

from enum import Enum
from functools import partial
import os
import multiprocessing as mp
from pipeline_dp import accumulator
import random
import numpy as np

import abc
import apache_beam as beam
import apache_beam.transforms.combiners as combiners
import collections
import pipeline_dp.accumulator as accumulator
import typing
from typing import Any, Optional, Callable, Tuple
import collections
import itertools

@@ -95,20 +90,50 @@ def is_serialization_immediate_on_reduce_by_key(self):
return False


class UniqueLabelsGenerator:
"""Generate unique labels for each pipeline aggregation."""

def __init__(self, suffix):
self._labels = set()
self._suffix = suffix

def _add_if_unique(self, label):
if label in self._labels:
return False
self._labels.add(label)
return True

def unique(self, label):
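"""Returns `label` made unique by appending the generator's suffix.

If the suffixed label was already handed out, a counter is inserted
before the suffix (e.g. "MyStage_1_<suffix>") until a free name is found.
"""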
if not label:
label = "UNDEFINED_STAGE_NAME"
suffix_label = label + "_" + self._suffix
if self._add_if_unique(suffix_label):
return suffix_label
for i in itertools.count(1):
label_candidate = f"{label}_{i}_{self._suffix}"
if self._add_if_unique(label_candidate):
return label_candidate


class BeamOperations(PipelineOperations):
"""Apache Beam adapter."""

def __init__(self, suffix: str = ""):
super().__init__()
self._ulg = UniqueLabelsGenerator(suffix)

def map(self, col, fn, stage_name: str):
return col | stage_name >> beam.Map(fn)
return col | self._ulg.unique(stage_name) >> beam.Map(fn)

def flat_map(self, col, fn, stage_name: str):
return col | stage_name >> beam.FlatMap(fn)
return col | self._ulg.unique(stage_name) >> beam.FlatMap(fn)

def map_tuple(self, col, fn, stage_name: str):
return col | stage_name >> beam.Map(lambda x: fn(*x))
return col | self._ulg.unique(stage_name) >> beam.Map(lambda x: fn(*x))

def map_values(self, col, fn, stage_name: str):
return col | stage_name >> beam.MapTuple(lambda k, v: (k, fn(v)))
return col | self._ulg.unique(stage_name) >> beam.MapTuple(lambda k, v:
(k, fn(v)))

def group_by_key(self, col, stage_name: str):
"""Group the values for each key in the PCollection into a single sequence.
@@ -121,10 +146,10 @@ def group_by_key(self, col, stage_name: str):
A PCollection of tuples in which the type of the second item is list.
"""
return col | stage_name >> beam.GroupByKey()
return col | self._ulg.unique(stage_name) >> beam.GroupByKey()

def filter(self, col, fn, stage_name: str):
return col | stage_name >> beam.Filter(fn)
return col | self._ulg.unique(stage_name) >> beam.Filter(fn)

def filter_by_key(self, col, keys_to_keep, stage_name: str):

@@ -154,31 +179,34 @@ def does_keep(pk_val):
# Keys to keep are in memory.
if not isinstance(keys_to_keep, set):
keys_to_keep = set(keys_to_keep)
return col | "Filtering out" >> beam.Filter(does_keep)
return col | self._ulg.unique("Filtering out") >> beam.Filter(
does_keep)

# `keys_to_keep` are not in memory. Filter out with a join.
keys_to_keep = (keys_to_keep |
"Reformat PCollection" >> beam.Map(lambda x: (x, True)))
keys_to_keep = (keys_to_keep | self._ulg.unique("Reformat PCollection")
>> beam.Map(lambda x: (x, True)))
return ({
VALUES: col,
TO_KEEP: keys_to_keep
} | "CoGroup by values and to_keep partition flag " >>
beam.CoGroupByKey() |
"Filtering out" >> beam.ParDo(PartitionsFilterJoin()))
beam.CoGroupByKey() | self._ulg.unique("Partitions Filter Join")
>> beam.ParDo(PartitionsFilterJoin()))

def keys(self, col, stage_name: str):
return col | stage_name >> beam.Keys()
return col | self._ulg.unique(stage_name) >> beam.Keys()

def values(self, col, stage_name: str):
return col | stage_name >> beam.Values()
return col | self._ulg.unique(stage_name) >> beam.Values()

def sample_fixed_per_key(self, col, n: int, stage_name: str):
return col | stage_name >> combiners.Sample.FixedSizePerKey(n)
return col | self._ulg.unique(
stage_name) >> combiners.Sample.FixedSizePerKey(n)

def count_per_element(self, col, stage_name: str):
return col | stage_name >> combiners.Count.PerElement()
return col | self._ulg.unique(
stage_name) >> combiners.Count.PerElement()

def reduce_accumulators_per_key(self, col, stage_name: str = None):
def reduce_accumulators_per_key(self, col, stage_name: str):
# TODO: Use merge function from the accumulator framework.
def merge_accumulators(accumulators):
res = None
@@ -189,7 +217,8 @@ def merge_accumulators(accumulators):
res = acc
return res

return col | stage_name >> beam.CombinePerKey(merge_accumulators)
return col | self._ulg.unique(stage_name) >> beam.CombinePerKey(
merge_accumulators)


class SparkRDDOperations(PipelineOperations):
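For context, a minimal usage sketch (not part of the diff above) of how the new UniqueLabelsGenerator de-duplicates Beam stage names; the suffix and stage names here are illustrative only:

from pipeline_dp.pipeline_operations import UniqueLabelsGenerator

gen = UniqueLabelsGenerator("my_suffix")
gen.unique("Map values")  # -> "Map values_my_suffix"
gen.unique("Map values")  # -> "Map values_1_my_suffix" (collision, counter inserted)
gen.unique("")            # -> "UNDEFINED_STAGE_NAME_my_suffix"

BeamOperations forwards every stage name through unique(), so two adapters built with different suffixes (e.g. BeamOperations("A") and BeamOperations("B")) can apply the same stage names to one pipeline without "transform label already exists" errors.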
2 changes: 1 addition & 1 deletion pipeline_dp/private_beam.py
@@ -83,7 +83,7 @@ def __init__(self,
self._sum_params = sum_params

def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
beam_operations = pipeline_dp.BeamOperations()
beam_operations = pipeline_dp.BeamOperations("Expand")
dp_engine = pipeline_dp.DPEngine(self._budget_accountant,
beam_operations)

2 changes: 1 addition & 1 deletion tests/dp_engine_test.py
@@ -526,7 +526,7 @@ def test_run_e2e_beam(self):
input = p | "Create input" >> beam.Create(list(range(10)))

output = self.run_e2e_private_partition_selection_large_budget(
input, pipeline_dp.BeamOperations())
input, pipeline_dp.BeamOperations("e2e_test"))

beam_util.assert_that(output, beam_util.is_not_empty())

60 changes: 59 additions & 1 deletion tests/pipeline_operations_test.py
@@ -72,14 +72,72 @@ def test_reduce_accumulators_per_key(self):
(8, 1)])
col = self.ops.map_values(col, SumAccumulator,
"Wrap into accumulators")
col = self.ops.reduce_accumulators_per_key(col)
col = self.ops.reduce_accumulators_per_key(
col, "Reduce accumulators per key")
result = col | "Get accumulated values" >> beam.Map(
lambda row: (row[0], row[1].get_metrics()))

beam_util.assert_that(result,
beam_util.equal_to([(6, 2), (7, 2), (8, 1)]))


class BeamOperationsStageNameTest(unittest.TestCase):

def test_ops_stage_name_must_be_unique(self):
ops_1 = BeamOperations("SAME_OPS_SUFFIX")
ops_2 = BeamOperations("SAME_OPS_SUFFIX")
with test_pipeline.TestPipeline() as p:
col = p | f"UNIQUE_BEAM_CREATE_NAME" >> beam.Create([(6, 1),
(6, 2)])
ops_1.map(col, lambda x: x, "SAME_MAP_NAME")
with self.assertRaisesRegex(RuntimeError,
expected_regex="A transform with label "
"\"SAME_MAP_NAME_SAME_OPS_SUFFIX\" "
"already exists in the pipeline"):
ops_2.map(col, lambda x: x, "SAME_MAP_NAME")

def test_one_suffix_multiple_same_stage_name(self):
ops = BeamOperations("UNIQUE_OPS_SUFFIX")
with test_pipeline.TestPipeline() as p:
col = p | f"UNIQUE_BEAM_CREATE_NAME" >> beam.Create([(6, 1),
(6, 2)])
ops.map(col, lambda x: x, "SAME_MAP_NAME")
ops.map(col, lambda x: x, "SAME_MAP_NAME")
ops.map(col, lambda x: x, "SAME_MAP_NAME")

self.assertEqual("UNIQUE_OPS_SUFFIX", ops._ulg._suffix)
self.assertEqual(3, len(ops._ulg._labels))
self.assertIn("SAME_MAP_NAME_UNIQUE_OPS_SUFFIX", ops._ulg._labels)
self.assertIn("SAME_MAP_NAME_1_UNIQUE_OPS_SUFFIX", ops._ulg._labels)
self.assertIn("SAME_MAP_NAME_2_UNIQUE_OPS_SUFFIX", ops._ulg._labels)

def test_multiple_suffix_same_ops_stage_name(self):
ops = []
with test_pipeline.TestPipeline() as p:
for i in range(0, 2):
ops.insert(i, BeamOperations(f"UNIQUE_OPS_SUFFIX_{i}"))
col = p | f"UNIQUE_BEAM_CREATE_NAME_{i}" >> beam.Create([(6, 1),
(6, 2)
])
ops[i].map(col, lambda x: x, "SAME_MAP_NAME")
ops[i].map_values(col, lambda x: x, "SAME_MAP_VALUES_NAME")
ops[i].flat_map(col, lambda x: x, "SAME_FLAT_MAP_NAME")
ops[i].map_tuple(col, lambda k, v: k + v,
"SAME_MAP_TUPLES_NAME")
ops[i].group_by_key(col, "SAME_GROUP_BY_KEY_NAME")
ops[i].filter(col, lambda x: True, "SAME_FILTER_NAME")
ops[i].filter_by_key(col, [1], "SAME_FILTER_BY_KEY_NAME")
ops[i].keys(col, "SAME_KEYS_NAME")
ops[i].values(col, "SAME_VALUES_NAME")
ops[i].sample_fixed_per_key(col, 1, "SAME_SAMPLE_NAME")
ops[i].count_per_element(col, "SAME_COUNT_NAME")

self.assertEqual("UNIQUE_OPS_SUFFIX_0", ops[0]._ulg._suffix)
self.assertEqual("UNIQUE_OPS_SUFFIX_1", ops[1]._ulg._suffix)
self.assertEqual(11, len(ops[0]._ulg._labels))
self.assertEqual(11, len(ops[1]._ulg._labels))


@unittest.skipIf(sys.platform == "win32",
"There are some problems with PySpark setup on Windows")
class SparkRDDOperationsTest(parameterized.TestCase):
