Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RIP Celluloid, Hello concurrent-ruby/Thread pool #291

Merged
merged 47 commits into from
Feb 14, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
f1df7ab
WIP
Dec 19, 2016
1f3d3c4
WIP simple thread pool
Dec 19, 2016
b8f0830
Clean up celluloid references
Dec 19, 2016
ca8504f
Fix hard shutdown.
Dec 19, 2016
cb2a0e7
Clean TTIN
Dec 19, 2016
be67d4b
Implement AutoExtendVisibility using Concurrent::TimerTask
Dec 19, 2016
85c612d
Remove no longer needed methods
Dec 19, 2016
ad0a57e
Only delay hard shutdown if there's any busy worker
Dec 19, 2016
a33838c
Remove sleep used for debugging
Dec 19, 2016
ca15687
Clean up
Dec 19, 2016
da429bc
Fix specs
Dec 19, 2016
ff73931
Disable brakeman
Dec 19, 2016
e8f784f
Merge branch 'master' into thread-pool
Dec 19, 2016
dd9bfb4
Merge branch 'master' into thread-pool
Dec 19, 2016
606f0ca
Merge branch 'master' into thread-pool
Dec 19, 2016
ffd3517
Merge branch 'master' into thread-pool
Dec 19, 2016
79705a1
Remove safe navigation operator it isn't supported prior Ruby 2.3 :sad:
Dec 19, 2016
7315786
Delay is no longer needed for soft shutdown
Dec 19, 2016
dfee6d4
Wrap dispatch_later with a mutex to avoid concurrency comming from the
Dec 19, 2016
f20b676
Naming refactor
Dec 19, 2016
3bad1e0
watchdog is no longer needed
Dec 19, 2016
9b9d4b3
Remove Mutex in favor of `make_true`
Dec 19, 2016
4bd68ea
Turn dispatch/dispatch_later into a heartbeat
Dec 19, 2016
ef295c8
Fix specs
Dec 19, 2016
474838e
Use `Concurrent::TimerTask` as a heartbeat
Dec 19, 2016
8cd2817
Clean up
Dec 19, 2016
90a1363
Clean auto extend visibility
Dec 19, 2016
7803300
Better hard_shutdown_in handling with wait_for_termination
Dec 19, 2016
3962069
Naming refactor
Dec 21, 2016
eb2edff
Decrease the heartbeat interval to 0.25
Dec 23, 2016
8aa9124
Test PutsReq
Dec 23, 2016
766fe99
Bump delay to 1 sec
Dec 23, 2016
9563b08
Change delay to 0.10
Dec 23, 2016
91c3c2f
Add screenshots
Dec 23, 2016
f61c566
Dispatch when done
Dec 23, 2016
5ebfed5
non-stop dispatch
Dec 24, 2016
ac52dab
Dispatch when done
Dec 24, 2016
1fc4146
Test wait_time_seconds:10
Dec 24, 2016
1eb7092
execution_interval: 0.05
Dec 24, 2016
59bc99b
execution_interval: 0.15
Dec 24, 2016
540b45a
A manager every 10 concurrency
Dec 24, 2016
7331989
Test performance 1 processor 1 fetcher
Dec 24, 2016
93bb041
Stop logging out when 0 messages found.
Jan 12, 2017
f75a943
Clean up PutsReq tests
Feb 13, 2017
fed9d81
:lipstick:
Feb 13, 2017
c36d58f
Merge branch 'master' into thread-pool
Feb 13, 2017
7f18e02
Fix spec
Feb 13, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ end
desc 'Open Shoryuken pry console'
task :console do
require 'pry'
require 'celluloid/current'
require 'shoryuken'

config_file = File.join File.expand_path('..', __FILE__), 'shoryuken.yml'
Expand Down
5 changes: 5 additions & 0 deletions lib/shoryuken.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
require 'yaml'
require 'json'
require 'aws-sdk-core'
require 'time'
require 'concurrent'

require 'shoryuken/version'
require 'shoryuken/core_ext'
Expand All @@ -22,6 +24,9 @@
require 'shoryuken/sns_arn'
require 'shoryuken/topic'
require 'shoryuken/polling'
require 'shoryuken/manager'
require 'shoryuken/launcher'


