Skip to content

Commit

Permalink
poll periodically in kip-345 tests
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuaherrera committed Nov 30, 2023
1 parent 140d115 commit d9bc186
Showing 1 changed file with 25 additions and 7 deletions.
32 changes: 25 additions & 7 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2151,14 +2151,18 @@ async def test_kip_345_enabled(self):
# partitions after rebalance
all_partitions = frozenset(list(c1_partitions) + list(c2_partitions))
await consumer2.stop()
await asyncio.sleep(25)
# since the timeout has passed, a rebalance should occur
listener1.revoke_mock.assert_called_with(c1_assignment)
listener1.assign_mock.assert_called_with(c1_assignment.union(c2_assignment))
while True:
if (listener1.revoke_mock.call_count > 2
and listener1.assign_mock.call_count > 2):
break
await asyncio.sleep(1)
# this is the last rebalance for consumer1, so the count should now be
# 3.
self.assertEqual(listener1.revoke_mock.call_count, 3)
self.assertEqual(listener1.assign_mock.call_count, 3)
# since the timeout has passed, a rebalance should occur
listener1.revoke_mock.assert_called_with(c1_assignment)
listener1.assign_mock.assert_called_with(c1_assignment.union(c2_assignment))
assert all_partitions == consumer1.assignment()

@kafka_versions('>=2.3.0')
Expand Down Expand Up @@ -2226,7 +2230,11 @@ async def test_kip_345_disabled(self):
# It should since KIP-345 is inactive.
consumer2.unsubscribe()
# need to wait for rebalance
await asyncio.sleep(5)
while True:
if (listener1.revoke_mock.call_count > 2
and listener1.assign_mock.call_count > 2):
break
await asyncio.sleep(1)
self.assertEqual(listener1.revoke_mock.call_count, 3)
self.assertEqual(listener1.assign_mock.call_count, 3)
# ensure that consumer2's assigned partitions
Expand All @@ -2238,14 +2246,24 @@ async def test_kip_345_disabled(self):
await consumer2._subscription.wait_for_assignment()
# since consumer2 rejoins the group, a rebalance should occur
# for both consumers
await asyncio.sleep(5)
while True:
if (listener1.revoke_mock.call_count > 3
and listener1.assign_mock.call_count > 3
and listener2.revoke_mock.call_count > 1
and listener2.assign_mock.call_count > 1):
break
await asyncio.sleep(1)
self.assertEqual(listener2.revoke_mock.call_count, 2)
self.assertEqual(listener2.assign_mock.call_count, 2)
self.assertEqual(listener1.revoke_mock.call_count, 4)
self.assertEqual(listener1.assign_mock.call_count, 4)

# stop consumer2, which will trigger yet another rebalance
await consumer2.stop()
await asyncio.sleep(15)
while True:
if (listener1.revoke_mock.call_count > 4
and listener1.assign_mock.call_count > 4):
break
await asyncio.sleep(1)
self.assertEqual(listener1.revoke_mock.call_count, 5)
self.assertEqual(listener1.assign_mock.call_count, 5)

0 comments on commit d9bc186

Please sign in to comment.