diff --git a/pipeline_dp/pipeline_operations.py b/pipeline_dp/pipeline_operations.py
index 3ebf423a..6ef9b651 100644
--- a/pipeline_dp/pipeline_operations.py
+++ b/pipeline_dp/pipeline_operations.py
@@ -1,20 +1,15 @@
 """Adapters for working with pipeline frameworks."""
 
-from enum import Enum
 from functools import partial
-import os
 import multiprocessing as mp
-from pipeline_dp import accumulator
 import random
 
 import numpy as np
 import abc
 import apache_beam as beam
 import apache_beam.transforms.combiners as combiners
-import collections
 import pipeline_dp.accumulator as accumulator
 import typing
-from typing import Any, Optional, Callable, Tuple
 import collections
 import itertools
 
@@ -95,20 +90,50 @@ def is_serialization_immediate_on_reduce_by_key(self):
         return False
 
 
+class UniqueLabelsGenerator:
+    """Generate unique labels for each pipeline aggregation."""
+
+    def __init__(self, suffix):
+        self._labels = set()
+        self._suffix = suffix
+
+    def _add_if_unique(self, label):
+        if label in self._labels:
+            return False
+        self._labels.add(label)
+        return True
+
+    def unique(self, label):
+        if not label:
+            label = "UNDEFINED_STAGE_NAME"
+        suffix_label = label + "_" + self._suffix
+        if self._add_if_unique(suffix_label):
+            return suffix_label
+        for i in itertools.count(1):
+            label_candidate = f"{label}_{i}_{self._suffix}"
+            if self._add_if_unique(label_candidate):
+                return label_candidate
+
+
 class BeamOperations(PipelineOperations):
     """Apache Beam adapter."""
 
+    def __init__(self, suffix: str = ""):
+        super().__init__()
+        self._ulg = UniqueLabelsGenerator(suffix)
+
     def map(self, col, fn, stage_name: str):
-        return col | stage_name >> beam.Map(fn)
+        return col | self._ulg.unique(stage_name) >> beam.Map(fn)
 
     def flat_map(self, col, fn, stage_name: str):
-        return col | stage_name >> beam.FlatMap(fn)
+        return col | self._ulg.unique(stage_name) >> beam.FlatMap(fn)
 
     def map_tuple(self, col, fn, stage_name: str):
-        return col | stage_name >> beam.Map(lambda x: fn(*x))
+        return col | self._ulg.unique(stage_name) >> beam.Map(lambda x: fn(*x))
 
     def map_values(self, col, fn, stage_name: str):
-        return col | stage_name >> beam.MapTuple(lambda k, v: (k, fn(v)))
+        return col | self._ulg.unique(stage_name) >> beam.MapTuple(lambda k, v:
+                                                                   (k, fn(v)))
 
     def group_by_key(self, col, stage_name: str):
         """Group the values for each key in the PCollection into a single sequence.
@@ -121,10 +146,10 @@ def group_by_key(self, col, stage_name: str):
 
           An PCollection of tuples in which the type of the second item is list.
 
        """
-        return col | stage_name >> beam.GroupByKey()
+        return col | self._ulg.unique(stage_name) >> beam.GroupByKey()
 
     def filter(self, col, fn, stage_name: str):
-        return col | stage_name >> beam.Filter(fn)
+        return col | self._ulg.unique(stage_name) >> beam.Filter(fn)
 
     def filter_by_key(self, col, keys_to_keep, stage_name: str):
@@ -154,31 +179,34 @@ def does_keep(pk_val):
         # Keys to keep are in memory.
         if not isinstance(keys_to_keep, set):
             keys_to_keep = set(keys_to_keep)
-            return col | "Filtering out" >> beam.Filter(does_keep)
+            return col | self._ulg.unique("Filtering out") >> beam.Filter(
+                does_keep)
 
         # `keys_to_keep` are not in memory. Filter out with a join.
-        keys_to_keep = (keys_to_keep |
-                        "Reformat PCollection" >> beam.Map(lambda x: (x, True)))
+        keys_to_keep = (keys_to_keep | self._ulg.unique("Reformat PCollection")
+                        >> beam.Map(lambda x: (x, True)))
 
         return ({
             VALUES: col,
             TO_KEEP: keys_to_keep
         } | "CoGroup by values and to_keep partition flag " >>
-               beam.CoGroupByKey() |
-               "Filtering out" >> beam.ParDo(PartitionsFilterJoin()))
+               beam.CoGroupByKey() | self._ulg.unique("Partitions Filter Join")
+               >> beam.ParDo(PartitionsFilterJoin()))
 
     def keys(self, col, stage_name: str):
-        return col | stage_name >> beam.Keys()
+        return col | self._ulg.unique(stage_name) >> beam.Keys()
 
     def values(self, col, stage_name: str):
-        return col | stage_name >> beam.Values()
+        return col | self._ulg.unique(stage_name) >> beam.Values()
 
     def sample_fixed_per_key(self, col, n: int, stage_name: str):
-        return col | stage_name >> combiners.Sample.FixedSizePerKey(n)
+        return col | self._ulg.unique(
+            stage_name) >> combiners.Sample.FixedSizePerKey(n)
 
     def count_per_element(self, col, stage_name: str):
-        return col | stage_name >> combiners.Count.PerElement()
+        return col | self._ulg.unique(
+            stage_name) >> combiners.Count.PerElement()
 
-    def reduce_accumulators_per_key(self, col, stage_name: str = None):
+    def reduce_accumulators_per_key(self, col, stage_name: str):
         # TODO: Use merge function from the accumulator framework.
         def merge_accumulators(accumulators):
             res = None
@@ -189,7 +217,8 @@ def merge_accumulators(accumulators):
                     res = acc
             return res
 
-        return col | stage_name >> beam.CombinePerKey(merge_accumulators)
+        return col | self._ulg.unique(stage_name) >> beam.CombinePerKey(
+            merge_accumulators)
 
 
 class SparkRDDOperations(PipelineOperations):
@@ -264,7 +293,7 @@ def sample_fixed_per_key(self, rdd, n: int, stage_name: str = None):
 
     def count_per_element(self, rdd, stage_name: str = None):
         return rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: (x + y))
 
-    def reduce_accumulators_per_key(self, rdd, stage_name: str = None):
+    def reduce_accumulators_per_key(self, rdd, stage_name: str):
         return rdd.reduceByKey(lambda acc1, acc2: acc1.add_accumulator(acc2))
 
     def is_serialization_immediate_on_reduce_by_key(self):
diff --git a/tests/pipeline_operations_test.py b/tests/pipeline_operations_test.py
index 30a625b0..fc77a10a 100644
--- a/tests/pipeline_operations_test.py
+++ b/tests/pipeline_operations_test.py
@@ -5,6 +5,7 @@
 import apache_beam.testing.util as beam_util
 import pytest
 import sys
+from unittest.mock import Mock, MagicMock, patch
 
 from pipeline_dp import DataExtractors
 from pipeline_dp.pipeline_operations import MultiProcLocalPipelineOperations, SparkRDDOperations
@@ -72,7 +73,8 @@ def test_reduce_accumulators_per_key(self):
                                                       (8, 1)])
             col = self.ops.map_values(col, SumAccumulator,
                                       "Wrap into accumulators")
-            col = self.ops.reduce_accumulators_per_key(col)
+            col = self.ops.reduce_accumulators_per_key(
+                col, "Reduce accumulators per key")
             result = col | "Get accumulated values" >> beam.Map(
                 lambda row: (row[0], row[1].get_metrics()))
 
@@ -80,6 +82,128 @@
                                   beam_util.equal_to([(6, 2), (7, 2), (8, 1)]))
 
 
+class BeamOperationsStageNameTest(unittest.TestCase):
+
+    class MockUniqueLabelGenerators:
+
+        def unique(self, stage_name: str = ""):
+            return "unique_label"
+
+    @staticmethod
+    def _create_mock_pcollection():
+        mock = Mock()
+        mock.__or__ = MagicMock(return_value=mock)
+        return mock
+
+    @staticmethod
+    def _test_helper():
+        mock_pcollection = BeamOperationsStageNameTest._create_mock_pcollection(
+        )
+        ops = BeamOperations()
+        ops._ulg = BeamOperationsStageNameTest.MockUniqueLabelGenerators()
+        return mock_pcollection, ops
+
+    @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
+    def test_map(self, mock_rrshift):
+        mock_pcollection, ops = self._test_helper()
+        ops.map(mock_pcollection, lambda x: x, "stage_name")
+        mock_rrshift.assert_called_once_with("unique_label")
+
+    @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
+    def test_map_values(self, mock_rrshift):
+        mock_pcollection, ops = self._test_helper()
+        ops.map_values(mock_pcollection, lambda x: x, "stage_name")
+        mock_rrshift.assert_called_once_with("unique_label")
+
+    @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
+    def test_flat_map(self, mock_rrshift):
+        mock_pcollection, ops = self._test_helper()
+        ops.flat_map(mock_pcollection, lambda x: x, "stage_name")
+        mock_rrshift.assert_called_once_with("unique_label")
+
+    @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
+    def test_map_tuple(self, mock_rrshift):
+        mock_pcollection, ops = self._test_helper()
+        ops.map_tuple(mock_pcollection, lambda x: x, "stage_name")
+        mock_rrshift.assert_called_once_with("unique_label")
+
+    @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
+    def test_group_by_key(self, mock_rrshift):
+        mock_pcollection, ops = self._test_helper()
+        ops.group_by_key(mock_pcollection, "stage_name")
+        mock_rrshift.assert_called_once_with("unique_label")
+
+    @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
+    def test_filter(self, mock_rrshift):
+        mock_pcollection, ops = self._test_helper()
+        ops.filter(mock_pcollection, lambda x: True, "stage_name")
+        mock_rrshift.assert_called_once_with("unique_label")
+
+    @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
+    def test_filter_by_key(self, mock_rrshift):
+        mock_pcollection, ops = self._test_helper()
+        ops.filter_by_key(mock_pcollection, [1], "stage_name")
+        mock_rrshift.assert_called_once_with("unique_label")
+
+    @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
+    def test_keys(self, mock_rrshift):
+        mock_pcollection, ops = self._test_helper()
+        ops.keys(mock_pcollection, "stage_name")
+        mock_rrshift.assert_called_once_with("unique_label")
+
+    @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
+    def test_values(self, mock_rrshift):
+        mock_pcollection, ops = self._test_helper()
+        ops.values(mock_pcollection, "stage_name")
+        mock_rrshift.assert_called_once_with("unique_label")
+
+    @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
+    def test_sample_fixed_per_key(self, mock_rrshift):
+        mock_pcollection, ops = self._test_helper()
+        ops.sample_fixed_per_key(mock_pcollection, 1, "stage_name")
+        mock_rrshift.assert_called_once_with("unique_label")
+
+    @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
+    def test_count_per_element(self, mock_rrshift):
+        mock_pcollection, ops = self._test_helper()
+        ops.count_per_element(mock_pcollection, "stage_name")
+        mock_rrshift.assert_called_once_with("unique_label")
+
+    @patch("apache_beam.transforms.ptransform.PTransform.__rrshift__")
+    def test_reduce_accumulators_per_key(self, mock_rrshift):
+        mock_pcollection, ops = self._test_helper()
+        ops.reduce_accumulators_per_key(mock_pcollection, "stage_name")
+        mock_rrshift.assert_called_once_with("unique_label")
+
+    def test_ops_stage_name_must_be_unique(self):
+        ops_1 = BeamOperations("SAME_OPS_SUFFIX")
+        ops_2 = BeamOperations("SAME_OPS_SUFFIX")
+        with test_pipeline.TestPipeline() as p:
col = p | f"UNIQUE_BEAM_CREATE_NAME" >> beam.Create([(6, 1), + (6, 2)]) + ops_1.map(col, lambda x: x, "SAME_MAP_NAME") + with self.assertRaisesRegex(RuntimeError, + expected_regex="A transform with label " + "\"SAME_MAP_NAME_SAME_OPS_SUFFIX\" " + "already exists in the pipeline"): + ops_2.map(col, lambda x: x, "SAME_MAP_NAME") + + def test_one_suffix_multiple_same_stage_name(self): + ops = BeamOperations("UNIQUE_OPS_SUFFIX") + with test_pipeline.TestPipeline() as p: + col = p | f"UNIQUE_BEAM_CREATE_NAME" >> beam.Create([(6, 1), + (6, 2)]) + ops.map(col, lambda x: x, "SAME_MAP_NAME") + ops.map(col, lambda x: x, "SAME_MAP_NAME") + ops.map(col, lambda x: x, "SAME_MAP_NAME") + + self.assertEqual("UNIQUE_OPS_SUFFIX", ops._ulg._suffix) + self.assertEqual(3, len(ops._ulg._labels)) + self.assertIn("SAME_MAP_NAME_UNIQUE_OPS_SUFFIX", ops._ulg._labels) + self.assertIn("SAME_MAP_NAME_1_UNIQUE_OPS_SUFFIX", ops._ulg._labels) + self.assertIn("SAME_MAP_NAME_2_UNIQUE_OPS_SUFFIX", ops._ulg._labels) + + @unittest.skipIf(sys.platform == "win32" or sys.platform == 'darwin' or ( sys.version_info.minor <= 7 and sys.version_info.major == 3 ), "There are some problems with PySpark setup on older python and Windows and macOS"