Race condition in handling health checks for pub-sub #1720

Closed
bmerry opened this issue Nov 17, 2021 · 2 comments · Fixed by #1737
@bmerry
Contributor

bmerry commented Nov 17, 2021

Version: What redis-py and what redis version is the issue happening on?
redis-py 4.0.0, redis 6.2.6 (have also reproduced a failure in redis-py 3.5.3, although with a different failure mode)

Platform: What platform / version? (For example Python 3.5.1 on Windows 7 / Ubuntu 15.10 / Azure)
Python 3.8, Ubuntu 20.04

Description:
PubSub.execute_command checks self.subscribed before deciding whether to issue a health check, but if we've issued an unsubscribe for the last channel and not yet received the unsubscribe confirmation from the server, self.subscribed will still be true. The response to the health-check PING (a simple string "PONG") can then be misinterpreted as a pub-sub message. This shows up as get_message returning {'type': 80, 'pattern': None, 'channel': 79, 'data': 78}, where 80, 79 and 78 are the ASCII codes of P, O and N (they're integers because indexing a bytes object yields integers).
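The garbled message can be reproduced directly, without redis at all. This is only a sketch of the misparse, not redis-py's actual handler code: when the simple-string reply b"PONG" is unpacked as if it were a multi-bulk pub-sub message, each field comes out as an integer, because indexing a bytes object yields integers.

```python
# Sketch (not redis-py's actual parsing code): unpack the health-check
# reply b"PONG" as if it were a three-field pub/sub message.
response = b"PONG"  # simple-string reply to the health-check PING

# Indexing a bytes object yields integers, so each "field" is an ASCII code.
message = {
    "type": response[0],     # 80 == ord("P")
    "pattern": None,
    "channel": response[1],  # 79 == ord("O")
    "data": response[2],     # 78 == ord("N")
}
print(message)  # {'type': 80, 'pattern': None, 'channel': 79, 'data': 78}
```

This matches the output reported above exactly.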

With redis-py 3.5.3 I think I hit the opposite problem: a 'PING redis-py-health-check' is issued on the assumption that the connection has subscriptions, expecting a multi-bulk response ["ping", "redis-py-health-check"], but just "redis-py-health-check" comes back on the wire because there are no subscriptions. I think I once saw the same thing happen with redis-py 4.0.0, but I've been unable to reproduce it since, and I have been jumping between the versions to compare, so I may be mistaken.

Here is some minimal code that illustrates the issue. It doesn't reliably reproduce it - I have to run it in a loop, and it can take 5-20 runs to trigger. When it does, it prints the output below. I haven't dug into the backtrace and it might be just something in my own code - it's the message with numbers in the fields that is the smoking gun.

I think both cases can be dealt with by ignoring responses that are a single string instead of a multi-bulk when trying to parse a pubsub message, although it is a bit hacky.
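That workaround could look something like this hedged sketch. parse_pubsub_response is a stand-in name, not a real redis-py method: the idea is simply to drop any reply that is not a multi-bulk list before treating it as a pub-sub message.

```python
# Hypothetical guard (not the merged fix): ignore single-string replies,
# such as a stray b"PONG" from a health check, when parsing pub/sub messages.
def parse_pubsub_response(response):
    if not isinstance(response, (list, tuple)):
        return None  # not a multi-bulk reply; discard it (e.g. b"PONG")
    msg_type, channel, data = response
    return {"type": msg_type, "pattern": None, "channel": channel, "data": data}
```

With this guard, parse_pubsub_response(b"PONG") returns None instead of the garbled integer-field dict, while a genuine [b"message", b"foo", b"hi"] reply still parses normally.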

#!/usr/bin/env python3

import threading
import time

import redis


def poll(ps):
    while True:
        message = ps.get_message(timeout=5)
        if message is not None:
            print(message)
        else:
            break


def main():
    r = redis.Redis.from_url("redis://localhost", health_check_interval=1)
    ps = r.pubsub()
    ps.subscribe("foo")
    poller = threading.Thread(target=poll, args=(ps,))
    poller.start()
    ps.unsubscribe("foo")
    time.sleep(3)
    ps.subscribe("baz")
    poller.join()


main()

Output:

{'type': 'subscribe', 'pattern': None, 'channel': b'foo', 'data': 1}
{'type': 'unsubscribe', 'pattern': None, 'channel': b'foo', 'data': 0}
{'type': 'subscribe', 'pattern': None, 'channel': b'baz', 'data': 1}
{'type': 'subscribe', 'pattern': None, 'channel': b'foo', 'data': 1}
{'type': 'unsubscribe', 'pattern': None, 'channel': b'foo', 'data': 0}
{'type': 'subscribe', 'pattern': None, 'channel': b'baz', 'data': 1}
{'type': 'subscribe', 'pattern': None, 'channel': b'foo', 'data': 1}
{'type': 'unsubscribe', 'pattern': None, 'channel': b'foo', 'data': 0}
{'type': 'subscribe', 'pattern': None, 'channel': b'baz', 'data': 1}
{'type': 'subscribe', 'pattern': None, 'channel': b'foo', 'data': 1}
{'type': 'unsubscribe', 'pattern': None, 'channel': b'foo', 'data': 0}
{'type': 'subscribe', 'pattern': None, 'channel': b'baz', 'data': 1}
{'type': 'subscribe', 'pattern': None, 'channel': b'foo', 'data': 1}
{'type': 'unsubscribe', 'pattern': None, 'channel': b'foo', 'data': 0}
{'type': 'subscribe', 'pattern': None, 'channel': b'baz', 'data': 1}
{'type': 'subscribe', 'pattern': None, 'channel': b'foo', 'data': 1}
{'type': 'unsubscribe', 'pattern': None, 'channel': b'foo', 'data': 0}
{'type': 80, 'pattern': None, 'channel': 79, 'data': 78}
Traceback (most recent call last):
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/connection.py", line 430, in read_from_socket
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/connection.py", line 430, in read_from_socket
    bufflen = self._sock.recv_into(self._buffer)
socket.timeout: timed out

During handling of the above exception, another exception occurred:

    bufflen = self._sock.recv_into(self._buffer)
OSError: [Errno 9] Bad file descriptor

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./crashit_redis.py", line 30, in <module>
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    main()
  File "./crashit_redis.py", line 26, in main
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    ps.subscribe("baz")
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/client.py", line 1394, in subscribe
    self._target(*self._args, **self._kwargs)
  File "./crashit_redis.py", line 11, in poll
    ret_val = self.execute_command('SUBSCRIBE', *new_channels.keys())
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/client.py", line 1279, in execute_command
    message = ps.get_message(timeout=5)
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/client.py", line 1431, in get_message
    self._execute(connection, connection.send_command, *args, **kwargs)
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/client.py", line 1300, in _execute
    response = self.parse_response(block=False, timeout=timeout)
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/client.py", line 1316, in parse_response
    and not self._execute(conn, conn.can_read, timeout=timeout)
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/client.py", line 1300, in _execute
    return conn.retry.call_with_retry(
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/retry.py", line 35, in call_with_retry
    return conn.retry.call_with_retry(
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/retry.py", line 32, in call_with_retry
    fail(error)
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/client.py", line 1302, in <lambda>
    return do()
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/client.py", line 1301, in <lambda>
    lambda error: self._disconnect_raise_connect(conn, error))
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/client.py", line 1289, in _disconnect_raise_connect
    lambda: command(*args, **kwargs),
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/connection.py", line 752, in can_read
    raise error
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/retry.py", line 32, in call_with_retry
    return self._parser.can_read(timeout)
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/connection.py", line 419, in can_read
    return do()
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/client.py", line 1301, in <lambda>
    return self.read_from_socket(timeout=timeout,
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/connection.py", line 459, in read_from_socket
    lambda: command(*args, **kwargs),
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/connection.py", line 744, in send_command
    sock.settimeout(self._socket_timeout)
OSError: [Errno 9] Bad file descriptor
    self.send_packed_command(self.pack_command(*args),
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/connection.py", line 720, in send_packed_command
    self.check_health()
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/connection.py", line 712, in check_health
    self.retry.call_with_retry(self._send_ping, self._ping_failed)
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/retry.py", line 37, in call_with_retry
    raise error
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/retry.py", line 32, in call_with_retry
    return do()
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/connection.py", line 702, in _send_ping
    if str_if_bytes(self.read_response()) != 'PONG':
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/connection.py", line 757, in read_response
    response = self._parser.read_response()
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/connection.py", line 473, in read_response
    self.read_from_socket()
  File "/home/bmerry/work/sdp/env3/lib/python3.8/site-packages/redis/connection.py", line 445, in read_from_socket
    raise TimeoutError("Timeout reading from socket")
redis.exceptions.TimeoutError: Timeout reading from socket
@barshaul
Contributor

barshaul commented Nov 21, 2021

Hey,
Thanks for finding this and bringing it up!
I was able to reproduce the bug and find the root cause.
When PubSub's execute_command method is called, it passes a health_check bool to determine whether it needs to run a health check. The health_check value is set to not self.subscribed, which checks whether the pubsub instance has any items in its channels/patterns lists. That means we perform a health check inside execute_command only if we are not yet subscribed; all subsequent commands after the first subscription are executed without a health check, since the channels/patterns lists are no longer empty.
The pubsub get_message() method can be used to poll published messages after a pubsub instance has been created. If a poller thread is created (a thread that waits on get_message()), it listens on the same socket that execute_command reads from when it performs a health check. Hence, we must not send a health check from execute_command after the poller thread is started, since it would then race the poller thread to read the response from the socket.
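The decision described above can be modeled with a few lines. This is a simplified sketch whose names mirror redis-py, not the library's actual implementation: subscribed reflects only the local channels/patterns bookkeeping, which is why the health check can fire again after the last unsubscribe even though a poller thread owns the socket.

```python
# Simplified model of PubSub's health-check decision (sketch, not redis-py).
class PubSubModel:
    def __init__(self):
        self.channels = {}
        self.patterns = {}

    @property
    def subscribed(self):
        # True as soon as any channel/pattern is tracked locally.
        return bool(self.channels or self.patterns)

    def needs_health_check(self):
        # Health check only when there are no local subscriptions.
        return not self.subscribed
```

Once the local channels list empties again (step 6 in the flow below the posted example), needs_health_check() goes back to True, opening the race window.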

In the posted example we see the following flow:

  1. The PubSub instance is created
  2. Channel 'foo' is subscribed - a health check is performed since self.channels is empty
  3. 'PONG' is received
  4. A poller thread is started, looping on get_message()
  5. The poller thread polls the 'subscribe' response
  6. Channel 'foo' is unsubscribed - no health check is performed since self.channels still contains 'foo'; 'foo' is then removed from self.channels
  7. The poller thread polls the 'unsubscribe' response
  8. Channel 'baz' is subscribed - a health check is performed since self.channels is empty again
  9. The poller thread tries to poll the 'subscribe' response and gets the 'PONG' response instead
  10. The health check waits for a PONG response that has already been consumed by the poller thread, and therefore times out

Therefore, we shouldn't use self.channels and self.patterns to determine whether a health check needs to be executed; instead we should have another variable indicating whether this is the first command execution, and only then run a health check.

However, a poller thread may be started before subscribing to a channel, e.g.:

  1. ps = r.pubsub()
  2. poller = threading.Thread(target=poll, args=(ps,))
  3. ps.subscribe('foo')

In this case, the health check will be performed and we will still get a race reading from the socket with the poller thread.
So, my suggestion is to add a new cmd_execution_health_check variable, initialized to True, to the pubsub class, and to set it to False in either:

  1. the end of the execute_command method, so the health check is performed only on the first execution; or
  2. the get_message() function, so the health check is not performed from execute_command at all.
     Health checks are already done by the get_message() method, so there is no need to also run one from the main command execution.
     This change fixes the reported bug.
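Option 1 above might be sketched as follows. The variable name is taken from the comment; the actual fix landed in #1737 and may differ in detail.

```python
# Sketch of option 1: a one-shot flag so execute_command health-checks
# only on the very first command, regardless of subscription state.
class PubSubModel:
    def __init__(self):
        self.cmd_execution_health_check = True

    def execute_command(self, *args):
        check = self.cmd_execution_health_check
        self.cmd_execution_health_check = False  # never check from here again
        return check  # stands in for "perform a health check?"
```

The first execute_command call returns True (health check), every later call returns False, so no PING is ever sent while a poller thread is reading the socket.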

@bmerry
Contributor Author

bmerry commented Nov 21, 2021

In this case, the health check will be performed and we will still get a race reading from the socket with the poller thread. So, my suggestion is to add a new cmd_execution_health_check variable, initialized to True, to the pubsub class, and to set it to False in either:

1. the end of the execute_command method, so the health check is performed only on the first execution; or

2. the get_message() function, so the health check is not performed from execute_command at all.
   Health checks are already done by the get_message() method, so there is no need to also run one from the main command execution.

Isn't there still a race between the main thread checking the flag when issuing the first command and the poller thread clearing the flag in get_message?
