Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-10856] Support for NestedValueProvider for Python SDK #12779

Merged
merged 11 commits into from
Dec 1, 2020

Conversation

epicfaace
Copy link
Contributor

@epicfaace epicfaace commented Sep 6, 2020

Support for NestedValueProvider for Python SDK. I've also added more comments to the pydoc for value_provider classes. R: @pabloem


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Dataflow Flink Samza Spark Twister2
Go Build Status --- Build Status --- Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status
Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
--- Build Status ---
XLang Build Status --- Build Status --- Build Status ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status --- --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@pabloem
Copy link
Member

pabloem commented Sep 8, 2020

Thanks @epicfaace for implementing these! I will not be able to review them for a couple of weeks. I can review once I'm back, or I can help find a new reviewer. What do you prefer?
(tvalentyn, udim) would be good choices for reviewers if you'd like a review soon.

@epicfaace
Copy link
Contributor Author

Thanks for the heads up @pabloem . @tvalentyn , would you be able to review?

@epicfaace
Copy link
Contributor Author

By the way, how do you normally lint your code according to the Beam guidelines / PR checks? I couldn't find any guidance / a quick reference in the Beam development guide on how to do that.

@tvalentyn
Copy link
Contributor

https://cwiki.apache.org/confluence/display/BEAM/Python+Tips has some autoformatter tips that can help.

@tvalentyn
Copy link
Contributor

@epicfaace Curious, what use-case do you have in mind for this change?

@epicfaace
Copy link
Contributor Author

@epicfaace Curious, what use-case do you have in mind for this change?

I'm creating a Dataflow template in which I need to write to multiple different output tables, "[prefix]_a", "[prefix]_b", "[prefix]_c", etc. I'd like to be able to specify a single runtime parameter, "prefix", and derive the output table names from this parameter, rather than having to redundantly specify each table name as an input runtime parameter.

I noticed that this page said that the Apache Beam SDK for Python does not support NestedValueProvider, so I just implemented it in Python in a similar way to how the Java SDK does it.

@tvalentyn
Copy link
Contributor

tvalentyn commented Sep 11, 2020

Ok. We actually consider ValueProvider's an anti-pattern now, prefer not to extend their scope, and recommend to use FlexTemplates instead, see: https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates. Could you give that a try? I do appreciate your effort to improve the SDK and documentation though!

CC: @azurezyq who is an expert on Flex Templates.

@epicfaace
Copy link
Contributor Author

epicfaace commented Sep 11, 2020

Hmm, I did try using Flex Templates initially, but it took way too long -- maybe 7-8 minutes -- for the job to initialize, and it appears that the bottleneck is with was installing apache-beam and other pip dependencies. I switched to using ValueProviders because I needed a quick turnaround time for the job (the job itself takes around 10 minutes, so a 7-8 minute additional startup time is not worth it for me).

@tvalentyn
Copy link
Contributor

I see, I'd like @arvindram03 and @azurezyq chime in here on their recommendation. I think there are undergoing efforts to reduce the startup time, or perhaps there are some tips to reduce it?

@arvindram03
Copy link
Contributor

Hi @epicfaace ,

We agree the startup time with flex template python jobs is an issue. We are are working towards reducing the overall startup time to 3-4 mins by end of this year. We recommend using flex templates instead of classic ones purely for flexibility and also you can leverage more feature expansions in the near future.

One of the major objective of flex templates is to avoid using ValueProviders. So, further overloading them with new features is not aligned with the roadmap.

@epicfaace
Copy link
Contributor Author

@arvindram03 , thanks for the update. To clarify, this PR only aims for feature parity with the Java SDK, which already supports NestedValueProvider -- I'm not "overloading them with new features".

Additionally, Flex Templates is still a Pre-GA Offering, and as you mentioned, requires some work for it to be as efficient as regular templates. For these reasons, I don't think they're really a viable alternative to traditional templates at the moment (of course, with additional improvements by the end of the year, it could be!)

@pabloem
Copy link
Member

pabloem commented Sep 14, 2020

