Skip to content

Commit

Permalink
Fix CombineGlobally with GlobalWindows (apache#26922)
Browse files Browse the repository at this point in the history
Adds a better error message when this is used in an unsupported way as well as more testing.
  • Loading branch information
liferoad authored Jul 7, 2023
1 parent 7ce2d00 commit 4ba5e43
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
44 changes: 44 additions & 0 deletions sdks/python/apache_beam/transforms/combiners_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import itertools
import random
import time
import unittest

import hamcrest as hc
Expand All @@ -44,6 +45,7 @@
from apache_beam.transforms.core import Map
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.trigger import AfterAll
from apache_beam.transforms.trigger import AfterCount
Expand Down Expand Up @@ -977,5 +979,47 @@ def test_combiner_latest(self):
label='assert per window')


class CombineGloballyTest(unittest.TestCase):
def test_combine_globally_for_unbounded_source_with_default(self):
# this error is logged since the below combination is ill-defined.
with self.assertLogs() as captured_logs:
with TestPipeline() as p:
_ = (
p
| PeriodicImpulse(
start_timestamp=time.time(),
stop_timestamp=time.time() + 4,
fire_interval=1,
apply_windowing=False,
)
| beam.Map(lambda x: ('c', 1))
| beam.WindowInto(
window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterCount(2)),
accumulation_mode=trigger.AccumulationMode.DISCARDING,
)
| beam.combiners.Count.Globally())
self.assertIn('unbounded collections', '\n'.join(captured_logs.output))

def test_combine_globally_for_unbounded_source_without_defaults(self):
# this is the supported case
with TestPipeline() as p:
_ = (
p
| PeriodicImpulse(
start_timestamp=time.time(),
stop_timestamp=time.time() + 4,
fire_interval=1,
apply_windowing=False,
)
| beam.Map(lambda x: 1)
| beam.WindowInto(
window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterCount(2)),
accumulation_mode=trigger.AccumulationMode.DISCARDING,
)
| beam.CombineGlobally(sum).without_defaults())


if __name__ == '__main__':
unittest.main()
14 changes: 14 additions & 0 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2665,6 +2665,15 @@ def add_input_types(transform):
"or CombineGlobally().as_singleton_view() to get the default "
"output of the CombineFn if the input PCollection is empty.")

# log the error for this ill-defined streaming case now
if not pcoll.is_bounded and not pcoll.windowing.is_default():
_LOGGER.error(
"When combining elements in unbounded collections with "
"the non-default windowing strategy, you must explicitly "
"specify how to define the combined result of an empty window. "
"Please use CombineGlobally().without_defaults() to output "
"an empty PCollection if the input PCollection is empty.")

def typed(transform):
# TODO(robertwb): We should infer this.
if combined.element_type:
Expand All @@ -2676,6 +2685,11 @@ def typed(transform):

def inject_default(_, combined):
if combined:
if len(combined) > 1:
_LOGGER.error(
"Multiple combined values unexpectedly provided"
" for a global combine: %s",
combined)
assert len(combined) == 1
return combined[0]
else:
Expand Down

0 comments on commit 4ba5e43

Please sign in to comment.