Skip to content

Commit

Permalink
Improve how connection pools operate in forked/child proceeses.
Browse files Browse the repository at this point in the history
Sometimes a process with an active connection to Redis forks and creates
child processes taht also want to talk to Redis. Prior to this change there
were a number of potential conflicts that could cause this to fail.

Retrieving a connection from the pool and releasing a connection back
to the pool check the current proceeses PID. If it's different than the
PID that created the pool, reset() is called to get a fresh set of connections
for the current process. However in doing so, pool.disconnect() was caused
which closes the file descriptors that the parent may still be using. Further
when the available_connections and in_use_connections lists are reset, all of
those connections inherited from the parent are GC'd and the connection's
`__del__` was called, which also closed the socket and file descriptor.

This change prevents pool.disconnect() from being called when a pid is changed.
It also removes the `__del__` destructor from connections. Neither of these
are necessary or practical. Child processes still reset() their copy of the
pool when first accessed causing their own connections to be created.

`ConnectionPool.disconnect()` now checks the current process ID
so that a child or parent can't disconnect the other's connections.

Additionally, `Connection.disconnect()` now checks the current process ID
and only calls `socket.shutdown()` if `disconnect()` is called by the same
process that created the connection. This allows for a child process that
inherited a connection to call `Connection.disconnect()` and not shutdown
the parent's copy of the socket.

Fixes #863
Fixes #784
Fixes #732
Fixes #1085
Fixes #504
  • Loading branch information
andymccurdy committed Feb 1, 2019
1 parent e24e977 commit 4e1e748
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 29 deletions.
12 changes: 4 additions & 8 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,12 +471,6 @@ def __init__(self, host='localhost', port=6379, db=0, password=None,
def __repr__(self):
return self.description_format % self._description_args

def __del__(self):
try:
self.disconnect()
except Exception:
pass

def register_connect_callback(self, callback):
self._connect_callbacks.append(callback)

Expand Down Expand Up @@ -580,7 +574,8 @@ def disconnect(self):
if self._sock is None:
return
try:
self._sock.shutdown(socket.SHUT_RDWR)
if os.getpid() == self.pid:
self._sock.shutdown(socket.SHUT_RDWR)
self._sock.close()
except socket.error:
pass
Expand Down Expand Up @@ -973,7 +968,6 @@ def _checkpid(self):
# another thread already did the work while we waited
# on the lock.
return
self.disconnect()
self.reset()

def get_connection(self, command_name, *keys, **options):
Expand Down Expand Up @@ -1012,6 +1006,7 @@ def release(self, connection):

def disconnect(self):
"Disconnects all connections in the pool"
self._checkpid()
all_conns = chain(self._available_connections,
self._in_use_connections)
for connection in all_conns:
Expand Down Expand Up @@ -1133,5 +1128,6 @@ def release(self, connection):

def disconnect(self):
"Disconnects all connections in the pool."
self._checkpid()
for connection in self._connections:
connection.disconnect()
66 changes: 45 additions & 21 deletions tests/test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ class TestMultiprocessing(object):
# Test connection sharing between forks.
# See issue #1085 for details.

def test_connection(self):
def test_close_connection_in_child(self):
"""
A connection owned by a parent and closed by a child doesn't
destroy the file descriptors so a parent can still use it.
"""
conn = Connection()
assert conn.send_command('ping') is None
conn.send_command('ping')
assert conn.read_response() == b'PONG'

def target(conn):
assert conn.send_command('ping') is None
conn.send_command('ping')
assert conn.read_response() == b'PONG'
conn.disconnect()

Expand All @@ -33,20 +37,29 @@ def target(conn):
proc.join(3)
assert proc.exitcode is 0

# Check that connection is still alive after fork process has exited.
with pytest.raises(ConnectionError):
assert conn.send_command('ping') is None
assert conn.read_response() == b'PONG'
# The connection was created in the parent but disconnected in the
# child. The child called socket.close() but did not call
# socket.shutdown() because it wasn't the "owning" process.
# Therefore the connection still works in the parent.
conn.send_command('ping')
assert conn.read_response() == b'PONG'

def test_close_connection_in_main(self):
def test_close_connection_in_parent(self):
"""
A connection owned by a parent is unusable by a child if the parent
(the owning process) closes the connection.
"""
conn = Connection()
assert conn.send_command('ping') is None
conn.send_command('ping')
assert conn.read_response() == b'PONG'

def target(conn, ev):
ev.wait()
assert conn.send_command('ping') is None
assert conn.read_response() == b'PONG'
# the parent closed the connection. because it also created the
# connection, the connection is shutdown and the child
# cannot use it.
with pytest.raises(ConnectionError):
conn.send_command('ping')

ev = multiprocessing.Event()
proc = multiprocessing.Process(target=target, args=(conn, ev))
Expand All @@ -56,21 +69,27 @@ def target(conn, ev):
ev.set()

proc.join(3)
assert proc.exitcode is 1
assert proc.exitcode is 0

@pytest.mark.parametrize('max_connections', [1, 2, None])
def test_pool(self, max_connections):
"""
A child will create its own connections when using a pool created
by a parent.
"""
pool = ConnectionPool.from_url('redis://localhost',
max_connections=max_connections)

conn = pool.get_connection('ping')
main_conn_pid = conn.pid
with exit_callback(pool.release, conn):
assert conn.send_command('ping') is None
conn.send_command('ping')
assert conn.read_response() == b'PONG'

def target(pool):
with exit_callback(pool.disconnect):
conn = pool.get_connection('ping')
assert conn.pid != main_conn_pid
with exit_callback(pool.release, conn):
assert conn.send_command('ping') is None
assert conn.read_response() == b'PONG'
Expand All @@ -80,15 +99,19 @@ def target(pool):
proc.join(3)
assert proc.exitcode is 0

# Check that connection is still alive after fork process has exited.
# Check that connection is still alive after fork process has exited
# and disconnected the connections in its pool
conn = pool.get_connection('ping')
with exit_callback(pool.release, conn):
with pytest.raises(ConnectionError):
assert conn.send_command('ping') is None
assert conn.read_response() == b'PONG'
assert conn.send_command('ping') is None
assert conn.read_response() == b'PONG'

@pytest.mark.parametrize('max_connections', [1, 2, None])
def test_close_pool_in_main(self, max_connections):
"""
A child process that uses the same pool as its parent isn't affected
when the parent disconnects all connections within the pool.
"""
pool = ConnectionPool.from_url('redis://localhost',
max_connections=max_connections)

Expand All @@ -115,12 +138,13 @@ def target(pool, disconnect_event):
proc.join(3)
assert proc.exitcode is 0

def test_redis(self, r):
def test_redis_client(self, r):
"A redis client created in a parent can also be used in a child"
assert r.ping() is True

def target(redis):
assert redis.ping() is True
del redis
def target(client):
assert client.ping() is True
del client

proc = multiprocessing.Process(target=target, args=(r,))
proc.start()
Expand Down

0 comments on commit 4e1e748

Please sign in to comment.