Core: Race condition in bidi.BackgroundConsumer? #4

Open
plamut opened this issue Apr 29, 2019 · 8 comments
Labels
external This issue is blocked on a bug with the actual product. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@plamut
Contributor

plamut commented Apr 29, 2019

Environment details

Any OS, any Python version, google-api-core==1.9.0.

Steps to reproduce

No actual steps; I spotted this in bidi.py while working on a different issue. This is what the "Am I paused?" snippet in BackgroundConsumer._thread_main() looks like:

with self._wake:
    if self._paused:
        _LOGGER.debug("paused, waiting for waking.")
        self._wake.wait()
        _LOGGER.debug("woken.")

_LOGGER.debug("waiting for recv.")
response = self._bidi_rpc.recv()

The _paused state can be set / cleared by the pause() and resume() methods, respectively.

If paused, the code snippet above blocks at the _wake.wait() call and is unblocked some time after _wake.notifyAll() is invoked in the resume() method. When the resume() method notifies the waiting threads and releases the lock held by the self._wake condition, _wake.wait() tries to reacquire that lock.

Now suppose that some other thread invokes the pause() method in the meantime, and that pause() obtains self._wake's lock before _wake.wait() can grab it. The _paused flag will again be set to True, and when _wake.wait() finally acquires the lock and returns, the code will invoke _bidi_rpc.recv() in the paused state.

For this reason, the self._paused condition must be checked in a while loop rather than in a single if statement, as the threading.Condition docs state:

The while loop checking for the application’s condition is necessary because wait() can return after an arbitrary long time, and the condition which prompted the notify() call may no longer hold true. This is inherent to multi-threaded programming.
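The loop the docs call for can be written as an explicit while, or with Condition.wait_for(), which runs the same re-check loop internally (Python 3.2+). A minimal sketch, assuming a hypothetical PauseGate class that models only the _paused flag and the _wake condition, not the real BackgroundConsumer:

```python
import threading
from typing import Optional


class PauseGate:
    """Hypothetical stand-in for the _paused flag and _wake condition."""

    def __init__(self) -> None:
        self._wake = threading.Condition()
        self._paused = False

    def pause(self) -> None:
        with self._wake:
            self._paused = True

    def resume(self) -> None:
        with self._wake:
            self._paused = False
            self._wake.notify_all()

    def wait_until_resumed(self, timeout: Optional[float] = None) -> bool:
        """Block while paused; return False only if the timeout expired first."""
        with self._wake:
            # Same as: while self._paused: self._wake.wait(...)
            # wait_for() re-evaluates the predicate after every wakeup, so a
            # pause() that sneaks in before the lock is reacquired is noticed.
            return self._wake.wait_for(lambda: not self._paused, timeout=timeout)
```

Note that this only guarantees the gate was open while the lock was held; anything done after leaving the with block can still race with pause().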

How much of a problem is this, i.e. invoking recv() in a paused state?


Edit:
The same can happen when exiting the with self._wake block. The lock gets released, pause() can acquire it and set self._paused to True, but _bidi_rpc.recv() will nevertheless be called, because it is placed outside the with block.


Code example

I created a demo script that shows why a plain if <condition> check is not sufficient and is subject to race conditions (requires Python 3.6+). The "worker" thread waits until a shared global number is set to 10, while five "changer" threads randomly change that number and notify the other threads whenever they set it to 10.

Sometimes the "worker" is woken up too late: another "changer" thread has already changed 10 to something else, so the "worker" continues running even though the target condition no longer holds. Replacing if not <condition> with while not <condition> gets rid of the bug.

...
INFO     [2019-04-29 23:09:20,327] Thread-Changer-0: Changing number from  9 to 10
INFO     [2019-04-29 23:09:20,327] Thread-Changer-0: New number == 10, notifying others
INFO     [2019-04-29 23:09:20,327] Thread-Worker   : I see the number 10
INFO     [2019-04-29 23:09:20,327] Thread-Changer-1: Changing number from 10 to  4
INFO     [2019-04-29 23:09:20,327] Thread-Changer-4: Changing number from  4 to 10
INFO     [2019-04-29 23:09:20,327] Thread-Changer-4: New number == 10, notifying others
INFO     [2019-04-29 23:09:20,328] Thread-Changer-2: Changing number from 10 to  3
INFO     [2019-04-29 23:09:21,328] Thread-Changer-3: Changing number from  3 to  9
INFO     [2019-04-29 23:09:21,328] Thread-Changer-1: Changing number from  9 to  6
INFO     [2019-04-29 23:09:21,328] Thread-Changer-4: Changing number from  6 to  4
INFO     [2019-04-29 23:09:21,328] Thread-Worker   : the number is not 10, will wait for condition
INFO     [2019-04-29 23:09:21,328] Thread-Changer-0: Changing number from  4 to  6
INFO     [2019-04-29 23:09:21,329] Thread-Changer-2: Changing number from  6 to  9
INFO     [2019-04-29 23:09:22,329] Thread-Changer-3: Changing number from  9 to 10
INFO     [2019-04-29 23:09:22,329] Thread-Changer-3: New number == 10, notifying others
INFO     [2019-04-29 23:09:22,329] Thread-Changer-1: Changing number from 10 to  5
INFO     [2019-04-29 23:09:22,329] Thread-Worker   : I see the number 5
...

The expected behavior is that the worker thread always prints out "I see the number 10" (the desired condition), and never "I see <non 10>". In other words, it should only proceed when the condition number == 10 is currently fulfilled.
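The original demo script is not included in this thread. The following is a hypothetical reconstruction of the same experiment, not the author's code: run_demo() and its parameters are made up for illustration, and, unlike the description above, the worker here "consumes" each 10 (resets the number to -1) so that it has to wait for a fresh notification on every iteration.

```python
import random
import threading
from typing import List


def run_demo(use_while: bool, n_changers: int = 5, steps: int = 2000) -> List[int]:
    """Run one worker and n_changers changer threads; return what the worker saw."""
    cond = threading.Condition()
    state = {"number": 0, "done": False}
    observed: List[int] = []

    def worker() -> None:
        with cond:
            while not state["done"]:
                if use_while:
                    # Correct: re-check the predicate after every wakeup.
                    while state["number"] != 10 and not state["done"]:
                        cond.wait()
                elif state["number"] != 10:
                    # Buggy: a single check, so the number may have changed
                    # again by the time wait() reacquires the lock and returns.
                    cond.wait()
                if state["done"]:
                    break
                observed.append(state["number"])
                state["number"] = -1  # consume the value so the worker waits again

    def changer() -> None:
        for _ in range(steps):
            with cond:
                state["number"] = random.randint(1, 10)
                if state["number"] == 10:
                    cond.notify_all()

    worker_thread = threading.Thread(target=worker, name="Thread-Worker")
    changers = [
        threading.Thread(target=changer, name=f"Thread-Changer-{i}")
        for i in range(n_changers)
    ]
    worker_thread.start()
    for t in changers:
        t.start()
    for t in changers:
        t.join()
    with cond:  # let the worker exit even if it is waiting
        state["done"] = True
        cond.notify_all()
    worker_thread.join()
    return observed


if __name__ == "__main__":
    for use_while in (False, True):
        seen = run_demo(use_while)
        bad = [n for n in seen if n != 10]
        print(f"use_while={use_while}: {len(seen)} wakeups, {len(bad)} not 10")
```

With use_while=True every observed value is 10. With use_while=False the worker may record other values; how often depends on thread scheduling, so a given run can also come out clean.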

In the bidi.py case, this would translate to BackgroundConsumer._thread_main() only resuming when self._paused == False, and never resuming when self._paused == True.

@tseaver
Contributor

tseaver commented Jul 23, 2019

@plamut I'm in agreement with your analysis here: the stdlib's docs highlight the problem clearly.

@tseaver
Contributor

tseaver commented Jul 23, 2019

The "obvious" fix:

--- a/api_core/google/api_core/bidi.py
+++ b/api_core/google/api_core/bidi.py
@@ -642,14 +642,15 @@ class BackgroundConsumer(object):
                 # In the future, we could use `Condition.wait_for` if we drop
                 # Python 2.7.
                 with self._wake:
-                    if self._paused:
+                    while self._paused:
                         _LOGGER.debug("paused, waiting for waking.")
                         self._wake.wait()
                         _LOGGER.debug("woken.")
 
-                _LOGGER.debug("waiting for recv.")
-                response = self._bidi_rpc.recv()
-                _LOGGER.debug("recved response.")
+                    _LOGGER.debug("waiting for recv.")
+                    response = self._bidi_rpc.recv()
+                    _LOGGER.debug("recved response.")
+
                 self._on_response(response)
 
         except exceptions.GoogleAPICallError as exc:

passes all unit tests in api_core, but causes system tests to hang for firestore:

$ .nox/system-3-7/bin/py.test --verbose tests/system.py 
============================= test session starts ==============================
platform linux -- Python 3.7.1, pytest-5.0.1, py-1.8.0, pluggy-0.12.0 -- /.../firestore/.nox/system-3-7/bin/python3.7
cachedir: .pytest_cache
rootdir: /.../firestore
collected 25 items                                                             

tests/system.py::test_collections PASSED                                 [  4%]
tests/system.py::test_create_document PASSED                             [  8%]
tests/system.py::test_create_document_w_subcollection PASSED             [ 12%]
tests/system.py::test_cannot_use_foreign_key PASSED                      [ 16%]
tests/system.py::test_no_document PASSED                                 [ 20%]
tests/system.py::test_document_set PASSED                                [ 24%]
tests/system.py::test_document_integer_field PASSED                      [ 28%]
tests/system.py::test_document_set_merge PASSED                          [ 32%]
tests/system.py::test_document_set_w_int_field PASSED                    [ 36%]
tests/system.py::test_document_update_w_int_field PASSED                 [ 40%]
tests/system.py::test_update_document PASSED                             [ 44%]
tests/system.py::test_document_get PASSED                                [ 48%]
tests/system.py::test_document_delete PASSED                             [ 52%]
tests/system.py::test_collection_add PASSED                              [ 56%]
tests/system.py::test_query_stream PASSED                                [ 60%]
tests/system.py::test_query_unary PASSED                                 [ 64%]
tests/system.py::test_collection_group_queries PASSED                    [ 68%]
tests/system.py::test_collection_group_queries_startat_endat PASSED      [ 72%]
tests/system.py::test_collection_group_queries_filters PASSED            [ 76%]
tests/system.py::test_get_all PASSED                                     [ 80%]
tests/system.py::test_batch PASSED                                       [ 84%]
tests/system.py::test_watch_document PASSED                              [ 88%]^C
...
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! KeyboardInterrupt !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
/opt/Python-3.7.1/lib/python3.7/threading.py:241: KeyboardInterrupt

and pubsub:

$ .nox/system-3-7/bin/py.test --verbose tests/system.py tests/system/
============================= test session starts ==============================
platform linux -- Python 3.7.1, pytest-5.0.1, py-1.8.0, pluggy-0.12.0 -- /.../pubsub/.nox/system-3-7/bin/python3.7
cachedir: .pytest_cache
rootdir: /.../pubsub
collected 12 items                                                             

tests/system.py::test_publish_messages PASSED                            [  8%]
tests/system.py::test_subscribe_to_messages PASSED                       [ 16%]
tests/system.py::test_subscribe_to_messages_async_callbacks ^C
...
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! KeyboardInterrupt !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
/opt/Python-3.7.1/lib/python3.7/threading.py:241: KeyboardInterrupt

@plamut
Contributor Author

plamut commented Jul 23, 2019

Did a quick check (for pubsub at least) with the above patch applied, and the async callbacks test hangs when cancelling the future. Might be related to googleapis/google-cloud-python#8616.

Update:
Or maybe not. The problem seems to be that recv() now waits while holding the self._wake lock. If the future gets cancelled during that wait, the consumer tries to resume the thread, but the resume() method first needs to acquire that very same self._wake lock, and so it blocks.
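The hang can be reproduced in isolation. A minimal sketch, assuming a queue.Queue as a stand-in for the gRPC response stream and a hypothetical resume_blocked() helper: one thread takes the condition's lock and then blocks in a simulated recv(), while the main thread tries to take the same lock the way resume() would, with a short timeout so the demo cannot hang.

```python
import queue
import threading


def resume_blocked(hold_lock_during_recv: bool) -> bool:
    """Return True if a resume()-style acquire failed while recv() was blocked."""
    wake = threading.Condition()
    stream = queue.Queue()  # hypothetical stand-in for the gRPC response stream
    recv_started = threading.Event()

    def consumer() -> None:
        with wake:
            recv_started.set()
            if hold_lock_during_recv:
                stream.get()  # blocking "recv()" while still holding the lock
        if not hold_lock_during_recv:
            stream.get()  # blocking "recv()" after releasing the lock

    t = threading.Thread(target=consumer, daemon=True)
    t.start()
    recv_started.wait()
    # resume() needs the same lock; try to take it, but give up after 0.2 s.
    got_lock = wake.acquire(timeout=0.2)
    if got_lock:
        wake.release()
    stream.put("response")  # unblock the simulated recv()
    t.join()
    return not got_lock
```

resume_blocked(True) reports the hang and resume_blocked(False) does not. Releasing the lock before the blocking call avoids the hang, but it also reopens the window in which pause() can run after the check and before recv() starts.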

tseaver self-assigned this Aug 1, 2019
tseaver referenced this issue in googleapis/google-cloud-python Aug 1, 2019
@tseaver
Contributor

tseaver commented Aug 1, 2019

@plamut believes that googleapis/google-cloud-python#8883 did not completely resolve the race, so I updated it to avoid closing this issue.

@tseaver
Contributor

tseaver commented Oct 10, 2019

Both BidiRpc.recv and ResumableBidiRpc.recv are implemented by calling next() on the underlying gRPC Call object, which provides no timeout mechanism.

PR googleapis/google-cloud-python#9337 deals with the race / hang by timing out the call to join the daemonized worker thread, but it doesn't really fix the issue here.
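Timing out the join does not unblock the worker; it only stops the caller from waiting for it. A minimal sketch of that distinction, assuming a threading.Event that is never set as a stand-in for next() on a call with no timeout; stop_worker() is a hypothetical helper, not the code from the PR:

```python
import threading


def stop_worker(worker: threading.Thread, timeout: float) -> bool:
    """Wait at most `timeout` seconds for `worker`; return True only if it exited."""
    worker.join(timeout=timeout)
    # join() returns None either way, so is_alive() is the only way to tell.
    return not worker.is_alive()


# Hypothetical stand-in for a thread stuck in next() on a call with no timeout.
unblock = threading.Event()
# daemon=True: a thread still stuck at interpreter exit won't keep the process alive.
stuck = threading.Thread(target=unblock.wait, daemon=True)
stuck.start()
```

stop_worker(stuck, 0.1) returns False and the thread keeps running; only once the blocking call returns (here, unblock.set()) does the thread actually finish.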

I've filed grpc/grpc#20562 to ask that the grpc._channel._Rendezvous class expose an alternative method which takes a timeout: we could then close this issue by calling that method rather than using next(self.call).
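Until such a method exists, one generic workaround is to move next() onto a helper thread and wait on a queue with a timeout. A sketch under that assumption: TimeoutIterator and next_with_timeout() are hypothetical, not part of gRPC or google-api-core, and the wrapper only lets the caller stop waiting; the helper thread itself stays blocked inside the underlying next() until the call yields an item or ends.

```python
import queue
import threading
from typing import Any, Iterator


class TimeoutIterator:
    """Hypothetical wrapper adding a timeout to next() on a blocking iterator."""

    def __init__(self, source: Iterator[Any]) -> None:
        self._items = queue.Queue()  # entries are (is_item, value) pairs
        self._pump_thread = threading.Thread(
            target=self._pump, args=(source,), daemon=True
        )
        self._pump_thread.start()

    def _pump(self, source: Iterator[Any]) -> None:
        # Runs on the helper thread; may block in next(source) indefinitely.
        try:
            for item in source:
                self._items.put((True, item))
        except Exception as exc:  # forward errors to the caller
            self._items.put((False, exc))
        else:
            self._items.put((False, StopIteration()))

    def next_with_timeout(self, timeout: float) -> Any:
        """Return the next item; raise queue.Empty on timeout, StopIteration at the end."""
        is_item, value = self._items.get(timeout=timeout)
        if is_item:
            return value
        self._items.put((False, value))  # keep the terminal entry for later calls
        raise value
```

The cost of this design is one extra thread per stream, and a cancelled stream still has to unblock that thread eventually, which is why a native timeout on the call object would be preferable.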

tseaver removed their assignment Oct 10, 2019
busunkim96 transferred this issue from googleapis/google-cloud-python Feb 7, 2020
busunkim96 added the external, priority: p2, and type: bug labels Feb 7, 2020
@vchudnov-g
Contributor

UPDATE: PR googleapis/google-cloud-python#9337 was merged, but it's not immediately clear whether grpc/grpc#20562 was addressed. Pinging this issue so we consider it in future planning.

@vchudnov-g
Contributor

Pinged grpc/grpc#20562 to see whether that was implemented, which would unblock us from implementing the rest of the solution here.

@vchudnov-g
Contributor

It looks like grpc/grpc#20562 won't happen this quarter at least. We're waiting on that.

4 participants