A user has written a feature that they would find useful, and that will not change the experience for other users (if anything, it should improve it). The feature looks correct, and similar to what we do in Java. If we reject the PR, we may push the user to run on a fork. Can we let this in? @tvalentyn
Flex templates have numerous benefits that are not being undone or discounted by adding this improvement to traditional templates, so I don't see a great disadvantage to merge this.

@tvalentyn
Copy link
Contributor

tvalentyn commented Sep 15, 2020

A user has written a feature that they would find useful, and that will not change the experience for other users (if anything, it should improve it). The feature looks correct, and similar to what we do in Java. If we reject the PR, we may push the user to run on a fork. Can we let this in? @tvalentyn

I agree with this assessment, feel free to merge once tests & linter pass.

@codecov
Copy link

codecov bot commented Sep 15, 2020

Codecov Report

Merging #12779 (af2c14c) into master (3d6cc0e) will decrease coverage by 0.20%.
The diff coverage is 93.33%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master   #12779      +/-   ##
==========================================
- Coverage   82.48%   82.28%   -0.21%     
==========================================
  Files         455      451       -4     
  Lines       54876    53738    -1138     
==========================================
- Hits        45266    44217    -1049     
+ Misses       9610     9521      -89     
Impacted Files Coverage Δ
sdks/python/apache_beam/options/value_provider.py 91.76% <93.33%> (+0.21%) ⬆️
sdks/python/apache_beam/utils/profiler.py 32.11% <0.00%> (-54.35%) ⬇️
sdks/python/apache_beam/utils/interactive_utils.py 83.33% <0.00%> (-9.53%) ⬇️
...eam/runners/interactive/options/capture_control.py 92.00% <0.00%> (-8.00%) ⬇️
conftest.py 77.77% <0.00%> (-7.94%) ⬇️
...s/snippets/transforms/aggregation/combinevalues.py 87.36% <0.00%> (-7.37%) ⬇️
sdks/python/apache_beam/__init__.py 80.00% <0.00%> (-5.72%) ⬇️
sdks/python/apache_beam/dataframe/partitionings.py 83.60% <0.00%> (-5.44%) ⬇️
.../python/apache_beam/io/gcp/bigquery_io_metadata.py 86.95% <0.00%> (-3.67%) ⬇️
...pache_beam/runners/interactive/interactive_beam.py 76.02% <0.00%> (-3.51%) ⬇️
... and 87 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 1a34854...912e912. Read the comment docs.

@pabloem
Copy link
Member

pabloem commented Sep 16, 2020

@epicfaace can you ensure PythonDocs and PythonLint checks pass?

@pabloem
Copy link
Member

pabloem commented Sep 29, 2020

@epicfaace LMK if you can take a look at the lint issues

@aaltay
Copy link
Member

aaltay commented Oct 8, 2020

What is the next step on this PR?

@pabloem
Copy link
Member

pabloem commented Oct 8, 2020

@epicfaace there's a number of failing tests. (Python Docs and Lint) - can you fix those?

@nikie
Copy link
Contributor

nikie commented Nov 15, 2020

@epicfaace @pabloem
I have opened PR into @epicfaace 's branch to fix pylint errors.

@nikie
Copy link
Contributor

nikie commented Nov 15, 2020

@tvalentyn
Another possible use case for NestedValueProvider is to fetch external secrets based on provided values from, for example, Vault.

@nikie
Copy link
Contributor

nikie commented Nov 16, 2020

@epicfaace
I am not sure why the tests are still failing.
Rebasing against current master might help.

Fixed in yet another pr: epicfaace#2

@nikie
Copy link
Contributor

nikie commented Nov 20, 2020

@pabloem, Python Docs and Lint tests now succeed.

@epicfaace
Copy link
Contributor Author

epicfaace commented Nov 22, 2020

@pabloem I'm not quite sure why CI is still failing, it gives this error:

    return self.do_fn_invoker.invoke_process(windowed_value)
  File "D:\a\beam\beam\sdks\python\apache_beam\runners\common.py", line 570, in invoke_process
    windowed_value, self.process_method(windowed_value.value))
  File "D:\a\beam\beam\sdks\python\apache_beam\runners\common.py", line 1374, in process_outputs
    self.main_receivers.receive(windowed_value)
  File "D:\a\beam\beam\sdks\python\apache_beam\runners\worker\operations.py", line 220, in receive
    self.consumer.process(windowed_value)
  File "D:\a\beam\beam\sdks\python\apache_beam\runners\worker\operations.py", line 718, in process
    delayed_application = self.dofn_runner.process(o)
  File "D:\a\beam\beam\sdks\python\apache_beam\runners\common.py", line 1215, in process
    self._reraise_augmented(exn)
  File "D:\a\beam\beam\sdks\python\apache_beam\runners\common.py", line 1294, in _reraise_augmented
    raise_with_traceback(new_exn)
  File "d:\a\beam\beam\sdks\python\target\.tox\py36-win\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "D:\a\beam\beam\sdks\python\apache_beam\runners\common.py", line 1213, in process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "D:\a\beam\beam\sdks\python\apache_beam\runners\common.py", line 743, in invoke_process
    windowed_value, additional_args, additional_kwargs)
  File "D:\a\beam\beam\sdks\python\apache_beam\runners\common.py", line 867, in _invoke_process_per_window
    self.process_method(*args_for_process),
  File "D:\a\beam\beam\sdks\python\apache_beam\transforms\core.py", line 1669, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "D:\a\beam\beam\sdks\python\apache_beam\testing\util.py", line 197, in _equal
    raise BeamAssertException(msg)
apache_beam.testing.util.BeamAssertException: Failed assert: ['a'] == ['a', 'b'], unexpected elements ['b'] [while running 'assert_that/Match']

Any ideas? Or is the CI just flaky?

@aaltay
Copy link
Member

aaltay commented Nov 23, 2020

BeamAssertException

This one looks like a test on the asserts.

The actual error (which is likely a flake) is:

logs: https://github.com/apache/beam/pull/12779/checks?check_run_id=1433935136

apache_beam\runners\interactive\recording_manager_test.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam\runners\interactive\interactive_environment.py:118: in new_env
    _interactive_beam_env.cleanup()
apache_beam\runners\interactive\interactive_environment.py:272: in cleanup
    cache_manager.cleanup()
apache_beam\runners\interactive\caching\streaming_cache.py:391: in cleanup
    shutil.rmtree(self._cache_dir)
c:\hostedtoolcache\windows\python\3.6.8\x64\lib\shutil.py:500: in rmtree
    return _rmtree_unsafe(path, onerror)
c:\hostedtoolcache\windows\python\3.6.8\x64\lib\shutil.py:390: in _rmtree_unsafe
    _rmtree_unsafe(fullname, onerror)
c:\hostedtoolcache\windows\python\3.6.8\x64\lib\shutil.py:395: in _rmtree_unsafe
    onerror(os.unlink, fullname, sys.exc_info())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

...

>                   os.unlink(fullname)
E                   PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py36-win\\tmp\\it-4m1c1oje2145793178144\\full\\fb91a47796-2145832985040-2145832986608-2145793178144'

Maybe clean should catch the error and not fail? Or try again?

@kevingg - Could you check this error?

@pabloem @epicfaace - I believe you can ignore this error for the purposes of this PR.

@nika-qubit
Copy link
Contributor

BeamAssertException

This one looks like a test on the asserts.

The actual error (which is likely a flake) is:

logs: https://github.com/apache/beam/pull/12779/checks?check_run_id=1433935136

apache_beam\runners\interactive\recording_manager_test.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam\runners\interactive\interactive_environment.py:118: in new_env
    _interactive_beam_env.cleanup()
apache_beam\runners\interactive\interactive_environment.py:272: in cleanup
    cache_manager.cleanup()
apache_beam\runners\interactive\caching\streaming_cache.py:391: in cleanup
    shutil.rmtree(self._cache_dir)
c:\hostedtoolcache\windows\python\3.6.8\x64\lib\shutil.py:500: in rmtree
    return _rmtree_unsafe(path, onerror)
c:\hostedtoolcache\windows\python\3.6.8\x64\lib\shutil.py:390: in _rmtree_unsafe
    _rmtree_unsafe(fullname, onerror)
