Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Another race condition in health checks and pubsub #1740

Closed
bmerry opened this issue Nov 23, 2021 · 3 comments · Fixed by #1737
Closed

Another race condition in health checks and pubsub #1740

bmerry opened this issue Nov 23, 2021 · 3 comments · Fixed by #1737

Comments

@bmerry
Copy link
Contributor

bmerry commented Nov 23, 2021

Version: redis-py 4.0.2, Redis 6.2.6. Also reproduces with #1733 or #1737 applied.

Platform: Python 3.8, Ubuntu 20.04

Description: This is somewhat similar to #1720, but different. If get_message is called between issuing an UNSUBSCRIBE command that removes that last subscription, and getting the unsubscribe message back, and parse_response decides it needs to do a health check, then the reply to the health check is mis-interpreted. This is because redis responds differently to PING depending on whether there are active subscriptions or not, and parse_response only recognises the one form.

aioredis has the same bug, and the fix (recognise both possible responses) would probably work for redis-py too.

Code to reproduce the issue (it's a race condition so might not work every time, but it's been reliable for me). Depends on a redis server on localhost:

#!/usr/bin/env python3

import threading
import time

import redis


def poll(ps, event):
    print(ps.get_message(timeout=5))
    event.wait()
    while True:
        message = ps.get_message(timeout=5)
        if message is not None:
            print(message)
        else:
            break


def main():
    r = redis.Redis.from_url("redis://localhost", health_check_interval=1)
    ps = r.pubsub()
    ps.subscribe("foo")

    event = threading.Event()
    poller = threading.Thread(target=poll, args=(ps, event))
    poller.start()

    time.sleep(2)
    event.set()
    ps.unsubscribe("foo")
    poller.join()


main()

It subscribes, then waits 2 seconds (to make the health check interval kick in), then races to call get_message and unsubscribe.

Output:

{'type': 'subscribe', 'pattern': None, 'channel': b'foo', 'data': 1}
{'type': 'unsubscribe', 'pattern': None, 'channel': b'foo', 'data': 0}
{'type': 114, 'pattern': None, 'channel': 101, 'data': 100}

The last line is the problem. It's caused by trying to treat a byte string (b"redis-py-health-check") as a list and selecting elements from it (114, 101, 100 are the ASCII codes for 'r', 'e', 'd').

@barshaul
Copy link
Contributor

Hey @bmerry,
I ran it over 30 minutes with the fix I posted in #1737 (there's a fresh new commit), and it didn't get reproduced.
Could you try to run it with the new changes and update if you still see this issue?

@bmerry
Copy link
Contributor Author

bmerry commented Nov 23, 2021

Thanks, I'll take a look when I can - but it will probably only be next week as I have some fires to put out first.

@barshaul
Copy link
Contributor

barshaul commented Nov 23, 2021

Using this issue I found #1737 have a different error:
If UNSUBSCRIBE response is received before the PING response received: in this case get_message will poll the unsubscribe response, then it will change the subscribed_event flag to false, and then running get_message again will result in None until a new subscription is made. However, in this phase, if we call subscribe again, we have the b"redis-py-health-check" response queued to the socket, so, when we'll run the health check within the execute_command (since now self.subscribed == False), an error will occur:
redis.exceptions.ConnectionError: Bad response from PING health check
because the response is b"redis-py-health-check" .
This issue can be fixed if we'll run the health check from the execute_command method only on the first command execution.

After I fixed it and added a subscription at the end of the script, I was able to reproduce the issue you mentioned.
I will work on publishing a fix for both.

The bug in 1737 can be reproduced with:

#!/usr/bin/env python3

import threading
import time

from redis import Redis


def poll(ps, event=None):
    print(ps.get_message(timeout=5))
    event.wait()
    while True:
        message = ps.get_message(timeout=5)
        if message is not None:
            print(message)
        else:
            break

def main():
    r = Redis.from_url("redis://localhost", health_check_interval=1)
    ps = r.pubsub()
    ps.subscribe("foo")

    event = threading.Event()
    poller = threading.Thread(target=poll, args=(ps, event))
    poller.start()

    time.sleep(2)
    event.set()
    ps.unsubscribe("foo")
    time.sleep(1)
    ps.subscribe("foo")
    poller.join()

while True:
    main()

barshaul added a commit to barshaul/redis-py that referenced this issue Dec 8, 2021
…m the execute_command method only in the first command execution.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants