Skip to content

Commit

Permalink
Add test of with_exception_handling side effects.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Aug 20, 2024
1 parent 049e4b3 commit 36e5eff
Showing 1 changed file with 26 additions and 0 deletions.
26 changes: 26 additions & 0 deletions sdks/python/apache_beam/transforms/error_handling_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,32 @@ def test_error_handling_pardo(self):
assert_that(result, equal_to(['A', 'Bb']), label='CheckGood')
assert_that(error_pcoll, equal_to(['error: cccc']), label='CheckBad')

def test_error_handling_pardo_with_exception_handling_kwargs(self):
def side_effect(*args):
beam._test_error_handling_pardo_with_exception_handling_kwargs_val = True

def check_side_effect():
return getattr(
beam,
'_test_error_handling_pardo_with_exception_handling_kwargs_val',
False)

self.assertFalse(check_side_effect())

with beam.Pipeline() as p:
pcoll = p | beam.Create(['a', 'bb', 'cccc'])
with error_handling.ErrorHandler(
beam.Map(lambda x: "error: %s" % x[0])) as error_handler:
result = pcoll | beam.Map(
exception_throwing_map, limit=3).with_error_handler(
error_handler, on_failure_callback=side_effect)
error_pcoll = error_handler.output()

assert_that(result, equal_to(['A', 'Bb']), label='CheckGood')
assert_that(error_pcoll, equal_to(['error: cccc']), label='CheckBad')

self.assertTrue(check_side_effect())

def test_error_on_unclosed_error_handler(self):
with self.assertRaisesRegex(RuntimeError, r'.*Unclosed error handler.*'):
with beam.Pipeline() as p:
Expand Down

0 comments on commit 36e5eff

Please sign in to comment.