Add local map operation #16

Merged
merged 1 commit into from
May 5, 2021
4 changes: 2 additions & 2 deletions pipeline_dp/pipeline_operations.py
@@ -78,8 +78,8 @@ def count_per_element(self, col, stage_name: str):
 class LocalPipelineOperations(PipelineOperations):
     """Local Pipeline adapter."""
 
-    def map(self, col, fn, stage_name: str):
-        pass
+    def map(self, col, fn, stage_name: str = None):
+        return map(fn, col)
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the order is reversed: fn is passed before col. That's how map works in most languages, so the current argument order in PipelineOperations.map may confuse people. Can we reverse it, or should col stay as the first argument? The same applies to filter: Python's filter takes the function before the iterable.
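
For reference, a minimal sketch of the function-first order the comment describes, using only Python's built-in map and filter. The variable names are illustrative and do not come from the PR.

```python
# Built-in map and filter both take the function first, then the iterable.
squares = list(map(lambda x: x * x, [1, 2, 3]))
evens = list(filter(lambda x: x % 2 == 0, range(6)))

print(squares)
print(evens)
```

Both built-ins return lazy iterators, which is why the results are wrapped in list() before printing.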

Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question! I think it's common for conventions like this to differ between languages and frameworks :)

For example, in Spark and Beam the collection comes before fn in the statement:

Beam: col | beam.Map(fn)
Spark: col.map(fn)

Another point is that col is an argument of every PipelineOperations method, while fn is not (e.g. in Keys()), so it's more uniform to always have col as the first argument.
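
The uniformity argument can be sketched as follows. This is an illustration only: CollectionFirstOps and its method bodies are hypothetical and are not the pipeline_dp API. The point is that col fits in the first position of every method, whereas fn is absent from some of them.

```python
class CollectionFirstOps:
    """Hypothetical adapter: every method takes `col` as its first argument."""

    def map(self, col, fn, stage_name=None):
        # Built-in map wants the function first, so the arguments are swapped.
        return map(fn, col)

    def filter(self, col, fn, stage_name=None):
        return filter(fn, col)

    def keys(self, col, stage_name=None):
        # No fn here at all; col is the only data argument.
        return (key for key, _ in col)


ops = CollectionFirstOps()
pairs = [("a", 1), ("b", 2), ("c", 3)]

mapped = list(ops.map(pairs, lambda kv: kv[1] * 10))
kept = list(ops.filter(pairs, lambda kv: kv[1] > 1))
keys = list(ops.keys(pairs))
```

With fn-first ordering, keys would have a different leading argument from map and filter; with col-first ordering all three signatures start the same way.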


     def map_tuple(self, col, fn, stage_name: str):
         pass
Empty file added tests/__init__.py
17 changes: 15 additions & 2 deletions tests/pipeline_operations_test.py
@@ -1,12 +1,25 @@
 import unittest
 
-import pipeline_dp
+from pipeline_dp.pipeline_operations import LocalPipelineOperations
 
 
 class PipelineOperationsTest(unittest.TestCase):
     pass
 
 class LocalPipelineOperationsTest(unittest.TestCase):
-    pass
+    @classmethod
+    def setUpClass(cls):
+        cls.ops = LocalPipelineOperations()
+
+    def test_local_map(self):
+        some_map = self.ops.map([1,2,3], lambda x: x)
+        # some_map is its own consumable iterator
+        self.assertIs(some_map, iter(some_map))
+
+        self.assertEqual(list(self.ops.map([1,2,3], str)),
+                         ["1", "2", "3"])
+        self.assertEqual(list(self.ops.map(range(5), lambda x: x ** 2)),
+                         [0, 1, 4, 9, 16])
+
 if __name__ == '__main__':
     unittest.main()
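
The "its own consumable iterator" assertion in the test has a practical consequence that a short sketch makes visible. The local_map function below is a stand-in that assumes the same one-line body as the PR's LocalPipelineOperations.map; it is not imported from pipeline_dp.

```python
def local_map(col, fn):
    # Stand-in for the PR's method body: delegate to the built-in, lazy map.
    return map(fn, col)


result = local_map([1, 2, 3], str)

# A map object is its own iterator, which is what assertIs checks in the test.
is_own_iterator = result is iter(result)

first_pass = list(result)   # consumes the iterator
second_pass = list(result)  # nothing left to yield
```

Because the result can be traversed only once, a caller that needs the values more than once would have to materialize them, for example with list().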