Multiple stage names generator #128

Merged · 3 commits · Jan 11, 2022
75 changes: 52 additions & 23 deletions pipeline_dp/pipeline_operations.py
@@ -1,20 +1,15 @@
"""Adapters for working with pipeline frameworks."""

from enum import Enum
from functools import partial
import os
import multiprocessing as mp
from pipeline_dp import accumulator
import random
import numpy as np

import abc
import apache_beam as beam
import apache_beam.transforms.combiners as combiners
import collections
import pipeline_dp.accumulator as accumulator
import typing
from typing import Any, Optional, Callable, Tuple
import collections
import itertools

@@ -95,20 +90,50 @@ def is_serialization_immediate_on_reduce_by_key(self):
return False


class UniqueLabelsGenerator:
"""Generate unique labels for each pipeline aggregation."""

def __init__(self, suffix):
self._labels = set()
self._suffix = suffix

def _add_if_unique(self, label):
if label in self._labels:
return False
self._labels.add(label)
return True

def unique(self, label):
if not label:
label = "UNDEFINED_STAGE_NAME"
suffix_label = label + "_" + self._suffix
if self._add_if_unique(suffix_label):
return suffix_label
for i in itertools.count(1):
label_candidate = f"{label}_{i}_{self._suffix}"
if self._add_if_unique(label_candidate):
return label_candidate


class BeamOperations(PipelineOperations):
"""Apache Beam adapter."""

def __init__(self, suffix: str = ""):
super().__init__()
self._ulg = UniqueLabelsGenerator(suffix)

def map(self, col, fn, stage_name: str):
return col | stage_name >> beam.Map(fn)
return col | self._ulg.unique(stage_name) >> beam.Map(fn)

def flat_map(self, col, fn, stage_name: str):
return col | stage_name >> beam.FlatMap(fn)
return col | self._ulg.unique(stage_name) >> beam.FlatMap(fn)

def map_tuple(self, col, fn, stage_name: str):
return col | stage_name >> beam.Map(lambda x: fn(*x))
return col | self._ulg.unique(stage_name) >> beam.Map(lambda x: fn(*x))

def map_values(self, col, fn, stage_name: str):
return col | stage_name >> beam.MapTuple(lambda k, v: (k, fn(v)))
return col | self._ulg.unique(stage_name) >> beam.MapTuple(lambda k, v:
(k, fn(v)))

def group_by_key(self, col, stage_name: str):
"""Group the values for each key in the PCollection into a single sequence.
@@ -121,10 +146,10 @@ def group_by_key(self, col, stage_name: str):
A PCollection of tuples in which the type of the second item is list.

"""
return col | stage_name >> beam.GroupByKey()
return col | self._ulg.unique(stage_name) >> beam.GroupByKey()

def filter(self, col, fn, stage_name: str):
return col | stage_name >> beam.Filter(fn)
return col | self._ulg.unique(stage_name) >> beam.Filter(fn)

def filter_by_key(self, col, keys_to_keep, stage_name: str):

@@ -154,31 +179,34 @@ def does_keep(pk_val):
# Keys to keep are in memory.
if not isinstance(keys_to_keep, set):
keys_to_keep = set(keys_to_keep)
return col | "Filtering out" >> beam.Filter(does_keep)
return col | self._ulg.unique("Filtering out") >> beam.Filter(
does_keep)

# `keys_to_keep` are not in memory. Filter out with a join.
keys_to_keep = (keys_to_keep |
"Reformat PCollection" >> beam.Map(lambda x: (x, True)))
keys_to_keep = (keys_to_keep | self._ulg.unique("Reformat PCollection")
>> beam.Map(lambda x: (x, True)))
return ({
VALUES: col,
TO_KEEP: keys_to_keep
} | "CoGroup by values and to_keep partition flag " >>
beam.CoGroupByKey() |
"Filtering out" >> beam.ParDo(PartitionsFilterJoin()))
beam.CoGroupByKey() | self._ulg.unique("Partitions Filter Join")
>> beam.ParDo(PartitionsFilterJoin()))

def keys(self, col, stage_name: str):
return col | stage_name >> beam.Keys()
return col | self._ulg.unique(stage_name) >> beam.Keys()

def values(self, col, stage_name: str):
return col | stage_name >> beam.Values()
return col | self._ulg.unique(stage_name) >> beam.Values()

def sample_fixed_per_key(self, col, n: int, stage_name: str):
return col | stage_name >> combiners.Sample.FixedSizePerKey(n)
return col | self._ulg.unique(
stage_name) >> combiners.Sample.FixedSizePerKey(n)

def count_per_element(self, col, stage_name: str):
return col | stage_name >> combiners.Count.PerElement()
return col | self._ulg.unique(
stage_name) >> combiners.Count.PerElement()

def reduce_accumulators_per_key(self, col, stage_name: str = None):
def reduce_accumulators_per_key(self, col, stage_name: str):
# TODO: Use merge function from the accumulator framework.
def merge_accumulators(accumulators):
res = None
@@ -189,7 +217,8 @@ def merge_accumulators(accumulators):
res = acc
return res

return col | stage_name >> beam.CombinePerKey(merge_accumulators)
return col | self._ulg.unique(stage_name) >> beam.CombinePerKey(
merge_accumulators)


class SparkRDDOperations(PipelineOperations):
@@ -264,7 +293,7 @@ def sample_fixed_per_key(self, rdd, n: int, stage_name: str = None):
def count_per_element(self, rdd, stage_name: str = None):
return rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: (x + y))

def reduce_accumulators_per_key(self, rdd, stage_name: str = None):
def reduce_accumulators_per_key(self, rdd, stage_name: str):
return rdd.reduceByKey(lambda acc1, acc2: acc1.add_accumulator(acc2))

def is_serialization_immediate_on_reduce_by_key(self):
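A minimal standalone sketch (not part of the diff) of the deduplication behaviour of UniqueLabelsGenerator added above: the suffix "job1" is illustrative, and the import path assumes the class is exposed from pipeline_dp.pipeline_operations as in this PR.

from pipeline_dp.pipeline_operations import UniqueLabelsGenerator

gen = UniqueLabelsGenerator("job1")
# First use of a label: the per-instance suffix is appended.
print(gen.unique("Map values"))  # -> "Map values_job1"
# Repeated label: a running counter keeps the result unique.
print(gen.unique("Map values"))  # -> "Map values_1_job1"
# Empty label: falls back to the placeholder name.
print(gen.unique(""))            # -> "UNDEFINED_STAGE_NAME_job1"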
126 changes: 125 additions & 1 deletion tests/pipeline_operations_test.py
@@ -5,6 +5,7 @@
import apache_beam.testing.util as beam_util
import pytest
import sys
from unittest.mock import Mock, MagicMock, patch

from pipeline_dp import DataExtractors
from pipeline_dp.pipeline_operations import MultiProcLocalPipelineOperations, SparkRDDOperations
@@ -72,14 +73,137 @@ def test_reduce_accumulators_per_key(self):
(8, 1)])
col = self.ops.map_values(col, SumAccumulator,
"Wrap into accumulators")
col = self.ops.reduce_accumulators_per_key(col)
col = self.ops.reduce_accumulators_per_key(
col, "Reduce accumulators per key")
result = col | "Get accumulated values" >> beam.Map(
lambda row: (row[0], row[1].get_metrics()))

beam_util.assert_that(result,
beam_util.equal_to([(6, 2), (7, 2), (8, 1)]))


class BeamOperationsStageNameTest(unittest.TestCase):

class MockUniqueLabelGenerators:

def unique(self, stage_name: str = ""):
return "unique_label"

@staticmethod
def _create_mock_pcollection():
mock = Mock()
mock.__or__ = MagicMock(return_value=mock)
return mock

@staticmethod
def _test_helper():
mock_pcollection = BeamOperationsStageNameTest._create_mock_pcollection(
)
ops = BeamOperations()
ops._ulg = BeamOperationsStageNameTest.MockUniqueLabelGenerators()
return mock_pcollection, ops

@patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
def test_map(self, mock_rrshift):
mock_pcollection, ops = self._test_helper()
ops.map(mock_pcollection, lambda x: x, "stage_name")
mock_rrshift.assert_called_once_with("unique_label")

@patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
def test_map_values(self, mock_rrshift):
mock_pcollection, ops = self._test_helper()
ops.map_values(mock_pcollection, lambda x: x, "stage_name")
mock_rrshift.assert_called_once_with("unique_label")

@patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
def test_flat_map(self, mock_rrshift):
mock_pcollection, ops = self._test_helper()
ops.flat_map(mock_pcollection, lambda x: x, "stage_name")
mock_rrshift.assert_called_once_with("unique_label")

@patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
def test_map_tuple(self, mock_rrshift):
mock_pcollection, ops = self._test_helper()
ops.map_tuple(mock_pcollection, lambda x: x, "stage_name")
mock_rrshift.assert_called_once_with("unique_label")

@patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
def test_group_by_key(self, mock_rrshift):
mock_pcollection, ops = self._test_helper()
ops.group_by_key(mock_pcollection, "stage_name")
mock_rrshift.assert_called_once_with("unique_label")

@patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
def test_filter(self, mock_rrshift):
mock_pcollection, ops = self._test_helper()
ops.filter(mock_pcollection, lambda x: True, "stage_name")
mock_rrshift.assert_called_once_with("unique_label")

@patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
def test_filter_by_key(self, mock_rrshift):
mock_pcollection, ops = self._test_helper()
ops.filter_by_key(mock_pcollection, [1], "stage_name")
mock_rrshift.assert_called_once_with("unique_label")

@patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
def test_keys(self, mock_rrshift):
mock_pcollection, ops = self._test_helper()
ops.keys(mock_pcollection, "stage_name")
mock_rrshift.assert_called_once_with("unique_label")

@patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
def test_values(self, mock_rrshift):
mock_pcollection, ops = self._test_helper()
ops.values(mock_pcollection, "stage_name")
mock_rrshift.assert_called_once_with("unique_label")

@patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
def test_sample_fixed_per_key(self, mock_rrshift):
mock_pcollection, ops = self._test_helper()
ops.sample_fixed_per_key(mock_pcollection, 1, "stage_name")
mock_rrshift.assert_called_once_with("unique_label")

@patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
def test_count_per_element(self, mock_rrshift):
mock_pcollection, ops = self._test_helper()
ops.count_per_element(mock_pcollection, "stage_name")
mock_rrshift.assert_called_once_with("unique_label")

@patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
def test_reduce_accumulators_per_key(self, mock_rrshift):
mock_pcollection, ops = self._test_helper()
ops.reduce_accumulators_per_key(mock_pcollection, "stage_name")
mock_rrshift.assert_called_once_with("unique_label")

def test_ops_stage_name_must_be_unique(self):
ops_1 = BeamOperations("SAME_OPS_SUFFIX")
ops_2 = BeamOperations("SAME_OPS_SUFFIX")
with test_pipeline.TestPipeline() as p:
col = p | f"UNIQUE_BEAM_CREATE_NAME" >> beam.Create([(6, 1),
(6, 2)])
ops_1.map(col, lambda x: x, "SAME_MAP_NAME")
with self.assertRaisesRegex(RuntimeError,
expected_regex="A transform with label "
"\"SAME_MAP_NAME_SAME_OPS_SUFFIX\" "
"already exists in the pipeline"):
ops_2.map(col, lambda x: x, "SAME_MAP_NAME")

def test_one_suffix_multiple_same_stage_name(self):
ops = BeamOperations("UNIQUE_OPS_SUFFIX")
with test_pipeline.TestPipeline() as p:
col = p | f"UNIQUE_BEAM_CREATE_NAME" >> beam.Create([(6, 1),
(6, 2)])
ops.map(col, lambda x: x, "SAME_MAP_NAME")
ops.map(col, lambda x: x, "SAME_MAP_NAME")
ops.map(col, lambda x: x, "SAME_MAP_NAME")

self.assertEqual("UNIQUE_OPS_SUFFIX", ops._ulg._suffix)
self.assertEqual(3, len(ops._ulg._labels))
self.assertIn("SAME_MAP_NAME_UNIQUE_OPS_SUFFIX", ops._ulg._labels)
self.assertIn("SAME_MAP_NAME_1_UNIQUE_OPS_SUFFIX", ops._ulg._labels)
self.assertIn("SAME_MAP_NAME_2_UNIQUE_OPS_SUFFIX", ops._ulg._labels)


@unittest.skipIf(sys.platform == "win32" or sys.platform == 'darwin' or (
sys.version_info.minor <= 7 and sys.version_info.major == 3
), "There are some problems with PySpark setup on older python and Windows and macOS"
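A brief sketch of how two BeamOperations instances might share one pipeline without label collisions, mirroring the tests above; the suffixes "JOB_A"/"JOB_B", the squaring lambda, and the assumption that BeamOperations is importable from pipeline_dp.pipeline_operations are illustrative only.

import apache_beam as beam
from pipeline_dp.pipeline_operations import BeamOperations

ops_a = BeamOperations("JOB_A")
ops_b = BeamOperations("JOB_B")

with beam.Pipeline() as p:
    col = p | "Create input" >> beam.Create([1, 2, 3])
    # Both adapters use the same stage name; the per-instance suffix keeps the
    # resulting Beam labels ("Square_JOB_A" vs "Square_JOB_B") distinct.
    squares_a = ops_a.map(col, lambda x: x * x, "Square")
    squares_b = ops_b.map(col, lambda x: x * x, "Square")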