Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ErrorHandler DLQ API to Python #31856

Merged
merged 8 commits into from
Aug 21, 2024
Merged

Conversation

robertwb
Copy link
Contributor

It recently came to my attention that this was only added to Java.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@robertwb
Copy link
Contributor Author

R: @johnjcasey

Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

Copy link
Contributor

@Abacn Abacn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while pending @johnjcasey review also went through once and mostly LGTM

a fluent manner, disaggregating the error processing specification from
the main processing chain.

They is typically used as follows::
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: They

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@robertwb
Copy link
Contributor Author

robertwb commented Aug 1, 2024

Thanks. Any other thoughts on this, @johnjcasey ?

@robertwb
Copy link
Contributor Author

robertwb commented Aug 6, 2024

Another ping on this.

@Abacn
Copy link
Contributor

Abacn commented Aug 13, 2024

would be nice to get this in by next release. For past contributor/reviewers on this topic, maybe @bzablocki (who reviewed #27145) could take another look if needed?

Copy link
Contributor

@bzablocki bzablocki left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small comments, LGTM overall

sdks/python/apache_beam/transforms/error_handling.py Outdated Show resolved Hide resolved
with beam.Pipeline() as p:
pcoll = p | beam.Create(['a', 'bb', 'cccc'])
with error_handling.ErrorHandler(
beam.Map(lambda x: "error: %s" % x[0])) as error_handler:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we access here the first element in the array (Map(lambda x: "error: %s" % x[0]))) and in the test above we just access the entire element (Map(lambda x: "error: %s" % x))?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTransformWithErrors.with_error_handling() returns a PCollection of elements as errors, whereas the standard ParDo(...).with_error_handling() returns the Python equivalent of bad records that attach the bad elements to the exception thrown.

assert_that(result, equal_to(['A', 'Bb']), label='CheckGood')
assert_that(error_pcoll, equal_to(['cccc']), label='CheckBad')

def test_error_on_collecting_error_handler_without_output_retrieval(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we also add a unit test with the CollectingErrorHandler that is closed but not consumed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is closed (due to the context) but not consumed.

timeout,
error_handler)

def with_error_handler(self, error_handler, **exception_handling_kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also have a unit test that would show how the **exception_handling_kwargs can be used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. Done.

Copy link
Contributor Author

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review!

@robertwb robertwb merged commit b3a874f into apache:master Aug 21, 2024
89 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants