Skip to content

Commit

Permalink
SentinelConnectionPool plays better with threaded applications.
Browse files Browse the repository at this point in the history
Prevent the pool from closing sockets on connections that are actively in use
by other threads when the master address changes. Connections returned to the
pool that are still connected to the old master will be disconnected
gracefully.

Fixes #1345
  • Loading branch information
andymccurdy committed Jun 1, 2020
1 parent 7973bd9 commit e06af2a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
* Restore try/except clauses to __del__ methods. These will be removed
in 4.0 when more explicit resource management if enforced. #1339
* Update the master_address when Sentinels promote a new master. #847
* Update SentinelConnectionPool to not forcefully disconnect other in-use
connections which can negatively affect threaded applications. #1345
* 3.5.2 (May 14, 2020)
* Tune the locking in ConnectionPool.get_connection so that the lock is
not held while waiting for the socket to establish and validate the
Expand Down
49 changes: 40 additions & 9 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1230,18 +1230,43 @@ def release(self, connection):
"Releases the connection back to the pool"
self._checkpid()
with self._lock:
if connection.pid != self.pid:
try:
self._in_use_connections.remove(connection)
except KeyError:
# Gracefully fail when a connection is returned to this pool
# that the pool doesn't actually own
pass

if self.owns_connection(connection):
self._available_connections.append(connection)
else:
# pool doesn't own this connection. do not add it back
# to the pool and decrement the count so that another
# connection can take its place if needed
self._created_connections -= 1
connection.disconnect()
return
self._in_use_connections.remove(connection)
self._available_connections.append(connection)

def disconnect(self):
"Disconnects all connections in the pool"
def owns_connection(self, connection):
return connection.pid == self.pid

def disconnect(self, inuse_connections=True):
"""
Disconnects connections in the pool
If ``inuse_connections`` is True, disconnect connections that are
current in use, potentially by other threads. Otherwise only disconnect
connections that are idle in the pool.
"""
self._checkpid()
with self._lock:
all_conns = chain(self._available_connections,
self._in_use_connections)
for connection in all_conns:
if inuse_connections:
connections = chain(self._available_connections,
self._in_use_connections)
else:
connections = self._available_connections

for connection in connections:
connection.disconnect()


Expand Down Expand Up @@ -1375,7 +1400,13 @@ def release(self, connection):
"Releases the connection back to the pool."
# Make sure we haven't changed process.
self._checkpid()
if connection.pid != self.pid:
if not self.owns_connection(connection):
# pool doesn't own this connection. do not add it back
# to the pool. instead add a None value which is a placeholder
# that will cause the pool to recreate the connection if
# its needed.
connection.disconnect()
self.pool.put_nowait(None)
return

# Put the connection back into the pool.
Expand Down
16 changes: 11 additions & 5 deletions redis/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,22 @@ def reset(self):
self.master_address = None
self.slave_rr_counter = None

def owns_connection(self, connection):
check = not self.is_master or \
(self.is_master and
self.master_address == (connection.host, connection.port))
parent = super(SentinelConnectionPool, self)
return check and parent.owns_connection(connection)

def get_master_address(self):
master_address = self.sentinel_manager.discover_master(
self.service_name)
if self.is_master:
if self.master_address is None:
self.master_address = master_address
elif master_address != self.master_address:
# Master address changed, disconnect all clients in this pool
if self.master_address != master_address:
self.master_address = master_address
self.disconnect()
# disconnect any idle connections so that they reconnect
# to the new master the next time that they are used.
self.disconnect(inuse_connections=False)
return master_address

def rotate_slaves(self):
Expand Down

0 comments on commit e06af2a

Please sign in to comment.