module Shoryuken
DEFAULTS = {
Expand Down
39 changes: 13 additions & 26 deletions lib/shoryuken/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ class CLI
include Util
include Singleton

attr_accessor :launcher

def run(args)
self_read, self_write = IO.pipe

Expand All @@ -41,9 +39,8 @@ def run(args)

loader.load

load_celluloid
configure_concurrent_logger

require 'shoryuken/launcher'
@launcher = Shoryuken::Launcher.new

if callback = Shoryuken.start_callback
Expand All @@ -54,43 +51,33 @@ def run(args)
fire_event(:startup)

begin
launcher.run
@launcher.run

while (readable_io = IO.select([self_read]))
signal = readable_io.first[0].gets.strip
handle_signal(signal)
end
rescue Interrupt
launcher.stop(shutdown: true)
@launcher.stop(shutdown: true)
exit 0
end
end

private

def load_celluloid
require 'celluloid/current'
Celluloid.logger = (Shoryuken.options[:verbose] ? Shoryuken.logger : nil)

require 'shoryuken/manager'
end
def configure_concurrent_logger
return unless Shoryuken.logger

def celluloid_loaded?
defined?(::Celluloid)
Concurrent.global_logger = lambda do |level, progname, msg = nil, &block|
Shoryuken.logger.log(level, msg, progname, &block)
end
end

def daemonize(options)
return unless options[:daemon]

fail ArgumentError, "You really should set a logfile if you're going to daemonize" unless options[:logfile]

if celluloid_loaded?
# Celluloid can't be loaded until after we've daemonized
# because it spins up threads and creates locks which get
# into a very bad state if forked.
raise "Celluloid cannot be required until here, or it will break Shoryuken's daemonization"
end

files_to_reopen = []
ObjectSpace.each_object(File) do |file|
files_to_reopen << file unless file.closed?
Expand Down Expand Up @@ -187,7 +174,7 @@ def handle_signal(sig)
when 'USR1'
logger.info { 'Received USR1, will soft shutdown down' }

launcher.stop
@launcher.stop
fire_event(:quiet, true)
exit 0
when 'TTIN'
Expand All @@ -200,11 +187,11 @@ def handle_signal(sig)
end
end

ready = launcher.manager.instance_variable_get(:@ready).size
busy = launcher.manager.instance_variable_get(:@busy).size
queues = launcher.manager.instance_variable_get(:@queues)
# ready = launcher.manager.instance_variable_get(:@ready).size
# busy = launcher.manager.instance_variable_get(:@busy).size
# queues = launcher.manager.instance_variable_get(:@queues)

logger.info { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{unparse_queues(queues)}" }
# logger.info { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{unparse_queues(queues)}" }
else
logger.info { "Received #{sig}, will shutdown down" }

Expand Down
29 changes: 5 additions & 24 deletions lib/shoryuken/launcher.rb
Original file line number Diff line number Diff line change
@@ -1,42 +1,23 @@
module Shoryuken
class Launcher
include Celluloid
include Util

trap_exit :actor_died

attr_accessor :manager

def initialize
@condvar = Celluloid::Condition.new
@manager = Shoryuken::Manager.new_link(@condvar)

@done = false

manager.fetcher = Shoryuken::Fetcher.new
manager.polling_strategy = Shoryuken.options[:polling_strategy].new(Shoryuken.queues)
@manager = Shoryuken::Manager.new(Shoryuken::Fetcher.new,
Shoryuken.options[:polling_strategy].new(Shoryuken.queues))
end

def stop(options = {})
watchdog('Launcher#stop') do
@done = true

manager.async.stop(shutdown: !!options[:shutdown], timeout: Shoryuken.options[:timeout])
@condvar.wait
manager.terminate
@manager.stop(shutdown: !!options[:shutdown],
timeout: Shoryuken.options[:timeout])
end
end

def run
watchdog('Launcher#run') do
manager.async.start
@manager.start
end
end

def actor_died(actor, reason)
return if @done
logger.warn { "Shoryuken died due to the following error, cannot recover, process exiting: #{reason}" }
exit 1
end
end
end
121 changes: 36 additions & 85 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,24 @@

module Shoryuken
class Manager
include Celluloid
include Util

attr_accessor :fetcher
attr_accessor :polling_strategy

exclusive :dispatch

trap_exit :processor_died

BATCH_LIMIT = 10

def initialize(condvar)
def initialize(fetcher, polling_strategy)
@count = Shoryuken.options[:concurrency] || 25
raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0

@queues = Shoryuken.queues.dup.uniq
@finished = condvar

@done = false
@done = Concurrent::AtomicBoolean.new(false)

@fetcher = fetcher
@polling_strategy = polling_strategy

@ready = Concurrent::AtomicFixnum.new(@count)

@busy_processors = []
@busy_threads = {}
@ready_processors = @count.times.map { build_processor }
@pool = Concurrent::FixedThreadPool.new(@count)
end

def start
Expand All @@ -36,7 +31,7 @@ def start

def stop(options = {})
watchdog('Manager#stop died') do
@done = true
@done.make_true

if (callback = Shoryuken.stop_callback)
logger.info { 'Calling Shoryuken.on_stop block' }
Expand All @@ -45,14 +40,7 @@ def stop(options = {})

fire_event(:shutdown, true)

logger.info { "Shutting down #{@ready_processors.size} quiet workers" }

@ready_processors.each do |processor|
processor.terminate if processor.alive?
end
@ready_processors.clear

return after(0) { @finished.signal } if @busy_processors.empty?
logger.info { "Shutting down workers" }

if options[:shutdown]
hard_shutdown_in(options[:timeout])
Expand All @@ -62,64 +50,40 @@ def stop(options = {})
end
end

def processor_done(queue, processor)
def processor_done(queue)
watchdog('Manager#processor_done died') do
logger.debug { "Process done for '#{queue}'" }

@busy_processors.delete(processor)
@busy_threads.delete(processor.object_id)
@ready.increment

if stopped?
processor.terminate if processor.alive?
return after(0) { @finished.signal } if @busy_processors.empty?
else
@ready_processors << processor
async.dispatch
end
dispatch_later unless @done.true?
end
end

def processor_died(processor, reason)
watchdog("Manager#processor_died died") do
logger.error { "Process died, reason: #{reason}" }

@busy_processors.delete(processor)
@busy_threads.delete(processor.object_id)

if stopped?
return after(0) { @finished.signal } if @busy_processors.empty?
else
@ready_processors << build_processor
async.dispatch
end
end
end

def stopped?
@done
def busy
@ready.value - @count
end

def dispatch
return if stopped?
return if @done.true?

logger.debug { "Ready: #{@ready_processors.size}, Busy: #{@busy_processors.size}, Active Queues: #{polling_strategy.active_queues}" }
logger.debug { "Ready: #{@ready.value}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }

if @ready_processors.empty?
if @ready.value == 0
logger.debug { 'Pausing fetcher, because all processors are busy' }
dispatch_later
return
end

queue = polling_strategy.next_queue
if queue.nil?
unless queue = @polling_strategy.next_queue
logger.debug { 'Pausing fetcher, because all queues are paused' }
dispatch_later
return
end

batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)

async.dispatch
dispatch_later
end

private
Expand All @@ -135,23 +99,21 @@ def assign(queue, sqs_msg)
watchdog('Manager#assign died') do
logger.debug { "Assigning #{sqs_msg.message_id}" }

processor = @ready_processors.pop
@busy_threads[processor.object_id] = processor.running_thread
@busy_processors << processor
@ready.decrement

processor.async.process(queue, sqs_msg)
@pool.post { Processor.new(self).process(queue, sqs_msg) }
end
end

def dispatch_batch(queue)
batch = fetcher.fetch(queue, BATCH_LIMIT)
polling_strategy.messages_found(queue.name, batch.size)
batch = @fetcher.fetch(queue, BATCH_LIMIT)
@polling_strategy.messages_found(queue.name, batch.size)
assign(queue.name, patch_batch!(batch))
end

def dispatch_single_messages(queue)
messages = fetcher.fetch(queue, @ready_processors.size)
polling_strategy.messages_found(queue.name, messages.size)
messages = @fetcher.fetch(queue, @ready.value)
@polling_strategy.messages_found(queue.name, messages.size)
messages.each { |message| assign(queue.name, message) }
end

Expand All @@ -164,38 +126,27 @@ def delay
end

def build_processor
processor = Processor.new_link(current_actor)
processor.proxy_id = processor.object_id
processor
Processor.new(self)
end

def soft_shutdown(delay)
logger.info { "Waiting for #{@busy_processors.size} busy workers" }
logger.info { "Waiting for #{busy} busy workers" }

if @busy_processors.size > 0
after(delay) { soft_shutdown(delay) }
else
@finished.signal
end
@pool.shutdown
@pool.wait_for_termination
end

def hard_shutdown_in(delay)
logger.info { "Waiting for #{@busy_processors.size} busy workers" }
logger.info { "Waiting for #{busy} busy workers" }
logger.info { "Pausing up to #{delay} seconds to allow workers to finish..." }

after(delay) do
watchdog('Manager#hard_shutdown_in died') do
if @busy_processors.size > 0
logger.info { "Hard shutting down #{@busy_processors.size} busy workers" }

@busy_processors.each do |processor|
if processor.alive? && t = @busy_threads.delete(processor.object_id)
t.raise Shutdown
end
end
end
if busy > 0
logger.info { "Hard shutting down #{busy} busy workers" }

@finished.signal
@pool.kill
end
end
end
end
Expand Down
Loading