c:\hostedtoolcache\windows\python\3.6.8\x64\lib\shutil.py:395: in _rmtree_unsafe
    onerror(os.unlink, fullname, sys.exc_info())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

...

>                   os.unlink(fullname)
E                   PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py36-win\\tmp\\it-4m1c1oje2145793178144\\full\\fb91a47796-2145832985040-2145832986608-2145793178144'

Maybe clean should catch the error and not fail? Or try again?

@kevingg - Could you check this error?

The problem is windows specific. I tried on Mac OS, the shutil.rmtree works even if the file handler is still open:
Screen Shot 2020-11-23 at 5 11 51 PM
It's said that on windows, if a file is opened, even if in the same process deleting it (or its parent directory), you will see this error: https://stackoverflow.com/questions/27215462/permissionerror-winerror-32-the-process-cannot-access-the-file-because-it-is.

Potential breaking point is in the streaming cache where the open and close are explicitly invoked (where with is inappropriate to use):
Screen Shot 2020-11-23 at 5 44 53 PM
Is it possible that a finish_bundle would fail silently or not executed at all?

This probably does not happen that often, we can revisit this if it happens frequently and ignore this by specifying shutil.rmtree(path, ignore_errors=True, onerror=...) since it's just failing to clean up useless temp files.

@aaltay
Copy link
Member

aaltay commented Nov 24, 2020

BeamAssertException

This one looks like a test on the asserts.
The actual error (which is likely a flake) is:
logs: #12779 (checks)

apache_beam\runners\interactive\recording_manager_test.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam\runners\interactive\interactive_environment.py:118: in new_env
    _interactive_beam_env.cleanup()
apache_beam\runners\interactive\interactive_environment.py:272: in cleanup
    cache_manager.cleanup()
apache_beam\runners\interactive\caching\streaming_cache.py:391: in cleanup
    shutil.rmtree(self._cache_dir)
c:\hostedtoolcache\windows\python\3.6.8\x64\lib\shutil.py:500: in rmtree
    return _rmtree_unsafe(path, onerror)
c:\hostedtoolcache\windows\python\3.6.8\x64\lib\shutil.py:390: in _rmtree_unsafe
    _rmtree_unsafe(fullname, onerror)
c:\hostedtoolcache\windows\python\3.6.8\x64\lib\shutil.py:395: in _rmtree_unsafe
    onerror(os.unlink, fullname, sys.exc_info())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

...

>                   os.unlink(fullname)
E                   PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'D:\\a\\beam\\beam\\sdks\\python\\target\\.tox\\py36-win\\tmp\\it-4m1c1oje2145793178144\\full\\fb91a47796-2145832985040-2145832986608-2145793178144'

Maybe clean should catch the error and not fail? Or try again?
@kevingg - Could you check this error?

The problem is windows specific. I tried on Mac OS, the shutil.rmtree works even if the file handler is still open:
Screen Shot 2020-11-23 at 5 11 51 PM
It's said that on windows, if a file is opened, even if in the same process deleting it (or its parent directory), you will see this error: stackoverflow.com/questions/27215462/permissionerror-winerror-32-the-process-cannot-access-the-file-because-it-is.

Potential breaking point is in the streaming cache where the open and close are explicitly invoked (where with is inappropriate to use):
Screen Shot 2020-11-23 at 5 44 53 PM
Is it possible that a finish_bundle would fail silently or not executed at all?

This probably does not happen that often, we can revisit this if it happens frequently and ignore this by specifying shutil.rmtree(path, ignore_errors=True, onerror=...) since it's just failing to clean up useless temp files.

Maybe file a JIRA and we can track how often this happens? I do not think people triage all failures so it is hard to tell how frequently this happens in pre commit tests.

@nika-qubit
Copy link
Contributor

Maybe file a JIRA and we can track how often this happens? I do not think people triage all failures so it is hard to tell how frequently this happens in pre commit tests.

Filed BEAM-11339 for this.

@pabloem pabloem merged commit 87f3138 into apache:master Dec 1, 2020
@pabloem
Copy link
Member

pabloem commented Dec 1, 2020

thank you @epicfaace !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants