Skip to content

Commit

Permalink
Handle thread safety wrt the messaging API in callbacks
Browse files Browse the repository at this point in the history
The message callbacks are run in a thread by Twisted, and must therefore
call the messaging API with the proper Twisted wrapper to ensure thread
safety.

See: https://fedora-messaging.readthedocs.io/en/stable/consuming.html#synchronous-and-asynchronous-calls

Signed-off-by: Aurélien Bompard <aurelien@bompard.org>
  • Loading branch information
abompard authored and bowlofeggs committed May 1, 2019
1 parent 7255a1c commit 5a456d8
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 18 deletions.
58 changes: 41 additions & 17 deletions bodhi/server/consumers/composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

import jinja2
import fedora_messaging
from twisted.internet import reactor
from twisted.internet.threads import blockingCallFromThread

from bodhi.messages.schemas import compose as compose_schemas, update as update_schemas
from bodhi.server import bugs, initialize_db, buildsys, notifications, mail
Expand Down Expand Up @@ -184,8 +186,13 @@ def __call__(self, message: fedora_messaging.api.Message):
message = message.body
resume = message.get('resume', False)
agent = message.get('agent')
notifications.publish(compose_schemas.ComposeStartV1.from_dict(dict(agent=agent)),
force=True)
# This callback is run in a thread by Twisted, and must therefore call
# the messaging API with the proper Twisted wrapper to ensure thread
# safety.
blockingCallFromThread(
reactor, notifications.publish,
compose_schemas.ComposeStartV1.from_dict(dict(agent=agent)),
force=True)

results = []
threads = []
Expand Down Expand Up @@ -355,11 +362,14 @@ def work(self):

log.info('Running ComposerThread(%s)' % self.id)

notifications.publish(compose_schemas.ComposeComposingV1.from_dict(
dict(repo=self.id,
updates=[' '.join([b.nvr for b in u.builds]) for u in self.compose.updates],
agent=self.agent,
ctype=self.ctype.value)),
blockingCallFromThread(
reactor,
notifications.publish,
compose_schemas.ComposeComposingV1.from_dict(
dict(repo=self.id,
updates=[' '.join([b.nvr for b in u.builds]) for u in self.compose.updates],
agent=self.agent,
ctype=self.ctype.value)),
force=True,
)

Expand Down Expand Up @@ -470,7 +480,9 @@ def eject_from_compose(self, update, reason):
update.remove_tag(update.release.pending_testing_tag,
koji=buildsys.get_session())
update.request = None
notifications.publish(
blockingCallFromThread(
reactor,
notifications.publish,
update_schemas.UpdateEjectV1.from_dict(
dict(
repo=self.id,
Expand Down Expand Up @@ -516,8 +528,11 @@ def finish(self, success):
success (bool): True if the compose had been successful, False otherwise.
"""
log.info('Thread(%s) finished. Success: %r' % (self.id, success))
notifications.publish(compose_schemas.ComposeCompleteV1.from_dict(dict(
dict(success=success, repo=self.id, agent=self.agent, ctype=self.ctype.value))),
blockingCallFromThread(
reactor,
notifications.publish,
compose_schemas.ComposeCompleteV1.from_dict(dict(
dict(success=success, repo=self.id, agent=self.agent, ctype=self.ctype.value))),
force=True,
)

Expand Down Expand Up @@ -685,7 +700,7 @@ def send_notifications(self):
UpdateRequest.testing: update_schemas.UpdateCompleteTestingV1
}
message = messages[update.request].from_dict(dict(update=update, agent=agent))
notifications.publish(message, force=True)
blockingCallFromThread(reactor, notifications.publish, message, force=True)

@checkpoint
def modify_bugs(self):
Expand Down Expand Up @@ -1221,8 +1236,11 @@ def _wait_for_repo_signature(self):
"""Wait for a repo signature to appear."""
# This message indicates to consumers that the repos are fully created and ready to be
# signed or otherwise processed.
notifications.publish(compose_schemas.RepoDoneV1.from_dict(
dict(repo=self.id, agent=self.agent, path=self.path)),
blockingCallFromThread(
reactor,
notifications.publish,
compose_schemas.RepoDoneV1.from_dict(
dict(repo=self.id, agent=self.agent, path=self.path)),
force=True)
if config.get('wait_for_repo_sig'):
self.save_state(ComposeState.signing_repo)
Expand Down Expand Up @@ -1259,8 +1277,11 @@ def _wait_for_sync(self):
Exception: If no folder other than "source" was found in the compose_path.
"""
log.info('Waiting for updates to hit the master mirror')
notifications.publish(compose_schemas.ComposeSyncWaitV1.from_dict(
dict(repo=self.id, agent=self.agent)),
blockingCallFromThread(
reactor,
notifications.publish,
compose_schemas.ComposeSyncWaitV1.from_dict(
dict(repo=self.id, agent=self.agent)),
force=True)
compose_path = os.path.join(self.path, 'compose', 'Everything')
checkarch = None
Expand Down Expand Up @@ -1294,8 +1315,11 @@ def _wait_for_sync(self):
continue
if newsum == checksum:
log.info("master repomd.xml matches!")
notifications.publish(compose_schemas.ComposeSyncDoneV1.from_dict(
dict(repo=self.id, agent=self.agent)),
blockingCallFromThread(
reactor,
notifications.publish,
compose_schemas.ComposeSyncDoneV1.from_dict(
dict(repo=self.id, agent=self.agent)),
force=True)
return

Expand Down
37 changes: 37 additions & 0 deletions bodhi/tests/server/consumers/test_composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@
}


def fake_blockingCallFromThread(reactor, function, *args, **kwargs):
"""Just run the function, no messing with threads."""
return function(*args, **kwargs)


class TestCheckpoint(unittest.TestCase):
"""Test the checkpoint() decorator."""
def test_with_return(self):
Expand Down Expand Up @@ -121,6 +126,7 @@ def _make_msg(transactional_session_maker, extra_push_args=None):
return publish.mock_calls[0][1][0]


@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread", fake_blockingCallFromThread)
# We don't need real pungi config files, we just need them to exist. Let's also mock all calls to
# pungi.
@mock.patch.dict(
Expand Down Expand Up @@ -2148,6 +2154,7 @@ def test_expire_buildroot_overrides_exception(self, expire, exception_log, *args
exception_log.assert_called_once_with("Problem expiring override")


@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread", fake_blockingCallFromThread)
class ComposerThreadBaseTestCase(base.BaseTestCase):
"""Methods that are useful for testing ComposerThread subclasses."""

Expand Down Expand Up @@ -2571,6 +2578,8 @@ def test_primary_arches_undefined(self):
class TestComposerThread_perform_gating(ComposerThreadBaseTestCase):
"""Test the ComposerThread.perform_gating() method."""

@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
def test_expires_compose_updates(self):
"""Ensure that the method expires the compose's updates attribute."""
msg = self._make_msg()
Expand Down Expand Up @@ -2661,6 +2670,8 @@ def test_BodhiException(self, mocked_log):
class TestComposerThread__determine_tag_actions(ComposerThreadBaseTestCase):
"""Test ComposerThread._determine_tag_actions()."""

@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
@mock.patch('bodhi.server.models.buildsys.get_session')
def test_from_tag_not_found(self, get_session):
"""Updates should be ejected if the from tag cannot be determined."""
Expand Down Expand Up @@ -2697,6 +2708,8 @@ def test_from_tag_not_found(self, get_session):

class TestComposerThread_eject_from_compose(ComposerThreadBaseTestCase):
"""This test class contains tests for the ComposerThread.eject_from_compose() method."""
@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
def test_testing_request(self):
"""
Assert correct behavior when the update's request is set to testing.
Expand Down Expand Up @@ -2841,6 +2854,8 @@ def test_without_state(self):

class TestPungiComposerThread__wait_for_sync(ComposerThreadBaseTestCase):
"""This test class contains tests for the PungiComposerThread._wait_for_sync() method."""
@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
@mock.patch.dict(
'bodhi.server.consumers.composer.config',
{'fedora_testing_master_repomd':
Expand Down Expand Up @@ -2883,6 +2898,8 @@ def test_checksum_match_immediately(self, urlopen, save):
self.assertTrue(urlopen.mock_calls[0] in expected_calls)
save.assert_called_once_with(ComposeState.syncing_repo)

@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
@mock.patch.dict(
'bodhi.server.consumers.composer.config',
{'fedora_testing_master_repomd':
Expand Down Expand Up @@ -2913,6 +2930,8 @@ def test_no_checkarch(self, urlopen, save):
self.assertEqual(str(exc.exception), "Not found an arch to _wait_for_sync with")
save.assert_not_called()

@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
@mock.patch.dict(
'bodhi.server.consumers.composer.config',
{'fedora_testing_master_repomd':
Expand Down Expand Up @@ -2955,6 +2974,8 @@ def test_checksum_match_third_try(self, urlopen, sleep, save):
sleep.assert_has_calls([mock.call(200), mock.call(200)])
save.assert_called_with(ComposeState.syncing_repo)

@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
@mock.patch.dict(
'bodhi.server.consumers.composer.config',
{'fedora_testing_master_repomd':
Expand Down Expand Up @@ -3001,6 +3022,8 @@ def test_httperror(self, mocked_log, urlopen, sleep, save):
sleep.assert_called_once_with(200)
save.assert_called_once_with(ComposeState.syncing_repo)

@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
@mock.patch.dict(
'bodhi.server.consumers.composer.config',
{'fedora_testing_master_repomd':
Expand Down Expand Up @@ -3048,6 +3071,8 @@ def test_connectionreseterror(self, mocked_log, urlopen, sleep, save):
sleep.assert_called_once_with(200)
save.assert_called_once_with(ComposeState.syncing_repo)

@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
@mock.patch.dict(
'bodhi.server.consumers.composer.config',
{'fedora_testing_master_repomd':
Expand Down Expand Up @@ -3094,6 +3119,8 @@ def test_incompleteread(self, mocked_log, urlopen, sleep, save):
sleep.assert_called_once_with(200)
save.assert_called_once_with(ComposeState.syncing_repo)

@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
@mock.patch.dict(
'bodhi.server.consumers.composer.config',
{'fedora_testing_master_repomd': None})
Expand Down Expand Up @@ -3127,6 +3154,8 @@ def test_missing_config_key(self, save):
'fedora_testing_master_repomd in the config file')
save.assert_called_once_with(ComposeState.syncing_repo)

@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
@mock.patch('bodhi.server.consumers.composer.PungiComposerThread.save_state')
@mock.patch('bodhi.server.consumers.composer.time.sleep',
mock.MagicMock(side_effect=Exception('This should not happen during this test.')))
Expand Down Expand Up @@ -3154,6 +3183,8 @@ def test_missing_repomd(self, mocked_log, save):
'Cannot find local repomd: %s', os.path.join(repodata, 'repomd.xml'))
save.assert_not_called()

@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
@mock.patch.dict(
'bodhi.server.consumers.composer.config',
{'fedora_testing_master_repomd':
Expand Down Expand Up @@ -3251,6 +3282,8 @@ def test_testing_update(self):
class TestComposerThread_send_notifications(ComposerThreadBaseTestCase):
"""Test ComposerThread.send_notifications."""

@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
def test_getlogin_raising_oserror(self):
"""Assert that "composer" is used as the agent if getlogin() raises OSError."""
t = ComposerThread(self.semmock, self._make_msg().body['composes'][0],
Expand Down Expand Up @@ -3456,6 +3489,8 @@ def test_stage_dir_dne(self, mocked_log):
class TestPungiComposerThread__wait_for_repo_signature(ComposerThreadBaseTestCase):
"""Test PungiComposerThread._wait_for_repo_signature()."""

@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
@mock.patch('bodhi.server.consumers.composer.log')
def test_dont_wait_for_signatures(self, mocked_log):
"""Test that if wait_for_repo_sig is disabled, nothing happens."""
Expand All @@ -3474,6 +3509,8 @@ def test_dont_wait_for_signatures(self, mocked_log):
mocked_log.info.mock_calls,
[mock.call('Not waiting for a repo signature')])

@mock.patch("bodhi.server.consumers.composer.blockingCallFromThread",
fake_blockingCallFromThread)
@mock.patch('os.path.exists', side_effect=[
# First time, none of the signatures exist
False, False, False,
Expand Down
1 change: 1 addition & 0 deletions docs/user/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Dependency changes
* ``bodhi-server`` now depends on ``bodhi-messages``.
* kitchen is no longer required.
* hawkey is no longer required.
* Twisted is now required (:issue:`3145`).


Server upgrade instructions
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ colander
cornice>=3.1.0
dogpile.cache
pyasn1-modules # Due to an unfortunate dash in its name, installs break if pyasn1 is installed first
fedora_messaging
fedora_messaging>=1.6.0
Twisted
feedgen
jinja2
markdown
Expand Down

0 comments on commit 5a456d8

Please sign in to comment.