Skip to content

Commit

Permalink
Clean up in test
Browse files Browse the repository at this point in the history
  • Loading branch information
yuqian90 committed May 28, 2021
1 parent de7041c commit e86e079
Showing 1 changed file with 31 additions and 46 deletions.
77 changes: 31 additions & 46 deletions tests/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,64 +493,49 @@ class MockTask:
here because it's not picklable.
"""

def __init__(self, duration=None):
self.duration = duration

def apply_async(self, *args, **kwargs):
import time
return 1

if self.duration:
time.sleep(self.duration)

return 1
def _exit_gracefully(signum, _):
print(f"{os.getpid()} Exiting gracefully upon receiving signal {signum}")
sys.exit(signum)


@pytest.fixture
def register_signals():
"""
Register the same signals as scheduler does to test celery_executor to make sure it does not
hang.
"""

print(f"{os.getpid()} register_signals()")

def test_send_tasks_to_celery_hang():
def _exit_gracefully(signum, _):
print(f"{os.getpid()} Exiting gracefully upon receiving signal {signum}")
sys.exit(signum)
orig_sigint = orig_sigterm = orig_sigusr2 = signal.SIG_DFL

def register_signals():
"""Register signals that stop child processes"""
orig_sigint = signal.signal(signal.SIGINT, _exit_gracefully)
orig_sigterm = signal.signal(signal.SIGTERM, _exit_gracefully)
orig_sigusr2 = signal.signal(signal.SIGUSR2, _exit_gracefully)

print(f"{os.getpid()} register_signals()")
yield

signal.signal(signal.SIGINT, _exit_gracefully)
signal.signal(signal.SIGTERM, _exit_gracefully)
signal.signal(signal.SIGUSR2, _exit_gracefully)
# Restore original signal handlers after test
signal.signal(signal.SIGINT, orig_sigint)
signal.signal(signal.SIGTERM, orig_sigterm)
signal.signal(signal.SIGUSR2, orig_sigusr2)

register_signals()

def test_send_tasks_to_celery_hang(register_signals): # pylint: disable=unused-argument
"""
Test that celery_executor does not hang after many runs.
"""
executor = celery_executor.CeleryExecutor()
task_tuples_to_send = [
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
(None, None, None, None, MockTask()),
]

task = MockTask()
task_tuples_to_send = [(None, None, None, None, task) for _ in range(26)]

for _ in range(500):
# This loop can hang on Linux if celery_executor does something wrong with
# multiprocessing.
results = executor._send_tasks_to_celery(task_tuples_to_send)
assert results == [(None, None, 1) for _ in task_tuples_to_send]

0 comments on commit e86e079

Please sign in to comment.