-
Notifications
You must be signed in to change notification settings - Fork 147
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Rails 6 backport for ActiveRecord
This fixes the "can't create Thread: Resource temporarily unavailable" error.
- Loading branch information
Showing
1 changed file
with
98 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
raise "Remove no-longer-needed #{__FILE__}!" if Rails::VERSION::MAJOR >= 6 | ||
|
||
require "weakref" | ||
|
||
module ActiveRecord | ||
# Backport https://github.com/rails/rails/pull/36998 and https://github.com/rails/rails/pull/36999 | ||
# to avoid `ThreadError: can't create Thread: Resource temporarily unavailable` issues | ||
module ConnectionAdapters | ||
class ConnectionPool | ||
class Reaper | ||
@mutex = Mutex.new | ||
@pools = {} | ||
@threads = {} | ||
|
||
class << self | ||
def register_pool(pool, frequency) # :nodoc: | ||
@mutex.synchronize do | ||
unless @threads[frequency]&.alive? | ||
@threads[frequency] = spawn_thread(frequency) | ||
end | ||
@pools[frequency] ||= [] | ||
@pools[frequency] << WeakRef.new(pool) | ||
end | ||
end | ||
|
||
private | ||
def spawn_thread(frequency) | ||
Thread.new(frequency) do |t| | ||
running = true | ||
while running | ||
sleep t | ||
@mutex.synchronize do | ||
@pools[frequency].select!(&:weakref_alive?) | ||
@pools[frequency].each do |p| | ||
p.reap | ||
p.flush | ||
rescue WeakRef::RefError | ||
end | ||
|
||
if @pools[frequency].empty? | ||
@pools.delete(frequency) | ||
@threads.delete(frequency) | ||
running = false | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end | ||
|
||
def run | ||
return unless frequency && frequency > 0 | ||
self.class.register_pool(pool, frequency) | ||
end | ||
end | ||
|
||
def reap | ||
stale_connections = synchronize do | ||
return unless @connections | ||
@connections.select do |conn| | ||
conn.in_use? && !conn.owner.alive? | ||
end.each do |conn| | ||
conn.steal! | ||
end | ||
end | ||
|
||
stale_connections.each do |conn| | ||
if conn.active? | ||
conn.reset! | ||
checkin conn | ||
else | ||
remove conn | ||
end | ||
end | ||
end | ||
|
||
def flush(minimum_idle = @idle_timeout) | ||
return if minimum_idle.nil? | ||
|
||
idle_connections = synchronize do | ||
return unless @connections | ||
@connections.select do |conn| | ||
!conn.in_use? && conn.seconds_idle >= minimum_idle | ||
end.each do |conn| | ||
conn.lease | ||
|
||
@available.delete conn | ||
@connections.delete conn | ||
end | ||
end | ||
|
||
idle_connections.each do |conn| | ||
conn.disconnect! | ||
end | ||
end | ||
end | ||
end | ||
end |